Skip to content

Commit

Permalink
Read remaining bytes when cleaning dropped payload actix#2764
Browse files Browse the repository at this point in the history
  • Loading branch information
squidpickles committed Jun 2, 2022
1 parent dce57a7 commit b381052
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
19 changes: 18 additions & 1 deletion actix-http/src/h1/dispatcher.rs
Expand Up @@ -706,6 +706,9 @@ where
debug!("handler dropped payload early; attempt to clean connection");
// ...in which case poll request payload a few times
loop {
if this.read_buf.is_empty() {
Self::read_available_projected(&mut this, cx)?;
}
match this.codec.decode(this.read_buf)? {
Some(msg) => {
match msg {
Expand Down Expand Up @@ -1010,8 +1013,22 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
let this = self.project();
let mut this = self.project();
Self::read_available_projected(&mut this, cx)
}

/// Returns true when I/O stream can be disconnected after write to it.
/// Meant to be called when there is already access to a projected
/// `InnerDispatcher` available.
///
/// It covers these conditions:
/// - `std::io::ErrorKind::ConnectionReset` after partial read;
/// - all data read done.
#[inline(always)] // TODO: bench this inline
fn read_available_projected(
this: &mut InnerDispatcherProj<'_, T, S, B, X, U>,
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
if this.flags.contains(Flags::READ_DISCONNECT) {
return Ok(false);
};
Expand Down
68 changes: 68 additions & 0 deletions actix-http/src/h1/dispatcher_tests.rs
Expand Up @@ -783,6 +783,74 @@ async fn upgrade_handling() {
.await;
}

#[actix_rt::test]
async fn handler_drop_large_payload() {
let _ = env_logger::try_init();

const CONTENT_LENGTH: usize = 256 * 1024;
let content = str::from_utf8(&[b'x'; CONTENT_LENGTH]).unwrap();
let buf = TestBuffer::new(http_msg(format!(
r"
POST /drop-payload HTTP/1.1
Content-Length: {}
{}",
CONTENT_LENGTH, content
)));

let services = HttpFlow::new(
drop_payload_service(),
ExpectHandler,
None::<UpgradeHandler>,
);

let h1 = Dispatcher::new(
buf.clone(),
services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(h1);

lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
assert!(h1.as_mut().poll(cx).is_pending());

// polls: manual
assert_eq!(h1.poll_count, 2);

let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];

let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);

assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);

if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
assert!(inner.state.is_none());
}
})
.await;
}

#[actix_rt::test]
async fn handler_drop_payload() {
let _ = env_logger::try_init();
Expand Down

0 comments on commit b381052

Please sign in to comment.