Make ETL file size configurable #6927

Merged
merged 11 commits on Mar 13, 2024
8 changes: 7 additions & 1 deletion bin/reth/src/commands/stage/run.rs
@@ -82,6 +82,10 @@ pub struct Command {
#[arg(long)]
batch_size: Option<u64>,

/// Size for temporary file during ETL stages
#[arg(long)]
etl_file_size: Option<usize>,

/// Normally, running the stage requires unwinding for stages that already
/// have been run, in order to not rewrite to the same database slots.
///
@@ -151,6 +155,8 @@ impl Command {

let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1);

let etl_file_size = self.etl_file_size.unwrap_or(500 * 1024 * 1024);

let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage {
StageEnum::Bodies => {
@@ -229,7 +235,7 @@
)
}
StageEnum::TxLookup => {
(Box::new(TransactionLookupStage::new(batch_size, None)), None)
(Box::new(TransactionLookupStage::new(batch_size, etl_file_size, None)), None)
}
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
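To make the new flag's wiring concrete, here is a minimal, self-contained sketch of the same pattern: an optional `--etl-file-size` argument that falls back to the 500 MiB default when omitted. This is not the actual reth command; it only assumes the `clap` crate with its `derive` feature.

```rust
use clap::Parser;

/// Hypothetical standalone command, used only to illustrate the flag handling above.
#[derive(Parser, Debug)]
struct Args {
    /// Size for temporary file during ETL stages
    #[arg(long)]
    etl_file_size: Option<usize>,
}

fn main() {
    let args = Args::parse();
    // Mirrors `self.etl_file_size.unwrap_or(500 * 1024 * 1024)` in the diff above:
    // no flag means a 500 MiB (524288000-byte) threshold.
    let etl_file_size = args.etl_file_size.unwrap_or(500 * 1024 * 1024);
    println!("using an ETL file size of {etl_file_size} bytes");
}
```

Passing `--etl-file-size 1048576`, for example, would override the default with a 1 MiB threshold.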
3 changes: 3 additions & 0 deletions book/cli/reth/stage/run.md
@@ -60,6 +60,9 @@ Options:

--batch-size <BATCH_SIZE>
Batch size for stage execution and unwind

--etl-file-size <ETL_FILE_SIZE>
Size for temporary file during ETL stages

-s, --skip-unwind
Normally, running the stage requires unwinding for stages that already have been run, in order to not rewrite to the same database slots.
6 changes: 6 additions & 0 deletions book/run/config.md
@@ -193,6 +193,12 @@ The transaction lookup stage builds an index of transaction hashes to their sequ
# Lower thresholds correspond to more frequent disk I/O (writes),
# but lower memory usage
chunk_size = 5000000

# The size of the temporary file for the ETL data collector.
#
# A lower threshold corresponds to more frequent flushes,
# but lowers temporary storage usage
etl_file_size = 524288000
```

### `index_account_history`
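As a cross-check on the TOML snippet above, the following sketch shows how the `etl_file_size` key maps onto a config field and how the 500 * 1024 * 1024 = 524288000-byte default applies when the key is omitted. It uses a stand-in struct rather than reth's actual `TransactionLookupConfig` (changed below in `crates/config/src/config.rs`) and assumes the `serde` and `toml` crates.

```rust
use serde::Deserialize;

/// Stand-in for `TransactionLookupConfig`, for illustration only.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct TransactionLookupConfigSketch {
    chunk_size: u64,
    etl_file_size: usize,
}

impl Default for TransactionLookupConfigSketch {
    fn default() -> Self {
        // Same defaults as the diff below: 5M transactions per chunk, 500 MiB ETL files.
        Self { chunk_size: 5_000_000, etl_file_size: 500 * 1024 * 1024 }
    }
}

fn main() {
    // Key present: the configured value is used.
    let explicit: TransactionLookupConfigSketch =
        toml::from_str("chunk_size = 5000000\netl_file_size = 524288000").unwrap();
    // Key omitted: falls back to 500 * 1024 * 1024 = 524288000 bytes.
    let defaulted: TransactionLookupConfigSketch = toml::from_str("").unwrap();
    assert_eq!(explicit.etl_file_size, defaulted.etl_file_size);
    println!("{explicit:?}\n{defaulted:?}");
}
```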
4 changes: 3 additions & 1 deletion crates/config/src/config.rs
@@ -227,11 +227,13 @@ impl Default for MerkleConfig {
pub struct TransactionLookupConfig {
/// The maximum number of transactions to process before writing to disk.
pub chunk_size: u64,
/// Size of temporary file during ETL stages.
pub etl_file_size: usize,
}

impl Default for TransactionLookupConfig {
fn default() -> Self {
Self { chunk_size: 5_000_000 }
Self { chunk_size: 5_000_000, etl_file_size: 500 * 1024 * 1024 }
}
}

1 change: 1 addition & 0 deletions crates/node-core/src/node_config.rs
@@ -866,6 +866,7 @@ impl NodeConfig {
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(
stage_config.transaction_lookup.chunk_size,
stage_config.transaction_lookup.etl_file_size,
prune_modes.transaction_lookup,
))
.set(IndexAccountHistoryStage::new(
2 changes: 1 addition & 1 deletion crates/stages/benches/criterion.rs
@@ -57,7 +57,7 @@ fn transaction_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times
group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, 500 * 1024 * 1024, None);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

14 changes: 8 additions & 6 deletions crates/stages/src/stages/tx_lookup.rs
@@ -34,19 +34,20 @@ pub struct TransactionLookupStage {
/// The maximum number of lookup entries to hold in memory before pushing them to
/// [`reth_etl::Collector`].
chunk_size: u64,
/// Size of temporary file during ETL stages.
etl_file_size: usize,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupStage {
fn default() -> Self {
Self { chunk_size: 5_000_000, prune_mode: None }
Self { chunk_size: 5_000_000, etl_file_size: 500 * 1024 * 1024, prune_mode: None }
}
}

impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage].
pub fn new(chunk_size: u64, prune_mode: Option<PruneMode>) -> Self {
Self { chunk_size, prune_mode }
pub fn new(chunk_size: u64, etl_file_size: usize, prune_mode: Option<PruneMode>) -> Self {
Self { chunk_size, etl_file_size, prune_mode }
}
}

@@ -102,7 +103,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {

// Temporary files, sized by the configurable ETL file size (500MB by default)
let mut hash_collector: Collector<TxHash, TxNumber> =
Collector::new(Arc::new(TempDir::new()?), 500 * (1024 * 1024));
Collector::new(Arc::new(TempDir::new()?), self.etl_file_size);

debug!(
target: "sync::stages::transaction_lookup",
@@ -397,12 +398,13 @@ mod tests {
struct TransactionLookupTestRunner {
db: TestStageDB,
chunk_size: u64,
etl_file_size: usize,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupTestRunner {
fn default() -> Self {
Self { db: TestStageDB::default(), chunk_size: 1000, prune_mode: None }
Self { db: TestStageDB::default(), chunk_size: 1000, etl_file_size: 500 * 1024 * 1024, prune_mode: None }
}
}

@@ -449,7 +451,7 @@
}

fn stage(&self) -> Self::S {
TransactionLookupStage { chunk_size: self.chunk_size, prune_mode: self.prune_mode }
TransactionLookupStage { chunk_size: self.chunk_size, etl_file_size: self.etl_file_size, prune_mode: self.prune_mode }
}
}

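For intuition on what the threshold controls, here is a toy collector, illustrative only and not the `reth_etl::Collector` implementation, that buffers key/value pairs in memory and starts a fresh temporary file whenever the buffered bytes reach the configured file size:

```rust
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;

/// Toy collector: buffers key/value pairs in memory and flushes them to a new
/// temporary file once the buffered byte count reaches `file_size`.
struct ToyCollector {
    dir: PathBuf,
    file_size: usize,
    buffered_bytes: usize,
    buffer: Vec<(Vec<u8>, Vec<u8>)>,
    files_written: usize,
}

impl ToyCollector {
    fn new(dir: PathBuf, file_size: usize) -> Self {
        Self { dir, file_size, buffered_bytes: 0, buffer: Vec::new(), files_written: 0 }
    }

    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) -> std::io::Result<()> {
        self.buffered_bytes += key.len() + value.len();
        self.buffer.push((key, value));
        // A lower threshold means more frequent flushes but less data held in memory.
        if self.buffered_bytes >= self.file_size {
            self.flush()?;
        }
        Ok(())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        let path = self.dir.join(format!("etl-{}.tmp", self.files_written));
        let mut out = BufWriter::new(File::create(path)?);
        for (key, value) in self.buffer.drain(..) {
            out.write_all(&key)?;
            out.write_all(&value)?;
        }
        out.flush()?;
        self.files_written += 1;
        self.buffered_bytes = 0;
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    // A tiny 64-byte threshold, just to force several flushes in this example.
    let mut collector = ToyCollector::new(std::env::temp_dir(), 64);
    for i in 0u64..100 {
        collector.insert(i.to_be_bytes().to_vec(), vec![0u8; 8])?;
    }
    collector.flush()?;
    println!("wrote {} temporary files", collector.files_written);
    Ok(())
}
```

A smaller file size flushes more often and holds less in memory at once; a larger one does the opposite. That is the trade-off the new `etl_file_size` setting exposes.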