Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

socks5: wait to close buffer #1702

Merged
merged 6 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
### Fixed

- validator-api, mixnode, gateway should now prefer values in config.toml over mainnet defaults ([#1645])
- socks5-client: fix bug where in some cases packet reordering could trigger a connection being closed too early ([#1702])

### Changed

Expand All @@ -39,6 +40,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
[#1669]: https://github.com/nymtech/nym/pull/1669
[#1671]: https://github.com/nymtech/nym/pull/1671
[#1673]: https://github.com/nymtech/nym/pull/1673
[#1702]: https://github.com/nymtech/nym/pull/1702


## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
Expand Down
33 changes: 23 additions & 10 deletions common/socks5/ordered-buffer/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ pub struct OrderedMessageBuffer {
messages: HashMap<u64, OrderedMessage>,
}

/// Data returned from `OrderedMessageBuffer` on a successful read of gapless ordered data.
#[derive(Debug, PartialEq, Eq)]
pub struct ReadContiguousData {
pub data: Vec<u8>,
pub last_index: u64,
}

impl OrderedMessageBuffer {
pub fn new() -> OrderedMessageBuffer {
OrderedMessageBuffer {
Expand Down Expand Up @@ -42,7 +49,7 @@ impl OrderedMessageBuffer {
/// a read will return the bytes of messages 0, 1, 2. Subsequent reads will
/// return `None` until message 3 comes in, at which point 3, 4, and any
/// further contiguous messages which have arrived will be returned.
pub fn read(&mut self) -> Option<Vec<u8>> {
pub fn read(&mut self) -> Option<ReadContiguousData> {
if !self.messages.contains_key(&self.next_index) {
return None;
}
Expand All @@ -66,7 +73,10 @@ impl OrderedMessageBuffer {
.collect();

trace!("Returning {} bytes from ordered message buffer", data.len());
Some(data)
Some(ReadContiguousData {
data,
last_index: index,
})
}
}

Expand Down Expand Up @@ -102,11 +112,11 @@ mod test_chunking_and_reassembling {
};

buffer.write(first_message);
let first_read = buffer.read().unwrap();
let first_read = buffer.read().unwrap().data;
assert_eq!(vec![1, 2, 3, 4], first_read);

buffer.write(second_message);
let second_read = buffer.read().unwrap();
let second_read = buffer.read().unwrap().data;
assert_eq!(vec![5, 6, 7, 8], second_read);

assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
Expand All @@ -128,7 +138,7 @@ mod test_chunking_and_reassembling {
buffer.write(first_message);
buffer.write(second_message);
let second_read = buffer.read();
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap());
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap().data);
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
}

Expand All @@ -147,8 +157,8 @@ mod test_chunking_and_reassembling {

buffer.write(second_message);
buffer.write(first_message);
let read = buffer.read();
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read.unwrap());
let read = buffer.read().unwrap().data;
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read);
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
}
}
Expand Down Expand Up @@ -182,7 +192,7 @@ mod test_chunking_and_reassembling {
#[test]
fn everything_up_to_the_indexing_gap_is_returned() {
let mut buffer = setup();
let ordered_bytes = buffer.read().unwrap();
let ordered_bytes = buffer.read().unwrap().data;
assert_eq!([0, 0, 0, 0, 1, 1, 1, 1].to_vec(), ordered_bytes);

// we shouldn't get any more from a second attempt if nothing is added
Expand All @@ -208,7 +218,7 @@ mod test_chunking_and_reassembling {
};
buffer.write(two_message);

let more_ordered_bytes = buffer.read().unwrap();
let more_ordered_bytes = buffer.read().unwrap().data;
assert_eq!([2, 2, 2, 2, 3, 3, 3, 3].to_vec(), more_ordered_bytes);

// let's add another message
Expand All @@ -227,7 +237,10 @@ mod test_chunking_and_reassembling {
};
buffer.write(four_message);

assert_eq!([4, 4, 4, 4, 5, 5, 5, 5].to_vec(), buffer.read().unwrap());
assert_eq!(
[4, 4, 4, 4, 5, 5, 5, 5].to_vec(),
buffer.read().unwrap().data
);

// at this point we should again get back nothing if we try a read
assert_eq!(None, buffer.read());
Expand Down
2 changes: 1 addition & 1 deletion common/socks5/ordered-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod buffer;
mod message;
mod sender;

pub use buffer::OrderedMessageBuffer;
pub use buffer::{OrderedMessageBuffer, ReadContiguousData};
pub use message::MessageError;
pub use message::OrderedMessage;
pub use sender::OrderedMessageSender;
23 changes: 15 additions & 8 deletions common/socks5/proxy-helpers/src/connection_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer};
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
use socks5_requests::ConnectionId;
use std::collections::{HashMap, HashSet};
use task::ShutdownListener;
Expand Down Expand Up @@ -38,23 +38,27 @@ pub enum ControllerCommand {

struct ActiveConnection {
is_closed: bool,
closed_at_index: Option<u64>,
connection_sender: Option<ConnectionSender>,
ordered_buffer: OrderedMessageBuffer,
}

impl ActiveConnection {
fn write_to_buf(&mut self, payload: Vec<u8>) {
fn write_to_buf(&mut self, payload: Vec<u8>, is_closed: bool) {
let ordered_message = match OrderedMessage::try_from_bytes(payload) {
Ok(msg) => msg,
Err(err) => {
error!("Malformed ordered message - {:?}", err);
return;
}
};
if is_closed {
self.closed_at_index = Some(ordered_message.index);
}
self.ordered_buffer.write(ordered_message);
}

fn read_from_buf(&mut self) -> Option<Vec<u8>> {
fn read_from_buf(&mut self) -> Option<ReadContiguousData> {
self.ordered_buffer.read()
}
}
Expand Down Expand Up @@ -99,6 +103,7 @@ impl Controller {
is_closed: false,
connection_sender: Some(connection_sender),
ordered_buffer: OrderedMessageBuffer::new(),
closed_at_index: None,
};
if let Some(_active_conn) = self.active_connections.insert(conn_id, active_connection) {
error!("Received a duplicate 'Connect'!")
Expand Down Expand Up @@ -127,21 +132,23 @@ impl Controller {
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
if !payload.is_empty() {
active_connection.write_to_buf(payload);
active_connection.write_to_buf(payload, is_closed);
} else if !is_closed {
error!("Tried to write an empty message to a not-closing connection. Please let us know if you see this message");
}
// if messages get unordered, make sure we don't lose information about
// remote socket getting closed!
active_connection.is_closed |= is_closed;

if let Some(payload) = active_connection.read_from_buf() {
if let Some(closed_at_index) = active_connection.closed_at_index {
if payload.last_index > closed_at_index {
octol marked this conversation as resolved.
Show resolved Hide resolved
active_connection.is_closed = true;
}
}
if let Err(err) = active_connection
.connection_sender
.as_mut()
.unwrap()
.unbounded_send(ConnectionMessage {
payload,
payload: payload.data,
socket_closed: active_connection.is_closed,
})
{
Expand Down