Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat(node): cache Connections to nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Sep 24, 2020
1 parent 8828a68 commit a78c305
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 37 deletions.
33 changes: 28 additions & 5 deletions src/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ use crate::{
messages::{Message, Variant},
};
use bytes::Bytes;
use futures::lock::Mutex;
use hex_fmt::HexFmt;
use lru_time_cache::LruCache;
use qp2p::{Config, Connection, Endpoint, IncomingConnections, QuicP2p};
use std::{boxed::Box, net::SocketAddr, sync::Arc};

// Number of Connections to maintain in the cache
const CONNECTIONS_CACHE_SIZE: usize = 1024;

// Communication component of the node to interact with other nodes.
#[derive(Clone)]
pub(crate) struct Comm {
quic_p2p: Arc<Box<QuicP2p>>,
endpoint: Arc<Box<Endpoint>>,
node_conns: Arc<Mutex<LruCache<SocketAddr, Connection>>>,
}

impl Comm {
Expand All @@ -36,7 +42,13 @@ impl Comm {
// the incoming messages from other nodes.
let endpoint = Arc::new(Box::new(quic_p2p.new_endpoint()?));

Ok(Self { quic_p2p, endpoint })
let node_conns = Arc::new(Mutex::new(LruCache::with_capacity(CONNECTIONS_CACHE_SIZE)));

Ok(Self {
quic_p2p,
endpoint,
node_conns,
})
}

pub async fn from_bootstrapping(transport_config: Config) -> Result<(Self, Connection)> {
Expand All @@ -50,8 +62,16 @@ impl Comm {

let quic_p2p = Arc::new(Box::new(quic_p2p));
let endpoint = Arc::new(Box::new(endpoint));

Ok((Self { quic_p2p, endpoint }, connection))
let node_conns = Arc::new(Mutex::new(LruCache::with_capacity(CONNECTIONS_CACHE_SIZE)));

Ok((
Self {
quic_p2p,
endpoint,
node_conns,
},
connection,
))
}

/// Starts listening for events returning a stream where to read them from.
Expand Down Expand Up @@ -103,8 +123,11 @@ impl Comm {
msg: Bytes,
) -> Result<()> {
trace!("Sending message to target {:?}", recipient);
// TODO: can we cache the Connections to nodes to make this more efficient??
let conn = self.endpoint.connect_to(recipient).await?;
// Cache the Connection to the node or obtain the already cached one
let mut node_conns = self.node_conns.lock().await;
let conn = node_conns
.entry(*recipient)
.or_insert(self.endpoint.connect_to(recipient).await?);
conn.send_uni(msg).await.map_err(Error::Network)
}

Expand Down
14 changes: 8 additions & 6 deletions src/node/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
error::Result,
event::{Connected, Event},
location::DstLocation,
messages::Message,
Expand All @@ -20,8 +21,9 @@ use tokio::sync::mpsc;
use xor_name::XorName;

// Maximum number of events to be buffered internally, when the buffer is full
// no new events can be generated by this crate, even if external messages
// coming from other peers need to be rejected.
// no new events can be generated by this crate
// TODO: if external connections or messages are arriving when
// the buffer is full, they need to be rejected.
const MAX_EVENTS_BUFFERED: usize = 1024;

/// Stream of routing node events
Expand All @@ -30,16 +32,16 @@ pub struct EventStream {
}

impl EventStream {
pub(crate) fn new(
pub(crate) async fn new(
stage: Arc<Mutex<Stage>>,
incoming_conns: IncomingConnections,
xorname: XorName,
is_genesis: bool,
) -> Self {
) -> Result<Self> {
let incoming_conns = stage.lock().await.listen_events()?;
let (events_tx, events_rx) = mpsc::channel::<Event>(MAX_EVENTS_BUFFERED);
Self::spawn_connections_handler(stage, events_tx, incoming_conns, xorname, is_genesis);

Self { events_rx }
Ok(Self { events_rx })
}

/// Returns next event
Expand Down
45 changes: 19 additions & 26 deletions src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl Default for NodeConfig {
/// role, and can be any [`SrcLocation`](enum.SrcLocation.html).
pub struct Node {
stage: Arc<Mutex<Stage>>,
full_id: FullId,
is_genesis: bool,
}

Expand All @@ -93,7 +92,7 @@ impl Node {
let is_genesis = config.first;

let stage = if is_genesis {
match Stage::first_node(transport_config, full_id.clone(), network_params, rng).await {
match Stage::first_node(transport_config, full_id, network_params, rng).await {
Ok(stage) => {
info!("{} Started a new network as a seed node.", node_name);
stage
Expand All @@ -106,25 +105,23 @@ impl Node {
}
} else {
info!("{} Bootstrapping a new node.", node_name);
Stage::bootstrap(transport_config, full_id.clone(), network_params, rng).await?
Stage::bootstrap(transport_config, full_id, network_params, rng).await?
};

Ok(Self {
stage: Arc::new(Mutex::new(stage)),
full_id,
is_genesis,
})
}

/// Obtain an Event stream to read all reported events from
pub async fn listen_events(&self) -> Result<EventStream> {
let incoming_conns = self.stage.lock().await.listen_events()?;
Ok(EventStream::new(
EventStream::new(
Arc::clone(&self.stage),
incoming_conns,
*self.full_id.public_id().name(),
*self.id().await.name(),
self.is_genesis,
))
)
.await
}

/// Is this the genesis node or not
Expand All @@ -133,13 +130,13 @@ impl Node {
}

/// Returns the `PublicId` of this node.
pub fn id(&self) -> &PublicId {
self.full_id.public_id()
pub async fn id(&self) -> PublicId {
self.stage.lock().await.full_id().public_id().clone()
}

/// The name of this node.
pub fn name(&self) -> &XorName {
self.id().name()
pub async fn name(&self) -> XorName {
*self.id().await.name()
}

/// Returns connection info of this node.
Expand All @@ -164,19 +161,15 @@ impl Node {

/// Returns whether the node is Elder.
pub async fn is_elder(&self) -> bool {
self.stage
.lock()
.await
.approved()
.map(|stage| {
stage
.shared_state
.sections
.our()
.elders
.contains_key(self.name())
})
.unwrap_or(false)
match self.stage.lock().await.approved() {
None => false,
Some(stage) => stage
.shared_state
.sections
.our()
.elders
.contains_key(&self.name().await),
}
}

/// Returns the information of all the current section elders.
Expand Down
6 changes: 6 additions & 0 deletions src/node/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,17 @@ impl Stage {
}
}

/// Returns current FullId of the node
pub fn full_id(&self) -> &FullId {
&self.full_id
}

/// Returns connection info of this node.
pub fn our_connection_info(&mut self) -> Result<SocketAddr> {
self.comm.our_connection_info()
}

/// Resturns a stream to obtain incoming connections from
pub fn listen_events(&mut self) -> Result<IncomingConnections> {
self.comm.listen_events()
}
Expand Down

0 comments on commit a78c305

Please sign in to comment.