From 172915640d6a1080dff4a261e324fcd9b12d36ef Mon Sep 17 00:00:00 2001 From: Stepan Koltsov Date: Sat, 13 May 2017 21:56:31 +0300 Subject: [PATCH] Do not panic if client call died Fixes #65 --- src/client.rs | 8 +++++--- src/client_conn.rs | 18 ++++++++++-------- tests/http_client.rs | 37 +++++++++++++++++++++++++++++++++---- tests/http_server.rs | 1 + tests/test_misc/tester.rs | 7 ++++++- 5 files changed, 55 insertions(+), 16 deletions(-) diff --git a/src/client.rs b/src/client.rs index 00a31c33..db7bbe88 100644 --- a/src/client.rs +++ b/src/client.rs @@ -49,11 +49,13 @@ impl HttpClient { pub fn new(host: &str, port: u16, tls: bool, conf: HttpClientConf) -> HttpResult { // TODO: sync // TODO: try connect to all addrs - let socket_addr = (host, port).to_socket_addrs()?.next().unwrap(); + let socket_addr = (host, port).to_socket_addrs()?.next().expect("resolve host/port"); let tls_enabled = match tls { true => { - let connector = Arc::new(TlsConnector::builder().unwrap().build().unwrap()); + let tls_connector = TlsConnector::builder().expect("TlsConnector::Builder") + .build().expect("TlsConnectorBuilder::build"); + let connector = Arc::new(tls_connector); ClientTlsOption::Tls(host.to_owned(), connector) }, false => ClientTlsOption::Plain, @@ -156,7 +158,7 @@ fn run_client_event_loop( send_to_back: mpsc::Sender) { // Create an event loop. - let mut lp = reactor::Core::new().unwrap(); + let mut lp = reactor::Core::new().expect("Core::new"); // Create a channel to receive shutdown signal. let (shutdown_signal, shutdown_future) = shutdown_signal(); diff --git a/src/client_conn.rs b/src/client_conn.rs index bc3f6381..15317d37 100644 --- a/src/client_conn.rs +++ b/src/client_conn.rs @@ -49,17 +49,18 @@ impl HttpStream for HttpClientStream { fn new_data_chunk(&mut self, data: &[u8], last: bool) { if let Some(ref mut response_handler) = self.response_handler { - response_handler.send(ResultOrEof::Item(HttpStreamPart { + // TODO: reset stream if called is dead + drop(response_handler.send(ResultOrEof::Item(HttpStreamPart { content: HttpStreamPartContent::Data(Bytes::from(data)), last: last, - })).unwrap(); + }))); } } fn rst(&mut self, error_code: ErrorCode) { if let Some(ref mut response_handler) = self.response_handler { - response_handler.send(ResultOrEof::Error(HttpError::CodeError(error_code))) - .unwrap(); + // TODO: reset stream if called is dead + drop(response_handler.send(ResultOrEof::Error(HttpError::CodeError(error_code)))); } } @@ -121,10 +122,11 @@ impl LoopInner for ClientInner { if headers.0.len() != 0 { if let Some(ref mut response_handler) = stream.response_handler { - response_handler.send(ResultOrEof::Item(HttpStreamPart { + // TODO: reset stream if called is dead + drop(response_handler.send(ResultOrEof::Item(HttpStreamPart { content: HttpStreamPartContent::Headers(headers), last: end_stream == EndStream::Yes, - })).unwrap(); + }))); } } } @@ -191,14 +193,14 @@ impl ClientWriteLoop { to_write_tx_1.send(ClientToWriteMessage::BodyChunk(BodyChunkMessage { stream_id: stream_id, chunk: chunk, - })).unwrap(); + })).expect("client must be dead"); futures::finished::<_, HttpError>(()) }); let future = future .and_then(move |()| { to_write_tx_2.send(ClientToWriteMessage::End(EndRequestMessage { stream_id: stream_id, - })).unwrap(); + })).expect("client must be dead"); futures::finished::<_, HttpError>(()) }); diff --git a/tests/http_client.rs b/tests/http_client.rs index 60d1d73b..c527038d 100644 --- a/tests/http_client.rs +++ b/tests/http_client.rs @@ -29,8 +29,6 @@ fn stream_count() { let server = HttpServerTester::new(); - debug!("started server on {}", server.port()); - let client: HttpClient = HttpClient::new("::1", server.port(), false, Default::default()).expect("connect"); @@ -71,8 +69,6 @@ fn rst_is_error() { let server = HttpServerTester::new(); - debug!("started server on {}", server.port()); - let client: HttpClient = HttpClient::new("::1", server.port(), false, Default::default()).expect("connect"); @@ -97,3 +93,36 @@ fn rst_is_error() { let state: ConnectionStateSnapshot = client.dump_state().wait().expect("state"); assert_eq!(0, state.streams.len(), "{:?}", state); } + +#[test] +fn client_call_dropped() { + env_logger::init().ok(); + + let server = HttpServerTester::new(); + + let client: HttpClient = + HttpClient::new("::1", server.port(), false, Default::default()).expect("connect"); + + let mut server_tester = server.accept(); + server_tester.recv_preface(); + server_tester.settings_xchg(); + + { + let req = client.start_get("/fgfg", "localhost"); + + server_tester.recv_message(1); + + drop(req); + + server_tester.send_headers(1, Headers::ok_200(), true); + } + + { + let req = client.start_get("/fgfg", "localhost").collect(); + server_tester.recv_message(3); + server_tester.send_headers(3, Headers::ok_200(), true); + let resp = req.wait().expect("OK"); + assert_eq!(200, resp.headers.status()); + } + +} diff --git a/tests/http_server.rs b/tests/http_server.rs index 5b328ba7..7c1e6e5e 100644 --- a/tests/http_server.rs +++ b/tests/http_server.rs @@ -4,6 +4,7 @@ extern crate native_tls; extern crate tokio_core; extern crate tokio_tls; extern crate httpbis; +#[macro_use] extern crate log; extern crate env_logger; diff --git a/tests/test_misc/tester.rs b/tests/test_misc/tester.rs index f5c7fad1..9efb7224 100644 --- a/tests/test_misc/tester.rs +++ b/tests/test_misc/tester.rs @@ -32,7 +32,12 @@ pub struct HttpServerTester(net::TcpListener); impl HttpServerTester { pub fn new() -> HttpServerTester { - HttpServerTester(net::TcpListener::bind("[::1]:0".parse::().unwrap()).unwrap()) + let socket = net::TcpListener::bind("[::1]:0".parse::().unwrap()).unwrap(); + let server = HttpServerTester(socket); + + debug!("started HttpServerTester on {}", server.port()); + + server } pub fn port(&self) -> u16 {