From 9c8c2d6e69eaa3f6a0543e792a74720ada5df998 Mon Sep 17 00:00:00 2001 From: driftluo Date: Mon, 1 Feb 2021 17:32:00 +0800 Subject: [PATCH 1/5] chore: change log level --- yamux/src/session.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 3b46bd1b..7f7a7bdd 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -16,7 +16,7 @@ use futures::{ channel::mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender}, Sink, Stream, }; -use log::{debug, log_enabled, trace, warn}; +use log::{debug, log_enabled, trace}; use tokio::prelude::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; @@ -459,7 +459,10 @@ where } } else { // TODO: stream already closed ? - warn!("substream({}) should exist but not", stream_id); + debug!( + "substream({}) should exist but not, may drop by self", + stream_id + ); false } }; From 905177cac06e028fcbf295ffd5ace4310789aff3 Mon Sep 17 00:00:00 2001 From: driftluo Date: Mon, 1 Feb 2021 20:27:20 +0800 Subject: [PATCH 2/5] fix: remote does not respond go away --- yamux/src/session.rs | 45 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 7f7a7bdd..767b65cb 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -239,6 +239,8 @@ where let frame = Frame::new_go_away(code); self.send_frame(cx, frame)?; self.local_go_away = true; + // max wait time for remote go away + self.keepalive = Some(interval(self.config.connection_write_timeout)); Ok(()) } @@ -624,7 +626,12 @@ where if let Some(ref mut interval) = self.keepalive { match Pin::new(interval).as_mut().poll_next(cx) { Poll::Ready(Some(_)) => { - self.keep_alive(cx, Instant::now())?; + if self.local_go_away { + // deadline for local go away wait time + self.remote_go_away = true; + } else { + self.keep_alive(cx, Instant::now())?; + } } Poll::Ready(None) => { debug!("yamux::Session poll keepalive interval finished"); @@ -678,7 +685,14 @@ mod timer { #[cfg(feature = "generic-timer")] pub use generic_time::{interval, Interval}; #[cfg(feature = "tokio-timer")] - pub use tokio::time::{interval, Interval}; + pub use tokio::time::Interval; + + #[cfg(feature = "tokio-timer")] + pub fn interval(period: ::std::time::Duration) -> Interval { + use tokio::time::{self, interval_at}; + + interval_at(time::Instant::now() + period, period) + } #[cfg(target_arch = "wasm32")] pub use wasm_mock::Instant; @@ -842,6 +856,7 @@ mod test { io, pin::Pin, task::{Context, Poll}, + time::Duration, }; use tokio::{ io::AsyncReadExt, @@ -1097,4 +1112,30 @@ mod test { assert_eq!(reset_msg.stream_id(), 5) }); } + + #[test] + fn test_remote_does_not_respond_go_away() { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + let (_remote, local) = MockSocket::new(); + let mut config = Config::default(); + config.enable_keepalive = false; + config.connection_write_timeout = Duration::from_secs(1); + + let mut session = Session::new_server(local, config); + + let mut control = session.control(); + tokio::spawn(async move { + let _ignore = control.close().await; + }); + + while let Some(Ok(mut stream)) = session.next().await { + tokio::spawn(async move { + let mut buf = [0; 100]; + let _ignore = stream.read(&mut buf).await; + }); + } + }); + } } From 853d47599459e2b94b8a7316bc1fbb354e7ff277 Mon Sep 17 00:00:00 2001 From: driftluo Date: Tue, 2 Feb 2021 12:26:41 +0800 Subject: [PATCH 3/5] fix: drop with local close should send event --- yamux/src/stream.rs | 59 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/yamux/src/stream.rs b/yamux/src/stream.rs index 5c007198..8dc6dab8 100644 --- a/yamux/src/stream.rs +++ b/yamux/src/stream.rs @@ -449,18 +449,20 @@ impl AsyncWrite for StreamHandle { impl Drop for StreamHandle { fn drop(&mut self) { - if !self.unbound_event_sender.is_closed() - && self.state != StreamState::Closed - && self.state != StreamState::LocalClosing - { - let mut flags = self.get_flags(); - flags.add(Flag::Rst); - let frame = Frame::new_window_update(flags, self.id, 0); - let rst_event = StreamEvent::Frame(frame); + if !self.unbound_event_sender.is_closed() && self.state != StreamState::Closed { let event = StreamEvent::Closed(self.id); - // Always successful unless the session is dropped - let _ignore = self.unbound_event_sender.unbounded_send(rst_event); - let _ignore = self.unbound_event_sender.unbounded_send(event); + if self.state == StreamState::LocalClosing { + let _ignore = self.unbound_event_sender.unbounded_send(event); + } else { + let mut flags = self.get_flags(); + flags.add(Flag::Rst); + let frame = Frame::new_window_update(flags, self.id, 0); + let rst_event = StreamEvent::Frame(frame); + + // Always successful unless the session is dropped + let _ignore = self.unbound_event_sender.unbounded_send(rst_event); + let _ignore = self.unbound_event_sender.unbounded_send(event); + } } } } @@ -575,6 +577,41 @@ mod test { }); } + #[test] + fn test_drop_with_state_local_close() { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let (_frame_sender, frame_receiver) = channel(2); + let (unbound_sender, mut unbound_receiver) = unbounded(); + let mut stream = StreamHandle::new( + 0, + unbound_sender, + frame_receiver, + StreamState::Init, + INITIAL_STREAM_WINDOW, + INITIAL_STREAM_WINDOW, + ); + + let _ignore = stream.shutdown().await; + + let event = unbound_receiver.next().await.unwrap(); + match event { + StreamEvent::Frame(frame) => { + assert!(frame.flags().contains(Flag::Fin)); + assert_eq!(frame.ty(), Type::WindowUpdate); + } + _ => panic!("must be fin window update"), + } + + drop(stream); + let event = unbound_receiver.next().await.unwrap(); + match event { + StreamEvent::Closed(_) => (), + _ => panic!("must be state closed"), + } + }); + } + #[test] fn test_data_large_than_recv_window() { let mut rt = tokio::runtime::Runtime::new().unwrap(); From 74e8e94bf3b4f1c108a520cf064d9a8b4735589f Mon Sep 17 00:00:00 2001 From: driftluo Date: Thu, 18 Feb 2021 12:23:45 +0800 Subject: [PATCH 4/5] fix: force registration of new timer to driver --- tentacle/Cargo.toml | 3 --- yamux/src/session.rs | 5 ++++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index df0d3f36..156ceb0a 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -56,9 +56,6 @@ env_logger = "0.6.0" crossbeam-channel = "0.3.6" systemstat = "0.1.3" futures-test = "0.3.5" -## lock on 1.1 -## https://github.com/myrrlyn/funty/issues/3 -funty = "=1.1.0" [target.'cfg(unix)'.dev-dependencies] nix = "0.13.0" diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 767b65cb..6007dd07 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -239,8 +239,11 @@ where let frame = Frame::new_go_away(code); self.send_frame(cx, frame)?; self.local_go_away = true; + let mut new_timer = interval(self.config.connection_write_timeout); + // force registration of new timer to driver + let _ignore = Pin::new(&mut new_timer).as_mut().poll_next(cx); // max wait time for remote go away - self.keepalive = Some(interval(self.config.connection_write_timeout)); + self.keepalive = Some(new_timer); Ok(()) } From 6cc45c94864fccc1cba76673f2732da1e0ad25ac Mon Sep 17 00:00:00 2001 From: driftluo Date: Fri, 19 Feb 2021 18:34:34 +0800 Subject: [PATCH 5/5] chore: add comment --- yamux/src/session.rs | 9 +++++++-- yamux/src/stream.rs | 2 ++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 6007dd07..ecf7ea04 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -242,7 +242,8 @@ where let mut new_timer = interval(self.config.connection_write_timeout); // force registration of new timer to driver let _ignore = Pin::new(&mut new_timer).as_mut().poll_next(cx); - // max wait time for remote go away + // Reuse the keepalive timer to set a time out. If remote peer does not respond + // within the time out, consider this session as remote gone away. self.keepalive = Some(new_timer); Ok(()) } @@ -630,7 +631,8 @@ where match Pin::new(interval).as_mut().poll_next(cx) { Poll::Ready(Some(_)) => { if self.local_go_away { - // deadline for local go away wait time + // The remote peer has not responded to our sent go away code. + // Assume that remote peer has gone away and this session should be closed. self.remote_go_away = true; } else { self.keep_alive(cx, Instant::now())?; @@ -1133,6 +1135,9 @@ mod test { let _ignore = control.close().await; }); + // The purpose of this test is to ensure that if the remote does not respond to the + // go away message, it must be able to actively disconnect the session instead of hanging. + // So, if the test fails to exit, it means there has a problem while let Some(Ok(mut stream)) = session.next().await { tokio::spawn(async move { let mut buf = [0; 100]; diff --git a/yamux/src/stream.rs b/yamux/src/stream.rs index 8dc6dab8..0c27408e 100644 --- a/yamux/src/stream.rs +++ b/yamux/src/stream.rs @@ -452,6 +452,8 @@ impl Drop for StreamHandle { if !self.unbound_event_sender.is_closed() && self.state != StreamState::Closed { let event = StreamEvent::Closed(self.id); if self.state == StreamState::LocalClosing { + // LocalClosing means that local have sent Fin to the remote and waiting for a response. + // So, here only need to send a cleanup message let _ignore = self.unbound_event_sender.unbounded_send(event); } else { let mut flags = self.get_flags();