Optimize batching of transactions during replay for parallel processing
pgarg66 committed Feb 7, 2022
1 parent 37afdd1 commit 4de14e5
Showing 1 changed file with 76 additions and 1 deletion.
77 changes: 76 additions & 1 deletion ledger/src/blockstore_processor.rs
@@ -28,6 +28,7 @@ use {
bank_utils,
block_cost_limits::*,
commitment::VOTE_THRESHOLD_SIZE,
cost_model::CostModel,
snapshot_config::SnapshotConfig,
snapshot_package::{AccountsPackageSender, SnapshotType},
snapshot_utils::{self, BankFromArchiveTimings},
@@ -53,6 +54,7 @@ use {
collect_token_balances, TransactionTokenBalancesSet,
},
std::{
borrow::Cow,
cell::RefCell,
collections::{HashMap, HashSet},
path::PathBuf,
@@ -247,7 +249,7 @@ fn execute_batch(
first_err.map(|(result, _)| result).unwrap_or(Ok(()))
}

fn execute_batches(
fn execute_batches_internal(
bank: &Arc<Bank>,
batches: &[TransactionBatch],
entry_callback: Option<&ProcessCallback>,
@@ -290,6 +292,79 @@ fn execute_batches(
first_err(&results)
}

fn execute_batches(
bank: &Arc<Bank>,
batches: &[TransactionBatch],
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
) -> Result<()> {
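    // Flatten every incoming batch into one combined list of lock results and
    // sanitized transactions so the work can be re-split by cost below.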
let lock_results = batches
.iter()
.flat_map(|batch| batch.lock_results().clone())
.collect::<Vec<_>>();
let sanitized_txs = batches
.iter()
.flat_map(|batch| batch.sanitized_transactions().to_vec())
.collect::<Vec<_>>();

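    // Price each transaction with the cost model, accumulating the total cost
    // and remembering the cheapest single transaction.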
let cost_model = CostModel::new();
let mut minimal_tx_cost = u64::MAX;
let total_cost: u64 = sanitized_txs
.iter()
.map(|tx| {
let cost = cost_model.calculate_cost(tx).sum();
minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost);
cost
})
.sum();

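    // Target one batch per replay thread; rebatching only pays off when the
    // total cost is large enough to give every thread at least one
    // minimally-priced transaction's worth of work.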
let target_batch_count = get_thread_count() as u64;

if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
let target_batch_cost = total_cost / target_batch_count;
let mut batch_cost = 0;
let mut index = 0;
let mut tx_batches: Vec<TransactionBatch> = vec![];
let mut slice_range = 0..0;
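        // Greedily pack consecutive transactions until a slice reaches the
        // per-thread cost target, then cut a new TransactionBatch from it.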
while index < sanitized_txs.len() {
batch_cost += cost_model.calculate_cost(&sanitized_txs[index]).sum();
index += 1;
if batch_cost >= target_batch_cost || sanitized_txs.len() == index {
slice_range.end = index;
let txs = &sanitized_txs[slice_range.clone()];
let results = &lock_results[slice_range.clone()];
let tx_batch = TransactionBatch::new(results.to_vec(), bank, Cow::from(txs));
slice_range.start = index;
tx_batches.push(tx_batch);
batch_cost = 0;
}
}

execute_batches_internal(
bank,
&tx_batches,
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter,
)
} else {
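        // The workload is too small to benefit from splitting; execute the
        // original batches as-is.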
execute_batches_internal(
bank,
&batches,
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter,
)
}
}

/// Process an ordered list of entries in parallel
/// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
/// 2. Process the locked group in parallel
