Skip to content

Commit

Permalink
Do not panic if client call died
Browse files Browse the repository at this point in the history
Fixes #65
  • Loading branch information
stepancheg committed May 13, 2017
1 parent 0a7440f commit 1729156
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 16 deletions.
8 changes: 5 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ impl HttpClient {
pub fn new(host: &str, port: u16, tls: bool, conf: HttpClientConf) -> HttpResult<HttpClient> {
// TODO: sync
// TODO: try connect to all addrs
let socket_addr = (host, port).to_socket_addrs()?.next().unwrap();
let socket_addr = (host, port).to_socket_addrs()?.next().expect("resolve host/port");

let tls_enabled = match tls {
true => {
let connector = Arc::new(TlsConnector::builder().unwrap().build().unwrap());
let tls_connector = TlsConnector::builder().expect("TlsConnector::Builder")
.build().expect("TlsConnectorBuilder::build");
let connector = Arc::new(tls_connector);
ClientTlsOption::Tls(host.to_owned(), connector)
},
false => ClientTlsOption::Plain,
Expand Down Expand Up @@ -156,7 +158,7 @@ fn run_client_event_loop(
send_to_back: mpsc::Sender<LoopToClient>)
{
// Create an event loop.
let mut lp = reactor::Core::new().unwrap();
let mut lp = reactor::Core::new().expect("Core::new");

// Create a channel to receive shutdown signal.
let (shutdown_signal, shutdown_future) = shutdown_signal();
Expand Down
18 changes: 10 additions & 8 deletions src/client_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,18 @@ impl HttpStream for HttpClientStream {

fn new_data_chunk(&mut self, data: &[u8], last: bool) {
if let Some(ref mut response_handler) = self.response_handler {
response_handler.send(ResultOrEof::Item(HttpStreamPart {
// TODO: reset stream if called is dead
drop(response_handler.send(ResultOrEof::Item(HttpStreamPart {
content: HttpStreamPartContent::Data(Bytes::from(data)),
last: last,
})).unwrap();
})));
}
}

fn rst(&mut self, error_code: ErrorCode) {
if let Some(ref mut response_handler) = self.response_handler {
response_handler.send(ResultOrEof::Error(HttpError::CodeError(error_code)))
.unwrap();
// TODO: reset stream if called is dead
drop(response_handler.send(ResultOrEof::Error(HttpError::CodeError(error_code))));
}
}

Expand Down Expand Up @@ -121,10 +122,11 @@ impl LoopInner for ClientInner {
if headers.0.len() != 0 {

if let Some(ref mut response_handler) = stream.response_handler {
response_handler.send(ResultOrEof::Item(HttpStreamPart {
// TODO: reset stream if called is dead
drop(response_handler.send(ResultOrEof::Item(HttpStreamPart {
content: HttpStreamPartContent::Headers(headers),
last: end_stream == EndStream::Yes,
})).unwrap();
})));
}
}
}
Expand Down Expand Up @@ -191,14 +193,14 @@ impl<I : AsyncRead + AsyncWrite + Send + 'static> ClientWriteLoop<I> {
to_write_tx_1.send(ClientToWriteMessage::BodyChunk(BodyChunkMessage {
stream_id: stream_id,
chunk: chunk,
})).unwrap();
})).expect("client must be dead");
futures::finished::<_, HttpError>(())
});
let future = future
.and_then(move |()| {
to_write_tx_2.send(ClientToWriteMessage::End(EndRequestMessage {
stream_id: stream_id,
})).unwrap();
})).expect("client must be dead");
futures::finished::<_, HttpError>(())
});

Expand Down
37 changes: 33 additions & 4 deletions tests/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ fn stream_count() {

let server = HttpServerTester::new();

debug!("started server on {}", server.port());

let client: HttpClient =
HttpClient::new("::1", server.port(), false, Default::default()).expect("connect");

Expand Down Expand Up @@ -71,8 +69,6 @@ fn rst_is_error() {

let server = HttpServerTester::new();

debug!("started server on {}", server.port());

let client: HttpClient =
HttpClient::new("::1", server.port(), false, Default::default()).expect("connect");

Expand All @@ -97,3 +93,36 @@ fn rst_is_error() {
let state: ConnectionStateSnapshot = client.dump_state().wait().expect("state");
assert_eq!(0, state.streams.len(), "{:?}", state);
}

#[test]
fn client_call_dropped() {
env_logger::init().ok();

let server = HttpServerTester::new();

let client: HttpClient =
HttpClient::new("::1", server.port(), false, Default::default()).expect("connect");

let mut server_tester = server.accept();
server_tester.recv_preface();
server_tester.settings_xchg();

{
let req = client.start_get("/fgfg", "localhost");

server_tester.recv_message(1);

drop(req);

server_tester.send_headers(1, Headers::ok_200(), true);
}

{
let req = client.start_get("/fgfg", "localhost").collect();
server_tester.recv_message(3);
server_tester.send_headers(3, Headers::ok_200(), true);
let resp = req.wait().expect("OK");
assert_eq!(200, resp.headers.status());
}

}
1 change: 1 addition & 0 deletions tests/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate native_tls;
extern crate tokio_core;
extern crate tokio_tls;
extern crate httpbis;
#[macro_use]
extern crate log;
extern crate env_logger;

Expand Down
7 changes: 6 additions & 1 deletion tests/test_misc/tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ pub struct HttpServerTester(net::TcpListener);

impl HttpServerTester {
pub fn new() -> HttpServerTester {
HttpServerTester(net::TcpListener::bind("[::1]:0".parse::<net::SocketAddr>().unwrap()).unwrap())
let socket = net::TcpListener::bind("[::1]:0".parse::<net::SocketAddr>().unwrap()).unwrap();
let server = HttpServerTester(socket);

debug!("started HttpServerTester on {}", server.port());

server
}

pub fn port(&self) -> u16 {
Expand Down

0 comments on commit 1729156

Please sign in to comment.