Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace context for the following actors: #7819

Merged
merged 32 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
969d45c
Trace context for the following actors:
nikurt Oct 13, 2022
754c150
Merge branch 'master' into nikurt-viewca-context
nikurt Oct 14, 2022
7b02a7f
fmt
nikurt Oct 14, 2022
b86133e
Define a macro to avoid repeating the debug span construction.
nikurt Oct 17, 2022
52c4f6d
refactor: remove dead code (#7814)
matklad Oct 14, 2022
a551dcf
Fuzz the running releases (#7833)
Ekleog-NEAR Oct 14, 2022
72eb367
[TransactionResult] Avoid panicking if original column had duplicate e…
robin-near Oct 14, 2022
4ed97af
docs: move epoch docs from confluence (#7838)
matklad Oct 17, 2022
be859da
docs: move sync docs from confluence (#7837)
matklad Oct 17, 2022
f5bc84a
nearcore: run node in archival mode if database is an archive (#7752)
mina86 Oct 17, 2022
50e4cea
client: add {node,validator}_public_key field to /status response; de…
mina86 Oct 17, 2022
292ac32
fix(o11y): Write all logs to stderr (#7834)
nikurt Oct 17, 2022
f6f02fa
refactor: move get_validator_info to EpochManagerAdapter (#7727)
matklad Oct 17, 2022
afd600d
chain: don’t clone Store in RuntimeAdapter::get_store (#7841)
mina86 Oct 17, 2022
a6783f7
Trace context for the following actors:
nikurt Oct 13, 2022
e67da3a
fmt
nikurt Oct 14, 2022
d64ffb7
Merge
nikurt Oct 17, 2022
4ddc73a
Merge branch 'master' into nikurt-viewca-context
nikurt Oct 17, 2022
4c8b410
Merge
nikurt Oct 17, 2022
8fdeebe
Merge branch 'master' into nikurt-viewca-context
nikurt Oct 17, 2022
ef5cc1b
Fix
nikurt Oct 17, 2022
3bc88cc
Merge branch 'master' into nikurt-viewca-context
nikurt Oct 18, 2022
fc0f2e8
SandboxMessage
nikurt Oct 18, 2022
8d197bb
Refactor tests to shorten `actix::spawn(client.send().then())` chains…
nikurt Oct 18, 2022
d5c0b12
Use reflection to deduce actor name and handler name
nikurt Oct 18, 2022
3028d6d
Merge branch 'master' into nikurt-viewca-context
nikurt Oct 18, 2022
5e024b8
macro tune
nikurt Oct 18, 2022
928aa36
macro tune
nikurt Oct 18, 2022
10a571c
macro tune
nikurt Oct 18, 2022
d88da99
macro tune
nikurt Oct 18, 2022
7ea4ce8
macro tune
nikurt Oct 18, 2022
c4f17ea
Merge master into nikurt-viewca-context
near-bulldozer[bot] Oct 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions chain/chunks/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use actix::Message;
use near_network::types::MsgRecipient;
use near_o11y::{WithSpanContext, WithSpanContextExt};
use near_pool::{PoolIteratorWrapper, TransactionPool};
use near_primitives::{
epoch_manager::RngSeed,
Expand All @@ -26,16 +27,19 @@ pub enum ShardsManagerResponse {
InvalidChunk(EncodedShardChunk),
}

impl<A: MsgRecipient<ShardsManagerResponse>> ClientAdapterForShardsManager for A {
impl<A: MsgRecipient<WithSpanContext<ShardsManagerResponse>>> ClientAdapterForShardsManager for A {
fn did_complete_chunk(
&self,
partial_chunk: PartialEncodedChunk,
shard_chunk: Option<ShardChunk>,
) {
self.do_send(ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk });
self.do_send(
ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk }
.with_span_context(),
);
}
fn saw_invalid_chunk(&self, chunk: EncodedShardChunk) {
self.do_send(ShardsManagerResponse::InvalidChunk(chunk));
self.do_send(ShardsManagerResponse::InvalidChunk(chunk).with_span_context());
}
}

Expand Down
12 changes: 8 additions & 4 deletions chain/chunks/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use near_chain::types::{EpochManagerAdapter, RuntimeAdapter, Tip};
use near_chain::{Chain, ChainStore};
use near_crypto::KeyType;
use near_network::test_utils::MockPeerManagerAdapter;
use near_o11y::WithSpanContext;
use near_primitives::block::BlockHeader;
use near_primitives::hash::{self, CryptoHash};
use near_primitives::merkle::{self, MerklePath};
Expand Down Expand Up @@ -356,14 +357,17 @@ pub struct MockClientAdapterForShardsManager {
pub requests: Arc<RwLock<VecDeque<ShardsManagerResponse>>>,
}

impl MsgRecipient<ShardsManagerResponse> for MockClientAdapterForShardsManager {
fn send(&self, msg: ShardsManagerResponse) -> BoxFuture<'static, Result<(), MailboxError>> {
impl MsgRecipient<WithSpanContext<ShardsManagerResponse>> for MockClientAdapterForShardsManager {
fn send(
&self,
msg: WithSpanContext<ShardsManagerResponse>,
) -> BoxFuture<'static, Result<(), MailboxError>> {
self.do_send(msg);
futures::future::ok(()).boxed()
}

fn do_send(&self, msg: ShardsManagerResponse) {
self.requests.write().unwrap().push_back(msg);
fn do_send(&self, msg: WithSpanContext<ShardsManagerResponse>) {
self.requests.write().unwrap().push_back(msg.msg);
}
}

Expand Down
167 changes: 96 additions & 71 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use near_network::types::{
NetworkClientMessages, NetworkClientResponses, NetworkInfo, NetworkRequests,
PeerManagerAdapter, PeerManagerMessageRequest,
};
use near_o11y::{OpenTelemetrySpanExt, WithSpanContext};
use near_o11y::{handler_span, OpenTelemetrySpanExt, WithSpanContext, WithSpanContextExt};
use near_performance_metrics;
use near_performance_metrics_macros::perf;
use near_primitives::block_header::ApprovalType;
Expand Down Expand Up @@ -219,10 +219,10 @@ fn create_sync_job_scheduler<M>(address: Addr<SyncJobsActor>) -> Box<dyn Fn(M)>
where
M: Message + Send + 'static,
M::Result: Send,
SyncJobsActor: Handler<M>,
SyncJobsActor: Handler<WithSpanContext<M>>,
{
Box::new(move |msg: M| {
if let Err(err) = address.try_send(msg) {
if let Err(err) = address.try_send(msg.with_span_context()) {
match err {
SendError::Full(request) => {
address.do_send(request);
Expand Down Expand Up @@ -266,16 +266,8 @@ impl Handler<WithSpanContext<NetworkClientMessages>> for ClientActor {
msg: WithSpanContext<NetworkClientMessages>,
ctx: &mut Context<Self>,
) -> Self::Result {
let msg_type: &str = (&msg.msg).into();
let span = tracing::debug_span!(
target: "client",
"handle",
handler = "NetworkClientMessages",
actor = "ClientActor",
msg_type)
.entered();
span.set_parent(msg.context);
let msg = msg.msg;
let msg_type: &'static str = (&msg.msg).into();
let (_span, msg) = handler_span!("client", msg, msg_type);

self.check_triggers(ctx);

Expand Down Expand Up @@ -632,15 +624,17 @@ impl ClientActor {
}
}
}

#[cfg(feature = "sandbox")]
impl Handler<near_client_primitives::types::SandboxMessage> for ClientActor {
impl Handler<WithSpanContext<near_client_primitives::types::SandboxMessage>> for ClientActor {
type Result = near_client_primitives::types::SandboxResponse;

fn handle(
&mut self,
msg: near_client_primitives::types::SandboxMessage,
msg: WithSpanContext<near_client_primitives::types::SandboxMessage>,
_ctx: &mut Context<Self>,
) -> near_client_primitives::types::SandboxResponse {
let (_span, msg) = handler_span!("client", msg);
match msg {
near_client_primitives::types::SandboxMessage::SandboxPatchState(state) => {
self.client.chain.patch_state(
Expand Down Expand Up @@ -671,12 +665,12 @@ impl Handler<near_client_primitives::types::SandboxMessage> for ClientActor {
}
}

impl Handler<Status> for ClientActor {
impl Handler<WithSpanContext<Status>> for ClientActor {
type Result = Result<StatusResponse, StatusError>;

#[perf]
fn handle(&mut self, msg: Status, ctx: &mut Context<Self>) -> Self::Result {
let _span = tracing::debug_span!(target: "client", "handle", handler = "Status").entered();
fn handle(&mut self, msg: WithSpanContext<Status>, ctx: &mut Context<Self>) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
let _d = delay_detector::DelayDetector::new(|| "client status".into());
self.check_triggers(ctx);

Expand Down Expand Up @@ -813,16 +807,16 @@ fn make_known_producer(
}
}

impl Handler<GetNetworkInfo> for ClientActor {
impl Handler<WithSpanContext<GetNetworkInfo>> for ClientActor {
type Result = Result<NetworkInfoResponse, String>;

#[perf]
fn handle(&mut self, _msg: GetNetworkInfo, ctx: &mut Context<Self>) -> Self::Result {
let _span = tracing::debug_span!(
target: "client",
"handle",
handler="GetNetworkInfo")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<GetNetworkInfo>,
ctx: &mut Context<Self>,
) -> Self::Result {
let (_span, _msg) = handler_span!("client", msg);
let _d = delay_detector::DelayDetector::new(|| "client get network info".into());
self.check_triggers(ctx);

Expand Down Expand Up @@ -851,10 +845,15 @@ impl Handler<GetNetworkInfo> for ClientActor {
#[rtype(result = "()")]
pub struct ApplyChunksDoneMessage;

impl Handler<ApplyChunksDoneMessage> for ClientActor {
impl Handler<WithSpanContext<ApplyChunksDoneMessage>> for ClientActor {
type Result = ();

fn handle(&mut self, _msg: ApplyChunksDoneMessage, _ctx: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: WithSpanContext<ApplyChunksDoneMessage>,
_ctx: &mut Self::Context,
) -> Self::Result {
let (_span, _msg) = handler_span!("client", msg);
self.try_process_unfinished_blocks();
}
}
Expand Down Expand Up @@ -1384,7 +1383,7 @@ impl ClientActor {
fn get_apply_chunks_done_callback(&self) -> DoneApplyChunkCallback {
let addr = self.my_address.clone();
Arc::new(move |_| {
addr.do_send(ApplyChunksDoneMessage {});
addr.do_send(ApplyChunksDoneMessage {}.with_span_context());
})
}

Expand Down Expand Up @@ -1908,30 +1907,37 @@ impl Actor for SyncJobsActor {
type Context = Context<Self>;
}

impl Handler<ApplyStatePartsRequest> for SyncJobsActor {
impl Handler<WithSpanContext<ApplyStatePartsRequest>> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: ApplyStatePartsRequest, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "ApplyStatePartsRequest")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<ApplyStatePartsRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
let result = self.apply_parts(&msg);

self.client_addr.do_send(ApplyStatePartsResponse {
apply_result: result,
shard_id: msg.shard_id,
sync_hash: msg.sync_hash,
});
self.client_addr.do_send(
ApplyStatePartsResponse {
apply_result: result,
shard_id: msg.shard_id,
sync_hash: msg.sync_hash,
}
.with_span_context(),
);
}
}

impl Handler<ApplyStatePartsResponse> for ClientActor {
impl Handler<WithSpanContext<ApplyStatePartsResponse>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: ApplyStatePartsResponse, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "ApplyStatePartsResponse")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<ApplyStatePartsResponse>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
if let Some((sync, _, _)) = self.client.catchup_state_syncs.get_mut(&msg.sync_hash) {
// We are doing catchup
sync.set_apply_result(msg.shard_id, msg.apply_result);
Expand All @@ -1941,27 +1947,33 @@ impl Handler<ApplyStatePartsResponse> for ClientActor {
}
}

impl Handler<BlockCatchUpRequest> for SyncJobsActor {
impl Handler<WithSpanContext<BlockCatchUpRequest>> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: BlockCatchUpRequest, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "BlockCatchUpRequest")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<BlockCatchUpRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
let results = do_apply_chunks(msg.block_hash, msg.block_height, msg.work);

self.client_addr.do_send(BlockCatchUpResponse {
sync_hash: msg.sync_hash,
block_hash: msg.block_hash,
results,
});
self.client_addr.do_send(
BlockCatchUpResponse { sync_hash: msg.sync_hash, block_hash: msg.block_hash, results }
.with_span_context(),
);
}
}

impl Handler<BlockCatchUpResponse> for ClientActor {
impl Handler<WithSpanContext<BlockCatchUpResponse>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: BlockCatchUpResponse, _: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: WithSpanContext<BlockCatchUpResponse>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
if let Some((_, _, blocks_catch_up_state)) =
self.client.catchup_state_syncs.get_mut(&msg.sync_hash)
{
Expand All @@ -1973,31 +1985,42 @@ impl Handler<BlockCatchUpResponse> for ClientActor {
}
}

impl Handler<StateSplitRequest> for SyncJobsActor {
impl Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: StateSplitRequest, _: &mut Self::Context) -> Self::Result {
let _span = tracing::debug_span!(target: "client", "handle", handler = "StateSplitRequest")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<StateSplitRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
let results = msg.runtime.build_state_for_split_shards(
msg.shard_uid,
&msg.state_root,
&msg.next_epoch_shard_layout,
msg.state_split_status,
);

self.client_addr.do_send(StateSplitResponse {
sync_hash: msg.sync_hash,
shard_id: msg.shard_id,
new_state_roots: results,
});
self.client_addr.do_send(
StateSplitResponse {
sync_hash: msg.sync_hash,
shard_id: msg.shard_id,
new_state_roots: results,
}
.with_span_context(),
);
}
}

impl Handler<StateSplitResponse> for ClientActor {
impl Handler<WithSpanContext<StateSplitResponse>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: StateSplitResponse, _: &mut Self::Context) -> Self::Result {
fn handle(
&mut self,
msg: WithSpanContext<StateSplitResponse>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
if let Some((sync, _, _)) = self.client.catchup_state_syncs.get_mut(&msg.sync_hash) {
// We are doing catchup
sync.set_split_result(msg.shard_id, msg.new_state_roots);
Expand All @@ -2007,13 +2030,15 @@ impl Handler<StateSplitResponse> for ClientActor {
}
}

impl Handler<ShardsManagerResponse> for ClientActor {
impl Handler<WithSpanContext<ShardsManagerResponse>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: ShardsManagerResponse, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "ShardsManagerResponse")
.entered();
fn handle(
&mut self,
msg: WithSpanContext<ShardsManagerResponse>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
match msg {
ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk } => {
self.client.on_chunk_completed(
Expand Down
11 changes: 8 additions & 3 deletions chain/client/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use near_client_primitives::{
debug::{EpochInfoView, TrackedShardsView},
types::StatusError,
};
use near_o11y::log_assert;
use near_o11y::{handler_span, log_assert, OpenTelemetrySpanExt, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::syncing::get_num_state_parts;
use near_primitives::types::{AccountId, BlockHeight, ShardId, ValidatorInfoIdentifier};
Expand Down Expand Up @@ -143,11 +143,16 @@ impl BlockProductionTracker {
}
}

impl Handler<DebugStatus> for ClientActor {
impl Handler<WithSpanContext<DebugStatus>> for ClientActor {
type Result = Result<DebugStatusResponse, StatusError>;

#[perf]
fn handle(&mut self, msg: DebugStatus, _ctx: &mut Context<Self>) -> Self::Result {
fn handle(
&mut self,
msg: WithSpanContext<DebugStatus>,
_ctx: &mut Context<Self>,
) -> Self::Result {
let (_span, msg) = handler_span!("client", msg);
match msg {
DebugStatus::SyncStatus => {
Ok(DebugStatusResponse::SyncStatus(self.client.sync_status.clone().into()))
Expand Down
Loading