diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index fcd55c74..5c4ce16f 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -34,6 +34,7 @@ use tracing::{error, info, warn}; use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; +use ethlambda_rpc::RpcConfig; use ethlambda_storage::{StorageBackend, Store, backend::RocksDBBackend}; const ASCII_ART: &str = r#" @@ -108,10 +109,13 @@ async fn main() -> eyre::Result<()> { ethlambda_blockchain::metrics::set_node_info("ethlambda", version::CLIENT_VERSION); ethlambda_blockchain::metrics::set_node_start_time(); - let api_socket = SocketAddr::new(options.http_address, options.api_port); - let metrics_socket = SocketAddr::new(options.http_address, options.metrics_port); let node_p2p_key = read_hex_file_bytes(&options.node_key); let p2p_socket = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), options.gossipsub_port); + let rpc_config = RpcConfig { + http_address: options.http_address, + api_port: options.api_port, + metrics_port: options.metrics_port, + }; println!("{ASCII_ART}"); @@ -226,18 +230,12 @@ async fn main() -> eyre::Result<()> { .inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?; let shutdown_token = CancellationToken::new(); - let metrics_shutdown = shutdown_token.clone(); - let api_shutdown = shutdown_token.clone(); + let rpc_shutdown = shutdown_token.clone(); - let metrics_handle = tokio::spawn(async move { - let _ = ethlambda_rpc::start_metrics_server(metrics_socket, metrics_shutdown) - .await - .inspect_err(|err| error!(%err, "Metrics server failed")); - }); - let api_handle = tokio::spawn(async move { - let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, api_shutdown) + let rpc_handle = tokio::spawn(async move { + let _ = ethlambda_rpc::start_rpc_server(rpc_config, store, aggregator, rpc_shutdown) .await - .inspect_err(|err| error!(%err, "API server failed")); + .inspect_err(|err| error!(%err, "RPC server failed")); }); info!("Node initialized"); @@ -270,8 +268,7 @@ async fn main() -> eyre::Result<()> { blockchain_ref.join().await; p2p_ref.join().await; - let _ = api_handle.await; - let _ = metrics_handle.await; + let _ = rpc_handle.await; info!("Shutdown complete"); diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index e756e2f2..c0f0104b 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use axum::{ Extension, Json, Router, http::HeaderValue, http::header, response::IntoResponse, routing::get, @@ -17,39 +17,51 @@ mod fork_choice; mod heap_profiling; pub mod metrics; -pub async fn start_api_server( - address: SocketAddr, +#[derive(Debug, Clone)] +pub struct RpcConfig { + pub http_address: IpAddr, + pub api_port: u16, + pub metrics_port: u16, +} + +pub async fn start_rpc_server( + config: RpcConfig, store: Store, aggregator: AggregatorController, shutdown: CancellationToken, ) -> Result<(), std::io::Error> { let api_router = build_api_router(store).layer(Extension(aggregator)); - - let listener = tokio::net::TcpListener::bind(address).await?; - axum::serve(listener, api_router) - .with_graceful_shutdown(async move { - shutdown.cancelled().await; - }) - .await?; - - Ok(()) -} - -pub async fn start_metrics_server( - address: SocketAddr, - shutdown: CancellationToken, -) -> Result<(), std::io::Error> { let metrics_router = metrics::start_prometheus_metrics_api(); let debug_router = build_debug_router(); - let app = Router::new().merge(metrics_router).merge(debug_router); - - let listener = tokio::net::TcpListener::bind(address).await?; - axum::serve(listener, app) - .with_graceful_shutdown(async move { - shutdown.cancelled().await; - }) - .await?; + if config.api_port == config.metrics_port { + let app = Router::new() + .merge(api_router) + .merge(metrics_router) + .merge(debug_router); + let addr = SocketAddr::new(config.http_address, config.api_port); + let listener = tokio::net::TcpListener::bind(addr).await?; + axum::serve(listener, app) + .with_graceful_shutdown(async move { + shutdown.cancelled().await; + }) + .await?; + } else { + let api_addr = SocketAddr::new(config.http_address, config.api_port); + let metrics_addr = SocketAddr::new(config.http_address, config.metrics_port); + let api_listener = tokio::net::TcpListener::bind(api_addr).await?; + let metrics_listener = tokio::net::TcpListener::bind(metrics_addr).await?; + let metrics_app = Router::new().merge(metrics_router).merge(debug_router); + let metrics_shutdown = shutdown.clone(); + tokio::try_join!( + axum::serve(api_listener, api_router).with_graceful_shutdown(async move { + shutdown.cancelled().await; + }), + axum::serve(metrics_listener, metrics_app).with_graceful_shutdown(async move { + metrics_shutdown.cancelled().await; + }), + )?; + } Ok(()) } @@ -57,7 +69,7 @@ pub async fn start_metrics_server( /// Build the API router with the given store. /// /// The aggregator controller is threaded in via `Extension` by the caller -/// (see `start_api_server`) so existing store-backed handlers don't need to +/// (see `start_rpc_server`) so existing store-backed handlers don't need to /// know about it and admin handlers extract it independently. fn build_api_router(store: Store) -> Router { Router::new()