From 1e2f480ff6b91d28743ef84bd58232f2df8c8488 Mon Sep 17 00:00:00 2001 From: obchain Date: Tue, 21 Apr 2026 11:29:42 +0530 Subject: [PATCH 1/2] feat(scanner): 3-bucket DashMap health-factor scanner (closes #9) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Classifies `Position` records into healthy / near-liquidation / liquidatable buckets based on `health_factor`, backed by a lock-free DashMap keyed by borrower address. Unblocks the profit calculator and flash-loan router (Day 3 work) with a proper "what's currently liquidatable" query. - `HealthScanner::new(liquidatable, near_liq)` validates and scales float thresholds once to 1e18-fixed U256; per-tick comparisons stay in integer arithmetic - `upsert`, `classify`, `bucket_counts`, `liquidatable`, `near_liquidation` — the surface downstream stages consume - New `liquidatable_threshold` / `near_liq_threshold` fields on `BotConfig` with sane defaults (1.0 / 1.05) via serde `default`, so the existing config keeps loading cleanly - `dashmap 6` added as a workspace dep - CLI `listen` now pipes Venus scans through the scanner and emits per-block bucket counts alongside the existing fields - Five unit tests cover boundary classification, bucket transitions on upsert, and threshold validation (NaN / inverted) --- Cargo.lock | 5 + Cargo.toml | 3 + config/default.toml | 4 + crates/charon-cli/src/main.rs | 19 +- crates/charon-core/src/config.rs | 18 ++ crates/charon-scanner/Cargo.toml | 1 + crates/charon-scanner/src/lib.rs | 2 + crates/charon-scanner/src/scanner.rs | 260 +++++++++++++++++++++++++++ 8 files changed, 309 insertions(+), 3 deletions(-) create mode 100644 crates/charon-scanner/src/scanner.rs diff --git a/Cargo.lock b/Cargo.lock index 55bee3e..093b523 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1125,8 +1125,10 @@ checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" name = "charon-cli" version = "0.1.0" dependencies = [ + "alloy", "anyhow", "charon-core", + "charon-protocols", "charon-scanner", "clap", "dotenvy", @@ -1154,6 +1156,8 @@ dependencies = [ "anyhow", "async-trait", "charon-core", + "dotenvy", + "tokio", "tracing", ] @@ -1164,6 +1168,7 @@ dependencies = [ "alloy", "anyhow", "charon-core", + "dashmap", "futures-util", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 18258e4..c258c3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,9 @@ async-trait = "0.1" # Stream adapters (StreamExt for subscription streams) futures-util = "0.3" +# Lock-free concurrent hashmap for scanner state +dashmap = "6" + # CLI clap = { version = "4", features = ["derive"] } diff --git a/config/default.toml b/config/default.toml index c7e27fc..42b4eaf 100644 --- a/config/default.toml +++ b/config/default.toml @@ -10,6 +10,10 @@ min_profit_usd = 5.0 max_gas_gwei = 10 # Polling cadence for protocols without push events (ms). scan_interval_ms = 1000 +# Health factor at or below which a position is liquidatable (1e18 = 1.0). +liquidatable_threshold = 1.0 +# Upper bound of the near-liquidation watch band. +near_liq_threshold = 1.05 # ── Chains ──────────────────────────────────────────────────────────────── [chain.bnb] diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 7e5f059..301143a 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -14,7 +14,7 @@ use alloy::providers::{ProviderBuilder, WsConnect}; use anyhow::{Context, Result}; use charon_core::{Config, LendingProtocol}; use charon_protocols::VenusAdapter; -use charon_scanner::{BlockListener, ChainEvent, ChainProvider}; +use charon_scanner::{BlockListener, ChainEvent, ChainProvider, HealthScanner}; use clap::{Parser, Subcommand}; use tokio::sync::mpsc; use tracing::{info, warn}; @@ -124,10 +124,17 @@ async fn run_listen(config: Config, borrowers: Vec
) -> Result<()> { let adapter = Arc::new(VenusAdapter::connect(Arc::new(adapter_ws), venus_cfg.comptroller).await?); + let scanner = Arc::new(HealthScanner::new( + config.bot.liquidatable_threshold, + config.bot.near_liq_threshold, + )?); + info!( borrower_count = borrowers.len(), market_count = adapter.markets.len(), - "venus adapter ready" + liquidatable_threshold = config.bot.liquidatable_threshold, + near_liq_threshold = config.bot.near_liq_threshold, + "venus adapter + scanner ready" ); // 2. Block listeners — one per configured chain, fan-in to a shared @@ -154,12 +161,18 @@ async fn run_listen(config: Config, borrowers: Vec
) -> Result<()> { let start = std::time::Instant::now(); match adapter.fetch_positions(&borrowers).await { Ok(positions) => { + let returned = positions.len(); + scanner.upsert(positions); + let counts = scanner.bucket_counts(); info!( chain = %chain, block = number, timestamp = timestamp, tracked = borrowers.len(), - returned = positions.len(), + returned, + healthy = counts.healthy, + near_liq = counts.near_liquidation, + liquidatable = counts.liquidatable, scan_ms = start.elapsed().as_millis() as u64, "venus scan" ); diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index eee714e..fe7b55f 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -35,6 +35,24 @@ pub struct BotConfig { pub max_gas_gwei: u64, /// Polling interval for protocols that don't push events. pub scan_interval_ms: u64, + /// Health factor at or below which a position becomes liquidatable. + /// Stored as a float for readability (e.g. `1.0`); the scanner + /// scales it to a 1e18-fixed `U256` internally. + #[serde(default = "default_liquidatable_threshold")] + pub liquidatable_threshold: f64, + /// Upper bound of the near-liquidation watch band. Positions in + /// `[liquidatable_threshold, near_liq_threshold)` are pre-cached so + /// the bot can fire immediately on the next adverse price move. + #[serde(default = "default_near_liq_threshold")] + pub near_liq_threshold: f64, +} + +fn default_liquidatable_threshold() -> f64 { + 1.0 +} + +fn default_near_liq_threshold() -> f64 { + 1.05 } /// RPC endpoints for a single chain. diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index bc4d67c..497d67f 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -12,3 +12,4 @@ anyhow = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } futures-util = { workspace = true } +dashmap = { workspace = true } diff --git a/crates/charon-scanner/src/lib.rs b/crates/charon-scanner/src/lib.rs index 8ef41da..594c9ae 100644 --- a/crates/charon-scanner/src/lib.rs +++ b/crates/charon-scanner/src/lib.rs @@ -2,6 +2,8 @@ pub mod listener; pub mod provider; +pub mod scanner; pub use listener::{BlockListener, ChainEvent}; pub use provider::ChainProvider; +pub use scanner::{BucketCounts, BucketedPosition, HealthScanner, PositionBucket}; diff --git a/crates/charon-scanner/src/scanner.rs b/crates/charon-scanner/src/scanner.rs new file mode 100644 index 0000000..4fe43a8 --- /dev/null +++ b/crates/charon-scanner/src/scanner.rs @@ -0,0 +1,260 @@ +//! Health-factor scanner — 3-bucket classifier on top of normalized +//! [`Position`](charon_core::Position) records. +//! +//! Protocol adapters supply positions; the scanner classifies each into +//! one of three buckets based on its `health_factor`: +//! +//! * **Liquidatable** — `hf < liquidatable_threshold` (1.0 by default). +//! Ready to be handed to the profit calculator and flash-loan router. +//! * **NearLiquidation** — `liquidatable_threshold ≤ hf < near_liq_threshold`. +//! Pre-cached so we can fire instantly on the next adverse oracle update. +//! * **Healthy** — everything else. Tracked just enough to transition out +//! quickly when the borrower's position deteriorates. +//! +//! Storage is a single `DashMap` — lock-free, +//! shard-partitioned, safe for the scanner task to mutate while other +//! tasks read for downstream stages. + +use alloy::primitives::U256; +use charon_core::Position; +use dashmap::DashMap; + +/// Which classification bucket a borrower's position currently falls into. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum PositionBucket { + /// Safely over-collateralized; nothing to do. + Healthy, + /// Close to the liquidation boundary — watched more aggressively. + NearLiquidation, + /// Currently liquidatable. + Liquidatable, +} + +/// Cached per-borrower state tracked by the scanner. +#[derive(Debug, Clone)] +pub struct BucketedPosition { + pub position: Position, + pub bucket: PositionBucket, +} + +/// Populated count summary of each bucket — emitted once per block. +#[derive(Debug, Clone, Default)] +pub struct BucketCounts { + pub healthy: usize, + pub near_liquidation: usize, + pub liquidatable: usize, +} + +impl BucketCounts { + pub fn total(&self) -> usize { + self.healthy + self.near_liquidation + self.liquidatable + } +} + +/// 3-bucket health-factor scanner. +/// +/// Thresholds are configured in float form (see [`BotConfig`]) and scaled +/// to 1e18-fixed `U256` inside [`HealthScanner::new`]. At comparison time +/// we stay entirely in integer arithmetic — no float drift per tick. +/// +/// [`BotConfig`]: charon_core::config::BotConfig +pub struct HealthScanner { + /// Positions with `health_factor < this` are liquidatable. + liquidatable_threshold: U256, + /// Upper bound of the near-liquidation band. + near_liq_threshold: U256, + /// Borrower → (latest position, current bucket). + positions: DashMap, +} + +impl HealthScanner { + /// Build a scanner from float thresholds (e.g. `1.0`, `1.05`). + /// + /// Floats are scaled to 1e18-fixed U256 once, here. Values are + /// validated: `liquidatable ≤ near_liq` must hold, otherwise the + /// classifier would leave a gap where positions match neither bucket. + pub fn new(liquidatable: f64, near_liq: f64) -> anyhow::Result { + if !(liquidatable.is_finite() && near_liq.is_finite()) { + anyhow::bail!("scanner thresholds must be finite floats"); + } + if liquidatable < 0.0 || near_liq < 0.0 { + anyhow::bail!("scanner thresholds must be non-negative"); + } + if liquidatable > near_liq { + anyhow::bail!( + "liquidatable_threshold ({liquidatable}) must be ≤ near_liq_threshold ({near_liq})" + ); + } + Ok(Self { + liquidatable_threshold: f64_to_1e18(liquidatable), + near_liq_threshold: f64_to_1e18(near_liq), + positions: DashMap::new(), + }) + } + + /// Classify a single health-factor reading into a bucket. + pub fn classify(&self, health_factor: U256) -> PositionBucket { + if health_factor < self.liquidatable_threshold { + PositionBucket::Liquidatable + } else if health_factor < self.near_liq_threshold { + PositionBucket::NearLiquidation + } else { + PositionBucket::Healthy + } + } + + /// Upsert a batch of freshly-fetched positions. Each borrower's + /// previous bucket is overwritten with its latest reading. + pub fn upsert(&self, positions: impl IntoIterator) { + for p in positions { + let bucket = self.classify(p.health_factor); + self.positions.insert( + p.borrower, + BucketedPosition { + position: p, + bucket, + }, + ); + } + } + + /// Snapshot the current bucket populations. + pub fn bucket_counts(&self) -> BucketCounts { + let mut counts = BucketCounts::default(); + for entry in self.positions.iter() { + match entry.value().bucket { + PositionBucket::Healthy => counts.healthy += 1, + PositionBucket::NearLiquidation => counts.near_liquidation += 1, + PositionBucket::Liquidatable => counts.liquidatable += 1, + } + } + counts + } + + /// Clone out all currently-liquidatable positions for downstream + /// stages (profit calc, flash-loan router). Called once per block. + pub fn liquidatable(&self) -> Vec { + self.positions + .iter() + .filter(|e| e.value().bucket == PositionBucket::Liquidatable) + .map(|e| e.value().position.clone()) + .collect() + } + + /// Same, but for near-liquidation positions — these are the ones the + /// mempool monitor / pre-computation layer will want pre-signed txs + /// for (follow-up task). + pub fn near_liquidation(&self) -> Vec { + self.positions + .iter() + .filter(|e| e.value().bucket == PositionBucket::NearLiquidation) + .map(|e| e.value().position.clone()) + .collect() + } +} + +/// Scale a float in `[0, ~10]` to a 1e18-fixed `U256`. +/// +/// Bounded to `u128` capacity on the scaled value — with the 1.0–2.0 +/// range we use in practice, this is many orders of magnitude below the +/// overflow limit. Saturating conversion keeps us safe if config ever +/// passes something absurd instead of panicking. +fn f64_to_1e18(x: f64) -> U256 { + let scaled = x * 1e18; + if scaled.is_nan() || scaled < 0.0 { + U256::ZERO + } else if scaled > u128::MAX as f64 { + U256::from(u128::MAX) + } else { + U256::from(scaled as u128) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::address; + use charon_core::ProtocolId; + + fn one_e18() -> U256 { + U256::from(10u64).pow(U256::from(18u64)) + } + + fn mk_position(borrower_byte: u8, hf: U256) -> Position { + let mut bytes = [0u8; 20]; + bytes[19] = borrower_byte; + Position { + protocol: ProtocolId::Venus, + chain_id: 56, + borrower: alloy::primitives::Address::from(bytes), + collateral_token: address!("0000000000000000000000000000000000000001"), + debt_token: address!("0000000000000000000000000000000000000002"), + collateral_amount: U256::ZERO, + debt_amount: U256::ZERO, + health_factor: hf, + liquidation_bonus_bps: 1000, + } + } + + #[test] + fn classify_partitions_positions_into_three_buckets() { + let s = HealthScanner::new(1.0, 1.05).unwrap(); + let e18 = one_e18(); + + // 0.5e18 = liquidatable + assert_eq!( + s.classify(e18 / U256::from(2u64)), + PositionBucket::Liquidatable + ); + // 1.0e18 exactly = not liquidatable (strict `<`) — falls into near-liq + assert_eq!(s.classify(e18), PositionBucket::NearLiquidation); + // 1.04e18 = near-liq + let p104 = U256::from(1_040_000_000_000_000_000u128); + assert_eq!(s.classify(p104), PositionBucket::NearLiquidation); + // 1.05e18 = healthy (boundary: `near_liq_threshold` is exclusive top) + let p105 = U256::from(1_050_000_000_000_000_000u128); + assert_eq!(s.classify(p105), PositionBucket::Healthy); + // 2e18 = healthy + assert_eq!(s.classify(e18 * U256::from(2u64)), PositionBucket::Healthy); + } + + #[test] + fn upsert_updates_buckets_and_counts() { + let s = HealthScanner::new(1.0, 1.05).unwrap(); + let e18 = one_e18(); + s.upsert([ + mk_position(1, U256::from(0u64)), // liquidatable + mk_position(2, U256::from(1_020_000_000_000_000_000u128)), // near-liq + mk_position(3, e18 * U256::from(3u64)), // healthy + ]); + let c = s.bucket_counts(); + assert_eq!(c.liquidatable, 1); + assert_eq!(c.near_liquidation, 1); + assert_eq!(c.healthy, 1); + assert_eq!(c.total(), 3); + assert_eq!(s.liquidatable().len(), 1); + assert_eq!(s.near_liquidation().len(), 1); + } + + #[test] + fn upsert_overwrites_previous_classification() { + let s = HealthScanner::new(1.0, 1.05).unwrap(); + s.upsert([mk_position(1, U256::from(0u64))]); + assert_eq!(s.bucket_counts().liquidatable, 1); + // Same borrower bounces back to healthy. + s.upsert([mk_position(1, one_e18() * U256::from(5u64))]); + let c = s.bucket_counts(); + assert_eq!(c.liquidatable, 0); + assert_eq!(c.healthy, 1); + } + + #[test] + fn rejects_inverted_thresholds() { + assert!(HealthScanner::new(1.05, 1.0).is_err()); + } + + #[test] + fn rejects_nan_thresholds() { + assert!(HealthScanner::new(f64::NAN, 1.05).is_err()); + } +} From 3ebb93a7d3d482ec1933b188c62e9b492402500c Mon Sep 17 00:00:00 2001 From: obchain Date: Wed, 22 Apr 2026 20:39:18 +0530 Subject: [PATCH 2/2] feat(scanner): per-bucket cadence, bps thresholds, prune, Prometheus metrics - BotConfig drops the f64 liquidatable_threshold / near_liq_threshold in favour of integer basis-point fields (10_000 = 1.0e18). bps_to_1e18() uses integer arithmetic only; 10_500 bps -> exactly 1.05e18 rather than the ULP-short 1_049_999_999_999_999_872 the old f64_to_1e18() produced. - Add hot_scan_blocks / warm_scan_blocks / cold_scan_blocks to BotConfig (defaults 1 / 10 / 100). ScanScheduler in charon-scanner consumes them and answers should_scan(bucket, block) so COLD positions stop burning RPC every block. main.rs now scans only the union of buckets whose cadence fires on the current block. - HealthScanner tracks per-borrower bucket transitions inside upsert() and emits charon_scanner_transitions_total{from,to}. bucket_counts() feeds gauge charon_scanner_borrowers_in_bucket{bucket}. The CLI's scan loop records charon_scanner_scan_duration_seconds histogram per block. Grafana dashboards now have signals instead of log scraping. - HealthScanner::remove(addr) and HealthScanner::prune(current) so borrowers that fully repay stop lingering as stale Liquidatable entries after adapter.fetch_positions stops returning them. prune() is invoked by main.rs every block after upsert. - warn_if_binary_sentinel() emits a loud warning if every observed HF is 0 or 2e18, making the (ties to #98) binary-HF dependency visible at runtime instead of silently dead-coding the NearLiquidation bucket. - HealthScanner::borrowers_in_bucket(b) exposed so the scheduler knows which addresses to refetch for each cadence. Closes #103 #104 #105 #106 #107 --- Cargo.lock | 18 ++ Cargo.toml | 3 + crates/charon-cli/Cargo.toml | 1 + crates/charon-cli/src/main.rs | 56 +++++- crates/charon-core/src/config.rs | 47 +++-- crates/charon-scanner/Cargo.toml | 1 + crates/charon-scanner/src/lib.rs | 4 +- crates/charon-scanner/src/scanner.rs | 287 ++++++++++++++++++--------- 8 files changed, 297 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 093b523..c1e566b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1132,6 +1132,7 @@ dependencies = [ "charon-scanner", "clap", "dotenvy", + "metrics", "tokio", "tracing", "tracing-subscriber", @@ -1170,6 +1171,7 @@ dependencies = [ "charon-core", "dashmap", "futures-util", + "metrics", "tokio", "tracing", ] @@ -2307,6 +2309,16 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + [[package]] name = "mio" version = "1.2.0" @@ -2604,6 +2616,12 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index c258c3a..705daaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,9 @@ futures-util = "0.3" # Lock-free concurrent hashmap for scanner state dashmap = "6" +# Prometheus-style metrics recorder facade +metrics = "0.24" + # CLI clap = { version = "4", features = ["derive"] } diff --git a/crates/charon-cli/Cargo.toml b/crates/charon-cli/Cargo.toml index b971ac8..1820574 100644 --- a/crates/charon-cli/Cargo.toml +++ b/crates/charon-cli/Cargo.toml @@ -20,3 +20,4 @@ anyhow = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } dotenvy = { workspace = true } +metrics = { workspace = true } diff --git a/crates/charon-cli/src/main.rs b/crates/charon-cli/src/main.rs index 301143a..558621c 100644 --- a/crates/charon-cli/src/main.rs +++ b/crates/charon-cli/src/main.rs @@ -14,7 +14,9 @@ use alloy::providers::{ProviderBuilder, WsConnect}; use anyhow::{Context, Result}; use charon_core::{Config, LendingProtocol}; use charon_protocols::VenusAdapter; -use charon_scanner::{BlockListener, ChainEvent, ChainProvider, HealthScanner}; +use charon_scanner::{ + BlockListener, ChainEvent, ChainProvider, HealthScanner, PositionBucket, ScanScheduler, +}; use clap::{Parser, Subcommand}; use tokio::sync::mpsc; use tracing::{info, warn}; @@ -125,15 +127,23 @@ async fn run_listen(config: Config, borrowers: Vec
) -> Result<()> { Arc::new(VenusAdapter::connect(Arc::new(adapter_ws), venus_cfg.comptroller).await?); let scanner = Arc::new(HealthScanner::new( - config.bot.liquidatable_threshold, - config.bot.near_liq_threshold, + config.bot.liquidatable_threshold_bps, + config.bot.near_liq_threshold_bps, )?); + let sched = ScanScheduler::new( + config.bot.hot_scan_blocks, + config.bot.warm_scan_blocks, + config.bot.cold_scan_blocks, + ); info!( borrower_count = borrowers.len(), market_count = adapter.markets.len(), - liquidatable_threshold = config.bot.liquidatable_threshold, - near_liq_threshold = config.bot.near_liq_threshold, + liquidatable_bps = config.bot.liquidatable_threshold_bps, + near_liq_bps = config.bot.near_liq_threshold_bps, + hot_blocks = sched.hot, + warm_blocks = sched.warm, + cold_blocks = sched.cold, "venus adapter + scanner ready" ); @@ -152,23 +162,51 @@ async fn run_listen(config: Config, borrowers: Vec
) -> Result<()> { info!("listen: draining chain events (Ctrl-C to stop)"); - // 3. Drain loop: on every new block, run one Venus scan. + // 3. Drain loop: on every new block, run one Venus scan scoped to the + // buckets whose cadence fires this block. Seed list is used on the + // very first block; thereafter the scheduler scans each bucket's + // tracked borrowers on its own cadence. + let mut first_block = true; tokio::select! { _ = async { while let Some(event) = rx.recv().await { match event { ChainEvent::NewBlock { chain, number, timestamp } => { let start = std::time::Instant::now(); - match adapter.fetch_positions(&borrowers).await { + let scan_set: Vec
= if first_block { + first_block = false; + borrowers.clone() + } else { + let mut v = Vec::new(); + for b in [ + PositionBucket::Liquidatable, + PositionBucket::NearLiquidation, + PositionBucket::Healthy, + ] { + if sched.should_scan(b, number) { + v.extend(scanner.borrowers_in_bucket(b)); + } + } + v + }; + if scan_set.is_empty() { + continue; + } + match adapter.fetch_positions(&scan_set).await { Ok(positions) => { let returned = positions.len(); - scanner.upsert(positions); + scanner.upsert(positions.clone()); + scanner.prune(&positions); let counts = scanner.bucket_counts(); + metrics::histogram!( + "charon_scanner_scan_duration_seconds" + ) + .record(start.elapsed().as_secs_f64()); info!( chain = %chain, block = number, timestamp = timestamp, - tracked = borrowers.len(), + tracked = scan_set.len(), returned, healthy = counts.healthy, near_liq = counts.near_liquidation, diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index fe7b55f..cb27660 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -35,24 +35,41 @@ pub struct BotConfig { pub max_gas_gwei: u64, /// Polling interval for protocols that don't push events. pub scan_interval_ms: u64, - /// Health factor at or below which a position becomes liquidatable. - /// Stored as a float for readability (e.g. `1.0`); the scanner - /// scales it to a 1e18-fixed `U256` internally. - #[serde(default = "default_liquidatable_threshold")] - pub liquidatable_threshold: f64, - /// Upper bound of the near-liquidation watch band. Positions in - /// `[liquidatable_threshold, near_liq_threshold)` are pre-cached so - /// the bot can fire immediately on the next adverse price move. - #[serde(default = "default_near_liq_threshold")] - pub near_liq_threshold: f64, + /// Health factor at or below which a position becomes liquidatable, + /// in basis points of 1e18 (10_000 = 1.0). Integer bps over f64 so + /// the boundary has no ULP-level drift (1.05 as f64 truncates to + /// 1_049_999_999_999_999_872 in 1e18 scale and silently leaks + /// positions out of the NearLiquidation bucket). + #[serde(default = "default_liquidatable_threshold_bps")] + pub liquidatable_threshold_bps: u32, + /// Upper bound of the near-liquidation watch band, same bps space. + #[serde(default = "default_near_liq_threshold_bps")] + pub near_liq_threshold_bps: u32, + /// HOT (Liquidatable) bucket scan cadence, in blocks. Default 1. + #[serde(default = "default_hot_scan_blocks")] + pub hot_scan_blocks: u64, + /// WARM (NearLiquidation) bucket scan cadence. Default every 10 blocks. + #[serde(default = "default_warm_scan_blocks")] + pub warm_scan_blocks: u64, + /// COLD (Healthy) bucket scan cadence. Default every 100 blocks. + #[serde(default = "default_cold_scan_blocks")] + pub cold_scan_blocks: u64, } -fn default_liquidatable_threshold() -> f64 { - 1.0 +fn default_liquidatable_threshold_bps() -> u32 { + 10_000 // 1.0000 } - -fn default_near_liq_threshold() -> f64 { - 1.05 +fn default_near_liq_threshold_bps() -> u32 { + 10_500 // 1.0500 +} +fn default_hot_scan_blocks() -> u64 { + 1 +} +fn default_warm_scan_blocks() -> u64 { + 10 +} +fn default_cold_scan_blocks() -> u64 { + 100 } /// RPC endpoints for a single chain. diff --git a/crates/charon-scanner/Cargo.toml b/crates/charon-scanner/Cargo.toml index 497d67f..f7a69a7 100644 --- a/crates/charon-scanner/Cargo.toml +++ b/crates/charon-scanner/Cargo.toml @@ -13,3 +13,4 @@ tokio = { workspace = true } tracing = { workspace = true } futures-util = { workspace = true } dashmap = { workspace = true } +metrics = { workspace = true } diff --git a/crates/charon-scanner/src/lib.rs b/crates/charon-scanner/src/lib.rs index 594c9ae..e24871e 100644 --- a/crates/charon-scanner/src/lib.rs +++ b/crates/charon-scanner/src/lib.rs @@ -6,4 +6,6 @@ pub mod scanner; pub use listener::{BlockListener, ChainEvent}; pub use provider::ChainProvider; -pub use scanner::{BucketCounts, BucketedPosition, HealthScanner, PositionBucket}; +pub use scanner::{ + BucketCounts, BucketedPosition, HealthScanner, PositionBucket, ScanScheduler, +}; diff --git a/crates/charon-scanner/src/scanner.rs b/crates/charon-scanner/src/scanner.rs index 4fe43a8..e7e8382 100644 --- a/crates/charon-scanner/src/scanner.rs +++ b/crates/charon-scanner/src/scanner.rs @@ -1,43 +1,54 @@ -//! Health-factor scanner — 3-bucket classifier on top of normalized -//! [`Position`](charon_core::Position) records. +//! Health-factor scanner — 3-bucket classifier + per-bucket scheduler. //! //! Protocol adapters supply positions; the scanner classifies each into //! one of three buckets based on its `health_factor`: //! -//! * **Liquidatable** — `hf < liquidatable_threshold` (1.0 by default). -//! Ready to be handed to the profit calculator and flash-loan router. -//! * **NearLiquidation** — `liquidatable_threshold ≤ hf < near_liq_threshold`. -//! Pre-cached so we can fire instantly on the next adverse oracle update. -//! * **Healthy** — everything else. Tracked just enough to transition out -//! quickly when the borrower's position deteriorates. +//! * **Liquidatable** (HOT) — `hf < liquidatable_threshold` (1.0 by default). +//! * **NearLiquidation** (WARM) — `liquidatable ≤ hf < near_liq`. +//! * **Healthy** (COLD) — everything else. //! -//! Storage is a single `DashMap` — lock-free, -//! shard-partitioned, safe for the scanner task to mutate while other -//! tasks read for downstream stages. +//! The [`ScanScheduler`] answers "do I re-scan this bucket on this block?" +//! from the configured `{hot,warm,cold}_scan_blocks` cadence, so the scanner +//! does not burn RPC on a COLD bucket every block. +//! +//! Storage is a single [`DashMap`] — lock-free, shard-partitioned. The map +//! supports `prune()` so borrowers that fully repay are removed and do +//! not linger as stale Liquidatable entries forever. + +use std::collections::HashSet; -use alloy::primitives::U256; +use alloy::primitives::{Address, U256}; use charon_core::Position; use dashmap::DashMap; +use tracing::warn; /// Which classification bucket a borrower's position currently falls into. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum PositionBucket { - /// Safely over-collateralized; nothing to do. + /// Safely over-collateralized; nothing to do (COLD). Healthy, - /// Close to the liquidation boundary — watched more aggressively. + /// Close to the liquidation boundary (WARM). NearLiquidation, - /// Currently liquidatable. + /// Currently liquidatable (HOT). Liquidatable, } -/// Cached per-borrower state tracked by the scanner. +impl PositionBucket { + fn label(self) -> &'static str { + match self { + PositionBucket::Healthy => "healthy", + PositionBucket::NearLiquidation => "near_liquidation", + PositionBucket::Liquidatable => "liquidatable", + } + } +} + #[derive(Debug, Clone)] pub struct BucketedPosition { pub position: Position, pub bucket: PositionBucket, } -/// Populated count summary of each bucket — emitted once per block. #[derive(Debug, Clone, Default)] pub struct BucketCounts { pub healthy: usize, @@ -51,48 +62,77 @@ impl BucketCounts { } } -/// 3-bucket health-factor scanner. -/// -/// Thresholds are configured in float form (see [`BotConfig`]) and scaled -/// to 1e18-fixed `U256` inside [`HealthScanner::new`]. At comparison time -/// we stay entirely in integer arithmetic — no float drift per tick. +/// Per-bucket scan cadence driver. /// -/// [`BotConfig`]: charon_core::config::BotConfig +/// `should_scan(bucket, block)` returns true when the given block number +/// falls on the bucket's cadence. HOT cadence is usually 1 (every block). +#[derive(Debug, Clone, Copy)] +pub struct ScanScheduler { + pub hot: u64, + pub warm: u64, + pub cold: u64, +} + +impl ScanScheduler { + pub fn new(hot: u64, warm: u64, cold: u64) -> Self { + Self { + hot: hot.max(1), + warm: warm.max(1), + cold: cold.max(1), + } + } + pub fn should_scan(&self, bucket: PositionBucket, block: u64) -> bool { + let period = match bucket { + PositionBucket::Liquidatable => self.hot, + PositionBucket::NearLiquidation => self.warm, + PositionBucket::Healthy => self.cold, + }; + block % period == 0 + } +} + +/// 3-bucket health-factor scanner. pub struct HealthScanner { - /// Positions with `health_factor < this` are liquidatable. liquidatable_threshold: U256, - /// Upper bound of the near-liquidation band. near_liq_threshold: U256, - /// Borrower → (latest position, current bucket). - positions: DashMap, + positions: DashMap, } impl HealthScanner { - /// Build a scanner from float thresholds (e.g. `1.0`, `1.05`). - /// - /// Floats are scaled to 1e18-fixed U256 once, here. Values are - /// validated: `liquidatable ≤ near_liq` must hold, otherwise the - /// classifier would leave a gap where positions match neither bucket. - pub fn new(liquidatable: f64, near_liq: f64) -> anyhow::Result { - if !(liquidatable.is_finite() && near_liq.is_finite()) { - anyhow::bail!("scanner thresholds must be finite floats"); - } - if liquidatable < 0.0 || near_liq < 0.0 { - anyhow::bail!("scanner thresholds must be non-negative"); - } - if liquidatable > near_liq { + /// Build a scanner from basis-point thresholds of 1e18 (10_000 = 1.0). + pub fn new(liquidatable_bps: u32, near_liq_bps: u32) -> anyhow::Result { + if liquidatable_bps > near_liq_bps { anyhow::bail!( - "liquidatable_threshold ({liquidatable}) must be ≤ near_liq_threshold ({near_liq})" + "liquidatable_threshold_bps ({liquidatable_bps}) must be ≤ near_liq_threshold_bps ({near_liq_bps})" ); } Ok(Self { - liquidatable_threshold: f64_to_1e18(liquidatable), - near_liq_threshold: f64_to_1e18(near_liq), + liquidatable_threshold: bps_to_1e18(liquidatable_bps), + near_liq_threshold: bps_to_1e18(near_liq_bps), positions: DashMap::new(), }) } - /// Classify a single health-factor reading into a bucket. + /// Warn on startup if the supplied positions all carry the known + /// binary-HF sentinel values (0 / 2e18) emitted by adapters that have + /// not yet implemented a real ratio — otherwise the NearLiquidation + /// bucket is silently dead code. + pub fn warn_if_binary_sentinel(&self, sample: &[Position]) { + if sample.is_empty() { + return; + } + let scale = U256::from(10u64).pow(U256::from(18u64)); + let binary = sample + .iter() + .all(|p| p.health_factor == U256::ZERO || p.health_factor == scale * U256::from(2u64)); + if binary { + warn!( + "every observed health_factor is 0 or 2e18 — adapter appears to emit a binary sentinel; \ + NearLiquidation bucket will never populate until real HF is computed" + ); + } + } + pub fn classify(&self, health_factor: U256) -> PositionBucket { if health_factor < self.liquidatable_threshold { PositionBucket::Liquidatable @@ -103,22 +143,52 @@ impl HealthScanner { } } - /// Upsert a batch of freshly-fetched positions. Each borrower's - /// previous bucket is overwritten with its latest reading. + /// Upsert a batch of freshly-fetched positions. Detects per-borrower + /// bucket transitions and increments `charon_scanner_transitions_total`. pub fn upsert(&self, positions: impl IntoIterator) { for p in positions { - let bucket = self.classify(p.health_factor); + let new_bucket = self.classify(p.health_factor); + let prev_bucket = self + .positions + .get(&p.borrower) + .map(|e| e.value().bucket); self.positions.insert( p.borrower, BucketedPosition { position: p, - bucket, + bucket: new_bucket, }, ); + if let Some(prev) = prev_bucket { + if prev != new_bucket { + metrics::counter!( + "charon_scanner_transitions_total", + "from" => prev.label(), + "to" => new_bucket.label() + ) + .increment(1); + } + } } + self.publish_gauges(); + } + + /// Remove a single borrower (e.g. after full repayment detected). + pub fn remove(&self, borrower: &Address) { + self.positions.remove(borrower); + self.publish_gauges(); + } + + /// Drop every tracked borrower that is **not** in `current`. Called by + /// the scan loop after `upsert(fresh_positions)` so positions whose debt + /// has been repaid (and thus no longer appear in the adapter response) + /// stop being reported as Liquidatable. + pub fn prune(&self, current: &[Position]) { + let keep: HashSet
= current.iter().map(|p| p.borrower).collect(); + self.positions.retain(|addr, _| keep.contains(addr)); + self.publish_gauges(); } - /// Snapshot the current bucket populations. pub fn bucket_counts(&self) -> BucketCounts { let mut counts = BucketCounts::default(); for entry in self.positions.iter() { @@ -131,8 +201,17 @@ impl HealthScanner { counts } - /// Clone out all currently-liquidatable positions for downstream - /// stages (profit calc, flash-loan router). Called once per block. + /// Update the `charon_scanner_borrowers_in_bucket{bucket}` gauges. + fn publish_gauges(&self) { + let counts = self.bucket_counts(); + metrics::gauge!("charon_scanner_borrowers_in_bucket", "bucket" => "healthy") + .set(counts.healthy as f64); + metrics::gauge!("charon_scanner_borrowers_in_bucket", "bucket" => "near_liquidation") + .set(counts.near_liquidation as f64); + metrics::gauge!("charon_scanner_borrowers_in_bucket", "bucket" => "liquidatable") + .set(counts.liquidatable as f64); + } + pub fn liquidatable(&self) -> Vec { self.positions .iter() @@ -141,9 +220,6 @@ impl HealthScanner { .collect() } - /// Same, but for near-liquidation positions — these are the ones the - /// mempool monitor / pre-computation layer will want pre-signed txs - /// for (follow-up task). pub fn near_liquidation(&self) -> Vec { self.positions .iter() @@ -151,25 +227,25 @@ impl HealthScanner { .map(|e| e.value().position.clone()) .collect() } -} -/// Scale a float in `[0, ~10]` to a 1e18-fixed `U256`. -/// -/// Bounded to `u128` capacity on the scaled value — with the 1.0–2.0 -/// range we use in practice, this is many orders of magnitude below the -/// overflow limit. Saturating conversion keeps us safe if config ever -/// passes something absurd instead of panicking. -fn f64_to_1e18(x: f64) -> U256 { - let scaled = x * 1e18; - if scaled.is_nan() || scaled < 0.0 { - U256::ZERO - } else if scaled > u128::MAX as f64 { - U256::from(u128::MAX) - } else { - U256::from(scaled as u128) + /// Return the borrowers currently assigned to `bucket`. Used by the + /// scan scheduler to fetch only the subset that is due this block. + pub fn borrowers_in_bucket(&self, bucket: PositionBucket) -> Vec
{ + self.positions + .iter() + .filter(|e| e.value().bucket == bucket) + .map(|e| *e.key()) + .collect() } } +/// Convert a basis-point value into a 1e18-fixed `U256`. 10_000 bps == 1.0e18. +/// Integer arithmetic only — no f64 at any point. +pub fn bps_to_1e18(bps: u32) -> U256 { + // 1 bps = 1e14 in 1e18 scale. + U256::from(bps) * U256::from(10u64).pow(U256::from(14u64)) +} + #[cfg(test)] mod tests { use super::*; @@ -186,7 +262,7 @@ mod tests { Position { protocol: ProtocolId::Venus, chain_id: 56, - borrower: alloy::primitives::Address::from(bytes), + borrower: Address::from(bytes), collateral_token: address!("0000000000000000000000000000000000000001"), debt_token: address!("0000000000000000000000000000000000000002"), collateral_amount: U256::ZERO, @@ -196,65 +272,86 @@ mod tests { } } + #[test] + fn bps_to_1e18_exact_boundary() { + // 10_500 bps must be exactly 1.05e18 — catches the old f64 ULP bug. + assert_eq!( + bps_to_1e18(10_500), + U256::from(1_050_000_000_000_000_000u128) + ); + assert_eq!( + bps_to_1e18(10_000), + U256::from(1_000_000_000_000_000_000u128) + ); + } + #[test] fn classify_partitions_positions_into_three_buckets() { - let s = HealthScanner::new(1.0, 1.05).unwrap(); + let s = HealthScanner::new(10_000, 10_500).unwrap(); let e18 = one_e18(); - // 0.5e18 = liquidatable assert_eq!( s.classify(e18 / U256::from(2u64)), PositionBucket::Liquidatable ); - // 1.0e18 exactly = not liquidatable (strict `<`) — falls into near-liq assert_eq!(s.classify(e18), PositionBucket::NearLiquidation); - // 1.04e18 = near-liq let p104 = U256::from(1_040_000_000_000_000_000u128); assert_eq!(s.classify(p104), PositionBucket::NearLiquidation); - // 1.05e18 = healthy (boundary: `near_liq_threshold` is exclusive top) let p105 = U256::from(1_050_000_000_000_000_000u128); assert_eq!(s.classify(p105), PositionBucket::Healthy); - // 2e18 = healthy assert_eq!(s.classify(e18 * U256::from(2u64)), PositionBucket::Healthy); } #[test] fn upsert_updates_buckets_and_counts() { - let s = HealthScanner::new(1.0, 1.05).unwrap(); + let s = HealthScanner::new(10_000, 10_500).unwrap(); let e18 = one_e18(); s.upsert([ - mk_position(1, U256::from(0u64)), // liquidatable - mk_position(2, U256::from(1_020_000_000_000_000_000u128)), // near-liq - mk_position(3, e18 * U256::from(3u64)), // healthy + mk_position(1, U256::from(0u64)), + mk_position(2, U256::from(1_020_000_000_000_000_000u128)), + mk_position(3, e18 * U256::from(3u64)), ]); let c = s.bucket_counts(); assert_eq!(c.liquidatable, 1); assert_eq!(c.near_liquidation, 1); assert_eq!(c.healthy, 1); - assert_eq!(c.total(), 3); - assert_eq!(s.liquidatable().len(), 1); - assert_eq!(s.near_liquidation().len(), 1); } #[test] - fn upsert_overwrites_previous_classification() { - let s = HealthScanner::new(1.0, 1.05).unwrap(); - s.upsert([mk_position(1, U256::from(0u64))]); + fn prune_drops_repaid_borrowers() { + let s = HealthScanner::new(10_000, 10_500).unwrap(); + s.upsert([ + mk_position(1, U256::from(0u64)), + mk_position(2, U256::from(0u64)), + ]); + assert_eq!(s.bucket_counts().liquidatable, 2); + // Only borrower 2 still has a position after repayment. + s.prune(&[mk_position(2, U256::from(0u64))]); assert_eq!(s.bucket_counts().liquidatable, 1); - // Same borrower bounces back to healthy. - s.upsert([mk_position(1, one_e18() * U256::from(5u64))]); - let c = s.bucket_counts(); - assert_eq!(c.liquidatable, 0); - assert_eq!(c.healthy, 1); } #[test] - fn rejects_inverted_thresholds() { - assert!(HealthScanner::new(1.05, 1.0).is_err()); + fn remove_drops_single_borrower() { + let s = HealthScanner::new(10_000, 10_500).unwrap(); + s.upsert([mk_position(1, U256::from(0u64))]); + let mut bytes = [0u8; 20]; + bytes[19] = 1; + s.remove(&Address::from(bytes)); + assert_eq!(s.bucket_counts().total(), 0); } #[test] - fn rejects_nan_thresholds() { - assert!(HealthScanner::new(f64::NAN, 1.05).is_err()); + fn scheduler_gates_per_bucket_cadence() { + let sched = ScanScheduler::new(1, 10, 100); + assert!(sched.should_scan(PositionBucket::Liquidatable, 17)); + assert!(sched.should_scan(PositionBucket::NearLiquidation, 20)); + assert!(!sched.should_scan(PositionBucket::NearLiquidation, 21)); + assert!(sched.should_scan(PositionBucket::Healthy, 100)); + assert!(!sched.should_scan(PositionBucket::Healthy, 101)); + } + + #[test] + fn rejects_inverted_thresholds() { + assert!(HealthScanner::new(10_500, 10_000).is_err()); } }