Skip to content

Commit

Permalink
Use Sink API instead of Stream for client
Browse files Browse the repository at this point in the history
Issue #34
  • Loading branch information
stepancheg committed Oct 28, 2018
1 parent fd03305 commit 57c052b
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 105 deletions.
18 changes: 18 additions & 0 deletions httpbis-test/src/tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,24 @@ impl HttpConnTester {
assert!(data.is_empty());
}

pub fn recv_frames_data_check(
&mut self,
stream_id: StreamId,
frame_size: usize,
total_size: usize,
end: bool,
) -> Vec<u8> {
let mut r = Vec::new();
while r.len() != total_size {
let expect_frame_end = end && (r.len() + frame_size >= total_size);
let frame = self.recv_frame_data_check(stream_id, expect_frame_end);
assert!(frame.len() == frame_size || r.len() + frame.len() == total_size);
r.extend_from_slice(&frame[..]);
}
assert_eq!(total_size, r.len());
r
}

/// Receive at most two frames till END_STREAM frame
/// * if first frame has not END_STREAM, second must have empty payload
pub fn recv_frame_data_tail(&mut self, stream_id: StreamId) -> Vec<u8> {
Expand Down
150 changes: 148 additions & 2 deletions httpbis-test/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use futures::sync::oneshot;

use tokio_core::reactor;

use futures::executor;
use futures::future;
use futures::Async;
use httpbis::for_test::solicit::DEFAULT_SETTINGS;
use httpbis::for_test::*;
use httpbis::ErrorCode;
Expand Down Expand Up @@ -218,8 +221,7 @@ pub fn issue_89() {

let r1 = client.start_get("/r1", "localhost");

server_tester.recv_frame_headers_check(1, false);
assert!(server_tester.recv_frame_data_tail(1).is_empty());
server_tester.recv_frame_headers_check(1, true);

server_tester.send_headers(1, Headers::ok_200(), false);
let (_, resp1) = r1.0.wait().unwrap();
Expand Down Expand Up @@ -284,3 +286,147 @@ fn external_event_loop() {

t.join().expect("join");
}

#[test]
pub fn sink_poll() {
init_logger();

let (mut server_tester, client) = HttpConnTester::new_server_with_client_xchg();

let (mut sender, _response) = client
.start_post_sink("/foo", "sink")
.wait()
.expect("start_post_sink");

server_tester.recv_frame_headers_check(1, false);

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(65535, client.conn_state().out_window_size);
assert_eq!(65535, client.conn_state().pump_out_window_size);
assert_eq!(65535, client.stream_state(1).in_window_size);
assert_eq!(65535, client.stream_state(1).pump_out_window_size);

assert_eq!(Ok(Async::Ready(())), sender.poll());

let b = Bytes::from(vec![1; 65_535]);
sender.send_data(b.clone()).expect("send_data");

assert_eq!(
b,
Bytes::from(server_tester.recv_frames_data_check(1, 16_384, 65_535, false))
);

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(0, client.conn_state().out_window_size);
assert_eq!(0, client.conn_state().pump_out_window_size);
assert_eq!(65535, client.stream_state(1).in_window_size);
assert_eq!(0, client.stream_state(1).out_window_size);
assert_eq!(0, client.stream_state(1).pump_out_window_size);

let mut sender = executor::spawn(future::lazy(move || {
assert_eq!(Ok(Async::NotReady), sender.poll());
future::ok::<_, ()>(sender)
})).wait_future()
.unwrap();

server_tester.send_window_update_conn(3);
server_tester.send_window_update_stream(1, 5);

sender.wait().unwrap();

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(3, client.conn_state().out_window_size);
assert_eq!(3, client.conn_state().pump_out_window_size);
assert_eq!(65535, client.stream_state(1).in_window_size);
assert_eq!(5, client.stream_state(1).out_window_size);
assert_eq!(5, client.stream_state(1).pump_out_window_size);

let b = Bytes::from(vec![11, 22]);
sender.send_data(b.clone()).expect("send_data");
assert_eq!(
b,
Bytes::from(server_tester.recv_frame_data_check(1, false))
);

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(1, client.conn_state().out_window_size);
assert_eq!(1, client.conn_state().pump_out_window_size);
assert_eq!(65535, client.stream_state(1).in_window_size);
assert_eq!(3, client.stream_state(1).out_window_size);
assert_eq!(3, client.stream_state(1).pump_out_window_size);

sender.wait().unwrap();

let b = Bytes::from(vec![33, 44]);
sender.send_data(b.clone()).expect("send_data");
assert_eq!(
b.slice(0, 1),
Bytes::from(server_tester.recv_frame_data_check(1, false))
);

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(0, client.conn_state().out_window_size);
assert_eq!(-1, client.conn_state().pump_out_window_size);
assert_eq!(65535, client.stream_state(1).in_window_size);
assert_eq!(2, client.stream_state(1).out_window_size);
assert_eq!(1, client.stream_state(1).pump_out_window_size);
}

#[test]
fn sink_reset_by_peer() {
init_logger();

let (mut server_tester, client) = HttpConnTester::new_server_with_client_xchg();

let (mut sender, _response) = client
.start_post_sink("/foo", "sink")
.wait()
.expect("start_post_sink");

server_tester.recv_frame_headers_check(1, false);

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(65535, client.conn_state().out_window_size);
assert_eq!(65535, client.conn_state().pump_out_window_size);
assert_eq!(65535, client.stream_state(1).in_window_size);
assert_eq!(65535, client.stream_state(1).out_window_size);
assert_eq!(65535, client.stream_state(1).pump_out_window_size);

assert_eq!(Ok(Async::Ready(())), sender.poll());

let b = Bytes::from(vec![1; 65_535 * 2]);
sender.send_data(b.clone()).expect("send_data");

assert_eq!(
b.slice(0, 65_535),
Bytes::from(server_tester.recv_frames_data_check(1, 16_384, 65_535, false))
);

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(0, client.conn_state().out_window_size);
assert_eq!(-65535, client.conn_state().pump_out_window_size);
assert_eq!(65535, client.stream_state(1).in_window_size);
assert_eq!(0, client.stream_state(1).out_window_size);
assert_eq!(-65535, client.stream_state(1).pump_out_window_size);

server_tester.send_rst(1, ErrorCode::Cancel);

while client.conn_state().streams.len() != 0 {
// spin-wait
}

// pump out window must be reset to out window

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(0, client.conn_state().out_window_size);
assert_eq!(0, client.conn_state().pump_out_window_size);

// check that if more data is sent, pump_out_window_size is not exhausted

let b = Bytes::from(vec![1; 100_000]);
sender.send_data(b.clone()).expect("send_data");

assert_eq!(65535, client.conn_state().in_window_size);
assert_eq!(0, client.conn_state().out_window_size);
assert_eq!(0, client.conn_state().pump_out_window_size);
}
20 changes: 7 additions & 13 deletions src/bin/client_server_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ extern crate httpbis;
use bytes::Bytes;
use futures::future::Future;
use futures::stream::Stream;
use futures::sync::mpsc;
use httpbis::Client;
use httpbis::Headers;
use httpbis::HttpStreamAfterHeaders;
Expand Down Expand Up @@ -95,29 +94,24 @@ fn ping_pong() {
Default::default(),
).expect("client");

let (request_tx, request_rx) = mpsc::unbounded();

let (header, body) = client
.start_post_stream(
"/any",
"localhost",
request_rx.map_err(|()| httpbis::Error::Other("other")),
).wait()
let (mut sender, response) = client
.start_post_sink("/any", "localhost")
.wait()
.expect("request");

let (header, response) = response.wait().expect("response wait");

assert_eq!(200, header.status());

let body = body.filter_data();
let body = response.filter_data();
let mut body = body.wait();

let mut i = 0u32;
forever(|| {
i = i.wrapping_add(1);
let mut req = Vec::new();
req.resize(BLOCK_SIZE, i as u8);
request_tx
.unbounded_send(Bytes::from(req))
.expect("send error");
sender.send_data(Bytes::from(req)).expect("send_data");

let mut read = 0;
while read < BLOCK_SIZE {
Expand Down
Loading

0 comments on commit 57c052b

Please sign in to comment.