Skip to content

Commit

Permalink
Trace context for the following actors: (#7819)
Browse files Browse the repository at this point in the history
* ClientActor
* ViewClientActor
* TelemetryActor

Tested by running a mainnet node
  • Loading branch information
nikurt committed Nov 9, 2022
1 parent 2eb3a51 commit f21e049
Show file tree
Hide file tree
Showing 36 changed files with 1,094 additions and 786 deletions.
10 changes: 7 additions & 3 deletions chain/chunks/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use actix::Message;
use near_network::types::MsgRecipient;
use near_o11y::{WithSpanContext, WithSpanContextExt};
use near_pool::{PoolIteratorWrapper, TransactionPool};
use near_primitives::{
epoch_manager::RngSeed,
Expand All @@ -26,16 +27,19 @@ pub enum ShardsManagerResponse {
InvalidChunk(EncodedShardChunk),
}

impl<A: MsgRecipient<ShardsManagerResponse>> ClientAdapterForShardsManager for A {
impl<A: MsgRecipient<WithSpanContext<ShardsManagerResponse>>> ClientAdapterForShardsManager for A {
fn did_complete_chunk(
&self,
partial_chunk: PartialEncodedChunk,
shard_chunk: Option<ShardChunk>,
) {
self.do_send(ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk });
self.do_send(
ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk }
.with_span_context(),
);
}
fn saw_invalid_chunk(&self, chunk: EncodedShardChunk) {
self.do_send(ShardsManagerResponse::InvalidChunk(chunk));
self.do_send(ShardsManagerResponse::InvalidChunk(chunk).with_span_context());
}
}

Expand Down
12 changes: 8 additions & 4 deletions chain/chunks/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use near_chain::types::{EpochManagerAdapter, RuntimeAdapter, Tip};
use near_chain::{Chain, ChainStore};
use near_crypto::KeyType;
use near_network::test_utils::MockPeerManagerAdapter;
use near_o11y::WithSpanContext;
use near_primitives::block::BlockHeader;
use near_primitives::hash::{self, CryptoHash};
use near_primitives::merkle::{self, MerklePath};
Expand Down Expand Up @@ -356,14 +357,17 @@ pub struct MockClientAdapterForShardsManager {
pub requests: Arc<RwLock<VecDeque<ShardsManagerResponse>>>,
}

impl MsgRecipient<ShardsManagerResponse> for MockClientAdapterForShardsManager {
fn send(&self, msg: ShardsManagerResponse) -> BoxFuture<'static, Result<(), MailboxError>> {
impl MsgRecipient<WithSpanContext<ShardsManagerResponse>> for MockClientAdapterForShardsManager {
fn send(
&self,
msg: WithSpanContext<ShardsManagerResponse>,
) -> BoxFuture<'static, Result<(), MailboxError>> {
self.do_send(msg);
futures::future::ok(()).boxed()
}

fn do_send(&self, msg: ShardsManagerResponse) {
self.requests.write().unwrap().push_back(msg);
fn do_send(&self, msg: WithSpanContext<ShardsManagerResponse>) {
self.requests.write().unwrap().push_back(msg.msg);
}
}

Expand Down
167 changes: 96 additions & 71 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use near_network::types::{
NetworkClientMessages, NetworkClientResponses, NetworkInfo, NetworkRequests,
PeerManagerAdapter, PeerManagerMessageRequest,
};
use near_o11y::{OpenTelemetrySpanExt, WithSpanContext};
use near_o11y::{handler_span, OpenTelemetrySpanExt, WithSpanContext, WithSpanContextExt};
use near_performance_metrics;
use near_performance_metrics_macros::perf;
use near_primitives::block_header::ApprovalType;
Expand Down Expand Up @@ -219,10 +219,10 @@ fn create_sync_job_scheduler<M>(address: Addr<SyncJobsActor>) -> Box<dyn Fn(M)>
where
M: Message + Send + 'static,
M::Result: Send,
SyncJobsActor: Handler<M>,
SyncJobsActor: Handler<WithSpanContext<M>>,
{
Box::new(move |msg: M| {
if let Err(err) = address.try_send(msg) {
if let Err(err) = address.try_send(msg.with_span_context()) {
match err {
SendError::Full(request) => {
address.do_send(request);
Expand Down Expand Up @@ -266,16 +266,8 @@ impl Handler<WithSpanContext<NetworkClientMessages>> for ClientActor {
msg: WithSpanContext<NetworkClientMessages>,
ctx: &mut Context<Self>,
) -> Self::Result {
let msg_type: &str = (&msg.msg).into();
let span = tracing::debug_span!(
target: "client",
"handle",
handler = "NetworkClientMessages",
actor = "ClientActor",
msg_type)
.entered();
span.set_parent(msg.context);
let msg = msg.msg;
let msg_type: &'static str = (&msg.msg).into();
let (_span, msg) = handler_span!("client", msg, msg_type);

self.check_triggers(ctx);

Expand Down Expand Up @@ -632,15 +624,17 @@ impl ClientActor {
}
}
}

#[cfg(feature = "sandbox")]
impl Handler<near_client_primitives::types::SandboxMessage> for ClientActor {
impl Handler<WithSpanContext<near_client_primitives::types::SandboxMessage>> for ClientActor {
type Result = near_client_primitives::types::SandboxResponse;

fn handle(
&mut self,
msg: near_client_primitives::types::SandboxMessage,
msg: WithSpanContext<near_client_primitives::types::SandboxMessage>,
_ctx: &mut Context<Self>,
) -> near_client_primitives::types::SandboxResponse {
let (_span, msg) = handler_span!("client", msg);
match msg {
near_client_primitives::types::SandboxMessage::SandboxPatchState(state) => {
self.client.chain.patch_state(
Expand Down Expand Up @@ -671,12 +665,12 @@ impl Handler<near_client_primitives::types::SandboxMessage> for ClientActor {
}
}

impl Handler<Status> for ClientActor {
impl Handler<WithSpanContext<Status>> for ClientActor {
type Result = Result<StatusResponse, StatusError>;

#[perf]
fn handle(&mut self, msg: Status, ctx: &mut Context<Self>) -> Self::Result {
let _span = tracing::debug_span!(target: "client", "handle", handler = "Status").entered();
fn handle(&mut self, msg: WithSpanContext<Status>, ctx: &mut Context<Self>) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
let _d = delay_detector::DelayDetector::new(|| "client status".into());
self.check_triggers(ctx);

Expand Down Expand Up @@ -813,16 +807,16 @@ fn make_known_producer(
}
}

impl Handler<GetNetworkInfo> for ClientActor {
impl Handler<WithSpanContext<GetNetworkInfo>> for ClientActor {
type Result = Result<NetworkInfoResponse, String>;

#[perf]
fn handle(&mut self, _msg: GetNetworkInfo, ctx: &mut Context<Self>) -> Self::Result {
let _span = tracing::debug_span!(
target: "client",
"handle",
handler="GetNetworkInfo")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<GetNetworkInfo>,
ctx: &mut Context<Self>,
) -> Self::Result {
let (_span, _msg) = handler_span!("client", msg);
let _d = delay_detector::DelayDetector::new(|| "client get network info".into());
self.check_triggers(ctx);

Expand Down Expand Up @@ -851,10 +845,15 @@ impl Handler<GetNetworkInfo> for ClientActor {
#[rtype(result = "()")]
pub struct ApplyChunksDoneMessage;

impl Handler<ApplyChunksDoneMessage> for ClientActor {
impl Handler<WithSpanContext<ApplyChunksDoneMessage>> for ClientActor {
type Result = ();

fn handle(&mut self, _msg: ApplyChunksDoneMessage, _ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: WithSpanContext<ApplyChunksDoneMessage>,
_ctx: &mut Self::Context,
) -> Self::Result {
let (_span, _msg) = handler_span!("client", msg);
self.try_process_unfinished_blocks();
}
}
Expand Down Expand Up @@ -1384,7 +1383,7 @@ impl ClientActor {
fn get_apply_chunks_done_callback(&self) -> DoneApplyChunkCallback {
let addr = self.my_address.clone();
Arc::new(move |_| {
addr.do_send(ApplyChunksDoneMessage {});
addr.do_send(ApplyChunksDoneMessage {}.with_span_context());
})
}

Expand Down Expand Up @@ -1908,30 +1907,37 @@ impl Actor for SyncJobsActor {
type Context = Context<Self>;
}

impl Handler<ApplyStatePartsRequest> for SyncJobsActor {
impl Handler<WithSpanContext<ApplyStatePartsRequest>> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: ApplyStatePartsRequest, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "ApplyStatePartsRequest")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<ApplyStatePartsRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
let result = self.apply_parts(&msg);

self.client_addr.do_send(ApplyStatePartsResponse {
apply_result: result,
shard_id: msg.shard_id,
sync_hash: msg.sync_hash,
});
self.client_addr.do_send(
ApplyStatePartsResponse {
apply_result: result,
shard_id: msg.shard_id,
sync_hash: msg.sync_hash,
}
.with_span_context(),
);
}
}

impl Handler<ApplyStatePartsResponse> for ClientActor {
impl Handler<WithSpanContext<ApplyStatePartsResponse>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: ApplyStatePartsResponse, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "ApplyStatePartsResponse")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<ApplyStatePartsResponse>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
if let Some((sync, _, _)) = self.client.catchup_state_syncs.get_mut(&msg.sync_hash) {
// We are doing catchup
sync.set_apply_result(msg.shard_id, msg.apply_result);
Expand All @@ -1941,27 +1947,33 @@ impl Handler<ApplyStatePartsResponse> for ClientActor {
}
}

impl Handler<BlockCatchUpRequest> for SyncJobsActor {
impl Handler<WithSpanContext<BlockCatchUpRequest>> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: BlockCatchUpRequest, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "BlockCatchUpRequest")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<BlockCatchUpRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
let results = do_apply_chunks(msg.block_hash, msg.block_height, msg.work);

self.client_addr.do_send(BlockCatchUpResponse {
sync_hash: msg.sync_hash,
block_hash: msg.block_hash,
results,
});
self.client_addr.do_send(
BlockCatchUpResponse { sync_hash: msg.sync_hash, block_hash: msg.block_hash, results }
.with_span_context(),
);
}
}

impl Handler<BlockCatchUpResponse> for ClientActor {
impl Handler<WithSpanContext<BlockCatchUpResponse>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: BlockCatchUpResponse, _: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: WithSpanContext<BlockCatchUpResponse>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
if let Some((_, _, blocks_catch_up_state)) =
self.client.catchup_state_syncs.get_mut(&msg.sync_hash)
{
Expand All @@ -1973,31 +1985,42 @@ impl Handler<BlockCatchUpResponse> for ClientActor {
}
}

impl Handler<StateSplitRequest> for SyncJobsActor {
impl Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: StateSplitRequest, _: &mut Self::Context) -> Self::Result {
let _span = tracing::debug_span!(target: "client", "handle", handler = "StateSplitRequest")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<StateSplitRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
let results = msg.runtime.build_state_for_split_shards(
msg.shard_uid,
&msg.state_root,
&msg.next_epoch_shard_layout,
msg.state_split_status,
);

self.client_addr.do_send(StateSplitResponse {
sync_hash: msg.sync_hash,
shard_id: msg.shard_id,
new_state_roots: results,
});
self.client_addr.do_send(
StateSplitResponse {
sync_hash: msg.sync_hash,
shard_id: msg.shard_id,
new_state_roots: results,
}
.with_span_context(),
);
}
}

impl Handler<StateSplitResponse> for ClientActor {
impl Handler<WithSpanContext<StateSplitResponse>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: StateSplitResponse, _: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: WithSpanContext<StateSplitResponse>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
if let Some((sync, _, _)) = self.client.catchup_state_syncs.get_mut(&msg.sync_hash) {
// We are doing catchup
sync.set_split_result(msg.shard_id, msg.new_state_roots);
Expand All @@ -2007,13 +2030,15 @@ impl Handler<StateSplitResponse> for ClientActor {
}
}

impl Handler<ShardsManagerResponse> for ClientActor {
impl Handler<WithSpanContext<ShardsManagerResponse>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: ShardsManagerResponse, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "ShardsManagerResponse")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<ShardsManagerResponse>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
match msg {
ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk } => {
self.client.on_chunk_completed(
Expand Down
11 changes: 8 additions & 3 deletions chain/client/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use near_client_primitives::{
debug::{EpochInfoView, TrackedShardsView},
types::StatusError,
};
use near_o11y::log_assert;
use near_o11y::{handler_span, log_assert, OpenTelemetrySpanExt, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::syncing::get_num_state_parts;
use near_primitives::types::{AccountId, BlockHeight, ShardId, ValidatorInfoIdentifier};
Expand Down Expand Up @@ -143,11 +143,16 @@ impl BlockProductionTracker {
}
}

impl Handler<DebugStatus> for ClientActor {
impl Handler<WithSpanContext<DebugStatus>> for ClientActor {
type Result = Result<DebugStatusResponse, StatusError>;

#[perf]
fn handle(&mut self, msg: DebugStatus, _ctx: &mut Context<Self>) -> Self::Result {
fn handle(
&mut self,
msg: WithSpanContext<DebugStatus>,
_ctx: &mut Context<Self>,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
match msg {
DebugStatus::SyncStatus => {
Ok(DebugStatusResponse::SyncStatus(self.client.sync_status.clone().into()))
Expand Down
Loading

0 comments on commit f21e049

Please sign in to comment.