Skip to content

Commit

Permalink
Fix HTTP/2: retry requests rejected by a graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jan 7, 2022
1 parent eb8e9d5 commit a03ca50
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ futures-core = { version = "0.3.0", default-features = false }
futures-util = { version = "0.3.0", default-features = false }
http-body = "0.4.0"
hyper = { version = "0.14", default-features = false, features = ["tcp", "http1", "http2", "client", "runtime"] }
h2 = "0.3.10"
lazy_static = "1.4"
log = "0.4"
mime = "0.3.16"
Expand Down
57 changes: 56 additions & 1 deletion src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::Sleep;

use log::debug;
use log::{debug, trace};

use super::decoder::Accepts;
use super::request::{Request, RequestBuilder};
Expand Down Expand Up @@ -1418,6 +1418,8 @@ impl Client {

urls: Vec::new(),

retry_count: 0,

client: self.inner.clone(),

in_flight,
Expand Down Expand Up @@ -1628,6 +1630,8 @@ pin_project! {

urls: Vec<Url>,

retry_count: usize,

client: Arc<ClientRef>,

#[pin]
Expand All @@ -1653,6 +1657,54 @@ impl PendingRequest {
fn headers(self: Pin<&mut Self>) -> &mut HeaderMap {
self.project().headers
}

fn retry_error(mut self: Pin<&mut Self>, err: &(dyn std::error::Error + 'static)) -> bool {
if !is_retryable_error(err) {
return false;
}

trace!("can retry {:?}", err);

let body = match self.body {
Some(Some(ref body)) => Body::reusable(body.clone()),
Some(None) => {
debug!("error was retryable, but body not reusable");
return false;
}
None => Body::empty(),
};

if self.retry_count >= 2 {
trace!("retry count too high");
return false;
}
self.retry_count += 1;

let uri = expect_uri(&self.url);
let mut req = hyper::Request::builder()
.method(self.method.clone())
.uri(uri)
.body(body.into_stream())
.expect("valid request parts");

*req.headers_mut() = self.headers.clone();

*self.as_mut().in_flight().get_mut() = self.client.hyper.request(req);

true
}
}

fn is_retryable_error(err: &(dyn std::error::Error + 'static)) -> bool {
if let Some(cause) = err.source() {
if let Some(err) = cause.downcast_ref::<h2::Error>() {
// They sent us a graceful shutdown, try with a new connection!
return err.is_go_away()
&& err.is_remote()
&& err.reason() == Some(h2::Reason::NO_ERROR);
}
}
false
}

impl Pending {
Expand Down Expand Up @@ -1696,6 +1748,9 @@ impl Future for PendingRequest {
loop {
let res = match self.as_mut().in_flight().as_mut().poll(cx) {
Poll::Ready(Err(e)) => {
if self.as_mut().retry_error(&e) {
continue;
}
return Poll::Ready(Err(crate::error::request(e).with_url(self.url.clone())));
}
Poll::Ready(Ok(res)) => res,
Expand Down

0 comments on commit a03ca50

Please sign in to comment.