Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions crates/engine/tree/src/tree/payload_processor/multiproof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ where
}
}

const fn is_full(&self) -> bool {
self.inflight >= self.max_concurrent
}

/// Spawns a new multiproof calculation or enqueues it for later if
/// `max_concurrent` are already inflight.
fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
Expand All @@ -391,7 +395,7 @@ where
return
}

if self.inflight >= self.max_concurrent {
if self.is_full() {
self.pending.push_back(input);
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
return;
Expand Down Expand Up @@ -707,21 +711,32 @@ where

// Process proof targets in chunks.
let mut chunks = 0;
for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
let should_chunk = !self.multiproof_manager.is_full();

let mut spawn = |proof_targets| {
self.multiproof_manager.spawn_or_queue(
MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
proof_targets: proof_targets_chunk,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
chunks += 1;
};

if should_chunk {
for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
spawn(proof_targets_chunk);
}
} else {
spawn(proof_targets);
}

self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);

chunks
Expand Down Expand Up @@ -830,25 +845,40 @@ where

// Process state updates in chunks.
let mut chunks = 0;
let should_chunk = !self.multiproof_manager.is_full();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can try something more sophisticated, like tuning the number of chunks based on the number of available task slots (up to a max), but that would require many benchmarks. I'll set up a mainnet node on a new bare metal server first for such experiments. Shanghai blocks too small and boring..


let mut spawned_proof_targets = MultiProofTargets::default();
for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
let proof_targets =
get_proof_targets(&chunk, &self.fetched_proof_targets, &multi_added_removed_keys);

let mut spawn = |hashed_state_update| {
let proof_targets = get_proof_targets(
&hashed_state_update,
&self.fetched_proof_targets,
&multi_added_removed_keys,
);
spawned_proof_targets.extend_ref(&proof_targets);

self.multiproof_manager.spawn_or_queue(
MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update: chunk,
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);

chunks += 1;
};

if should_chunk {
for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
spawn(chunk);
}
} else {
spawn(not_fetched_state_update);
}

self.metrics
Expand Down
Loading