Skip to content

Commit

Permalink
Move idle Notify handle into endpoint Shared
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith authored and djc committed Oct 23, 2022
1 parent 7d51d2e commit 00fb347
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions quinn/src/endpoint.rs
Expand Up @@ -266,21 +266,13 @@ impl Endpoint {
/// [`close()`]: Endpoint::close
pub async fn wait_idle(&self) {
loop {
let idle;
{
let endpoint = &mut *self.inner.state.lock().unwrap();
if endpoint.connections.is_empty() {
break;
}
// Clone the `Arc<Notify>` so we can wait on the underlying `Notify` without holding
// the lock. Store it in the outer scope to ensure it outlives the lock guard.
idle = endpoint.idle.clone();
// Construct the future while the lock is held to ensure we can't miss a wakeup if
// the `Notify` is signaled immediately after we release the lock. `await` it after
// the lock guard is out of scope.
idle.notified()
}
.await;
self.inner.shared.idle.notified().await;
}
}
}
Expand Down Expand Up @@ -312,7 +304,7 @@ impl Future for EndpointDriver {
let now = Instant::now();
let mut keep_going = false;
keep_going |= endpoint.drive_recv(cx, now)?;
keep_going |= endpoint.handle_events(cx);
keep_going |= endpoint.handle_events(cx, &self.0.shared);
keep_going |= endpoint.drive_send(cx)?;

if !endpoint.incoming.is_empty() {
Expand Down Expand Up @@ -368,13 +360,13 @@ pub(crate) struct State {
recv_limiter: WorkLimiter,
recv_buf: Box<[u8]>,
send_limiter: WorkLimiter,
idle: Arc<Notify>,
runtime: Arc<dyn Runtime>,
}

#[derive(Debug)]
pub(crate) struct Shared {
incoming: Notify,
idle: Notify,
}

impl State {
Expand Down Expand Up @@ -491,7 +483,7 @@ impl State {
result
}

fn handle_events(&mut self, cx: &mut Context) -> bool {
fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {
use EndpointEvent::*;

for _ in 0..IO_LOOP_BOUND {
Expand All @@ -501,7 +493,7 @@ impl State {
if e.is_drained() {
self.connections.senders.remove(&ch);
if self.connections.is_empty() {
self.idle.notify_waiters();
shared.idle.notify_waiters();
}
}
if let Some(event) = self.inner.handle_event(ch, e) {
Expand Down Expand Up @@ -626,6 +618,7 @@ impl EndpointRef {
Self(Arc::new(EndpointInner {
shared: Shared {
incoming: Notify::new(),
idle: Notify::new(),
},
state: Mutex::new(State {
socket,
Expand All @@ -646,7 +639,6 @@ impl EndpointRef {
recv_buf: recv_buf.into(),
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
idle: Arc::new(Notify::new()),
runtime,
}),
}))
Expand Down

0 comments on commit 00fb347

Please sign in to comment.