From ba5cca67760843b7ad4b10c51c1fe5492998aace Mon Sep 17 00:00:00 2001 From: d4m014 <271264252+d4m014@users.noreply.github.com> Date: Tue, 5 May 2026 21:22:10 +0100 Subject: [PATCH 1/5] handle same port values for API and metrics endpoints --- bin/ethlambda/src/main.rs | 28 ++++++++++++++++++++++++++-- crates/net/rpc/src/lib.rs | 32 +++++++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 83c768d2..7628446f 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -35,6 +35,7 @@ use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; use ethlambda_storage::{StorageBackend, Store, backend::RocksDBBackend}; +use ethlambda_rpc::{RpcConfig}; const ASCII_ART: &str = r#" _ _ _ _ _ @@ -107,10 +108,15 @@ async fn main() -> eyre::Result<()> { options.attestation_committee_count, ); - let api_socket = SocketAddr::new(options.http_address, options.api_port); - let metrics_socket = SocketAddr::new(options.http_address, options.metrics_port); + // let api_socket = SocketAddr::new(options.http_address, options.api_port); + // let metrics_socket = SocketAddr::new(options.http_address, options.metrics_port); let node_p2p_key = read_hex_file_bytes(&options.node_key); let p2p_socket = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), options.gossipsub_port); + let rpc_config = RpcConfig { + http_address: options.http_address, + api_port: options.api_port, + metrics_port: options.metrics_port, + }; println!("{ASCII_ART}"); @@ -205,6 +211,7 @@ async fn main() -> eyre::Result<()> { }) .inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?; +<<<<<<< Updated upstream let shutdown_token = CancellationToken::new(); let metrics_shutdown = shutdown_token.clone(); let api_shutdown = shutdown_token.clone(); @@ -218,6 +225,23 @@ async fn main() -> eyre::Result<()> { let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, api_shutdown) .await .inspect_err(|err| error!(%err, "API server failed")); +======= + // tokio::spawn(async move { + // let _ = ethlambda_rpc::start_metrics_server(metrics_socket) + // .await + // .inspect_err(|err| error!(%err, "Metrics server failed")); + // }); + // tokio::spawn(async move { + // let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator) + // .await + // .inspect_err(|err| error!(%err, "API server failed")); + // }); + + tokio::spawn(async move { + let _ = ethlambda_rpc::start_rpc_server(rpc_config, store, aggregator) + .await + .inspect_err(|err| error!(%err, "rpc server failed")); +>>>>>>> Stashed changes }); info!("Node initialized"); diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index e756e2f2..a9da9f30 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use axum::{ Extension, Json, Router, http::HeaderValue, http::header, response::IntoResponse, routing::get, @@ -17,6 +17,7 @@ mod fork_choice; mod heap_profiling; pub mod metrics; +<<<<<<< Updated upstream pub async fn start_api_server( address: SocketAddr, store: Store, @@ -50,6 +51,35 @@ pub async fn start_metrics_server( shutdown.cancelled().await; }) .await?; +======= +pub struct RpcConfig { + pub http_address: IpAddr, + pub api_port: u16, + pub metrics_port: u16, +} + +pub async fn start_rpc_server(config: RpcConfig, store: Store, aggregator: AggregatorController) -> Result<(), std::io::Error> { + let api_router = build_api_router(store).layer(Extension(aggregator)); + let metrics_router = metrics::start_prometheus_metrics_api(); + let debug_router = build_debug_router(); + + if config.api_port == config.metrics_port { + let app = Router::new().merge(api_router).merge(metrics_router).merge(debug_router); + let addr = SocketAddr::new(config.http_address, config.api_port); + let listener = tokio::net::TcpListener::bind(addr).await?; + axum::serve(listener, app).await?; + } else { + let api_addr = SocketAddr::new(config.http_address, config.api_port); + let metrics_addr = SocketAddr::new(config.http_address, config.metrics_port); + let api_listener = tokio::net::TcpListener::bind(api_addr).await?; + let metrics_listener = tokio::net::TcpListener::bind(metrics_addr).await?; + let metrics_app = Router::new().merge(metrics_router).merge(debug_router); + tokio::try_join!( + axum::serve(api_listener, api_router), + axum::serve(metrics_listener, metrics_app), + )?; + } +>>>>>>> Stashed changes Ok(()) } From 0803464248e0d40f7d7e9375dd00ab3509f6fad8 Mon Sep 17 00:00:00 2001 From: d4m014 <271264252+d4m014@users.noreply.github.com> Date: Tue, 5 May 2026 21:53:43 +0100 Subject: [PATCH 2/5] resolve conflicts --- bin/ethlambda/src/main.rs | 43 ++++++----------------------- crates/net/rpc/src/lib.rs | 58 ++++++++++++--------------------------- 2 files changed, 27 insertions(+), 74 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 7628446f..f635af12 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -113,10 +113,10 @@ async fn main() -> eyre::Result<()> { let node_p2p_key = read_hex_file_bytes(&options.node_key); let p2p_socket = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), options.gossipsub_port); let rpc_config = RpcConfig { - http_address: options.http_address, - api_port: options.api_port, - metrics_port: options.metrics_port, - }; + http_address: options.http_address, + api_port: options.api_port, + metrics_port: options.metrics_port, + }; println!("{ASCII_ART}"); @@ -211,37 +211,13 @@ async fn main() -> eyre::Result<()> { }) .inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?; -<<<<<<< Updated upstream let shutdown_token = CancellationToken::new(); - let metrics_shutdown = shutdown_token.clone(); - let api_shutdown = shutdown_token.clone(); - - let metrics_handle = tokio::spawn(async move { - let _ = ethlambda_rpc::start_metrics_server(metrics_socket, metrics_shutdown) - .await - .inspect_err(|err| error!(%err, "Metrics server failed")); - }); - let api_handle = tokio::spawn(async move { - let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, api_shutdown) - .await - .inspect_err(|err| error!(%err, "API server failed")); -======= - // tokio::spawn(async move { - // let _ = ethlambda_rpc::start_metrics_server(metrics_socket) - // .await - // .inspect_err(|err| error!(%err, "Metrics server failed")); - // }); - // tokio::spawn(async move { - // let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator) - // .await - // .inspect_err(|err| error!(%err, "API server failed")); - // }); + let rpc_shutdown = shutdown_token.clone(); - tokio::spawn(async move { - let _ = ethlambda_rpc::start_rpc_server(rpc_config, store, aggregator) + let rpc_handle = tokio::spawn(async move { + let _ = ethlambda_rpc::start_rpc_server(rpc_config, store, aggregator, rpc_shutdown) .await - .inspect_err(|err| error!(%err, "rpc server failed")); ->>>>>>> Stashed changes + .inspect_err(|err| error!(%err, "RPC server failed")); }); info!("Node initialized"); @@ -274,8 +250,7 @@ async fn main() -> eyre::Result<()> { blockchain_ref.join().await; p2p_ref.join().await; - let _ = api_handle.await; - let _ = metrics_handle.await; + let _ = rpc_handle.await; info!("Shutdown complete"); diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index a9da9f30..6c38efb1 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -17,48 +17,18 @@ mod fork_choice; mod heap_profiling; pub mod metrics; -<<<<<<< Updated upstream -pub async fn start_api_server( - address: SocketAddr, - store: Store, - aggregator: AggregatorController, - shutdown: CancellationToken, -) -> Result<(), std::io::Error> { - let api_router = build_api_router(store).layer(Extension(aggregator)); - - let listener = tokio::net::TcpListener::bind(address).await?; - axum::serve(listener, api_router) - .with_graceful_shutdown(async move { - shutdown.cancelled().await; - }) - .await?; - - Ok(()) -} - -pub async fn start_metrics_server( - address: SocketAddr, - shutdown: CancellationToken, -) -> Result<(), std::io::Error> { - let metrics_router = metrics::start_prometheus_metrics_api(); - let debug_router = build_debug_router(); - - let app = Router::new().merge(metrics_router).merge(debug_router); - - let listener = tokio::net::TcpListener::bind(address).await?; - axum::serve(listener, app) - .with_graceful_shutdown(async move { - shutdown.cancelled().await; - }) - .await?; -======= pub struct RpcConfig { pub http_address: IpAddr, pub api_port: u16, pub metrics_port: u16, } -pub async fn start_rpc_server(config: RpcConfig, store: Store, aggregator: AggregatorController) -> Result<(), std::io::Error> { +pub async fn start_rpc_server( + config: RpcConfig, + store: Store, + aggregator: AggregatorController, + shutdown: CancellationToken, +) -> Result<(), std::io::Error> { let api_router = build_api_router(store).layer(Extension(aggregator)); let metrics_router = metrics::start_prometheus_metrics_api(); let debug_router = build_debug_router(); @@ -67,19 +37,27 @@ pub async fn start_rpc_server(config: RpcConfig, store: Store, aggregator: Aggre let app = Router::new().merge(api_router).merge(metrics_router).merge(debug_router); let addr = SocketAddr::new(config.http_address, config.api_port); let listener = tokio::net::TcpListener::bind(addr).await?; - axum::serve(listener, app).await?; + axum::serve(listener, app) + .with_graceful_shutdown(async move { + shutdown.cancelled().await; + }) + .await?; } else { let api_addr = SocketAddr::new(config.http_address, config.api_port); let metrics_addr = SocketAddr::new(config.http_address, config.metrics_port); let api_listener = tokio::net::TcpListener::bind(api_addr).await?; let metrics_listener = tokio::net::TcpListener::bind(metrics_addr).await?; let metrics_app = Router::new().merge(metrics_router).merge(debug_router); + let metrics_shutdown = shutdown.clone(); tokio::try_join!( - axum::serve(api_listener, api_router), - axum::serve(metrics_listener, metrics_app), + axum::serve(api_listener, api_router).with_graceful_shutdown(async move { + shutdown.cancelled().await; + }), + axum::serve(metrics_listener, metrics_app).with_graceful_shutdown(async move { + metrics_shutdown.cancelled().await; + }), )?; } ->>>>>>> Stashed changes Ok(()) } From 760471a261d6a05a6969d7f3246a121039284621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 6 May 2026 17:12:33 -0300 Subject: [PATCH 3/5] chore: remove commented code --- bin/ethlambda/src/main.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index f635af12..8de096c3 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -108,8 +108,6 @@ async fn main() -> eyre::Result<()> { options.attestation_committee_count, ); - // let api_socket = SocketAddr::new(options.http_address, options.api_port); - // let metrics_socket = SocketAddr::new(options.http_address, options.metrics_port); let node_p2p_key = read_hex_file_bytes(&options.node_key); let p2p_socket = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), options.gossipsub_port); let rpc_config = RpcConfig { From 5ad0d867e81cdc7f04ad484b9d3b704fddfa4588 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 6 May 2026 17:12:45 -0300 Subject: [PATCH 4/5] chore: fmt --- bin/ethlambda/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 8de096c3..888c6c1d 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -35,7 +35,7 @@ use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; use ethlambda_storage::{StorageBackend, Store, backend::RocksDBBackend}; -use ethlambda_rpc::{RpcConfig}; +use ethlambda_rpc::RpcConfig; const ASCII_ART: &str = r#" _ _ _ _ _ From 4881cc97b73f9e0c6496cbbaec220776a537117e Mon Sep 17 00:00:00 2001 From: d4m014 <271264252+d4m014@users.noreply.github.com> Date: Wed, 6 May 2026 22:24:41 +0100 Subject: [PATCH 5/5] fixes per review feedback --- bin/ethlambda/src/main.rs | 2 +- crates/net/rpc/src/lib.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 6851ee81..5c4ce16f 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -34,8 +34,8 @@ use tracing::{error, info, warn}; use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; -use ethlambda_storage::{StorageBackend, Store, backend::RocksDBBackend}; use ethlambda_rpc::RpcConfig; +use ethlambda_storage::{StorageBackend, Store, backend::RocksDBBackend}; const ASCII_ART: &str = r#" _ _ _ _ _ diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index 6c38efb1..c0f0104b 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -17,6 +17,7 @@ mod fork_choice; mod heap_profiling; pub mod metrics; +#[derive(Debug, Clone)] pub struct RpcConfig { pub http_address: IpAddr, pub api_port: u16, @@ -34,7 +35,10 @@ pub async fn start_rpc_server( let debug_router = build_debug_router(); if config.api_port == config.metrics_port { - let app = Router::new().merge(api_router).merge(metrics_router).merge(debug_router); + let app = Router::new() + .merge(api_router) + .merge(metrics_router) + .merge(debug_router); let addr = SocketAddr::new(config.http_address, config.api_port); let listener = tokio::net::TcpListener::bind(addr).await?; axum::serve(listener, app) @@ -65,7 +69,7 @@ pub async fn start_rpc_server( /// Build the API router with the given store. /// /// The aggregator controller is threaded in via `Extension` by the caller -/// (see `start_api_server`) so existing store-backed handlers don't need to +/// (see `start_rpc_server`) so existing store-backed handlers don't need to /// know about it and admin handlers extract it independently. fn build_api_router(store: Store) -> Router { Router::new()