Prefetch earlier in apply and pipeline it
mooori committed Apr 16, 2024
1 parent 22ca572 commit b42ecbd
Showing 1 changed file with 139 additions and 33 deletions.
172 changes: 139 additions & 33 deletions runtime/runtime/src/lib.rs
@@ -1305,7 +1305,9 @@ impl Runtime {
// future refactoring won’t break the condition.
assert!(cfg!(feature = "sandbox") || state_patch.is_empty());
let protocol_version = apply_state.current_protocol_version;
let mut prefetcher = TriePrefetcher::new_if_enabled(&trie);
let look_ahead_len = 8; // TODO: use NUM_IO_THREADS from prefetching_trie_storage.rs
let mut prefetch_manager =
PrefetchManager::new(TriePrefetcher::new_if_enabled(&trie), look_ahead_len);
let mut state_update = TrieUpdate::new(trie);
let mut total = TotalResourceGuard {
span: tracing::Span::current(),
@@ -1316,10 +1318,7 @@ impl Runtime {
compute: 0,
};

if let Some(prefetcher) = &mut prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_transactions_data(transactions);
}
prefetch_manager.prefetch_all_transactions(transactions);

let mut stats = ApplyStats::default();

@@ -1389,6 +1388,8 @@ impl Runtime {
)?;
if receipt.receiver_id == signed_transaction.transaction.signer_id {
local_receipts.push(receipt);
// Prefetch eagerly assuming all local receipts will be processed.
prefetch_manager.prefetch_next_local_receipts(&local_receipts);
} else {
outgoing_receipts.push(receipt);
}
@@ -1482,23 +1483,53 @@ impl Runtime {
None
};

// Retrieve delayed receipts to initiate prefetching during local receipt processing.
// TODO add time to delayed receipt processing metric.
let mut delayed_receipts_keys = Vec::new();
let mut delayed_receipts = Vec::new();
let mut current_delayed_receipt_index = delayed_receipts_indices.first_index;
let skip_delayed_receipts = total.compute >= compute_limit
|| proof_size_limit
.is_some_and(|limit| state_update.trie.recorded_storage_size() > limit);
while current_delayed_receipt_index < delayed_receipts_indices.next_available_index {
if skip_delayed_receipts {
break;
}

let key = TrieKey::DelayedReceipt { index: delayed_receipts_indices.first_index };
let receipt: Receipt = get(&state_update, &key)?.ok_or_else(|| {
StorageError::StorageInconsistentState(format!(
"Delayed receipt #{} should be in the state",
delayed_receipts_indices.first_index
))
})?;

delayed_receipts_keys.push(key);
delayed_receipts.push(receipt);
current_delayed_receipt_index += 1;
}

// We first process local receipts. They contain staking, local contract calls, etc.
// Prefetching for local receipts was invoked above.
let local_processing_start = std::time::Instant::now();
if let Some(prefetcher) = &mut prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&local_receipts);
}
for receipt in local_receipts.iter() {
for (i, receipt) in local_receipts.iter().enumerate() {
if total.compute >= compute_limit
|| proof_size_limit
.is_some_and(|limit| state_update.trie.recorded_storage_size() > limit)
{
// No need to prefetch anything beyond that, right?
set_delayed_receipt(&mut state_update, &mut delayed_receipts_indices, receipt);
} else {
// NOTE: We don't need to validate the local receipt, because it's just validated in
// the `verify_and_charge_transaction`.
process_receipt(receipt, &mut state_update, &mut total)?;
}

// Chances are delayed receipts will be processed, so start prefetching.
// Invoke prefetcher possibly multiple times to fill its work queue.
if local_receipts.len() - i <= look_ahead_len {
prefetch_manager.prefetch_next_delayed_receipts(&delayed_receipts);
}
}
metrics.local_receipts_done(
local_receipts.len() as u64,
@@ -1508,28 +1539,19 @@ impl Runtime {
);

// Then we process the delayed receipts. It's a backlog of receipts from the past blocks.
// TODO refactor to avoid cloning `key` and `receipt`
let delayed_processing_start = std::time::Instant::now();
let mut delayed_receipt_count = 0;
while delayed_receipts_indices.first_index < delayed_receipts_indices.next_available_index {
for (i, (key, receipt)) in
delayed_receipts_keys.into_iter().zip(delayed_receipts.iter()).enumerate()
{
if total.compute >= compute_limit
|| proof_size_limit
.is_some_and(|limit| state_update.trie.recorded_storage_size() > limit)
{
break;
}
delayed_receipt_count += 1;
let key = TrieKey::DelayedReceipt { index: delayed_receipts_indices.first_index };
let receipt: Receipt = get(&state_update, &key)?.ok_or_else(|| {
StorageError::StorageInconsistentState(format!(
"Delayed receipt #{} should be in the state",
delayed_receipts_indices.first_index
))
})?;

if let Some(prefetcher) = &mut prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(std::slice::from_ref(&receipt));
}

// Validating the delayed receipt. If it fails, it's likely the state is inconsistent.
validate_receipt(
@@ -1544,11 +1566,18 @@ impl Runtime {
))
})?;

state_update.remove(key);
state_update.remove(key.clone());
// Math checked above: first_index is less than next_available_index
delayed_receipts_indices.first_index += 1;
process_receipt(&receipt, &mut state_update, &mut total)?;
processed_delayed_receipts.push(receipt);
process_receipt(receipt, &mut state_update, &mut total)?;
processed_delayed_receipts.push(receipt.clone());

if i % look_ahead_len == 0 {
prefetch_manager.prefetch_next_delayed_receipts(&delayed_receipts);
}
if delayed_receipts.len() - i == 1 {
prefetch_manager.prefetch_next_incoming_receipts(incoming_receipts);
}
}
metrics.delayed_receipts_done(
delayed_receipt_count,
@@ -1559,11 +1588,7 @@ impl Runtime {

// And then we process the new incoming receipts. These are receipts from other shards.
let incoming_processing_start = std::time::Instant::now();
if let Some(prefetcher) = &mut prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&incoming_receipts);
}
for receipt in incoming_receipts.iter() {
for (i, receipt) in incoming_receipts.iter().enumerate() {
// Validating new incoming no matter whether we have available gas or not. We don't
// want to store invalid receipts in state as delayed.
validate_receipt(
@@ -1580,6 +1605,11 @@ impl Runtime {
} else {
process_receipt(receipt, &mut state_update, &mut total)?;
}

// Prefetch more regardless of whether the current receipt was processed?
if i % look_ahead_len == 0 {
prefetch_manager.prefetch_next_incoming_receipts(&incoming_receipts);
}
}
metrics.incoming_receipts_done(
incoming_receipts.len() as u64,
@@ -1694,7 +1724,7 @@ impl Runtime {
state_update.trie.recorded_storage_size_upper_bound() as f64;
metrics::CHUNK_RECORDED_SIZE_UPPER_BOUND.observe(chunk_recorded_size_upper_bound);
let (trie, trie_changes, state_changes) = state_update.finalize()?;
if let Some(prefetcher) = &prefetcher {
if prefetch_manager.prefetching_enabled() {
// Only clear the prefetcher queue after finalize is done because as part of receipt
// processing we also prefetch account data and access keys that are accessed in
// finalize. This data can take a very long time otherwise if not prefetched.
@@ -1705,7 +1735,7 @@ impl Runtime {
// In the future it may make sense to have prefetcher have a mode where it has two
// queues: one for data that is going to be required soon, and the other that it would
// only work when otherwise idle.
let discarded_prefetch_requests = prefetcher.clear();
let discarded_prefetch_requests = prefetch_manager.clear_prefetcher().unwrap_or(0);
tracing::debug!(target: "runtime", discarded_prefetch_requests);
}

@@ -1844,6 +1874,82 @@ impl TotalResourceGuard {
}
}

struct PrefetchManager {
prefetcher: Option<TriePrefetcher>,
look_ahead_len: usize,
next_delayed_receipt_index: usize,
next_local_receipt_index: usize,
next_incoming_receipt_index: usize,
}

// TODO remove `look_ahead_len` here and instead just prefetch the next receipt; fewer edge cases.
impl PrefetchManager {
fn new(prefetcher: Option<TriePrefetcher>, look_ahead_len: usize) -> Self {
Self {
prefetcher,
look_ahead_len,
next_delayed_receipt_index: 0,
next_local_receipt_index: 0,
next_incoming_receipt_index: 0,
}
}

fn prefetching_enabled(&self) -> bool {
self.prefetcher.is_some()
}

fn prefetch_all_transactions(&mut self, transactions: &[SignedTransaction]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_transactions_data(transactions);
}
}

fn prefetch_next_local_receipts(&mut self, receipts: &[Receipt]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
let end =
std::cmp::min(self.next_local_receipt_index + self.look_ahead_len, receipts.len());
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_local_receipt_index..end]);
self.next_local_receipt_index = end;
}
}

fn prefetch_next_delayed_receipts(&mut self, receipts: &[Receipt]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
let end = std::cmp::min(
self.next_delayed_receipt_index + self.look_ahead_len,
receipts.len(),
);
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_delayed_receipt_index..end]);
self.next_delayed_receipt_index = end;
}
}

fn prefetch_next_incoming_receipts(&mut self, receipts: &[Receipt]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
let end = std::cmp::min(
self.next_incoming_receipt_index + self.look_ahead_len,
receipts.len(),
);
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_incoming_receipt_index..end]);
self.next_incoming_receipt_index = end;
}
}

fn clear_prefetcher(&mut self) -> Option<usize> {
match &mut self.prefetcher {
Some(prefetcher) => {
let discarded_prefetch_requests = prefetcher.clear();
Some(discarded_prefetch_requests)
}
None => None,
}
}
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;

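The loops above keep the prefetcher's work queue topped up in windows of `look_ahead_len` receipts while earlier receipts are still being processed. Below is a minimal, self-contained sketch of that look-ahead pattern; `Prefetcher`, `LookAhead`, `prefetch`, `prefetch_next`, and the integer receipts are hypothetical stand-ins for `TriePrefetcher`, `PrefetchManager`, and real receipts, not part of this commit.

// Hypothetical stand-in for TriePrefetcher.
struct Prefetcher;

impl Prefetcher {
    fn prefetch(&mut self, batch: &[u64]) {
        // The real prefetcher would enqueue trie reads on background IO threads.
        println!("prefetching {:?}", batch);
    }
}

// Hypothetical stand-in for the windowed bookkeeping in PrefetchManager.
struct LookAhead {
    next: usize,
    look_ahead_len: usize,
}

impl LookAhead {
    // Prefetch the next window of at most `look_ahead_len` items and remember
    // where the window ends so repeated calls keep making progress.
    fn prefetch_next(&mut self, prefetcher: &mut Prefetcher, items: &[u64]) {
        let end = std::cmp::min(self.next + self.look_ahead_len, items.len());
        prefetcher.prefetch(&items[self.next..end]);
        self.next = end;
    }
}

fn main() {
    let receipts: Vec<u64> = (0..20).collect();
    let mut prefetcher = Prefetcher;
    let mut ahead = LookAhead { next: 0, look_ahead_len: 8 };

    // Fill the work queue once before processing starts ...
    ahead.prefetch_next(&mut prefetcher, &receipts);
    for (i, _receipt) in receipts.iter().enumerate() {
        // ... and top it up every `look_ahead_len` processed items, mirroring
        // the `i % look_ahead_len == 0` checks in the loops above.
        if i > 0 && i % ahead.look_ahead_len == 0 {
            ahead.prefetch_next(&mut prefetcher, &receipts);
        }
        // process_receipt(receipt, ...) would run here in the runtime.
    }
}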