Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to do chunk-level tracing. #10843

Closed
wants to merge 13 commits into from
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ stdx = { package = "near-stdx", path = "utils/stdx" }
[patch.crates-io]
protobuf = { git = "https://github.com/near/rust-protobuf.git", branch = "3.0.2-patch" }
protobuf-support = { git = "https://github.com/near/rust-protobuf.git", branch = "3.0.2-patch" }
tracing-opentelemetry = { path = "./tracing-opentelemetry-0.17.4" }

# Note that "bench" profile inherits from "release" profile and
# "test" profile inherits from "dev" profile.
Expand Down
5 changes: 2 additions & 3 deletions chain/chain/src/update_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::types::{
use near_async::time::Clock;
use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_o11y::opentelemetry::root_span_for_chunk;
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::Receipt;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
Expand Down Expand Up @@ -160,6 +161,7 @@ pub fn apply_new_chunk(
runtime: &dyn RuntimeAdapter,
epoch_manager: &dyn EpochManagerAdapter,
) -> Result<NewChunkResult, Error> {
let _span = root_span_for_chunk(data.chunk_header.chunk_hash().0).entered();
let NewChunkData {
chunk_header,
transactions,
Expand All @@ -172,7 +174,6 @@ pub fn apply_new_chunk(
let shard_id = shard_context.shard_uid.shard_id();
let _span = tracing::debug_span!(
target: "chain",
parent: parent_span,
"new_chunk",
shard_id)
.entered();
Expand Down Expand Up @@ -235,7 +236,6 @@ pub fn apply_old_chunk(
let shard_id = shard_context.shard_uid.shard_id();
let _span = tracing::debug_span!(
target: "chain",
parent: parent_span,
"existing_chunk",
shard_id)
.entered();
Expand Down Expand Up @@ -294,7 +294,6 @@ fn apply_resharding(
let shard_id = shard_uid.shard_id();
let _span = tracing::debug_span!(
target: "chain",
parent: parent_span,
"resharding",
shard_id,
?shard_uid)
Expand Down
25 changes: 20 additions & 5 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::{debug, debug_span, error, warn};
use near_o11y::opentelemetry::root_span_for_chunk;

pub mod adapter;
mod chunk_cache;
Expand Down Expand Up @@ -554,18 +555,27 @@ impl ShardsManager {
None => return Ok(false),
Some(it) => it,
};
let tracks_shard = cares_about_shard_this_or_next_epoch(Some(me), &prev_hash, shard_id, false, &self.shard_tracker);
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_hash)?;
let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;
let single_shard_tracking = checked_feature!("stable", SingleShardTracking, protocol_version);
let block_producers =
self.epoch_manager.get_epoch_block_producers_ordered(&epoch_id, prev_hash)?;
for (bp, _) in block_producers {
if bp.account_id() == me {
return Ok(true);
return if single_shard_tracking {
Ok(tracks_shard)
} else {
Ok(true)
}
}
}
let chunk_producer =
self.epoch_manager.get_chunk_producer(&epoch_id, next_chunk_height, shard_id)?;
if &chunk_producer == me {
return Ok(true);
if self.epoch_manager.is_chunk_producer_for_epoch(&epoch_id, me)? {
return if single_shard_tracking {
Ok(tracks_shard)
} else {
Ok(true)
}
}
Ok(false)
}
Expand Down Expand Up @@ -803,6 +813,7 @@ impl ShardsManager {
request: PartialEncodedChunkRequestMsg,
route_back: CryptoHash,
) {
let _chunk_span = root_span_for_chunk(request.chunk_hash.0).entered();
let _span = tracing::debug_span!(
target: "chunks",
"process_partial_encoded_chunk_request",
Expand Down Expand Up @@ -1225,6 +1236,7 @@ impl ShardsManager {
&mut self,
forward: PartialEncodedChunkForwardMsg,
) -> Result<(), Error> {
let _chunk_span = root_span_for_chunk(forward.chunk_hash.0).entered();
let maybe_header = self
.validate_partial_encoded_chunk_forward(&forward)
.and_then(|_| self.get_partial_encoded_chunk_header(&forward.chunk_hash));
Expand Down Expand Up @@ -1408,6 +1420,7 @@ impl ShardsManager {
&mut self,
partial_encoded_chunk: MaybeValidated<PartialEncodedChunk>,
) -> Result<ProcessPartialEncodedChunkResult, Error> {
let _chunk_span = root_span_for_chunk(partial_encoded_chunk.chunk_hash().0).entered();
let partial_encoded_chunk =
partial_encoded_chunk.map(|chunk| PartialEncodedChunkV2::from(chunk));
let header = &partial_encoded_chunk.header;
Expand Down Expand Up @@ -1547,6 +1560,7 @@ impl ShardsManager {
&mut self,
response: PartialEncodedChunkResponseMsg,
) -> Result<(), Error> {
let _chunk_span = root_span_for_chunk(response.chunk_hash.0).entered();
let header = self.get_partial_encoded_chunk_header(&response.chunk_hash)?;
let partial_chunk = PartialEncodedChunk::new(header, response.parts, response.receipts);
// We already know the header signature is valid because we read it from the
Expand All @@ -1561,6 +1575,7 @@ impl ShardsManager {
&mut self,
header: &ShardChunkHeader,
) -> Result<(), Error> {
let _chunk_span = root_span_for_chunk(header.chunk_hash().0).entered();
if self.insert_header_if_not_exists_and_process_cached_chunk_forwards(header) {
self.try_process_chunk_parts_and_receipts(header)?;
}
Expand Down
12 changes: 12 additions & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use near_network::types::{
HighestHeightPeerInfo, NetworkRequests, PeerManagerAdapter, ReasonForBan,
};
use near_o11y::log_assert;
use near_o11y::opentelemetry::root_span_for_chunk;
use near_o11y::WithSpanContextExt;
use near_pool::InsertTransactionResult;
use near_primitives::block::{Approval, ApprovalInner, ApprovalMessage, Block, BlockHeader, Tip};
Expand Down Expand Up @@ -652,6 +653,12 @@ impl Client {
"Producing block",
);

for chunk in new_chunks.values() {
let _span = root_span_for_chunk(chunk.0).entered();
let _ =
tracing::info_span!("Producing block with chunk", chunk_hash = ?*chunk).entered();
}

// If we are producing empty blocks and there are no transactions.
if !self.config.produce_empty_blocks && new_chunks.is_empty() {
debug!(target: "client", "Empty blocks, skipping block production");
Expand Down Expand Up @@ -916,6 +923,11 @@ impl Client {
num_outgoing_receipts = outgoing_receipts.len(),
"Produced chunk");

{
let _chunk_span = root_span_for_chunk(encoded_chunk.chunk_hash().0).entered();
let _ = tracing::info_span!("Produced chunk").entered();
}

metrics::CHUNK_PRODUCED_TOTAL.inc();
self.chunk_production_info.put(
(next_height, shard_id),
Expand Down
5 changes: 5 additions & 0 deletions chain/client/src/client_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use near_network::types::ReasonForBan;
use near_network::types::{
NetworkInfo, NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest,
};
use near_o11y::opentelemetry::root_span_for_chunk;
use near_o11y::WithSpanContextExt;
use near_performance_metrics;
use near_performance_metrics_macros::perf;
Expand Down Expand Up @@ -1835,6 +1836,8 @@ impl ClientActionHandler<ChunkStateWitnessMessage> for ClientActions {

#[perf]
fn handle(&mut self, msg: ChunkStateWitnessMessage) -> Self::Result {
let _span = root_span_for_chunk(msg.0.chunk_hash.0).entered();
let _span2 = tracing::debug_span!("processing chunk state witness").entered();
if let Err(err) = self.client.process_chunk_state_witness(msg.0, None) {
tracing::error!(target: "client", ?err, "Error processing chunk state witness");
}
Expand All @@ -1854,6 +1857,8 @@ impl ClientActionHandler<ChunkEndorsementMessage> for ClientActions {

#[perf]
fn handle(&mut self, msg: ChunkEndorsementMessage) -> Self::Result {
let _span = root_span_for_chunk(msg.0.chunk_hash().0).entered();
let _span2 = tracing::debug_span!("processing chunk endorsement").entered();
if let Err(err) = self.client.process_chunk_endorsement(msg.0) {
tracing::error!(target: "client", ?err, "Error processing chunk endorsement");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_o11y::opentelemetry::root_span_for_chunk;
use near_primitives::block_body::ChunkEndorsementSignatures;
use near_primitives::checked_feature;
use near_primitives::sharding::{ChunkHash, ShardChunkHeader};
Expand Down Expand Up @@ -89,6 +90,7 @@ impl ChunkEndorsementTracker {
};
tracing::debug!(target: "stateless_validation", ?chunk_hash, "Processing pending chunk endorsements.");
for endorsement in chunk_endorsements.values() {
let _chunk_span = root_span_for_chunk(endorsement.chunk_hash().0).entered();
if let Err(error) = self.process_chunk_endorsement(chunk_header, endorsement.clone()) {
tracing::debug!(target: "stateless_validation", ?endorsement, "Error processing pending chunk endorsement: {:?}", error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use near_chain::{Block, Chain, ChainStoreAccess};
use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{NetworkRequests, PeerManagerMessageRequest};
use near_o11y::opentelemetry::root_span_for_chunk;
use near_pool::TransactionGroupIteratorWrapper;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::merklize;
Expand Down Expand Up @@ -116,6 +117,7 @@ impl ChunkValidator {
let runtime_adapter = self.runtime_adapter.clone();
let chunk_endorsement_tracker = self.chunk_endorsement_tracker.clone();
self.validation_spawner.spawn("stateless_validation", move || {
let _span = root_span_for_chunk(state_witness.chunk_header.chunk_hash().0).entered();
// processing_done_tracker must survive until the processing is finished.
let _processing_done_tracker_capture = processing_done_tracker;

Expand Down Expand Up @@ -684,6 +686,7 @@ impl Client {
) -> Result<ChunkStateWitness, Error> {
let witness = signed_witness.witness_bytes.decode()?;
let chunk_header = &witness.chunk_header;
let _span = root_span_for_chunk(chunk_header.chunk_hash().0).entered();
let witness_height = chunk_header.height_created();
let witness_shard = chunk_header.shard_id();

Expand Down
37 changes: 37 additions & 0 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use lru::LruCache;
use near_async::messaging::SendAsync;
use near_async::time;
use near_crypto::Signature;
use near_o11y::opentelemetry::root_span_for_chunk;
use near_o11y::{handler_debug_span, log_assert, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::hash::CryptoHash;
Expand Down Expand Up @@ -418,6 +419,17 @@ impl PeerActor {
PeerMessage::SyncSnapshotHosts(_) => {
metrics::SYNC_SNAPSHOT_HOSTS.with_label_values(&["sent"]).inc()
}
PeerMessage::Routed(routed) => match &routed.msg.body {
RoutedMessageBody::ChunkStateWitness(msg) => {
let _span = root_span_for_chunk(msg.chunk_hash.0).entered();
let _ = tracing::info_span!("Sending chunk state witness to peer", source=?routed.msg.author, target=?routed.msg.target).entered();
}
RoutedMessageBody::ChunkEndorsement(msg) => {
let _span = root_span_for_chunk(msg.chunk_hash().0).entered();
let _ = tracing::info_span!("Sending chunk endorsement to peer", source=?routed.msg.author, target=?routed.msg.target).entered();
}
_ => (),
},
_ => (),
};

Expand Down Expand Up @@ -983,6 +995,8 @@ impl PeerActor {
None
}
RoutedMessageBody::PartialEncodedChunkRequest(request) => {
let _span = root_span_for_chunk(request.chunk_hash.0).entered();
let _span2 = tracing::info_span!("Received PartialEncodedChunkRequest").entered();
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
Expand All @@ -992,6 +1006,8 @@ impl PeerActor {
None
}
RoutedMessageBody::PartialEncodedChunkResponse(response) => {
let _span = root_span_for_chunk(response.chunk_hash.0).entered();
let _span2 = tracing::info_span!("Received PartialEncodedChunkResponse").entered();
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
Expand All @@ -1001,26 +1017,36 @@ impl PeerActor {
None
}
RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => {
let _span = root_span_for_chunk(chunk.chunk_hash().0).entered();
let _span2 = tracing::info_span!("Received VersionedPartialEncodedChunk").entered();
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk));
None
}
RoutedMessageBody::PartialEncodedChunkForward(msg) => {
let _span = root_span_for_chunk(msg.chunk_hash.0).entered();
let _span2 = tracing::info_span!("Received PartialEncodedChunkForward").entered();
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg));
None
}
RoutedMessageBody::ChunkStateWitness(witness) => {
let _span = root_span_for_chunk(witness.chunk_hash.0).entered();
let _span2 = tracing::info_span!("Received ChunkStateWitness").entered();
network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok();
None
}
RoutedMessageBody::ChunkStateWitnessAck(ack) => {
let _span = root_span_for_chunk(ack.chunk_hash.0).entered();
let _span2 = tracing::info_span!("Received ChunkStateWitnessAck").entered();
network_state.client.send_async(ChunkStateWitnessAckMessage(ack)).await.ok();
None
}
RoutedMessageBody::ChunkEndorsement(endorsement) => {
let _span = root_span_for_chunk(endorsement.chunk_hash().0).entered();
let _span2 = tracing::info_span!("Received ChunkEndorsement").entered();
network_state.client.send_async(ChunkEndorsementMessage(endorsement)).await.ok();
None
}
Expand Down Expand Up @@ -1388,6 +1414,17 @@ impl PeerActor {
}));
}
PeerMessage::Routed(mut msg) => {
match &msg.msg.body {
RoutedMessageBody::ChunkStateWitness(inner) => {
let _span = root_span_for_chunk(inner.chunk_hash.0).entered();
let _ = tracing::info_span!("Received chunk state witness from peer", source=?msg.msg.author, target=?msg.msg.target).entered();
}
RoutedMessageBody::ChunkEndorsement(inner) => {
let _span = root_span_for_chunk(inner.chunk_hash().0).entered();
let _ = tracing::info_span!("Received chunk endorsement from peer", source=?msg.msg.author, target=?msg.msg.target).entered();
}
_ => {}
}
tracing::trace!(
target: "network",
"Received routed message from {} to {:?}.",
Expand Down
2 changes: 1 addition & 1 deletion core/o11y/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod log_config;
mod log_counter;
pub mod macros;
pub mod metrics;
mod opentelemetry;
pub mod opentelemetry;
mod reload;
mod subscriber;
pub mod testonly;
Expand Down
20 changes: 19 additions & 1 deletion core/o11y/src/opentelemetry.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::reload::TracingLayer;
use near_crypto::PublicKey;
use near_primitives_core::hash::CryptoHash;
use near_primitives_core::types::AccountId;
use opentelemetry::sdk::trace::{self, IdGenerator, Sampler};
use opentelemetry::sdk::Resource;
use opentelemetry::trace::TraceId;
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use tracing::level_filters::LevelFilter;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::filter::targets::Targets;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::LookupSpan;
Expand Down Expand Up @@ -54,7 +59,9 @@ where

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint("http://34.32.208.68:5433"),
tayfunelmas marked this conversation as resolved.
Show resolved Hide resolved
)
.with_trace_config(
trace::config()
.with_sampler(Sampler::AlwaysOn)
Expand All @@ -75,3 +82,14 @@ pub(crate) fn get_opentelemetry_filter(opentelemetry_level: OpenTelemetryLevel)
OpenTelemetryLevel::TRACE => LevelFilter::TRACE,
})
}

pub fn root_span_for_chunk(chunk_hash: CryptoHash) -> Span {
let mut chunk_hash_first_16_bytes = [0u8; 16];
chunk_hash_first_16_bytes.copy_from_slice(&chunk_hash.0[..16]);
let trace_id = TraceId::from_bytes(chunk_hash_first_16_bytes);

let span =
tracing::info_span!( target: "chunk_tracing", parent: None, "chunk_tracing", ?chunk_hash);
span.set_trace_id(trace_id);
span
}
Loading