diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index df0d3f36..156ceb0a 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -56,9 +56,6 @@ env_logger = "0.6.0" crossbeam-channel = "0.3.6" systemstat = "0.1.3" futures-test = "0.3.5" -## lock on 1.1 -## https://github.com/myrrlyn/funty/issues/3 -funty = "=1.1.0" [target.'cfg(unix)'.dev-dependencies] nix = "0.13.0" diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 3b46bd1b..ecf7ea04 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -16,7 +16,7 @@ use futures::{ channel::mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender}, Sink, Stream, }; -use log::{debug, log_enabled, trace, warn}; +use log::{debug, log_enabled, trace}; use tokio::prelude::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; @@ -239,6 +239,12 @@ where let frame = Frame::new_go_away(code); self.send_frame(cx, frame)?; self.local_go_away = true; + let mut new_timer = interval(self.config.connection_write_timeout); + // force registration of new timer to driver + let _ignore = Pin::new(&mut new_timer).as_mut().poll_next(cx); + // Reuse the keepalive timer to set a time out. If remote peer does not respond + // within the time out, consider this session as remote gone away. + self.keepalive = Some(new_timer); Ok(()) } @@ -459,7 +465,10 @@ where } } else { // TODO: stream already closed ? - warn!("substream({}) should exist but not", stream_id); + debug!( + "substream({}) should exist but not, may drop by self", + stream_id + ); false } }; @@ -621,7 +630,13 @@ where if let Some(ref mut interval) = self.keepalive { match Pin::new(interval).as_mut().poll_next(cx) { Poll::Ready(Some(_)) => { - self.keep_alive(cx, Instant::now())?; + if self.local_go_away { + // The remote peer has not responded to our sent go away code. + // Assume that remote peer has gone away and this session should be closed. + self.remote_go_away = true; + } else { + self.keep_alive(cx, Instant::now())?; + } } Poll::Ready(None) => { debug!("yamux::Session poll keepalive interval finished"); @@ -675,7 +690,14 @@ mod timer { #[cfg(feature = "generic-timer")] pub use generic_time::{interval, Interval}; #[cfg(feature = "tokio-timer")] - pub use tokio::time::{interval, Interval}; + pub use tokio::time::Interval; + + #[cfg(feature = "tokio-timer")] + pub fn interval(period: ::std::time::Duration) -> Interval { + use tokio::time::{self, interval_at}; + + interval_at(time::Instant::now() + period, period) + } #[cfg(target_arch = "wasm32")] pub use wasm_mock::Instant; @@ -839,6 +861,7 @@ mod test { io, pin::Pin, task::{Context, Poll}, + time::Duration, }; use tokio::{ io::AsyncReadExt, @@ -1094,4 +1117,33 @@ mod test { assert_eq!(reset_msg.stream_id(), 5) }); } + + #[test] + fn test_remote_does_not_respond_go_away() { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + let (_remote, local) = MockSocket::new(); + let mut config = Config::default(); + config.enable_keepalive = false; + config.connection_write_timeout = Duration::from_secs(1); + + let mut session = Session::new_server(local, config); + + let mut control = session.control(); + tokio::spawn(async move { + let _ignore = control.close().await; + }); + + // The purpose of this test is to ensure that if the remote does not respond to the + // go away message, it must be able to actively disconnect the session instead of hanging. + // So, if the test fails to exit, it means there has a problem + while let Some(Ok(mut stream)) = session.next().await { + tokio::spawn(async move { + let mut buf = [0; 100]; + let _ignore = stream.read(&mut buf).await; + }); + } + }); + } } diff --git a/yamux/src/stream.rs b/yamux/src/stream.rs index 5c007198..0c27408e 100644 --- a/yamux/src/stream.rs +++ b/yamux/src/stream.rs @@ -449,18 +449,22 @@ impl AsyncWrite for StreamHandle { impl Drop for StreamHandle { fn drop(&mut self) { - if !self.unbound_event_sender.is_closed() - && self.state != StreamState::Closed - && self.state != StreamState::LocalClosing - { - let mut flags = self.get_flags(); - flags.add(Flag::Rst); - let frame = Frame::new_window_update(flags, self.id, 0); - let rst_event = StreamEvent::Frame(frame); + if !self.unbound_event_sender.is_closed() && self.state != StreamState::Closed { let event = StreamEvent::Closed(self.id); - // Always successful unless the session is dropped - let _ignore = self.unbound_event_sender.unbounded_send(rst_event); - let _ignore = self.unbound_event_sender.unbounded_send(event); + if self.state == StreamState::LocalClosing { + // LocalClosing means that local have sent Fin to the remote and waiting for a response. + // So, here only need to send a cleanup message + let _ignore = self.unbound_event_sender.unbounded_send(event); + } else { + let mut flags = self.get_flags(); + flags.add(Flag::Rst); + let frame = Frame::new_window_update(flags, self.id, 0); + let rst_event = StreamEvent::Frame(frame); + + // Always successful unless the session is dropped + let _ignore = self.unbound_event_sender.unbounded_send(rst_event); + let _ignore = self.unbound_event_sender.unbounded_send(event); + } } } } @@ -575,6 +579,41 @@ mod test { }); } + #[test] + fn test_drop_with_state_local_close() { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let (_frame_sender, frame_receiver) = channel(2); + let (unbound_sender, mut unbound_receiver) = unbounded(); + let mut stream = StreamHandle::new( + 0, + unbound_sender, + frame_receiver, + StreamState::Init, + INITIAL_STREAM_WINDOW, + INITIAL_STREAM_WINDOW, + ); + + let _ignore = stream.shutdown().await; + + let event = unbound_receiver.next().await.unwrap(); + match event { + StreamEvent::Frame(frame) => { + assert!(frame.flags().contains(Flag::Fin)); + assert_eq!(frame.ty(), Type::WindowUpdate); + } + _ => panic!("must be fin window update"), + } + + drop(stream); + let event = unbound_receiver.next().await.unwrap(); + match event { + StreamEvent::Closed(_) => (), + _ => panic!("must be state closed"), + } + }); + } + #[test] fn test_data_large_than_recv_window() { let mut rt = tokio::runtime::Runtime::new().unwrap();