diff --git a/config/default.toml b/config/default.toml index a877c9e..c3aded2 100644 --- a/config/default.toml +++ b/config/default.toml @@ -28,9 +28,11 @@ signer_key = "${CHARON_SIGNER_KEY:-}" # ── Chains ──────────────────────────────────────────────────────────────── [chain.bnb] -chain_id = 56 -ws_url = "${BNB_WS_URL}" -http_url = "${BNB_HTTP_URL}" +chain_id = 56 +ws_url = "${BNB_WS_URL}" +http_url = "${BNB_HTTP_URL}" +# Validator-friendly tip on BSC; raise during congestion. +priority_fee_gwei = 1 # ── Lending protocols ───────────────────────────────────────────────────── [protocol.venus] diff --git a/crates/charon-core/src/config.rs b/crates/charon-core/src/config.rs index 55159c2..08301c4 100644 --- a/crates/charon-core/src/config.rs +++ b/crates/charon-core/src/config.rs @@ -186,12 +186,25 @@ fn default_cold_scan_blocks() -> u64 { /// RPC endpoints for a single chain. **The URLs typically embed API keys; /// `Debug` prints `` rather than the URL.** +/// +/// `priority_fee_gwei` is the EIP-1559 priority fee (tip) the gas +/// oracle attaches per chain, expressed in gwei for operator +/// readability. Defaults to `1` — BSC validators are fine with a +/// 1 gwei tip; L2s running sub-gwei tips should override explicitly. +/// The oracle converts to wei internally; mixing units silently +/// under-filters at 1e9×. #[derive(Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct ChainConfig { pub chain_id: u64, pub ws_url: String, pub http_url: String, + #[serde(default = "default_priority_fee_gwei")] + pub priority_fee_gwei: u64, +} + +fn default_priority_fee_gwei() -> u64 { + 1 } impl fmt::Debug for ChainConfig { @@ -200,6 +213,7 @@ impl fmt::Debug for ChainConfig { .field("chain_id", &self.chain_id) .field("ws_url", &"") .field("http_url", &"") + .field("priority_fee_gwei", &self.priority_fee_gwei) .finish() } } diff --git a/crates/charon-executor/src/gas.rs b/crates/charon-executor/src/gas.rs new file mode 100644 index 0000000..3557605 --- /dev/null +++ b/crates/charon-executor/src/gas.rs @@ -0,0 +1,423 @@ +//! EIP-1559 gas oracle. +//! +//! Three responsibilities: +//! +//! 1. **Live fee snapshot** — read `baseFeePerGas` from the latest +//! block header and build an EIP-1559 fee pair using the canonical +//! `max_fee = 2 * base_fee + priority_fee` headroom formula. When +//! the header has no `baseFeePerGas` (pre-EIP-1559 chain or a +//! flaky RPC that drops the field), fall back to +//! `eth_gasPrice` so the oracle still returns a usable quote. +//! 2. **Ceiling enforcement** — refuse to emit gas params if the +//! proposed `maxFeePerGas` exceeds `bot.max_gas_wei`. Returned as +//! a typed [`GasDecision::SkipCeilingExceeded`] variant so callers +//! can branch without pattern-matching on `Option`. +//! 3. **Cost estimation in USD cents** — converts `gas_units × maxFee` +//! (wei) into integer USD cents using a Chainlink price reading +//! for the chain's native asset (BNB on BSC). The result feeds +//! [`charon_core::ProfitInputs`]. +//! +//! ### Unit conventions +//! +//! Internally every fee is kept in **wei**. The ceiling +//! (`bot.max_gas_wei`) is already wei and is stored as `U256` so +//! sub-gwei tips and absurdly-high-priced chains remain representable +//! without truncation. The per-chain `priority_fee_gwei` is the only +//! gwei-denominated input; it is scaled to wei on construction and +//! never compared against `U256` values in the raw gwei domain. + +use std::sync::Mutex; + +use alloy::eips::BlockNumberOrTag; +use alloy::primitives::U256; +use alloy::providers::Provider; +use alloy::rpc::types::TransactionRequest; +use alloy::transports::TransportError; +use tracing::{debug, warn}; + +/// 1 gwei expressed in wei. +const ONE_GWEI: u128 = 1_000_000_000; +/// EIP-1559 canonical headroom multiplier on the base fee. The +/// protocol allows the base fee to roughly double between two +/// consecutive blocks under max congestion, so anything lower risks +/// the tx getting booted out of the mempool between sim and broadcast. +const BASE_FEE_HEADROOM_MULT: u128 = 2; +/// Multiplicative gas-estimate safety buffer (`estimate * 12 / 10`). +/// Covers state drift between estimate time and inclusion time. +const GAS_ESTIMATE_BUFFER_NUM: u64 = 12; +const GAS_ESTIMATE_BUFFER_DEN: u64 = 10; +/// Chainlink price-feed decimals. All BNB-Chain USD aggregators we +/// consume publish 8-decimal answers; asserted at oracle setup time. +pub const CHAINLINK_DECIMALS: u8 = 8; + +/// Errors the gas oracle can surface. Callers unwrap the typed +/// variant and decide whether to retry (transport), skip +/// (`MissingBaseFee` after fallback), or abort (`Overflow`). +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum GasError { + /// `baseFeePerGas` absent *and* the `eth_gasPrice` fallback also + /// failed. On a healthy EIP-1559 chain we never reach this. + #[error("baseFeePerGas absent from block header and eth_gasPrice fallback unavailable")] + MissingBaseFee, + /// Provider / transport failure (DNS, timeout, 5xx, JSON-RPC + /// error). Retryable from the caller's perspective. + #[error("provider error: {0}")] + Provider(#[from] TransportError), + /// u128 / U256 arithmetic would overflow. Indicates an absurdly + /// expensive chain (fee × buffer > 2^128 wei) or a buggy feed; + /// treated as fatal. + #[error("arithmetic overflow in gas calculation")] + Overflow, +} + +/// Resolved EIP-1559 gas parameters for one transaction. Always in +/// wei. Convert to gwei at the log / config boundary only. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct GasParams { + pub max_fee_per_gas: u128, + pub max_priority_fee_per_gas: u128, +} + +/// Outcome of a [`GasOracle::fetch_params`] call. Typed enum — no +/// `Option` / `Ok(None)` ambiguity. Callers pattern-match on both +/// variants so adding a new skip reason later shows up as a compile +/// error. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum GasDecision { + /// Fee is below ceiling, safe to build the tx. + Proceed(GasParams), + /// `maxFeePerGas` would exceed `bot.max_gas_wei`. Caller drops + /// the opportunity and logs — both values are wei, mirroring the + /// config surface, so there is never a gwei/wei mix-up at the + /// log line. + SkipCeilingExceeded { max_fee_wei: U256, ceiling_wei: U256 }, +} + +/// Per-block cache entry so repeated `fetch_params(block_n)` calls +/// from the same tick don't spam the RPC. +#[derive(Debug, Clone, Copy)] +struct CacheEntry { + block: u64, + decision: GasDecision, +} + +/// Per-chain gas oracle. Construct once per chain at startup. +/// +/// The ceiling (`max_gas_wei`) comes from [`BotConfig::max_gas_wei`] +/// and is already wei; the priority fee (`priority_fee_gwei`) comes +/// from [`ChainConfig::priority_fee_gwei`] and is scaled to wei on +/// every `fetch_params` call. Keeping the two storage types distinct +/// preserves the config surface (ceiling is `U256` because a sub-gwei +/// L2 fee or a pathological spike is still representable exactly, +/// tips are `u64` gwei because no operator writes sub-gwei tips by +/// hand). +/// +/// [`BotConfig::max_gas_wei`]: charon_core::config::BotConfig::max_gas_wei +/// [`ChainConfig::priority_fee_gwei`]: charon_core::config::ChainConfig::priority_fee_gwei +#[derive(Debug)] +pub struct GasOracle { + /// Drop the tx if `max_fee_per_gas` exceeds this (wei). + max_gas_wei: U256, + /// EIP-1559 priority fee (gwei). Scaled to wei on every call. + priority_fee_gwei: u64, + /// Last `(block_number, decision)` observed. `Mutex>` + /// — contention here is negligible (one lookup per tx build, + /// microseconds), a lock is simpler than a `RwLock`. + cache: Mutex>, +} + +impl GasOracle { + /// Construct a new oracle. `max_gas_wei` is the hard fee ceiling + /// (already wei, sourced from [`BotConfig::max_gas_wei`]); + /// `priority_fee_gwei` is the per-chain tip (sourced from + /// [`ChainConfig::priority_fee_gwei`]) and gets converted to wei + /// inside `fetch_params`. + /// + /// [`BotConfig::max_gas_wei`]: charon_core::config::BotConfig::max_gas_wei + /// [`ChainConfig::priority_fee_gwei`]: charon_core::config::ChainConfig::priority_fee_gwei + pub fn new(max_gas_wei: U256, priority_fee_gwei: u64) -> Self { + Self { + max_gas_wei, + priority_fee_gwei, + cache: Mutex::new(None), + } + } + + /// Read the latest base fee, apply 2x EIP-1559 headroom, attach + /// the priority fee. + /// + /// Pass `current_block` when the caller already knows the block + /// number (e.g. inside a `newHeads` handler) to hit the + /// per-block cache. Pass `None` to force a fresh RPC read. + pub async fn fetch_params( + &self, + provider: &P, + current_block: Option, + ) -> Result + where + P: Provider, + T: alloy::transports::Transport + Clone, + { + // Cache hit: same block we already priced, return the cached + // decision. Using a scoped lock — never held across await. + // Written as nested `if let` rather than an `if let` chain so + // the MSRV stays at 1.85 (chains stabilised in 1.88). + if let Some(block_n) = current_block { + let cached = *self.cache.lock().expect("gas cache mutex poisoned"); + if let Some(entry) = cached { + if entry.block == block_n { + debug!(block = block_n, "gas params cache hit"); + return Ok(entry.decision); + } + } + } + + let block = provider + .get_block( + BlockNumberOrTag::Latest.into(), + alloy::rpc::types::BlockTransactionsKind::Hashes, + ) + .await? + .ok_or(GasError::MissingBaseFee)?; + + // Primary path: EIP-1559 header. Fallback: eth_gasPrice (used + // to bootstrap max_fee on chains that still occasionally omit + // the header field under load). + let base_fee: u128 = match block.header.base_fee_per_gas { + Some(b) => u128::from(b), + None => { + warn!( + "header has no baseFeePerGas — falling back to eth_gasPrice (pre-EIP-1559 path)" + ); + match provider.get_gas_price().await { + Ok(p) => p, + Err(e) => { + warn!(error = %e, "eth_gasPrice fallback failed"); + return Err(GasError::MissingBaseFee); + } + } + } + }; + + // Priority fee: gwei → wei, checked. + let priority_fee_wei = u128::from(self.priority_fee_gwei) + .checked_mul(ONE_GWEI) + .ok_or(GasError::Overflow)?; + + // Canonical EIP-1559 headroom: max_fee = 2 * base + priority. + let max_fee = base_fee + .checked_mul(BASE_FEE_HEADROOM_MULT) + .and_then(|v| v.checked_add(priority_fee_wei)) + .ok_or(GasError::Overflow)?; + + // Ceiling check is done in U256 since the ceiling is sourced + // as `U256` from `BotConfig::max_gas_wei`. Promoting the u128 + // `max_fee` into U256 is lossless. + let max_fee_u256 = U256::from(max_fee); + let decision = if max_fee_u256 > self.max_gas_wei { + warn!( + max_fee_wei = %max_fee_u256, + ceiling_wei = %self.max_gas_wei, + "gas exceeds configured ceiling — skipping tx" + ); + GasDecision::SkipCeilingExceeded { + max_fee_wei: max_fee_u256, + ceiling_wei: self.max_gas_wei, + } + } else { + let params = GasParams { + max_fee_per_gas: max_fee, + max_priority_fee_per_gas: priority_fee_wei, + }; + debug!( + base_fee_gwei = base_fee / ONE_GWEI, + max_fee_gwei = params.max_fee_per_gas / ONE_GWEI, + priority_fee_gwei = params.max_priority_fee_per_gas / ONE_GWEI, + "gas params resolved" + ); + GasDecision::Proceed(params) + }; + + if let Some(block_n) = current_block { + *self.cache.lock().expect("gas cache mutex poisoned") = Some(CacheEntry { + block: block_n, + decision, + }); + } + + Ok(decision) + } + + /// Run `eth_estimateGas` on `tx` and return the unit count with a + /// 20 % safety margin applied. Real-world gas spend drifts by + /// single-digit percent between estimate and inclusion; the + /// buffer keeps us from under-funding and landing an out-of-gas + /// revert on-chain. + pub async fn estimate_gas_units( + &self, + provider: &P, + tx: &TransactionRequest, + ) -> Result + where + P: Provider, + T: alloy::transports::Transport + Clone, + { + let raw = provider.estimate_gas(tx).await?; + Ok(raw.saturating_mul(GAS_ESTIMATE_BUFFER_NUM) / GAS_ESTIMATE_BUFFER_DEN) + } +} + +/// Convert `gas_units × max_fee_per_gas` (wei cost) into USD cents, +/// given a Chainlink reading for the chain's native asset. +/// +/// Formula: +/// ```text +/// cost_cents = gas_units * max_fee * native_price * 100 +/// / (10^native_decimals_18 * 10^chainlink_decimals) +/// ``` +/// +/// * `native_price` — raw aggregator answer. +/// * `native_decimals` — the feed's `decimals()`, typically +/// [`CHAINLINK_DECIMALS`] (8). +/// * The native unit is assumed 18-decimal (true on BNB / ETH / +/// MATIC / AVAX; callers on a non-18-decimal native must adjust). +pub fn gas_cost_usd_cents( + gas_units: u64, + max_fee_per_gas: u128, + native_price: U256, + native_decimals: u8, +) -> u64 { + let wei_cost = U256::from(gas_units).saturating_mul(U256::from(max_fee_per_gas)); + // wei / 1e18 = native-units; × price / 10^feed_decimals = USD; + // × 100 = cents. Combined divisor: 10^(18 + native_decimals - 2). + // Kept as a single division to keep rounding in one place. + let exponent = 18u32 + u32::from(native_decimals) - 2; + let divisor = U256::from(10u64).pow(U256::from(exponent)); + + let numerator = wei_cost.saturating_mul(native_price); + let cents = numerator / divisor; + u64::try_from(cents).unwrap_or(u64::MAX) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn gas_cost_for_one_gwei_at_bnb_632_dollar() { + // 200_000 gas × 1 gwei = 2e14 wei. + // BNB at $632.85 → Chainlink price 63284968915 (8 decimals). + // Expected cents: 2e14 * 6.3284968915e10 * 100 / (1e18 * 1e8) + // = 2e14 * 6.3284968915e10 / 1e24 + // ≈ 12.66 cents. + let cents = gas_cost_usd_cents(200_000, ONE_GWEI, U256::from(63_284_968_915u128), 8); + assert!((12..=14).contains(¢s), "got {cents} cents"); + } + + #[test] + fn gas_cost_zero_when_units_zero() { + let cents = gas_cost_usd_cents(0, ONE_GWEI, U256::from(1u64), 8); + assert_eq!(cents, 0); + } + + #[test] + fn gas_cost_saturates_on_huge_inputs() { + // 1e9 gas × 1e30 wei/gas = 1e39 wei — clearly absurd, must + // saturate to u64::MAX without panicking. + let cents = gas_cost_usd_cents( + 1_000_000_000, + 10u128.pow(30), + U256::from(63_284_968_915u128), + 8, + ); + assert_eq!(cents, u64::MAX); + } + + #[test] + fn priority_fee_wei_conversion_one_gwei() { + // Sanity: priority_fee_gwei = 1 must resolve to 1e9 wei on + // the GasParams boundary. + let oracle = GasOracle::new(U256::from(100u128) * U256::from(ONE_GWEI), 1); + // Direct field access via constructor — can't call fetch + // without a provider, so we reproduce the arithmetic path + // that fetch_params uses. + let wei = u128::from(oracle.priority_fee_gwei) + .checked_mul(ONE_GWEI) + .expect("priority fee conversion must not overflow in test"); + assert_eq!(wei, 1_000_000_000u128); + } + + #[test] + fn priority_fee_wei_conversion_five_gwei() { + let wei = 5u128 + .checked_mul(ONE_GWEI) + .expect("5 gwei must fit in u128"); + assert_eq!(wei, 5_000_000_000u128); + } + + #[test] + fn max_fee_uses_two_x_headroom() { + // base = 3 gwei, priority = 1 gwei → max_fee = 7 gwei. + let base_wei = 3u128.checked_mul(ONE_GWEI).expect("3 gwei fits"); + let priority_wei = ONE_GWEI; + let max_fee = base_wei + .checked_mul(BASE_FEE_HEADROOM_MULT) + .and_then(|v| v.checked_add(priority_wei)) + .expect("max_fee fits in u128"); + assert_eq!(max_fee, 7u128.checked_mul(ONE_GWEI).expect("7 gwei fits")); + } + + #[test] + fn gas_estimate_buffer_is_twenty_percent() { + let raw: u64 = 1_000_000; + let buffered = raw.saturating_mul(GAS_ESTIMATE_BUFFER_NUM) / GAS_ESTIMATE_BUFFER_DEN; + assert_eq!(buffered, 1_200_000); + } + + #[test] + fn ceiling_in_wei_rejects_over_limit() { + // Ceiling 10 gwei in wei = 1e10. A max_fee of 11 gwei must + // trip the ceiling. Sanity-checks the unit fix for #179. + let ceiling_wei = U256::from(10u128.checked_mul(ONE_GWEI).expect("10 gwei fits")); + let max_fee = U256::from(11u128.checked_mul(ONE_GWEI).expect("11 gwei fits")); + assert!(max_fee > ceiling_wei); + } + + // Live-network integration test (#191). Requires BNB_HTTP_URL; + // ignored by default so `cargo test` stays offline. + #[tokio::test] + #[ignore = "requires live BNB_HTTP_URL"] + async fn fetch_params_against_live_bsc() { + use alloy::providers::ProviderBuilder; + let url = std::env::var("BNB_HTTP_URL").expect("BNB_HTTP_URL not set"); + let provider = ProviderBuilder::new().on_http(url.parse().expect("valid http url")); + // 100 gwei ceiling expressed in wei. + let ceiling = U256::from(100u128) + .checked_mul(U256::from(ONE_GWEI)) + .expect("ceiling fits in U256"); + let oracle = GasOracle::new(ceiling, 1); + let decision = oracle + .fetch_params(&provider, None) + .await + .expect("fetch_params should succeed against live BSC"); + match decision { + GasDecision::Proceed(params) => { + let max_fee_gwei = params.max_fee_per_gas / ONE_GWEI; + let priority_gwei = params.max_priority_fee_per_gas / ONE_GWEI; + assert!( + (1..=100).contains(&max_fee_gwei), + "max_fee {max_fee_gwei} gwei outside sane BSC range" + ); + assert!( + priority_gwei >= 1, + "priority {priority_gwei} gwei below floor" + ); + } + GasDecision::SkipCeilingExceeded { .. } => { + panic!("ceiling of 100 gwei should not trip on BSC"); + } + } + } +} diff --git a/crates/charon-executor/src/lib.rs b/crates/charon-executor/src/lib.rs index 45d3b68..e9092d7 100644 --- a/crates/charon-executor/src/lib.rs +++ b/crates/charon-executor/src/lib.rs @@ -10,7 +10,13 @@ //! simulation gate before any broadcast can happen. pub mod builder; +pub mod gas; +pub mod nonce; pub mod simulation; pub use builder::{BuilderError, ICharonLiquidator, TxBuilder}; +pub use gas::{ + CHAINLINK_DECIMALS, GasDecision, GasError, GasOracle, GasParams, gas_cost_usd_cents, +}; +pub use nonce::{NonceError, NonceManager}; pub use simulation::{SimulationError, Simulator}; diff --git a/crates/charon-executor/src/nonce.rs b/crates/charon-executor/src/nonce.rs new file mode 100644 index 0000000..e2e2998 --- /dev/null +++ b/crates/charon-executor/src/nonce.rs @@ -0,0 +1,223 @@ +//! Concurrent nonce manager. +//! +//! One [`NonceManager`] per `(chain × signer)` pair. Holds the next +//! nonce in an `AtomicU64` so the tx builder can hand out sequential +//! values without `&mut` plumbing through the hot path. Optimistic by +//! default — the manager bumps locally on every issue, and the caller +//! re-syncs from chain after a failed broadcast / on startup. +//! +//! Why optimistic: `eth_getTransactionCount(pending)` is one +//! round-trip we don't want on every block. Bumping locally lets +//! multiple in-flight txs hold contiguous nonces; if one reverts and +//! a gap opens up, a `resync` paves it over. +//! +//! ### `pending` vs `latest` +//! +//! Both `init` and `resync` query the **pending** block tag, not +//! `latest`. Reason: if we've already broadcast N txs this block and +//! resync against `latest`, the mempool-side nonce we get back is +//! stale — it reflects the state *before* our pending bundle. Using +//! `pending` counts our own in-flight txs and avoids reusing a nonce +//! that's sitting in the mempool waiting for inclusion. +//! +//! ### High-water-mark guard +//! +//! `resync` is a rescue path, not a rollback. If the chain reports +//! nonce = 42 but we've already handed out 47 locally, snapping to 42 +//! would double-issue nonces 42–46 — every one of those txs would +//! revert with `nonce too low` once the in-flight ones land. The +//! high-water mark tracks the maximum value [`next`] ever handed out +//! and clamps `resync` to `max(on_chain, high_water + 1)`. + +use std::sync::atomic::{AtomicU64, Ordering}; + +use alloy::eips::{BlockId, BlockNumberOrTag}; +use alloy::primitives::Address; +use alloy::providers::Provider; +use alloy::transports::TransportError; +use tracing::{debug, info, warn}; + +/// Errors the nonce manager can surface. One variant for now — +/// `#[non_exhaustive]` keeps the door open for variants like +/// `NonceGap { on_chain, local }` without a breaking change later. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum NonceError { + #[error("provider error: {0}")] + Provider(#[from] TransportError), +} + +/// Tracks the next-to-use nonce for one signer on one chain. +#[derive(Debug)] +pub struct NonceManager { + signer: Address, + /// Next nonce to hand out. Bumped via `fetch_add`. + next: AtomicU64, + /// Max value `next()` ever returned + 1. Used by `resync` to + /// refuse going backwards past an already-issued nonce. + high_water: AtomicU64, +} + +impl NonceManager { + /// Build with an explicit starting value. Most callers should use + /// [`init`] instead, which pulls the on-chain nonce. + pub fn new(signer: Address, start: u64) -> Self { + Self { + signer, + next: AtomicU64::new(start), + high_water: AtomicU64::new(start), + } + } + + /// Async constructor: pulls `eth_getTransactionCount(pending)` + /// and stores it as the starting nonce. `pending` is mandatory + /// here — see module-level docs. + pub async fn init(provider: &P, signer: Address) -> Result + where + P: Provider, + T: alloy::transports::Transport + Clone, + { + let nonce = provider + .get_transaction_count(signer) + .block_id(BlockId::Number(BlockNumberOrTag::Pending)) + .await?; + info!(%signer, nonce, "nonce manager initialised (pending tag)"); + Ok(Self::new(signer, nonce)) + } + + /// Signer the manager tracks — handy for sanity assertions. + pub fn signer(&self) -> Address { + self.signer + } + + /// Peek without consuming. Useful for logging. + pub fn current(&self) -> u64 { + self.next.load(Ordering::Acquire) + } + + /// Highest nonce ever issued (or `start` if nothing has been + /// issued). Public for diagnostics; `resync` consults it + /// internally. + pub fn high_water(&self) -> u64 { + self.high_water.load(Ordering::Acquire) + } + + /// Atomically claim the next nonce and bump the counter. Two + /// concurrent calls always return distinct values. + pub fn next(&self) -> u64 { + let n = self.next.fetch_add(1, Ordering::AcqRel); + // Update high-water mark monotonically. Using fetch_max so + // racing threads collapse to the same final value without a + // lost-update window. + self.high_water.fetch_max(n + 1, Ordering::AcqRel); + debug!(signer = %self.signer, nonce = n, "nonce issued"); + n + } + + /// Re-fetch the on-chain nonce and adopt it — but never go + /// backwards past an already-issued value. + /// + /// Concretely: `next` is set to `max(on_chain, high_water)`. If + /// the chain lags our local view (common: our own pending txs + /// haven't landed yet), we keep the local nonce. If the chain + /// jumped ahead (cold start, manual transfer from the same key), + /// we adopt the chain value. + /// + /// Returns the value `next` was set to. + pub async fn resync(&self, provider: &P) -> Result + where + P: Provider, + T: alloy::transports::Transport + Clone, + { + let on_chain = provider + .get_transaction_count(self.signer) + .block_id(BlockId::Number(BlockNumberOrTag::Pending)) + .await?; + let hw = self.high_water.load(Ordering::Acquire); + let target = on_chain.max(hw); + + if on_chain < hw { + warn!( + signer = %self.signer, + on_chain, + high_water = hw, + "resync: chain lags local high-water, keeping local value" + ); + } + + self.next.store(target, Ordering::Release); + // Keep high_water in lockstep — if chain jumped ahead, our + // new baseline is the chain value. + self.high_water.fetch_max(target, Ordering::AcqRel); + info!( + signer = %self.signer, + nonce = target, + on_chain, + "nonce manager resynced (pending tag, hw-guarded)" + ); + Ok(target) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::address; + use std::sync::Arc; + + fn signer() -> Address { + address!("1111111111111111111111111111111111111111") + } + + #[test] + fn next_returns_sequential_values() { + let m = NonceManager::new(signer(), 7); + assert_eq!(m.current(), 7); + assert_eq!(m.next(), 7); + assert_eq!(m.next(), 8); + assert_eq!(m.next(), 9); + assert_eq!(m.current(), 10); + assert_eq!(m.high_water(), 10); + } + + #[test] + fn high_water_tracks_max_issued() { + let m = NonceManager::new(signer(), 0); + for _ in 0..5 { + m.next(); + } + assert_eq!(m.high_water(), 5); + } + + #[test] + fn concurrent_callers_get_distinct_nonces() { + // Hammer the atomic from 32 threads × 100 calls each. Every + // returned nonce must be unique and the final `current()` must + // equal start + 32 × 100. + const THREADS: usize = 32; + const PER: usize = 100; + let m = Arc::new(NonceManager::new(signer(), 0)); + + let mut handles = Vec::new(); + for _ in 0..THREADS { + let m = m.clone(); + handles.push(std::thread::spawn(move || { + let mut local = Vec::with_capacity(PER); + for _ in 0..PER { + local.push(m.next()); + } + local + })); + } + + let mut all = Vec::with_capacity(THREADS * PER); + for h in handles { + all.extend(h.join().unwrap()); + } + all.sort_unstable(); + all.dedup(); + assert_eq!(all.len(), THREADS * PER, "duplicate nonce issued"); + assert_eq!(m.current(), (THREADS * PER) as u64); + assert_eq!(m.high_water(), (THREADS * PER) as u64); + } +}