fix(static-file): pass producer as Arc<Mutex<_>> to ensure only one is active #7143

Merged
merged 12 commits on Mar 15, 2024
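The change shares one StaticFileProducer<DB> behind Arc<Mutex<_>> between the pipeline and the engine hook, instead of handing each component its own producer, so that only one of them can run it at a time; a component that cannot take the lock simply skips its turn. A minimal sketch of that sharing pattern follows; Producer and its run method are hypothetical stand-ins for the real reth types:

```rust
use std::sync::Arc;

use parking_lot::Mutex;

// Stand-in for StaticFileProducer<DB>; only the sharing pattern is shown here.
struct Producer;

impl Producer {
    fn run(&mut self) {
        // ... move data from the database to static files ...
    }
}

fn main() {
    // One producer, shared by every component that may want to run it.
    let producer = Arc::new(Mutex::new(Producer));
    let for_hook = Arc::clone(&producer);

    // A component only runs the producer if nobody else is holding it.
    if let Some(mut guard) = for_hook.try_lock() {
        guard.run();
    } else {
        println!("producer already running elsewhere; skipping this round");
    }
}
```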
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
@@ -94,6 +94,7 @@ similar-asserts.workspace = true
itertools.workspace = true
rayon.workspace = true
boyer-moore-magiclen = "0.2.16"
parking_lot.workspace = true

[target.'cfg(not(windows))'.dependencies]
jemallocator = { version = "0.5.0", optional = true }
7 changes: 4 additions & 3 deletions bin/reth/src/commands/debug_cmd/execution.rs
@@ -12,6 +12,7 @@ use crate::{
};
use clap::Parser;
use futures::{stream::select as stream_select, StreamExt};
use parking_lot::Mutex;
use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_db::{database::Database, init_db, DatabaseEnv};
@@ -95,7 +96,7 @@ impl Command {
consensus: Arc<dyn Consensus>,
provider_factory: ProviderFactory<DB>,
task_executor: &TaskExecutor,
static_file_producer: StaticFileProducer<DB>,
static_file_producer: Arc<Mutex<StaticFileProducer<DB>>>,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Unpin + Clone + 'static,
@@ -231,11 +232,11 @@
)
.await?;

let static_file_producer = StaticFileProducer::new(
let static_file_producer = Arc::new(Mutex::new(StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
);
)));

// Configure the pipeline
let fetch_client = network.fetch_client().await?;
5 changes: 3 additions & 2 deletions bin/reth/src/commands/debug_cmd/replay_engine.rs
@@ -9,6 +9,7 @@ use crate::{
};
use clap::Parser;
use eyre::Context;
use parking_lot::Mutex;
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_beacon_consensus::{hooks::EngineHooks, BeaconConsensus, BeaconConsensusEngine};
use reth_blockchain_tree::{
@@ -199,11 +200,11 @@ impl Command {
network_client,
Pipeline::builder().build(
provider_factory.clone(),
StaticFileProducer::new(
Arc::new(Mutex::new(StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
))),
),
blockchain_db.clone(),
Box::new(ctx.task_executor.clone()),
15 changes: 7 additions & 8 deletions bin/reth/src/commands/import.rs
@@ -11,6 +11,7 @@ use crate::{
use clap::Parser;
use eyre::Context;
use futures::{Stream, StreamExt};
use parking_lot::Mutex;
use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_db::{database::Database, init_db};
@@ -107,19 +108,17 @@ impl ImportCommand {
let tip = file_client.tip().expect("file client has no tip");
info!(target: "reth::cli", "Chain file imported");

let static_file_producer = StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
);

let (mut pipeline, events) = self
.build_import_pipeline(
config,
provider_factory.clone(),
&consensus,
file_client,
static_file_producer,
Arc::new(Mutex::new(StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
))),
)
.await?;

@@ -155,7 +154,7 @@ impl ImportCommand {
provider_factory: ProviderFactory<DB>,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
static_file_producer: StaticFileProducer<DB>,
static_file_producer: Arc<Mutex<StaticFileProducer<DB>>>,
) -> eyre::Result<(Pipeline<DB>, impl Stream<Item = NodeEvent>)>
where
DB: Database + Clone + Unpin + 'static,
1 change: 1 addition & 0 deletions crates/consensus/beacon/Cargo.toml
@@ -40,6 +40,7 @@ metrics.workspace = true
tracing.workspace = true
thiserror.workspace = true
schnellru.workspace = true
parking_lot = { workspace = true, features = ["send_guard", "arc_lock"] }

[dev-dependencies]
# reth
37 changes: 25 additions & 12 deletions crates/consensus/beacon/src/engine/hooks/static_file.rs
@@ -5,12 +5,16 @@ use crate::{
hooks::EngineHookDBAccessLevel,
};
use futures::FutureExt;
use parking_lot::Mutex;
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::{static_file::HighestStaticFiles, BlockNumber};
use reth_static_file::{StaticFileProducer, StaticFileProducerWithResult};
use reth_static_file::{StaticFileProducer, StaticFileProducerResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use std::{
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tracing::trace;

@@ -28,7 +32,7 @@ pub struct StaticFileHook<DB> {
impl<DB: Database + 'static> StaticFileHook<DB> {
/// Create a new instance
pub fn new(
static_file_producer: StaticFileProducer<DB>,
static_file_producer: Arc<Mutex<StaticFileProducer<DB>>>,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self { state: StaticFileProducerState::Idle(Some(static_file_producer)), task_spawner }
@@ -85,31 +89,40 @@ impl<DB: Database + 'static> StaticFileHook<DB> {
) -> RethResult<Option<EngineHookEvent>> {
Ok(match &mut self.state {
StaticFileProducerState::Idle(static_file_producer) => {
let Some(mut static_file_producer) = static_file_producer.take() else {
let Some(static_file_producer) = static_file_producer.take() else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer is already running but the state is idle");
return Ok(None)
};

let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
})?;
let Some(mut locked_static_file_producer) = static_file_producer.try_lock_arc()
else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
return Ok(None)
};

let targets =
locked_static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
})?;

// Check if moving data to static files has been requested.
if targets.any() {
let (tx, rx) = oneshot::channel();
self.task_spawner.spawn_critical_blocking(
"static_file_producer task",
Box::pin(async move {
let result = static_file_producer.run(targets);
let result = locked_static_file_producer.run(targets);
drop(locked_static_file_producer);
let _ = tx.send((static_file_producer, result));
}),
);
self.state = StaticFileProducerState::Running(rx);

Some(EngineHookEvent::Started)
} else {
drop(locked_static_file_producer);
self.state = StaticFileProducerState::Idle(Some(static_file_producer));
Some(EngineHookEvent::NotReady)
}
@@ -157,7 +170,7 @@ impl<DB: Database + 'static> EngineHook for StaticFileHook<DB> {
#[derive(Debug)]
enum StaticFileProducerState<DB> {
/// [StaticFileProducer] is idle.
Idle(Option<StaticFileProducer<DB>>),
Idle(Option<Arc<Mutex<StaticFileProducer<DB>>>>),
/// [StaticFileProducer] is running and waiting for a response
Running(oneshot::Receiver<StaticFileProducerWithResult<DB>>),
Running(oneshot::Receiver<(Arc<Mutex<StaticFileProducer<DB>>>, StaticFileProducerResult)>),
}
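The hook takes the lock with try_lock_arc so the guard is owned rather than borrowed from the Arc and can be moved into the spawned blocking task; that is why the parking_lot arc_lock and send_guard features are enabled in the Cargo.toml change above. A rough sketch of that flow, with a hypothetical Producer type and a plain thread standing in for the task spawner:

```rust
use std::{sync::Arc, thread};

use parking_lot::Mutex;

// Stand-in for StaticFileProducer<DB>; only the locking pattern matters here.
struct Producer;

impl Producer {
    fn run(&mut self) -> u64 {
        // ... move data to static files; pretend we return the highest block written ...
        42
    }
}

fn main() {
    let producer = Arc::new(Mutex::new(Producer));

    // try_lock_arc (arc_lock feature) returns an owned guard not tied to a
    // borrow of the Arc, so it can move into another task; send_guard makes
    // the guard Send. If the lock is already taken, skip instead of blocking.
    let Some(mut guard) = producer.try_lock_arc() else {
        println!("producer lock is already taken; not starting a run");
        return;
    };

    let producer_for_task = Arc::clone(&producer);
    let handle = thread::spawn(move || {
        let result = guard.run();
        // Release the lock before handing the producer back to the caller.
        drop(guard);
        (producer_for_task, result)
    });

    let (_producer, result) = handle.join().unwrap();
    println!("run finished with result {result}");
}
```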
1 change: 1 addition & 0 deletions crates/node-builder/Cargo.toml
@@ -49,3 +49,4 @@ tokio = { workspace = true, features = [
eyre.workspace = true
fdlimit = "0.3.0"
confy.workspace = true
parking_lot.workspace = true
7 changes: 4 additions & 3 deletions crates/node-builder/src/builder.rs
@@ -14,6 +14,7 @@ use crate::{
};
use eyre::Context;
use futures::{future::Either, stream, stream_select, StreamExt};
use parking_lot::Mutex;
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook},
BeaconConsensusEngine,
@@ -548,12 +549,12 @@ where
let max_block = config.max_block(&network_client, provider_factory.clone()).await?;
let mut hooks = EngineHooks::new();

let mut static_file_producer = StaticFileProducer::new(
let mut static_file_producer = Arc::new(Mutex::new(StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
prune_config.clone().unwrap_or_default().segments,
);
let static_file_producer_events = static_file_producer.events();
)));
let static_file_producer_events = static_file_producer.lock().events();
hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone())));
info!(target: "reth::cli", "StaticFileProducer initialized");

1 change: 1 addition & 0 deletions crates/node-core/Cargo.toml
@@ -70,6 +70,7 @@ const-str = "0.5.6"
rand.workspace = true
pin-project.workspace = true
derive_more.workspace = true
parking_lot.workspace = true

# io
dirs-next = "2.0.0"
5 changes: 3 additions & 2 deletions crates/node-core/src/node_config.rs
@@ -12,6 +12,7 @@ use crate::{
};
use metrics_exporter_prometheus::PrometheusHandle;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use reth_auto_seal_consensus::{AutoSealConsensus, MiningMode};
use reth_beacon_consensus::BeaconConsensus;
use reth_blockchain_tree::{
@@ -544,7 +545,7 @@ impl NodeConfig {
metrics_tx: reth_stages::MetricEventsSender,
prune_config: Option<PruneConfig>,
max_block: Option<BlockNumber>,
static_file_producer: StaticFileProducer<DB>,
static_file_producer: Arc<Mutex<StaticFileProducer<DB>>>,
evm_config: EvmConfig,
) -> eyre::Result<Pipeline<DB>>
where
@@ -788,7 +789,7 @@
continuous: bool,
metrics_tx: reth_stages::MetricEventsSender,
prune_config: Option<PruneConfig>,
static_file_producer: StaticFileProducer<DB>,
static_file_producer: Arc<Mutex<StaticFileProducer<DB>>>,
evm_config: EvmConfig,
) -> eyre::Result<Pipeline<DB>>
where
8 changes: 4 additions & 4 deletions crates/revm/src/stack.rs
@@ -131,7 +131,7 @@ where
) -> Option<CallOutcome> {
call_inspectors!([&mut self.custom_print_tracer], |inspector| {
if let Some(outcome) = inspector.call(context, inputs) {
return Some(outcome);
@rkrasiuk (Member) commented on Mar 15, 2024:
how do these keep making their way back into the codebase
return Some(outcome)
}
});

@@ -151,7 +151,7 @@
// If the inspector returns a different ret or a revert with a non-empty message,
// we assume it wants to tell us something
if new_ret != outcome {
return new_ret;
return new_ret
}
});

@@ -166,7 +166,7 @@
) -> Option<CreateOutcome> {
call_inspectors!([&mut self.custom_print_tracer], |inspector| {
if let Some(out) = inspector.create(context, inputs) {
return Some(out);
return Some(out)
}
});

@@ -186,7 +186,7 @@
// If the inspector returns a different ret or a revert with a non-empty message,
// we assume it wants to tell us something
if new_ret != outcome {
return new_ret;
return new_ret
}
});

2 changes: 1 addition & 1 deletion crates/rpc/rpc/src/eth/bundle.rs
@@ -82,7 +82,7 @@ where
{
return Err(EthApiError::InvalidParams(
EthBundleError::Eip4844BlobGasExceeded.to_string(),
));
))
}

let block_id: reth_rpc_types::BlockId = state_block_number.into();
1 change: 1 addition & 0 deletions crates/stages/Cargo.toml
@@ -46,6 +46,7 @@ itertools.workspace = true
rayon.workspace = true
num-traits = "0.2.15"
auto_impl = "1"
parking_lot.workspace = true

[dev-dependencies]
# reth
5 changes: 4 additions & 1 deletion crates/stages/src/pipeline/builder.rs
@@ -1,4 +1,7 @@
use std::sync::Arc;

use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageSet};
use parking_lot::Mutex;
use reth_db::database::Database;
use reth_primitives::{stage::StageId, BlockNumber, B256};
use reth_provider::ProviderFactory;
@@ -71,7 +74,7 @@
pub fn build(
self,
provider_factory: ProviderFactory<DB>,
static_file_producer: StaticFileProducer<DB>,
static_file_producer: Arc<Mutex<StaticFileProducer<DB>>>,
) -> Pipeline<DB> {
let Self { stages, max_block, tip_tx, metrics_tx } = self;
Pipeline {