Skip to content

Commit

Permalink
Merge #1916
Browse files Browse the repository at this point in the history
1916: fix(log): build tracing `Subscriber` without custom macros + small fixes r=RolandSherwin a=RolandSherwin

- fix(log): use vector of `Layers` to build the `Subscriber`
  - Replaces the macros with a vector containing boxed Layers
- chore(testnet): enable otlp feature flag
- fix(log): prevent panic when we have multiple tracing subscribers
  - When we have the `otlp` feature enabled, we effectively have
  multiple subscribers consuming the logs.
  - Under rare circumstances, we get a panic with the following msg,
  `Format: was already formatted once`
  - Turns out it's because the `itertools::format()` is being called
  multiple times
  - Was not able to reproduce the issue with a minimal setup, but this
  gets rid of the panic.
- chore(log): remove unused event formatting option
  - the `.event_format()` overrides the `.with_thread_names()` option,
  hence remove it

Co-authored-by: RolandSherwin <RolandSherwin@protonmail.com>
  • Loading branch information
bors[bot] and RolandSherwin committed Dec 22, 2022
2 parents c6ac3e5 + ff4a6ae commit 24e9b56
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 78 deletions.
1 change: 0 additions & 1 deletion sn_interface/src/lib.rs
Expand Up @@ -142,7 +142,6 @@ pub fn init_logger() {
tracing_subscriber::fmt::fmt()
// NOTE: uncomment this line for pretty printed log output.
//.pretty()
.with_thread_names(true)
.with_ansi(false)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_target(false)
Expand Down
137 changes: 68 additions & 69 deletions sn_node/src/bin/sn_node/log/mod.rs
Expand Up @@ -4,116 +4,115 @@ use sn_interface::LogFormatter;
use sn_node::node::{Config, Result};

use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::filter::{EnvFilter, Targets};
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::layer::Filter;
use tracing_subscriber::{prelude::*, Registry};

#[cfg(feature = "otlp")]
macro_rules! otlp_layer {
() => {{
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
// Derive endpoints etc. from environment variables like `OTEL_EXPORTER_OTLP_ENDPOINT`
.with_env(),
)
.with_trace_config(
opentelemetry::sdk::trace::config().with_resource(Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
current_crate_str(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_INSTANCE_ID,
std::process::id().to_string(),
),
])),
)
.install_batch(opentelemetry::runtime::Tokio);

match tracer {
Ok(t) => Ok(tracing_opentelemetry::layer().with_tracer(t).with_filter(EnvFilter::from_env("RUST_LOG_OTLP"))),
Err(e) => Err(e),
}
}};
use tracing_subscriber::{
filter::{EnvFilter, Targets},
fmt as tracing_fmt,
layer::Filter,
prelude::*,
Layer, Registry,
};

#[derive(Default)]
pub struct TracingLayers {
layers: Vec<Box<dyn Layer<Registry> + Send + Sync>>,
guard: Option<WorkerGuard>,
}

macro_rules! fmt_layer {
($config:expr) => {{
impl TracingLayers {
fn fmt_layer(&mut self, config: &Config) {
// Filter by log level either from `RUST_LOG` or default to crate only.
let target_filter: Box<dyn Filter<Registry> + Send + Sync> =
if let Ok(f) = EnvFilter::try_from_default_env() {
Box::new(f)
} else {
Box::new(Targets::new().with_target(current_crate_str(), $config.verbose()))
Box::new(Targets::new().with_target(current_crate_str(), config.verbose()))
};
let mut guard: Option<WorkerGuard> = None;
let fmt_layer: Layer<Registry> = tracing_subscriber::fmt::layer()
.with_thread_names(true)
.with_ansi(false);
let fmt_layer = tracing_fmt::layer().with_ansi(false);

let fmt_layer = if let Some(log_dir) = $config.log_dir() {
if let Some(log_dir) = config.log_dir() {
println!("Starting logging to directory: {:?}", log_dir);

let (non_blocking, worker_guard) = appender::file_rotater(
log_dir,
$config.logs_max_bytes,
$config.logs_max_lines,
$config.logs_retained,
$config.logs_uncompressed,
config.logs_max_bytes,
config.logs_max_lines,
config.logs_retained,
config.logs_uncompressed,
);
guard = Some(worker_guard);
self.guard = Some(worker_guard);

let fmt_layer = fmt_layer.with_writer(non_blocking);

if $config.json_logs {
fmt_layer.json().with_filter(target_filter).boxed()
if config.json_logs {
let layer = fmt_layer.json().with_filter(target_filter).boxed();
self.layers.push(layer);
} else {
fmt_layer
let layer = fmt_layer
.event_format(LogFormatter::default())
.with_filter(target_filter)
.boxed()
.boxed();
self.layers.push(layer);
}
} else {
println!("Starting logging to stdout");

fmt_layer
let layer = fmt_layer
.with_target(false)
.event_format(LogFormatter::default())
.with_filter(target_filter)
.boxed()
.boxed();
self.layers.push(layer);
};
}

(fmt_layer, guard)
}};
#[cfg(feature = "otlp")]
fn otlp_layer(&mut self) -> Result<()> {
use opentelemetry::{
sdk::{trace, Resource},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_semantic_conventions::resource::{SERVICE_INSTANCE_ID, SERVICE_NAME};

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
// Derive endpoints etc. from environment variables like `OTEL_EXPORTER_OTLP_ENDPOINT`
.with_env(),
)
.with_trace_config(trace::config().with_resource(Resource::new(vec![
KeyValue::new(SERVICE_NAME, current_crate_str()),
KeyValue::new(SERVICE_INSTANCE_ID, std::process::id().to_string()),
])))
.install_batch(opentelemetry::runtime::Tokio)?;
let otlp_layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(EnvFilter::from_env("RUST_LOG_OTLP"))
.boxed();
self.layers.push(otlp_layer);
Ok(())
}
}

/// Inits node logging, returning the global node guard if required.
/// This guard should be held for the life of the program.
///
/// Logging should be instantiated only once.
pub fn init_node_logging(config: &Config) -> Result<Option<WorkerGuard>> {
let reg = tracing_subscriber::registry();
let mut layers = TracingLayers::default();
layers.fmt_layer(config);

let (fmt, guard) = fmt_layer!(config);
let reg = reg.with(fmt);
#[cfg(feature = "otlp")]
layers.otlp_layer()?;

#[cfg(feature = "tokio-console")]
let reg = reg.with(console_subscriber::spawn());

#[cfg(feature = "otlp")]
let reg = reg.with(otlp_layer!()?);
layers.layers.push(console_subscriber::spawn().boxed());

reg.init();
tracing_subscriber::registry().with(layers.layers).init();

Ok(guard)
Ok(layers.guard)
}

/// Get current root module name (e.g. "sn_node")
Expand Down
12 changes: 4 additions & 8 deletions sn_node/src/node/mod.rs
Expand Up @@ -54,8 +54,8 @@ pub use qp2p::{Config as NetworkConfig, SendStream};
pub use xor_name::{Prefix, XorName, XOR_NAME_LEN}; // TODO remove pub on API update

mod core {
use crate::comm::Comm;
use crate::{
comm::Comm,
node::{
bootstrap::JoiningAsRelocated,
data::Capacity,
Expand All @@ -68,9 +68,8 @@ mod core {
},
UsedSpace,
};

use sn_consensus::Generation;
use sn_dysfunction::IssueType;

use sn_interface::{
messaging::{
signature_aggregator::SignatureAggregator,
Expand All @@ -85,9 +84,6 @@ mod core {
};

use ed25519_dalek::Keypair;
use itertools::Itertools;

use sn_consensus::Generation;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
net::SocketAddr,
Expand Down Expand Up @@ -539,10 +535,10 @@ mod core {
if new.is_elder {
let sap = self.network_knowledge.section_auth();
info!(
"Section updated: prefix: ({:b}), key: {:?}, elders: {}",
"Section updated: prefix: ({:b}), key: {:?}, elders: {:?}",
new_prefix,
new_section_key,
sap.elders().format(", ")
sap.elders_vec(),
);

// It can happen that we recieve the SAP demonstrating that we've become elders
Expand Down
1 change: 1 addition & 0 deletions testnet/Cargo.toml
Expand Up @@ -15,6 +15,7 @@ version = "0.1.0"
# required to pass on flag to node builds
chaos = []
statemap = []
otlp = []

[[bin]]
path="bin.rs"
Expand Down
4 changes: 4 additions & 0 deletions testnet/bin.rs
Expand Up @@ -118,6 +118,10 @@ async fn main() -> Result<()> {
build_args.extend(["--features", "statemap"]);
}

if cfg!(feature = "otlp") {
build_args.extend(["--features", "otlp"]);
}

if cfg!(feature = "unstable-wiremsg-debuginfo") {
build_args.extend(["--features", "unstable-wiremsg-debuginfo"]);
}
Expand Down

0 comments on commit 24e9b56

Please sign in to comment.