Skip to content

Commit

Permalink
socks5: wait to close buffer (#1702)
Browse files Browse the repository at this point in the history
* socks5: wait to close buffer

This is the fix proposed by @simonwicky in
#1701

* socks5: fix typo in patch

* socks5: fix tests

* socks5: add type for returned data and index

* socks5: make closed_at_index an Option

* changelog: add note
  • Loading branch information
octol committed Oct 27, 2022
1 parent 2d5f851 commit cf65bc1
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
### Fixed

- validator-api, mixnode, gateway should now prefer values in config.toml over mainnet defaults ([#1645])
- socks5-client: fix bug where in some cases packet reordering could trigger a connection being closed too early ([#1702])

### Changed

Expand All @@ -39,6 +40,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
[#1669]: https://github.com/nymtech/nym/pull/1669
[#1671]: https://github.com/nymtech/nym/pull/1671
[#1673]: https://github.com/nymtech/nym/pull/1673
[#1702]: https://github.com/nymtech/nym/pull/1702


## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
Expand Down
33 changes: 23 additions & 10 deletions common/socks5/ordered-buffer/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ pub struct OrderedMessageBuffer {
messages: HashMap<u64, OrderedMessage>,
}

/// Data returned from `OrderedMessageBuffer` on a successful read of gapless ordered data.
#[derive(Debug, PartialEq, Eq)]
pub struct ReadContiguousData {
pub data: Vec<u8>,
pub last_index: u64,
}

impl OrderedMessageBuffer {
pub fn new() -> OrderedMessageBuffer {
OrderedMessageBuffer {
Expand Down Expand Up @@ -42,7 +49,7 @@ impl OrderedMessageBuffer {
/// a read will return the bytes of messages 0, 1, 2. Subsequent reads will
/// return `None` until message 3 comes in, at which point 3, 4, and any
/// further contiguous messages which have arrived will be returned.
pub fn read(&mut self) -> Option<Vec<u8>> {
pub fn read(&mut self) -> Option<ReadContiguousData> {
if !self.messages.contains_key(&self.next_index) {
return None;
}
Expand All @@ -66,7 +73,10 @@ impl OrderedMessageBuffer {
.collect();

trace!("Returning {} bytes from ordered message buffer", data.len());
Some(data)
Some(ReadContiguousData {
data,
last_index: index,
})
}
}

Expand Down Expand Up @@ -102,11 +112,11 @@ mod test_chunking_and_reassembling {
};

buffer.write(first_message);
let first_read = buffer.read().unwrap();
let first_read = buffer.read().unwrap().data;
assert_eq!(vec![1, 2, 3, 4], first_read);

buffer.write(second_message);
let second_read = buffer.read().unwrap();
let second_read = buffer.read().unwrap().data;
assert_eq!(vec![5, 6, 7, 8], second_read);

assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
Expand All @@ -128,7 +138,7 @@ mod test_chunking_and_reassembling {
buffer.write(first_message);
buffer.write(second_message);
let second_read = buffer.read();
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap());
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap().data);
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
}

Expand All @@ -147,8 +157,8 @@ mod test_chunking_and_reassembling {

buffer.write(second_message);
buffer.write(first_message);
let read = buffer.read();
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read.unwrap());
let read = buffer.read().unwrap().data;
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read);
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
}
}
Expand Down Expand Up @@ -182,7 +192,7 @@ mod test_chunking_and_reassembling {
#[test]
fn everything_up_to_the_indexing_gap_is_returned() {
let mut buffer = setup();
let ordered_bytes = buffer.read().unwrap();
let ordered_bytes = buffer.read().unwrap().data;
assert_eq!([0, 0, 0, 0, 1, 1, 1, 1].to_vec(), ordered_bytes);

// we shouldn't get any more from a second attempt if nothing is added
Expand All @@ -208,7 +218,7 @@ mod test_chunking_and_reassembling {
};
buffer.write(two_message);

let more_ordered_bytes = buffer.read().unwrap();
let more_ordered_bytes = buffer.read().unwrap().data;
assert_eq!([2, 2, 2, 2, 3, 3, 3, 3].to_vec(), more_ordered_bytes);

// let's add another message
Expand All @@ -227,7 +237,10 @@ mod test_chunking_and_reassembling {
};
buffer.write(four_message);

assert_eq!([4, 4, 4, 4, 5, 5, 5, 5].to_vec(), buffer.read().unwrap());
assert_eq!(
[4, 4, 4, 4, 5, 5, 5, 5].to_vec(),
buffer.read().unwrap().data
);

// at this point we should again get back nothing if we try a read
assert_eq!(None, buffer.read());
Expand Down
2 changes: 1 addition & 1 deletion common/socks5/ordered-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod buffer;
mod message;
mod sender;

pub use buffer::OrderedMessageBuffer;
pub use buffer::{OrderedMessageBuffer, ReadContiguousData};
pub use message::MessageError;
pub use message::OrderedMessage;
pub use sender::OrderedMessageSender;
23 changes: 15 additions & 8 deletions common/socks5/proxy-helpers/src/connection_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer};
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
use socks5_requests::ConnectionId;
use std::collections::{HashMap, HashSet};
use task::ShutdownListener;
Expand Down Expand Up @@ -38,23 +38,27 @@ pub enum ControllerCommand {

struct ActiveConnection {
is_closed: bool,
closed_at_index: Option<u64>,
connection_sender: Option<ConnectionSender>,
ordered_buffer: OrderedMessageBuffer,
}

impl ActiveConnection {
fn write_to_buf(&mut self, payload: Vec<u8>) {
fn write_to_buf(&mut self, payload: Vec<u8>, is_closed: bool) {
let ordered_message = match OrderedMessage::try_from_bytes(payload) {
Ok(msg) => msg,
Err(err) => {
error!("Malformed ordered message - {:?}", err);
return;
}
};
if is_closed {
self.closed_at_index = Some(ordered_message.index);
}
self.ordered_buffer.write(ordered_message);
}

fn read_from_buf(&mut self) -> Option<Vec<u8>> {
fn read_from_buf(&mut self) -> Option<ReadContiguousData> {
self.ordered_buffer.read()
}
}
Expand Down Expand Up @@ -99,6 +103,7 @@ impl Controller {
is_closed: false,
connection_sender: Some(connection_sender),
ordered_buffer: OrderedMessageBuffer::new(),
closed_at_index: None,
};
if let Some(_active_conn) = self.active_connections.insert(conn_id, active_connection) {
error!("Received a duplicate 'Connect'!")
Expand Down Expand Up @@ -127,21 +132,23 @@ impl Controller {
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
if !payload.is_empty() {
active_connection.write_to_buf(payload);
active_connection.write_to_buf(payload, is_closed);
} else if !is_closed {
error!("Tried to write an empty message to a not-closing connection. Please let us know if you see this message");
}
// if messages get unordered, make sure we don't lose information about
// remote socket getting closed!
active_connection.is_closed |= is_closed;

if let Some(payload) = active_connection.read_from_buf() {
if let Some(closed_at_index) = active_connection.closed_at_index {
if payload.last_index > closed_at_index {
active_connection.is_closed = true;
}
}
if let Err(err) = active_connection
.connection_sender
.as_mut()
.unwrap()
.unbounded_send(ConnectionMessage {
payload,
payload: payload.data,
socket_closed: active_connection.is_closed,
})
{
Expand Down

0 comments on commit cf65bc1

Please sign in to comment.