Skip to content

Commit

Permalink
Exclude delayed receipts from pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
mooori committed Apr 17, 2024
1 parent b42ecbd commit f24ed44
Showing 1 changed file with 22 additions and 55 deletions.
77 changes: 22 additions & 55 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1483,36 +1483,10 @@ impl Runtime {
None
};

// Retrieve delayed receipts to initiate prefetching during local receipt processing.
// TODO add time to delayed receipt processing metric.
let mut delayed_receipts_keys = Vec::new();
let mut delayed_receipts = Vec::new();
let mut current_delayed_receipt_index = delayed_receipts_indices.first_index;
let skip_delayed_receipts = total.compute >= compute_limit
|| proof_size_limit
.is_some_and(|limit| state_update.trie.recorded_storage_size() > limit);
while current_delayed_receipt_index < delayed_receipts_indices.next_available_index {
if skip_delayed_receipts {
break;
}

let key = TrieKey::DelayedReceipt { index: delayed_receipts_indices.first_index };
let receipt: Receipt = get(&state_update, &key)?.ok_or_else(|| {
StorageError::StorageInconsistentState(format!(
"Delayed receipt #{} should be in the state",
delayed_receipts_indices.first_index
))
})?;

delayed_receipts_keys.push(key);
delayed_receipts.push(receipt);
current_delayed_receipt_index += 1;
}

// We first process local receipts. They contain staking, local contract calls, etc.
// Prefetching for local receipts was invoked above.
let local_processing_start = std::time::Instant::now();
for (i, receipt) in local_receipts.iter().enumerate() {
for receipt in local_receipts.iter() {
if total.compute >= compute_limit
|| proof_size_limit
.is_some_and(|limit| state_update.trie.recorded_storage_size() > limit)
Expand All @@ -1524,12 +1498,6 @@ impl Runtime {
// the `verify_and_charge_transaction`.
process_receipt(receipt, &mut state_update, &mut total)?;
}

// Chances are delayed receipts will be processed, so start prefetching.
// Invoke prefetcher possibly multiple times to fill its work queue.
if local_receipts.len() - i <= look_ahead_len {
prefetch_manager.prefetch_next_delayed_receipts(&delayed_receipts);
}
}
metrics.local_receipts_done(
local_receipts.len() as u64,
Expand All @@ -1542,16 +1510,23 @@ impl Runtime {
// TODO refactor to avoid cloning and `receipt`
let delayed_processing_start = std::time::Instant::now();
let mut delayed_receipt_count = 0;
for (i, (key, receipt)) in
delayed_receipts_keys.into_iter().zip(delayed_receipts.iter()).enumerate()
{
while delayed_receipts_indices.first_index < delayed_receipts_indices.next_available_index {
if total.compute >= compute_limit
|| proof_size_limit
.is_some_and(|limit| state_update.trie.recorded_storage_size() > limit)
{
break;
}
delayed_receipt_count += 1;
let key = TrieKey::DelayedReceipt { index: delayed_receipts_indices.first_index };
let receipt: Receipt = get(&state_update, &key)?.ok_or_else(|| {
StorageError::StorageInconsistentState(format!(
"Delayed receipt #{} should be in the state",
delayed_receipts_indices.first_index
))
})?;

prefetch_manager.prefetch_receipts(std::slice::from_ref(&receipt));

// Validating the delayed receipt. If it fails, it's likely the state is inconsistent.
validate_receipt(
Expand All @@ -1569,13 +1544,12 @@ impl Runtime {
state_update.remove(key.clone());
// Math checked above: first_index is less than next_available_index
delayed_receipts_indices.first_index += 1;
process_receipt(receipt, &mut state_update, &mut total)?;
process_receipt(&receipt, &mut state_update, &mut total)?;
processed_delayed_receipts.push(receipt.clone());

if i % look_ahead_len == 0 {
prefetch_manager.prefetch_next_delayed_receipts(&delayed_receipts);
}
if delayed_receipts.len() - i == 1 {
if delayed_receipts_indices.next_available_index - delayed_receipts_indices.first_index
== 1
{
prefetch_manager.prefetch_next_incoming_receipts(incoming_receipts);
}
}
Expand Down Expand Up @@ -1877,7 +1851,6 @@ impl TotalResourceGuard {
struct PrefetchManager {
prefetcher: Option<TriePrefetcher>,
look_ahead_len: usize,
next_delayed_receipt_index: usize,
next_local_receipt_index: usize,
next_incoming_receipt_index: usize,
}
Expand All @@ -1888,7 +1861,6 @@ impl PrefetchManager {
Self {
prefetcher,
look_ahead_len,
next_delayed_receipt_index: 0,
next_local_receipt_index: 0,
next_incoming_receipt_index: 0,
}
Expand All @@ -1905,25 +1877,20 @@ impl PrefetchManager {
}
}

fn prefetch_next_local_receipts(&mut self, receipts: &[Receipt]) {
fn prefetch_receipts(&mut self, receipts: &[Receipt]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
let end =
std::cmp::min(self.next_local_receipt_index + self.look_ahead_len, receipts.len());
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_local_receipt_index..end]);
self.next_local_receipt_index = end;
_ = prefetcher.prefetch_receipts_data(receipts);
}
}

fn prefetch_next_delayed_receipts(&mut self, receipts: &[Receipt]) {
fn prefetch_next_local_receipts(&mut self, receipts: &[Receipt]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
let end = std::cmp::min(
self.next_delayed_receipt_index + self.look_ahead_len,
receipts.len(),
);
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_delayed_receipt_index..end]);
self.next_delayed_receipt_index = end;
let end =
std::cmp::min(self.next_local_receipt_index + self.look_ahead_len, receipts.len());
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_local_receipt_index..end]);
self.next_local_receipt_index = end;
}
}

Expand Down

0 comments on commit f24ed44

Please sign in to comment.