From cf65bc1295e68be1fcbfda166d838680156b5b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20H=C3=A4ggblad?= Date: Thu, 27 Oct 2022 12:42:01 +0200 Subject: [PATCH] socks5: wait to close buffer (#1702) * socks5: wait to close buffer This is the fix proposed by @simonwicky in https://github.com/nymtech/nym/issues/1701 * socks5: fix typo in patch * socks5: fix tests * socks5: add type for returned data and index * socks5: make closed_at_index an Option * changelog: add note --- CHANGELOG.md | 2 ++ common/socks5/ordered-buffer/src/buffer.rs | 33 +++++++++++++------ common/socks5/ordered-buffer/src/lib.rs | 2 +- .../src/connection_controller.rs | 23 ++++++++----- 4 files changed, 41 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48321f57c6..fb5fcc80bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https:// ### Fixed - validator-api, mixnode, gateway should now prefer values in config.toml over mainnet defaults ([#1645]) +- socks5-client: fix bug where in some cases packet reordering could trigger a connection being closed too early ([#1702]) ### Changed @@ -39,6 +40,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https:// [#1669]: https://github.com/nymtech/nym/pull/1669 [#1671]: https://github.com/nymtech/nym/pull/1671 [#1673]: https://github.com/nymtech/nym/pull/1673 +[#1702]: https://github.com/nymtech/nym/pull/1702 ## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2) diff --git a/common/socks5/ordered-buffer/src/buffer.rs b/common/socks5/ordered-buffer/src/buffer.rs index 19d946f8d6..33c4d69142 100644 --- a/common/socks5/ordered-buffer/src/buffer.rs +++ b/common/socks5/ordered-buffer/src/buffer.rs @@ -13,6 +13,13 @@ pub struct OrderedMessageBuffer { messages: HashMap, } +/// Data returned from `OrderedMessageBuffer` on a successful read of gapless ordered data. +#[derive(Debug, PartialEq, Eq)] +pub struct ReadContiguousData { + pub data: Vec, + pub last_index: u64, +} + impl OrderedMessageBuffer { pub fn new() -> OrderedMessageBuffer { OrderedMessageBuffer { @@ -42,7 +49,7 @@ impl OrderedMessageBuffer { /// a read will return the bytes of messages 0, 1, 2. Subsequent reads will /// return `None` until message 3 comes in, at which point 3, 4, and any /// further contiguous messages which have arrived will be returned. - pub fn read(&mut self) -> Option> { + pub fn read(&mut self) -> Option { if !self.messages.contains_key(&self.next_index) { return None; } @@ -66,7 +73,10 @@ impl OrderedMessageBuffer { .collect(); trace!("Returning {} bytes from ordered message buffer", data.len()); - Some(data) + Some(ReadContiguousData { + data, + last_index: index, + }) } } @@ -102,11 +112,11 @@ mod test_chunking_and_reassembling { }; buffer.write(first_message); - let first_read = buffer.read().unwrap(); + let first_read = buffer.read().unwrap().data; assert_eq!(vec![1, 2, 3, 4], first_read); buffer.write(second_message); - let second_read = buffer.read().unwrap(); + let second_read = buffer.read().unwrap().data; assert_eq!(vec![5, 6, 7, 8], second_read); assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty @@ -128,7 +138,7 @@ mod test_chunking_and_reassembling { buffer.write(first_message); buffer.write(second_message); let second_read = buffer.read(); - assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap()); + assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap().data); assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty } @@ -147,8 +157,8 @@ mod test_chunking_and_reassembling { buffer.write(second_message); buffer.write(first_message); - let read = buffer.read(); - assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read.unwrap()); + let read = buffer.read().unwrap().data; + assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read); assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty } } @@ -182,7 +192,7 @@ mod test_chunking_and_reassembling { #[test] fn everything_up_to_the_indexing_gap_is_returned() { let mut buffer = setup(); - let ordered_bytes = buffer.read().unwrap(); + let ordered_bytes = buffer.read().unwrap().data; assert_eq!([0, 0, 0, 0, 1, 1, 1, 1].to_vec(), ordered_bytes); // we shouldn't get any more from a second attempt if nothing is added @@ -208,7 +218,7 @@ mod test_chunking_and_reassembling { }; buffer.write(two_message); - let more_ordered_bytes = buffer.read().unwrap(); + let more_ordered_bytes = buffer.read().unwrap().data; assert_eq!([2, 2, 2, 2, 3, 3, 3, 3].to_vec(), more_ordered_bytes); // let's add another message @@ -227,7 +237,10 @@ mod test_chunking_and_reassembling { }; buffer.write(four_message); - assert_eq!([4, 4, 4, 4, 5, 5, 5, 5].to_vec(), buffer.read().unwrap()); + assert_eq!( + [4, 4, 4, 4, 5, 5, 5, 5].to_vec(), + buffer.read().unwrap().data + ); // at this point we should again get back nothing if we try a read assert_eq!(None, buffer.read()); diff --git a/common/socks5/ordered-buffer/src/lib.rs b/common/socks5/ordered-buffer/src/lib.rs index 7a3c9e6501..9ed9d9c867 100644 --- a/common/socks5/ordered-buffer/src/lib.rs +++ b/common/socks5/ordered-buffer/src/lib.rs @@ -2,7 +2,7 @@ mod buffer; mod message; mod sender; -pub use buffer::OrderedMessageBuffer; +pub use buffer::{OrderedMessageBuffer, ReadContiguousData}; pub use message::MessageError; pub use message::OrderedMessage; pub use sender::OrderedMessageSender; diff --git a/common/socks5/proxy-helpers/src/connection_controller.rs b/common/socks5/proxy-helpers/src/connection_controller.rs index 0eebf8d994..1a9b13cc29 100644 --- a/common/socks5/proxy-helpers/src/connection_controller.rs +++ b/common/socks5/proxy-helpers/src/connection_controller.rs @@ -4,7 +4,7 @@ use futures::channel::mpsc; use futures::StreamExt; use log::*; -use ordered_buffer::{OrderedMessage, OrderedMessageBuffer}; +use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData}; use socks5_requests::ConnectionId; use std::collections::{HashMap, HashSet}; use task::ShutdownListener; @@ -38,12 +38,13 @@ pub enum ControllerCommand { struct ActiveConnection { is_closed: bool, + closed_at_index: Option, connection_sender: Option, ordered_buffer: OrderedMessageBuffer, } impl ActiveConnection { - fn write_to_buf(&mut self, payload: Vec) { + fn write_to_buf(&mut self, payload: Vec, is_closed: bool) { let ordered_message = match OrderedMessage::try_from_bytes(payload) { Ok(msg) => msg, Err(err) => { @@ -51,10 +52,13 @@ impl ActiveConnection { return; } }; + if is_closed { + self.closed_at_index = Some(ordered_message.index); + } self.ordered_buffer.write(ordered_message); } - fn read_from_buf(&mut self) -> Option> { + fn read_from_buf(&mut self) -> Option { self.ordered_buffer.read() } } @@ -99,6 +103,7 @@ impl Controller { is_closed: false, connection_sender: Some(connection_sender), ordered_buffer: OrderedMessageBuffer::new(), + closed_at_index: None, }; if let Some(_active_conn) = self.active_connections.insert(conn_id, active_connection) { error!("Received a duplicate 'Connect'!") @@ -127,21 +132,23 @@ impl Controller { fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec, is_closed: bool) { if let Some(active_connection) = self.active_connections.get_mut(&conn_id) { if !payload.is_empty() { - active_connection.write_to_buf(payload); + active_connection.write_to_buf(payload, is_closed); } else if !is_closed { error!("Tried to write an empty message to a not-closing connection. Please let us know if you see this message"); } - // if messages get unordered, make sure we don't lose information about - // remote socket getting closed! - active_connection.is_closed |= is_closed; if let Some(payload) = active_connection.read_from_buf() { + if let Some(closed_at_index) = active_connection.closed_at_index { + if payload.last_index > closed_at_index { + active_connection.is_closed = true; + } + } if let Err(err) = active_connection .connection_sender .as_mut() .unwrap() .unbounded_send(ConnectionMessage { - payload, + payload: payload.data, socket_closed: active_connection.is_closed, }) {