diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 549040ff1c..3a0b5b6c62 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -23,7 +23,8 @@ use crate::{ builders::EndpointBuilder, connection::Connecting, platform::{RecvMeta, UdpSocket, BATCH_SIZE}, - ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND, + work_limiter::WorkLimiter, + ConnectionEvent, EndpointEvent, VarInt, IO_LOOP_BOUND, RECV_TIME_BOUND, }; /// A QUIC endpoint. @@ -265,6 +266,7 @@ where /// Number of live handles that can be used to initiate or handle I/O; excludes the driver ref_count: usize, driver_lost: bool, + recv_limiter: WorkLimiter, recv_buf: Box<[u8]>, idle: Broadcast, } @@ -274,7 +276,7 @@ where S: proto::crypto::Session + 'static, { fn drive_recv<'a>(&'a mut self, cx: &mut Context, now: Instant) -> Result { - let mut recvd = 0; + self.recv_limiter.start_cycle(); let mut metas = [RecvMeta::default(); BATCH_SIZE]; let mut iovs = MaybeUninit::<[IoSliceMut<'a>; BATCH_SIZE]>::uninit(); self.recv_buf @@ -290,7 +292,7 @@ where loop { match self.socket.poll_recv(cx, &mut iovs, &mut metas) { Poll::Ready(Ok(msgs)) => { - recvd += msgs; + self.recv_limiter.record_work(msgs); for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) { let data = buf[0..meta.len].into(); match self @@ -326,10 +328,13 @@ where return Err(e); } } - if recvd >= IO_LOOP_BOUND { + if !self.recv_limiter.allow_work() { + self.recv_limiter.finish_cycle(); return Ok(true); } } + + self.recv_limiter.finish_cycle(); Ok(false) } @@ -518,6 +523,7 @@ where ref_count: 0, driver_lost: false, recv_buf: recv_buf.into(), + recv_limiter: WorkLimiter::new(RECV_TIME_BOUND), idle: Broadcast::new(), }))) } diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index 044115e52b..7e676df044 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -50,6 +50,8 @@ let (endpoint, _) = builder.bind(&\"[::]:0\".parse().unwrap()).unwrap(); //! encryption alone. #![warn(missing_docs)] +use std::time::Duration; + mod broadcast; mod builders; mod connection; @@ -161,3 +163,11 @@ enum EndpointEvent { /// This helps ensure we don't starve anything when the CPU is slower than the link. /// Value is selected by picking a low number which didn't degrade throughput in benchmarks. const IO_LOOP_BOUND: usize = 160; + +/// The maximum amount of time that should be spent in `recvmsg()` calls per endpoint iteration +/// +/// 50us are chosen so that an endpoint iteration with a 50us sendmsg limit blocks +/// the runtime for a maximum of about 100us. +/// Going much lower does not yield any noticeable difference, since a single `recvmmsg` +/// batch of size 32 was observed to take 30us on some systems. +const RECV_TIME_BOUND: Duration = Duration::from_micros(50);