Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: use unbounded channel for Events
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam authored and bochaco committed Sep 24, 2020
1 parent df471b5 commit fb5a3aa
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 76 deletions.
6 changes: 3 additions & 3 deletions src/node/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use xor_name::XorName;

/// Stream of routing node events
pub struct EventStream {
events_rx: mpsc::Receiver<Event>,
events_rx: mpsc::UnboundedReceiver<Event>,
}

impl EventStream {
Expand All @@ -27,7 +27,7 @@ impl EventStream {
xorname: XorName,
incoming_conns: IncomingConnections,
timer_rx: mpsc::UnboundedReceiver<u64>,
events_rx: mpsc::Receiver<Event>,
events_rx: mpsc::UnboundedReceiver<Event>,
) -> Result<Self> {
Self::spawn_connections_handler(Arc::clone(&stage), incoming_conns, xorname);
Self::spawn_timer_handler(stage, timer_rx);
Expand Down Expand Up @@ -100,7 +100,7 @@ impl EventStream {
recv,
};

stage.lock().await.send_event(event).await;
stage.lock().await.send_event(event);
}
}
}
Expand Down
88 changes: 37 additions & 51 deletions src/node/stage/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,11 @@ impl Approved {
Ok(None)
}
Variant::UserMessage(content) => {
self.node_info
.send_event(Event::MessageReceived {
content: content.clone(),
src: msg.src().src_location(),
dst: *msg.dst(),
})
.await;
self.node_info.send_event(Event::MessageReceived {
content: content.clone(),
src: msg.src().src_location(),
dst: *msg.dst(),
});
Ok(None)
}
Variant::BouncedUntrustedMessage(message) => {
Expand Down Expand Up @@ -756,11 +754,9 @@ impl Approved {
);

if self.relocate_promise.is_none() {
self.node_info
.send_event(Event::RelocationStarted {
previous_name: *self.node_info.full_id.public_id().name(),
})
.await;
self.node_info.send_event(Event::RelocationStarted {
previous_name: *self.node_info.full_id.public_id().name(),
});
}

let conn_infos: Vec<_> = self
Expand All @@ -786,11 +782,9 @@ impl Approved {
// Keep it around even if we are not elder anymore, in case we need to resend it.
if self.relocate_promise.is_none() {
self.relocate_promise = Some(msg_bytes.clone());
self.node_info
.send_event(Event::RelocationStarted {
previous_name: *self.node_info.full_id.public_id().name(),
})
.await;
self.node_info.send_event(Event::RelocationStarted {
previous_name: *self.node_info.full_id.public_id().name(),
});
} else {
trace!("ignore RelocatePromise - already have one");
}
Expand Down Expand Up @@ -1268,20 +1262,16 @@ impl Approved {
self.print_network_stats();

if let Some(previous_name) = previous_name {
self.node_info
.send_event(Event::MemberJoined {
name: *p2p_node.name(),
previous_name,
age,
})
.await;
self.node_info.send_event(Event::MemberJoined {
name: *p2p_node.name(),
previous_name,
age,
});
} else {
self.node_info
.send_event(Event::InfantJoined {
name: *p2p_node.name(),
age,
})
.await;
self.node_info.send_event(Event::InfantJoined {
name: *p2p_node.name(),
age,
});
}

self.promote_and_demote_elders().await
Expand All @@ -1301,12 +1291,10 @@ impl Approved {

self.increment_ages(p2p_node.name(), &signature).await?;

self.node_info
.send_event(Event::MemberLeft {
name: *p2p_node.name(),
age,
})
.await;
self.node_info.send_event(Event::MemberLeft {
name: *p2p_node.name(),
age,
});

self.promote_and_demote_elders().await
}
Expand Down Expand Up @@ -1504,24 +1492,22 @@ impl Approved {
self.send_sync(self.shared_state.clone()).await?;
}

self.node_info
.send_event(Event::EldersChanged {
prefix: *self.shared_state.our_prefix(),
key: *self.shared_state.our_history.last_key(),
elders: self
.shared_state
.our_info()
.elders
.keys()
.copied()
.collect(),
})
.await;
self.node_info.send_event(Event::EldersChanged {
prefix: *self.shared_state.our_prefix(),
key: *self.shared_state.our_history.last_key(),
elders: self
.shared_state
.our_info()
.elders
.keys()
.copied()
.collect(),
});
}

if !old_is_elder && new_is_elder {
info!("Promoted to elder");
self.node_info.send_event(Event::PromotedToElder).await;
self.node_info.send_event(Event::PromotedToElder);

// Ping all members to detect recent lost nodes for which the section might need
// our Offline vote.
Expand All @@ -1540,7 +1526,7 @@ impl Approved {
info!("Demoted");
self.shared_state.demote();
self.section_keys_provider = SectionKeysProvider::new(None);
self.node_info.send_event(Event::Demoted).await;
self.node_info.send_event(Event::Demoted);
}

if !new_is_elder {
Expand Down
4 changes: 1 addition & 3 deletions src/node/stage/joining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ impl Joining {
self.timer.clone(),
)?;

self.node_info
.send_event(Event::Connected(connect_type))
.await;
self.node_info.send_event(Event::Connected(connect_type));

Ok(Some(new_stage))
}
Expand Down
30 changes: 11 additions & 19 deletions src/node/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ use xor_name::{Prefix, XorName};
#[cfg(feature = "mock")]
pub use self::{bootstrapping::BOOTSTRAP_TIMEOUT, joining::JOIN_TIMEOUT};

// Maximum number of events to be buffered internally, when the buffer is full
// no new events can be generated by this crate
// TODO: if external connections or messages are arriving when
// the buffer is full, they need to be rejected.
const MAX_EVENTS_BUFFERED: usize = 1024;

// Type to hold the various states a node goes through during its lifetime.
#[allow(clippy::large_enum_variant)]
enum State {
Expand All @@ -54,7 +48,7 @@ enum State {
pub(crate) struct NodeInfo {
pub full_id: FullId,
pub network_params: NetworkParams,
events_tx: mpsc::Sender<Event>,
events_tx: mpsc::UnboundedSender<Event>,
}

impl NodeInfo {
Expand All @@ -67,8 +61,8 @@ impl NodeInfo {
}

/// Send provided Event to the user which shall receive it through the EventStream
pub async fn send_event(&mut self, event: Event) {
if let Err(err) = self.events_tx.send(event).await {
pub fn send_event(&mut self, event: Event) {
if let Err(err) = self.events_tx.send(event) {
trace!("Error reporting new Event: {:?}", err);
}
}
Expand Down Expand Up @@ -100,7 +94,7 @@ impl Stage {
Self,
IncomingConnections,
mpsc::UnboundedReceiver<u64>,
mpsc::Receiver<Event>,
mpsc::UnboundedReceiver<Event>,
)> {
let comm = Comm::new(transport_config).await?;
let connection_info = comm.our_connection_info()?;
Expand All @@ -123,17 +117,15 @@ impl Stage {
secret_key_share,
};

let (events_tx, events_rx) = mpsc::channel::<Event>(MAX_EVENTS_BUFFERED);
let (events_tx, events_rx) = mpsc::unbounded_channel();
let mut node_info = NodeInfo {
full_id,
network_params,
events_tx,
};

node_info
.send_event(Event::Connected(Connected::First))
.await;
node_info.send_event(Event::PromotedToElder).await;
node_info.send_event(Event::Connected(Connected::First));
node_info.send_event(Event::PromotedToElder);

let (timer_tx, timer_rx) = mpsc::unbounded_channel();
let timer = Timer::new(timer_tx);
Expand All @@ -159,11 +151,11 @@ impl Stage {
Self,
IncomingConnections,
mpsc::UnboundedReceiver<u64>,
mpsc::Receiver<Event>,
mpsc::UnboundedReceiver<Event>,
)> {
let (comm, addr) = Comm::from_bootstrapping(transport_config).await?;

let (events_tx, events_rx) = mpsc::channel::<Event>(MAX_EVENTS_BUFFERED);
let (events_tx, events_rx) = mpsc::unbounded_channel();
let node_info = NodeInfo {
full_id,
network_params,
Expand All @@ -189,8 +181,8 @@ impl Stage {
}

/// Send provided Event to the user which shall receive it through the EventStream
pub async fn send_event(&mut self, event: Event) {
self.node_info_mut().send_event(event).await;
pub fn send_event(&mut self, event: Event) {
self.node_info_mut().send_event(event);
}

/// Returns current FullId of the node
Expand Down

0 comments on commit fb5a3aa

Please sign in to comment.