Skip to content

Commit

Permalink
Gracefully handle client-side send errors.
Browse files Browse the repository at this point in the history
Previously, a client channel would immediately disconnect when
encountering an error in Transport::try_send. One kind of error that can
occur in try_send is message validation, e.g. validating a message is
not larger than a configured frame size. The problem with shutting down
the client immediately is that debuggability suffers: it can be hard to
understand what caused the client to fail. Also, these errors are not
always fatal, as with frame size limits, so complete shutdown was
extreme.

Send errors are now bubbled up to the caller of the failed request, so
the caller can handle them programmatically. For example, converting the
error to anyhow::Error and logging it with {:?} prints the full chain of
causes:

    2023-01-10T02:49:32.528939Z  WARN client: the client failed to send the request

    Caused by:
        0: could not write to the transport
        1: frame size too big
  • Loading branch information
tikue committed Jan 10, 2023
1 parent cb931be commit e809fc8
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 30 deletions.
8 changes: 6 additions & 2 deletions example-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ async fn main() -> anyhow::Result<()> {
let flags = Flags::parse();
init_tracing("Tarpc Example Client")?;

let transport = tarpc::serde_transport::tcp::connect(flags.server_addr, Json::default);
let mut transport = tarpc::serde_transport::tcp::connect(flags.server_addr, Json::default);
transport.config_mut().max_frame_length(usize::MAX);

// WorldClient is generated by the service attribute. It has a constructor `new` that takes a
// config and any Transport as input.
Expand All @@ -42,7 +43,10 @@ async fn main() -> anyhow::Result<()> {
.instrument(tracing::info_span!("Two Hellos"))
.await;

tracing::info!("{:?}", hello);
match hello {
Ok(hello) => tracing::info!("{hello:?}"),
Err(e) => tracing::warn!("{:?}", anyhow::Error::from(e)),
}

// Let the background span processor finish.
sleep(Duration::from_micros(1)).await;
Expand Down
15 changes: 9 additions & 6 deletions tarpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ where
Some(dispatch_request) => dispatch_request,
None => return Poll::Ready(None),
};
let entered = span.enter();
let _entered = span.enter();
// poll_next_request only returns Ready if there is room to buffer another request.
// Therefore, we can call write_request without fear of erroring due to a full
// buffer.
Expand All @@ -528,13 +528,16 @@ where
trace_context: ctx.trace_context,
},
});
self.start_send(request)?;
tracing::info!("SendRequest");
drop(entered);

self.in_flight_requests()
.insert_request(request_id, ctx, span, response_completion)
.insert_request(request_id, ctx, span.clone(), response_completion)
.expect("Request IDs should be unique");
match self.start_send(request) {
Ok(()) => tracing::info!("SendRequest"),
Err(e) => {
self.in_flight_requests()
.complete_request(request_id, Err(RpcError::Send(Box::new(e))));
}
}
Poll::Ready(Some(Ok(())))
}

Expand Down
57 changes: 35 additions & 22 deletions tarpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,24 @@ pub enum ChannelError<E>
where
E: Error + Send + Sync + 'static,
{
/// An error occurred reading from, or writing to, the transport.
#[error("an error occurred in the transport")]
Transport(#[source] E),
/// An error occurred while polling expired requests.
#[error("an error occurred while polling expired requests")]
Timer(#[source] ::tokio::time::error::Error),
/// Could not read from the transport.
#[error("could not read from the transport")]
Read(#[source] E),
/// Could not ready the transport for writes.
#[error("could not ready the transport for writes")]
Ready(#[source] E),
/// Could not write to the transport.
#[error("could not write to the transport")]
Write(#[source] E),
/// Could not flush the transport.
#[error("could not flush the transport")]
Flush(#[source] E),
/// Could not close the write end of the transport.
#[error("could not close the write end of the transport")]
Close(#[source] E),
/// Could not poll expired requests.
#[error("could not poll expired requests")]
Timer(#[source] tokio::time::error::Error),
}

impl<Req, Resp, T> Stream for BaseChannel<Req, Resp, T>
Expand Down Expand Up @@ -635,7 +647,7 @@ where
let request_status = match self
.transport_pin_mut()
.poll_next(cx)
.map_err(ChannelError::Transport)?
.map_err(ChannelError::Read)?
{
Poll::Ready(Some(message)) => match message {
ClientMessage::Request(request) => {
Expand Down Expand Up @@ -697,7 +709,7 @@ where
self.project()
.transport
.poll_ready(cx)
.map_err(ChannelError::Transport)
.map_err(ChannelError::Ready)
}

fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> Result<(), Self::Error> {
Expand All @@ -710,7 +722,7 @@ where
self.project()
.transport
.start_send(response)
.map_err(ChannelError::Transport)
.map_err(ChannelError::Write)
} else {
// If the request isn't tracked anymore, there's no need to send the response.
Ok(())
Expand All @@ -722,14 +734,14 @@ where
self.project()
.transport
.poll_flush(cx)
.map_err(ChannelError::Transport)
.map_err(ChannelError::Flush)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.project()
.transport
.poll_close(cx)
.map_err(ChannelError::Transport)
.map_err(ChannelError::Close)
}
}

Expand Down Expand Up @@ -920,17 +932,18 @@ where
where
S: Serve<Req = C::Req, Resp = C::Resp> + Clone,
{
self.take_while(|result| {
if let Err(e) = result {
tracing::warn!("Requests stream errored out: {}", e);
}
futures::future::ready(result.is_ok())
})
.filter_map(|result| async move { result.ok() })
.map(move |request| {
let serve = serve.clone();
request.execute(serve)
})
self.map_err(anyhow::Error::from)
.take_while(|result| {
if let Err(e) = result {
tracing::warn!("Requests stream errored out: {:?}", e)
}
futures::future::ready(result.is_ok())
})
.filter_map(|result| async move { result.ok() })
.map(move |request| {
let serve = serve.clone();
request.execute(serve)
})
}
}

Expand Down

0 comments on commit e809fc8

Please sign in to comment.