diff --git a/.gitignore b/.gitignore index 868b5ea..d3c612e 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,10 @@ node_modules/ .claude/ docs/context.md NOTES.md + +# Markdown policy: only README.md is ever pushed; everything else is local-only. +*.md +!README.md + +# Reference docs copied from backups — local-only, not pushed. +docs/*.html diff --git a/Cargo.lock b/Cargo.lock index 9045ac0..5b4edbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1371,6 +1371,7 @@ dependencies = [ "charon-core", "charon-executor", "charon-flashloan", + "charon-metrics", "charon-protocols", "charon-scanner", "clap", @@ -1404,6 +1405,7 @@ dependencies = [ "alloy", "anyhow", "charon-core", + "charon-metrics", "dotenvy", "httpmock", "reqwest", @@ -1428,6 +1430,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "charon-metrics" +version = "0.1.0" +dependencies = [ + "anyhow", + "metrics", + "metrics-exporter-prometheus", + "thiserror 2.0.18", + "tokio", + "tracing", +] + [[package]] name = "charon-protocols" version = "0.1.0" @@ -1450,6 +1464,7 @@ dependencies = [ "anyhow", "async-trait", "charon-core", + "charon-metrics", "dashmap", "dotenvy", "futures-util", @@ -1588,6 +1603,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2192,6 +2216,25 @@ dependencies = [ "subtle", ] +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.4.0", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.13.2" @@ 
-2377,9 +2420,11 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http 1.4.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -2871,6 +2916,42 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "metrics-exporter-prometheus" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd7399781913e5393588a8d8c6a2867bf85fb38eaf2502fdce465aad2dc6f034" +dependencies = [ + "base64 0.22.1", + "http-body-util", + "hyper 1.9.0", + "hyper-util", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror 1.0.69", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8496cc523d1f94c1385dd8f0f0c2c480b2b8aeccb5b7e4485ad6365523ae376" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.5", + "metrics", + "quanta", + "rand 0.9.4", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mio" version = "1.2.0" @@ -3340,6 +3421,21 @@ dependencies = [ "unarray", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -3497,6 +3593,24 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.5", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "recvmsg" version = "1.0.0" @@ -4038,6 +4152,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" +[[package]] +name = "sketches-ddsketch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" + [[package]] name = "slab" version = "0.4.12" diff --git a/Cargo.toml b/Cargo.toml index 55440fc..a563d69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/charon-core", "crates/charon-executor", "crates/charon-flashloan", + "crates/charon-metrics", "crates/charon-protocols", "crates/charon-scanner", "crates/charon-cli", @@ -61,6 +62,12 @@ rand = "0.8" # Prometheus-style metrics recorder facade metrics = "0.24" +# Prometheus HTTP exporter — feat/22 #222 wires this into charon-metrics +# and the supervised CLI task. `default-features = false` drops the +# built-in tokio runtime hint so the workspace stays on our chosen +# tokio version; `http-listener` is the only feature we need. 
+metrics-exporter-prometheus = { version = "0.16", default-features = false, features = ["http-listener"] } + # CLI clap = { version = "4", features = ["derive", "env"] } @@ -71,6 +78,7 @@ dotenvy = "0.15" charon-core = { path = "crates/charon-core" } charon-executor = { path = "crates/charon-executor" } charon-flashloan = { path = "crates/charon-flashloan" } +charon-metrics = { path = "crates/charon-metrics" } charon-protocols = { path = "crates/charon-protocols" } charon-scanner = { path = "crates/charon-scanner" } diff --git a/config/default.toml b/config/default.toml index 4e41e9f..84c731e 100644 --- a/config/default.toml +++ b/config/default.toml @@ -66,6 +66,27 @@ data_provider = "0x41393e5e337606dc3821075Af65AeE84D7688CBD" # Populated once CharonLiquidator.sol is deployed on BSC mainnet. Do not # add a zero-address placeholder — config validation rejects it. +# ── Prometheus metrics exporter ─────────────────────────────────────────── +# Exposes `/metrics` in Prometheus text format for Grafana / Grafana +# Cloud scraping. Port 9091 avoids the Prometheus-server default (9090). +# +# Binds to loopback by default so the endpoint is unreachable from the +# public internet on a bare VPS (Hetzner CX22 firewall is permissive on +# inbound). Remote scraping must go through a reverse proxy (nginx, +# caddy, Traefik) that terminates bearer-token or mTLS auth — or a +# compose-internal network, a WireGuard tunnel, etc. +# +# If you must bind to a non-loopback address (e.g. 0.0.0.0), set +# `auth_token` to a non-empty shared secret; config validation rejects +# a public bind without a token configured. The token is compared +# byte-for-byte against the `Authorization: Bearer ` header by +# the reverse proxy — the exporter itself does not yet terminate auth. 
+[metrics] +enabled = true +bind = "127.0.0.1:9091" +# Uncomment and export CHARON_METRICS_AUTH_TOKEN only if bind is non-loopback: +# auth_token = "${CHARON_METRICS_AUTH_TOKEN}" + # ── Chainlink price feeds (per chain, per asset symbol) ─────────────────── # Only feeds listed here are polled by the PriceCache. Add more as new # Venus markets become relevant. Feed addresses from docs.chain.link. diff --git a/crates/charon-cli/Cargo.toml b/crates/charon-cli/Cargo.toml index 7d57482..8b5fca9 100644 --- a/crates/charon-cli/Cargo.toml +++ b/crates/charon-cli/Cargo.toml @@ -13,6 +13,7 @@ path = "src/main.rs" charon-core = { workspace = true } charon-executor = { workspace = true } charon-flashloan = { workspace = true } +charon-metrics = { workspace = true } charon-protocols = { workspace = true } charon-scanner = { workspace = true } alloy = { workspace = true } diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 5019320..6bb3a45 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -61,6 +61,7 @@ use charon_core::{ }; use charon_executor::{Simulator, TxBuilder}; use charon_flashloan::{AaveFlashLoan, FlashLoanRouter}; +use charon_metrics::{bucket, drop_stage, sim_result}; use charon_protocols::VenusAdapter; use charon_scanner::{ BlockListener, ChainEvent, ChainProvider, DEFAULT_MAX_AGE, HealthScanner, MempoolMonitor, @@ -432,6 +433,48 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { let (tx, mut rx) = mpsc::channel::(CHAIN_EVENT_CHANNEL); let mut listeners: tokio::task::JoinSet<(String, Result<()>)> = tokio::task::JoinSet::new(); + // ── Prometheus exporter (#222) ──────────────────────────────────── + // Install the global metrics recorder and push the HTTP-listener + // future onto the same JoinSet that supervises block/mempool tasks + // so a panic in `hyper`/`tokio` inside the exporter triggers the + // same controlled-shutdown path (SIGINT / SIGTERM / supervise). + // `install` returns `Ok(None)` on a repeat call in the same + // process (#223) — nothing to supervise on re-invocation. + if config.metrics.enabled { + match charon_metrics::install(config.metrics.bind) { + Ok(Some(exporter)) => { + charon_metrics::set_build_info( + env!("CARGO_PKG_VERSION"), + option_env!("CHARON_GIT_SHA").unwrap_or("unknown"), + ); + listeners.spawn(async move { + let res: Result<()> = exporter + .await + .map_err(|err| anyhow::anyhow!("metrics exporter: {err:?}")); + ("metrics".to_string(), res) + }); + info!(bind = %config.metrics.bind, "metrics exporter listening on /metrics"); + } + Ok(None) => { + info!( + bind = %config.metrics.bind, + "metrics exporter already installed — skipping duplicate install" + ); + } + Err(err) => { + // Refuse to start with a broken exporter: dashboards + // would silently go dark and an operator would not + // catch it until the next alert fire. + return Err(anyhow::anyhow!( + "failed to install metrics exporter on {}: {err}", + config.metrics.bind + )); + } + } + } else { + info!("metrics exporter disabled via [metrics].enabled = false"); + } + // ── Mempool monitor (#46 / #299) ────────────────────────────────── // Spawn the pending-tx monitor alongside `BlockListener` on the // Venus pipeline's shared provider. Enabled only when the operator @@ -453,7 +496,8 @@ async fn run_listen(config: &Config, borrowers: Vec
) -> Result<()> { (Some(pipeline), Ok(hex)) if !hex.is_empty() => { match Address::from_str(hex.trim()) { Ok(oracle) => { - let monitor = Arc::new(MempoolMonitor::with_defaults( + let monitor = Arc::new(MempoolMonitor::with_defaults_for_chain( + pipeline.chain_name.clone(), pipeline.provider.clone(), oracle, )); @@ -657,6 +701,12 @@ async fn run_block_pipeline( ) { let start = std::time::Instant::now(); + // One-shot block counter per pipeline pass (#222). Counted even + // when `scan_set` is empty — the block still ticked through the + // drain loop and dashboards otherwise silently lose visibility on + // "bot is alive, nothing to scan" intervals. + charon_metrics::record_block_scanned(pipeline.chain_name.as_str()); + // Which borrowers to scan this tick. First real block uses the // operator's seed list; thereafter the scheduler picks buckets // whose cadence fires. @@ -706,6 +756,22 @@ async fn run_block_pipeline( metrics::histogram!("charon_scanner_scan_duration_seconds") .record(start.elapsed().as_secs_f64()); + // Per-bucket position gauges — feat/22 `set_position_bucket`. + // Emitted every tick so dashboards track live bucket sizes + // rather than a stale counter that decays with TTL. + let chain = pipeline.chain_name.as_str(); + charon_metrics::set_position_bucket(chain, bucket::HEALTHY, counts.healthy as u64); + charon_metrics::set_position_bucket( + chain, + bucket::NEAR_LIQ, + counts.near_liquidation as u64, + ); + charon_metrics::set_position_bucket( + chain, + bucket::LIQUIDATABLE, + counts.liquidatable as u64, + ); + // Walk each liquidatable position through the e2e pipeline. Only // opportunities that pass the simulation gate reach the queue. let liquidatable = pipeline.scanner.liquidatable(); @@ -723,6 +789,12 @@ async fn run_block_pipeline( } let queue_len = pipeline.queue.len().await; + // Queue depth + full per-block pipeline duration. 
The histogram + // uses the domain-scaled buckets registered in + // `charon_metrics::install` so BSC's ~3s heartbeat lands inside + // meaningful quantiles rather than collapsing into `+Inf`. + charon_metrics::set_queue_depth(queue_len as u64); + charon_metrics::observe_block_duration(chain, start.elapsed().as_secs_f64()); info!( chain = %pipeline.chain_name, block, @@ -874,9 +946,12 @@ async fn process_opportunity( } }; + let chain = pipeline.chain_name.as_str(); + // b. Router: pick cheapest flash-loan source for (debt token, // repay amount). let Some(quote) = pipeline.router.route(pos.debt_token, repay).await else { + charon_metrics::record_opportunity_dropped(chain, drop_stage::ROUTER); return Ok(false); }; @@ -899,6 +974,7 @@ async fn process_opportunity( ) { Ok(i) => i, Err(err) => { + charon_metrics::record_opportunity_dropped(chain, drop_stage::PROFIT); debug!(borrower = %pos.borrower, error = ?err, "profit inputs rejected"); return Ok(false); } @@ -906,6 +982,7 @@ async fn process_opportunity( let net = match calculate_profit(&inputs, pipeline.min_profit_usd_1e6) { Ok(n) => n, Err(err) => { + charon_metrics::record_opportunity_dropped(chain, drop_stage::PROFIT); debug!(borrower = %pos.borrower, error = ?err, "profit gate dropped"); return Ok(false); } @@ -952,6 +1029,10 @@ async fn process_opportunity( // broadcast stage assumes every queued entry is known-good // against the latest state. let Some((builder, sim)) = ensure_executor(pipeline.as_ref(), signer_key).await else { + // Scan-only mode: no signer, no simulation, no enqueue. Count + // as a simulation-stage drop so dashboards surface scan-only + // runs without hiding them under router/profit gates. 
+ charon_metrics::record_opportunity_dropped(chain, drop_stage::SIMULATION); debug!( borrower = %pos.borrower, "simulation skipped — no signer configured; opportunity not enqueued" @@ -965,15 +1046,34 @@ async fn process_opportunity( provider: pipeline.provider.as_ref(), }; if let Err(err) = gate.encode_and_simulate(&opp, ¶ms).await { + charon_metrics::record_simulation(chain, sim_result::REVERT); + charon_metrics::record_opportunity_dropped(chain, drop_stage::SIMULATION); debug!(borrower = %pos.borrower, error = ?err, "simulation gate dropped"); return Ok(false); } + charon_metrics::record_simulation(chain, sim_result::OK); - // f. Push to the profit-ordered queue. + // f. Push to the profit-ordered queue. `simulated = true` because + // the production path only reaches here after a successful + // `eth_call` gate — dry-run entries never get here. + let profit_cents = wei_to_usd_cents(opp.net_profit_wei); pipeline.queue.push(opp, block).await; + charon_metrics::record_opportunity_queued(chain, profit_cents, true); Ok(true) } +/// Convert a `net_profit_wei` (debt-token smallest units, assumed +/// 18-decimal stablecoin for v0.1) to USD cents for the profit +/// histogram. Saturates on overflow so a corrupted upper-bound sample +/// never crashes the recorder. Placeholder until the per-token +/// decimals + price bridge lands (#148). +fn wei_to_usd_cents(wei: U256) -> u64 { + // 1 stable unit (18 decimals) ≈ $1 → 100 cents. Divide by 1e16. + let scale = U256::from(10u64).pow(U256::from(16u64)); + let cents = wei / scale; + u64::try_from(cents).unwrap_or(u64::MAX) +} + /// Build the preview [`LiquidationOpportunity`] used as input to /// [`ProfitInputs::from_opportunity`]. 
The final opportunity stored /// in the queue comes out of [`LiquidationOpportunity::with_profit`] diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index a5e4ff5..8421b9c 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -13,6 +13,7 @@ use secrecy::SecretString; use serde::Deserialize; use std::collections::HashMap; use std::fmt; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; /// Structured error returned by `Config::load` / `Config::from_str`. @@ -73,6 +74,94 @@ pub struct Config { /// configured, scanner falls back to protocol oracle. #[serde(default)] pub chainlink: HashMap>, + /// Prometheus exporter configuration. Missing `[metrics]` block ⇒ + /// defaults from [`MetricsConfig::default`] (enabled, port 9091). + #[serde(default)] + pub metrics: MetricsConfig, +} + +/// Prometheus exporter configuration. +/// +/// `#[serde(deny_unknown_fields)]` makes typos in +/// `config/default.toml` a hard load-time error — a stray +/// `metrics.bindd = …` used to be silently ignored, leaving the +/// exporter on its default loopback bind while the operator +/// assumed the override took effect. `#[non_exhaustive]` reserves +/// room to add fields (e.g. TLS, scrape-path override) without a +/// breaking semver bump; external callers must construct via +/// [`MetricsConfig::default`] and mutate the fields they care +/// about rather than using a struct literal. +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +#[non_exhaustive] +pub struct MetricsConfig { + /// Start the exporter at bot startup. Set to `false` to run charon + /// with zero metrics overhead (e.g. one-shot debug runs). + #[serde(default = "default_metrics_enabled")] + pub enabled: bool, + /// Bind address for the `/metrics` HTTP listener. Defaults to + /// `127.0.0.1:9091` so the endpoint is unreachable from the public + /// internet on a bare VPS. 
Non-loopback binds must pair with a + /// `auth_token` (enforced by [`MetricsConfig::validate`]). + #[serde(default = "default_metrics_bind")] + pub bind: SocketAddr, + /// Shared secret expected on `Authorization: Bearer ` when + /// the exporter is reached over a non-loopback bind. The exporter + /// itself does not yet terminate auth — the token is enforced by + /// the reverse proxy (nginx, caddy, etc.) that sits in front of + /// `bind`. Holding the value in config makes the proxy + bot share + /// one source of truth. + #[serde(default)] + pub auth_token: Option, +} + +impl MetricsConfig { + /// Refuse to start when the exporter is bound to a non-loopback + /// address without an accompanying `auth_token`. Stops silent + /// deployment of an unauthenticated `/metrics` endpoint to any + /// caller with network reach — see tracking issues #213 #214. + pub fn validate(&self) -> Result<()> { + if !self.enabled { + return Ok(()); + } + let ip = self.bind.ip(); + if !ip.is_loopback() + && self + .auth_token + .as_deref() + .map(str::is_empty) + .unwrap_or(true) + { + return Err(ConfigError::Validation(format!( + "metrics.bind {} is non-loopback but metrics.auth_token is empty — \ + either bind to 127.0.0.1 (scrape via reverse proxy / VPN) or set \ + CHARON_METRICS_AUTH_TOKEN and front the exporter with a proxy that \ + enforces Authorization: Bearer on /metrics", + self.bind + ))); + } + Ok(()) + } +} + +impl Default for MetricsConfig { + fn default() -> Self { + Self { + enabled: default_metrics_enabled(), + bind: default_metrics_bind(), + auth_token: None, + } + } +} + +fn default_metrics_enabled() -> bool { + true +} + +fn default_metrics_bind() -> SocketAddr { + "127.0.0.1:9091" + .parse() + .expect("valid default metrics bind") } impl fmt::Debug for Config { @@ -414,6 +503,11 @@ impl Config { ))); } } + // Metrics exporter: loopback binds are always safe; non-loopback + // binds must carry a non-empty `auth_token` so an operator + // never leaks an 
unauthenticated `/metrics` endpoint onto the + // public internet (see issues #213 / #214 on feat/22). + self.metrics.validate()?; Ok(()) } } @@ -512,6 +606,83 @@ fn is_valid_env_name(s: &str) -> bool { chars.all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_') } +#[cfg(test)] +mod metrics_tests { + //! Tests for `MetricsConfig::validate`, graft from feat/22. These + //! exercise the non-loopback/auth-token gate (#213 / #214) that + //! blocks deploy-time leaks of an unauthenticated `/metrics` + //! endpoint, and the `enabled = false` bypass. + + use super::*; + + /// Loopback bind is safe on its own — no auth token required, + /// because the endpoint is unreachable off-box. + #[test] + fn validate_allows_loopback_without_token() { + let cfg = MetricsConfig { + enabled: true, + bind: "127.0.0.1:9091".parse().expect("loopback parse"), + auth_token: None, + }; + cfg.validate().expect("loopback + no token must pass"); + + let cfg_v6 = MetricsConfig { + enabled: true, + bind: "[::1]:9091".parse().expect("loopback v6 parse"), + auth_token: None, + }; + cfg_v6.validate().expect("IPv6 loopback must pass"); + } + + /// Non-loopback bind with a non-empty token is the documented + /// "front with a reverse proxy" escape hatch. + #[test] + fn validate_allows_non_loopback_with_token() { + let cfg = MetricsConfig { + enabled: true, + bind: "0.0.0.0:9091".parse().expect("non-loopback parse"), + auth_token: Some("not-a-real-token".into()), + }; + cfg.validate() + .expect("non-loopback + token must pass (proxy enforces)"); + } + + /// Non-loopback with missing or empty token must fail — covers + /// both `auth_token = None` (unset in TOML) and `auth_token = + /// Some("")` (the nasty case where `CHARON_METRICS_AUTH_TOKEN=` + /// is exported empty and env substitution silently yields a + /// blank string). This is the regression gate for #213/#214. 
+ #[test] + fn validate_rejects_non_loopback_without_token() { + let none_cfg = MetricsConfig { + enabled: true, + bind: "0.0.0.0:9091".parse().expect("non-loopback parse"), + auth_token: None, + }; + assert!(none_cfg.validate().is_err(), "None token must fail"); + + let empty_cfg = MetricsConfig { + enabled: true, + bind: "0.0.0.0:9091".parse().expect("non-loopback parse"), + auth_token: Some(String::new()), + }; + assert!(empty_cfg.validate().is_err(), "empty token must fail"); + } + + /// `enabled = false` bypasses validation: a disabled exporter + /// never opens a socket, so bind/token combinations are moot. + #[test] + fn validate_skipped_when_disabled() { + let cfg = MetricsConfig { + enabled: false, + bind: "0.0.0.0:9091".parse().expect("non-loopback parse"), + auth_token: None, + }; + cfg.validate() + .expect("disabled exporter must skip bind checks"); + } +} + #[cfg(test)] mod private_rpc_tests { //! Tests for the private-RPC gate and secret redaction on @@ -554,6 +725,7 @@ mod private_rpc_tests { flashloan: HashMap::new(), liquidator: HashMap::new(), chainlink: HashMap::new(), + metrics: MetricsConfig::default(), } } diff --git a/crates/charon-core/src/lib.rs b/crates/charon-core/src/lib.rs index 39332e4..308eaff 100644 --- a/crates/charon-core/src/lib.rs +++ b/crates/charon-core/src/lib.rs @@ -7,7 +7,7 @@ pub mod queue; pub mod traits; pub mod types; -pub use config::{Config, ConfigError}; +pub use config::{Config, ConfigError, MetricsConfig}; pub use flashloan::{FlashLoanError, FlashLoanProvider, FlashLoanQuote}; pub use profit::{NetProfit, Price, ProfitError, ProfitInputs, calculate_profit}; pub use queue::{DEFAULT_TTL_BLOCKS, OpportunityQueue, QueueEntry}; diff --git a/crates/charon-executor/Cargo.toml b/crates/charon-executor/Cargo.toml index 37cebe0..c88bed7 100644 --- a/crates/charon-executor/Cargo.toml +++ b/crates/charon-executor/Cargo.toml @@ -7,6 +7,7 @@ description = "Transaction builder, simulator, and broadcaster for Charon" [dependencies] 
charon-core = { workspace = true } +charon-metrics = { workspace = true } alloy = { workspace = true } anyhow = { workspace = true } thiserror = { workspace = true } diff --git a/crates/charon-executor/src/gas.rs b/crates/charon-executor/src/gas.rs index 3557605..2e3cd0d 100644 --- a/crates/charon-executor/src/gas.rs +++ b/crates/charon-executor/src/gas.rs @@ -118,6 +118,12 @@ struct CacheEntry { /// [`ChainConfig::priority_fee_gwei`]: charon_core::config::ChainConfig::priority_fee_gwei #[derive(Debug)] pub struct GasOracle { + /// Short chain name used as the `chain` label on every gas + /// metric emitted from this oracle (#301). Empty string means + /// "unlabelled" — unit tests use `new` which defaults to empty; + /// production call sites use `new_for_chain` so dashboards can + /// pivot on chain. + chain: String, /// Drop the tx if `max_fee_per_gas` exceeds this (wei). max_gas_wei: U256, /// EIP-1559 priority fee (gwei). Scaled to wei on every call. @@ -135,10 +141,27 @@ impl GasOracle { /// [`ChainConfig::priority_fee_gwei`]) and gets converted to wei /// inside `fetch_params`. /// + /// Gas-oracle metrics emitted by this constructor carry an empty + /// `chain` label — use [`GasOracle::new_for_chain`] in + /// production so every sample is tagged with the chain key. + /// /// [`BotConfig::max_gas_wei`]: charon_core::config::BotConfig::max_gas_wei /// [`ChainConfig::priority_fee_gwei`]: charon_core::config::ChainConfig::priority_fee_gwei pub fn new(max_gas_wei: U256, priority_fee_gwei: u64) -> Self { + Self::new_for_chain(String::new(), max_gas_wei, priority_fee_gwei) + } + + /// Construct a new oracle labelled with a `chain` short name + /// (e.g. `"bnb"`). Every metric emitted from this oracle carries + /// that label so scanner and executor panels in Grafana can pivot + /// on the same `chain` dimension. 
+ pub fn new_for_chain( + chain: impl Into, + max_gas_wei: U256, + priority_fee_gwei: u64, + ) -> Self { Self { + chain: chain.into(), max_gas_wei, priority_fee_gwei, cache: Mutex::new(None), @@ -201,10 +224,16 @@ impl GasOracle { } }; + // Gauge: latest base fee observed. Emitted on every + // successful read regardless of ceiling outcome so dashboards + // track base-fee trend even during a sustained skip spell. + charon_metrics::set_gas_base_fee_wei(&self.chain, base_fee); + // Priority fee: gwei → wei, checked. let priority_fee_wei = u128::from(self.priority_fee_gwei) .checked_mul(ONE_GWEI) .ok_or(GasError::Overflow)?; + charon_metrics::set_gas_priority_fee_wei(&self.chain, priority_fee_wei); // Canonical EIP-1559 headroom: max_fee = 2 * base + priority. let max_fee = base_fee @@ -217,6 +246,10 @@ impl GasOracle { // `max_fee` into U256 is lossless. let max_fee_u256 = U256::from(max_fee); let decision = if max_fee_u256 > self.max_gas_wei { + charon_metrics::record_gas_ceiling_skip( + &self.chain, + charon_metrics::gas_skip_reason::CEILING, + ); warn!( max_fee_wei = %max_fee_u256, ceiling_wei = %self.max_gas_wei, @@ -231,6 +264,11 @@ impl GasOracle { max_fee_per_gas: max_fee, max_priority_fee_per_gas: priority_fee_wei, }; + // Gauge: the resolved maxFeePerGas actually used when a + // tx goes out. Only emitted on the Proceed branch so the + // "last max_fee_wei before a ceiling skip" reading stays + // stable across the skip window. 
+ charon_metrics::set_gas_max_fee_wei(&self.chain, max_fee); debug!( base_fee_gwei = base_fee / ONE_GWEI, max_fee_gwei = params.max_fee_per_gas / ONE_GWEI, diff --git a/crates/charon-executor/src/simulation.rs b/crates/charon-executor/src/simulation.rs index 49fb1d9..b5f1df1 100644 --- a/crates/charon-executor/src/simulation.rs +++ b/crates/charon-executor/src/simulation.rs @@ -118,7 +118,22 @@ impl Simulator { .with_input(calldata) .with_gas_limit(gas_limit); - match provider.call(&req).await { + // `time_rpc` owns the latency histogram sample; the error + // branch below classifies rejections/timeouts separately so + // Grafana can pivot on `error_kind`. The simulator talks to + // the same chain RPC the scanner uses (no private submission + // relay) so the endpoint is `public`. `provider.call(..)` in + // alloy 0.8 returns an `EthCall` builder that is + // `IntoFuture`, not a `Future`; wrap the await in an async + // block so `time_rpc` sees a real `Future`. + let outcome = charon_metrics::time_rpc( + charon_metrics::rpc_method::ETH_CALL, + charon_metrics::endpoint_kind::PUBLIC, + async { provider.call(&req).await }, + ) + .await; + + match outcome { Ok(out) => { debug!( sender = %self.sender, @@ -148,13 +163,26 @@ impl Simulator { // Distinguish a true revert (node replied with error // data) from a transport-level failure. `alloy` // surfaces both on the same error arm, so the - // presence of returndata is the discriminator. + // presence of returndata is the discriminator. The + // RPC error counter is labelled `rejected` on a + // deterministic node-side rejection vs + // `connection_lost` on a transport blip — dashboards + // pivot on that label to separate "upstream unstable" + // from "our calldata keeps reverting". 
if revert.is_some() { + charon_metrics::record_rpc_error( + charon_metrics::rpc_method::ETH_CALL, + charon_metrics::rpc_error::REJECTED, + ); Err(SimulationError::Reverted { selector_hex, data_hex, }) } else { + charon_metrics::record_rpc_error( + charon_metrics::rpc_method::ETH_CALL, + charon_metrics::rpc_error::CONNECTION_LOST, + ); Err(SimulationError::Provider(err)) } } diff --git a/crates/charon-executor/src/submit.rs b/crates/charon-executor/src/submit.rs index 253d1b9..1dc554a 100644 --- a/crates/charon-executor/src/submit.rs +++ b/crates/charon-executor/src/submit.rs @@ -215,7 +215,27 @@ impl Submitter { } }; - match tokio::time::timeout(self.timeout, fut).await { + // Wrap the provider send in `time_rpc` so the RPC-latency + // histogram owns the sample regardless of outcome. Successes + // and provider-side rejections both land a duration; the + // hard timeout branch skips the histogram sample by + // construction (its duration would be ~self.timeout and + // carries no extra signal — `charon_rpc_errors_total{ + // error_kind="timeout"}` is the canonical surface for that + // case). `endpoint_kind::PRIVATE` because the submitter only + // ever posts to the per-chain `private_rpc_url`; the scanner + // owns public reads. + let timed = tokio::time::timeout( + self.timeout, + charon_metrics::time_rpc( + charon_metrics::rpc_method::ETH_SEND_RAW_TRANSACTION, + charon_metrics::endpoint_kind::PRIVATE, + fut, + ), + ) + .await; + + match timed { Ok(Ok(hash)) => { info!(endpoint = %self.endpoint_label, %hash, "tx submitted"); Ok(hash) @@ -226,7 +246,21 @@ impl Submitter { error = %err, "submit rejected by RPC" ); - Err(classify_transport_error(err)) + // Tag the error counter with the bucket produced by + // the full JSON-RPC / transport classifier so + // Grafana pivots on the exact same `rejected` vs + // `connection_lost` split that drives the typed + // `SubmitError`. 
+ let classified = classify_transport_error(err); + let kind = match &classified { + SubmitError::ConnectionLost(_) => charon_metrics::rpc_error::CONNECTION_LOST, + _ => charon_metrics::rpc_error::REJECTED, + }; + charon_metrics::record_rpc_error( + charon_metrics::rpc_method::ETH_SEND_RAW_TRANSACTION, + kind, + ); + Err(classified) } Err(_) => { warn!( @@ -234,6 +268,10 @@ impl Submitter { timeout_secs = self.timeout.as_secs(), "submit timed out" ); + charon_metrics::record_rpc_error( + charon_metrics::rpc_method::ETH_SEND_RAW_TRANSACTION, + charon_metrics::rpc_error::TIMEOUT, + ); Err(SubmitError::Timeout(self.timeout)) } } diff --git a/crates/charon-metrics/Cargo.toml b/crates/charon-metrics/Cargo.toml new file mode 100644 index 0000000..ecd2309 --- /dev/null +++ b/crates/charon-metrics/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "charon-metrics" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "Prometheus-compatible metrics exporter and metric constants for Charon" + +[dependencies] +anyhow = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +metrics = { workspace = true } +metrics-exporter-prometheus = { workspace = true } +thiserror = { workspace = true } + +[lints] +workspace = true diff --git a/crates/charon-metrics/src/lib.rs b/crates/charon-metrics/src/lib.rs new file mode 100644 index 0000000..b21ab8c --- /dev/null +++ b/crates/charon-metrics/src/lib.rs @@ -0,0 +1,859 @@ +//! Prometheus-compatible metrics surface for Charon. +//! +//! The exporter listens on a configurable `SocketAddr` (default +//! `127.0.0.1:9091`, loopback-only; see `MetricsConfig` in +//! `charon-core` for the validation rules that block non-loopback +//! binds without a shared auth token) and serves a `/metrics` +//! endpoint in the Prometheus text format. All metric names are kept +//! as `const &str` constants in [`names`] so call sites and dashboard +//! 
JSON stay in lock-step with a single source of truth. +//! +//! ```no_run +//! use charon_metrics::{init, names, record_block_scanned}; +//! # async fn demo() -> anyhow::Result<()> { +//! init("127.0.0.1:9091".parse()?).await?; +//! record_block_scanned("bnb"); +//! # Ok(()) +//! # } +//! ``` + +use std::future::Future; +use std::net::SocketAddr; +use std::sync::OnceLock; +use std::time::Instant; + +use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; +use metrics_exporter_prometheus::{ + BuildError as PromBuildError, ExporterFuture, Matcher, PrometheusBuilder, +}; +use thiserror::Error; +use tracing::info; + +/// Tracks whether the global Prometheus recorder has already been +/// installed in this process. `metrics_exporter_prometheus` calls +/// `metrics::set_global_recorder` under the hood, and that call +/// panics on a second successful install. Gating [`init`] behind +/// this `OnceLock` turns a second invocation into a silent no-op +/// so repeated calls from tests (or a future restart path) do not +/// tear the process down. +static INIT: OnceLock<()> = OnceLock::new(); + +/// Errors returned from [`init`]. Exposed as a `#[non_exhaustive]` +/// enum so `charon-cli` can distinguish bind failures (port collision +/// is retryable) from recorder-install failures (caller must abort — +/// the global recorder can only be set once) without matching on +/// `Display` strings. New variants may be added without a breaking +/// semver bump. +#[derive(Debug, Error)] +#[non_exhaustive] +pub enum MetricsError { + /// Failed to register custom histogram buckets for a specific + /// metric. Carries the metric name so logs pinpoint the offender. + #[error("failed to register buckets for {metric}: {source}")] + BucketConfig { + metric: &'static str, + #[source] + source: PromBuildError, + }, + /// Installing the global Prometheus recorder failed. Typically a + /// port collision on `bind` or an exporter-build error. 
The + /// underlying `BuildError` preserves the original diagnosis. + #[error("failed to install Prometheus exporter on {bind}: {source}")] + InstallFailed { + bind: SocketAddr, + #[source] + source: PromBuildError, + }, + /// The `metrics` global recorder was already installed by some + /// other crate in the same process. Distinct from + /// [`MetricsError::InstallFailed`] because the fix is + /// different: exporter-build errors retry; a foreign recorder + /// has to be removed from the dep graph entirely. The + /// idempotency gate on [`install`] short-circuits our own + /// second install, so reaching this variant means a third + /// party got there first. + #[error("failed to set global recorder for {bind}: {reason}")] + RecorderInstall { bind: SocketAddr, reason: String }, +} + +/// Convenience alias so helpers and call sites share one return shape. +pub type Result<T> = std::result::Result<T, MetricsError>; + +// Bucket boundaries for `charon_pipeline_block_duration_seconds`. +// BSC produces a block every ~3s; resolution is packed around that +// threshold so p50/p95 quantiles stay meaningful instead of piling +// into `+Inf` with the exporter's default HTTP-latency buckets. +const BLOCK_DURATION_SECONDS_BUCKETS: &[f64] = &[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0]; + +// Bucket boundaries for `charon_executor_profit_usd_cents`. +// Realistic Venus liquidation profit spans ~$0.05 dust to ~$10k +// windfalls; buckets are in cents (5 → 1_000_000) so histogram_quantile +// returns finite values across that range. +const PROFIT_USD_CENTS_BUCKETS: &[f64] = &[ + 5.0, + 50.0, + 500.0, + 2_500.0, + 10_000.0, + 50_000.0, + 250_000.0, + 1_000_000.0, +]; + +// Bucket boundaries for `charon_rpc_call_duration_seconds`. +// Spans sub-millisecond LAN-local responses up to the 30 s +// provider-level timeout ceiling so call-site timeouts still +// surface as a finite bucket instead of `+Inf`.
Lower bound of +// 1 ms sits just above the jitter floor of a tokio timer on a +// warm runtime — any sample finer than that is noise rather than +// a signal about upstream latency. Upper bound of 30 s matches +// the hard deadline used by submit/simulate call sites: a call +// that outruns 30 s has timed out regardless, so anything bigger +// would only widen `+Inf` overflow without adding resolution. +// Logarithmic spacing keeps p50/p95/p99 resolution meaningful +// across four decades. +const RPC_CALL_DURATION_SECONDS_BUCKETS: &[f64] = &[ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, +]; + +/// Single-source-of-truth metric names. Kept as constants so call +/// sites, dashboard JSON, and alert rules refer to the same strings. +pub mod names { + // Scanner + pub const SCANNER_BLOCKS_TOTAL: &str = "charon_scanner_blocks_total"; + pub const SCANNER_POSITIONS: &str = "charon_scanner_positions"; + + // Pipeline + pub const PIPELINE_BLOCK_DURATION_SECONDS: &str = "charon_pipeline_block_duration_seconds"; + + // Executor + pub const EXECUTOR_SIMULATIONS_TOTAL: &str = "charon_executor_simulations_total"; + pub const EXECUTOR_OPPS_QUEUED_TOTAL: &str = "charon_executor_opportunities_queued_total"; + pub const EXECUTOR_OPPS_DROPPED_TOTAL: &str = "charon_executor_opportunities_dropped_total"; + pub const EXECUTOR_PROFIT_USD_CENTS: &str = "charon_executor_profit_usd_cents"; + pub const EXECUTOR_QUEUE_DEPTH: &str = "charon_executor_queue_depth"; + + // Mempool monitor (issue #300). Counts pending oracle updates + // observed in the mempool, oracle updates drained at block + // boundaries, and upstream websocket reconnect attempts — the + // third doubles as a health signal for flaky pubsub upstreams. 
+ pub const MEMPOOL_PENDING_ORACLE_UPDATES: &str = "charon_mempool_pending_oracle_updates"; + pub const MEMPOOL_DRAINED_TOTAL: &str = "charon_mempool_drained_total"; + pub const MEMPOOL_WS_RECONNECTS_TOTAL: &str = "charon_mempool_websocket_reconnects_total"; + + // Gas oracle (issue #301). Latest EIP-1559 base fee, priority + // fee used on the last submission attempt, and resulting + // maxFeePerGas — plus a counter for opportunities dropped + // because `max_fee_per_gas` exceeded the configured ceiling. + pub const GAS_BASE_FEE_WEI: &str = "charon_gas_base_fee_wei"; + pub const GAS_PRIORITY_FEE_WEI: &str = "charon_gas_priority_fee_wei"; + pub const GAS_MAX_FEE_WEI: &str = "charon_gas_max_fee_wei"; + pub const GAS_CEILING_SKIPS_TOTAL: &str = "charon_gas_ceiling_skips_total"; + + // RPC instrumentation (issue #302). Histogram of call durations + // by method + endpoint kind, error counters partitioned by + // failure mode, and a reconnect counter so upstream transport + // churn is observable without log grepping. + pub const RPC_CALL_DURATION_SECONDS: &str = "charon_rpc_call_duration_seconds"; + pub const RPC_ERRORS_TOTAL: &str = "charon_rpc_errors_total"; + pub const RPC_RECONNECTS_TOTAL: &str = "charon_rpc_connection_reconnects_total"; + + // Build / runtime + pub const BUILD_INFO: &str = "charon_build_info"; +} + +/// Position classification bucket used as the `bucket` label on +/// `charon_scanner_positions`. +pub mod bucket { + pub const HEALTHY: &str = "healthy"; + pub const NEAR_LIQ: &str = "near_liq"; + pub const LIQUIDATABLE: &str = "liquidatable"; +} + +/// Simulation outcome used as the `result` label on +/// `charon_executor_simulations_total`. +pub mod sim_result { + pub const OK: &str = "ok"; + pub const REVERT: &str = "revert"; + pub const ERROR: &str = "error"; +} + +/// Drop-stage label on `charon_executor_opportunities_dropped_total`. 
+pub mod drop_stage { + pub const ROUTER: &str = "router"; + pub const PROFIT: &str = "profit"; + pub const SIMULATION: &str = "simulation"; + pub const BUILD: &str = "build"; +} + +/// Endpoint-kind label used on every RPC metric (issue #302). +/// "public" covers node providers on the open internet (Alchemy, +/// Infura, a self-hosted archive). "private" covers order-flow +/// aware submission paths (bloxroute, blocknative, sequencer +/// endpoints on L2s) whose latency profile and failure modes +/// differ sharply from public reads. Splitting on this label is +/// how operators see "my private relay is degrading" without +/// having to cross-reference logs. +pub mod endpoint_kind { + pub const PUBLIC: &str = "public"; + pub const PRIVATE: &str = "private"; +} + +/// RPC method label on `charon_rpc_call_duration_seconds` and +/// `charon_rpc_errors_total`. Only the methods the bot actually +/// calls are listed — new methods should be added here as call +/// sites adopt the [`time_rpc`] wrapper. Freeform methods can be +/// passed as `&str` too; the constants exist so dashboards and +/// alert rules can reference the same string a call site uses. +pub mod rpc_method { + pub const ETH_CALL: &str = "eth_call"; + pub const ETH_GET_BLOCK_BY_NUMBER: &str = "eth_getBlockByNumber"; + pub const ETH_SEND_RAW_TRANSACTION: &str = "eth_sendRawTransaction"; + pub const ETH_GET_LOGS: &str = "eth_getLogs"; + pub const ETH_GET_BLOCK_NUMBER: &str = "eth_blockNumber"; + pub const ETH_ESTIMATE_GAS: &str = "eth_estimateGas"; + pub const ETH_GET_TRANSACTION_BY_HASH: &str = "eth_getTransactionByHash"; + pub const ETH_SUBSCRIBE_NEW_HEADS: &str = "eth_subscribe_newHeads"; + pub const ETH_SUBSCRIBE_PENDING_TX: &str = "eth_subscribe_newPendingTransactions"; +} + +/// Failure mode label on `charon_rpc_errors_total`. Kept as a +/// closed three-way enum so alert rules can pivot on `error_kind` +/// without fuzzy matching on log strings. 
Call sites classify +/// their own errors into one of these before recording — the +/// mapping from `alloy` / `anyhow` errors to a kind is a +/// per-call-site judgement (a `tokio::time::timeout` firing is +/// [`TIMEOUT`], an RPC-level rejection is [`REJECTED`], an +/// `io::Error` or dropped subscription stream is +/// [`CONNECTION_LOST`]). +pub mod rpc_error { + pub const TIMEOUT: &str = "timeout"; + pub const REJECTED: &str = "rejected"; + pub const CONNECTION_LOST: &str = "connection_lost"; +} + +/// Reason label on `charon_gas_ceiling_skips_total`. `CEILING` +/// is the only reason the current gas oracle emits — the label +/// exists so future reasons (e.g. `BASE_FEE_SPIKE`, +/// `PRIORITY_FEE_MISSING`) can be added without reshaping the +/// metric. +pub mod gas_skip_reason { + pub const CEILING: &str = "ceiling"; +} + +/// Install the global Prometheus recorder and start the HTTP listener. +/// +/// Idempotent: the first successful call installs the recorder and +/// spawns the `/metrics` listener, subsequent calls log and return +/// `Ok(())` without touching the global recorder. This guards against +/// double-install panics in `metrics::set_global_recorder`, which +/// would otherwise take the bot down on an accidental retry. The +/// exporter task runs for the lifetime of the tokio runtime — no +/// handle is returned because it never needs to be stopped in-process. +pub async fn init(bind: SocketAddr) -> Result<()> { + // Fire-and-forget variant: install the recorder and spawn the + // listener future onto the current tokio runtime. The returned + // JoinHandle is intentionally discarded here — this path is + // meant for tests and for code paths that do not have a + // JoinSet supervisor. Production call sites should prefer + // [`install`] so the exporter task can be supervised together + // with the bot's other long-running tasks (see #222). + match install(bind)? 
{ + Some(fut) => { + tokio::spawn(async move { + if let Err(err) = fut.await { + tracing::error!(error = ?err, "metrics exporter task terminated"); + } + }); + } + None => { + // Recorder already installed; nothing to drive. + } + } + Ok(()) +} + +/// Install the global Prometheus recorder and return the +/// [`ExporterFuture`] that drives the `/metrics` HTTP listener. +/// +/// The returned future must be polled for the exporter to accept +/// scrapes — production code pushes it into the same `JoinSet` +/// that supervises block listeners so a panic in the exporter +/// triggers the same controlled-shutdown path (#222). Tests that +/// do not care about supervision should call [`init`] instead. +/// +/// Returns `Ok(None)` on the second and later calls in the same +/// process, because the global recorder can only be installed +/// once — a second `install()` would panic inside +/// `metrics::set_global_recorder`, see #223. Callers that got +/// `None` must skip supervising a listener future; the prior +/// install still owns the HTTP socket. +pub fn install(bind: SocketAddr) -> Result<Option<ExporterFuture>> { + // Idempotency gate — short-circuit before we touch the + // PrometheusBuilder. `INIT` is checked again after the + // successful build to close the narrow race where two + // concurrent callers both observe `None` here. + if INIT.get().is_some() { + info!(bind = %bind, "metrics exporter already initialized; skipping re-install"); + return Ok(None); + } + + let (recorder, exporter) = PrometheusBuilder::new() + .with_http_listener(bind) + .set_buckets_for_metric( + Matcher::Full(names::PIPELINE_BLOCK_DURATION_SECONDS.to_string()), + BLOCK_DURATION_SECONDS_BUCKETS, + ) + .map_err(|source| MetricsError::BucketConfig { + metric: names::PIPELINE_BLOCK_DURATION_SECONDS, + source, + })?
+ .set_buckets_for_metric( + Matcher::Full(names::EXECUTOR_PROFIT_USD_CENTS.to_string()), + PROFIT_USD_CENTS_BUCKETS, + ) + .map_err(|source| MetricsError::BucketConfig { + metric: names::EXECUTOR_PROFIT_USD_CENTS, + source, + })? + .set_buckets_for_metric( + Matcher::Full(names::RPC_CALL_DURATION_SECONDS.to_string()), + RPC_CALL_DURATION_SECONDS_BUCKETS, + ) + .map_err(|source| MetricsError::BucketConfig { + metric: names::RPC_CALL_DURATION_SECONDS, + source, + })? + .build() + .map_err(|source| MetricsError::InstallFailed { bind, source })?; + + // Close the race: if another caller beat us to INIT, drop + // the recorder and exporter we just built and report no-op. + // `set_global_recorder` below would otherwise panic on + // double-install. + if INIT.set(()).is_err() { + info!(bind = %bind, "metrics exporter lost init race; discarding fresh build"); + return Ok(None); + } + + // `set_global_recorder` fails only if a recorder is already + // installed in the process. We check `INIT` above, so the + // only way to reach a real failure here is a third-party + // crate having already installed a `metrics` recorder. + metrics::set_global_recorder(recorder).map_err(|err| MetricsError::RecorderInstall { + bind, + reason: err.to_string(), + })?; + + describe_all(); + + info!(bind = %bind, path = "/metrics", "metrics exporter listening"); + Ok(Some(exporter)) +} + +/// Emit Prometheus `# HELP` + `# TYPE` descriptors for every metric +/// Charon exposes. Called once from [`init`] so the exporter's first +/// scrape surfaces human-readable help text even before any counter +/// has been incremented. +fn describe_all() { + describe_counter!( + names::SCANNER_BLOCKS_TOTAL, + "Total blocks drained from chain listeners." + ); + describe_gauge!( + names::SCANNER_POSITIONS, + "Currently tracked positions bucketed by health classification." 
+ ); + describe_histogram!( + names::PIPELINE_BLOCK_DURATION_SECONDS, + metrics::Unit::Seconds, + "Wall-clock duration of one full per-block pipeline pass." + ); + describe_counter!( + names::EXECUTOR_SIMULATIONS_TOTAL, + "Simulations attempted via `eth_call`, partitioned by outcome." + ); + describe_counter!( + names::EXECUTOR_OPPS_QUEUED_TOTAL, + "Liquidation opportunities that landed in the queue, labelled `simulated=true|false` to distinguish sim-gated entries from dry-run pushes (BOT_SIGNER_KEY unset)." + ); + describe_counter!( + names::EXECUTOR_OPPS_DROPPED_TOTAL, + "Liquidation opportunities dropped before reaching the queue, partitioned by stage." + ); + describe_histogram!( + names::EXECUTOR_PROFIT_USD_CENTS, + "Per-opportunity net profit in USD cents (post profit gate)." + ); + describe_gauge!( + names::EXECUTOR_QUEUE_DEPTH, + "Current depth of the profit-ordered opportunity queue." + ); + describe_gauge!( + names::MEMPOOL_PENDING_ORACLE_UPDATES, + "Pending Venus oracle updates currently observed in the mempool (pre-signed liquidations armed)." + ); + describe_counter!( + names::MEMPOOL_DRAINED_TOTAL, + "Pre-signed liquidations drained from the mempool cache at block confirmation, partitioned by chain." + ); + describe_counter!( + names::MEMPOOL_WS_RECONNECTS_TOTAL, + "Reconnect attempts against the pending-transactions websocket subscription (flaky-upstream signal)." + ); + describe_gauge!( + names::GAS_BASE_FEE_WEI, + "Latest EIP-1559 `baseFeePerGas` observed for the chain, in wei." + ); + describe_gauge!( + names::GAS_PRIORITY_FEE_WEI, + "Priority fee (tip) used on the most recent gas-params resolution, in wei." + ); + describe_gauge!( + names::GAS_MAX_FEE_WEI, + "`maxFeePerGas` used on the most recent submission attempt, in wei." + ); + describe_counter!( + names::GAS_CEILING_SKIPS_TOTAL, + "Opportunities skipped because the resolved `max_fee_per_gas` exceeded the configured ceiling." 
+ ); + describe_histogram!( + names::RPC_CALL_DURATION_SECONDS, + metrics::Unit::Seconds, + "Wall-clock duration of one RPC call, partitioned by method and endpoint kind (public vs private)." + ); + describe_counter!( + names::RPC_ERRORS_TOTAL, + "RPC call failures partitioned by method and error kind (timeout, rejected, connection_lost)." + ); + describe_counter!( + names::RPC_RECONNECTS_TOTAL, + "Reconnect attempts against an RPC transport (websocket or HTTP keep-alive), partitioned by endpoint kind." + ); + describe_gauge!( + names::BUILD_INFO, + "Build metadata as labels; value is always 1." + ); +} + +// ─── Typed helpers (thin wrappers so call sites stay terse) ─────────── + +/// Increment the per-chain blocks-scanned counter. +pub fn record_block_scanned(chain: &str) { + counter!(names::SCANNER_BLOCKS_TOTAL, "chain" => chain.to_owned()).increment(1); +} + +/// Set the gauge for one health bucket on one chain. +pub fn set_position_bucket(chain: &str, bucket: &str, count: u64) { + gauge!(names::SCANNER_POSITIONS, "chain" => chain.to_owned(), "bucket" => bucket.to_owned()) + .set(count as f64); +} + +/// Observe the wall-clock duration of one pipeline pass. +pub fn observe_block_duration(chain: &str, seconds: f64) { + histogram!(names::PIPELINE_BLOCK_DURATION_SECONDS, "chain" => chain.to_owned()).record(seconds); +} + +/// Record one simulation outcome. +pub fn record_simulation(chain: &str, result: &str) { + counter!( + names::EXECUTOR_SIMULATIONS_TOTAL, + "chain" => chain.to_owned(), + "result" => result.to_owned() + ) + .increment(1); +} + +/// Record one opportunity that made it into the queue. +/// +/// `simulated` distinguishes entries that cleared the `eth_call` +/// simulation gate from entries enqueued without simulation (dry-run +/// mode when `BOT_SIGNER_KEY` is unset). Splitting on this label keeps +/// the gate bypass observable from dashboards instead of letting +/// unsimulated pushes masquerade as healthy throughput. 
+pub fn record_opportunity_queued(chain: &str, profit_usd_cents: u64, simulated: bool) { + counter!( + names::EXECUTOR_OPPS_QUEUED_TOTAL, + "chain" => chain.to_owned(), + "simulated" => if simulated { "true" } else { "false" }.to_owned(), + ) + .increment(1); + histogram!(names::EXECUTOR_PROFIT_USD_CENTS, "chain" => chain.to_owned()) + .record(profit_usd_cents as f64); +} + +/// Record one opportunity that was dropped before reaching the queue. +pub fn record_opportunity_dropped(chain: &str, stage: &str) { + counter!( + names::EXECUTOR_OPPS_DROPPED_TOTAL, + "chain" => chain.to_owned(), + "stage" => stage.to_owned() + ) + .increment(1); +} + +/// Update the queue-depth gauge. +pub fn set_queue_depth(depth: u64) { + gauge!(names::EXECUTOR_QUEUE_DEPTH).set(depth as f64); +} + +/// Emit build metadata once at startup. The metric value is always 1; +/// labels carry the interesting bits so dashboards can `group_by`. +pub fn set_build_info(version: &str, git_sha: &str) { + gauge!( + names::BUILD_INFO, + "version" => version.to_owned(), + "git_sha" => git_sha.to_owned() + ) + .set(1.0); +} + +// ─── Mempool helpers (issue #300) ───────────────────────────────────── + +/// Set the gauge of pending oracle updates the mempool monitor is +/// currently tracking. Called on insert/drain so the dashboard value +/// tracks the live cache size rather than a stale counter. Gauge (not +/// counter) because the quantity is "how many right now", which must +/// fall back to zero between blocks. +pub fn set_mempool_pending_oracle_updates(chain: &str, count: u64) { + gauge!( + names::MEMPOOL_PENDING_ORACLE_UPDATES, + "chain" => chain.to_owned() + ) + .set(count as f64); +} + +/// Record `drained` pre-signed liquidations drained from the mempool +/// cache at a block boundary. Zero-valued drains are a legitimate +/// signal (nothing to do this block) so the call site records +/// unconditionally — Prometheus handles zero-delta increments. 
+pub fn record_mempool_drained(chain: &str, drained: u64) { + counter!( + names::MEMPOOL_DRAINED_TOTAL, + "chain" => chain.to_owned() + ) + .increment(drained); +} + +/// Record one websocket reconnect attempt against the pending-tx +/// subscription. Emitted every time the monitor loop falls through +/// to its backoff branch — a high rate here is the operator's cue +/// that the upstream pubsub endpoint is flaky. +pub fn record_mempool_ws_reconnect(chain: &str) { + counter!( + names::MEMPOOL_WS_RECONNECTS_TOTAL, + "chain" => chain.to_owned() + ) + .increment(1); +} + +// ─── Gas oracle helpers (issue #301) ────────────────────────────────── + +/// Set the latest observed `baseFeePerGas` for the chain, in wei. +/// Values are passed as `u128` to survive the 1559 full range without +/// pre-truncation; the gauge is cast to `f64` at emission time, same +/// as every other Prometheus gauge — sub-wei precision is never +/// actionable for ops. +pub fn set_gas_base_fee_wei(chain: &str, wei: u128) { + gauge!( + names::GAS_BASE_FEE_WEI, + "chain" => chain.to_owned() + ) + .set(wei as f64); +} + +/// Set the priority fee used on the last gas-params resolution, +/// in wei. +pub fn set_gas_priority_fee_wei(chain: &str, wei: u128) { + gauge!( + names::GAS_PRIORITY_FEE_WEI, + "chain" => chain.to_owned() + ) + .set(wei as f64); +} + +/// Set the `maxFeePerGas` used on the last submission attempt, +/// in wei. +pub fn set_gas_max_fee_wei(chain: &str, wei: u128) { + gauge!( + names::GAS_MAX_FEE_WEI, + "chain" => chain.to_owned() + ) + .set(wei as f64); +} + +/// Record one opportunity skipped by the gas oracle because the +/// resolved `max_fee_per_gas` exceeded the configured ceiling (or +/// any future skip reason added to [`gas_skip_reason`]). 
+pub fn record_gas_ceiling_skip(chain: &str, reason: &str) { + counter!( + names::GAS_CEILING_SKIPS_TOTAL, + "chain" => chain.to_owned(), + "reason" => reason.to_owned() + ) + .increment(1); +} + +// ─── RPC instrumentation (issue #302) ───────────────────────────────── + +/// Observe one completed RPC call's wall-clock duration. +/// +/// Most call sites should wrap their provider invocation in +/// [`time_rpc`] instead of calling this directly — the wrapper +/// handles `Instant::now()` and the label plumbing. Expose this +/// helper for call sites that already have a `Duration` in hand +/// (e.g. batched calls that track a single elapsed span across +/// multiple internal retries). +pub fn record_rpc_call(method: &str, endpoint_kind: &str, seconds: f64) { + histogram!( + names::RPC_CALL_DURATION_SECONDS, + "method" => method.to_owned(), + "endpoint_kind" => endpoint_kind.to_owned() + ) + .record(seconds); +} + +/// Record one RPC call failure. `error_kind` must be one of the +/// constants in [`rpc_error`]; freeform strings are accepted but +/// break dashboard pivots, so callers should funnel their errors +/// through a classifier. +pub fn record_rpc_error(method: &str, error_kind: &str) { + counter!( + names::RPC_ERRORS_TOTAL, + "method" => method.to_owned(), + "error_kind" => error_kind.to_owned() + ) + .increment(1); +} + +/// Record one reconnect attempt against an RPC transport. Emitted +/// by pubsub listeners (block listener, mempool monitor) every +/// time their outer reconnect loop fires — cumulative rate is the +/// operator's "upstream is unstable" signal. +pub fn record_rpc_reconnect(endpoint_kind: &str) { + counter!( + names::RPC_RECONNECTS_TOTAL, + "endpoint_kind" => endpoint_kind.to_owned() + ) + .increment(1); +} + +/// Wrap one RPC call, record its wall-clock duration into +/// [`names::RPC_CALL_DURATION_SECONDS`], and return the call's +/// own result untouched. 
+/// +/// This is the single preferred instrumentation pattern for adding +/// RPC latency / error visibility to a call site — prefer it over +/// sprinkling `record_rpc_call` / `record_rpc_error` directly. It +/// keeps the happy path a one-liner and guarantees the duration +/// sample always lands in the histogram, even on the error branch +/// (where latency-to-error is still useful context). Errors are +/// *not* auto-classified — the call site knows best whether an +/// `alloy` error is a [`rpc_error::TIMEOUT`] or a +/// [`rpc_error::REJECTED`]; pair this helper with one +/// [`record_rpc_error`] call on the error branch. +/// +/// Example: +/// ```no_run +/// # async fn demo() -> anyhow::Result<()> { +/// use charon_metrics::{endpoint_kind, rpc_error, rpc_method, record_rpc_error, time_rpc}; +/// # async fn eth_call() -> Result<(), anyhow::Error> { Ok(()) } +/// let result = time_rpc( +/// rpc_method::ETH_CALL, +/// endpoint_kind::PUBLIC, +/// eth_call(), +/// ) +/// .await; +/// if let Err(err) = &result { +/// record_rpc_error(rpc_method::ETH_CALL, rpc_error::REJECTED); +/// eprintln!("{err}"); +/// } +/// # Ok(()) +/// # } +/// ``` +/// +/// New call sites adopting RPC instrumentation should follow +/// this pattern. An alloy middleware/layer would be cleaner in +/// principle, but the `alloy` 0.8 `Provider` trait does not +/// expose a stable middleware hook at the method-name layer — a +/// per-call-site wrapper lets us carry the method label verbatim +/// without leaking through an intermediate `RequestPacket` whose +/// method string is internal-only. 
pub async fn time_rpc<F, T>(method: &str, endpoint_kind: &str, fut: F) -> T +where + F: Future<Output = T>, +{ + let start = Instant::now(); + let out = fut.await; + record_rpc_call(method, endpoint_kind, start.elapsed().as_secs_f64()); + out +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{Ipv4Addr, SocketAddrV4}; + use tokio::net::TcpStream; + use tokio::time::{Duration, sleep}; + + /// `MetricsError::BucketConfig` must render the offending metric + /// name in its `Display` string and expose the upstream + /// `PromBuildError` through `Error::source()` so operator tooling + /// can walk the chain. Reached via the real builder path (empty + /// bucket slice → `BuildError::EmptyBucketsOrQuantiles`) rather + /// than a hand-rolled variant, so the mapping in `init` stays + /// exercised end-to-end. + #[test] + fn bucket_config_error_display_and_source_chain() { + let err = PrometheusBuilder::new() + .set_buckets_for_metric( + Matcher::Full(names::EXECUTOR_PROFIT_USD_CENTS.to_string()), + &[], + ) + .map_err(|source| MetricsError::BucketConfig { + metric: names::EXECUTOR_PROFIT_USD_CENTS, + source, + }) + .expect_err("empty bucket slice must fail"); + + let rendered = format!("{err}"); + assert!( + rendered.contains(names::EXECUTOR_PROFIT_USD_CENTS), + "Display must name the offending metric, got {rendered:?}" + ); + assert!( + std::error::Error::source(&err).is_some(), + "BucketConfig must expose its PromBuildError as source()" + ); + } + + /// Covers two invariants at once because `INIT` is + /// process-wide and unit tests in the same binary share it: + /// + /// 1. The first call binds the `/metrics` HTTP listener so a + /// subsequent TCP connect succeeds — regression gate + /// against broken listener wiring on exporter bumps. + /// 2. 
Second and Nth calls return `Ok(())` without touching + /// the global recorder — `metrics-exporter-prometheus` + /// otherwise panics inside `set_global_recorder`, which + /// would take the bot down on what ought to be a harmless + /// retry (regression gate for #223). + /// + /// Folded into one test so unit-test ordering cannot leave + /// `INIT` in a pre-set state and silently skip the bind + /// assertion in a sibling test. + #[tokio::test] + async fn init_binds_listener_and_is_idempotent() { + // Port 0 asks the OS for an ephemeral port, avoiding + // collisions with any concurrent test run. Bind a probe + // socket, record the number, drop it, hand the port to + // the exporter. Races are technically possible but + // vanishingly rare on 127.0.0.1 and do not compromise + // correctness — the connect probe below would simply + // fail loudly. + let probe = std::net::TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) + .expect("probe bind"); + let port = probe + .local_addr() + .expect("probe socket must expose its bound local_addr") + .port(); + drop(probe); + + let bind = SocketAddr::from((Ipv4Addr::LOCALHOST, port)); + init(bind).await.expect("first init should succeed"); + + // Yield so the listener's spawn binds before we probe. + sleep(Duration::from_millis(50)).await; + + TcpStream::connect(bind) + .await + .expect("listener should accept TCP connections"); + + // Re-invoke with a deliberately unusable bind. If the + // idempotency gate were missing, PrometheusBuilder would + // attempt a fresh install and panic inside + // `set_global_recorder`. We assert `Ok(())` and that the + // listener never moves off `bind`. 
+ let bogus = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + init(bogus) + .await + .expect("second init must be a silent no-op"); + init(bogus).await.expect("third init must also be a no-op"); + } + + /// Typed helpers must not panic when called — this exercises every + /// label combination that call sites use so metric-name typos + /// surface at `cargo test` time, not in prod. + #[test] + fn typed_helpers_are_panic_free() { + record_block_scanned("bnb"); + set_position_bucket("bnb", bucket::HEALTHY, 7); + set_position_bucket("bnb", bucket::NEAR_LIQ, 2); + set_position_bucket("bnb", bucket::LIQUIDATABLE, 0); + observe_block_duration("bnb", 0.123); + record_simulation("bnb", sim_result::OK); + record_simulation("bnb", sim_result::REVERT); + record_simulation("bnb", sim_result::ERROR); + record_opportunity_queued("bnb", 1_234, true); + record_opportunity_queued("bnb", 9, false); + record_opportunity_dropped("bnb", drop_stage::ROUTER); + record_opportunity_dropped("bnb", drop_stage::PROFIT); + record_opportunity_dropped("bnb", drop_stage::SIMULATION); + record_opportunity_dropped("bnb", drop_stage::BUILD); + set_queue_depth(3); + set_build_info("0.1.0", "deadbeef"); + + // Mempool (#300) + set_mempool_pending_oracle_updates("bnb", 4); + record_mempool_drained("bnb", 3); + record_mempool_drained("bnb", 0); + record_mempool_ws_reconnect("bnb"); + + // Gas (#301) + set_gas_base_fee_wei("bnb", 3_000_000_000); + set_gas_priority_fee_wei("bnb", 1_000_000_000); + set_gas_max_fee_wei("bnb", 5_000_000_000); + record_gas_ceiling_skip("bnb", gas_skip_reason::CEILING); + + // RPC (#302) + record_rpc_call(rpc_method::ETH_CALL, endpoint_kind::PUBLIC, 0.012); + record_rpc_call( + rpc_method::ETH_SEND_RAW_TRANSACTION, + endpoint_kind::PRIVATE, + 0.045, + ); + record_rpc_error(rpc_method::ETH_CALL, rpc_error::TIMEOUT); + record_rpc_error(rpc_method::ETH_GET_LOGS, rpc_error::REJECTED); + record_rpc_error( + rpc_method::ETH_GET_BLOCK_BY_NUMBER, + rpc_error::CONNECTION_LOST, + ); + 
record_rpc_reconnect(endpoint_kind::PUBLIC); + record_rpc_reconnect(endpoint_kind::PRIVATE); + } + + /// `time_rpc` must record a non-zero elapsed sample into the + /// histogram and return the wrapped future's output unchanged. + /// Covers the ergonomic-wrapper contract: callers rely on it + /// being a drop-in around `await`. + #[tokio::test] + async fn time_rpc_returns_inner_output_and_records_duration() { + let out = time_rpc(rpc_method::ETH_CALL, endpoint_kind::PUBLIC, async { + tokio::task::yield_now().await; + 42u32 + }) + .await; + assert_eq!(out, 42); + + // Error case: the wrapper must not swallow errors from the + // inner future — callers chain `record_rpc_error` on the Err + // branch and would lose visibility otherwise. + let err: std::result::Result<(), &'static str> = time_rpc( + rpc_method::ETH_SEND_RAW_TRANSACTION, + endpoint_kind::PRIVATE, + async { Err("rpc down") }, + ) + .await; + assert_eq!(err, Err("rpc down")); + } +} diff --git a/crates/charon-metrics/tests/scrape.rs b/crates/charon-metrics/tests/scrape.rs new file mode 100644 index 0000000..3dfacc5 --- /dev/null +++ b/crates/charon-metrics/tests/scrape.rs @@ -0,0 +1,198 @@ +//! End-to-end scrape test for the Prometheus exporter. +//! +//! Lives in `tests/` rather than `src/lib.rs`'s `#[cfg(test)]` module +//! so it runs as its own integration-test binary with a fresh process +//! — the exporter installs a global recorder, and a second install in +//! the same process is a silent no-op (`charon-metrics` #223), so the +//! integration-test binary must not share process state with the unit +//! tests. +//! +//! Regression gate for #224: it was previously possible to ship a +//! broken exporter (metric name typo, missing `describe_*` call, +//! listener not bound) without any test catching it — this test +//! scrapes `/metrics` with a raw HTTP client and asserts the +//! Prometheus text-format response contains the expected helpers. 
+ +use std::io::{Read, Write}; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpStream}; +use std::time::Duration; + +use charon_metrics::{init, names, record_block_scanned}; + +/// Fixed loopback port picked to avoid collision with the default +/// exporter port (`9091`) and with common dev services. If the port +/// is genuinely in use on a contributor's box, the test fails loudly +/// with a bind error rather than silently passing — acceptable +/// tradeoff for not plumbing the bound addr out of the exporter lib. +const TEST_PORT: u16 = 19_091; + +/// Scrape the exporter after a counter has been incremented and +/// verify the Prometheus text-format body contains both `# HELP` +/// metadata and the metric line. +/// +/// We deliberately avoid pulling `reqwest` just to fetch one URL — +/// the text format is plain HTTP/1.1 so a raw-TCP request keeps the +/// dev-dep surface small and sidesteps TLS/async runtime questions. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn scrape_returns_valid_prometheus_text() { + let bind = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TEST_PORT)); + + init(bind).await.expect("exporter init must succeed"); + + // Small yield so the listener's spawn has a chance to bind + // before we connect. Without this, a fast test machine can race + // the exporter's `tokio::spawn` and see `connection refused`. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Touch a counter and a gauge so the snapshot is non-empty. The + // counter name must round-trip through the exporter's dedup + + // label-sorting, so asserting on its presence validates the + // whole pipeline — constants, descriptor registration, encoder. + record_block_scanned("bnb"); + charon_metrics::set_position_bucket("bnb", charon_metrics::bucket::HEALTHY, 1); + + // Mempool (#300), gas (#301), and RPC (#302) series — record + // at least one sample per series so the scrape surfaces the + // metric name and its expected label set. 
Each of these is the + // end-to-end regression gate for a typo in the name constant + // or a missing `describe_*` registration. + charon_metrics::set_mempool_pending_oracle_updates("bnb", 2); + charon_metrics::record_mempool_drained("bnb", 5); + charon_metrics::record_mempool_ws_reconnect("bnb"); + + charon_metrics::set_gas_base_fee_wei("bnb", 3_000_000_000); + charon_metrics::set_gas_priority_fee_wei("bnb", 1_000_000_000); + charon_metrics::set_gas_max_fee_wei("bnb", 5_000_000_000); + charon_metrics::record_gas_ceiling_skip("bnb", charon_metrics::gas_skip_reason::CEILING); + + charon_metrics::record_rpc_call( + charon_metrics::rpc_method::ETH_CALL, + charon_metrics::endpoint_kind::PUBLIC, + 0.012, + ); + charon_metrics::record_rpc_error( + charon_metrics::rpc_method::ETH_CALL, + charon_metrics::rpc_error::TIMEOUT, + ); + charon_metrics::record_rpc_reconnect(charon_metrics::endpoint_kind::PRIVATE); + + // Give the recorder a beat to flush the new sample into the + // renderer's internal state. + tokio::time::sleep(Duration::from_millis(50)).await; + + let body = tokio::task::spawn_blocking(move || scrape(bind)) + .await + .expect("scrape task must not panic") + .expect("scrape must succeed"); + + // `# HELP` lines are emitted by `describe_*` calls — their + // presence proves `describe_all()` ran. + assert!( + body.contains("# HELP"), + "scrape body missing `# HELP` metadata; got:\n{body}" + ); + assert!( + body.contains("# TYPE"), + "scrape body missing `# TYPE` metadata; got:\n{body}" + ); + + // Metric-name constants must flow end-to-end. Assert on the raw + // strings from `names` so a typo in the constant or a call-site + // drift surfaces here rather than in a silent Grafana panel. 
+ assert!( + body.contains(names::SCANNER_BLOCKS_TOTAL), + "scrape body missing `{}`; got:\n{body}", + names::SCANNER_BLOCKS_TOTAL, + ); + assert!( + body.contains(names::SCANNER_POSITIONS), + "scrape body missing `{}`; got:\n{body}", + names::SCANNER_POSITIONS, + ); + + // Label round-trip: the `chain="bnb"` label must show up on the + // counter line. Guards against a regression where label keys are + // dropped by the exporter's relabeling / matcher config. + assert!( + body.contains("chain=\"bnb\""), + "scrape body missing expected `chain=\"bnb\"` label; got:\n{body}" + ); + + // Each new series (#300 / #301 / #302) must appear by name so + // typos in the constants or a missing describe_* call surface + // here. Label values exercised above are checked for at least + // one representative sample per metric family. + for name in [ + names::MEMPOOL_PENDING_ORACLE_UPDATES, + names::MEMPOOL_DRAINED_TOTAL, + names::MEMPOOL_WS_RECONNECTS_TOTAL, + names::GAS_BASE_FEE_WEI, + names::GAS_PRIORITY_FEE_WEI, + names::GAS_MAX_FEE_WEI, + names::GAS_CEILING_SKIPS_TOTAL, + names::RPC_CALL_DURATION_SECONDS, + names::RPC_ERRORS_TOTAL, + names::RPC_RECONNECTS_TOTAL, + ] { + assert!( + body.contains(name), + "scrape body missing `{name}`; got:\n{body}" + ); + } + + // Label-set regression gates: one representative pair per + // metric family. A drift in label naming (e.g. renaming + // `endpoint_kind` to `kind`) otherwise silently breaks every + // dashboard that pivots on the label. 
+ assert!( + body.contains("reason=\"ceiling\""), + "gas ceiling-skip counter missing `reason` label; got:\n{body}" + ); + assert!( + body.contains("method=\"eth_call\""), + "rpc duration histogram missing `method` label; got:\n{body}" + ); + assert!( + body.contains("endpoint_kind=\"public\""), + "rpc duration histogram missing `endpoint_kind` label; got:\n{body}" + ); + assert!( + body.contains("error_kind=\"timeout\""), + "rpc error counter missing `error_kind` label; got:\n{body}" + ); + assert!( + body.contains("endpoint_kind=\"private\""), + "rpc reconnect counter missing `endpoint_kind=\"private\"`; got:\n{body}" + ); +} + +/// Minimal HTTP/1.1 GET over raw TCP. Returns the response body +/// (everything after the first `\r\n\r\n`). Anything non-200 or a +/// malformed response is surfaced as an `Err`. +fn scrape(bind: SocketAddr) -> std::io::Result { + let mut stream = TcpStream::connect_timeout(&bind, Duration::from_secs(2))?; + stream.set_read_timeout(Some(Duration::from_secs(2)))?; + stream.set_write_timeout(Some(Duration::from_secs(2)))?; + + let request = format!( + "GET /metrics HTTP/1.1\r\nHost: {bind}\r\nConnection: close\r\nUser-Agent: charon-test\r\n\r\n", + ); + stream.write_all(request.as_bytes())?; + + let mut raw = Vec::with_capacity(8 * 1024); + stream.read_to_end(&mut raw)?; + + let text = String::from_utf8_lossy(&raw).into_owned(); + let status_line = text.lines().next().unwrap_or(""); + if !status_line.starts_with("HTTP/1.1 200") && !status_line.starts_with("HTTP/1.0 200") { + return Err(std::io::Error::other(format!( + "unexpected response status: {status_line}" + ))); + } + + let body_start = text + .find("\r\n\r\n") + .and_then(|i| i.checked_add(4)) + .unwrap_or(0); + Ok(text[body_start..].to_owned()) +} diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index f1cc6e9..52fd74e 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -7,6 +7,7 @@ description = "Chain listener 
and health-factor scanner for Charon" [dependencies] charon-core = { workspace = true } +charon-metrics = { workspace = true } alloy = { workspace = true } anyhow = { workspace = true } thiserror = { workspace = true } diff --git a/crates/charon-scanner/src/listener.rs b/crates/charon-scanner/src/listener.rs index ca6c8cd..9460f76 100644 --- a/crates/charon-scanner/src/listener.rs +++ b/crates/charon-scanner/src/listener.rs @@ -77,6 +77,14 @@ impl BlockListener { /// Run the listener forever. Reconnects with jittered exponential backoff /// on any connection or subscription error. Returns `Ok(())` only if the /// receiving side of the channel is dropped. + /// + /// Increments both the per-chain listener counters + /// (`charon_listener_connects_total`, + /// `charon_listener_disconnects_total`) and + /// [`charon_rpc_connection_reconnects_total`](charon_metrics::names::RPC_RECONNECTS_TOTAL) + /// under `endpoint_kind="public"` on every reconnect attempt + /// (issue #302) — the `newHeads` stream rides the chain's + /// public pubsub endpoint. pub async fn run(mut self) -> Result<()> { let mut backoff = Duration::from_secs(1); loop { @@ -93,6 +101,13 @@ impl BlockListener { "chain" => self.name.clone() ) .increment(1); + // Cross-chain RPC-reconnect counter (#302). + // `newHeads` rides the public chain RPC, so + // `endpoint_kind::PUBLIC` is the right label — + // the submitter (PRIVATE) owns its own counter. 
+ charon_metrics::record_rpc_reconnect( + charon_metrics::endpoint_kind::PUBLIC, + ); let jitter_ms = rand::thread_rng() .gen_range(0..=(backoff.as_millis() as u64).saturating_div(4)); let wait = backoff + Duration::from_millis(jitter_ms); diff --git a/crates/charon-scanner/src/mempool.rs b/crates/charon-scanner/src/mempool.rs index e82bfc7..d9c773e 100644 --- a/crates/charon-scanner/src/mempool.rs +++ b/crates/charon-scanner/src/mempool.rs @@ -393,6 +393,13 @@ pub struct PendingCache { selectors: HashSet>, pending: DashMap, max_pending_age_secs: u64, + /// Short chain name used as the `chain` label on every mempool + /// metric emitted from this cache (issues #300 / #222). Empty + /// string means "unlabelled" — unit tests use `new` / + /// `with_defaults` which default to empty; production callers + /// use `new_for_chain` / `with_defaults_for_chain` so dashboards + /// pivot on the same chain dimension the scanner uses. + chain: String, } impl PendingCache { @@ -400,12 +407,26 @@ impl PendingCache { oracle: Address, selectors: HashSet>, max_pending_age: Duration, + ) -> Self { + Self::new_for_chain(String::new(), oracle, selectors, max_pending_age) + } + + /// Constructor that binds a `chain` short-name onto every metric + /// emission. Label is the same string the scanner uses (`"bnb"` + /// on BSC) so Grafana joins mempool, scanner, and executor + /// panels on one dimension. + pub fn new_for_chain( + chain: impl Into, + oracle: Address, + selectors: HashSet>, + max_pending_age: Duration, ) -> Self { Self { oracle, selectors, pending: DashMap::new(), max_pending_age_secs: max_pending_age.as_secs(), + chain: chain.into(), } } @@ -413,6 +434,24 @@ impl PendingCache { Self::new(oracle, default_selectors(), DEFAULT_MAX_PENDING_AGE) } + /// Chain-labelled variant of [`PendingCache::with_defaults`]. + /// Use in production so `charon_mempool_*` samples carry the + /// `chain` label. 
+ pub fn with_defaults_for_chain(chain: impl Into, oracle: Address) -> Self { + Self::new_for_chain( + chain, + oracle, + default_selectors(), + DEFAULT_MAX_PENDING_AGE, + ) + } + + /// Chain short-name this cache labels its metrics with. Empty + /// when constructed via the non-chain-aware constructors. + pub fn chain(&self) -> &str { + &self.chain + } + pub fn oracle(&self) -> Address { self.oracle } @@ -432,6 +471,15 @@ impl PendingCache { "pre-signed liquidation armed" ); self.pending.insert(tx.borrower, tx); + // Gauge: live pending-oracle-update count. `pending_len()` + // reads the DashMap length after the insert lands, so a + // concurrent drain racing this insert still surfaces a + // well-defined value (count is eventually consistent, which + // is what dashboards need — not per-event accuracy). + charon_metrics::set_mempool_pending_oracle_updates( + &self.chain, + self.pending.len() as u64, + ); } pub fn pending_len(&self) -> usize { @@ -525,6 +573,16 @@ impl PendingCache { stale, "mempool cache drained for block" ); + // Counter: drained pre-signs this block, + gauge snapshot of + // the remaining pending set after re-queue. Dashboards graph + // these together so a spike in drained+a flat pending means + // "happy path", whereas drained=0 + pending climbing means + // "triggers not landing on-chain, TTL about to sweep". + charon_metrics::record_mempool_drained(&self.chain, out.len() as u64); + charon_metrics::set_mempool_pending_oracle_updates( + &self.chain, + self.pending.len() as u64, + ); out } @@ -603,7 +661,9 @@ pub struct MempoolMonitor { } impl MempoolMonitor { - /// Full-control constructor. + /// Full-control constructor. Metrics emitted from the inner + /// cache carry an empty `chain` label — use + /// [`MempoolMonitor::new_for_chain`] in production. pub fn new( provider: Arc>, oracle: Address, @@ -616,6 +676,25 @@ impl MempoolMonitor { } } + /// Chain-labelled full-control constructor. 
+ pub fn new_for_chain( + chain: impl Into, + provider: Arc>, + oracle: Address, + selectors: HashSet>, + max_pending_age: Duration, + ) -> Self { + Self { + provider, + cache: Arc::new(PendingCache::new_for_chain( + chain, + oracle, + selectors, + max_pending_age, + )), + } + } + /// Convenience: build with [`default_selectors`] and /// [`DEFAULT_MAX_PENDING_AGE`]. pub fn with_defaults(provider: Arc>, oracle: Address) -> Self { @@ -627,6 +706,24 @@ impl MempoolMonitor { ) } + /// Chain-labelled variant of [`MempoolMonitor::with_defaults`]. + /// Use in production so `charon_mempool_*` samples carry the + /// `chain` label — the CLI wires this with the venus pipeline's + /// chain name. + pub fn with_defaults_for_chain( + chain: impl Into, + provider: Arc>, + oracle: Address, + ) -> Self { + Self::new_for_chain( + chain, + provider, + oracle, + default_selectors(), + DEFAULT_MAX_PENDING_AGE, + ) + } + pub fn oracle(&self) -> Address { self.cache.oracle() } @@ -670,6 +767,18 @@ impl MempoolMonitor { backoff_secs = backoff.as_secs(), "mempool subscription error, reconnecting after backoff" ); + // Two counters on one event: the mempool-specific + // reconnect counter carries the `chain` label so + // operators can pinpoint which chain's upstream + // is flaky, and the cross-chain RPC reconnect + // counter (#302) pivots on `endpoint_kind` so the + // "public upstream is wobbling" dashboard also + // picks this up without depending on the mempool + // being wired. + charon_metrics::record_mempool_ws_reconnect(self.cache.chain()); + charon_metrics::record_rpc_reconnect( + charon_metrics::endpoint_kind::PUBLIC, + ); tokio::time::sleep(backoff).await; backoff = backoff_with_jitter(backoff, MAX_BACKOFF); }