Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ async-trait = "0.1"
# Stream adapters (StreamExt for subscription streams)
futures-util = "0.3"

# Lock-free concurrent hashmap for scanner state
dashmap = "6"

# Backoff jitter
rand = "0.8"

Expand Down
8 changes: 8 additions & 0 deletions config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ min_profit_usd_1e6 = 5000000
max_gas_wei = "3000000000"
# Polling cadence for protocols without push events (ms).
scan_interval_ms = 1000
# Health-factor band (bps of 1e18): liquidatable ≤ 10_000, near-liq window
# up to `near_liq_threshold_bps`. Omitted keys fall back to serde defaults:
# liquidatable_threshold_bps = 10000 (HF = 1.00)
# near_liq_threshold_bps = 10500 (HF = 1.05)
# Scan cadences per bucket (blocks): defaults hot=1 / warm=10 / cold=100.
# hot_scan_blocks = 1
# warm_scan_blocks = 10
# cold_scan_blocks = 100

# ── Chains ────────────────────────────────────────────────────────────────
[chain.bnb]
Expand Down
1 change: 1 addition & 0 deletions crates/charon-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
dotenvy = { workspace = true }
metrics = { workspace = true }

[lints]
workspace = true
189 changes: 127 additions & 62 deletions crates/charon-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use alloy::providers::{ProviderBuilder, WsConnect};
use anyhow::{Context, Result};
use charon_core::{Config, LendingProtocol};
use charon_protocols::VenusAdapter;
use charon_scanner::{BlockListener, ChainEvent, ChainProvider};
use charon_scanner::{
BlockListener, ChainEvent, ChainProvider, HealthScanner, PositionBucket, ScanScheduler,
};
use clap::{Parser, Subcommand};
use tokio::sync::mpsc;
use tracing::{info, warn};
Expand Down Expand Up @@ -126,49 +128,74 @@ async fn main() -> Result<()> {
/// cleanly on SIGINT or SIGTERM so the Docker `stop` → SIGTERM → SIGKILL
/// sequence never tears mid-operation.
///
/// For every `NewBlock` event on a chain with a `[protocol.venus]` entry,
/// the Venus adapter scans the supplied borrower list anchored at the
/// observed block. Chains without a Venus protocol config still flow
/// through the drain loop but trigger no protocol scans (v0.1 scope).
/// For every `NewBlock` event on a chain with a `[protocol.venus]` entry
/// the Venus adapter fetches positions anchored at the observed block,
/// pushes them through the bucketed [`HealthScanner`], and limits fetches
/// to buckets whose cadence fires this block via [`ScanScheduler`].
/// Chains without a Venus protocol config still flow through the drain
/// loop but trigger no protocol scans (v0.1 scope).
///
/// Backfill blocks (synthesised during WebSocket reconnect) are logged
/// but not scanned — the state they would produce is superseded by the
/// next real head and a fresh scan is cheaper than retroactive bucket
/// transitions.
async fn run_listen(config: &Config, borrowers: Vec<Address>) -> Result<()> {
if config.chain.is_empty() {
anyhow::bail!("no chains configured — nothing to listen to");
}

// Venus adapter is currently single-chain (BNB) per config scope.
// Build it only if `[protocol.venus]` exists and its target chain is
// configured; otherwise run the listener pipeline without a scanner.
let venus_adapter: Option<(String, Arc<VenusAdapter>)> = match config.protocol.get("venus") {
Some(venus_cfg) => {
let chain_name = &venus_cfg.chain;
let chain_cfg = config.chain.get(chain_name).with_context(|| {
format!(
"protocol 'venus' references chain '{chain_name}' which is not in [chain.*]"
)
})?;
let adapter_ws = ProviderBuilder::new()
.on_ws(WsConnect::new(&chain_cfg.ws_url))
.await
.context("venus adapter: failed to connect over ws")?;
let adapter =
Arc::new(VenusAdapter::connect(Arc::new(adapter_ws), venus_cfg.comptroller).await?);
info!(
chain = %chain_name,
borrower_count = borrowers.len(),
market_count = adapter.markets().await.len(),
"venus adapter ready"
);
Some((chain_name.clone(), adapter))
}
None => {
info!("no [protocol.venus] configured — listener will drain events without scanning");
None
}
};
// Venus adapter + bucketed scanner + cadence scheduler are currently
// single-chain (BNB) per config scope. Build them only if
// `[protocol.venus]` exists and its target chain is configured;
// otherwise run the listener pipeline without a scanner.
let venus_adapter: Option<(String, Arc<VenusAdapter>, Arc<HealthScanner>, ScanScheduler)> =
match config.protocol.get("venus") {
Some(venus_cfg) => {
let chain_name = &venus_cfg.chain;
let chain_cfg = config.chain.get(chain_name).with_context(|| {
format!(
"protocol 'venus' references chain '{chain_name}' which is not in [chain.*]"
)
})?;
let adapter_ws = ProviderBuilder::new()
.on_ws(WsConnect::new(&chain_cfg.ws_url))
.await
.context("venus adapter: failed to connect over ws")?;
let adapter = Arc::new(
VenusAdapter::connect(Arc::new(adapter_ws), venus_cfg.comptroller).await?,
);
let scanner = Arc::new(HealthScanner::new(
config.bot.liquidatable_threshold_bps,
config.bot.near_liq_threshold_bps,
)?);
let sched = ScanScheduler::new(
config.bot.hot_scan_blocks,
config.bot.warm_scan_blocks,
config.bot.cold_scan_blocks,
);
info!(
chain = %chain_name,
borrower_count = borrowers.len(),
market_count = adapter.markets().await.len(),
liquidatable_bps = config.bot.liquidatable_threshold_bps,
near_liq_bps = config.bot.near_liq_threshold_bps,
hot_blocks = sched.hot,
warm_blocks = sched.warm,
cold_blocks = sched.cold,
"venus adapter + scanner ready"
);
Some((chain_name.clone(), adapter, scanner, sched))
}
None => {
info!(
"no [protocol.venus] configured — listener will drain events without scanning"
);
None
}
};

let (tx, mut rx) = mpsc::channel::<ChainEvent>(CHAIN_EVENT_CHANNEL);
let mut listeners: tokio::task::JoinSet<(String, Result<()>)> =
tokio::task::JoinSet::new();
let mut listeners: tokio::task::JoinSet<(String, Result<()>)> = tokio::task::JoinSet::new();

// `ChainConfig: Clone` — we only borrow `config`, so each listener task
// gets its own owned copy.
Expand All @@ -183,6 +210,11 @@ async fn run_listen(config: &Config, borrowers: Vec<Address>) -> Result<()> {

info!("listen: draining chain events (Ctrl-C or SIGTERM to stop)");

// The first real (non-backfill) block on the Venus chain seeds the
// scanner with the operator-supplied borrower list. Subsequent scans
// pull from the scheduler-selected bucket membership so we don't
// burn RPC re-fetching COLD positions every block.
let mut seeded = false;
tokio::select! {
_ = async {
while let Some(event) = rx.recv().await {
Expand All @@ -195,32 +227,67 @@ async fn run_listen(config: &Config, borrowers: Vec<Address>) -> Result<()> {
backfill,
"cli drained event"
);
// Route to Venus scan only when this event is for
// the chain the Venus adapter was configured on.
if let Some((venus_chain, adapter)) = venus_adapter.as_ref() {
if venus_chain == &chain {
let start = std::time::Instant::now();
let block_tag = BlockNumberOrTag::Number(number);
match adapter.fetch_positions(&borrowers, block_tag).await {
Ok(positions) => {
info!(
chain = %chain,
block = number,
timestamp,
backfill,
tracked = borrowers.len(),
returned = positions.len(),
scan_ms = start.elapsed().as_millis() as u64,
"venus scan"
);
if backfill {
// Skip backfill — the next real head will
// snapshot the final state of the missed range.
continue;
}
if let Some((venus_chain, adapter, scanner, sched)) =
venus_adapter.as_ref()
{
if venus_chain != &chain {
continue;
}
let start = std::time::Instant::now();
let scan_set: Vec<Address> = if !seeded {
seeded = true;
borrowers.clone()
} else {
let mut v = Vec::new();
for b in [
PositionBucket::Liquidatable,
PositionBucket::NearLiquidation,
PositionBucket::Healthy,
] {
if sched.should_scan(b, number) {
v.extend(scanner.borrowers_in_bucket(b));
}
Err(err) => warn!(
}
v
};
if scan_set.is_empty() {
continue;
}
let block_tag = BlockNumberOrTag::Number(number);
match adapter.fetch_positions(&scan_set, block_tag).await {
Ok(positions) => {
let returned = positions.len();
scanner.upsert(positions.clone());
scanner.prune(&positions);
let counts = scanner.bucket_counts();
metrics::histogram!(
"charon_scanner_scan_duration_seconds"
)
.record(start.elapsed().as_secs_f64());
info!(
chain = %chain,
block = number,
error = ?err,
"venus scan failed"
),
timestamp,
tracked = scan_set.len(),
returned,
healthy = counts.healthy,
near_liq = counts.near_liquidation,
liquidatable = counts.liquidatable,
scan_ms = start.elapsed().as_millis() as u64,
"venus scan"
);
}
Err(err) => warn!(
chain = %chain,
block = number,
error = ?err,
"venus scan failed"
),
}
}
}
Expand Down Expand Up @@ -248,9 +315,7 @@ async fn run_listen(config: &Config, borrowers: Vec<Address>) -> Result<()> {

/// Drain a `JoinSet` of listener tasks and surface panics / errors per chain.
/// Returns when every listener has exited so the caller can shut down.
async fn supervise(
listeners: &mut tokio::task::JoinSet<(String, Result<()>)>,
) {
async fn supervise(listeners: &mut tokio::task::JoinSet<(String, Result<()>)>) {
while let Some(joined) = listeners.join_next().await {
match joined {
Ok((name, Ok(()))) => {
Expand Down
52 changes: 51 additions & 1 deletion crates/charon-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,41 @@ pub struct BotConfig {
pub max_gas_wei: U256,
/// Polling interval for protocols that don't push events.
pub scan_interval_ms: u64,
/// Health factor at or below which a position becomes liquidatable,
/// in basis points of 1e18 (10_000 = 1.0). Integer bps over f64 so
/// the boundary has no ULP-level drift (1.05 as f64 truncates to
/// 1_049_999_999_999_999_872 in 1e18 scale and silently leaks
/// positions out of the NearLiquidation bucket).
#[serde(default = "default_liquidatable_threshold_bps")]
pub liquidatable_threshold_bps: u32,
/// Upper bound of the near-liquidation watch band, same bps space.
#[serde(default = "default_near_liq_threshold_bps")]
pub near_liq_threshold_bps: u32,
/// HOT (Liquidatable) bucket scan cadence, in blocks. Default 1.
#[serde(default = "default_hot_scan_blocks")]
pub hot_scan_blocks: u64,
/// WARM (NearLiquidation) bucket scan cadence. Default every 10 blocks.
#[serde(default = "default_warm_scan_blocks")]
pub warm_scan_blocks: u64,
/// COLD (Healthy) bucket scan cadence. Default every 100 blocks.
#[serde(default = "default_cold_scan_blocks")]
pub cold_scan_blocks: u64,
}

fn default_liquidatable_threshold_bps() -> u32 {
10_000 // 1.0000
}
fn default_near_liq_threshold_bps() -> u32 {
10_500 // 1.0500
}
fn default_hot_scan_blocks() -> u64 {
1
}
fn default_warm_scan_blocks() -> u64 {
10
}
fn default_cold_scan_blocks() -> u64 {
100
}

/// RPC endpoints for a single chain. **The URLs typically embed API keys;
Expand Down Expand Up @@ -171,11 +206,26 @@ impl Config {
Ok(config)
}

/// Cross-reference chain keys, reject sentinel zero addresses.
/// Cross-reference chain keys, reject sentinel zero addresses, and
/// sanity-check scanner bucket thresholds + cadence.
fn validate(&self) -> Result<()> {
if self.chain.is_empty() {
return Err(ConfigError::Validation("no [chain.*] entries".into()));
}
if self.bot.near_liq_threshold_bps <= self.bot.liquidatable_threshold_bps {
return Err(ConfigError::Validation(format!(
"near_liq_threshold_bps ({}) must be > liquidatable_threshold_bps ({})",
self.bot.near_liq_threshold_bps, self.bot.liquidatable_threshold_bps
)));
}
if self.bot.hot_scan_blocks == 0
|| self.bot.warm_scan_blocks == 0
|| self.bot.cold_scan_blocks == 0
{
return Err(ConfigError::Validation(
"hot/warm/cold_scan_blocks must all be > 0".into(),
));
}
for (name, p) in &self.protocol {
if !self.chain.contains_key(&p.chain) {
return Err(ConfigError::Validation(format!(
Expand Down
1 change: 1 addition & 0 deletions crates/charon-scanner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async-trait = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
futures-util = { workspace = true }
dashmap = { workspace = true }
rand = { workspace = true }
metrics = { workspace = true }

Expand Down
4 changes: 4 additions & 0 deletions crates/charon-scanner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

pub mod listener;
pub mod provider;
pub mod scanner;

pub use listener::{BlockListener, ChainEvent};
pub use provider::{ChainProvider, ChainProviderT, MockChainProvider};
pub use scanner::{
BucketCounts, BucketedPosition, HealthScanner, PositionBucket, ScanScheduler,
};
Loading