Skip to content

Commit

Permalink
Getting closer
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Mar 9, 2023
1 parent e9eb455 commit cd6edbc
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 143 deletions.
12 changes: 7 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 14 additions & 55 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3572,74 +3572,33 @@ pub fn serve<T: BeaconChainTypes>(
let get_events = warp::path("lighthouse")
.and(warp::path("logs"))
.and(warp::path::end())
.and(sse_component_filter)
.and_then(
|topics_res: Result<api_types::EventQuery, warp::Rejection>,
chain: Arc<BeaconChain<T>>| {
|sse_component: Option<SSELoggingComponents>| {
blocking_task(move || {
let topics = topics_res?;
// for each topic subscribed spawn a new subscription
let mut receivers = Vec::with_capacity(topics.topics.len());

if let Some(event_handler) = chain.event_handler.as_ref() {
for topic in topics.topics {
let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(),
api_types::EventTopic::Attestation => {
event_handler.subscribe_attestation()
}
api_types::EventTopic::VoluntaryExit => {
event_handler.subscribe_exit()
}
api_types::EventTopic::FinalizedCheckpoint => {
event_handler.subscribe_finalized()
}
api_types::EventTopic::ChainReorg => {
event_handler.subscribe_reorgs()
}
api_types::EventTopic::ContributionAndProof => {
event_handler.subscribe_contributions()
}
api_types::EventTopic::LateHead => {
event_handler.subscribe_late_head()
}
api_types::EventTopic::BlockReward => {
event_handler.subscribe_block_reward()
}
};
if let Some(logging_components) = sse_component {
// Build a JSON stream
let
let s = BroadcastStream::new(sse_component.sender.subscribe()).map(|msg| {
// Serialize to json

receivers.push(BroadcastStream::new(receiver).map(|msg| {
match msg {
Ok(data) => Event::default()
.event(data.topic_name())
.json_data(data)
.map_err(|e| {
warp_utils::reject::server_sent_event_error(format!(
"{:?}",
e
))
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("{:?}", e),
)),
}
}));
}


;

Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
} else {
return Err(warp_utils::reject::custom_server_error(
"event handler was not initialized".to_string(),
"SSE Logging is not enabled".to_string(),
));
}

let s = futures::stream::select_all(receivers);

Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
})

},
);



// Define the ultimate set of routes that will be provided to the server.
let routes = warp::get()
.and(
Expand Down
4 changes: 3 additions & 1 deletion common/logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
[dependencies]
slog = "2.5.2"
slog-term = "2.6.0"
tokio = "1.14.0"
lighthouse_metrics = { path = "../lighthouse_metrics" }
lazy_static = "1.4.0"
sloggers = { version = "2.1.1", features = ["json"] }
slog-async = "2.7.0"
take_mut = "0.2.2"
crossbeam-channel = "0.5.7"
parking_lot = "0.12.1"
serde = "1.0.153"
155 changes: 147 additions & 8 deletions common/logging/src/async_record.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
//! An object that can be used to pass through a channel and be cloned. It can therefore be used
//! via the broadcast channel.

use parking_lot::Mutex;
use serde::ser::SerializeMap;
use serde::serde_if_integer128;
use serde::Serialize;
use slog::{
BorrowedKV, Drain, Key, Level, OwnedKVList, Record, RecordStatic, Serializer, SingleKV, KV,
};
use std::cell::RefCell;
use std::fmt;
use std::fmt::Write;
use std::io;
use std::sync::Arc;
use take_mut::take;

thread_local! {
static TL_BUF: RefCell<String> = RefCell::new(String::with_capacity(128))
}

/// Serialized record.
#[derive(Clone)]
pub struct AsyncRecord {
Expand All @@ -16,7 +27,7 @@ pub struct AsyncRecord {
location: Box<slog::RecordLocation>,
tag: String,
logger_values: OwnedKVList,
kv: Arc<dyn KV + Send>,
kv: Arc<Mutex<dyn KV + Send>>,
}

impl AsyncRecord {
Expand All @@ -34,7 +45,7 @@ impl AsyncRecord {
location: Box::new(*record.location()),
tag: String::from(record.tag()),
logger_values: logger_values.clone(),
kv: Arc::new(ser.finish()),
kv: Arc::new(Mutex::new(ser.finish())),
}
}

Expand All @@ -46,8 +57,9 @@ impl AsyncRecord {
tag: &self.tag,
};

let kv = self.kv.lock();
drain.log(
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&self.kv)),
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
&self.logger_values,
)
}
Expand All @@ -60,8 +72,9 @@ impl AsyncRecord {
tag: &self.tag,
};

let kv = self.kv.lock();
f(
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&self.kv)),
&Record::new(&rs, &format_args!("{}", self.msg), BorrowedKV(&(*kv))),
&self.logger_values,
)
}
Expand Down Expand Up @@ -167,11 +180,137 @@ impl Serializer for ToSendSerializer {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
}

#[cfg(feature = "nested-values")]
fn emit_serde(&mut self, key: Key, value: &slog::SerdeValue) -> slog::Result {
let val = value.to_sendable();
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
impl Serialize for AsyncRecord {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let rs = RecordStatic {
location: &*self.location,
level: self.level,
tag: &self.tag,
};
let mut map_serializer = SerdeSerializer::new(serializer)?;
let kv = self.kv.lock();
let message = format_args!("{}", self.msg);
let record = Record::new(&rs, &message, BorrowedKV(&(*kv)));

self.logger_values
.serialize(&record, &mut map_serializer)
.map_err(|e| serde::ser::Error::custom(e))?;
record
.kv()
.serialize(&record, &mut map_serializer)
.map_err(serde::ser::Error::custom)?;
map_serializer.end()
}
}

struct SerdeSerializer<S: serde::Serializer> {
/// Current state of map serializing: `serde::Serializer::MapState`
ser_map: S::SerializeMap,
}

impl<S: serde::Serializer> SerdeSerializer<S> {
fn new(ser: S) -> Result<Self, S::Error> {
let ser_map = ser.serialize_map(None)?;
Ok(SerdeSerializer { ser_map })
}

/// Finish serialization, and return the serializer
fn end(self) -> Result<S::Ok, S::Error> {
self.ser_map.end()
}
}

// NOTE: This is borrowed from slog_json
macro_rules! impl_m(
($s:expr, $key:expr, $val:expr) => ({
let k_s: &str = $key.as_ref();
$s.ser_map.serialize_entry(k_s, $val)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("serde serialization error: {}", e)))?;
Ok(())
});
);

impl<S> slog::Serializer for SerdeSerializer<S>
where
S: serde::Serializer,
{
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
impl_m!(self, key, &val)
}

fn emit_unit(&mut self, key: Key) -> slog::Result {
impl_m!(self, key, &())
}

fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
impl_m!(self, key, &val)
}

fn emit_none(&mut self, key: Key) -> slog::Result {
let val: Option<()> = None;
impl_m!(self, key, &val)
}
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
impl_m!(self, key, &val)
}
serde_if_integer128! {
fn emit_u128(&mut self, key: Key, val: u128) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_i128(&mut self, key: Key, val: i128) -> slog::Result {
impl_m!(self, key, &val)
}
}
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
impl_m!(self, key, &val)
}
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
TL_BUF.with(|buf| {
let mut buf = buf.borrow_mut();

buf.write_fmt(*val).unwrap();

let res = { || impl_m!(self, key, &*buf) }();
buf.clear();
res
})
}
}
4 changes: 2 additions & 2 deletions common/logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::time::{Duration, Instant};
pub const MAX_MESSAGE_WIDTH: usize = 40;

pub mod async_record;
mod sse_drain;
pub use sse_drain::{SSEDrain, SSELoggingComponents};
mod sse_logging_components;
pub use sse_logging_components::SSELoggingComponents;

/// The minimum interval between log messages indicating that a queue is full.
const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
Expand Down
Loading

0 comments on commit cd6edbc

Please sign in to comment.