Skip to content

Commit

Permalink
impl Display for NodeID + From<SigningKey> for SecretKey
Browse files Browse the repository at this point in the history
  • Loading branch information
zeegomo committed Nov 29, 2021
1 parent 561a232 commit eb0ff72
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 36 deletions.
6 changes: 6 additions & 0 deletions jormungandr-lib/src/crypto/key.rs
Expand Up @@ -414,6 +414,12 @@ impl<A: AsymmetricKey> From<SecretKey<A>> for SigningKey<A> {
}
}

impl<A: AsymmetricKey> From<SigningKey<A>> for SecretKey<A> {
fn from(key: SigningKey<A>) -> Self {
key.0
}
}

impl<A: AsymmetricPublicKey> From<PublicKey<A>> for Identifier<A> {
fn from(key: PublicKey<A>) -> Self {
Identifier(key)
Expand Down
2 changes: 1 addition & 1 deletion jormungandr/src/blockchain/process.rs
Expand Up @@ -243,7 +243,7 @@ impl Process {
hash = %header.hash(),
parent = %header.parent_id(),
date = %header.block_date(),
peer = ?node_id
%node_id
);
let _enter = span.enter();
tracing::debug!("received block announcement from network");
Expand Down
2 changes: 1 addition & 1 deletion jormungandr/src/network/client/connect.rs
Expand Up @@ -55,7 +55,7 @@ pub fn connect(state: ConnectionState, channels: Channels) -> (ConnectHandle, Co
//TODO: check id is the expected one
let peer_id = validate_peer_auth(hr.auth, &nonce)?;

tracing::debug!(node_id = ?peer_id, "authenticated server peer node");
tracing::debug!(node_id = %peer_id, "authenticated server peer node");

// Send client authentication
let auth = keypair.sign(&hr.nonce);
Expand Down
17 changes: 9 additions & 8 deletions jormungandr/src/network/client/mod.rs
Expand Up @@ -11,7 +11,7 @@ use super::{
comm::{OutboundSubscription, PeerComms},
Address,
},
subscription::{BlockAnnouncementProcessor, FragmentProcessor, GossipProcessor},
subscription::{BlockAnnouncementProcessor, Direction, FragmentProcessor, GossipProcessor},
Channels, GlobalStateR,
};
use crate::{
Expand Down Expand Up @@ -84,6 +84,7 @@ impl Client {
builder.channels.topology_box,
inbound.peer_id,
global_state.clone(),
Direction::Client,
);

Client {
Expand Down Expand Up @@ -153,7 +154,7 @@ impl Progress {
}

impl Client {
#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = ?self.inbound.peer_id))]
#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = %self.inbound.peer_id))]
fn process_block_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<ProcessingOutcome, ()>> {
use self::ProcessingOutcome::*;
// Drive sending of a message to block task to clear the buffered
Expand Down Expand Up @@ -244,7 +245,7 @@ impl Client {
Ok(Continue).into()
}

#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = ?self.inbound.peer_id))]
#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = %self.inbound.peer_id))]
fn upload_blocks(&mut self, block_ids: BlockIds) -> Result<(), ()> {
if block_ids.is_empty() {
tracing::info!("peer has sent an empty block solicitation");
Expand Down Expand Up @@ -294,7 +295,7 @@ impl Client {
Ok(())
}

#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = ?self.inbound.peer_id))]
#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = %self.inbound.peer_id))]
fn push_missing_headers(&mut self, req: ChainPullRequest) -> Result<(), ()> {
let from = req.from.decode().map_err(|e| {
tracing::info!(
Expand Down Expand Up @@ -346,7 +347,7 @@ impl Client {
Ok(())
}

#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = ?self.inbound.peer_id))]
#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = %self.inbound.peer_id))]
fn pull_headers(&mut self, req: ChainPullRequest) {
let mut block_box = self.block_sink.message_box();

Expand Down Expand Up @@ -392,7 +393,7 @@ impl Client {
);
}

#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = ?self.inbound.peer_id))]
#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = %self.inbound.peer_id))]
fn solicit_blocks(&mut self, block_ids: BlockIds) {
let mut block_box = self.block_sink.message_box();
let (handle, sink, _) = intercom::stream_request(buffer_sizes::inbound::BLOCKS);
Expand Down Expand Up @@ -437,7 +438,7 @@ impl Client {
);
}

#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = ?self.inbound.peer_id, direction = "in"))]
#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = %self.inbound.peer_id, direction = "in"))]
fn process_fragments(&mut self, cx: &mut Context<'_>) -> Poll<Result<ProcessingOutcome, ()>> {
use self::ProcessingOutcome::*;
let mut fragment_sink = Pin::new(&mut self.fragment_sink);
Expand Down Expand Up @@ -471,7 +472,7 @@ impl Client {
}
}

#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = ?self.inbound.peer_id, direction = "in"))]
#[instrument(skip_all, level = "debug", fields(peer_addr = %self.inbound.peer_addr, id = %self.inbound.peer_id, direction = "in"))]
fn process_gossip(&mut self, cx: &mut Context<'_>) -> Poll<Result<ProcessingOutcome, ()>> {
use self::ProcessingOutcome::*;
let mut gossip_sink = Pin::new(&mut self.gossip_sink);
Expand Down
27 changes: 11 additions & 16 deletions jormungandr/src/network/mod.rs
Expand Up @@ -16,8 +16,6 @@ mod subscription;
use self::convert::Encode;

use futures::{future, prelude::*};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use thiserror::Error;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -172,12 +170,9 @@ impl GlobalState {
) -> Self {
let peers = Peers::new(config.max_connections);

let mut rng_seed = [0; 32];
rand::thread_rng().fill(&mut rng_seed);
let mut prng = ChaChaRng::from_seed(rng_seed);

//TODO: move this to a secure enclave
let keypair = NodeKeyPair::from(config.node_key);
let keypair =
NodeKeyPair::from(<chain_crypto::SecretKey<_>>::from(config.node_key.clone()));

GlobalState {
block0_hash,
Expand Down Expand Up @@ -368,7 +363,7 @@ where
let mut res = Vec::new();
for peer in peers {
if f(peer.id(), arg.clone())
.instrument(span!(Level::DEBUG, "p2p_comm", peer = %peer.address(), id = ?peer.id()))
.instrument(span!(Level::DEBUG, "p2p_comm", peer = %peer.address(), id = %peer.id()))
.await
.is_err()
{
Expand Down Expand Up @@ -469,14 +464,14 @@ fn connect_and_propagate(
let _enter = state.span.enter();
options.evict_clients = state.num_clients_to_bump();
if let Some(self_addr) = state.node_address() {
if node_addr == self_addr {
tracing::error!(peer = %node_addr, "topology tells the node to connect to itself, ignoring");
if addr == self_addr {
tracing::error!(peer = %addr, "topology tells the node to connect to itself, ignoring");
return;
}
}
drop(_enter);
let peer = Peer::new(node_addr);
let conn_span = span!(parent: &state.span, Level::DEBUG, "client", addr = %node_addr);
let peer = Peer::new(addr);
let conn_span = span!(parent: &state.span, Level::DEBUG, "client", addr = %addr);
let spawn_state = state.clone();
let cf = async move {
let conn_state = ConnectionState::new(state.clone(), &peer, Span::current());
Expand Down Expand Up @@ -506,24 +501,24 @@ fn connect_and_propagate(
if !benign {
channels
.topology_box
.send(TopologyMsg::DemotePeer(node_id))
.send(TopologyMsg::DemotePeer(id))
.await
.unwrap_or_else(|e| {
tracing::error!("Error sending message to topology task: {}", e)
});
state.peers.remove_peer(&node_id).await;
state.peers.remove_peer(&id).await;
}
}
Ok(client) => {
// This enforce processing any pending operation that could
// have been scheduled on this peer
state.peers.update_entry(node_id).await;
state.peers.update_entry(id).await;

state.inc_client_count();

channels
.topology_box
.send(TopologyMsg::PromotePeer(node_id))
.send(TopologyMsg::PromotePeer(id))
.await
.unwrap_or_else(|e| {
tracing::error!("Error sending message to topology task: {}", e)
Expand Down
10 changes: 5 additions & 5 deletions jormungandr/src/network/p2p/comm.rs
Expand Up @@ -524,7 +524,7 @@ impl Peers {
pub async fn server_complete_handshake(&self, peer_addr: Address, id: NodeId) {
tracing::debug!(
peer_addr = %peer_addr,
node_id = ?id,
node_id = %id,
"authenticated client peer node"
);
let mut map = self.inner().await;
Expand All @@ -533,7 +533,7 @@ impl Peers {
} else {
tracing::warn!(
%peer_addr,
?id,
%id,
"peer is not known to the node, was the handshake procedure skipped?",
);
}
Expand Down Expand Up @@ -690,7 +690,7 @@ impl Peers {
pub async fn solicit_blocks_peer(&self, peer: &NodeId, hashes: BlockIds) {
let span = debug_span!(
"block solicitation",
?peer,
%peer,
peer_addr = tracing::field::Empty,
hashes = %format!("[{}]", hashes.iter().map(hex::encode).collect::<Vec<_>>().join(", "))
);
Expand Down Expand Up @@ -724,7 +724,7 @@ impl Peers {
pub async fn pull_headers(&self, peer: &NodeId, from: BlockIds, to: BlockId) {
let span = debug_span!(
"pull_header",
?peer,
%peer,
peer_addr = tracing::field::Empty,
from = %format!("[{}]", from.iter().map(hex::encode).collect::<Vec<_>>().join(", ")),
to = %hex::encode(to)
Expand All @@ -749,7 +749,7 @@ impl Peers {
None => {
// TODO: connect and request on demand, or select another peer?
tracing::info!(
peer = ?peer,
peer = %peer,
"peer not available to pull headers from"
);
}
Expand Down
7 changes: 7 additions & 0 deletions jormungandr/src/topology/mod.rs
Expand Up @@ -7,6 +7,7 @@ use jormungandr_lib::time::SystemTime;
use serde::Serialize;
use serde::Serializer;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::hash::{Hash, Hasher};

mod gossip;
Expand Down Expand Up @@ -75,6 +76,12 @@ impl TryFrom<&[u8]> for NodeId {
}
}

impl fmt::Display for NodeId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

impl AsRef<keynesis::key::ed25519::PublicKey> for NodeId {
fn as_ref(&self) -> &keynesis::key::ed25519::PublicKey {
&self.0
Expand Down
8 changes: 4 additions & 4 deletions jormungandr/src/topology/quarantine.rs
Expand Up @@ -76,16 +76,16 @@ impl ReportRecords {
if self.report_whitelist.contains(&node.address()) {
tracing::debug!(
node = %node.address(),
id=?node.id(),
id=%node.id(),
"quarantine whitelists prevents this node from being reported",
);
ReportNodeStatus::Ok
} else if self.report_grace.contains(&node.id()) {
tracing::trace!(node = %node.address(), id=?node.id(), "not reporting node in grace list");
tracing::trace!(node = %node.address(), id=%node.id(), "not reporting node in grace list");
ReportNodeStatus::Ok
} else {
let mut peer_info = PeerInfo::from(node);
tracing::debug!(node = %peer_info.address, id=?peer_info.id, ?self.report_duration, "reporting node");
tracing::debug!(node = %peer_info.address, id=%peer_info.id, ?self.report_duration, "reporting node");
// If we'll handle report reasons other that a connectivity issue in the future, we may want to
// demote a peer all the way down to dirty in case of a serious violation.
topology.remove_peer(peer_info.id.as_ref());
Expand All @@ -102,7 +102,7 @@ impl ReportRecords {
// nodes again after some time if we haven't heard from them sooner (and avoid network splits).
if topology.peers().dirty().contains(peer_info.id.as_ref()) {
peer_info.quarantined = Some(SystemTime::now().into());
tracing::debug!(node = %peer_info.address, id=?peer_info.id, "node has been quarantined");
tracing::debug!(node = %peer_info.address, id=%peer_info.id, "node has been quarantined");
result = ReportNodeStatus::Quarantine;
}

Expand Down
2 changes: 1 addition & 1 deletion jormungandr/src/topology/topology.rs
Expand Up @@ -209,7 +209,7 @@ impl P2pTopology {
}

/// register a strike against the given peer
#[instrument(skip(self), level = "debug")]
#[instrument(skip_all, level = "debug", fields(%node_id))]
pub fn report_node(&mut self, node_id: &NodeId) {
if let Some(node) = self.topology.get(node_id.as_ref()).cloned() {
let result = self
Expand Down

0 comments on commit eb0ff72

Please sign in to comment.