Skip to content

Commit

Permalink
Don't force flush if write buffer isn't empty
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini authored and Jarema committed Apr 29, 2024
1 parent 8c92e8e commit 6406aca
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
21 changes: 19 additions & 2 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ pub enum State {
Disconnected,
}

#[derive(Debug, Eq, PartialEq, Clone)]
pub enum ShouldFlush {
/// Write buffers are empty, but the connection hasn't been flushed yet
Yes,
/// The connection hasn't been flushed yet, but write buffers aren't empty
May,
/// Flushing would just be a no-op
No,
}

impl Display for State {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -92,8 +102,15 @@ impl Connection {
}

/// Returns `true` if [`Self::poll_flush`] should be polled.
pub(crate) fn should_flush(&self) -> bool {
self.can_flush && self.write_buf.is_empty() && self.flattened_writes.is_empty()
pub(crate) fn should_flush(&self) -> ShouldFlush {
match (
self.can_flush,
self.write_buf.is_empty() && self.flattened_writes.is_empty(),
) {
(true, true) => ShouldFlush::Yes,
(true, false) => ShouldFlush::May,
(false, _) => ShouldFlush::No,
}
}

/// Attempts to read a server operation from the read buffer.
Expand Down
12 changes: 6 additions & 6 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ pub(crate) struct ConnectionHandler {
pending_pings: usize,
info_sender: tokio::sync::watch::Sender<ServerInfo>,
ping_interval: Interval,
is_flushing: bool,
should_reconnect: bool,
flush_observers: Vec<oneshot::Sender<()>>,
}
Expand All @@ -439,7 +438,6 @@ impl ConnectionHandler {
pending_pings: 0,
info_sender,
ping_interval,
is_flushing: false,
should_reconnect: false,
flush_observers: Vec::new(),
}
Expand Down Expand Up @@ -579,12 +577,13 @@ impl ConnectionHandler {
}
}

if self.handler.is_flushing || self.handler.connection.should_flush() {
if let (ShouldFlush::Yes, _) | (ShouldFlush::No, false) = (
self.handler.connection.should_flush(),
self.handler.flush_observers.is_empty(),
) {
match self.handler.connection.poll_flush(cx) {
Poll::Pending => {}
Poll::Ready(Ok(())) => {
self.handler.is_flushing = false;

for observer in self.handler.flush_observers.drain(..) {
let _ = observer.send(());
}
Expand Down Expand Up @@ -754,7 +753,6 @@ impl ConnectionHandler {
}
}
Command::Flush { observer } => {
self.is_flushing = true;
self.flush_observers.push(observer);
}
Command::Subscribe {
Expand Down Expand Up @@ -1569,6 +1567,8 @@ macro_rules! from_with_timeout {
}
pub(crate) use from_with_timeout;

use crate::connection::ShouldFlush;

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 6406aca

Please sign in to comment.