Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Mar 13, 2023
1 parent 5f24f65 commit 5deabe0
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 82 deletions.
1 change: 0 additions & 1 deletion beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ pub struct ClientBuilder<T: BeaconChainTypes> {
http_metrics_config: http_metrics::Config,
slasher: Option<Arc<Slasher<T::EthSpec>>>,
eth_spec_instance: T::EthSpec,

}

impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
Expand Down
69 changes: 32 additions & 37 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use eth2::types::{
};
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
use operation_pool::ReceivedPreCapella;
use parking_lot::RwLock;
Expand Down Expand Up @@ -73,7 +74,6 @@ use warp_utils::{
query::multi_key_query,
task::{blocking_json_task, blocking_task},
};
use logging::SSELoggingComponents;

const API_PREFIX: &str = "eth";

Expand Down Expand Up @@ -450,7 +450,6 @@ pub fn serve<T: BeaconChainTypes>(
let inner_components = ctx.sse_logging_components.clone();
let sse_component_filter = warp::any().map(move || inner_components.clone());


// Create a `warp` filter that provides access to local system information.
let system_info = Arc::new(RwLock::new(sysinfo::System::new()));
{
Expand Down Expand Up @@ -3566,52 +3565,48 @@ pub fn serve<T: BeaconChainTypes>(
},
);


// Subscribe to logs via Server Side Events
// /lighthouse/logs
let lighthouse_log_events = warp::path("lighthouse")
let lighthouse_log_events = warp::path("lighthouse")
.and(warp::path("logs"))
.and(warp::path::end())
.and(sse_component_filter)
.and_then(
|sse_component: Option<SSELoggingComponents>| {
blocking_task(move || {

if let Some(logging_components) = sse_component {
.and_then(|sse_component: Option<SSELoggingComponents>| {
blocking_task(move || {
if let Some(logging_components) = sse_component {
// Build a JSON stream
let s = BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match data.to_json_string() {
// Send the json as a Server Sent Event
Ok(json) => Event::default()
.json_data(json)
.map_err(|e| {
warp_utils::reject::server_sent_event_error(format!("{:?}", e))
let s =
BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match data.to_json_string() {
// Send the json as a Server Sent Event
Ok(json) => Event::default().json_data(json).map_err(|e| {
warp_utils::reject::server_sent_event_error(format!(
"{:?}",
e
))
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}",e),
))
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}",e),
))
}
});
});

Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
} else {
return Err(warp_utils::reject::custom_server_error(
"SSE Logging is not enabled".to_string(),
));
}
})

},
);

} else {
return Err(warp_utils::reject::custom_server_error(
"SSE Logging is not enabled".to_string(),
));
}
})
});

// Define the ultimate set of routes that will be provided to the server.
let routes = warp::get()
Expand Down
6 changes: 2 additions & 4 deletions lighthouse/environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use eth2_network_config::Eth2NetworkConfig;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::{future, StreamExt};

use logging::SSELoggingComponents;
use serde_derive::{Deserialize, Serialize};
use slog::{error, info, o, warn, Drain, Duplicate, Level, Logger};
use sloggers::{file::FileLoggerBuilder, types::Format, types::Severity, Build};
use std::fs::create_dir_all;
use std::io::{Result as IOResult, Write};
use std::path::PathBuf;
use std::sync::Arc;
use logging::SSELoggingComponents;
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
use types::{EthSpec, GnosisEthSpec, MainnetEthSpec, MinimalEthSpec};
Expand Down Expand Up @@ -311,7 +311,6 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
.build()
.map_err(|e| format!("Unable to build file logger: {}", e))?;


let mut log = Logger::root(Duplicate::new(stdout_logger, file_logger).fuse(), o!());

info!(
Expand All @@ -322,7 +321,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {

// If the http API is enabled, we may need to send logs to be consumed by subscribers.
if config.sse_logging {
let sse_logger = SSELoggingComponents::new(SSE_LOG_CHANNEL_SIZE);
let sse_logger = SSELoggingComponents::new(SSE_LOG_CHANNEL_SIZE);
self.sse_logging_components = Some(sse_logger.clone());

log = Logger::root(Duplicate::new(log, sse_logger).fuse(), o!());
Expand Down Expand Up @@ -378,7 +377,6 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
}
}


/// An environment where Lighthouse services can run. Used to start a production beacon node or
/// validator client, or to run tests that involve logging and async task execution.
pub struct Environment<E: EthSpec> {
Expand Down
4 changes: 2 additions & 2 deletions lighthouse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ fn run<E: EthSpec>(
let sse_logging = {
if let Some(bn_matches) = matches.subcommand_matches("beacon_node") {
bn_matches.is_present("http") || bn_matches.is_present("gui")
} else if let Some(vc_matches) = matches.subcommand_matches("validator_client") {
vc_matches.is_present("http")
} else if let Some(vc_matches) = matches.subcommand_matches("validator_client") {
vc_matches.is_present("http")
} else {
false
}
Expand Down
73 changes: 35 additions & 38 deletions validator_client/src/http_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use eth2::lighthouse_vc::{
types::{self as api_types, GenericResponse, Graffiti, PublicKey, PublicKeyBytes},
};
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use slog::{crit, info, warn, Logger};
Expand All @@ -29,18 +30,17 @@ use std::sync::Arc;
use sysinfo::{System, SystemExt};
use system_health::observe_system_health_vc;
use task_executor::TaskExecutor;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use logging::SSELoggingComponents;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use warp::{
http::{
header::{HeaderValue, CONTENT_TYPE},
response::Response,
StatusCode,
},
Filter,
sse::Event,
Filter,
};

#[derive(Debug)]
Expand Down Expand Up @@ -198,7 +198,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(

let api_token_path_inner = api_token_path.clone();
let api_token_path_filter = warp::any().map(move || api_token_path_inner.clone());

// Filter for SEE Logging events
let inner_components = ctx.sse_logging_components.clone();
let sse_component_filter = warp::any().map(move || inner_components.clone());
Expand Down Expand Up @@ -983,51 +983,48 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
})
});


// Subscribe to get VC logs via Server side events
// /lighthouse/logs
let get_log_events = warp::path("lighthouse")
let get_log_events = warp::path("lighthouse")
.and(warp::path("logs"))
.and(warp::path::end())
.and(sse_component_filter)
.and_then(
|sse_component: Option<SSELoggingComponents>| {
warp_utils::task::blocking_task(move || {

if let Some(logging_components) = sse_component {
.and_then(|sse_component: Option<SSELoggingComponents>| {
warp_utils::task::blocking_task(move || {
if let Some(logging_components) = sse_component {
// Build a JSON stream
let s = BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match data.to_json_string() {
// Send the json as a Server Sent Event
Ok(json) => Event::default()
.json_data(json)
.map_err(|e| {
warp_utils::reject::server_sent_event_error(format!("{:?}", e))
let s =
BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match data.to_json_string() {
// Send the json as a Server Sent Event
Ok(json) => Event::default().json_data(json).map_err(|e| {
warp_utils::reject::server_sent_event_error(format!(
"{:?}",
e
))
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}",e),
))
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}",e),
))
}
});
});

Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
} else {
return Err(warp_utils::reject::custom_server_error(
"SSE Logging is not enabled".to_string(),
));
}
})

},
);
} else {
return Err(warp_utils::reject::custom_server_error(
"SSE Logging is not enabled".to_string(),
));
}
})
});

let routes = warp::any()
.and(authorization_header_filter)
Expand Down

0 comments on commit 5deabe0

Please sign in to comment.