Prefetch earlier in apply and pipeline it #11083

Status: Closed. Wants to merge 9 commits.
233 changes: 200 additions & 33 deletions runtime/runtime/src/lib.rs
@@ -1307,7 +1307,9 @@ impl Runtime {
// future refactoring won’t break the condition.
assert!(cfg!(feature = "sandbox") || state_patch.is_empty());
let protocol_version = apply_state.current_protocol_version;
let mut prefetcher = TriePrefetcher::new_if_enabled(&trie);
let look_ahead_len = 8; // TODO = NUM_IO_THREADS from prefetching_trie_storage.rs
let mut prefetch_manager =
PrefetchManager::new(TriePrefetcher::new_if_enabled(&trie), look_ahead_len);
let mut state_update = TrieUpdate::new(trie);
let mut total = TotalResourceGuard {
span: tracing::Span::current(),
@@ -1318,10 +1320,7 @@ impl Runtime {
compute: 0,
};

if let Some(prefetcher) = &mut prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_transactions_data(transactions);
}
prefetch_manager.prefetch_all_transactions(transactions);

let mut stats = ApplyStats::default();

@@ -1391,6 +1390,8 @@ impl Runtime {
)?;
if receipt.receiver_id == signed_transaction.transaction.signer_id {
local_receipts.push(receipt);
// Prefetch eagerly assuming all local receipts will be processed.
prefetch_manager.prefetch_next_local_receipts(&local_receipts);
} else {
outgoing_receipts.push(receipt);
}
@@ -1410,6 +1411,11 @@
}
metrics.tx_processing_done(total.gas, total.compute);

if transactions.is_empty() {
// Local receipt prefetching above wasn't triggered.
prefetch_manager.prefetch_next_local_receipts(&local_receipts);
}

let mut process_receipt = |receipt: &Receipt,
state_update: &mut TrieUpdate,
total: &mut TotalResourceGuard|
@@ -1489,24 +1495,73 @@ impl Runtime {
None
};

// We first process local receipts. They contain staking, local contract calls, etc.
let local_processing_start = std::time::Instant::now();
if let Some(prefetcher) = &mut prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&local_receipts);
}
for receipt in local_receipts.iter() {
// Retrieve delayed receipts to initiate prefetching during local receipt processing.
// TODO add time to delayed receipt processing metric.
Comment on lines +1498 to +1499

Collaborator: why don't we do this at the beginning of apply?

@mooori (Contributor Author, Apr 17, 2024): The processing order is transactions -> local_receipts -> delayed_receipts, where local receipts become available only as transactions are processed. The prefetcher's work queue is FIFO, which implies prefetching should happen in the same order as processing. If delayed receipts were prefetched out of order (e.g. at the beginning), they could get in the way of prefetching for local receipts.

Giving the prefetcher a priority-aware work queue was brought up, but it's a bigger change.

let mut delayed_receipts_keys = Vec::new();
let mut delayed_receipts = Vec::new();
let mut current_delayed_receipt_index = delayed_receipts_indices.first_index;
let mut load_delayed_receipts_batch = |total: &TotalResourceGuard,
state_update: &TrieUpdate,
delayed_receipts_indices: &DelayedReceiptIndices,
delayed_receipts: &mut Vec<Receipt>,
delayed_receipts_keys: &mut Vec<TrieKey>|
-> Result<(), RuntimeError> {
if total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
state_update.trie.recorded_storage_size_upper_bound() > limit
})
{
return Ok(());
}

let mut num_loaded = 0;
while num_loaded < look_ahead_len
&& current_delayed_receipt_index < delayed_receipts_indices.next_available_index
{
let key = TrieKey::DelayedReceipt { index: current_delayed_receipt_index };
let receipt: Receipt = get(state_update, &key)?.ok_or_else(|| {
StorageError::StorageInconsistentState(format!(
"Delayed receipt #{} should be in the state",
current_delayed_receipt_index
))
})?;

num_loaded += 1;
delayed_receipts_keys.push(key);
delayed_receipts.push(receipt);
current_delayed_receipt_index += 1;
}
Ok(())
};

// We first process local receipts. They contain staking, local contract calls, etc.
// Prefetching for local receipts was invoked above.
let local_processing_start = std::time::Instant::now();
for (i, receipt) in local_receipts.iter().enumerate() {
if total.compute >= compute_limit
|| proof_size_limit
.is_some_and(|limit| state_update.trie.recorded_storage_size() > limit)
{
// No need to prefetch anything beyond that, right?
set_delayed_receipt(&mut state_update, &mut delayed_receipts_indices, receipt);
} else {
// NOTE: We don't need to validate the local receipt, because it's just validated in
// the `verify_and_charge_transaction`.
process_receipt(receipt, &mut state_update, &mut total)?;
}

// Chances are delayed receipts will be processed, so start prefetching.
// Invoke prefetcher possibly multiple times to fill its work queue.
if local_receipts.len() - i <= look_ahead_len {
load_delayed_receipts_batch(
&total,
&state_update,
&delayed_receipts_indices,
&mut delayed_receipts,
&mut delayed_receipts_keys,
)?;
prefetch_manager.prefetch_next_delayed_receipts(&delayed_receipts);
}
}
metrics.local_receipts_done(
local_receipts.len() as u64,
@@ -1515,9 +1570,24 @@
total.compute,
);

// Prefetching delayed receipts above wasn't triggered, so start it here.
// Loading delayed receipts is required not only for performance but also for correctness:
// a delayed receipt must already be available in the first iteration of the loop below.
if local_receipts.is_empty() {
load_delayed_receipts_batch(
&total,
&state_update,
&delayed_receipts_indices,
&mut delayed_receipts,
&mut delayed_receipts_keys,
)?;
prefetch_manager.prefetch_next_delayed_receipts(&delayed_receipts);
}

// Then we process the delayed receipts. It's a backlog of receipts from the past blocks.
// TODO refactor to avoid cloning `key` and `receipt`
let delayed_processing_start = std::time::Instant::now();
let mut delayed_receipt_count = 0;
let mut num_processed_delayed_receipts = 0;
while delayed_receipts_indices.first_index < delayed_receipts_indices.next_available_index {
if total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
@@ -1526,19 +1596,18 @@
{
break;
}
delayed_receipt_count += 1;
let key = TrieKey::DelayedReceipt { index: delayed_receipts_indices.first_index };
let receipt: Receipt = get(&state_update, &key)?.ok_or_else(|| {
StorageError::StorageInconsistentState(format!(
"Delayed receipt #{} should be in the state",
delayed_receipts_indices.first_index
))
})?;

if let Some(prefetcher) = &mut prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(std::slice::from_ref(&receipt));
}
// Before entering this loop, `load_delayed_receipts_batch` was called above.
// It is called again in the body of the loop, which guarantees
// `num_processed_delayed_receipts < delayed_receipts.len()` here.
let receipt = delayed_receipts
.get(num_processed_delayed_receipts)
.expect("delayed receipt should be loaded")
.clone();
let key = delayed_receipts_keys
.get(num_processed_delayed_receipts)
.expect("delayed receipt key should be loaded")
.clone();

// Validating the delayed receipt. If it fails, it's likely the state is inconsistent.
validate_receipt(
@@ -1558,21 +1627,38 @@
delayed_receipts_indices.first_index += 1;
process_receipt(&receipt, &mut state_update, &mut total)?;
processed_delayed_receipts.push(receipt);

if num_processed_delayed_receipts % look_ahead_len == 0 {
load_delayed_receipts_batch(
&total,
&state_update,
&delayed_receipts_indices,
&mut delayed_receipts,
&mut delayed_receipts_keys,
)?;
prefetch_manager.prefetch_next_delayed_receipts(&delayed_receipts);
}
if delayed_receipts.len() - num_processed_delayed_receipts == 1 {
prefetch_manager.prefetch_next_incoming_receipts(incoming_receipts);
}

num_processed_delayed_receipts += 1;
}
metrics.delayed_receipts_done(
delayed_receipt_count,
num_processed_delayed_receipts.try_into().expect("too many delayed receipts"),
delayed_processing_start.elapsed(),
total.gas,
total.compute,
);

if delayed_receipts.is_empty() {
// Incoming receipt prefetching above wasn't triggered.
prefetch_manager.prefetch_next_incoming_receipts(incoming_receipts);
}

// And then we process the new incoming receipts. These are receipts from other shards.
let incoming_processing_start = std::time::Instant::now();
if let Some(prefetcher) = &mut prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&incoming_receipts);
}
for receipt in incoming_receipts.iter() {
for (i, receipt) in incoming_receipts.iter().enumerate() {
// Validating new incoming no matter whether we have available gas or not. We don't
// want to store invalid receipts in state as delayed.
validate_receipt(
@@ -1590,6 +1676,11 @@
} else {
process_receipt(receipt, &mut state_update, &mut total)?;
}

// Prefetch more regardless of whether the current receipt was processed?
if i % look_ahead_len == 0 {
prefetch_manager.prefetch_next_incoming_receipts(&incoming_receipts);
}
}
metrics.incoming_receipts_done(
incoming_receipts.len() as u64,
@@ -1705,7 +1796,7 @@ impl Runtime {
state_update.trie.recorded_storage_size_upper_bound() as f64;
metrics::CHUNK_RECORDED_SIZE_UPPER_BOUND.observe(chunk_recorded_size_upper_bound);
let (trie, trie_changes, state_changes) = state_update.finalize()?;
if let Some(prefetcher) = &prefetcher {
if prefetch_manager.prefetching_enabled() {
// Only clear the prefetcher queue after finalize is done because as part of receipt
// processing we also prefetch account data and access keys that are accessed in
// finalize. This data can take a very long time otherwise if not prefetched.
@@ -1716,7 +1807,7 @@ impl Runtime {
// In the future it may make sense to have prefetcher have a mode where it has two
// queues: one for data that is going to be required soon, and the other that it would
// only work when otherwise idle.
let discarded_prefetch_requests = prefetcher.clear();
let discarded_prefetch_requests = prefetch_manager.clear_prefetcher().unwrap_or(0);
tracing::debug!(target: "runtime", discarded_prefetch_requests);
}

@@ -1857,6 +1948,82 @@ impl TotalResourceGuard {
}
}

struct PrefetchManager {
prefetcher: Option<TriePrefetcher>,
look_ahead_len: usize,
next_delayed_receipt_index: usize,
next_local_receipt_index: usize,
next_incoming_receipt_index: usize,
}

// TODO remove `look_ahead_len` here and instead just prefetch the single next receipt; fewer edge cases.
impl PrefetchManager {
Contributor: can this data and logic be just part of the TriePrefetcher instead of having a separate manager on top of it? I see the TriePrefetcher is already aware of receipts and transactions, and it could be aware of the types of receipts as well (local, delayed, incoming).

@mooori (Contributor Author): PrefetchManager is scoped to an invocation of fn apply, while TriePrefetcher is scoped to the transaction runtime, which I think is a broader scope. My interpretation is that integrating PrefetchManager functionality into TriePrefetcher would require limiting TriePrefetcher's scope to a particular fn apply invocation as well.

The current PrefetchManager turned out to be a suboptimal abstraction, and I'm working on some changes to how delayed receipts are handled. If this results in a performance improvement and the PR is not discarded, I'll consider moving things into TriePrefetcher.

fn new(prefetcher: Option<TriePrefetcher>, look_ahead_len: usize) -> Self {
Self {
prefetcher,
look_ahead_len,
next_delayed_receipt_index: 0,
next_local_receipt_index: 0,
next_incoming_receipt_index: 0,
}
}

fn prefetching_enabled(&self) -> bool {
self.prefetcher.is_some()
}

fn prefetch_all_transactions(&mut self, transactions: &[SignedTransaction]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_transactions_data(transactions);
}
}

fn prefetch_next_local_receipts(&mut self, receipts: &[Receipt]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
let end =
std::cmp::min(self.next_local_receipt_index + self.look_ahead_len, receipts.len());
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_local_receipt_index..end]);
self.next_local_receipt_index = end;
}
}

fn prefetch_next_delayed_receipts(&mut self, receipts: &[Receipt]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
let end = std::cmp::min(
self.next_delayed_receipt_index + self.look_ahead_len,
receipts.len(),
);
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_delayed_receipt_index..end]);
self.next_delayed_receipt_index = end;
}
}

fn prefetch_next_incoming_receipts(&mut self, receipts: &[Receipt]) {
if let Some(prefetcher) = &mut self.prefetcher {
// Prefetcher is allowed to fail
let end = std::cmp::min(
self.next_incoming_receipt_index + self.look_ahead_len,
receipts.len(),
);
_ = prefetcher.prefetch_receipts_data(&receipts[self.next_incoming_receipt_index..end]);
self.next_incoming_receipt_index = end;
}
}

fn clear_prefetcher(&mut self) -> Option<usize> {
match &mut self.prefetcher {
Some(prefetcher) => {
let discarded_prefetch_requests = prefetcher.clear();
Some(discarded_prefetch_requests)
}
None => None,
}
}
}
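The sliding-window bookkeeping shared by the `prefetch_next_*` methods can be isolated as a small sketch (the `next_window` helper is hypothetical, introduced only for illustration; it mirrors the `cmp::min` clamping above):

```rust
// Each call prefetches at most `look_ahead_len` items past the last
// prefetched index, clamped to the number of receipts currently loaded.
// Repeated calls advance the window; calls past the end are no-ops.
fn next_window(next_index: usize, look_ahead_len: usize, total: usize) -> (usize, usize) {
    let end = std::cmp::min(next_index + look_ahead_len, total);
    (next_index, end)
}

fn main() {
    let look_ahead_len = 8;
    // First call on 10 receipts: prefetch indices 0..8.
    assert_eq!(next_window(0, look_ahead_len, 10), (0, 8));
    // Second call picks up where the first left off: 8..10.
    assert_eq!(next_window(8, look_ahead_len, 10), (8, 10));
    // Further calls yield the empty range 10..10.
    assert_eq!(next_window(10, look_ahead_len, 10), (10, 10));
}
```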

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;