Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt};

use ethlambda_blockchain::BlockChain;
use ethlambda_rpc::RpcConfig;
use ethlambda_storage::{StorageBackend, Store, backend::RocksDBBackend};

const ASCII_ART: &str = r#"
Expand Down Expand Up @@ -108,10 +109,13 @@ async fn main() -> eyre::Result<()> {
ethlambda_blockchain::metrics::set_node_info("ethlambda", version::CLIENT_VERSION);
ethlambda_blockchain::metrics::set_node_start_time();

let api_socket = SocketAddr::new(options.http_address, options.api_port);
let metrics_socket = SocketAddr::new(options.http_address, options.metrics_port);
let node_p2p_key = read_hex_file_bytes(&options.node_key);
let p2p_socket = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), options.gossipsub_port);
let rpc_config = RpcConfig {
http_address: options.http_address,
api_port: options.api_port,
metrics_port: options.metrics_port,
};

println!("{ASCII_ART}");

Expand Down Expand Up @@ -226,18 +230,12 @@ async fn main() -> eyre::Result<()> {
.inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?;

let shutdown_token = CancellationToken::new();
let metrics_shutdown = shutdown_token.clone();
let api_shutdown = shutdown_token.clone();
let rpc_shutdown = shutdown_token.clone();

let metrics_handle = tokio::spawn(async move {
let _ = ethlambda_rpc::start_metrics_server(metrics_socket, metrics_shutdown)
.await
.inspect_err(|err| error!(%err, "Metrics server failed"));
});
let api_handle = tokio::spawn(async move {
let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, api_shutdown)
let rpc_handle = tokio::spawn(async move {
let _ = ethlambda_rpc::start_rpc_server(rpc_config, store, aggregator, rpc_shutdown)
.await
.inspect_err(|err| error!(%err, "API server failed"));
.inspect_err(|err| error!(%err, "RPC server failed"));
});

info!("Node initialized");
Expand Down Expand Up @@ -270,8 +268,7 @@ async fn main() -> eyre::Result<()> {

blockchain_ref.join().await;
p2p_ref.join().await;
let _ = api_handle.await;
let _ = metrics_handle.await;
let _ = rpc_handle.await;

info!("Shutdown complete");

Expand Down
66 changes: 39 additions & 27 deletions crates/net/rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};

use axum::{
Extension, Json, Router, http::HeaderValue, http::header, response::IntoResponse, routing::get,
Expand All @@ -17,47 +17,59 @@ mod fork_choice;
mod heap_profiling;
pub mod metrics;

pub async fn start_api_server(
address: SocketAddr,
#[derive(Debug, Clone)]
pub struct RpcConfig {
pub http_address: IpAddr,
pub api_port: u16,
pub metrics_port: u16,
}

pub async fn start_rpc_server(
config: RpcConfig,
store: Store,
aggregator: AggregatorController,
shutdown: CancellationToken,
) -> Result<(), std::io::Error> {
let api_router = build_api_router(store).layer(Extension(aggregator));

let listener = tokio::net::TcpListener::bind(address).await?;
axum::serve(listener, api_router)
.with_graceful_shutdown(async move {
shutdown.cancelled().await;
})
.await?;

Ok(())
}

pub async fn start_metrics_server(
address: SocketAddr,
shutdown: CancellationToken,
) -> Result<(), std::io::Error> {
let metrics_router = metrics::start_prometheus_metrics_api();
let debug_router = build_debug_router();

let app = Router::new().merge(metrics_router).merge(debug_router);

let listener = tokio::net::TcpListener::bind(address).await?;
axum::serve(listener, app)
.with_graceful_shutdown(async move {
shutdown.cancelled().await;
})
.await?;
if config.api_port == config.metrics_port {
let app = Router::new()
.merge(api_router)
.merge(metrics_router)
.merge(debug_router);
let addr = SocketAddr::new(config.http_address, config.api_port);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app)
.with_graceful_shutdown(async move {
shutdown.cancelled().await;
})
.await?;
} else {
let api_addr = SocketAddr::new(config.http_address, config.api_port);
let metrics_addr = SocketAddr::new(config.http_address, config.metrics_port);
let api_listener = tokio::net::TcpListener::bind(api_addr).await?;
let metrics_listener = tokio::net::TcpListener::bind(metrics_addr).await?;
let metrics_app = Router::new().merge(metrics_router).merge(debug_router);
let metrics_shutdown = shutdown.clone();
tokio::try_join!(
axum::serve(api_listener, api_router).with_graceful_shutdown(async move {
shutdown.cancelled().await;
}),
axum::serve(metrics_listener, metrics_app).with_graceful_shutdown(async move {
metrics_shutdown.cancelled().await;
}),
)?;
}

Ok(())
}

/// Build the API router with the given store.
///
/// The aggregator controller is threaded in via `Extension` by the caller
/// (see `start_api_server`) so existing store-backed handlers don't need to
/// (see `start_rpc_server`) so existing store-backed handlers don't need to
/// know about it and admin handlers extract it independently.
fn build_api_router(store: Store) -> Router {
Router::new()
Expand Down