Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kademlia buckets to comms layer #2817

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ where B: BlockchainBackend + 'static

// Add your RPC services here ‍🏴‍☠️️☮️🌊
let rpc_server = rpc_server
.add_service(dht.rpc_service())
.add_service(dht.create_rpc_service())
.add_service(base_node::create_base_node_sync_rpc_service(db.clone()))
.add_service(mempool::create_mempool_rpc_service(
handles.expect_handle::<MempoolHandle>(),
Expand Down
7 changes: 5 additions & 2 deletions applications/tari_base_node/src/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,15 +363,15 @@ impl CommandHandler {
});
}

pub fn discover_peer(&self, dest_pubkey: Box<RistrettoPublicKey>) {
pub fn discover_peer(&self, dest_pubkey: RistrettoPublicKey) {
let mut dht = self.discovery_service.clone();

self.executor.spawn(async move {
let start = Instant::now();
println!("🌎 Peer discovery started.");

match dht
.discover_peer(dest_pubkey.clone(), NodeDestination::PublicKey(dest_pubkey))
.discover_peer(dest_pubkey.clone(), NodeDestination::PublicKey(Box::new(dest_pubkey)))
.await
{
Ok(p) => {
Expand Down Expand Up @@ -623,6 +623,7 @@ impl CommandHandler {
pub fn list_connections(&self) {
let mut connectivity = self.connectivity.clone();
let peer_manager = self.peer_manager.clone();
let node_id = self.base_node_identity.node_id().clone();

self.executor.spawn(async move {
match connectivity.get_active_connections().await {
Expand All @@ -638,6 +639,7 @@ impl CommandHandler {
"Public Key",
"Address",
"Direction",
"Dist Bucket",
"Age",
"Role",
"User Agent",
Expand All @@ -663,6 +665,7 @@ impl CommandHandler {
peer.public_key,
conn.address(),
conn.direction(),
peer.node_id.distance(&node_id).get_bucket(4).2,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the config, I'm not sure if it is exposed all the way

format_duration_basic(conn.age()),
{
if peer.features == PeerFeatures::COMMUNICATION_CLIENT {
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ impl Parser {
/// Function to process the discover-peer command
fn process_discover_peer<'a, I: Iterator<Item = &'a str>>(&mut self, mut args: I) {
let dest_pubkey = match args.next().and_then(parse_emoji_id_or_public_key) {
Some(v) => Box::new(v),
Some(v) => v,
None => {
println!("Please enter a valid destination public key or emoji id");
println!("discover-peer [hex public key or emoji id]");
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_console_wallet/src/automation/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,14 @@ pub async fn discover_peer(
) -> Result<(), CommandError> {
use ParsedArgument::*;
let dest_public_key = match args[0].clone() {
PublicKey(key) => Ok(Box::new(key)),
PublicKey(key) => Ok(key),
_ => Err(CommandError::Argument),
}?;

let start = Instant::now();
println!("🌎 Peer discovery started.");
match dht_service
.discover_peer(dest_public_key.clone(), NodeDestination::PublicKey(dest_public_key))
.discover_peer(dest_public_key.clone(), NodeDestination::PublicKey(Box::new(dest_public_key)))
.await
{
Ok(peer) => {
Expand Down
13 changes: 13 additions & 0 deletions applications/tari_console_wallet/src/ui/components/receive_tab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl ReceiveTab {
Constraint::Length(3),
Constraint::Length(3),
Constraint::Length(3),
Constraint::Length(3),
Constraint::Min(1),
]
.as_ref(),
Expand Down Expand Up @@ -87,6 +88,18 @@ impl ReceiveTab {
.split(info_chunks[3]);
let emoji_id = Paragraph::new(app_state.get_identity().emoji_id.as_str());
f.render_widget(emoji_id, label_layout[0]);

// Node id
let block = Block::default()
.borders(Borders::ALL)
.title(Span::styled("Node ID", Style::default().fg(Color::White)));
f.render_widget(block, info_chunks[4]);
let label_layout = Layout::default()
.constraints([Constraint::Length(1)].as_ref())
.margin(1)
.split(info_chunks[4]);
let node_id = Paragraph::new(app_state.get_identity().node_id.as_str());
f.render_widget(node_id, label_layout[0]);
}
}

Expand Down
2 changes: 2 additions & 0 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ impl AppStateData {
public_address: node_identity.public_address().to_string(),
emoji_id: eid,
qr_code: image,
node_id: node_identity.node_id().to_string(),
};
let base_node_previous = base_node_selected.clone();

Expand Down Expand Up @@ -879,6 +880,7 @@ pub struct MyIdentity {
pub public_address: String,
pub emoji_id: String,
pub qr_code: String,
pub node_id: String,
}

#[derive(Clone)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl BlockSync {
StateEvent::BlocksSynchronized
},
Err(err) => {
debug!(target: LOG_TARGET, "Block sync failed: {}", err);
warn!(target: LOG_TARGET, "Block sync failed: {}", err);
StateEvent::BlockSyncFailed
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
let tip_header = self.db.fetch_last_header().await?;
let local_metadata = self.db.get_chain_metadata().await?;
if tip_header.height <= local_metadata.height_of_longest_chain() {
debug!(
info!(
target: LOG_TARGET,
"Blocks already synchronized to height {}.", tip_header.height
);
Expand All @@ -138,7 +138,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
let chain_header = self.db.fetch_chain_header(best_height).await?;

let best_full_block_hash = chain_header.accumulated_data().hash.clone();
debug!(
info!(
target: LOG_TARGET,
"Starting block sync from peer `{}`. Current best block is #{} `{}`. Syncing to #{} ({}).",
peer,
Expand Down
47 changes: 38 additions & 9 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,20 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

info!(
target: LOG_TARGET,
"Request for blocks for sync from {} to hash:{}",
start_header.height,
message.end_hash.to_hex()
);
let start = start_header.height + 1;
if start < metadata.pruned_height() {
warn!(
target: LOG_TARGET,
"Requested full block body at height {}, however this node has an effective pruned height of {}",
start,
metadata.pruned_height()
);
return Err(RpcStatus::bad_request(format!(
"Requested full block body at height {}, however this node has an effective pruned height of {}",
start,
Expand All @@ -103,13 +115,17 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ

let end = end_header.height;
if start > end {
warn!(
target: LOG_TARGET,
"Start block #{} is higher than end block #{}", start, end
);
return Err(RpcStatus::bad_request(format!(
"Start block #{} is higher than end block #{}",
start, end
)));
}

debug!(
info!(
target: LOG_TARGET,
"Initiating block sync with peer `{}` from height {} to {}", peer_node_id, start, end,
);
Expand All @@ -125,14 +141,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
break;
}

debug!(target: LOG_TARGET, "Sending blocks #{} - #{}", start, end);
info!(target: LOG_TARGET, "Sending blocks #{} - #{}", start, end);
let blocks = db
.fetch_blocks(start..=end)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET));

match blocks {
Ok(blocks) if blocks.is_empty() => {
info!(target: LOG_TARGET, "No blocks to send");
break;
},
Ok(blocks) => {
Expand All @@ -159,7 +176,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

debug!(
info!(
target: LOG_TARGET,
"Block sync round complete for peer `{}`.", peer_node_id,
);
Expand All @@ -182,20 +199,25 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("Header not found with given hash"))?;

let tip_header = db
.fetch_tip_header()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

let mut count = message.count;
if count == 0 {
let tip_header = db
.fetch_tip_header()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
count = tip_header.height().saturating_sub(start_header.height);
}
if count == 0 {
return Ok(Streaming::empty());
}

// There may be more headers in our DB than blocks. Don't send these because we cannot
// the body
count = cmp::min(count, tip_header.height().saturating_sub(start_header.height));

let chunk_size = cmp::min(100, count) as usize;
debug!(
info!(
target: LOG_TARGET,
"Initiating header sync with peer `{}` from height {} to {} (chunk_size={})",
peer_node_id,
Expand Down Expand Up @@ -310,9 +332,16 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

// Only send up to the tip, otherwise we will not be able to provide the blocks
let headers = headers
.into_iter()
.filter(|h| h.height <= metadata.height_of_longest_chain())
.map(Into::into)
.collect();

Ok(Response::new(FindChainSplitResponse {
fork_hash_index: idx as u32,
headers: headers.into_iter().map(Into::into).collect(),
headers,
tip_height: metadata.height_of_longest_chain(),
}))
},
Expand Down
24 changes: 18 additions & 6 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,10 @@ impl LMDBDatabase {
})?
};

let mut total_kernel_sum = Commitment::from_bytes(&[0u8; 32]).expect("Could not create commitment");
let mut total_utxo_sum = Commitment::from_bytes(&[0u8; 32]).expect("Could not create commitment");
let mut total_kernel_sum =
Commitment::from_bytes(&[0u8; 32]).expect("We are using a static input, so this can never fail");
let mut total_utxo_sum =
Commitment::from_bytes(&[0u8; 32]).expect("We are using a static input, so this can never fail");
let BlockAccumulatedData {
kernels: pruned_kernel_set,
outputs: pruned_output_set,
Expand Down Expand Up @@ -1466,8 +1468,13 @@ impl BlockchainBackend for LMDBDatabase {
let previous_mmr_count = if start_height == 0 {
0
} else {
let header: BlockHeader =
lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.expect("Header should exist");
let header: BlockHeader = lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.ok_or_else(|| {
ChainStorageError::ValueNotFound {
entity: "BlockHeader".to_string(),
field: "height".to_string(),
value: (start_height - 1).to_string(),
}
})?;
debug!(target: LOG_TARGET, "Previous header:{}", header);
header.kernel_mmr_size
};
Expand Down Expand Up @@ -1537,8 +1544,13 @@ impl BlockchainBackend for LMDBDatabase {
let previous_mmr_count = if start_height == 0 {
0
} else {
let header: BlockHeader =
lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.expect("Header should exist");
let header: BlockHeader = lmdb_get(&txn, &self.headers_db, &(start_height - 1))?.ok_or_else(|| {
ChainStorageError::ValueNotFound {
entity: "BlockHeader".to_string(),
field: "height".to_string(),
value: (start_height - 1).to_string(),
}
})?;
debug!(target: LOG_TARGET, "Previous header:{}", header);
header.output_mmr_size
};
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ async fn handle_outbound_tx(
exclude_peers: Vec<NodeId>,
) -> Result<(), MempoolServiceError> {
let result = outbound_message_service
.propagate(
.broadcast(
NodeDestination::Unknown,
OutboundEncryption::ClearText,
exclude_peers,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ where

// DHT RPC service is only available for communication nodes
if node_identity.has_peer_features(PeerFeatures::COMMUNICATION_NODE) {
comms = comms.add_rpc_server(RpcServer::new().add_service(dht.rpc_service()));
comms = comms.add_rpc_server(RpcServer::new().add_service(dht.create_rpc_service()));
}

// Hook up DHT messaging middlewares
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
let ping_stream = self.ping_stream.take().expect("ping_stream cannot be None").fuse();
pin_mut!(ping_stream);

let request_stream = self.request_rx.take().expect("ping_stream cannot be None").fuse();
let request_stream = self.request_rx.take().expect("request_stream cannot be None").fuse();
pin_mut!(request_stream);

let mut ping_tick = match self.config.auto_ping_interval {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,9 @@ where TBackend: TransactionBackend + 'static
match self
.resources
.outbound_message_service
.closest_broadcast(
.closer_only(
NodeId::from_public_key(&self.dest_pubkey),
OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::SenderPartialTransaction, proto_message),
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,9 @@ async fn send_transaction_finalized_message_store_and_forward(
outbound_message_service: &mut OutboundMessageRequester,
) -> Result<bool, TransactionServiceError> {
match outbound_message_service
.closest_broadcast(
.closer_only(
NodeId::from_public_key(&destination_pubkey),
OutboundEncryption::EncryptFor(Box::new(destination_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::TransactionFinalized, msg.clone()),
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ pub async fn send_transaction_cancelled_message(
.await?;

let _ = outbound_message_service
.closest_broadcast(
.closer_only(
NodeId::from_public_key(&destination_public_key),
OutboundEncryption::EncryptFor(Box::new(destination_public_key)),
vec![],
OutboundDomainMessage::new(TariMessageType::SenderPartialTransaction, proto_message),
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@ async fn send_transaction_reply_store_and_forward(
outbound_message_service: &mut OutboundMessageRequester,
) -> Result<bool, TransactionServiceError> {
match outbound_message_service
.closest_broadcast(
.closer_only(
NodeId::from_public_key(&destination_pubkey),
OutboundEncryption::EncryptFor(Box::new(destination_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, msg),
)
.await
Expand Down
4 changes: 2 additions & 2 deletions common/logging/log4rs_sample_base_node.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ loggers:
additive: false
# Route log events sent to the "comms" logger to the "network" appender
comms:
level: info
level: debug
appenders:
- network
additive: false
# Route log events sent to the "p2p" logger to the "network" appender
p2p:
level: info
level: debug
appenders:
- network
additive: false
Expand Down
Loading