Skip to content

Commit

Permalink
Crossbeam version
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Mar 8, 2023
1 parent b688cfb commit e9eb455
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
[dependencies]
slog = "2.5.2"
slog-term = "2.6.0"
tokio = "1.14.0"
lighthouse_metrics = { path = "../lighthouse_metrics" }
lazy_static = "1.4.0"
sloggers = { version = "2.1.1", features = ["json"] }
slog-async = "2.7.0"
take_mut = "0.2.2"
crossbeam-channel = "0.5.7"
11 changes: 6 additions & 5 deletions common/logging/src/async_record.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! An object that can be used to pass through a channel and be cloned. It can therefore be used
//! via the broadcast channel.

use slog::{BorrowedKV, Drain, Key, Level, OwnedKVList, Record, RecordStatic, Serializer, KV};
use slog::{
BorrowedKV, Drain, Key, Level, OwnedKVList, Record, RecordStatic, Serializer, SingleKV, KV,
};
use std::fmt;
use std::sync::Arc;
use take_mut::take;

trait KVClone: KV + Clone + Sized {}

/// Serialized record.
#[derive(Clone)]
pub struct AsyncRecord {
Expand All @@ -15,7 +16,7 @@ pub struct AsyncRecord {
location: Box<slog::RecordLocation>,
tag: String,
logger_values: OwnedKVList,
kv: Box<dyn KVClone + Send>,
kv: Arc<dyn KV + Send>,
}

impl AsyncRecord {
Expand All @@ -33,7 +34,7 @@ impl AsyncRecord {
location: Box::new(*record.location()),
tag: String::from(record.tag()),
logger_values: logger_values.clone(),
kv: ser.finish(),
kv: Arc::new(ser.finish()),
}
}

Expand Down
4 changes: 2 additions & 2 deletions common/logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::time::{Duration, Instant};
pub const MAX_MESSAGE_WIDTH: usize = 40;

pub mod async_record;
pub mod sse_drain;
pub use sse_drain::SSELoggingComponents;
mod sse_drain;
pub use sse_drain::{SSEDrain, SSELoggingComponents};

/// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
Expand Down
14 changes: 6 additions & 8 deletions common/logging/src/sse_drain.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
//! This module provides an implementation of `slog::Drain` that optionally writes to a channel if
//! there are subscribers to a HTTP SSE stream.

use crate::async_record::AsyncRecord;
use slog::{Drain, Level, OwnedKVList, Record, KV};
use crossbeam_channel::{Receiver, Sender, TrySendError};
use slog::{Drain, OwnedKVList, Record};
use slog_async::AsyncRecord;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};

/// The components required in the HTTP API task to receive logged events.
#[derive(Clone)]
pub struct SSELoggingComponents {
/// The channel to receive events from.
pub sender: Arc<Sender<AsyncRecord>>,
pub receiver: Receiver<AsyncRecord>,
/// Indicates if there are currently subscribers to the http API.
pub subscribers: Arc<AtomicBool>,
}
Expand All @@ -27,18 +26,17 @@ pub struct SSEDrain {
impl SSEDrain {
/// Create a new SSE drain.
pub fn new(channel_size: usize) -> (Self, SSELoggingComponents) {
let (sender, _receiver) = tokio::sync::broadcast::channel(channel_size);
let (sender, receiver) = crossbeam_channel::bounded::<AsyncRecord>(channel_size);
let subscribers = Arc::new(AtomicBool::new(false));

let drain = SSEDrain {
sender,
subscribers: subscribers.clone(),
};
let sender = Arc::new(sender);
(
drain,
SSELoggingComponents {
sender,
receiver,
subscribers,
},
)
Expand Down
2 changes: 1 addition & 1 deletion lighthouse/environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::fs::create_dir_all;
use std::io::{Result as IOResult, Write};
use std::path::PathBuf;
use std::sync::Arc;
use logging::sse_drain::{SSELoggingComponents, SSEDrain};
use logging::{SSELoggingComponents, SSEDrain};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
use types::{EthSpec, GnosisEthSpec, MainnetEthSpec, MinimalEthSpec};
Expand Down

0 comments on commit e9eb455

Please sign in to comment.