Skip to content
This repository has been archived by the owner on Jan 19, 2023. It is now read-only.

Commit

Permalink
undo json logging changes
Browse files Browse the repository at this point in the history
  • Loading branch information
roshaans committed Nov 29, 2022
1 parent 676d348 commit c05f926
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 120 deletions.
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@ syn = "1.0.90"
tempfile = "3.3.0"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1" }
tracing = { version = "0.1.36", features = ["std"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.11", features = ["fmt", "std", "env-filter"] }
tracing-appender = "0.2.2"
prometheus = "0.13.1"
atty = "0.2"
quote = "1.0.17"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
aws-types= "0.13.0"
aws-sdk-s3="0.13.0"
chrono = "0.4.19"
tracing-bunyan-formatter = "0.3.4"
actix-web = "=4.0.1"

near-jsonrpc-primitives = "0.14.0"
Expand Down
39 changes: 37 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use near_primitives::utils::from_timestamp;
use std::env;
use tokio::sync::Mutex;
use tracing_subscriber::EnvFilter;
use tracing_utils::DefaultSubcriberGuard;
mod configs;
mod db_adapters;
mod metrics_server;
Expand Down Expand Up @@ -54,8 +55,7 @@ async fn main() -> anyhow::Result<()> {

let pool = sqlx::PgPool::connect(&env::var("DATABASE_URL")?).await?;

let env_filter = EnvFilter::new("near_lake_framework=info,indexer_events=info");
let _subscriber = tracing_utils::init_tracing(env_filter).await;
let _writer_guard = init_tracing();

tracing::info!(target: LOGGING_PREFIX, "Chain_id: {}", opts.chain_id);

Expand Down Expand Up @@ -145,3 +145,38 @@ async fn handle_streamer_message(

Ok(streamer_message.block.header.height)
}

fn init_tracing() -> DefaultSubcriberGuard {
let mut env_filter = EnvFilter::new("near_lake_framework=info,indexer_events=info");

if let Ok(rust_log) = env::var("RUST_LOG") {
if !rust_log.is_empty() {
for directive in rust_log.split(',').filter_map(|s| match s.parse() {
Ok(directive) => Some(directive),
Err(err) => {
tracing::warn!(
target: crate::LOGGING_PREFIX,
"Ignoring directive `{}`: {}",
s,
err
);
None
}
}) {
env_filter = env_filter.add_directive(directive);
}
}
}

let (non_blocking_writer, _guard) = tracing_appender::non_blocking(std::io::stderr());

let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(env_filter)
.with_writer(non_blocking_writer)
.finish();

DefaultSubcriberGuard {
subscriber_guard: tracing::subscriber::set_default(subscriber),
writer_guard: _guard,
}
}
125 changes: 11 additions & 114 deletions src/tracing_utils.rs
Original file line number Diff line number Diff line change
@@ -1,119 +1,16 @@
pub use {tracing, tracing_appender, tracing_subscriber};

use std::env;
use tracing_appender::non_blocking::NonBlocking;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_subscriber::filter::Filtered;
use tracing_subscriber::layer::{Layered, SubscriberExt};
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::{fmt, EnvFilter, Layer};

type LogLayer<Inner> = Layered<
Filtered<
fmt::Layer<Inner, fmt::format::DefaultFields, fmt::format::Format, NonBlocking>,
EnvFilter,
Inner,
>,
Inner,
>;

/// The resource representing a registered subscriber for non_blocking tracing logger
///
/// since the non_blocking_writer writes to a new thread "at a later point in time",
/// to ensure that all logs get flushed out in case of panics, we return the writer_guard
/// which when dropped, immediately flushes whatever non_blocking_writer has in its
/// cache to its intended place, that could be a file or a std_out.
///
/// https://docs.rs/tracing-appender/0.1.1/tracing_appender/non_blocking/struct.WorkerGuard.html
///
/// Once dropped, the subscriber is unregistered, and the output is flushed. Any messages output
/// after this value is dropped will be delivered to a previously active subscriber, if any.
pub struct DefaultSubscriberGuard<S> {
// We must first drop the `local_subscriber_guard` so that no new messages are delivered to
// this subscriber while we take care of flushing the messages already in queue. If dropped the
// other way around, the events/spans generated while the subscriber drop guard runs would be
// lost.
subscriber: Option<S>,
#[allow(dead_code)]
writer_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}

impl<S: tracing::Subscriber + Send + Sync> DefaultSubscriberGuard<S> {
/// Register this default subscriber globally , for all threads.
///
/// Must not be called more than once. Mutually exclusive with `Self::local`.
pub fn global(mut self) -> Self {
if let Some(subscriber) = self.subscriber.take() {
tracing::subscriber::set_global_default(subscriber)
.expect("could not set a global subscriber");
} else {
panic!("trying to set a default subscriber that has been already taken")
}
self
}
}

fn is_terminal() -> bool {
// Crate `atty` provides a platform-independent way of checking whether the output is a tty.
atty::is(atty::Stream::Stderr)
}

fn add_non_blocking_log_layer<S>(
filter: EnvFilter,
writer: NonBlocking,
ansi: bool,
subscriber: S,
) -> LogLayer<S>
where
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
let layer = fmt::layer()
.with_ansi(ansi)
.with_writer(writer)
.with_filter(filter);

subscriber.with(layer)
}

pub fn default_subscriber_with_non_blocking_layer(
env_filter: EnvFilter,
) -> DefaultSubscriberGuard<impl tracing::Subscriber + Send + Sync> {
let color_output = std::env::var_os("NO_COLOR").is_none() && is_terminal();

let stderr = std::io::stderr();
let lined_stderr = std::io::LineWriter::new(stderr);
let (writer, writer_guard) = tracing_appender::non_blocking(lined_stderr);

let formatting_layer = BunyanFormattingLayer::new("indexer-events".into(), std::io::stdout);

let subscriber = tracing_subscriber::registry();

let subscriber = add_non_blocking_log_layer(env_filter, writer, color_output, subscriber);

let subscriber = subscriber.with(JsonStorageLayer).with(formatting_layer);

DefaultSubscriberGuard {
subscriber: Some(subscriber),
writer_guard: Some(writer_guard),
}
}

pub async fn init_tracing(
mut env_filter: EnvFilter,
) -> DefaultSubscriberGuard<impl tracing::Subscriber + Send + Sync> {
if let Ok(rust_log) = env::var("RUST_LOG") {
if !rust_log.is_empty() {
for directive in rust_log.split(',').filter_map(|s| match s.parse() {
Ok(directive) => Some(directive),
Err(err) => {
tracing::warn!(
target: crate::LOGGING_PREFIX,
"Ignoring directive `{}`: {}",
s,
err
);
None
}
}) {
env_filter = env_filter.add_directive(directive);
}
}
}

let subscriber = default_subscriber_with_non_blocking_layer(env_filter);

subscriber.global()
#[allow(dead_code)]
pub struct DefaultSubcriberGuard {
pub subscriber_guard: tracing::subscriber::DefaultGuard,
pub writer_guard: tracing_appender::non_blocking::WorkerGuard,
}

0 comments on commit c05f926

Please sign in to comment.