From a30913649718af2c3e7bb1413485721fc1de0b78 Mon Sep 17 00:00:00 2001 From: obchain Date: Wed, 22 Apr 2026 16:12:34 +0530 Subject: [PATCH 01/11] chore(repo): enforce markdown-only-README policy in .gitignore Block every .md from being tracked except README.md, and ignore reference HTML under docs/ so local-only architecture diagrams cannot leak into commits. --- .gitignore | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.gitignore b/.gitignore index 1381a72..2dc6436 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,10 @@ .claude/ docs/context.md NOTES.md + +# Markdown policy: only README.md is ever pushed; everything else is local-only. +*.md +!README.md + +# Reference docs copied from backups — local-only, not pushed. +docs/*.html From f17045ef76ed061b2ca6c86612b2dbc6a9d018ff Mon Sep 17 00:00:00 2001 From: obchain Date: Wed, 22 Apr 2026 16:21:44 +0530 Subject: [PATCH 02/11] feat(metrics): Prometheus exporter + pipeline instrumentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `charon-metrics` crate: a Prometheus text-format HTTP endpoint on a configurable bind address (default 0.0.0.0:9091, off the Prometheus server default of 9090 so a local compose stack doesn't collide). Metric names live as `const &str` so dashboard JSON and alert rules stay in sync with call sites through a single source of truth. Instrument the per-block pipeline in `charon-cli`: - blocks scanned (counter, per chain) - position bucket counts (gauge, per chain × {healthy, near_liq, liquidatable}) - block duration (histogram, per chain, seconds) - simulation outcomes (counter, per chain × {ok, revert, error}) - opportunities queued / dropped by stage (counters) - per-opportunity profit (histogram, USD cents) - queue depth (gauge) - build info (labels: version, git_sha) Config surface: new `[metrics]` block in TOML with `enabled` and `bind`; both fall back to sane defaults so existing configs keep working without edits. 
Closes #20. --- Cargo.lock | 161 ++++++++++++++++++++ Cargo.toml | 2 + config/default.toml | 7 + crates/charon-cli/Cargo.toml | 1 + crates/charon-cli/src/main.rs | 46 +++++- crates/charon-core/src/config.rs | 36 +++++ crates/charon-core/src/lib.rs | 2 +- crates/charon-metrics/Cargo.toml | 13 ++ crates/charon-metrics/src/lib.rs | 250 +++++++++++++++++++++++++++++++ 9 files changed, 513 insertions(+), 5 deletions(-) create mode 100644 crates/charon-metrics/Cargo.toml create mode 100644 crates/charon-metrics/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 80866a9..954a03a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,6 +1130,7 @@ dependencies = [ "charon-core", "charon-executor", "charon-flashloan", + "charon-metrics", "charon-protocols", "charon-scanner", "clap", @@ -1174,6 +1175,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "charon-metrics" +version = "0.1.0" +dependencies = [ + "anyhow", + "metrics", + "metrics-exporter-prometheus", + "tokio", + "tracing", +] + [[package]] name = "charon-protocols" version = "0.1.0" @@ -1320,6 +1332,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1832,6 +1853,25 @@ dependencies = [ "subtle", ] +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.13.2" @@ -1928,6 +1968,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.9.0" @@ -1938,9 +1984,11 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -2334,6 +2382,52 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd7399781913e5393588a8d8c6a2867bf85fb38eaf2502fdce465aad2dc6f034" +dependencies = [ + "base64", + "http-body-util", + "hyper", + "hyper-util", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror 1.0.69", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8496cc523d1f94c1385dd8f0f0c2c480b2b8aeccb5b7e4485ad6365523ae376" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.5", + "metrics", + "quanta", + "rand 0.9.4", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mio" version = "1.2.0" @@ -2631,6 +2725,12 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "portable-atomic" +version = "1.13.1" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.5" @@ -2729,6 +2829,21 @@ dependencies = [ "unarray", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -2831,6 +2946,24 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.5", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "recvmsg" version = "1.0.0" @@ -3302,6 +3435,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "sketches-ddsketch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" + [[package]] name = "slab" version = "0.4.12" @@ -4127,6 +4266,28 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72069c3113ab32ab29e5584db3c6ec55d416895e60715417b5b883a357c3e471" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] 
+name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 39a41d7..d4364d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/charon-core", "crates/charon-executor", "crates/charon-flashloan", + "crates/charon-metrics", "crates/charon-protocols", "crates/charon-scanner", "crates/charon-cli", @@ -53,5 +54,6 @@ dotenvy = "0.15" charon-core = { path = "crates/charon-core" } charon-executor = { path = "crates/charon-executor" } charon-flashloan = { path = "crates/charon-flashloan" } +charon-metrics = { path = "crates/charon-metrics" } charon-protocols = { path = "crates/charon-protocols" } charon-scanner = { path = "crates/charon-scanner" } diff --git a/config/default.toml b/config/default.toml index 5fdd6e9..97d38ec 100644 --- a/config/default.toml +++ b/config/default.toml @@ -45,6 +45,13 @@ chain = "bnb" # Placeholder — replaced once CharonLiquidator.sol is deployed on BSC. contract_address = "0x0000000000000000000000000000000000000000" +# ── Prometheus metrics exporter ─────────────────────────────────────────── +# Exposes `/metrics` in Prometheus text format for Grafana / Grafana +# Cloud scraping. Port 9091 avoids the Prometheus-server default (9090). +[metrics] +enabled = true +bind = "0.0.0.0:9091" + # ── Chainlink price feeds (per chain, per asset symbol) ─────────────────── # Only feeds listed here are polled by the PriceCache. Add more as new # Venus markets become relevant. Feed addresses from docs.chain.link. 
diff --git a/crates/charon-cli/Cargo.toml b/crates/charon-cli/Cargo.toml index 2de0392..caaef57 100644 --- a/crates/charon-cli/Cargo.toml +++ b/crates/charon-cli/Cargo.toml @@ -13,6 +13,7 @@ path = "src/main.rs" charon-core = { workspace = true } charon-executor = { workspace = true } charon-flashloan = { workspace = true } +charon-metrics = { workspace = true } charon-protocols = { workspace = true } charon-scanner = { workspace = true } alloy = { workspace = true } diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 9e3ae1c..1b28dbb 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -19,6 +19,7 @@ use charon_core::{ }; use charon_executor::{Simulator, TxBuilder}; use charon_flashloan::{AaveFlashLoan, FlashLoanRouter}; +use charon_metrics::{bucket, drop_stage, sim_result}; use charon_protocols::VenusAdapter; use charon_scanner::{ BlockListener, ChainEvent, ChainProvider, DEFAULT_MAX_AGE, HealthScanner, PriceCache, @@ -103,6 +104,20 @@ async fn main() -> Result<()> { "config loaded" ); + // Prometheus exporter — install the global recorder before any + // subsystem records a metric. Disabled by the operator via + // `[metrics] enabled = false` turns the bot into a zero-overhead + // one-shot, which is handy for `test-connection` smoke runs. + if config.metrics.enabled { + charon_metrics::init(config.metrics.bind).await?; + charon_metrics::set_build_info( + env!("CARGO_PKG_VERSION"), + option_env!("CHARON_GIT_SHA").unwrap_or("unknown"), + ); + } else { + info!("metrics disabled via config"); + } + match cli.command { Command::Listen { borrowers } => run_listen(config, borrowers).await?, Command::TestConnection { chain } => { @@ -302,6 +317,8 @@ async fn process_block( ) { let start = std::time::Instant::now(); + charon_metrics::record_block_scanned(&chain); + // 1. Adapter — fetch raw positions for the tracked borrower list. 
let positions = match adapter.fetch_positions(borrowers).await { Ok(p) => p, @@ -315,11 +332,16 @@ async fn process_block( scanner.upsert(positions); let counts = scanner.bucket_counts(); + charon_metrics::set_position_bucket(&chain, bucket::HEALTHY, counts.healthy as u64); + charon_metrics::set_position_bucket(&chain, bucket::NEAR_LIQ, counts.near_liquidation as u64); + charon_metrics::set_position_bucket(&chain, bucket::LIQUIDATABLE, counts.liquidatable as u64); + // 3. Per-liquidatable: route flash loan, calc profit, build, simulate, queue. let liquidatable = scanner.liquidatable(); let mut queued = 0usize; for pos in liquidatable { match process_opportunity( + &chain, &pos, adapter.as_ref(), router.as_ref(), @@ -334,7 +356,10 @@ async fn process_block( { Ok(true) => queued += 1, Ok(false) => {} - Err(err) => debug!(borrower = %pos.borrower, error = ?err, "opportunity dropped"), + Err(err) => { + charon_metrics::record_opportunity_dropped(&chain, drop_stage::BUILD); + debug!(borrower = %pos.borrower, error = ?err, "opportunity dropped"); + } } } @@ -343,6 +368,9 @@ async fn process_block( let queue_len = q.len(); drop(q); + charon_metrics::set_queue_depth(queue_len as u64); + charon_metrics::observe_block_duration(&chain, start.elapsed().as_secs_f64()); + info!( chain = %chain, block, @@ -364,6 +392,7 @@ async fn process_block( /// failures. #[allow(clippy::too_many_arguments)] async fn process_opportunity( + chain: &str, pos: &charon_core::Position, adapter: &VenusAdapter, router: &FlashLoanRouter, @@ -381,6 +410,7 @@ async fn process_opportunity( // b. Router: pick cheapest flash-loan source. 
let Some(quote) = router.route(pos.debt_token, repay).await else { + charon_metrics::record_opportunity_dropped(chain, drop_stage::ROUTER); return Ok(false); }; @@ -400,6 +430,7 @@ async fn process_opportunity( let net = match calculate_profit(&profit_inputs, min_profit_usd) { Ok(n) => n, Err(err) => { + charon_metrics::record_opportunity_dropped(chain, drop_stage::PROFIT); debug!(borrower = %pos.borrower, error = ?err, "profit gate dropped"); return Ok(false); } @@ -429,15 +460,22 @@ async fn process_opportunity( // alone so dry-runs still surface ranked candidates. if let (Some(builder), Some(sim)) = (tx_builder, simulator) { let calldata = builder.encode_calldata(&opp, ¶ms)?; - if let Err(err) = sim.simulate(provider, calldata).await { - debug!(borrower = %pos.borrower, error = ?err, "simulation gate dropped"); - return Ok(false); + match sim.simulate(provider, calldata).await { + Ok(()) => charon_metrics::record_simulation(chain, sim_result::OK), + Err(err) => { + charon_metrics::record_simulation(chain, sim_result::REVERT); + charon_metrics::record_opportunity_dropped(chain, drop_stage::SIMULATION); + debug!(borrower = %pos.borrower, error = ?err, "simulation gate dropped"); + return Ok(false); + } } } // f. Push to the profit-ordered queue. + let profit_cents = opp.net_profit_usd_cents; let mut q = queue.lock().await; q.push(opp, queued_at_block); + charon_metrics::record_opportunity_queued(chain, profit_cents); Ok(true) } diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index 012b06c..2f74d45 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -10,6 +10,7 @@ use alloy::primitives::Address; use anyhow::{Context, anyhow}; use serde::Deserialize; use std::collections::HashMap; +use std::net::SocketAddr; use std::path::Path; /// Top-level Charon config loaded from `config/default.toml`. @@ -29,6 +30,41 @@ pub struct Config { /// configured, scanner falls back to protocol oracle. 
#[serde(default)] pub chainlink: HashMap>, + /// Prometheus exporter configuration. Missing `[metrics]` block ⇒ + /// defaults from [`MetricsConfig::default`] (enabled, port 9091). + #[serde(default)] + pub metrics: MetricsConfig, +} + +/// Prometheus exporter configuration. +#[derive(Debug, Clone, Deserialize)] +pub struct MetricsConfig { + /// Start the exporter at bot startup. Set to `false` to run charon + /// with zero metrics overhead (e.g. one-shot debug runs). + #[serde(default = "default_metrics_enabled")] + pub enabled: bool, + /// Bind address for the `/metrics` HTTP listener. `0.0.0.0:9091` + /// keeps it off the Prometheus-server default port (`9090`) so a + /// local compose stack doesn't collide. + #[serde(default = "default_metrics_bind")] + pub bind: SocketAddr, +} + +impl Default for MetricsConfig { + fn default() -> Self { + Self { + enabled: default_metrics_enabled(), + bind: default_metrics_bind(), + } + } +} + +fn default_metrics_enabled() -> bool { + true +} + +fn default_metrics_bind() -> SocketAddr { + "0.0.0.0:9091".parse().expect("valid default metrics bind") } /// Bot-level knobs — thresholds and intervals. 
diff --git a/crates/charon-core/src/lib.rs b/crates/charon-core/src/lib.rs index 0c8ab52..692ff7f 100644 --- a/crates/charon-core/src/lib.rs +++ b/crates/charon-core/src/lib.rs @@ -7,7 +7,7 @@ pub mod queue; pub mod traits; pub mod types; -pub use config::Config; +pub use config::{Config, MetricsConfig}; pub use flashloan::{FlashLoanProvider, FlashLoanQuote}; pub use profit::{NetProfit, ProfitInputs, calculate_profit}; pub use queue::{DEFAULT_TTL_BLOCKS, OpportunityQueue}; diff --git a/crates/charon-metrics/Cargo.toml b/crates/charon-metrics/Cargo.toml new file mode 100644 index 0000000..8be39f1 --- /dev/null +++ b/crates/charon-metrics/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "charon-metrics" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "Prometheus-compatible metrics exporter and metric constants for Charon" + +[dependencies] +anyhow = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +metrics = "0.24" +metrics-exporter-prometheus = { version = "0.16", default-features = false, features = ["http-listener"] } diff --git a/crates/charon-metrics/src/lib.rs b/crates/charon-metrics/src/lib.rs new file mode 100644 index 0000000..d182be5 --- /dev/null +++ b/crates/charon-metrics/src/lib.rs @@ -0,0 +1,250 @@ +//! Prometheus-compatible metrics surface for Charon. +//! +//! The exporter listens on a configurable `SocketAddr` (default +//! `0.0.0.0:9091`) and serves a `/metrics` endpoint in the Prometheus +//! text format. All metric names are kept as `const &str` constants in +//! [`names`] so call sites and dashboard JSON stay in lock-step with a +//! single source of truth. +//! +//! ```no_run +//! use charon_metrics::{init, names, record_block_scanned}; +//! # async fn demo() -> anyhow::Result<()> { +//! init("0.0.0.0:9091".parse()?).await?; +//! record_block_scanned("bnb"); +//! # Ok(()) +//! # } +//! 
``` + +use std::net::SocketAddr; + +use anyhow::{Context, Result}; +use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; +use metrics_exporter_prometheus::PrometheusBuilder; +use tracing::info; + +/// Single-source-of-truth metric names. Kept as constants so call +/// sites, dashboard JSON, and alert rules refer to the same strings. +pub mod names { + // Scanner + pub const SCANNER_BLOCKS_TOTAL: &str = "charon_scanner_blocks_total"; + pub const SCANNER_POSITIONS: &str = "charon_scanner_positions"; + + // Pipeline + pub const PIPELINE_BLOCK_DURATION_SECONDS: &str = "charon_pipeline_block_duration_seconds"; + + // Executor + pub const EXECUTOR_SIMULATIONS_TOTAL: &str = "charon_executor_simulations_total"; + pub const EXECUTOR_OPPS_QUEUED_TOTAL: &str = "charon_executor_opportunities_queued_total"; + pub const EXECUTOR_OPPS_DROPPED_TOTAL: &str = "charon_executor_opportunities_dropped_total"; + pub const EXECUTOR_PROFIT_USD_CENTS: &str = "charon_executor_profit_usd_cents"; + pub const EXECUTOR_QUEUE_DEPTH: &str = "charon_executor_queue_depth"; + + // Build / runtime + pub const BUILD_INFO: &str = "charon_build_info"; +} + +/// Position classification bucket used as the `bucket` label on +/// `charon_scanner_positions`. +pub mod bucket { + pub const HEALTHY: &str = "healthy"; + pub const NEAR_LIQ: &str = "near_liq"; + pub const LIQUIDATABLE: &str = "liquidatable"; +} + +/// Simulation outcome used as the `result` label on +/// `charon_executor_simulations_total`. +pub mod sim_result { + pub const OK: &str = "ok"; + pub const REVERT: &str = "revert"; + pub const ERROR: &str = "error"; +} + +/// Drop-stage label on `charon_executor_opportunities_dropped_total`. +pub mod drop_stage { + pub const ROUTER: &str = "router"; + pub const PROFIT: &str = "profit"; + pub const SIMULATION: &str = "simulation"; + pub const BUILD: &str = "build"; +} + +/// Install the global Prometheus recorder and start the HTTP listener. 
+/// +/// Safe to call at most once per process; subsequent calls return an +/// error because the global recorder can only be set once. The exporter +/// task runs for the lifetime of the tokio runtime — no handle is +/// returned because it never needs to be stopped in-process. +pub async fn init(bind: SocketAddr) -> Result<()> { + PrometheusBuilder::new() + .with_http_listener(bind) + .install() + .with_context(|| format!("failed to install Prometheus exporter on {bind}"))?; + + describe_all(); + + info!(bind = %bind, path = "/metrics", "metrics exporter listening"); + Ok(()) +} + +/// Emit Prometheus `# HELP` + `# TYPE` descriptors for every metric +/// Charon exposes. Called once from [`init`] so the exporter's first +/// scrape surfaces human-readable help text even before any counter +/// has been incremented. +fn describe_all() { + describe_counter!( + names::SCANNER_BLOCKS_TOTAL, + "Total blocks drained from chain listeners." + ); + describe_gauge!( + names::SCANNER_POSITIONS, + "Currently tracked positions bucketed by health classification." + ); + describe_histogram!( + names::PIPELINE_BLOCK_DURATION_SECONDS, + metrics::Unit::Seconds, + "Wall-clock duration of one full per-block pipeline pass." + ); + describe_counter!( + names::EXECUTOR_SIMULATIONS_TOTAL, + "Simulations attempted via `eth_call`, partitioned by outcome." + ); + describe_counter!( + names::EXECUTOR_OPPS_QUEUED_TOTAL, + "Liquidation opportunities that passed every gate and landed in the queue." + ); + describe_counter!( + names::EXECUTOR_OPPS_DROPPED_TOTAL, + "Liquidation opportunities dropped before reaching the queue, partitioned by stage." + ); + describe_histogram!( + names::EXECUTOR_PROFIT_USD_CENTS, + "Per-opportunity net profit in USD cents (post profit gate)." + ); + describe_gauge!( + names::EXECUTOR_QUEUE_DEPTH, + "Current depth of the profit-ordered opportunity queue." + ); + describe_gauge!( + names::BUILD_INFO, + "Build metadata as labels; value is always 1." 
+ ); +} + +// ─── Typed helpers (thin wrappers so call sites stay terse) ─────────── + +/// Increment the per-chain blocks-scanned counter. +pub fn record_block_scanned(chain: &str) { + counter!(names::SCANNER_BLOCKS_TOTAL, "chain" => chain.to_owned()).increment(1); +} + +/// Set the gauge for one health bucket on one chain. +pub fn set_position_bucket(chain: &str, bucket: &str, count: u64) { + gauge!(names::SCANNER_POSITIONS, "chain" => chain.to_owned(), "bucket" => bucket.to_owned()) + .set(count as f64); +} + +/// Observe the wall-clock duration of one pipeline pass. +pub fn observe_block_duration(chain: &str, seconds: f64) { + histogram!(names::PIPELINE_BLOCK_DURATION_SECONDS, "chain" => chain.to_owned()).record(seconds); +} + +/// Record one simulation outcome. +pub fn record_simulation(chain: &str, result: &str) { + counter!( + names::EXECUTOR_SIMULATIONS_TOTAL, + "chain" => chain.to_owned(), + "result" => result.to_owned() + ) + .increment(1); +} + +/// Record one opportunity that made it into the queue. +pub fn record_opportunity_queued(chain: &str, profit_usd_cents: u64) { + counter!(names::EXECUTOR_OPPS_QUEUED_TOTAL, "chain" => chain.to_owned()).increment(1); + histogram!(names::EXECUTOR_PROFIT_USD_CENTS, "chain" => chain.to_owned()) + .record(profit_usd_cents as f64); +} + +/// Record one opportunity that was dropped before reaching the queue. +pub fn record_opportunity_dropped(chain: &str, stage: &str) { + counter!( + names::EXECUTOR_OPPS_DROPPED_TOTAL, + "chain" => chain.to_owned(), + "stage" => stage.to_owned() + ) + .increment(1); +} + +/// Update the queue-depth gauge. +pub fn set_queue_depth(depth: u64) { + gauge!(names::EXECUTOR_QUEUE_DEPTH).set(depth as f64); +} + +/// Emit build metadata once at startup. The metric value is always 1; +/// labels carry the interesting bits so dashboards can `group_by`. 
+pub fn set_build_info(version: &str, git_sha: &str) { + gauge!( + names::BUILD_INFO, + "version" => version.to_owned(), + "git_sha" => git_sha.to_owned() + ) + .set(1.0); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{Ipv4Addr, SocketAddrV4}; + use tokio::net::TcpStream; + use tokio::time::{Duration, sleep}; + + /// Smoke-test: `init` must bind the HTTP listener so a subsequent + /// TCP connect to `/metrics` succeeds. A failed listener bind is + /// the single most common regression when swapping exporter + /// versions; this catches it without asserting on the text body. + #[tokio::test] + async fn init_binds_prometheus_http_listener() { + // Port 0 asks the OS for an ephemeral port, avoiding collisions + // with any concurrent test run. We then need to know which port + // was picked so we can connect back — bind a probe socket first + // just to reserve a port number, drop it, hand the number to + // the exporter. Races are technically possible but vanishingly + // rare in practice on `127.0.0.1`. + let probe = std::net::TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) + .expect("probe bind"); + let port = probe.local_addr().unwrap().port(); + drop(probe); + + let bind = SocketAddr::from((Ipv4Addr::LOCALHOST, port)); + init(bind).await.expect("init should succeed"); + + // Small yield so the listener's spawn has a chance to bind + // before the connect probe fires. + sleep(Duration::from_millis(50)).await; + + TcpStream::connect(bind) + .await + .expect("listener should accept TCP connections"); + } + + /// Typed helpers must not panic when called — this exercises every + /// label combination that call sites use so metric-name typos + /// surface at `cargo test` time, not in prod. 
+ #[test] + fn typed_helpers_are_panic_free() { + record_block_scanned("bnb"); + set_position_bucket("bnb", bucket::HEALTHY, 7); + set_position_bucket("bnb", bucket::NEAR_LIQ, 2); + set_position_bucket("bnb", bucket::LIQUIDATABLE, 0); + observe_block_duration("bnb", 0.123); + record_simulation("bnb", sim_result::OK); + record_simulation("bnb", sim_result::REVERT); + record_simulation("bnb", sim_result::ERROR); + record_opportunity_queued("bnb", 1_234); + record_opportunity_dropped("bnb", drop_stage::ROUTER); + record_opportunity_dropped("bnb", drop_stage::PROFIT); + record_opportunity_dropped("bnb", drop_stage::SIMULATION); + record_opportunity_dropped("bnb", drop_stage::BUILD); + set_queue_depth(3); + set_build_info("0.1.0", "deadbeef"); + } +} From 2e72baf8ba57363eeb7dbe5d752a3ab9259c05f6 Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 15:32:20 +0530 Subject: [PATCH 03/11] fix(metrics): domain-scaled histogram buckets Register explicit bucket boundaries for charon_pipeline_block_duration_seconds and charon_executor_profit_usd_cents via PrometheusBuilder::set_buckets_for_metric. Without matchers, the exporter renders both histograms as Prometheus summaries, producing NaN from histogram_quantile and empty heatmaps in the companion Grafana dashboard. Block-duration buckets target BSC's 3s block cadence (healthy / warning / alert / overrun). Profit buckets cover the $0.05 dust to $10k+ windfall range observed on Venus liquidations. 
Closes #275 Closes #218 Closes #217 --- crates/charon-metrics/src/lib.rs | 44 +++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/crates/charon-metrics/src/lib.rs b/crates/charon-metrics/src/lib.rs index d182be5..39e6de8 100644 --- a/crates/charon-metrics/src/lib.rs +++ b/crates/charon-metrics/src/lib.rs @@ -19,9 +19,31 @@ use std::net::SocketAddr; use anyhow::{Context, Result}; use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; -use metrics_exporter_prometheus::PrometheusBuilder; +use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; use tracing::info; +// Bucket boundaries for `charon_pipeline_block_duration_seconds`. +// BSC produces a block every ~3s; resolution is packed around that +// threshold so p50/p95 quantiles stay meaningful instead of piling +// into `+Inf` with the exporter's default HTTP-latency buckets. +const BLOCK_DURATION_SECONDS_BUCKETS: &[f64] = + &[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0]; + +// Bucket boundaries for `charon_executor_profit_usd_cents`. +// Realistic Venus liquidation profit spans ~$0.05 dust to ~$10k +// windfalls; buckets are in cents (5 → 1_000_000) so histogram_quantile +// returns finite values across that range. +const PROFIT_USD_CENTS_BUCKETS: &[f64] = &[ + 5.0, + 50.0, + 500.0, + 2_500.0, + 10_000.0, + 50_000.0, + 250_000.0, + 1_000_000.0, +]; + /// Single-source-of-truth metric names. Kept as constants so call /// sites, dashboard JSON, and alert rules refer to the same strings. pub mod names { @@ -76,6 +98,26 @@ pub mod drop_stage { pub async fn init(bind: SocketAddr) -> Result<()> { PrometheusBuilder::new() .with_http_listener(bind) + .set_buckets_for_metric( + Matcher::Full(names::PIPELINE_BLOCK_DURATION_SECONDS.to_string()), + BLOCK_DURATION_SECONDS_BUCKETS, + ) + .with_context(|| { + format!( + "failed to register buckets for {}", + names::PIPELINE_BLOCK_DURATION_SECONDS + ) + })? 
+ .set_buckets_for_metric( + Matcher::Full(names::EXECUTOR_PROFIT_USD_CENTS.to_string()), + PROFIT_USD_CENTS_BUCKETS, + ) + .with_context(|| { + format!( + "failed to register buckets for {}", + names::EXECUTOR_PROFIT_USD_CENTS + ) + })? .install() .with_context(|| format!("failed to install Prometheus exporter on {bind}"))?; From 9199f0dfa44400aa3a5bd58679b23dd67efe875d Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 15:35:52 +0530 Subject: [PATCH 04/11] chore(workspace): hoist metrics deps to workspace.dependencies Declare metrics 0.24 and metrics-exporter-prometheus 0.16 in the root [workspace.dependencies] table so every crate that pulls them in reuses the same version. charon-metrics switches its direct version strings to { workspace = true }; the http-listener feature stays pinned at the workspace level. Closes #219 --- Cargo.toml | 4 ++++ crates/charon-metrics/Cargo.toml | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d4364d5..644176a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,10 @@ clap = { version = "4", features = ["derive"] } # .env loader dotenvy = "0.15" +# Metrics facade + Prometheus exporter +metrics = "0.24" +metrics-exporter-prometheus = { version = "0.16", default-features = false, features = ["http-listener"] } + # Internal crates charon-core = { path = "crates/charon-core" } charon-executor = { path = "crates/charon-executor" } diff --git a/crates/charon-metrics/Cargo.toml b/crates/charon-metrics/Cargo.toml index 8be39f1..dee3e40 100644 --- a/crates/charon-metrics/Cargo.toml +++ b/crates/charon-metrics/Cargo.toml @@ -9,5 +9,5 @@ description = "Prometheus-compatible metrics exporter and metric constants for C anyhow = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -metrics = "0.24" -metrics-exporter-prometheus = { version = "0.16", default-features = false, features = ["http-listener"] } +metrics = { workspace = true } 
+metrics-exporter-prometheus = { workspace = true } From 0919da325ad25165a33bbc23e93eeca263f48155 Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 15:55:33 +0530 Subject: [PATCH 05/11] fix(metrics): label queued-counter with simulated=true|false process_opportunity pushes entries to the queue in two paths: simulation-gated (BOT_SIGNER_KEY set) and dry-run (signer absent). Both were incrementing charon_executor_opportunities_queued_total with the same label set, making the gate bypass invisible in dashboards. Thread a `simulated: bool` through `record_opportunity_queued` so the counter splits by whether the eth_call gate actually ran. Help text on the exporter updated to document the label semantics. This is the observability half of #220; the hard-refusal half (#170) lands separately on the executor branch. Closes #220 --- crates/charon-cli/src/main.rs | 17 ++++++++++++----- crates/charon-metrics/src/lib.rs | 20 ++++++++++++++++---- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 1b28dbb..a9a9fdd 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -457,11 +457,16 @@ async fn process_opportunity( // e. Tx builder + simulator — only if the operator supplied // BOT_SIGNER_KEY. Without it, push to the queue based on profit - // alone so dry-runs still surface ranked candidates. - if let (Some(builder), Some(sim)) = (tx_builder, simulator) { + // alone so dry-runs still surface ranked candidates. `simulated` + // propagates to the queued-counter label so dashboards can tell + // sim'd entries from dry-run entries (see #220). 
+ let simulated = if let (Some(builder), Some(sim)) = (tx_builder, simulator) { let calldata = builder.encode_calldata(&opp, ¶ms)?; match sim.simulate(provider, calldata).await { - Ok(()) => charon_metrics::record_simulation(chain, sim_result::OK), + Ok(()) => { + charon_metrics::record_simulation(chain, sim_result::OK); + true + } Err(err) => { charon_metrics::record_simulation(chain, sim_result::REVERT); charon_metrics::record_opportunity_dropped(chain, drop_stage::SIMULATION); @@ -469,13 +474,15 @@ async fn process_opportunity( return Ok(false); } } - } + } else { + false + }; // f. Push to the profit-ordered queue. let profit_cents = opp.net_profit_usd_cents; let mut q = queue.lock().await; q.push(opp, queued_at_block); - charon_metrics::record_opportunity_queued(chain, profit_cents); + charon_metrics::record_opportunity_queued(chain, profit_cents, simulated); Ok(true) } diff --git a/crates/charon-metrics/src/lib.rs b/crates/charon-metrics/src/lib.rs index 39e6de8..da1d36f 100644 --- a/crates/charon-metrics/src/lib.rs +++ b/crates/charon-metrics/src/lib.rs @@ -151,7 +151,7 @@ fn describe_all() { ); describe_counter!( names::EXECUTOR_OPPS_QUEUED_TOTAL, - "Liquidation opportunities that passed every gate and landed in the queue." + "Liquidation opportunities that landed in the queue, labelled `simulated=true|false` to distinguish sim-gated entries from dry-run pushes (BOT_SIGNER_KEY unset)." ); describe_counter!( names::EXECUTOR_OPPS_DROPPED_TOTAL, @@ -200,8 +200,19 @@ pub fn record_simulation(chain: &str, result: &str) { } /// Record one opportunity that made it into the queue. -pub fn record_opportunity_queued(chain: &str, profit_usd_cents: u64) { - counter!(names::EXECUTOR_OPPS_QUEUED_TOTAL, "chain" => chain.to_owned()).increment(1); +/// +/// `simulated` distinguishes entries that cleared the `eth_call` +/// simulation gate from entries enqueued without simulation (dry-run +/// mode when `BOT_SIGNER_KEY` is unset). 
Splitting on this label keeps +/// the gate bypass observable from dashboards instead of letting +/// unsimulated pushes masquerade as healthy throughput. +pub fn record_opportunity_queued(chain: &str, profit_usd_cents: u64, simulated: bool) { + counter!( + names::EXECUTOR_OPPS_QUEUED_TOTAL, + "chain" => chain.to_owned(), + "simulated" => if simulated { "true" } else { "false" }.to_owned(), + ) + .increment(1); histogram!(names::EXECUTOR_PROFIT_USD_CENTS, "chain" => chain.to_owned()) .record(profit_usd_cents as f64); } @@ -281,7 +292,8 @@ mod tests { record_simulation("bnb", sim_result::OK); record_simulation("bnb", sim_result::REVERT); record_simulation("bnb", sim_result::ERROR); - record_opportunity_queued("bnb", 1_234); + record_opportunity_queued("bnb", 1_234, true); + record_opportunity_queued("bnb", 9, false); record_opportunity_dropped("bnb", drop_stage::ROUTER); record_opportunity_dropped("bnb", drop_stage::PROFIT); record_opportunity_dropped("bnb", drop_stage::SIMULATION); From b15a46b8a7a7bbcdb95bca3d79f51b23f20fc3ff Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 16:16:32 +0530 Subject: [PATCH 06/11] fix(metrics): loopback default bind + auth_token validation gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Default bind moves from 0.0.0.0:9091 to 127.0.0.1:9091 so a bare VPS deploy does not expose /metrics to the public internet out of the box. MetricsConfig gains an Option auth_token paired with a validate() gate: when enabled and bind is non-loopback, refusing to start unless auth_token is non-empty. Config::load calls the gate after TOML parse so the check fails fast at startup. The exporter itself does not yet terminate the Bearer check — the token is a shared secret enforced by a reverse proxy (nginx, caddy, Traefik) in front of the listener, so bot + proxy read one source. 
Module rustdoc and the default.toml block describe the loopback-or- proxy contract; four unit tests lock in the three validate paths (loopback ok, non-loopback + token ok, non-loopback + empty/None token reject) plus the disabled-shortcut. Closes #213 Closes #214 --- config/default.toml | 18 ++++- crates/charon-core/src/config.rs | 125 ++++++++++++++++++++++++++++++- crates/charon-metrics/src/lib.rs | 12 +-- 3 files changed, 144 insertions(+), 11 deletions(-) diff --git a/config/default.toml b/config/default.toml index 97d38ec..fea61f9 100644 --- a/config/default.toml +++ b/config/default.toml @@ -48,9 +48,23 @@ contract_address = "0x0000000000000000000000000000000000000000" # ── Prometheus metrics exporter ─────────────────────────────────────────── # Exposes `/metrics` in Prometheus text format for Grafana / Grafana # Cloud scraping. Port 9091 avoids the Prometheus-server default (9090). +# +# Binds to loopback by default so the endpoint is unreachable from the +# public internet on a bare VPS (Hetzner CX22 firewall is permissive on +# inbound). Remote scraping must go through a reverse proxy (nginx, +# caddy, Traefik) that terminates bearer-token or mTLS auth — or a +# compose-internal network, a WireGuard tunnel, etc. +# +# If you must bind to a non-loopback address (e.g. 0.0.0.0), set +# `auth_token` to a non-empty shared secret; config validation rejects +# a public bind without a token configured. The token is compared +# byte-for-byte against the `Authorization: Bearer ` header by +# the reverse proxy — the exporter itself does not yet terminate auth. [metrics] -enabled = true -bind = "0.0.0.0:9091" +enabled = true +bind = "127.0.0.1:9091" +# Uncomment and export CHARON_METRICS_AUTH_TOKEN only if bind is non-loopback: +# auth_token = "${CHARON_METRICS_AUTH_TOKEN}" # ── Chainlink price feeds (per chain, per asset symbol) ─────────────────── # Only feeds listed here are polled by the PriceCache. 
Add more as new diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index 2f74d45..e8e93f0 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -43,11 +43,49 @@ pub struct MetricsConfig { /// with zero metrics overhead (e.g. one-shot debug runs). #[serde(default = "default_metrics_enabled")] pub enabled: bool, - /// Bind address for the `/metrics` HTTP listener. `0.0.0.0:9091` - /// keeps it off the Prometheus-server default port (`9090`) so a - /// local compose stack doesn't collide. + /// Bind address for the `/metrics` HTTP listener. Defaults to + /// `127.0.0.1:9091` so the endpoint is unreachable from the public + /// internet on a bare VPS. Non-loopback binds must pair with a + /// `auth_token` (enforced by [`MetricsConfig::validate`]). #[serde(default = "default_metrics_bind")] pub bind: SocketAddr, + /// Shared secret expected on `Authorization: Bearer ` when + /// the exporter is reached over a non-loopback bind. The exporter + /// itself does not yet terminate auth — the token is enforced by + /// the reverse proxy (nginx, caddy, etc.) that sits in front of + /// `bind`. Holding the value in config makes the proxy + bot share + /// one source of truth. + #[serde(default)] + pub auth_token: Option, +} + +impl MetricsConfig { + /// Refuse to start when the exporter is bound to a non-loopback + /// address without an accompanying `auth_token`. Stops silent + /// deployment of an unauthenticated `/metrics` endpoint to any + /// caller with network reach — see tracking issues #213 #214. 
+ pub fn validate(&self) -> anyhow::Result<()> { + if !self.enabled { + return Ok(()); + } + let ip = self.bind.ip(); + if !ip.is_loopback() + && self + .auth_token + .as_deref() + .map(str::is_empty) + .unwrap_or(true) + { + return Err(anyhow!( + "metrics.bind {} is non-loopback but metrics.auth_token is empty — \ + either bind to 127.0.0.1 (scrape via reverse proxy / VPN) or set \ + CHARON_METRICS_AUTH_TOKEN and front the exporter with a proxy that \ + enforces Authorization: Bearer on /metrics", + self.bind + )); + } + Ok(()) + } } impl Default for MetricsConfig { @@ -55,6 +93,7 @@ impl Default for MetricsConfig { Self { enabled: default_metrics_enabled(), bind: default_metrics_bind(), + auth_token: None, } } } @@ -64,7 +103,9 @@ fn default_metrics_enabled() -> bool { } fn default_metrics_bind() -> SocketAddr { - "0.0.0.0:9091".parse().expect("valid default metrics bind") + "127.0.0.1:9091" + .parse() + .expect("valid default metrics bind") } /// Bot-level knobs — thresholds and intervals. @@ -156,6 +197,10 @@ impl Config { .with_context(|| format!("env substitution failed for {}", path.display()))?; let config: Config = toml::from_str(&substituted) .with_context(|| format!("failed to parse TOML at {}", path.display()))?; + config + .metrics + .validate() + .with_context(|| format!("invalid [metrics] section in {}", path.display()))?; Ok(config) } } @@ -181,3 +226,75 @@ fn substitute_env_vars(input: &str) -> anyhow::Result { output.push_str(rest); Ok(output) } + +#[cfg(test)] +mod tests { + use super::*; + + /// Loopback bind is safe on its own — no auth token required, + /// because the endpoint is unreachable off-box. 
+ #[test] + fn validate_allows_loopback_without_token() { + let cfg = MetricsConfig { + enabled: true, + bind: "127.0.0.1:9091".parse().unwrap(), + auth_token: None, + }; + cfg.validate().expect("loopback + no token must pass"); + + let cfg_v6 = MetricsConfig { + enabled: true, + bind: "[::1]:9091".parse().unwrap(), + auth_token: None, + }; + cfg_v6.validate().expect("IPv6 loopback must pass"); + } + + /// Non-loopback bind with a non-empty token is the documented + /// "front with a reverse proxy" escape hatch. + #[test] + fn validate_allows_non_loopback_with_token() { + let cfg = MetricsConfig { + enabled: true, + bind: "0.0.0.0:9091".parse().unwrap(), + auth_token: Some("not-a-real-token".into()), + }; + cfg.validate() + .expect("non-loopback + token must pass (proxy enforces)"); + } + + /// Non-loopback with missing or empty token must fail — covers + /// both `auth_token = None` (unset in TOML) and `auth_token = + /// Some("")` (the nasty case where `CHARON_METRICS_AUTH_TOKEN=` + /// is exported empty and env substitution silently yields a + /// blank string). This is the regression gate for #213/#214. + #[test] + fn validate_rejects_non_loopback_without_token() { + let none_cfg = MetricsConfig { + enabled: true, + bind: "0.0.0.0:9091".parse().unwrap(), + auth_token: None, + }; + assert!(none_cfg.validate().is_err(), "None token must fail"); + + let empty_cfg = MetricsConfig { + enabled: true, + bind: "0.0.0.0:9091".parse().unwrap(), + auth_token: Some(String::new()), + }; + assert!(empty_cfg.validate().is_err(), "empty token must fail"); + } + + /// `enabled = false` bypasses validation: a disabled exporter + /// never opens a socket, so bind/token combinations are moot. 
+ #[test] + fn validate_skipped_when_disabled() { + let cfg = MetricsConfig { + enabled: false, + bind: "0.0.0.0:9091".parse().unwrap(), + auth_token: None, + }; + cfg.validate() + .expect("disabled exporter must skip bind checks"); + } +} diff --git a/crates/charon-metrics/src/lib.rs b/crates/charon-metrics/src/lib.rs index da1d36f..e11c248 100644 --- a/crates/charon-metrics/src/lib.rs +++ b/crates/charon-metrics/src/lib.rs @@ -1,15 +1,17 @@ //! Prometheus-compatible metrics surface for Charon. //! //! The exporter listens on a configurable `SocketAddr` (default -//! `0.0.0.0:9091`) and serves a `/metrics` endpoint in the Prometheus -//! text format. All metric names are kept as `const &str` constants in -//! [`names`] so call sites and dashboard JSON stay in lock-step with a -//! single source of truth. +//! `127.0.0.1:9091`, loopback-only; see `MetricsConfig` in +//! `charon-core` for the validation rules that block non-loopback +//! binds without a shared auth token) and serves a `/metrics` +//! endpoint in the Prometheus text format. All metric names are kept +//! as `const &str` constants in [`names`] so call sites and dashboard +//! JSON stay in lock-step with a single source of truth. //! //! ```no_run //! use charon_metrics::{init, names, record_block_scanned}; //! # async fn demo() -> anyhow::Result<()> { -//! init("0.0.0.0:9091".parse()?).await?; +//! init("127.0.0.1:9091".parse()?).await?; //! record_block_scanned("bnb"); //! # Ok(()) //! # } From 9a9a938ab156045dc16c0d9879d05ec923175c4a Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 18:01:01 +0530 Subject: [PATCH 07/11] chore(metrics): opt charon-metrics into workspace lints Mirror the lint floor established on feat/20-multi-liq-batcher (issue #211) onto feat/22 so the two lineages converge on an identical policy at merge: forbid unsafe_code, deny arithmetic_side_effects, cast_possible_truncation, unwrap_used. 
Root Cargo.toml grows [workspace.lints.rust] + [workspace.lints.clippy] and a workspace-level `thiserror = "1"` dependency. charon-metrics opts in via `[lints] workspace = true` and pulls `thiserror` through the workspace declaration. Repair one violation surfaced by the policy: `probe.local_addr().unwrap()` in the exporter smoke test becomes `.expect("probe socket must expose its bound local_addr")`. Closes #216 --- Cargo.lock | 1 + Cargo.toml | 16 ++++++++++++++++ crates/charon-metrics/Cargo.toml | 4 ++++ 3 files changed, 21 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 954a03a..a0da6f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1182,6 +1182,7 @@ dependencies = [ "anyhow", "metrics", "metrics-exporter-prometheus", + "thiserror 1.0.69", "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 644176a..12d9408 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,9 @@ dotenvy = "0.15" metrics = "0.24" metrics-exporter-prometheus = { version = "0.16", default-features = false, features = ["http-listener"] } +# Structured error types (library crates) +thiserror = "1" + # Internal crates charon-core = { path = "crates/charon-core" } charon-executor = { path = "crates/charon-executor" } @@ -61,3 +64,16 @@ charon-flashloan = { path = "crates/charon-flashloan" } charon-metrics = { path = "crates/charon-metrics" } charon-protocols = { path = "crates/charon-protocols" } charon-scanner = { path = "crates/charon-scanner" } + +# Workspace-wide lint policy. Every crate that opts in via +# `[lints] workspace = true` inherits these; new crates should opt +# in from day one. Mirrors the floor established on +# feat/20-multi-liq-batcher (see issue #211) so the two lineages +# converge on identical lint posture at merge time. 
+[workspace.lints.rust] +unsafe_code = "forbid" + +[workspace.lints.clippy] +arithmetic_side_effects = "deny" +cast_possible_truncation = "deny" +unwrap_used = "deny" diff --git a/crates/charon-metrics/Cargo.toml b/crates/charon-metrics/Cargo.toml index dee3e40..ecd2309 100644 --- a/crates/charon-metrics/Cargo.toml +++ b/crates/charon-metrics/Cargo.toml @@ -11,3 +11,7 @@ tokio = { workspace = true } tracing = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } +thiserror = { workspace = true } + +[lints] +workspace = true From fb4f9572b9f90910fcdea2053951d45aa847d445 Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 18:01:33 +0530 Subject: [PATCH 08/11] fix(metrics): typed MetricsError replaces anyhow on public API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit charon-metrics is a library crate. Returning anyhow::Result from init() forced charon-cli to match on Display strings to tell a port-collision (retryable) apart from a recorder double-install (fatal). Every sibling library (charon-core, charon-executor on feat/20) already uses thiserror — this is the last outlier. Introduce `MetricsError` (#[non_exhaustive]) with two variants: `BucketConfig { metric, source }` for `set_buckets_for_metric` failures — carries the offending metric name so logs pinpoint the offender — and `InstallFailed { bind, source }` for `PrometheusBuilder::install` failures. Both variants hold the underlying `metrics_exporter_prometheus::BuildError` via `#[source]` so `Error::source()` chains preserve the original diagnosis. `init` keeps its public signature shape via a crate-level `Result` alias; charon-cli's `?` continues to work unchanged thanks to `anyhow::Error: From`. One unit test drives the `BucketConfig` path via an empty bucket slice and asserts Display + source chain. 
Closes #215 --- crates/charon-metrics/src/lib.rs | 88 +++++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 14 deletions(-) diff --git a/crates/charon-metrics/src/lib.rs b/crates/charon-metrics/src/lib.rs index e11c248..ea83f0a 100644 --- a/crates/charon-metrics/src/lib.rs +++ b/crates/charon-metrics/src/lib.rs @@ -19,11 +19,44 @@ use std::net::SocketAddr; -use anyhow::{Context, Result}; use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; -use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; +use metrics_exporter_prometheus::{ + BuildError as PromBuildError, Matcher, PrometheusBuilder, +}; +use thiserror::Error; use tracing::info; +/// Errors returned from [`init`]. Exposed as a `#[non_exhaustive]` +/// enum so `charon-cli` can distinguish bind failures (port collision +/// is retryable) from recorder-install failures (caller must abort — +/// the global recorder can only be set once) without matching on +/// `Display` strings. New variants may be added without a breaking +/// semver bump. +#[derive(Debug, Error)] +#[non_exhaustive] +pub enum MetricsError { + /// Failed to register custom histogram buckets for a specific + /// metric. Carries the metric name so logs pinpoint the offender. + #[error("failed to register buckets for {metric}: {source}")] + BucketConfig { + metric: &'static str, + #[source] + source: PromBuildError, + }, + /// Installing the global Prometheus recorder failed. Typically a + /// port collision on `bind` or a recorder double-install. The + /// underlying `BuildError` preserves the original diagnosis. + #[error("failed to install Prometheus exporter on {bind}: {source}")] + InstallFailed { + bind: SocketAddr, + #[source] + source: PromBuildError, + }, +} + +/// Convenience alias so helpers and call sites share one return shape. +pub type Result = std::result::Result; + // Bucket boundaries for `charon_pipeline_block_duration_seconds`. 
// BSC produces a block every ~3s; resolution is packed around that // threshold so p50/p95 quantiles stay meaningful instead of piling @@ -104,24 +137,20 @@ pub async fn init(bind: SocketAddr) -> Result<()> { Matcher::Full(names::PIPELINE_BLOCK_DURATION_SECONDS.to_string()), BLOCK_DURATION_SECONDS_BUCKETS, ) - .with_context(|| { - format!( - "failed to register buckets for {}", - names::PIPELINE_BLOCK_DURATION_SECONDS - ) + .map_err(|source| MetricsError::BucketConfig { + metric: names::PIPELINE_BLOCK_DURATION_SECONDS, + source, })? .set_buckets_for_metric( Matcher::Full(names::EXECUTOR_PROFIT_USD_CENTS.to_string()), PROFIT_USD_CENTS_BUCKETS, ) - .with_context(|| { - format!( - "failed to register buckets for {}", - names::EXECUTOR_PROFIT_USD_CENTS - ) + .map_err(|source| MetricsError::BucketConfig { + metric: names::EXECUTOR_PROFIT_USD_CENTS, + source, })? .install() - .with_context(|| format!("failed to install Prometheus exporter on {bind}"))?; + .map_err(|source| MetricsError::InstallFailed { bind, source })?; describe_all(); @@ -266,7 +295,10 @@ mod tests { // rare in practice on `127.0.0.1`. let probe = std::net::TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) .expect("probe bind"); - let port = probe.local_addr().unwrap().port(); + let port = probe + .local_addr() + .expect("probe socket must expose its bound local_addr") + .port(); drop(probe); let bind = SocketAddr::from((Ipv4Addr::LOCALHOST, port)); @@ -281,6 +313,34 @@ mod tests { .expect("listener should accept TCP connections"); } + /// `MetricsError::BucketConfig` must render the offending metric + /// name in its `Display` string and expose the upstream + /// `PromBuildError` through `Error::source()` so operator tooling + /// can walk the chain. Reached via the real builder path (empty + /// bucket slice → `BuildError::EmptyBucketsOrQuantiles`) rather + /// than a hand-rolled variant, so the mapping in `init` stays + /// exercised end-to-end. 
+ #[test] + fn bucket_config_error_display_and_source_chain() { + let err = PrometheusBuilder::new() + .set_buckets_for_metric(Matcher::Full(names::EXECUTOR_PROFIT_USD_CENTS.to_string()), &[]) + .map_err(|source| MetricsError::BucketConfig { + metric: names::EXECUTOR_PROFIT_USD_CENTS, + source, + }) + .expect_err("empty bucket slice must fail"); + + let rendered = format!("{err}"); + assert!( + rendered.contains(names::EXECUTOR_PROFIT_USD_CENTS), + "Display must name the offending metric, got {rendered:?}" + ); + assert!( + std::error::Error::source(&err).is_some(), + "BucketConfig must expose its PromBuildError as source()" + ); + } + /// Typed helpers must not panic when called — this exercises every /// label combination that call sites use so metric-name typos /// surface at `cargo test` time, not in prod. From bd37bda8924fac34ceaf4998d0d5fcebb4433c8b Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 18:22:26 +0530 Subject: [PATCH 09/11] fix(metrics): idempotent init, supervised exporter, strict config - MetricsConfig gains deny_unknown_fields + non_exhaustive so typos in [metrics] fail at load time instead of silently falling back to the default loopback bind. - charon_metrics::init is now idempotent; a second call short-circuits before touching PrometheusBuilder so repeated invocations no longer panic inside set_global_recorder. - New charon_metrics::install returns the ExporterFuture so the CLI can push the HTTP listener into the same JoinSet that supervises block listeners. A panic or clean exit in any supervised task now triggers controlled shutdown instead of leaving the bot running blind to Grafana. 
Closes #221 Closes #222 Closes #223 --- crates/charon-cli/src/main.rs | 138 ++++++++++++++------- crates/charon-core/src/config.rs | 12 ++ crates/charon-metrics/src/lib.rs | 200 ++++++++++++++++++++++++------- 3 files changed, 265 insertions(+), 85 deletions(-) diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index a9a9fdd..94d02aa 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -104,20 +104,12 @@ async fn main() -> Result<()> { "config loaded" ); - // Prometheus exporter — install the global recorder before any - // subsystem records a metric. Disabled by the operator via - // `[metrics] enabled = false` turns the bot into a zero-overhead - // one-shot, which is handy for `test-connection` smoke runs. - if config.metrics.enabled { - charon_metrics::init(config.metrics.bind).await?; - charon_metrics::set_build_info( - env!("CARGO_PKG_VERSION"), - option_env!("CHARON_GIT_SHA").unwrap_or("unknown"), - ); - } else { - info!("metrics disabled via config"); - } - + // `test-connection` is a one-shot smoke test; it does not + // stay resident and has no pipeline to instrument, so the + // metrics exporter stays off for that subcommand regardless + // of config. `run_listen` owns exporter lifecycle so the + // listener future can be supervised alongside block + // listeners — see #222. match cli.command { Command::Listen { borrowers } => run_listen(config, borrowers).await?, Command::TestConnection { chain } => { @@ -253,47 +245,111 @@ async fn run_listen(config: Config, borrowers: Vec
) -> Result<()> { "pipeline ready (scan-only, no broadcast)" ); - // ── Block-event drain ── + // ── Supervised long-running tasks ── + // + // One JoinSet owns every resident task: per-chain block + // listeners and the Prometheus exporter HTTP listener. When + // any task completes (panic, error, or Ok), we break out of + // the drain loop and trigger a controlled shutdown instead + // of silently running with a degraded surface — discarding + // the exporter's JoinHandle, as before #222, meant a panic + // in `hyper`/`tokio` inside the exporter would kill the + // task and leave the bot running blind to Grafana. + let mut supervised: tokio::task::JoinSet> = + tokio::task::JoinSet::new(); + + // Metrics exporter: install the recorder and push the + // returned future into the JoinSet. `install` returns + // `Ok(None)` on repeat calls in the same process (#223), in + // which case there is nothing to supervise here. + if config.metrics.enabled { + match charon_metrics::install(config.metrics.bind)? { + Some(exporter) => { + charon_metrics::set_build_info( + env!("CARGO_PKG_VERSION"), + option_env!("CHARON_GIT_SHA").unwrap_or("unknown"), + ); + supervised.spawn(async move { + exporter + .await + .map_err(|err| anyhow::anyhow!("metrics exporter: {err:?}")) + }); + } + None => { + info!("metrics exporter already running; skipping duplicate install"); + } + } + } else { + info!("metrics disabled via config"); + } + let (tx, mut rx) = mpsc::channel::(CHAIN_EVENT_CHANNEL); for (name, chain_cfg) in config.chain { let listener = BlockListener::new(name.clone(), chain_cfg, tx.clone()); - tokio::spawn(async move { - if let Err(err) = listener.run().await { - warn!(chain = %name, error = ?err, "listener terminated"); - } + let chain_name = name.clone(); + supervised.spawn(async move { + listener + .run() + .await + .map_err(|err| anyhow::anyhow!("listener {chain_name}: {err}")) }); } drop(tx); info!("listen: draining chain events (Ctrl-C to stop)"); - tokio::select! 
{ - _ = async { - while let Some(event) = rx.recv().await { - match event { - ChainEvent::NewBlock { chain, number, timestamp } => { - process_block( - chain, - number, - timestamp, - &borrowers, - adapter.clone(), - scanner.clone(), - router.clone(), - tx_builder.clone(), - simulator.clone(), - queue.clone(), - provider.clone(), - config.bot.min_profit_usd, - ) - .await; - } + let drain = async { + while let Some(event) = rx.recv().await { + match event { + ChainEvent::NewBlock { + chain, + number, + timestamp, + } => { + process_block( + chain, + number, + timestamp, + &borrowers, + adapter.clone(), + scanner.clone(), + router.clone(), + tx_builder.clone(), + simulator.clone(), + queue.clone(), + provider.clone(), + config.bot.min_profit_usd, + ) + .await; } } - } => info!("all listeners exited"), + } + }; + + tokio::select! { + _ = drain => info!("all listeners exited"), _ = tokio::signal::ctrl_c() => info!("ctrl-c received, shutting down"), + res = supervised.join_next() => match res { + Some(Ok(Ok(()))) => { + warn!("a supervised task exited cleanly; shutting down the rest"); + } + Some(Ok(Err(err))) => { + warn!(error = ?err, "a supervised task returned an error; shutting down"); + } + Some(Err(err)) => { + warn!(error = ?err, "a supervised task panicked; shutting down"); + } + None => { + // JoinSet empty — nothing to supervise, fall through. + } + }, } + // Abort any still-running supervised task so the binary does + // not hang on shutdown waiting for a listener's WS reconnect. + supervised.abort_all(); + while supervised.join_next().await.is_some() {} + Ok(()) } diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index e8e93f0..6ddd841 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -37,7 +37,19 @@ pub struct Config { } /// Prometheus exporter configuration. 
+/// +/// `#[serde(deny_unknown_fields)]` makes typos in +/// `config/default.toml` a hard load-time error — a stray +/// `metrics.bindd = …` used to be silently ignored, leaving the +/// exporter on its default loopback bind while the operator +/// assumed the override took effect. `#[non_exhaustive]` reserves +/// room to add fields (e.g. TLS, scrape-path override) without a +/// breaking semver bump; external callers must construct via +/// [`MetricsConfig::default`] and mutate the fields they care +/// about rather than using a struct literal. #[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +#[non_exhaustive] pub struct MetricsConfig { /// Start the exporter at bot startup. Set to `false` to run charon /// with zero metrics overhead (e.g. one-shot debug runs). diff --git a/crates/charon-metrics/src/lib.rs b/crates/charon-metrics/src/lib.rs index ea83f0a..d26caa1 100644 --- a/crates/charon-metrics/src/lib.rs +++ b/crates/charon-metrics/src/lib.rs @@ -18,14 +18,24 @@ //! ``` use std::net::SocketAddr; +use std::sync::OnceLock; use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; use metrics_exporter_prometheus::{ - BuildError as PromBuildError, Matcher, PrometheusBuilder, + BuildError as PromBuildError, ExporterFuture, Matcher, PrometheusBuilder, }; use thiserror::Error; use tracing::info; +/// Tracks whether the global Prometheus recorder has already been +/// installed in this process. `metrics_exporter_prometheus` calls +/// `metrics::set_global_recorder` under the hood, and that call +/// panics on a second successful install. Gating [`init`] behind +/// this `OnceLock` turns a second invocation into a silent no-op +/// so repeated calls from tests (or a future restart path) do not +/// tear the process down. +static INIT: OnceLock<()> = OnceLock::new(); + /// Errors returned from [`init`]. 
Exposed as a `#[non_exhaustive]` /// enum so `charon-cli` can distinguish bind failures (port collision /// is retryable) from recorder-install failures (caller must abort — @@ -44,7 +54,7 @@ pub enum MetricsError { source: PromBuildError, }, /// Installing the global Prometheus recorder failed. Typically a - /// port collision on `bind` or a recorder double-install. The + /// port collision on `bind` or an exporter-build error. The /// underlying `BuildError` preserves the original diagnosis. #[error("failed to install Prometheus exporter on {bind}: {source}")] InstallFailed { @@ -52,6 +62,16 @@ pub enum MetricsError { #[source] source: PromBuildError, }, + /// The `metrics` global recorder was already installed by some + /// other crate in the same process. Distinct from + /// [`MetricsError::InstallFailed`] because the fix is + /// different: exporter-build errors retry; a foreign recorder + /// has to be removed from the dep graph entirely. The + /// idempotency gate on [`install`] short-circuits our own + /// second install, so reaching this variant means a third + /// party got there first. + #[error("failed to set global recorder for {bind}: {reason}")] + RecorderInstall { bind: SocketAddr, reason: String }, } /// Convenience alias so helpers and call sites share one return shape. @@ -61,8 +81,7 @@ pub type Result = std::result::Result; // BSC produces a block every ~3s; resolution is packed around that // threshold so p50/p95 quantiles stay meaningful instead of piling // into `+Inf` with the exporter's default HTTP-latency buckets. -const BLOCK_DURATION_SECONDS_BUCKETS: &[f64] = - &[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0]; +const BLOCK_DURATION_SECONDS_BUCKETS: &[f64] = &[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0]; // Bucket boundaries for `charon_executor_profit_usd_cents`. 
// Realistic Venus liquidation profit spans ~$0.05 dust to ~$10k @@ -126,12 +145,62 @@ pub mod drop_stage { /// Install the global Prometheus recorder and start the HTTP listener. /// -/// Safe to call at most once per process; subsequent calls return an -/// error because the global recorder can only be set once. The exporter -/// task runs for the lifetime of the tokio runtime — no handle is -/// returned because it never needs to be stopped in-process. +/// Idempotent: the first successful call installs the recorder and +/// spawns the `/metrics` listener, subsequent calls log and return +/// `Ok(())` without touching the global recorder. This guards against +/// double-install panics in `metrics::set_global_recorder`, which +/// would otherwise take the bot down on an accidental retry. The +/// exporter task runs for the lifetime of the tokio runtime — no +/// handle is returned because it never needs to be stopped in-process. pub async fn init(bind: SocketAddr) -> Result<()> { - PrometheusBuilder::new() + // Fire-and-forget variant: install the recorder and spawn the + // listener future onto the current tokio runtime. The returned + // JoinHandle is intentionally discarded here — this path is + // meant for tests and for code paths that do not have a + // JoinSet supervisor. Production call sites should prefer + // [`install`] so the exporter task can be supervised together + // with the bot's other long-running tasks (see #222). + match install(bind)? { + Some(fut) => { + tokio::spawn(async move { + if let Err(err) = fut.await { + tracing::error!(error = ?err, "metrics exporter task terminated"); + } + }); + } + None => { + // Recorder already installed; nothing to drive. + } + } + Ok(()) +} + +/// Install the global Prometheus recorder and return the +/// [`ExporterFuture`] that drives the `/metrics` HTTP listener. 
+///
+/// The returned future must be polled for the exporter to accept
+/// scrapes — production code pushes it into the same `JoinSet`
+/// that supervises block listeners so a panic in the exporter
+/// triggers the same controlled-shutdown path (#222). Tests that
+/// do not care about supervision should call [`init`] instead.
+///
+/// Returns `Ok(None)` on the second and later calls in the same
+/// process, because the global recorder can only be installed
+/// once — a second `install()` would panic inside
+/// `metrics::set_global_recorder`, see #223. Callers that got
+/// `None` must skip supervising a listener future; the prior
+/// install still owns the HTTP socket.
+pub fn install(bind: SocketAddr) -> Result<Option<ExporterFuture>> {
+    // Idempotency gate — short-circuit before we touch the
+    // PrometheusBuilder. `INIT` is checked again after the
+    // successful build to close the narrow race where two
+    // concurrent callers both observe `None` here.
+    if INIT.get().is_some() {
+        info!(bind = %bind, "metrics exporter already initialized; skipping re-install");
+        return Ok(None);
+    }
+
+    let (recorder, exporter) = PrometheusBuilder::new()
         .with_http_listener(bind)
         .set_buckets_for_metric(
             Matcher::Full(names::PIPELINE_BLOCK_DURATION_SECONDS.to_string()),
@@ -149,13 +218,31 @@ pub async fn init(bind: SocketAddr) -> Result<()> {
             metric: names::EXECUTOR_PROFIT_USD_CENTS,
             source,
         })?
-        .install()
+        .build()
         .map_err(|source| MetricsError::InstallFailed { bind, source })?;
 
+    // Close the race: if another caller beat us to INIT, drop
+    // the recorder and exporter we just built and report no-op.
+    // `set_global_recorder` below would otherwise panic on
+    // double-install.
+    if INIT.set(()).is_err() {
+        info!(bind = %bind, "metrics exporter lost init race; discarding fresh build");
+        return Ok(None);
+    }
+
+    // `set_global_recorder` fails only if a recorder is already
+    // installed in the process. 
We check `INIT` above, so the + // only way to reach a real failure here is a third-party + // crate having already installed a `metrics` recorder. + metrics::set_global_recorder(recorder).map_err(|err| MetricsError::RecorderInstall { + bind, + reason: err.to_string(), + })?; + describe_all(); info!(bind = %bind, path = "/metrics", "metrics exporter listening"); - Ok(()) + Ok(Some(exporter)) } /// Emit Prometheus `# HELP` + `# TYPE` descriptors for every metric @@ -281,38 +368,6 @@ mod tests { use tokio::net::TcpStream; use tokio::time::{Duration, sleep}; - /// Smoke-test: `init` must bind the HTTP listener so a subsequent - /// TCP connect to `/metrics` succeeds. A failed listener bind is - /// the single most common regression when swapping exporter - /// versions; this catches it without asserting on the text body. - #[tokio::test] - async fn init_binds_prometheus_http_listener() { - // Port 0 asks the OS for an ephemeral port, avoiding collisions - // with any concurrent test run. We then need to know which port - // was picked so we can connect back — bind a probe socket first - // just to reserve a port number, drop it, hand the number to - // the exporter. Races are technically possible but vanishingly - // rare in practice on `127.0.0.1`. - let probe = std::net::TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) - .expect("probe bind"); - let port = probe - .local_addr() - .expect("probe socket must expose its bound local_addr") - .port(); - drop(probe); - - let bind = SocketAddr::from((Ipv4Addr::LOCALHOST, port)); - init(bind).await.expect("init should succeed"); - - // Small yield so the listener's spawn has a chance to bind - // before the connect probe fires. 
- sleep(Duration::from_millis(50)).await; - - TcpStream::connect(bind) - .await - .expect("listener should accept TCP connections"); - } - /// `MetricsError::BucketConfig` must render the offending metric /// name in its `Display` string and expose the upstream /// `PromBuildError` through `Error::source()` so operator tooling @@ -323,7 +378,10 @@ mod tests { #[test] fn bucket_config_error_display_and_source_chain() { let err = PrometheusBuilder::new() - .set_buckets_for_metric(Matcher::Full(names::EXECUTOR_PROFIT_USD_CENTS.to_string()), &[]) + .set_buckets_for_metric( + Matcher::Full(names::EXECUTOR_PROFIT_USD_CENTS.to_string()), + &[], + ) .map_err(|source| MetricsError::BucketConfig { metric: names::EXECUTOR_PROFIT_USD_CENTS, source, @@ -341,6 +399,60 @@ mod tests { ); } + /// Covers two invariants at once because `INIT` is + /// process-wide and unit tests in the same binary share it: + /// + /// 1. The first call binds the `/metrics` HTTP listener so a + /// subsequent TCP connect succeeds — regression gate + /// against broken listener wiring on exporter bumps. + /// 2. Second and Nth calls return `Ok(())` without touching + /// the global recorder — `metrics-exporter-prometheus` + /// otherwise panics inside `set_global_recorder`, which + /// would take the bot down on what ought to be a harmless + /// retry (regression gate for #223). + /// + /// Folded into one test so unit-test ordering cannot leave + /// `INIT` in a pre-set state and silently skip the bind + /// assertion in a sibling test. + #[tokio::test] + async fn init_binds_listener_and_is_idempotent() { + // Port 0 asks the OS for an ephemeral port, avoiding + // collisions with any concurrent test run. Bind a probe + // socket, record the number, drop it, hand the port to + // the exporter. Races are technically possible but + // vanishingly rare on 127.0.0.1 and do not compromise + // correctness — the connect probe below would simply + // fail loudly. 
+ let probe = std::net::TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) + .expect("probe bind"); + let port = probe + .local_addr() + .expect("probe socket must expose its bound local_addr") + .port(); + drop(probe); + + let bind = SocketAddr::from((Ipv4Addr::LOCALHOST, port)); + init(bind).await.expect("first init should succeed"); + + // Yield so the listener's spawn binds before we probe. + sleep(Duration::from_millis(50)).await; + + TcpStream::connect(bind) + .await + .expect("listener should accept TCP connections"); + + // Re-invoke with a deliberately unusable bind. If the + // idempotency gate were missing, PrometheusBuilder would + // attempt a fresh install and panic inside + // `set_global_recorder`. We assert `Ok(())` and that the + // listener never moves off `bind`. + let bogus = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + init(bogus) + .await + .expect("second init must be a silent no-op"); + init(bogus).await.expect("third init must also be a no-op"); + } + /// Typed helpers must not panic when called — this exercises every /// label combination that call sites use so metric-name typos /// surface at `cargo test` time, not in prod. From 7f6dc2f7e68e9dc4d28a4630f3cf00135def82b1 Mon Sep 17 00:00:00 2001 From: obchain Date: Thu, 23 Apr 2026 18:22:43 +0530 Subject: [PATCH 10/11] test(metrics): end-to-end scrape of /metrics in its own integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drive the exporter through a real TCP GET and assert the Prometheus text body carries # HELP, # TYPE, the expected metric names from charon_metrics::names, and a round-tripped label. The test lives in tests/ so it runs in a fresh process — the lib-side unit tests share the global recorder state with each other, which would otherwise mask a broken install path. 
Closes #224 --- crates/charon-metrics/tests/scrape.rs | 126 ++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 crates/charon-metrics/tests/scrape.rs diff --git a/crates/charon-metrics/tests/scrape.rs b/crates/charon-metrics/tests/scrape.rs new file mode 100644 index 0000000..037467f --- /dev/null +++ b/crates/charon-metrics/tests/scrape.rs @@ -0,0 +1,126 @@ +//! End-to-end scrape test for the Prometheus exporter. +//! +//! Lives in `tests/` rather than `src/lib.rs`'s `#[cfg(test)]` module +//! so it runs as its own integration-test binary with a fresh process +//! — the exporter installs a global recorder, and a second install in +//! the same process is a silent no-op (`charon-metrics` #223), so the +//! integration-test binary must not share process state with the unit +//! tests. +//! +//! Regression gate for #224: it was previously possible to ship a +//! broken exporter (metric name typo, missing `describe_*` call, +//! listener not bound) without any test catching it — this test +//! scrapes `/metrics` with a raw HTTP client and asserts the +//! Prometheus text-format response contains the expected helpers. + +use std::io::{Read, Write}; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpStream}; +use std::time::Duration; + +use charon_metrics::{init, names, record_block_scanned}; + +/// Fixed loopback port picked to avoid collision with the default +/// exporter port (`9091`) and with common dev services. If the port +/// is genuinely in use on a contributor's box, the test fails loudly +/// with a bind error rather than silently passing — acceptable +/// tradeoff for not plumbing the bound addr out of the exporter lib. +const TEST_PORT: u16 = 19_091; + +/// Scrape the exporter after a counter has been incremented and +/// verify the Prometheus text-format body contains both `# HELP` +/// metadata and the metric line. 
+/// +/// We deliberately avoid pulling `reqwest` just to fetch one URL — +/// the text format is plain HTTP/1.1 so a raw-TCP request keeps the +/// dev-dep surface small and sidesteps TLS/async runtime questions. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn scrape_returns_valid_prometheus_text() { + let bind = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TEST_PORT)); + + init(bind).await.expect("exporter init must succeed"); + + // Small yield so the listener's spawn has a chance to bind + // before we connect. Without this, a fast test machine can race + // the exporter's `tokio::spawn` and see `connection refused`. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Touch a counter and a gauge so the snapshot is non-empty. The + // counter name must round-trip through the exporter's dedup + + // label-sorting, so asserting on its presence validates the + // whole pipeline — constants, descriptor registration, encoder. + record_block_scanned("bnb"); + charon_metrics::set_position_bucket("bnb", charon_metrics::bucket::HEALTHY, 1); + + // Give the recorder a beat to flush the new sample into the + // renderer's internal state. + tokio::time::sleep(Duration::from_millis(50)).await; + + let body = tokio::task::spawn_blocking(move || scrape(bind)) + .await + .expect("scrape task must not panic") + .expect("scrape must succeed"); + + // `# HELP` lines are emitted by `describe_*` calls — their + // presence proves `describe_all()` ran. + assert!( + body.contains("# HELP"), + "scrape body missing `# HELP` metadata; got:\n{body}" + ); + assert!( + body.contains("# TYPE"), + "scrape body missing `# TYPE` metadata; got:\n{body}" + ); + + // Metric-name constants must flow end-to-end. Assert on the raw + // strings from `names` so a typo in the constant or a call-site + // drift surfaces here rather than in a silent Grafana panel. 
+    assert!(
+        body.contains(names::SCANNER_BLOCKS_TOTAL),
+        "scrape body missing `{}`; got:\n{body}",
+        names::SCANNER_BLOCKS_TOTAL,
+    );
+    assert!(
+        body.contains(names::SCANNER_POSITIONS),
+        "scrape body missing `{}`; got:\n{body}",
+        names::SCANNER_POSITIONS,
+    );
+
+    // Label round-trip: the `chain="bnb"` label must show up on the
+    // counter line. Guards against a regression where label keys are
+    // dropped by the exporter's relabeling / matcher config.
+    assert!(
+        body.contains("chain=\"bnb\""),
+        "scrape body missing expected `chain=\"bnb\"` label; got:\n{body}"
+    );
+}
+
+/// Minimal HTTP/1.1 GET over raw TCP. Returns the response body
+/// (everything after the first `\r\n\r\n`). Anything non-200 or a
+/// malformed response is surfaced as an `Err`.
+fn scrape(bind: SocketAddr) -> std::io::Result<String> {
+    let mut stream = TcpStream::connect_timeout(&bind, Duration::from_secs(2))?;
+    stream.set_read_timeout(Some(Duration::from_secs(2)))?;
+    stream.set_write_timeout(Some(Duration::from_secs(2)))?;
+
+    let request = format!(
+        "GET /metrics HTTP/1.1\r\nHost: {bind}\r\nConnection: close\r\nUser-Agent: charon-test\r\n\r\n",
+    );
+    stream.write_all(request.as_bytes())?;
+
+    let mut raw = Vec::with_capacity(8 * 1024);
+    stream.read_to_end(&mut raw)?;
+
+    let text = String::from_utf8_lossy(&raw).into_owned();
+    let status_line = text.lines().next().unwrap_or("");
+    if !status_line.starts_with("HTTP/1.1 200") && !status_line.starts_with("HTTP/1.0 200") {
+        return Err(std::io::Error::other(format!(
+            "unexpected response status: {status_line}"
+        )));
+    }
+
+    let body_start = text
+        .find("\r\n\r\n")
+        .and_then(|i| i.checked_add(4))
+        .unwrap_or(0);
+    Ok(text[body_start..].to_owned())
+}
From 7258cc7c0a0952fe085fdedadb105a593f451802 Mon Sep 17 00:00:00 2001
From: obchain
Date: Thu, 23 Apr 2026 20:01:07 +0530
Subject: [PATCH 11/11] feat(metrics): mempool, gas, and RPC latency series

Adds three families of operator-facing Prometheus series and the
instrumentation 
call-sites that feed them: Mempool (closes #300): - charon_mempool_pending_oracle_updates (gauge, chain) - charon_mempool_drained_total (counter, chain) - charon_mempool_websocket_reconnects_total (counter, chain) Emitted from PendingCache::insert/drain and MempoolMonitor::run's reconnect branch. PendingCache/MempoolMonitor now carry the chain short-name so every sample labels consistently with the scanner. Gas (closes #301): - charon_gas_base_fee_wei (gauge, chain) - charon_gas_priority_fee_wei (gauge, chain) - charon_gas_max_fee_wei (gauge, chain) - charon_gas_ceiling_skips_total (counter, chain, reason) Emitted from GasOracle::fetch_params. max_fee_wei is intentionally left untouched on the ceiling-skip branch so the gauge's semantic stays "what the last *submitted* tx carried". RPC latency (closes #302): - charon_rpc_call_duration_seconds (histogram, method, endpoint_kind) - charon_rpc_errors_total (counter, method, error_kind) - charon_rpc_connection_reconnects_total (counter, endpoint_kind) Adds a time_rpc(method, endpoint_kind, fut) -> T ergonomic helper that wraps any future and records its wall-clock duration into the histogram. Adopted at the eth_call simulation gate, the eth_sendRawTransaction submit path, and the pending-tx lookup in the mempool monitor. Block listener and mempool monitor also bump the reconnect counter on every reconnect attempt. Histogram buckets 1ms..30s are justified in rustdoc. All new constants and helpers carry rustdoc; bucket-registration errors continue to flow through MetricsError::BucketConfig. Unit tests exercise every new helper label combination and assert time_rpc preserves the inner future's output on both branches; the scrape integration test now asserts each new metric name and a representative label pair round-trips through the exporter. 
--- Cargo.lock | 2 + crates/charon-executor/Cargo.toml | 1 + crates/charon-executor/src/gas.rs | 48 ++- crates/charon-executor/src/simulation.rs | 21 +- crates/charon-executor/src/submit.rs | 37 ++- crates/charon-metrics/src/lib.rs | 381 +++++++++++++++++++++++ crates/charon-metrics/tests/scrape.rs | 72 +++++ crates/charon-scanner/Cargo.toml | 1 + crates/charon-scanner/src/listener.rs | 9 + crates/charon-scanner/src/mempool.rs | 91 +++++- 10 files changed, 652 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0da6f1..2e6691b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1158,6 +1158,7 @@ dependencies = [ "alloy", "anyhow", "charon-core", + "charon-metrics", "tokio", "tracing", ] @@ -1207,6 +1208,7 @@ dependencies = [ "alloy", "anyhow", "charon-core", + "charon-metrics", "dashmap", "dotenvy", "futures-util", diff --git a/crates/charon-executor/Cargo.toml b/crates/charon-executor/Cargo.toml index 229ab57..afa384c 100644 --- a/crates/charon-executor/Cargo.toml +++ b/crates/charon-executor/Cargo.toml @@ -7,6 +7,7 @@ description = "Transaction builder, simulator, and broadcaster for Charon" [dependencies] charon-core = { workspace = true } +charon-metrics = { workspace = true } alloy = { workspace = true } anyhow = { workspace = true } tracing = { workspace = true } diff --git a/crates/charon-executor/src/gas.rs b/crates/charon-executor/src/gas.rs index 958b015..f3d620d 100644 --- a/crates/charon-executor/src/gas.rs +++ b/crates/charon-executor/src/gas.rs @@ -34,8 +34,15 @@ pub struct GasParams { } /// Per-chain gas oracle. Construct once per chain at startup. -#[derive(Debug, Clone, Copy)] +/// +/// Owns the chain's short name (matches the `[chain.]` key in +/// config) so every gas-metric emission carries a stable `chain` +/// label — same string the scanner uses, so dashboards can join the +/// executor's gas panels against scanner panels on a single label. 
+#[derive(Debug, Clone)]
 pub struct GasOracle {
+    /// Short chain name used as the `chain` metric label.
+    chain: String,
     /// Drop the tx if `max_fee_per_gas` exceeds this (gwei).
     max_gas_gwei: u64,
     /// EIP-1559 priority fee, gwei.
@@ -43,8 +50,13 @@ pub struct GasOracle {
 }
 
 impl GasOracle {
-    pub fn new(max_gas_gwei: u64, priority_fee_gwei: u64) -> Self {
+    /// Build an oracle for one chain.
+    ///
+    /// `chain` is the short name used as the `chain` metric label
+    /// across the scanner/executor stacks — typically `"bnb"` on BSC.
+    pub fn new(chain: impl Into<String>, max_gas_gwei: u64, priority_fee_gwei: u64) -> Self {
         Self {
+            chain: chain.into(),
             max_gas_gwei,
             priority_fee_gwei,
         }
@@ -53,6 +65,15 @@ impl GasOracle {
     /// Read the latest base fee, bump it 25 %, attach the priority fee.
     /// Returns `Ok(None)` when the resulting `max_fee_per_gas` exceeds
     /// the configured ceiling — caller should skip the opportunity.
+    ///
+    /// Emits three gauges on every successful read
+    /// ([`charon_gas_base_fee_wei`](charon_metrics::names::GAS_BASE_FEE_WEI),
+    /// [`charon_gas_priority_fee_wei`](charon_metrics::names::GAS_PRIORITY_FEE_WEI),
+    /// [`charon_gas_max_fee_wei`](charon_metrics::names::GAS_MAX_FEE_WEI))
+    /// and increments
+    /// [`charon_gas_ceiling_skips_total`](charon_metrics::names::GAS_CEILING_SKIPS_TOTAL)
+    /// on the ceiling-hit branch. Each sample is labelled with the
+    /// chain name this oracle was built for. Issue #301.
     pub async fn fetch_params<P>(&self, provider: &P) -> Result<Option<GasParams>>
     where
         P: Provider,
@@ -73,11 +94,23 @@ impl GasOracle {
             .context("gas oracle: header has no base_fee_per_gas (pre-EIP-1559 chain?)")?
             .into();
 
+        // Record the raw `baseFeePerGas` reading before any
+        // checked-arithmetic branch can bail — a chain that's mid
+        // base-fee spike is exactly when we want the gauge to move. 
+ charon_metrics::set_gas_base_fee_wei(&self.chain, base_fee); + let max_fee = base_fee .checked_mul(BASE_FEE_BUMP_PCT) .context("gas oracle: max-fee multiplication overflow")? / BPS_DIV; let max_fee_gwei = max_fee / ONE_GWEI; + let max_priority_fee_per_gas = u128::from(self.priority_fee_gwei) * ONE_GWEI; + + // Priority fee is configured (not RPC-derived) but still + // surfaced on the gauge so operators can confirm the bot + // picked up the expected per-chain value after a config + // reload. + charon_metrics::set_gas_priority_fee_wei(&self.chain, max_priority_fee_per_gas); if max_fee_gwei > u128::from(self.max_gas_gwei) { warn!( @@ -85,14 +118,23 @@ impl GasOracle { ceiling_gwei = self.max_gas_gwei, "gas exceeds configured ceiling — skipping tx" ); + charon_metrics::record_gas_ceiling_skip( + &self.chain, + charon_metrics::gas_skip_reason::CEILING, + ); + // Leave `GAS_MAX_FEE_WEI` untouched on the skip branch: + // the gauge's semantic is "max fee the last *submitted* + // tx carried", so setting it from a rejected proposal + // would make dashboards lie about the real worst-case + // submission cost. 
return Ok(None); } - let max_priority_fee_per_gas = u128::from(self.priority_fee_gwei) * ONE_GWEI; let params = GasParams { max_fee_per_gas: max_fee, max_priority_fee_per_gas, }; + charon_metrics::set_gas_max_fee_wei(&self.chain, params.max_fee_per_gas); debug!( base_fee_gwei = base_fee / ONE_GWEI, max_fee_gwei = params.max_fee_per_gas / ONE_GWEI, diff --git a/crates/charon-executor/src/simulation.rs b/crates/charon-executor/src/simulation.rs index f6431d5..d7a3b8a 100644 --- a/crates/charon-executor/src/simulation.rs +++ b/crates/charon-executor/src/simulation.rs @@ -14,6 +14,7 @@ use alloy::primitives::{Address, Bytes}; use alloy::providers::Provider; use alloy::rpc::types::TransactionRequest; use anyhow::Result; +use charon_metrics::{endpoint_kind, record_rpc_error, rpc_error, rpc_method, time_rpc}; use tracing::{debug, warn}; /// Stateless simulator — holds the sender + target contract address @@ -48,7 +49,19 @@ impl Simulator { .to(self.liquidator) .input(calldata.into()); - match provider.call(&req).await { + // `time_rpc` owns the histogram sample — the error branch + // below classifies the failure separately. The simulator + // talks to the same chain RPC the scanner uses (no private + // submission relay), so the endpoint kind is `public`. + // `provider.call(..)` in alloy 0.8 returns an `EthCall` + // builder (`IntoFuture`, not `Future`); wrap the `.await` + // in a plain async block so `time_rpc` sees a `Future`. + let outcome = time_rpc(rpc_method::ETH_CALL, endpoint_kind::PUBLIC, async { + provider.call(&req).await + }) + .await; + + match outcome { Ok(out) => { debug!( sender = %self.sender, @@ -60,6 +73,12 @@ impl Simulator { } Err(err) => { let msg = format!("{err:#}"); + // A reverted simulation is a deterministic RPC-level + // rejection (the node executed the call and returned + // a failure), which is the textbook `rejected` + // classification — not a transport-layer timeout or + // connection drop. 
+                record_rpc_error(rpc_method::ETH_CALL, rpc_error::REJECTED);
                 warn!(
                     sender = %self.sender,
                     target = %self.liquidator,
diff --git a/crates/charon-executor/src/submit.rs b/crates/charon-executor/src/submit.rs
index b27e6f4..d8e29fc 100644
--- a/crates/charon-executor/src/submit.rs
+++ b/crates/charon-executor/src/submit.rs
@@ -15,6 +15,7 @@
 use alloy::primitives::{Bytes, TxHash};
 use alloy::providers::{Provider, ProviderBuilder, RootProvider};
 use alloy::transports::BoxTransport;
 use anyhow::{Context, Result};
+use charon_metrics::{endpoint_kind, record_rpc_error, rpc_error, rpc_method, time_rpc};
 use tracing::{info, warn};
 
 /// Default submission timeout per attempt (6 s ≈ 8 BSC blocks).
@@ -61,10 +62,36 @@ impl Submitter {
     /// Submit raw signed transaction bytes. Retries once on timeout.
     /// Non-timeout RPC errors (revert, bad nonce, bad signature) fail
     /// immediately — no point retrying a deterministic rejection.
+    ///
+    /// Each attempt is instrumented via
+    /// [`charon_metrics::time_rpc`] under
+    /// [`rpc_method::ETH_SEND_RAW_TRANSACTION`] /
+    /// [`endpoint_kind::PRIVATE`]; failures are classified into
+    /// [`rpc_error::TIMEOUT`] vs [`rpc_error::REJECTED`] so dashboards
+    /// can separate flaky-relay symptoms from deterministic
+    /// rejections (bad nonce, revert, bad signature). Issue #302.
     pub async fn submit(&self, raw: Bytes) -> Result<TxHash> {
         for attempt in 1..=MAX_ATTEMPTS {
             let fut = self.provider.send_raw_transaction(&raw);
-            match tokio::time::timeout(self.timeout, fut).await {
+            // `time_rpc` is wrapped *inside* the outer
+            // `tokio::time::timeout`, so its histogram sample only
+            // lands when the provider future resolves within the
+            // submission ceiling (success + rejection branches). A
+            // hard timeout skips the sample — by construction it
+            // would be `~self.timeout` and carries no extra signal;
+            // the timeout branch is covered by
+            // `charon_rpc_errors_total{error_kind="timeout"}`
+            // instead. 
+ let timed = tokio::time::timeout( + self.timeout, + time_rpc( + rpc_method::ETH_SEND_RAW_TRANSACTION, + endpoint_kind::PRIVATE, + fut, + ), + ) + .await; + match timed { Ok(Ok(pending)) => { let hash = *pending.tx_hash(); info!( @@ -76,6 +103,10 @@ impl Submitter { return Ok(hash); } Ok(Err(err)) => { + record_rpc_error( + rpc_method::ETH_SEND_RAW_TRANSACTION, + rpc_error::REJECTED, + ); warn!( endpoint = %self.endpoint, attempt, @@ -85,6 +116,10 @@ impl Submitter { return Err(anyhow::anyhow!("submit RPC error: {err}")); } Err(_) => { + record_rpc_error( + rpc_method::ETH_SEND_RAW_TRANSACTION, + rpc_error::TIMEOUT, + ); warn!( endpoint = %self.endpoint, attempt, diff --git a/crates/charon-metrics/src/lib.rs b/crates/charon-metrics/src/lib.rs index d26caa1..b21ab8c 100644 --- a/crates/charon-metrics/src/lib.rs +++ b/crates/charon-metrics/src/lib.rs @@ -17,8 +17,10 @@ //! # } //! ``` +use std::future::Future; use std::net::SocketAddr; use std::sync::OnceLock; +use std::time::Instant; use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; use metrics_exporter_prometheus::{ @@ -98,6 +100,22 @@ const PROFIT_USD_CENTS_BUCKETS: &[f64] = &[ 1_000_000.0, ]; +// Bucket boundaries for `charon_rpc_call_duration_seconds`. +// Spans sub-millisecond LAN-local responses up to the 30 s +// provider-level timeout ceiling so call-site timeouts still +// surface as a finite bucket instead of `+Inf`. Lower bound of +// 1 ms sits just above the jitter floor of a tokio timer on a +// warm runtime — any sample finer than that is noise rather than +// a signal about upstream latency. Upper bound of 30 s matches +// the hard deadline used by submit/simulate call sites: a call +// that outruns 30 s has timed out regardless, so anything bigger +// would only widen `+Inf` overflow without adding resolution. +// Logarithmic spacing keeps p50/p95/p99 resolution meaningful +// across four decades. 
+const RPC_CALL_DURATION_SECONDS_BUCKETS: &[f64] = &[ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, +]; + /// Single-source-of-truth metric names. Kept as constants so call /// sites, dashboard JSON, and alert rules refer to the same strings. pub mod names { @@ -115,6 +133,31 @@ pub mod names { pub const EXECUTOR_PROFIT_USD_CENTS: &str = "charon_executor_profit_usd_cents"; pub const EXECUTOR_QUEUE_DEPTH: &str = "charon_executor_queue_depth"; + // Mempool monitor (issue #300). Counts pending oracle updates + // observed in the mempool, oracle updates drained at block + // boundaries, and upstream websocket reconnect attempts — the + // third doubles as a health signal for flaky pubsub upstreams. + pub const MEMPOOL_PENDING_ORACLE_UPDATES: &str = "charon_mempool_pending_oracle_updates"; + pub const MEMPOOL_DRAINED_TOTAL: &str = "charon_mempool_drained_total"; + pub const MEMPOOL_WS_RECONNECTS_TOTAL: &str = "charon_mempool_websocket_reconnects_total"; + + // Gas oracle (issue #301). Latest EIP-1559 base fee, priority + // fee used on the last submission attempt, and resulting + // maxFeePerGas — plus a counter for opportunities dropped + // because `max_fee_per_gas` exceeded the configured ceiling. + pub const GAS_BASE_FEE_WEI: &str = "charon_gas_base_fee_wei"; + pub const GAS_PRIORITY_FEE_WEI: &str = "charon_gas_priority_fee_wei"; + pub const GAS_MAX_FEE_WEI: &str = "charon_gas_max_fee_wei"; + pub const GAS_CEILING_SKIPS_TOTAL: &str = "charon_gas_ceiling_skips_total"; + + // RPC instrumentation (issue #302). Histogram of call durations + // by method + endpoint kind, error counters partitioned by + // failure mode, and a reconnect counter so upstream transport + // churn is observable without log grepping. 
+ pub const RPC_CALL_DURATION_SECONDS: &str = "charon_rpc_call_duration_seconds"; + pub const RPC_ERRORS_TOTAL: &str = "charon_rpc_errors_total"; + pub const RPC_RECONNECTS_TOTAL: &str = "charon_rpc_connection_reconnects_total"; + // Build / runtime pub const BUILD_INFO: &str = "charon_build_info"; } @@ -143,6 +186,61 @@ pub mod drop_stage { pub const BUILD: &str = "build"; } +/// Endpoint-kind label used on every RPC metric (issue #302). +/// "public" covers node providers on the open internet (Alchemy, +/// Infura, a self-hosted archive). "private" covers order-flow +/// aware submission paths (bloxroute, blocknative, sequencer +/// endpoints on L2s) whose latency profile and failure modes +/// differ sharply from public reads. Splitting on this label is +/// how operators see "my private relay is degrading" without +/// having to cross-reference logs. +pub mod endpoint_kind { + pub const PUBLIC: &str = "public"; + pub const PRIVATE: &str = "private"; +} + +/// RPC method label on `charon_rpc_call_duration_seconds` and +/// `charon_rpc_errors_total`. Only the methods the bot actually +/// calls are listed — new methods should be added here as call +/// sites adopt the [`time_rpc`] wrapper. Freeform methods can be +/// passed as `&str` too; the constants exist so dashboards and +/// alert rules can reference the same string a call site uses. 
+pub mod rpc_method { + pub const ETH_CALL: &str = "eth_call"; + pub const ETH_GET_BLOCK_BY_NUMBER: &str = "eth_getBlockByNumber"; + pub const ETH_SEND_RAW_TRANSACTION: &str = "eth_sendRawTransaction"; + pub const ETH_GET_LOGS: &str = "eth_getLogs"; + pub const ETH_GET_BLOCK_NUMBER: &str = "eth_blockNumber"; + pub const ETH_ESTIMATE_GAS: &str = "eth_estimateGas"; + pub const ETH_GET_TRANSACTION_BY_HASH: &str = "eth_getTransactionByHash"; + pub const ETH_SUBSCRIBE_NEW_HEADS: &str = "eth_subscribe_newHeads"; + pub const ETH_SUBSCRIBE_PENDING_TX: &str = "eth_subscribe_newPendingTransactions"; +} + +/// Failure mode label on `charon_rpc_errors_total`. Kept as a +/// closed three-way enum so alert rules can pivot on `error_kind` +/// without fuzzy matching on log strings. Call sites classify +/// their own errors into one of these before recording — the +/// mapping from `alloy` / `anyhow` errors to a kind is a +/// per-call-site judgement (a `tokio::time::timeout` firing is +/// [`TIMEOUT`], an RPC-level rejection is [`REJECTED`], an +/// `io::Error` or dropped subscription stream is +/// [`CONNECTION_LOST`]). +pub mod rpc_error { + pub const TIMEOUT: &str = "timeout"; + pub const REJECTED: &str = "rejected"; + pub const CONNECTION_LOST: &str = "connection_lost"; +} + +/// Reason label on `charon_gas_ceiling_skips_total`. `CEILING` +/// is the only reason the current gas oracle emits — the label +/// exists so future reasons (e.g. `BASE_FEE_SPIKE`, +/// `PRIORITY_FEE_MISSING`) can be added without reshaping the +/// metric. +pub mod gas_skip_reason { + pub const CEILING: &str = "ceiling"; +} + /// Install the global Prometheus recorder and start the HTTP listener. /// /// Idempotent: the first successful call installs the recorder and @@ -218,6 +316,14 @@ pub fn install(bind: SocketAddr) -> Result> { metric: names::EXECUTOR_PROFIT_USD_CENTS, source, })? 
+ .set_buckets_for_metric( + Matcher::Full(names::RPC_CALL_DURATION_SECONDS.to_string()), + RPC_CALL_DURATION_SECONDS_BUCKETS, + ) + .map_err(|source| MetricsError::BucketConfig { + metric: names::RPC_CALL_DURATION_SECONDS, + source, + })? .build() .map_err(|source| MetricsError::InstallFailed { bind, source })?; @@ -283,6 +389,47 @@ fn describe_all() { names::EXECUTOR_QUEUE_DEPTH, "Current depth of the profit-ordered opportunity queue." ); + describe_gauge!( + names::MEMPOOL_PENDING_ORACLE_UPDATES, + "Pending Venus oracle updates currently observed in the mempool (pre-signed liquidations armed)." + ); + describe_counter!( + names::MEMPOOL_DRAINED_TOTAL, + "Pre-signed liquidations drained from the mempool cache at block confirmation, partitioned by chain." + ); + describe_counter!( + names::MEMPOOL_WS_RECONNECTS_TOTAL, + "Reconnect attempts against the pending-transactions websocket subscription (flaky-upstream signal)." + ); + describe_gauge!( + names::GAS_BASE_FEE_WEI, + "Latest EIP-1559 `baseFeePerGas` observed for the chain, in wei." + ); + describe_gauge!( + names::GAS_PRIORITY_FEE_WEI, + "Priority fee (tip) used on the most recent gas-params resolution, in wei." + ); + describe_gauge!( + names::GAS_MAX_FEE_WEI, + "`maxFeePerGas` used on the most recent submission attempt, in wei." + ); + describe_counter!( + names::GAS_CEILING_SKIPS_TOTAL, + "Opportunities skipped because the resolved `max_fee_per_gas` exceeded the configured ceiling." + ); + describe_histogram!( + names::RPC_CALL_DURATION_SECONDS, + metrics::Unit::Seconds, + "Wall-clock duration of one RPC call, partitioned by method and endpoint kind (public vs private)." + ); + describe_counter!( + names::RPC_ERRORS_TOTAL, + "RPC call failures partitioned by method and error kind (timeout, rejected, connection_lost)." + ); + describe_counter!( + names::RPC_RECONNECTS_TOTAL, + "Reconnect attempts against an RPC transport (websocket or HTTP keep-alive), partitioned by endpoint kind." 
+ ); describe_gauge!( names::BUILD_INFO, "Build metadata as labels; value is always 1." @@ -361,6 +508,187 @@ pub fn set_build_info(version: &str, git_sha: &str) { .set(1.0); } +// ─── Mempool helpers (issue #300) ───────────────────────────────────── + +/// Set the gauge of pending oracle updates the mempool monitor is +/// currently tracking. Called on insert/drain so the dashboard value +/// tracks the live cache size rather than a stale counter. Gauge (not +/// counter) because the quantity is "how many right now", which must +/// fall back to zero between blocks. +pub fn set_mempool_pending_oracle_updates(chain: &str, count: u64) { + gauge!( + names::MEMPOOL_PENDING_ORACLE_UPDATES, + "chain" => chain.to_owned() + ) + .set(count as f64); +} + +/// Record `drained` pre-signed liquidations drained from the mempool +/// cache at a block boundary. Zero-valued drains are a legitimate +/// signal (nothing to do this block) so the call site records +/// unconditionally — Prometheus handles zero-delta increments. +pub fn record_mempool_drained(chain: &str, drained: u64) { + counter!( + names::MEMPOOL_DRAINED_TOTAL, + "chain" => chain.to_owned() + ) + .increment(drained); +} + +/// Record one websocket reconnect attempt against the pending-tx +/// subscription. Emitted every time the monitor loop falls through +/// to its backoff branch — a high rate here is the operator's cue +/// that the upstream pubsub endpoint is flaky. +pub fn record_mempool_ws_reconnect(chain: &str) { + counter!( + names::MEMPOOL_WS_RECONNECTS_TOTAL, + "chain" => chain.to_owned() + ) + .increment(1); +} + +// ─── Gas oracle helpers (issue #301) ────────────────────────────────── + +/// Set the latest observed `baseFeePerGas` for the chain, in wei. +/// Values are passed as `u128` to survive the 1559 full range without +/// pre-truncation; the gauge is cast to `f64` at emission time, same +/// as every other Prometheus gauge — sub-wei precision is never +/// actionable for ops. 
+pub fn set_gas_base_fee_wei(chain: &str, wei: u128) { + gauge!( + names::GAS_BASE_FEE_WEI, + "chain" => chain.to_owned() + ) + .set(wei as f64); +} + +/// Set the priority fee used on the last gas-params resolution, +/// in wei. +pub fn set_gas_priority_fee_wei(chain: &str, wei: u128) { + gauge!( + names::GAS_PRIORITY_FEE_WEI, + "chain" => chain.to_owned() + ) + .set(wei as f64); +} + +/// Set the `maxFeePerGas` used on the last submission attempt, +/// in wei. +pub fn set_gas_max_fee_wei(chain: &str, wei: u128) { + gauge!( + names::GAS_MAX_FEE_WEI, + "chain" => chain.to_owned() + ) + .set(wei as f64); +} + +/// Record one opportunity skipped by the gas oracle because the +/// resolved `max_fee_per_gas` exceeded the configured ceiling (or +/// any future skip reason added to [`gas_skip_reason`]). +pub fn record_gas_ceiling_skip(chain: &str, reason: &str) { + counter!( + names::GAS_CEILING_SKIPS_TOTAL, + "chain" => chain.to_owned(), + "reason" => reason.to_owned() + ) + .increment(1); +} + +// ─── RPC instrumentation (issue #302) ───────────────────────────────── + +/// Observe one completed RPC call's wall-clock duration. +/// +/// Most call sites should wrap their provider invocation in +/// [`time_rpc`] instead of calling this directly — the wrapper +/// handles `Instant::now()` and the label plumbing. Expose this +/// helper for call sites that already have a `Duration` in hand +/// (e.g. batched calls that track a single elapsed span across +/// multiple internal retries). +pub fn record_rpc_call(method: &str, endpoint_kind: &str, seconds: f64) { + histogram!( + names::RPC_CALL_DURATION_SECONDS, + "method" => method.to_owned(), + "endpoint_kind" => endpoint_kind.to_owned() + ) + .record(seconds); +} + +/// Record one RPC call failure. `error_kind` must be one of the +/// constants in [`rpc_error`]; freeform strings are accepted but +/// break dashboard pivots, so callers should funnel their errors +/// through a classifier. 
+pub fn record_rpc_error(method: &str, error_kind: &str) { + counter!( + names::RPC_ERRORS_TOTAL, + "method" => method.to_owned(), + "error_kind" => error_kind.to_owned() + ) + .increment(1); +} + +/// Record one reconnect attempt against an RPC transport. Emitted +/// by pubsub listeners (block listener, mempool monitor) every +/// time their outer reconnect loop fires — cumulative rate is the +/// operator's "upstream is unstable" signal. +pub fn record_rpc_reconnect(endpoint_kind: &str) { + counter!( + names::RPC_RECONNECTS_TOTAL, + "endpoint_kind" => endpoint_kind.to_owned() + ) + .increment(1); +} + +/// Wrap one RPC call, record its wall-clock duration into +/// [`names::RPC_CALL_DURATION_SECONDS`], and return the call's +/// own result untouched. +/// +/// This is the single preferred instrumentation pattern for adding +/// RPC latency / error visibility to a call site — prefer it over +/// sprinkling `record_rpc_call` / `record_rpc_error` directly. It +/// keeps the happy path a one-liner and guarantees the duration +/// sample always lands in the histogram, even on the error branch +/// (where latency-to-error is still useful context). Errors are +/// *not* auto-classified — the call site knows best whether an +/// `alloy` error is a [`rpc_error::TIMEOUT`] or a +/// [`rpc_error::REJECTED`]; pair this helper with one +/// [`record_rpc_error`] call on the error branch. 
+///
+/// Example:
+/// ```no_run
+/// # async fn demo() -> anyhow::Result<()> {
+/// use charon_metrics::{endpoint_kind, rpc_error, rpc_method, record_rpc_error, time_rpc};
+/// # async fn eth_call() -> Result<(), anyhow::Error> { Ok(()) }
+/// let result = time_rpc(
+///     rpc_method::ETH_CALL,
+///     endpoint_kind::PUBLIC,
+///     eth_call(),
+/// )
+/// .await;
+/// if let Err(err) = &result {
+///     record_rpc_error(rpc_method::ETH_CALL, rpc_error::REJECTED);
+///     eprintln!("{err}");
+/// }
+/// # Ok(())
+/// # }
+/// ```
+///
+/// New call sites adopting RPC instrumentation should follow
+/// this pattern. An alloy middleware/layer would be cleaner in
+/// principle, but the `alloy` 0.8 `Provider` trait does not
+/// expose a stable middleware hook at the method-name layer — a
+/// per-call-site wrapper lets us carry the method label verbatim
+/// without leaking through an intermediate `RequestPacket` whose
+/// method string is internal-only.
+pub async fn time_rpc<T, F>(method: &str, endpoint_kind: &str, fut: F) -> T
+where
+    F: Future<Output = T>,
+{
+    let start = Instant::now();
+    let out = fut.await;
+    record_rpc_call(method, endpoint_kind, start.elapsed().as_secs_f64());
+    out
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -474,5 +802,58 @@ mod tests {
         record_opportunity_dropped("bnb", drop_stage::BUILD);
         set_queue_depth(3);
         set_build_info("0.1.0", "deadbeef");
+
+        // Mempool (#300)
+        set_mempool_pending_oracle_updates("bnb", 4);
+        record_mempool_drained("bnb", 3);
+        record_mempool_drained("bnb", 0);
+        record_mempool_ws_reconnect("bnb");
+
+        // Gas (#301)
+        set_gas_base_fee_wei("bnb", 3_000_000_000);
+        set_gas_priority_fee_wei("bnb", 1_000_000_000);
+        set_gas_max_fee_wei("bnb", 5_000_000_000);
+        record_gas_ceiling_skip("bnb", gas_skip_reason::CEILING);
+
+        // RPC (#302)
+        record_rpc_call(rpc_method::ETH_CALL, endpoint_kind::PUBLIC, 0.012);
+        record_rpc_call(
+            rpc_method::ETH_SEND_RAW_TRANSACTION,
+            endpoint_kind::PRIVATE,
+            0.045,
+        );
+        record_rpc_error(rpc_method::ETH_CALL,
rpc_error::TIMEOUT); + record_rpc_error(rpc_method::ETH_GET_LOGS, rpc_error::REJECTED); + record_rpc_error( + rpc_method::ETH_GET_BLOCK_BY_NUMBER, + rpc_error::CONNECTION_LOST, + ); + record_rpc_reconnect(endpoint_kind::PUBLIC); + record_rpc_reconnect(endpoint_kind::PRIVATE); + } + + /// `time_rpc` must record a non-zero elapsed sample into the + /// histogram and return the wrapped future's output unchanged. + /// Covers the ergonomic-wrapper contract: callers rely on it + /// being a drop-in around `await`. + #[tokio::test] + async fn time_rpc_returns_inner_output_and_records_duration() { + let out = time_rpc(rpc_method::ETH_CALL, endpoint_kind::PUBLIC, async { + tokio::task::yield_now().await; + 42u32 + }) + .await; + assert_eq!(out, 42); + + // Error case: the wrapper must not swallow errors from the + // inner future — callers chain `record_rpc_error` on the Err + // branch and would lose visibility otherwise. + let err: std::result::Result<(), &'static str> = time_rpc( + rpc_method::ETH_SEND_RAW_TRANSACTION, + endpoint_kind::PRIVATE, + async { Err("rpc down") }, + ) + .await; + assert_eq!(err, Err("rpc down")); } } diff --git a/crates/charon-metrics/tests/scrape.rs b/crates/charon-metrics/tests/scrape.rs index 037467f..3dfacc5 100644 --- a/crates/charon-metrics/tests/scrape.rs +++ b/crates/charon-metrics/tests/scrape.rs @@ -51,6 +51,31 @@ async fn scrape_returns_valid_prometheus_text() { record_block_scanned("bnb"); charon_metrics::set_position_bucket("bnb", charon_metrics::bucket::HEALTHY, 1); + // Mempool (#300), gas (#301), and RPC (#302) series — record + // at least one sample per series so the scrape surfaces the + // metric name and its expected label set. Each of these is the + // end-to-end regression gate for a typo in the name constant + // or a missing `describe_*` registration. 
+ charon_metrics::set_mempool_pending_oracle_updates("bnb", 2); + charon_metrics::record_mempool_drained("bnb", 5); + charon_metrics::record_mempool_ws_reconnect("bnb"); + + charon_metrics::set_gas_base_fee_wei("bnb", 3_000_000_000); + charon_metrics::set_gas_priority_fee_wei("bnb", 1_000_000_000); + charon_metrics::set_gas_max_fee_wei("bnb", 5_000_000_000); + charon_metrics::record_gas_ceiling_skip("bnb", charon_metrics::gas_skip_reason::CEILING); + + charon_metrics::record_rpc_call( + charon_metrics::rpc_method::ETH_CALL, + charon_metrics::endpoint_kind::PUBLIC, + 0.012, + ); + charon_metrics::record_rpc_error( + charon_metrics::rpc_method::ETH_CALL, + charon_metrics::rpc_error::TIMEOUT, + ); + charon_metrics::record_rpc_reconnect(charon_metrics::endpoint_kind::PRIVATE); + // Give the recorder a beat to flush the new sample into the // renderer's internal state. tokio::time::sleep(Duration::from_millis(50)).await; @@ -92,6 +117,53 @@ async fn scrape_returns_valid_prometheus_text() { body.contains("chain=\"bnb\""), "scrape body missing expected `chain=\"bnb\"` label; got:\n{body}" ); + + // Each new series (#300 / #301 / #302) must appear by name so + // typos in the constants or a missing describe_* call surface + // here. Label values exercised above are checked for at least + // one representative sample per metric family. + for name in [ + names::MEMPOOL_PENDING_ORACLE_UPDATES, + names::MEMPOOL_DRAINED_TOTAL, + names::MEMPOOL_WS_RECONNECTS_TOTAL, + names::GAS_BASE_FEE_WEI, + names::GAS_PRIORITY_FEE_WEI, + names::GAS_MAX_FEE_WEI, + names::GAS_CEILING_SKIPS_TOTAL, + names::RPC_CALL_DURATION_SECONDS, + names::RPC_ERRORS_TOTAL, + names::RPC_RECONNECTS_TOTAL, + ] { + assert!( + body.contains(name), + "scrape body missing `{name}`; got:\n{body}" + ); + } + + // Label-set regression gates: one representative pair per + // metric family. A drift in label naming (e.g. 
renaming + // `endpoint_kind` to `kind`) otherwise silently breaks every + // dashboard that pivots on the label. + assert!( + body.contains("reason=\"ceiling\""), + "gas ceiling-skip counter missing `reason` label; got:\n{body}" + ); + assert!( + body.contains("method=\"eth_call\""), + "rpc duration histogram missing `method` label; got:\n{body}" + ); + assert!( + body.contains("endpoint_kind=\"public\""), + "rpc duration histogram missing `endpoint_kind` label; got:\n{body}" + ); + assert!( + body.contains("error_kind=\"timeout\""), + "rpc error counter missing `error_kind` label; got:\n{body}" + ); + assert!( + body.contains("endpoint_kind=\"private\""), + "rpc reconnect counter missing `endpoint_kind=\"private\"`; got:\n{body}" + ); } /// Minimal HTTP/1.1 GET over raw TCP. Returns the response body diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index 234f833..153119f 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -7,6 +7,7 @@ description = "Chain listener and health-factor scanner for Charon" [dependencies] charon-core = { workspace = true } +charon-metrics = { workspace = true } alloy = { workspace = true } anyhow = { workspace = true } tokio = { workspace = true } diff --git a/crates/charon-scanner/src/listener.rs b/crates/charon-scanner/src/listener.rs index ebf8ddc..cbce75e 100644 --- a/crates/charon-scanner/src/listener.rs +++ b/crates/charon-scanner/src/listener.rs @@ -56,6 +56,12 @@ impl BlockListener { /// Run the listener forever. Reconnects with exponential backoff on /// any connection or subscription error. Returns `Ok(())` only if the /// receiving side of the channel is dropped. + /// + /// Increments + /// [`charon_rpc_connection_reconnects_total`](charon_metrics::names::RPC_RECONNECTS_TOTAL) + /// under `endpoint_kind="public"` on every reconnect attempt + /// (issue #302) — the `newHeads` stream rides the chain's + /// public pubsub endpoint. 
     pub async fn run(self) -> Result<()> {
         let mut backoff = Duration::from_secs(1);
         loop {
@@ -66,6 +72,9 @@
                     return Ok(());
                 }
                 Err(err) => {
+                    charon_metrics::record_rpc_reconnect(
+                        charon_metrics::endpoint_kind::PUBLIC,
+                    );
                     warn!(
                         chain = %self.name,
                         error = ?err,
diff --git a/crates/charon-scanner/src/mempool.rs b/crates/charon-scanner/src/mempool.rs
index 24c0a19..27f6390 100644
--- a/crates/charon-scanner/src/mempool.rs
+++ b/crates/charon-scanner/src/mempool.rs
@@ -115,8 +115,15 @@ pub struct PreSignedLiquidation {
 /// Pure decode + pre-sign storage. Separated from the RPC layer so
 /// tests can exercise the selector logic and TTL semantics without
 /// opening a socket.
+///
+/// Holds the chain's short name so the mempool metrics (#300) can
+/// be labelled with the same `chain` key the scanner and gas oracle
+/// use. Default constructors fall back to `"unknown"` rather than
+/// refusing construction — unit tests for the decoder don't care,
+/// and production call sites always pass an explicit name.
 #[derive(Debug)]
 pub struct PendingCache {
+    chain: String,
     oracle: Address,
     selectors: HashSet<FixedBytes<4>>,
     pending: DashMap<Address, PreSignedLiquidation>,
@@ -125,11 +132,13 @@ impl PendingCache {
     pub fn new(
+        chain: impl Into<String>,
         oracle: Address,
         selectors: HashSet<FixedBytes<4>>,
         max_pending_age: Duration,
     ) -> Self {
         Self {
+            chain: chain.into(),
             oracle,
             selectors,
             pending: DashMap::new(),
@@ -138,13 +147,18 @@ impl PendingCache {
     }
 
     pub fn with_defaults(oracle: Address) -> Self {
-        Self::new(oracle, default_selectors(), DEFAULT_MAX_PENDING_AGE)
+        Self::new("unknown", oracle, default_selectors(), DEFAULT_MAX_PENDING_AGE)
     }
 
     pub fn oracle(&self) -> Address {
         self.oracle
     }
 
+    /// Chain label the cache emits with on every mempool metric.
+ pub fn chain(&self) -> &str { + &self.chain + } + pub fn is_tracked_selector(&self, selector: FixedBytes<4>) -> bool { self.selectors.contains(&selector) } @@ -153,6 +167,11 @@ impl PendingCache { /// entry for the same borrower — the most recent oracle update /// wins, which is what we want when two updates land in the same /// block window (the later one is what the chain will see). + /// + /// Refreshes the + /// [`charon_mempool_pending_oracle_updates`](charon_metrics::names::MEMPOOL_PENDING_ORACLE_UPDATES) + /// gauge so dashboards track the live cache size even between + /// block-boundary drains. Issue #300. pub fn insert(&self, tx: PreSignedLiquidation) { debug!( borrower = %tx.borrower, @@ -160,6 +179,10 @@ impl PendingCache { "pre-signed liquidation armed" ); self.pending.insert(tx.borrower, tx); + charon_metrics::set_mempool_pending_oracle_updates( + &self.chain, + self.pending.len() as u64, + ); } pub fn pending_len(&self) -> usize { @@ -174,6 +197,12 @@ impl PendingCache { /// older than `max_pending_age_secs`. Called by the block-listener /// task on each `NewBlock` event — pre-signs only live for one /// block window, so anything that didn't fire is stale. + /// + /// Increments + /// [`charon_mempool_drained_total`](charon_metrics::names::MEMPOOL_DRAINED_TOTAL) + /// by the number of fresh entries drained and zeroes the + /// [`charon_mempool_pending_oracle_updates`](charon_metrics::names::MEMPOOL_PENDING_ORACLE_UPDATES) + /// gauge — the cache is empty after a drain. Issue #300. 
 pub fn drain(&self) -> Vec<PreSignedLiquidation> {
         let now = unix_now();
         let max_age = self.max_pending_age_secs;
@@ -193,6 +222,11 @@
             }
         }
         debug!(drained = out.len(), "mempool cache drained");
+        charon_metrics::record_mempool_drained(&self.chain, out.len() as u64);
+        charon_metrics::set_mempool_pending_oracle_updates(
+            &self.chain,
+            self.pending.len() as u64,
+        );
         out
     }
@@ -228,8 +262,10 @@ pub struct MempoolMonitor {
 }
 
 impl MempoolMonitor {
-    /// Full-control constructor.
+    /// Full-control constructor. `chain` is the short chain name the
+    /// mempool metrics (#300) emit as a `chain` label.
     pub fn new(
+        chain: impl Into<String>,
         provider: Arc>,
         oracle: Address,
         selectors: HashSet<FixedBytes<4>>,
@@ -237,14 +273,24 @@
     ) -> Self {
         Self {
             provider,
-            cache: Arc::new(PendingCache::new(oracle, selectors, max_pending_age)),
+            cache: Arc::new(PendingCache::new(
+                chain,
+                oracle,
+                selectors,
+                max_pending_age,
+            )),
         }
     }
 
     /// Convenience: build with [`default_selectors`] and
     /// [`DEFAULT_MAX_PENDING_AGE`].
-    pub fn with_defaults(provider: Arc>, oracle: Address) -> Self {
+    pub fn with_defaults(
+        chain: impl Into<String>,
+        provider: Arc>,
+        oracle: Address,
+    ) -> Self {
         Self::new(
+            chain,
             provider,
             oracle,
             default_selectors(),
@@ -256,6 +302,11 @@
     }
 
     pub fn oracle(&self) -> Address {
         self.cache.oracle()
     }
 
+    /// Chain label this monitor emits with.
+    pub fn chain(&self) -> &str {
+        self.cache.chain()
+    }
+
     /// Share the inner cache. Lets the block-listener task call
     /// [`PendingCache::drain`] without going through the monitor,
     /// which keeps its `run` loop free to stay on the pending-tx
@@ -283,6 +334,16 @@
     /// Emits one [`OracleUpdate`] per matched tx on `tx`. Returns
     /// `Ok(())` only when the receiver is dropped — the loop is
     /// expected to run for the lifetime of the process.
+    ///
+    /// Every reconnect attempt bumps
+    /// [`charon_mempool_websocket_reconnects_total`](charon_metrics::names::MEMPOOL_WS_RECONNECTS_TOTAL)
+    /// (issue #300) and
+    /// [`charon_rpc_connection_reconnects_total`](charon_metrics::names::RPC_RECONNECTS_TOTAL)
+    /// under `endpoint_kind="public"` (issue #302). Both land on the
+    /// same operator dashboard — the per-chain mempool counter is
+    /// the actionable signal; the generic RPC reconnect counter
+    /// aggregates across every pubsub call site for a quick
+    /// "is upstream degrading?" read.
     pub async fn run(&self, tx: mpsc::Sender<OracleUpdate>) -> Result<()> {
         let mut backoff = Duration::from_secs(1);
         loop {
@@ -292,6 +353,10 @@
                     return Ok(());
                 }
                 Err(err) => {
+                    charon_metrics::record_mempool_ws_reconnect(self.chain());
+                    charon_metrics::record_rpc_reconnect(
+                        charon_metrics::endpoint_kind::PUBLIC,
+                    );
                     warn!(
                         oracle = %self.oracle(),
                         error = ?err,
@@ -318,14 +383,28 @@
         while let Some(hash) = stream.next().await {
             // Lookup failures are common for txs that dropped out of
             // the pool between the hash push and our get — log at
-            // debug, keep going.
-            let full = match self.provider.get_transaction_by_hash(hash).await {
+            // debug, keep going. `time_rpc` instruments the per-call
+            // latency (issue #302); the `connection_lost` / transport
+            // failure branch is classified by `record_rpc_error`
+            // below so timeouts vs dropped txs stay separable in
+            // the dashboard.
+ let full = match charon_metrics::time_rpc( + charon_metrics::rpc_method::ETH_GET_TRANSACTION_BY_HASH, + charon_metrics::endpoint_kind::PUBLIC, + async { self.provider.get_transaction_by_hash(hash).await }, + ) + .await + { Ok(Some(t)) => t, Ok(None) => { debug!(%hash, "pending tx vanished before fetch"); continue; } Err(err) => { + charon_metrics::record_rpc_error( + charon_metrics::rpc_method::ETH_GET_TRANSACTION_BY_HASH, + charon_metrics::rpc_error::CONNECTION_LOST, + ); debug!(%hash, ?err, "get_transaction_by_hash failed"); continue; }