Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion p2p/src/network/yamux/p2p_network_yamux_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub enum P2pNetworkYamuxAction {
#[action_event(level = trace)]
IncomingFrame {
addr: ConnectionAddr,
frame: YamuxFrame,
},
#[action_event(level = trace)]
OutgoingFrame {
Expand Down
158 changes: 68 additions & 90 deletions p2p/src/network/yamux/p2p_network_yamux_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl P2pNetworkYamuxState {
}
}
1 => {
let difference = i32::from_be_bytes(b);
let difference = u32::from_be_bytes(b);
let frame = YamuxFrame {
flags,
stream_id,
Expand All @@ -100,7 +100,7 @@ impl P2pNetworkYamuxState {
continue;
}
2 => {
let opaque = i32::from_be_bytes(b);
let opaque = u32::from_be_bytes(b);
let frame = YamuxFrame {
flags,
stream_id,
Expand Down Expand Up @@ -143,11 +143,12 @@ impl P2pNetworkYamuxState {

yamux_state.buffer = yamux_state.buffer[offset..].to_vec();

let incoming_data = yamux_state.incoming.clone();
let frame_count = yamux_state.incoming.len();
let dispatcher = state_context.into_dispatcher();
incoming_data.into_iter().for_each(|frame| {
dispatcher.push(P2pNetworkYamuxAction::IncomingFrame { addr, frame })
});

for _ in 0..frame_count {
dispatcher.push(P2pNetworkYamuxAction::IncomingFrame { addr })
}

Ok(())
}
Expand Down Expand Up @@ -181,69 +182,74 @@ impl P2pNetworkYamuxState {

Ok(())
}
P2pNetworkYamuxAction::IncomingFrame { addr, frame } => {
P2pNetworkYamuxAction::IncomingFrame { addr } => {
let mut pending_outgoing = VecDeque::default();
if let Some(frame) = yamux_state.incoming.pop_front() {
if frame.flags.contains(YamuxFlags::SYN) {
yamux_state
.streams
.insert(frame.stream_id, YamuxStreamState::incoming());
let Some(frame) = yamux_state.incoming.pop_front() else {
bug_condition!(
"Frame not found for action `P2pNetworkYamuxAction::IncomingFrame`"
);
return Ok(());
};

if frame.flags.contains(YamuxFlags::SYN) {
yamux_state
.streams
.insert(frame.stream_id, YamuxStreamState::incoming());

if frame.stream_id != 0 {
connection_state.streams.insert(
frame.stream_id,
P2pNetworkStreamState::new_incoming(meta.time()),
);
if frame.stream_id != 0 {
connection_state.streams.insert(
frame.stream_id,
P2pNetworkStreamState::new_incoming(meta.time()),
);
}
}
if frame.flags.contains(YamuxFlags::ACK) {
yamux_state
.streams
.entry(frame.stream_id)
.or_default()
.established = true;
}

match &frame.inner {
YamuxFrameInner::Data(data) => {
if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) {
// must not underflow
// TODO: check it and disconnect peer that violates flow rules
stream.window_ours = stream.window_ours.wrapping_sub(data.len() as u32);
}
}
if frame.flags.contains(YamuxFlags::ACK) {
yamux_state
YamuxFrameInner::WindowUpdate { difference } => {
let stream = yamux_state
.streams
.entry(frame.stream_id)
.or_default()
.established = true;
}

match frame.inner {
YamuxFrameInner::Data(data) => {
if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) {
// must not underflow
// TODO: check it and disconnect peer that violates flow rules
stream.window_ours =
stream.window_ours.wrapping_sub(data.len() as u32);
}
}
YamuxFrameInner::WindowUpdate { difference } => {
let stream = yamux_state
.streams
.entry(frame.stream_id)
.or_insert_with(YamuxStreamState::incoming);
stream.update_window(false, difference);
if difference > 0 {
// have some fresh space in the window
// try send as many frames as can
let mut window = stream.window_theirs;
while let Some(mut frame) = stream.pending.pop_front() {
let len = frame.len() as u32;
if let Some(new_window) = window.checked_sub(len) {
pending_outgoing.push_back(frame);
window = new_window;
} else {
if let Some(remaining) =
frame.split_at((len - window) as usize)
{
stream.pending.push_front(remaining);
}
pending_outgoing.push_back(frame);

break;
.or_insert_with(YamuxStreamState::incoming);

stream.window_theirs = stream.window_theirs.saturating_add(*difference);

if *difference > 0 {
// have some fresh space in the window
// try send as many frames as can
let mut window = stream.window_theirs;
while let Some(mut frame) = stream.pending.pop_front() {
let len = frame.len() as u32;
if let Some(new_window) = window.checked_sub(len) {
pending_outgoing.push_back(frame);
window = new_window;
} else {
if let Some(remaining) = frame.split_at((len - window) as usize)
{
stream.pending.push_front(remaining);
}
pending_outgoing.push_back(frame);

break;
}
}
}
YamuxFrameInner::Ping { .. } => {}
YamuxFrameInner::GoAway(res) => yamux_state.set_res(res),
}
YamuxFrameInner::Ping { .. } => {}
YamuxFrameInner::GoAway(res) => yamux_state.set_res(*res),
}

let (dispatcher, state) = state_context.into_dispatcher_and_state();
Expand Down Expand Up @@ -344,16 +350,9 @@ impl P2pNetworkYamuxState {
});
}
}
YamuxFrameInner::WindowUpdate { difference } => {
if *difference < 0 {
let error =
P2pNetworkConnectionError::YamuxBadWindowUpdate(frame.stream_id);
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
} else {
while let Some(frame) = pending_outgoing.pop_front() {
dispatcher
.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
}
YamuxFrameInner::WindowUpdate { .. } => {
while let Some(frame) = pending_outgoing.pop_front() {
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
}
}
_ => {}
Expand Down Expand Up @@ -408,7 +407,7 @@ impl P2pNetworkYamuxState {
// }
}
YamuxFrameInner::WindowUpdate { difference } => {
stream.update_window(true, *difference);
stream.window_ours = stream.window_ours.saturating_add(*difference);
}
_ => {}
}
Expand Down Expand Up @@ -475,24 +474,3 @@ impl P2pNetworkYamuxState {
}
}
}

impl YamuxStreamState {
pub fn update_window(&mut self, ours: bool, difference: i32) {
let window = if ours {
&mut self.window_ours
} else {
&mut self.window_theirs
};
if difference < 0 {
let decreasing = (-difference) as u32;
if *window < decreasing {
*window = 0;
} else {
*window = (*window).wrapping_sub(decreasing);
}
} else {
let increasing = difference as u32;
*window = (*window).wrapping_add(increasing);
}
}
}
8 changes: 4 additions & 4 deletions p2p/src/network/yamux/p2p_network_yamux_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ bitflags::bitflags! {
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct YamuxPing {
pub stream_id: StreamId,
pub opaque: i32,
pub opaque: u32,
pub response: bool,
}

Expand Down Expand Up @@ -227,12 +227,12 @@ impl YamuxFrame {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum YamuxFrameInner {
Data(Data),
WindowUpdate { difference: i32 },
Ping { opaque: i32 },
WindowUpdate { difference: u32 },
Ping { opaque: u32 },
GoAway(Result<(), YamuxSessionError>),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum YamuxSessionError {
Protocol,
Internal,
Expand Down
Loading