Skip to content

Commit

Permalink
chore: add node id/public key to log mdc
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Nov 11, 2021
1 parent 9f8e289 commit 9ed5fd6
Show file tree
Hide file tree
Showing 18 changed files with 81 additions and 13 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions applications/tari_base_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ config = { version = "0.9.3" }
either = "1.6.1"
futures = { version = "^0.3.16", default-features = false, features = ["alloc"] }
log = { version = "0.4.8", features = ["std"] }
log-mdc = "0.1.0"
num_cpus = "1"
regex = "1"
rustyline = "9.0"
Expand Down
3 changes: 3 additions & 0 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ async fn run_node(node_config: Arc<GlobalConfig>, bootstrap: ConfigBootstrap) ->
PeerFeatures::COMMUNICATION_NODE,
)?;

log_mdc::insert("node-public-key", node_identity.public_key().to_string());
log_mdc::insert("node-id", node_identity.node_id().to_string());

// Exit if create_id or init arguments were run
if bootstrap.create_id {
info!(
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ integer-encoding = "3.0.2"
lazy_static = "1.4.0"
lmdb-zero = "0.4.4"
log = "0.4"
log-mdc = "0.1.0"
monero = { version = "^0.13.0", features = ["serde_support"], optional = true }
newtype-ops = "0.1.4"
num = "0.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ where B: BlockchainBackend + 'static
let db = self.db.clone();
let config = self.config.clone();

let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
context.spawn_when_ready(move |handles| async move {
log_mdc::extend(mdc);
let chain_metadata_service = handles.expect_handle::<ChainMetadataHandle>();
let node_local_interface = handles.expect_handle::<LocalNodeCommsInterface>();
let connectivity = handles.expect_handle::<ConnectivityRequester>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
let next_state_future = self.next_state_event(&mut state);

// Get the next `StateEvent`, returning a `UserQuit` state event if the interrupt signal is triggered
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
let next_event = select_next_state_event(interrupt_signal, next_state_future).await;
log_mdc::extend(mdc);
// Publish the event on the event bus
let _ = self.event_publisher.send(Arc::new(next_event.clone()));
trace!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,17 @@ impl BlockSync {
});

let timer = Instant::now();
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
match synchronizer.synchronize().await {
Ok(()) => {
log_mdc::extend(mdc);
info!(target: LOG_TARGET, "Blocks synchronized in {:.0?}", timer.elapsed());
self.is_synced = true;
StateEvent::BlocksSynchronized
},
Err(err) => {
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "Block sync failed: {}", err);
StateEvent::BlockSyncFailed
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,28 @@ impl HeaderSync {
});

let timer = Instant::now();
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
match synchronizer.synchronize().await {
Ok(sync_peer) => {
log_mdc::extend(mdc);
info!(target: LOG_TARGET, "Headers synchronized in {:.0?}", timer.elapsed());
self.is_synced = true;
StateEvent::HeadersSynchronized(sync_peer)
},
Err(err @ BlockHeaderSyncError::SyncFailedAllPeers) => {
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "{}. Continuing...", err);
StateEvent::Continue
},
Err(err @ BlockHeaderSyncError::NetworkSilence) => {
log_mdc::extend(mdc);
warn!(target: LOG_TARGET, "{}", err);
self.is_synced = true;
StateEvent::NetworkSilence
},
Err(err) => {
log_mdc::extend(mdc);
debug!(target: LOG_TARGET, "Header sync failed: {}", err);
StateEvent::HeaderSyncFailed
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ impl Listening {
info!(target: LOG_TARGET, "Listening for chain metadata updates");
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced)));
let mut time_since_better_block = None;
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
loop {
let metadata_event = shared.metadata_event_stream.recv().await;
log_mdc::extend(mdc.clone());
match metadata_event.as_ref().map(|v| v.deref()) {
Ok(ChainMetadataEvent::NetworkSilence) => {
debug!("NetworkSilence event received");
Expand All @@ -139,6 +142,7 @@ impl Listening {
.peer_manager
.set_peer_metadata(peer.node_id(), 1, peer_data.to_bytes())
.await;
log_mdc::extend(mdc.clone());
}

let configured_sync_peers = &shared.config.block_sync_config.sync_peers;
Expand Down Expand Up @@ -182,6 +186,7 @@ impl Listening {
return FatalError(format!("Could not get local blockchain metadata. {}", e));
},
};
log_mdc::extend(mdc.clone());

// If this node is just one block behind, wait for block propagation before
// rushing to sync mode
Expand Down Expand Up @@ -216,6 +221,7 @@ impl Listening {
return FatalError(format!("Could not get local blockchain metadata. {}", e));
},
};
log_mdc::extend(mdc.clone());

let sync_mode = determine_sync_mode(
shared.config.blocks_behind_before_considered_lagging,
Expand Down
8 changes: 7 additions & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,11 @@ macro_rules! make_async_fn {
$(#[$outer])*
pub async fn $fn(&self) -> Result<$rtype, ChainStorageError> {
let db = self.db.clone();
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
tokio::task::spawn_blocking(move || {
trace_log($name, move || db.$fn())
log_mdc::extend(mdc.clone());
trace_log($name, move || db.$fn())
})
.await?
}
Expand All @@ -107,7 +110,10 @@ macro_rules! make_async_fn {
$(#[$outer])*
pub async fn $fn$(< $( $lt $( : $clt )? ),+ +Sync+Send + 'static >)?(&self, $($param: $ptype),+) -> Result<$rtype, ChainStorageError> {
let db = self.db.clone();
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
tokio::task::spawn_blocking(move || {
log_mdc::extend(mdc.clone());
trace_log($name, move || db.$fn($($param),+))
})
.await?
Expand Down
29 changes: 21 additions & 8 deletions base_layer/core/src/mempool/async_mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,38 @@ use tari_common_types::types::Signature;
macro_rules! make_async {
($fn:ident($($param1:ident:$ptype1:ty,$param2:ident:$ptype2:ty),+) -> $rtype:ty) => {
pub async fn $fn(mp: Mempool, $($param1: $ptype1, $param2: $ptype2),+) -> Result<$rtype, MempoolError> {
tokio::task::spawn_blocking(move || mp.$fn($($param1,$param2),+))
.await
.or_else(|err| Err(MempoolError::BlockingTaskSpawnError(err.to_string())))
.and_then(|inner_result| inner_result)
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
tokio::task::spawn_blocking(move || {
log_mdc::extend(mdc.clone());
mp.$fn($($param1,$param2),+)
})
.await
.or_else(|err| Err(MempoolError::BlockingTaskSpawnError(err.to_string())))
.and_then(|inner_result| inner_result)
}
};

($fn:ident($($param:ident:$ptype:ty),+) -> $rtype:ty) => {
pub async fn $fn(mp: Mempool, $($param: $ptype),+) -> Result<$rtype, MempoolError> {
tokio::task::spawn_blocking(move || mp.$fn($($param),+))
.await
.or_else(|err| Err(MempoolError::BlockingTaskSpawnError(err.to_string())))
.and_then(|inner_result| inner_result)
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
tokio::task::spawn_blocking(move || {
log_mdc::extend(mdc.clone());
mp.$fn($($param),+)
})
.await
.or_else(|err| Err(MempoolError::BlockingTaskSpawnError(err.to_string())))
.and_then(|inner_result| inner_result)
}
};

($fn:ident() -> $rtype:ty) => {
pub async fn $fn(mp: Mempool) -> Result<$rtype, MempoolError> {
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
tokio::task::spawn_blocking(move || {
log_mdc::extend(mdc.clone());
mp.$fn()
})
.await
Expand Down
5 changes: 5 additions & 0 deletions base_layer/core/src/mempool/sync_protocol/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ impl ServiceInitializer for MempoolSyncInitializer {
let mempool = self.mempool.clone();
let notif_rx = self.notif_rx.take().unwrap();

let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
context.spawn_until_shutdown(move |handles| async move {
log_mdc::extend(mdc.clone());
let state_machine = handles.expect_handle::<StateMachineHandle>();
let connectivity = handles.expect_handle::<ConnectivityRequester>();
// Ensure that we get an subscription ASAP so that we don't miss any connectivity events
Expand All @@ -84,6 +87,7 @@ impl ServiceInitializer for MempoolSyncInitializer {
if !status_watch.borrow().bootstrapped {
debug!(target: LOG_TARGET, "Waiting for node to bootstrap...");
while status_watch.changed().await.is_ok() {
log_mdc::extend(mdc.clone());
if status_watch.borrow().bootstrapped {
debug!(target: LOG_TARGET, "Node bootstrapped. Starting mempool sync protocol");
break;
Expand All @@ -94,6 +98,7 @@ impl ServiceInitializer for MempoolSyncInitializer {
);
sleep(Duration::from_secs(1)).await;
}
log_mdc::extend(mdc.clone());
}

MempoolSyncProtocol::new(config, notif_rx, connectivity_event_subscription, mempool)
Expand Down
5 changes: 2 additions & 3 deletions common/logging/log4rs_sample_base_node.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ appenders:
encoder:
pattern: "{d(%H:%M)} {h({l}):5} {m}{n}"
filters:
-
kind: threshold
- kind: threshold
level: warn

# An appender named "network" that writes to a file with a custom pattern encoder
Expand Down Expand Up @@ -54,7 +53,7 @@ appenders:
count: 5
pattern: "log/base-node/base_layer.{}.log"
encoder:
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}"
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] [{X(node-public-key)},{X(node-id)}] {l:5} {m}{n}"

# An appender named "other" that writes to a file with a custom pattern encoder
other:
Expand Down
1 change: 1 addition & 0 deletions comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ futures = { version = "^0.3", features = ["async-await"] }
lazy_static = "1.3.0"
lmdb-zero = "0.4.4"
log = { version = "0.4.0", features = ["std"] }
log-mdc = "0.1.0"
multiaddr = { version = "0.13.0" }
nom = { version = "5.1.0", features = ["std"], default-features = false }
openssl-sys = { version = "0.9.66", features = ["vendored"], optional = true }
Expand Down
1 change: 1 addition & 0 deletions comms/dht/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ libsqlite3-sys = { version = ">=0.8.0, <0.13.0", features = ["bundled"], optiona
digest = "0.9.0"
futures = { version = "^0.3.1" }
log = "0.4.8"
log-mdc = "0.1.0"
prost = "=0.8.0"
prost-types = "=0.8.0"
rand = "0.8"
Expand Down
4 changes: 4 additions & 0 deletions comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,13 @@ impl DhtConnectivity {
pub fn spawn(mut self) -> JoinHandle<Result<(), DhtConnectivityError>> {
// Listen to events as early as possible
let connectivity_events = self.connectivity.get_event_subscription();
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
task::spawn(async move {
log_mdc::extend(mdc.clone());
debug!(target: LOG_TARGET, "Waiting for connectivity manager to start");
let _ = self.connectivity.wait_started().await;
log_mdc::extend(mdc.clone());
match self.run(connectivity_events).await {
Ok(_) => Ok(()),
Err(err) => {
Expand Down
3 changes: 3 additions & 0 deletions comms/dht/src/discovery/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ impl DhtDiscoveryService {
}

pub fn spawn(self) {
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
task::spawn(async move {
log_mdc::extend(mdc);
info!(target: LOG_TARGET, "Discovery service started");
self.run().await
});
Expand Down
7 changes: 6 additions & 1 deletion comms/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ struct ConnectivityManagerActor {

impl ConnectivityManagerActor {
pub fn spawn(self) -> JoinHandle<()> {
task::spawn(Self::run(self))
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
task::spawn(async {
log_mdc::extend(mdc);
Self::run(self).await
})
}

#[tracing::instrument(name = "connectivity_manager_actor::run", skip(self))]
Expand Down

0 comments on commit 9ed5fd6

Please sign in to comment.