Skip to content

Commit

Permalink
Temp progress
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Feb 8, 2023
1 parent 4d07e40 commit 035ca37
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 1 deletion.
2 changes: 2 additions & 0 deletions common/logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::time::{Duration, Instant};

pub const MAX_MESSAGE_WIDTH: usize = 40;

pub use mod sse_drain;

/// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);

Expand Down
62 changes: 62 additions & 0 deletions common/logging/src/sse_drain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! This module provides an implementation of `slog::Drain` that optionally writes to a channel if
//! there are subscribers to a HTTP SSE stream.

use crossbeam_channel::mpsc::{Sender, Receiver};
use slog_async::AsyncRecord;

/// The components required in the HTTP API task to receive logged events.
pub struct SSELoggingComponents {
/// The channel to receive events from.
pub receiver: Receiver<AsyncRecord>,
/// Indicates if there are currently subscribers to the http API.
pub subscribers: Arc<AtomicBool>,
}


/// An slog drain used to pass logs to the SSE event stream in the HTTP API.
pub struct SSEDrain {
/// The channel to send events to.
sender: mpsc::Sender<AsyncRecord>,
/// Indicates if there are currently subscribers to the http API.
pub subscribers: Arc<AtomicBool>,
}

impl SSEDrain {
/// Create a new SSE drain.
pub fn new(channel_size: usize) -> (Self, SSELoggingComponents)) {

let (sender, receiver) = crossbeam_channel::bounded(channel_size);
let subscribers = Arc::new(AtomicBool::new(false));

let drain = SSEDrain {
sender,
subscribers,
}
(drain, SSELoggingComponents { receiver, subscribers })
}
}

impl Drain for SSEDrain {
type Ok = ();
type Err = &'static str;

fn log(&self, record: &Record, logger_values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
if !subscribers.load(Ordering::Relaxed) {
return Ok(()); // Drop the logs, there are no subscribers
}

// There are subscribers, attempt to send the logs
match self
.sender
.try_send(AsyncRecord::from(record, logger_values))
{
Ok(()) => {} // Everything got sent
Err(TrySendError::Full(_failed_log)) => {} // Ignore dropped logs

Err(TrySendError::Disconnected(_failed_log)) => {
return Err("Channel Disconnected");
}
}
Ok(())
}
}
22 changes: 21 additions & 1 deletion lighthouse/environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::fs::create_dir_all;
use std::io::{Result as IOResult, Write};
use std::path::PathBuf;
use std::sync::Arc;
use logging::SSELoggingComponents;
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
use types::{EthSpec, GnosisEthSpec, MainnetEthSpec, MinimalEthSpec};
Expand All @@ -36,6 +37,7 @@ use {futures::channel::oneshot, std::cell::RefCell};
pub use task_executor::test_utils::null_logger;

const LOG_CHANNEL_SIZE: usize = 2048;
const SSE_LOG_CHANNEL_SIZE: usize = 2048;
/// The maximum time in seconds the client will wait for all internal tasks to shutdown.
const MAXIMUM_SHUTDOWN_TIME: u64 = 15;

Expand All @@ -57,6 +59,7 @@ pub struct LoggerConfig {
pub max_log_number: usize,
pub compression: bool,
pub is_restricted: bool,
pub sse_logging: bool,
}
impl Default for LoggerConfig {
fn default() -> Self {
Expand All @@ -72,6 +75,7 @@ impl Default for LoggerConfig {
max_log_number: 5,
compression: false,
is_restricted: true,
sse_logging: false,
}
}
}
Expand All @@ -80,6 +84,7 @@ impl Default for LoggerConfig {
pub struct EnvironmentBuilder<E: EthSpec> {
runtime: Option<Arc<Runtime>>,
log: Option<Logger>,
sse_logging_components: Option<SSELoggingComponents>,
eth_spec_instance: E,
eth2_config: Eth2Config,
eth2_network_config: Option<Eth2NetworkConfig>,
Expand All @@ -91,6 +96,7 @@ impl EnvironmentBuilder<MinimalEthSpec> {
Self {
runtime: None,
log: None,
sse_logging_components: None,
eth_spec_instance: MinimalEthSpec,
eth2_config: Eth2Config::minimal(),
eth2_network_config: None,
Expand All @@ -104,6 +110,7 @@ impl EnvironmentBuilder<MainnetEthSpec> {
Self {
runtime: None,
log: None,
sse_logging_components: None,
eth_spec_instance: MainnetEthSpec,
eth2_config: Eth2Config::mainnet(),
eth2_network_config: None,
Expand All @@ -117,6 +124,7 @@ impl EnvironmentBuilder<GnosisEthSpec> {
Self {
runtime: None,
log: None,
sse_logging_components: None,
eth_spec_instance: GnosisEthSpec,
eth2_config: Eth2Config::gnosis(),
eth2_network_config: None,
Expand Down Expand Up @@ -265,14 +273,22 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
.build()
.map_err(|e| format!("Unable to build file logger: {}", e))?;

let log = Logger::root(Duplicate::new(stdout_logger, file_logger).fuse(), o!());

let mut log = Logger::root(Duplicate::new(stdout_logger, file_logger).fuse(), o!());

info!(
log,
"Logging to file";
"path" => format!("{:?}", path)
);

// If the http API is enabled, we may need to send logs to be consumed by subscribers.
if config.sse_logging {
let (sse_logger, components) = SSEDrain::new(SSE_LOG_CHANNEL_SIZE);
self.sse_logging_components = Some(components);

log = Logger::root(Duplicate::new(log, sse_logger).fuse(), o!());
}
self.log = Some(log);

Ok(self)
Expand Down Expand Up @@ -315,6 +331,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
signal: Some(signal),
exit,
log: self.log.ok_or("Cannot build environment without log")?,
sse_logging_components: self.sse_logging_components,
eth_spec_instance: self.eth_spec_instance,
eth2_config: self.eth2_config,
eth2_network_config: self.eth2_network_config.map(Arc::new),
Expand All @@ -332,6 +349,7 @@ pub struct RuntimeContext<E: EthSpec> {
pub eth_spec_instance: E,
pub eth2_config: Eth2Config,
pub eth2_network_config: Option<Arc<Eth2NetworkConfig>>,
pub sse_logging_components: Option<SSEComponents>,
}

impl<E: EthSpec> RuntimeContext<E> {
Expand All @@ -344,6 +362,7 @@ impl<E: EthSpec> RuntimeContext<E> {
eth_spec_instance: self.eth_spec_instance.clone(),
eth2_config: self.eth2_config.clone(),
eth2_network_config: self.eth2_network_config.clone(),
sse_logging_components: None, // We don't clone the
}
}

Expand All @@ -369,6 +388,7 @@ pub struct Environment<E: EthSpec> {
signal: Option<exit_future::Signal>,
exit: exit_future::Exit,
log: Logger,
sse_logging_components: Option<SSEComponents>,
eth_spec_instance: E,
pub eth2_config: Eth2Config,
pub eth2_network_config: Option<Arc<Eth2NetworkConfig>>,
Expand Down
1 change: 1 addition & 0 deletions lighthouse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ fn run<E: EthSpec>(
max_log_number: logfile_max_number,
compression: logfile_compress,
is_restricted: logfile_restricted,
sse_logging: matches.is_present("http"),
};

let builder = environment_builder.initialize_logger(logger_config.clone())?;
Expand Down

0 comments on commit 035ca37

Please sign in to comment.