Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ETL file size configurable #6927

Merged
merged 11 commits into from Mar 13, 2024
1 change: 1 addition & 0 deletions bin/reth/src/commands/debug_cmd/execution.rs
Expand Up @@ -127,6 +127,7 @@ impl Command {
header_downloader,
body_downloader,
factory.clone(),
stage_conf.etl.etl_file_size,
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
Expand Down
4 changes: 4 additions & 0 deletions bin/reth/src/commands/import.rs
Expand Up @@ -179,6 +179,9 @@ impl ImportCommand {
reth_revm::EvmProcessorFactory::new(self.chain.clone(), EthEvmConfig::default());

let max_block = file_client.max_block().unwrap_or(0);

let etl_file_size = config.stages.etl.etl_file_size;

let mut pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
Expand All @@ -191,6 +194,7 @@ impl ImportCommand {
header_downloader,
body_downloader,
factory.clone(),
etl_file_size,
)
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
Expand Down
8 changes: 7 additions & 1 deletion bin/reth/src/commands/stage/run.rs
Expand Up @@ -82,6 +82,10 @@ pub struct Command {
#[arg(long)]
batch_size: Option<u64>,

/// The maximum size in bytes of data held in memory before being flushed to disk as a file.
#[arg(long)]
etl_file_size: Option<usize>,

/// Normally, running the stage requires unwinding for stages that already
/// have been run, in order to not rewrite to the same database slots.
///
Expand Down Expand Up @@ -152,6 +156,8 @@ impl Command {

let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1);

let etl_file_size = self.etl_file_size.unwrap_or(500 * 1024 * 1024);

let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage {
StageEnum::Bodies => {
Expand Down Expand Up @@ -230,7 +236,7 @@ impl Command {
)
}
StageEnum::TxLookup => {
(Box::new(TransactionLookupStage::new(batch_size, None)), None)
(Box::new(TransactionLookupStage::new(batch_size, etl_file_size, None)), None)
}
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
Expand Down
2 changes: 1 addition & 1 deletion book/cli/reth/node.md
Expand Up @@ -238,7 +238,7 @@ RPC:
--rpc-max-tracing-requests <COUNT>
Maximum number of concurrent tracing requests

[default: 8]
[default: 10]

--rpc-max-blocks-per-filter <COUNT>
Maximum number of blocks that could be scanned per filter request. (0 = entire chain)
Expand Down
3 changes: 3 additions & 0 deletions book/cli/reth/stage/run.md
Expand Up @@ -61,6 +61,9 @@ Options:
--batch-size <BATCH_SIZE>
Batch size for stage execution and unwind

--etl-file-size <ETL_FILE_SIZE>
The maximum size in bytes of data held in memory before being flushed to disk as a file

-s, --skip-unwind
Normally, running the stage requires unwinding for stages that already have been run, in order to not rewrite to the same database slots.

Expand Down
13 changes: 13 additions & 0 deletions book/run/config.md
Expand Up @@ -221,6 +221,19 @@ The storage history indexing stage builds an index of what blocks a particular s
commit_threshold = 100000
```

### `etl`

An ETL (extract, transform, load) data collector. Used mainly to insert data into `MDBX` in a sorted manner.

```toml
[stages.etl]
# The maximum size in bytes of data held in memory before being flushed to disk as a file.
#
# A lower threshold corresponds to more frequent flushes,
# but lowers temporary in-memory storage usage.
etl_file_size = 524_288_000 # 500 * 1024 * 1024
```

## The `[peers]` section

The peers section is used to configure how the networking component of reth establishes and maintains connections to peers.
Expand Down
18 changes: 17 additions & 1 deletion crates/config/src/config.rs
Expand Up @@ -70,6 +70,8 @@ pub struct StageConfig {
pub index_account_history: IndexHistoryConfig,
/// Index Storage History stage configuration.
pub index_storage_history: IndexHistoryConfig,
/// Common ETL related configuration.
pub etl: EtlConfig,
}

/// Header stage configuration.
Expand Down Expand Up @@ -235,7 +237,21 @@ impl Default for TransactionLookupConfig {
}
}

/// History History stage configuration.
/// Common ETL related configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]
pub struct EtlConfig {
    /// The maximum size in bytes of data held in memory before being flushed to disk as a file.
    ///
    /// A lower threshold flushes more often, reducing peak memory usage at the cost of
    /// more temporary files on disk.
    pub etl_file_size: usize,
}

impl Default for EtlConfig {
    fn default() -> Self {
        // Default in-memory threshold before spilling to a temporary file: 500 MiB.
        const DEFAULT_ETL_FILE_SIZE: usize = 500 * 1024 * 1024;
        Self { etl_file_size: DEFAULT_ETL_FILE_SIZE }
    }
}

/// History stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct IndexHistoryConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/test_utils.rs
Expand Up @@ -406,6 +406,7 @@ where
header_downloader,
body_downloader,
executor_factory.clone(),
500 * (1024 * 1024),
))
}
};
Expand Down
2 changes: 2 additions & 0 deletions crates/node-core/src/node_config.rs
Expand Up @@ -837,6 +837,7 @@ impl NodeConfig {
header_downloader,
body_downloader,
factory.clone(),
stage_config.etl.etl_file_size,
)
.set(SenderRecoveryStage {
commit_threshold: stage_config.sender_recovery.commit_threshold,
Expand Down Expand Up @@ -870,6 +871,7 @@ impl NodeConfig {
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(
stage_config.transaction_lookup.chunk_size,
stage_config.etl.etl_file_size,
prune_modes.transaction_lookup,
))
.set(IndexAccountHistoryStage::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/benches/criterion.rs
Expand Up @@ -57,7 +57,7 @@ fn transaction_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times
group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, 500 * 1024 * 1024, None);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/lib.rs
Expand Up @@ -59,6 +59,7 @@
//! headers_downloader,
//! bodies_downloader,
//! executor_factory,
//! 500*1024*1024,
//! )
//! )
//! .build(provider_factory, static_file_producer);
Expand Down
17 changes: 15 additions & 2 deletions crates/stages/src/sets.rs
Expand Up @@ -100,6 +100,7 @@ impl<Provider, H, B, EF> DefaultStages<Provider, H, B, EF> {
header_downloader: H,
body_downloader: B,
executor_factory: EF,
etl_file_size: usize,
) -> Self
where
EF: ExecutorFactory,
Expand All @@ -111,6 +112,7 @@ impl<Provider, H, B, EF> DefaultStages<Provider, H, B, EF> {
consensus,
header_downloader,
body_downloader,
etl_file_size,
),
executor_factory,
}
Expand Down Expand Up @@ -162,6 +164,8 @@ pub struct OnlineStages<Provider, H, B> {
header_downloader: H,
/// The block body downloader
body_downloader: B,
/// The size of temporary files in bytes for ETL data collector.
etl_file_size: usize,
}

impl<Provider, H, B> OnlineStages<Provider, H, B> {
Expand All @@ -172,8 +176,9 @@ impl<Provider, H, B> OnlineStages<Provider, H, B> {
consensus: Arc<dyn Consensus>,
header_downloader: H,
body_downloader: B,
etl_file_size: usize,
) -> Self {
Self { provider, header_mode, consensus, header_downloader, body_downloader }
Self { provider, header_mode, consensus, header_downloader, body_downloader, etl_file_size }
}
}

Expand All @@ -198,9 +203,16 @@ where
mode: HeaderSyncMode,
header_downloader: H,
consensus: Arc<dyn Consensus>,
etl_file_size: usize,
) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(HeaderStage::new(provider, header_downloader, mode, consensus.clone()))
.add_stage(HeaderStage::new(
provider,
header_downloader,
mode,
consensus.clone(),
etl_file_size,
))
.add_stage(bodies)
}
}
Expand All @@ -219,6 +231,7 @@ where
self.header_downloader,
self.header_mode,
self.consensus.clone(),
self.etl_file_size,
))
.add_stage(BodyStage::new(self.body_downloader))
}
Expand Down
6 changes: 4 additions & 2 deletions crates/stages/src/stages/headers.rs
Expand Up @@ -74,15 +74,16 @@ where
downloader: Downloader,
mode: HeaderSyncMode,
consensus: Arc<dyn Consensus>,
etl_file_size: usize,
) -> Self {
Self {
provider: database,
downloader,
mode,
consensus,
sync_gap: None,
hash_collector: Collector::new(100 * (1024 * 1024)),
header_collector: Collector::new(100 * (1024 * 1024)),
hash_collector: Collector::new(etl_file_size / 2),
header_collector: Collector::new(etl_file_size / 2),
is_etl_ready: false,
}
}
Expand Down Expand Up @@ -419,6 +420,7 @@ mod tests {
(*self.downloader_factory)(),
HeaderSyncMode::Tip(self.channel.1.clone()),
self.consensus.clone(),
500 * (1024 * 1024),
)
}
}
Expand Down
23 changes: 17 additions & 6 deletions crates/stages/src/stages/tx_lookup.rs
Expand Up @@ -32,19 +32,20 @@ pub struct TransactionLookupStage {
/// The maximum number of lookup entries to hold in memory before pushing them to
/// [`reth_etl::Collector`].
chunk_size: u64,
etl_file_size: usize,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupStage {
fn default() -> Self {
Self { chunk_size: 5_000_000, prune_mode: None }
Self { chunk_size: 5_000_000, etl_file_size: 500 * 1024 * 1024, prune_mode: None }
}
}

impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage].
pub fn new(chunk_size: u64, prune_mode: Option<PruneMode>) -> Self {
Self { chunk_size, prune_mode }
pub fn new(chunk_size: u64, etl_file_size: usize, prune_mode: Option<PruneMode>) -> Self {
Self { chunk_size, etl_file_size, prune_mode }
}
}

Expand Down Expand Up @@ -99,7 +100,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}

// Temporary files sized by the configured ETL file size (default 500MB)
let mut hash_collector: Collector<TxHash, TxNumber> = Collector::new(500 * (1024 * 1024));
let mut hash_collector: Collector<TxHash, TxNumber> = Collector::new(self.etl_file_size);

debug!(
target: "sync::stages::transaction_lookup",
Expand Down Expand Up @@ -397,12 +398,18 @@ mod tests {
struct TransactionLookupTestRunner {
db: TestStageDB,
chunk_size: u64,
etl_file_size: usize,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupTestRunner {
fn default() -> Self {
Self { db: TestStageDB::default(), chunk_size: 1000, prune_mode: None }
Self {
db: TestStageDB::default(),
chunk_size: 1000,
etl_file_size: 500 * 1024 * 1024,
prune_mode: None,
}
}
}

Expand Down Expand Up @@ -449,7 +456,11 @@ mod tests {
}

fn stage(&self) -> Self::S {
TransactionLookupStage { chunk_size: self.chunk_size, prune_mode: self.prune_mode }
TransactionLookupStage {
chunk_size: self.chunk_size,
etl_file_size: self.etl_file_size,
prune_mode: self.prune_mode,
}
}
}

Expand Down