Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions crates/sim/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ impl SimCache {

/// Get an iterator over the best items in the cache.
pub fn read_best(&self, n: usize) -> Vec<(u128, SimItem)> {
self.inner.read().items.iter().rev().take(n).map(|(k, item)| (*k, item.clone())).collect()
self.inner
.read()
.items
.iter()
.rev()
.take(n)
.map(|(cache_rank, item)| (*cache_rank, item.clone()))
.collect()
}

/// Get the number of items in the cache.
Expand All @@ -56,30 +63,30 @@ impl SimCache {
}

/// Get an item by key.
pub fn get(&self, key: u128) -> Option<SimItem> {
self.inner.read().items.get(&key).cloned()
pub fn get(&self, cache_rank: u128) -> Option<SimItem> {
self.inner.read().items.get(&cache_rank).cloned()
}

/// Remove an item by key.
pub fn remove(&self, key: u128) -> Option<SimItem> {
pub fn remove(&self, cache_rank: u128) -> Option<SimItem> {
let mut inner = self.inner.write();
if let Some(item) = inner.items.remove(&key) {
if let Some(item) = inner.items.remove(&cache_rank) {
inner.seen.remove(item.identifier().as_bytes());
Some(item)
} else {
None
}
}

fn add_inner(inner: &mut CacheInner, mut score: u128, item: SimItem, capacity: usize) {
fn add_inner(inner: &mut CacheInner, mut cache_rank: u128, item: SimItem, capacity: usize) {
// Check if we've already seen this item - if so, don't add it
if !inner.seen.insert(item.identifier_owned()) {
return;
}

// If it has the same score, we decrement (prioritizing earlier items)
while inner.items.contains_key(&score) && score != 0 {
score = score.saturating_sub(1);
// If it has the same cache_rank, we decrement (prioritizing earlier items)
while inner.items.contains_key(&cache_rank) && cache_rank != 0 {
cache_rank = cache_rank.saturating_sub(1);
}

if inner.items.len() >= capacity {
Expand All @@ -89,7 +96,7 @@ impl SimCache {
}
}

inner.items.insert(score, item.clone());
inner.items.insert(cache_rank, item.clone());
}

/// Add a bundle to the cache.
Expand All @@ -100,10 +107,10 @@ impl SimCache {
}

let item = SimItem::try_from(bundle)?;
let score = item.calculate_total_fee(basefee);
let cache_rank = item.calculate_total_fee(basefee);

let mut inner = self.inner.write();
Self::add_inner(&mut inner, score, item, self.capacity);
Self::add_inner(&mut inner, cache_rank, item, self.capacity);

Ok(())
}
Expand All @@ -124,8 +131,8 @@ impl SimCache {
// Skip invalid bundles
continue;
};
let score = item.calculate_total_fee(basefee);
Self::add_inner(&mut inner, score, item, self.capacity);
let cache_rank = item.calculate_total_fee(basefee);
Self::add_inner(&mut inner, cache_rank, item, self.capacity);
}

Ok(())
Expand All @@ -134,10 +141,10 @@ impl SimCache {
/// Add a transaction to the cache.
pub fn add_tx(&self, tx: TxEnvelope, basefee: u64) {
let item = SimItem::from(tx);
let score = item.calculate_total_fee(basefee);
let cache_rank = item.calculate_total_fee(basefee);

let mut inner = self.inner.write();
Self::add_inner(&mut inner, score, item, self.capacity);
Self::add_inner(&mut inner, cache_rank, item, self.capacity);
}

/// Add an iterator of transactions to the cache. This locks the cache only once
Expand All @@ -149,8 +156,8 @@ impl SimCache {

for item in item.into_iter() {
let item = SimItem::from(item);
let score = item.calculate_total_fee(basefee);
Self::add_inner(&mut inner, score, item, self.capacity);
let cache_rank = item.calculate_total_fee(basefee);
Self::add_inner(&mut inner, cache_rank, item, self.capacity);
}
}

Expand Down Expand Up @@ -196,7 +203,9 @@ impl SimCache {

/// Internal cache data, meant to be protected by a lock.
struct CacheInner {
/// Key is the cache_rank, unique ID within the cache && the item's order in the cache. Value is [`SimItem`] itself.
items: BTreeMap<u128, SimItem>,
/// Key is the unique identifier for the [`SimItem`] - the UUID for bundles, tx hash for transactions.
seen: HashSet<SimIdentifier<'static>>,
}

Expand Down
46 changes: 24 additions & 22 deletions crates/sim/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ where
let outcome = best.as_ref()?;

// Remove the item from the cache.
let item = self.sim_items.remove(outcome.identifier)?;
let item = self.sim_items.remove(outcome.cache_rank)?;
// Accept the cache from the simulation.
Arc::get_mut(&mut self.inner)
.expect("sims dropped already")
Expand Down Expand Up @@ -264,10 +264,10 @@ where
///
/// This function runs the simulation in a separate thread and waits for
/// the result or the deadline to expire.
#[instrument(skip_all, fields(identifier, tx_hash = %transaction.hash()))]
#[instrument(skip_all, fields(cache_rank, tx_hash = %transaction.hash()))]
fn simulate_tx(
&self,
identifier: u128,
cache_rank: u128,
transaction: &TxEnvelope,
) -> Result<SimOutcomeWithCache, SignetEthBundleError<SimDb<Db>>> {
let trevm = self.create_with_block(&self.cfg, &self.block).unwrap();
Expand Down Expand Up @@ -302,28 +302,29 @@ where
let score = beneficiary_balance.saturating_sub(initial_beneficiary_balance);

trace!(
?identifier,
?cache_rank,
tx_hash = %transaction.hash(),
gas_used = gas_used,
score = %score,
reverted = !success,
halted,
halt_reason = ?if halted { halt_reason } else { None },
revert_reason = if !success { reason } else { None },
"Simulation complete"
"Transaction simulation complete"
);

// Create the outcome
Ok(SimOutcomeWithCache { identifier, score, cache, gas_used })
Ok(SimOutcomeWithCache { cache_rank, score, cache, gas_used })
}
Err(e) => Err(SignetEthBundleError::from(e.into_error())),
}
}

/// Simulates a bundle on the current environment.
#[instrument(skip_all, fields(identifier, uuid = bundle.replacement_uuid()))]
#[instrument(skip_all, fields(cache_rank, uuid = bundle.replacement_uuid()))]
fn simulate_bundle(
&self,
identifier: u128,
cache_rank: u128,
bundle: &SignetEthBundle,
) -> Result<SimOutcomeWithCache, SignetEthBundleError<SimDb<Db>>>
where
Expand All @@ -344,24 +345,25 @@ where
let cache = trevm.into_db().into_cache();

trace!(
?identifier,
?cache_rank,
uuid = %bundle.replacement_uuid().expect("Bundle must have a replacement UUID"),
gas_used = gas_used,
score = %score,
"Bundle simulation successful"
);

Ok(SimOutcomeWithCache { identifier, score, cache, gas_used })
Ok(SimOutcomeWithCache { cache_rank, score, cache, gas_used })
}

/// Simulates a transaction or bundle in the context of a block.
fn simulate(
&self,
identifier: u128,
cache_rank: u128,
item: &SimItem,
) -> Result<SimOutcomeWithCache, SignetEthBundleError<SimDb<Db>>> {
match item {
SimItem::Bundle(bundle) => self.simulate_bundle(identifier, bundle),
SimItem::Tx(tx) => self.simulate_tx(identifier, tx),
SimItem::Bundle(bundle) => self.simulate_bundle(cache_rank, bundle),
SimItem::Tx(tx) => self.simulate_tx(cache_rank, tx),
}
}

Expand All @@ -386,31 +388,31 @@ where

std::thread::scope(move |scope| {
// Spawn a thread per bundle to simulate.
for (identifier, item) in active_sim.into_iter() {
for (cache_rank, item) in active_sim.into_iter() {
let c = candidates.clone();

scope.spawn(move || {
let _ig = trace_span!(parent: outer_ref, "sim_task", identifier = %identifier)
.entered();
let identifier = item.identifier();
let _ig = trace_span!(parent: outer_ref, "sim_task", %identifier).entered();

// If simulation is succesful, send the outcome via the
// channel.
match this_ref.simulate(identifier, &item) {
match this_ref.simulate(cache_rank, &item) {
Ok(candidate) => {
if candidate.gas_used <= max_gas {
// shortcut return on success
let _ = c.blocking_send(candidate);
return;
}
trace!(gas_used = candidate.gas_used, max_gas, "Gas limit exceeded");
trace!(gas_used = candidate.gas_used, max_gas, %identifier, "Gas limit exceeded");
}
Err(e) => {
trace!(?identifier, %e, "Simulation failed");
}
};
// fall through applies to all errors, occurs if
// the simulation fails or the gas limit is exceeded.
this_ref.sim_items.remove(identifier);
this_ref.sim_items.remove(cache_rank);
});
}
// Drop the TX so that the channel is closed when all threads
Expand All @@ -422,15 +424,15 @@ where
// Update the best score and send it to the channel.
let _ = best_tx.send_if_modified(|current| {
let best_score = current.as_ref().map(|c| c.score).unwrap_or_default();
let current_id = current.as_ref().map(|c| c.identifier);
let current_cache_rank = current.as_ref().map(|c| c.cache_rank);

let changed = candidate.score > best_score;
if changed {
trace!(
old_best = ?best_score,
old_identifier = current_id,
old_cache_rank = current_cache_rank,
new_best = %candidate.score,
identifier = candidate.identifier,
new_cache_rank = candidate.cache_rank,
"Found better candidate"
);
*current = Some(candidate);
Expand Down
4 changes: 2 additions & 2 deletions crates/sim/src/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::SimItem;
/// state changes.
#[derive(Debug, Clone)]
pub struct SimOutcomeWithCache {
/// The transaction or bundle that was simulated, as in the cache.
pub identifier: u128,
/// The key for the item in the [`SimCache`].
pub cache_rank: u128,

/// The score of the simulation, a [`U256`] value that represents the
/// increase in the beneficiary's balance.
Expand Down