Skip to content

Commit

Permalink
set read timeout for half-open connections (one direction closed)
Browse files Browse the repository at this point in the history
- ref #490
  • Loading branch information
zonyitoo committed Jun 5, 2021
1 parent a8de054 commit e32c869
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 22 deletions.
23 changes: 17 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/shadowsocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async-trait = "0.1"
mio = "0.7"
socket2 = { version = "0.4", features = ["all"] }
tokio = { version = "1.5", features = ["io-util", "macros", "net", "parking_lot", "process", "rt", "sync", "time"] }
tokio-io-timeout = "1.1"

trust-dns-resolver = { version = "0.20", optional = true }
arc-swap = { version = "1.3", optional = true }
Expand Down
76 changes: 60 additions & 16 deletions crates/shadowsocks/src/relay/tcprelay/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ use std::{
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::ready;
use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_io_timeout::TimeoutReader;

use crate::crypto::v1::{CipherCategory, CipherKind};

Expand Down Expand Up @@ -175,26 +178,26 @@ enum TransferState {
Done(u64),
}

#[pin_project(project = CopyBidirectionalProj)]
struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> {
a: &'a mut A,
b: &'a mut B,
#[pin]
a: TimeoutReader<&'a mut A>,
#[pin]
b: TimeoutReader<&'a mut B>,
a_to_b: TransferState,
b_to_a: TransferState,
}

fn transfer_one_direction<A, B>(
cx: &mut Context<'_>,
state: &mut TransferState,
r: &mut A,
w: &mut B,
mut r: Pin<&mut TimeoutReader<&mut A>>,
mut w: Pin<&mut TimeoutReader<&mut B>>,
) -> Poll<io::Result<u64>>
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
let mut r = Pin::new(r);
let mut w = Pin::new(w);

loop {
match state {
TransferState::Running(buf) => {
Expand All @@ -218,19 +221,60 @@ where
{
type Output = io::Result<(u64, u64)>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Unpack self into mut refs to each field to avoid borrow check issues.
let CopyBidirectional { a, b, a_to_b, b_to_a } = &mut *self;
let CopyBidirectionalProj {
mut a,
mut b,
a_to_b,
b_to_a,
} = self.project();

let a_to_b = transfer_one_direction(cx, a_to_b, &mut *a, &mut *b)?;
let b_to_a = transfer_one_direction(cx, b_to_a, &mut *b, &mut *a)?;
let poll_a_to_b = transfer_one_direction(cx, a_to_b, a.as_mut(), b.as_mut())?;
let poll_b_to_a = transfer_one_direction(cx, b_to_a, b.as_mut(), a.as_mut())?;

// It is not a problem if ready! returns early because transfer_one_direction for the
// other direction will keep returning TransferState::Done(count) in future calls to poll
let a_to_b = ready!(a_to_b);
let b_to_a = ready!(b_to_a);

Poll::Ready(Ok((a_to_b, b_to_a)))
// When one direction have already finished, FIN have already sent to the writer.
// But the FIN may have been lost.
// This timer is for purging those "Half Open" connections.
const READ_TIMEOUT_WHEN_ONE_SHUTDOWN: Duration = Duration::from_secs(5);

// Check if one end have already finished
match (poll_a_to_b, poll_b_to_a) {
(Poll::Ready(a_to_b), Poll::Ready(b_to_a)) => Poll::Ready(Ok((a_to_b, b_to_a))),

(Poll::Ready(a_to_b), Poll::Pending) => {
// a -> b finished, then FIN have already sent to b, setting a read timeout on b
if let None = b.timeout() {
b.as_mut().set_timeout_pinned(Some(READ_TIMEOUT_WHEN_ONE_SHUTDOWN));

// poll again to ensure Waker have already registered to the timer
let b_to_a = ready!(transfer_one_direction(cx, b_to_a, b.as_mut(), a.as_mut())?);

Poll::Ready(Ok((a_to_b, b_to_a)))
} else {
Poll::Pending
}
}

(Poll::Pending, Poll::Ready(b_to_a)) => {
// b -> a finished, then FIN have already sent to a, setting a read timeout on a
if let None = a.timeout() {
a.as_mut().set_timeout_pinned(Some(READ_TIMEOUT_WHEN_ONE_SHUTDOWN));

// poll again to ensure Waker have already registered to the timer
let a_to_b = ready!(transfer_one_direction(cx, a_to_b, a.as_mut(), b.as_mut())?);

Poll::Ready(Ok((a_to_b, b_to_a)))
} else {
Poll::Pending
}
}

(Poll::Pending, Poll::Pending) => Poll::Pending,
}
}
}

Expand Down Expand Up @@ -271,8 +315,8 @@ where
P: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
CopyBidirectional {
a: encrypted,
b: plain,
a: TimeoutReader::new(encrypted),
b: TimeoutReader::new(plain),
a_to_b: TransferState::Running(CopyBuffer::new(encrypted_read_buffer_size(method))),
b_to_a: TransferState::Running(CopyBuffer::new(plain_read_buffer_size(method))),
}
Expand Down

0 comments on commit e32c869

Please sign in to comment.