Skip to content

Commit

Permalink
refactor(node)!: use NodeBuilder to construct and run node
Browse files Browse the repository at this point in the history
- also only expose `RunningNode` to the public API
  • Loading branch information
RolandSherwin authored and joshuef committed Oct 20, 2023
1 parent 8e99318 commit 421ddb3
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 157 deletions.
42 changes: 23 additions & 19 deletions sn_node/src/bin/safenode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ mod rpc;

use clap::Parser;
use eyre::{eyre, Error, Result};
use libp2p::{identity::Keypair, Multiaddr, PeerId};
use libp2p::{identity::Keypair, PeerId};
#[cfg(feature = "metrics")]
use sn_logging::metrics::init_metrics;
use sn_logging::{parse_log_format, LogFormat, LogOutputDest};
use sn_node::{Marker, Node, NodeEvent, NodeEventsReceiver};
use sn_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver};
use sn_peers_acquisition::{parse_peers_args, PeersArgs};
use std::{
env,
Expand Down Expand Up @@ -126,6 +126,7 @@ struct Opt {
#[clap(long)]
local: bool,

#[cfg(feature = "open-metrics")]
/// Specify the port to start the OpenMetrics Server in.
///
/// The special value `0` will cause the OS to assign a random port.
Expand Down Expand Up @@ -171,20 +172,27 @@ fn main() -> Result<()> {

info!("Node started with initial_peers {bootstrap_peers:?}");

// Create a tokio runtime per `start_node` attempt, this ensures
// Create a tokio runtime per `run_node` attempt, this ensures
// any spawned tasks are closed before we would attempt to run
// another process with these args.
#[cfg(feature = "metrics")]
rt.spawn(init_metrics(std::process::id()));
rt.block_on(start_node(
keypair,
node_socket_addr,
bootstrap_peers,
opt.rpc,
opt.local,
&log_output_dest,
root_dir,
))?;
rt.block_on(async move {
let node_builder = NodeBuilder::new(
keypair,
node_socket_addr,
bootstrap_peers,
opt.local,
root_dir,
);
#[cfg(feature = "open-metrics")]
let mut node_builder = node_builder;
#[cfg(feature = "open-metrics")]
node_builder.metrics_server_port(opt.metrics_server_port);
run_node(node_builder, opt.rpc, &log_output_dest).await?;

Ok::<(), eyre::Report>(())
})?;

// actively shut down the runtime
rt.shutdown_timeout(Duration::from_secs(2));
Expand All @@ -199,19 +207,15 @@ fn main() -> Result<()> {
}

/// Start a node with the given configuration.
async fn start_node(
keypair: Keypair,
node_socket_addr: SocketAddr,
peers: Vec<Multiaddr>,
async fn run_node(
node_builder: NodeBuilder,
rpc: Option<SocketAddr>,
local: bool,
log_output_dest: &str,
root_dir: PathBuf,
) -> Result<()> {
let started_instant = std::time::Instant::now();

info!("Starting node ...");
let running_node = Node::run(keypair, node_socket_addr, peers, local, root_dir)?;
let running_node = node_builder.build_and_run()?;

println!(
"
Expand Down
81 changes: 65 additions & 16 deletions sn_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,37 +42,86 @@
#[macro_use]
extern crate tracing;

mod api;
mod error;
mod event;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
mod node;
mod put_validation;
mod replication;
mod spends;
mod storecost;

pub use self::{
api::RunningNode,
event::{NodeEvent, NodeEventsChannel, NodeEventsReceiver},
log_markers::Marker,
node::NodeBuilder,
};

use libp2p::Multiaddr;
use sn_networking::Network;
use sn_transfers::MainPubkey;
use crate::error::Result;
use libp2p::PeerId;
use sn_networking::{Network, SwarmLocalState};
use sn_protocol::NetworkAddress;
use std::{collections::HashSet, path::PathBuf};

/// `Node` represents a single node in the distributed network. It handles
/// network events, processes incoming requests, interacts with the data
/// storage, and broadcasts node-related events.
/// Once a node is started and running, the user obtains
/// a `NodeRunning` object which can be used to interact with it.
#[derive(Clone)]
pub struct Node {
pub struct RunningNode {
network: Network,
events_channel: NodeEventsChannel,
/// Peers that are dialed at startup of node.
initial_peers: Vec<Multiaddr>,
reward_address: MainPubkey,
#[cfg(feature = "open-metrics")]
node_metrics: metrics::NodeMetrics,
node_events_channel: NodeEventsChannel,
}

impl RunningNode {
/// Returns this node's `PeerId`
pub fn peer_id(&self) -> PeerId {
self.network.peer_id
}

/// Returns the root directory path for the node.
///
/// This will either be a value defined by the user, or a default location, plus the peer ID
/// appended. The default location is platform specific:
/// - Linux: $HOME/.local/share/safe/node/<peer-id>
/// - macOS: $HOME/Library/Application Support/safe/node/<peer-id>
/// - Windows: C:\Users\<username>\AppData\Roaming\safe\node\<peer-id>
#[allow(rustdoc::invalid_html_tags)]
pub fn root_dir_path(&self) -> PathBuf {
self.network.root_dir_path.clone()
}

/// Returns a `SwarmLocalState` with some information obtained from swarm's local state.
pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
let state = self.network.get_swarm_local_state().await?;
Ok(state)
}

/// Returns the node events channel where to subscribe to receive `NodeEvent`s
pub fn node_events_channel(&self) -> &NodeEventsChannel {
&self.node_events_channel
}

/// Returns the list of all the RecordKeys held by the node
pub async fn get_all_record_addresses(&self) -> Result<HashSet<NetworkAddress>> {
let addresses = self.network.get_all_local_record_addresses().await?;
Ok(addresses)
}

/// Subscribe to given gossipsub topic
pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> {
self.network.subscribe_to_topic(topic_id)?;
Ok(())
}

/// Unsubscribe from given gossipsub topic
pub fn unsubscribe_from_topic(&self, topic_id: String) -> Result<()> {
self.network.unsubscribe_from_topic(topic_id)?;
Ok(())
}

/// Publish a message on a given gossipsub topic
pub fn publish_on_topic(&self, topic_id: String, msg: Vec<u8>) -> Result<()> {
self.network.publish_on_topic(topic_id, msg)?;
Ok(())
}
}

0 comments on commit 421ddb3

Please sign in to comment.