Skip to content

Commit

Permalink
Fix handling not consumed request's payload (#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 29, 2024
1 parent 9c29de1 commit 34142e1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 33 deletions.
30 changes: 8 additions & 22 deletions ntex-service/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@ where
let mut ready2 = false;

poll_fn(move |cx| {
if !ready1 {
match pin::Pin::new(&mut fut1).poll(cx) {
Poll::Ready(_) => ready1 = true,
Poll::Pending => (),
}
if !ready1 && pin::Pin::new(&mut fut1).poll(cx).is_ready() {
ready1 = true;
}
if !ready2 {
match pin::Pin::new(&mut fut2).poll(cx) {
Poll::Ready(_) => ready2 = true,
Poll::Pending => (),
}
if !ready2 && pin::Pin::new(&mut fut2).poll(cx).is_ready() {
ready2 = true
}
if ready1 && ready2 {
Poll::Ready(())
Expand All @@ -51,19 +45,11 @@ where
let mut ready2 = false;

poll_fn(move |cx| {
if !ready1 {
match pin::Pin::new(&mut fut1).poll(cx) {
Poll::Ready(Ok(())) => ready1 = true,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => (),
}
if !ready1 && pin::Pin::new(&mut fut1).poll(cx)?.is_ready() {
ready1 = true;
}
if !ready2 {
match pin::Pin::new(&mut fut2).poll(cx) {
Poll::Ready(Ok(())) => ready2 = true,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => (),
};
if !ready2 && pin::Pin::new(&mut fut2).poll(cx)?.is_ready() {
ready2 = true;
}
if ready1 && ready2 {
Poll::Ready(Ok(()))
Expand Down
2 changes: 2 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

* http: Fix handling payload timer after payload got consumed

* http: Fix handling not consumed request's payload

## [2.0.0] - 2024-05-28

* Use "async fn" for Service::ready() and Service::shutdown()
Expand Down
11 changes: 9 additions & 2 deletions ntex/src/http/h1/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,12 @@ where
State::ReadRequest => ready!(inner.poll_read_request(cx)),
// consume request's payload
State::ReadPayload => {
ready!(inner.poll_request_payload(cx)).unwrap_or(State::ReadRequest)
let result = inner.poll_request_payload(cx);
if inner.flags.contains(Flags::SENDPAYLOAD_AND_STOP) {
inner.stop()
} else {
ready!(result).unwrap_or(State::ReadRequest)
}
}
// send response body
State::SendPayload { body } => {
Expand All @@ -224,6 +229,7 @@ where
let _ = ready!(Pin::new(f).poll(cx));
fut.take();
}
log::debug!("{}: Dispatcher is stopped", inner.io.tag());

return Poll::Ready(
if let Some(io) = io {
Expand Down Expand Up @@ -654,7 +660,8 @@ where
// wait until future completes and then close
// connection
self.payload = None;
Poll::Ready(Err(Either::Left(ProtocolError::PayloadIsNotConsumed)))
self.flags.insert(Flags::SENDPAYLOAD_AND_STOP);
Poll::Pending
}
}
}
Expand Down
10 changes: 1 addition & 9 deletions ntex/src/http/h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ pub enum ProtocolError {
#[error("Payload did not complete within the specified timeout")]
SlowPayloadTimeout,

/// Payload is not consumed
#[error("Task is completed but request's payload is not consumed")]
PayloadIsNotConsumed,

/// Response body processing error
#[error("Response body processing error: {0}")]
ResponsePayload(Box<dyn std::error::Error>),
Expand All @@ -89,14 +85,10 @@ impl super::ResponseError for ProtocolError {
fn error_response(&self) -> super::Response {
match self {
ProtocolError::Decode(_) => super::Response::BadRequest().into(),

ProtocolError::SlowRequestTimeout | ProtocolError::SlowPayloadTimeout => {
super::Response::RequestTimeout().into()
}

ProtocolError::Encode(_)
| ProtocolError::PayloadIsNotConsumed
| ProtocolError::ResponsePayload(_) => {
ProtocolError::Encode(_) | ProtocolError::ResponsePayload(_) => {
super::Response::InternalServerError().into()
}
}
Expand Down
23 changes: 23 additions & 0 deletions ntex/tests/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,29 @@ async fn test_http1_disable_payload_timer_after_whole_pl_has_been_read() {
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
}

/// Handle not consumed payload
#[ntex::test]
async fn test_http1_handle_not_consumed_payload() {
let srv = test_server(|| {
HttpService::build()
.h1_control(fn_service(move |msg: Control<_, _>| {
if matches!(msg, Control::ProtocolError(_)) {
panic!()
}
async move { Ok::<_, io::Error>(msg.ack()) }
}))
.h1(|_| async move { Ok::<_, io::Error>(Response::Ok().finish()) })
});

let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\ncontent-length: 4\r\n\r\n");
sleep(Millis(250)).await;
let _ = stream.write_all(b"1234");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
}

#[ntex::test]
async fn test_content_length() {
let srv = test_server(|| {
Expand Down

0 comments on commit 34142e1

Please sign in to comment.