Skip to content

Commit

Permalink
Simplify Sink polling logic.
Browse files Browse the repository at this point in the history
This removes unnecessary `match`es and `map` from the code, and moves the usage of `poll_recover` into `poll_flush`, so as not to block new requests while trying to recover a connection.
  • Loading branch information
nihohit committed Mar 7, 2024
1 parent afd0e0d commit 97b5be1
Showing 1 changed file with 51 additions and 95 deletions.
146 changes: 51 additions & 95 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,42 +1039,30 @@ where
Ok((addr, conn))
}

fn poll_recover(
&mut self,
cx: &mut task::Context<'_>,
future: RecoverFuture,
) -> Poll<Result<(), RedisError>> {
match future {
RecoverFuture::RecoverSlots(mut future) => match future.as_mut().poll(cx) {
Poll::Ready(Ok(_)) => {
fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), RedisError>> {
let recover_future = match &mut self.state {
ConnectionState::PollComplete => return Poll::Ready(Ok(())),
ConnectionState::Recover(future) => future,
};
match recover_future {
RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
Ok(_) => {
trace!("Recovered!");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
}
Poll::Pending => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(future));
trace!("Recover not ready");
Poll::Pending
}
Poll::Ready(Err(err)) => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
Self::refresh_slots(self.inner.clone()),
)));
Err(err) => {
trace!("Recover slots failed!");
*future = Box::pin(Self::refresh_slots(self.inner.clone()));
Poll::Ready(Err(err))
}
},
RecoverFuture::Reconnect(mut future) => match future.as_mut().poll(cx) {
Poll::Ready(_) => {
trace!("Reconnected connections");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
}
Poll::Pending => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(future));
trace!("Recover not ready");
Poll::Pending
}
},
RecoverFuture::Reconnect(ref mut future) => {
ready!(future.as_mut().poll(cx));
trace!("Reconnected connections");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
}
}
}

Expand Down Expand Up @@ -1235,32 +1223,8 @@ where
{
type Error = ();

fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Result<(), Self::Error>> {
match mem::replace(&mut self.state, ConnectionState::PollComplete) {
ConnectionState::PollComplete => Poll::Ready(Ok(())),
ConnectionState::Recover(future) => {
match ready!(self.as_mut().poll_recover(cx, future)) {
Ok(()) => Poll::Ready(Ok(())),
Err(err) => {
// We failed to reconnect, while we will try again we will report the
// error if we can to avoid getting trapped in an infinite loop of
// trying to reconnect
if let Some(mut request) = Pin::new(&mut self.in_flight_requests)
.iter_pin_mut()
.find(|request| request.request.is_some())
{
(*request).as_mut().respond(Err(err));
} else {
self.refresh_error = Some(err);
}
Poll::Ready(Ok(()))
}
}
}
}
fn poll_ready(self: Pin<&mut Self>, _cx: &mut task::Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn start_send(self: Pin<&mut Self>, msg: Message<C>) -> Result<(), Self::Error> {
Expand All @@ -1285,46 +1249,40 @@ where
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Result<(), Self::Error>> {
trace!("poll_complete: {:?}", self.state);
trace!("poll_flush: {:?}", self.state);
loop {
self.send_refresh_error();

match mem::replace(&mut self.state, ConnectionState::PollComplete) {
ConnectionState::Recover(future) => {
match ready!(self.as_mut().poll_recover(cx, future)) {
Ok(()) => (),
Err(err) => {
// We failed to reconnect, while we will try again we will report the
// error if we can to avoid getting trapped in an infinite loop of
// trying to reconnect
self.refresh_error = Some(err);

// Give other tasks a chance to progress before we try to recover
// again. Since the future may not have registered a wake up we do so
// now so the task is not forgotten
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
if let Err(err) = ready!(self.as_mut().poll_recover(cx)) {
// We failed to reconnect, while we will try again we will report the
// error if we can to avoid getting trapped in an infinite loop of
// trying to reconnect
self.refresh_error = Some(err);

// Give other tasks a chance to progress before we try to recover
// again. Since the future may not have registered a wake up we do so
// now so the task is not forgotten
cx.waker().wake_by_ref();
return Poll::Pending;
}

match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
Self::refresh_slots(self.inner.clone()),
)));
}
PollFlushAction::Reconnect(addrs) => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
self.refresh_connections(addrs),
)));
}
PollFlushAction::ReconnectFromInitialConnections => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
self.reconnect_to_initial_nodes(),
)));
}
ConnectionState::PollComplete => match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(
Box::pin(Self::refresh_slots(self.inner.clone())),
));
}
PollFlushAction::Reconnect(addrs) => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
self.refresh_connections(addrs),
)));
}
PollFlushAction::ReconnectFromInitialConnections => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
self.reconnect_to_initial_nodes(),
)));
}
},
}
}
}
Expand All @@ -1335,10 +1293,8 @@ where
) -> Poll<Result<(), Self::Error>> {
// Try to drive any in flight requests to completion
match self.poll_complete(cx) {
Poll::Ready(poll_flush_action) => match poll_flush_action {
PollFlushAction::None => (),
_ => Err(()).map_err(|_| ())?,
},
Poll::Ready(PollFlushAction::None) => (),
Poll::Ready(_) => Err(())?,
Poll::Pending => (),
};
// If we no longer have any requests in flight we are done (skips any reconnection
Expand Down

0 comments on commit 97b5be1

Please sign in to comment.