From 00f1d567e561ec16bfb9c8e04fde23f4056f1358 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Fri, 2 Feb 2024 14:07:30 +0100 Subject: [PATCH] Use poll_recv_many for receiving messages to publish to the connection --- async-nats/Cargo.toml | 2 +- async-nats/src/lib.rs | 24 ++++++++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index b1f10af40..567a09208 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -23,7 +23,7 @@ regex = "1.9.1" serde = { version = "1.0.184", features = ["derive"] } serde_json = "1.0.104" serde_repr = "0.1.16" -tokio = { version = "1.29.0", features = ["macros", "rt", "fs", "net", "sync", "time", "io-util"] } +tokio = { version = "1.36", features = ["macros", "rt", "fs", "net", "sync", "time", "io-util"] } url = { version = "2"} tokio-rustls = "0.25" rustls-pemfile = "2" diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index e76d13712..58d5ff2d9 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -443,6 +443,7 @@ impl ConnectionHandler { struct ProcessFut<'a> { handler: &'a mut ConnectionHandler, receiver: &'a mut mpsc::Receiver, + recv_buf: &'a mut Vec, } enum ExitReason { @@ -451,6 +452,8 @@ impl ConnectionHandler { } impl<'a> ProcessFut<'a> { + const RECV_CHUNK_SIZE: usize = 16; + #[cold] fn ping(&mut self) -> Poll { self.handler.pending_pings += 1; @@ -519,13 +522,24 @@ impl ConnectionHandler { let mut made_progress = true; loop { while !self.handler.connection.is_write_buf_full() { - match self.receiver.poll_recv(cx) { + debug_assert!(self.recv_buf.is_empty()); + + let Self { + recv_buf, + handler, + receiver, + } = &mut *self; + match receiver.poll_recv_many(cx, recv_buf, Self::RECV_CHUNK_SIZE) { Poll::Pending => break, - Poll::Ready(Some(cmd)) => { + Poll::Ready(1..) => { made_progress = true; - self.handler.handle_command(cmd); + + for cmd in recv_buf.drain(..) { + handler.handle_command(cmd); + } } - Poll::Ready(None) => return Poll::Ready(ExitReason::Closed), + // TODO: replace `_` with `0` after bumping MSRV to 1.75 + Poll::Ready(_) => return Poll::Ready(ExitReason::Closed), } } @@ -578,10 +592,12 @@ impl ConnectionHandler { } } + let mut recv_buf = Vec::with_capacity(ProcessFut::RECV_CHUNK_SIZE); loop { let process = ProcessFut { handler: self, receiver, + recv_buf: &mut recv_buf, }; match process.await { ExitReason::Disconnected(err) => {