Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(static-file): pass producer as Arc<Mutex<_>> to ensure only one is active #7143

Merged
merged 12 commits into from Mar 15, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions bin/reth/src/commands/import.rs
Expand Up @@ -107,19 +107,17 @@ impl ImportCommand {
let tip = file_client.tip().expect("file client has no tip");
info!(target: "reth::cli", "Chain file imported");

let static_file_producer = StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
);

let (mut pipeline, events) = self
.build_import_pipeline(
config,
provider_factory.clone(),
&consensus,
file_client,
static_file_producer,
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
)
.await?;

Expand Down
32 changes: 20 additions & 12 deletions crates/consensus/beacon/src/engine/hooks/static_file.rs
Expand Up @@ -69,14 +69,15 @@ impl<DB: Database + 'static> StaticFileHook<DB> {

/// This will try to spawn the static_file_producer if it is idle:
/// 1. Check if producing static files is needed through
/// [StaticFileProducer::get_static_file_targets] and then
/// [StaticFileTargets::any](reth_static_file::StaticFileTargets::any).
/// [StaticFileProducer::get_static_file_targets](reth_static_file::StaticFileProducerInner::get_static_file_targets)
/// and then [StaticFileTargets::any](reth_static_file::StaticFileTargets::any).
/// 2.
/// 1. If producing static files is needed, pass static file request to the
/// [StaticFileProducer::run] and spawn it in a separate task. Set static file producer
/// state to [StaticFileProducerState::Running].
/// [StaticFileProducer::run](reth_static_file::StaticFileProducerInner::run) and spawn
/// it in a separate task. Set static file producer state to
/// [StaticFileProducerState::Running].
/// 2. If producing static files is not needed, set static file producer state back to
/// [StaticFileProducerState::Idle].
/// [StaticFileProducerState::Idle].
///
/// If static_file_producer is already running, do nothing.
fn try_spawn_static_file_producer(
Expand All @@ -85,24 +86,31 @@ impl<DB: Database + 'static> StaticFileHook<DB> {
) -> RethResult<Option<EngineHookEvent>> {
Ok(match &mut self.state {
StaticFileProducerState::Idle(static_file_producer) => {
let Some(mut static_file_producer) = static_file_producer.take() else {
let Some(static_file_producer) = static_file_producer.take() else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer is already running but the state is idle");
return Ok(None)
};

let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
})?;
let Some(mut locked_static_file_producer) = static_file_producer.try_lock_arc()
else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
return Ok(None)
};

let targets =
locked_static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
})?;

// Check if the moving data to static files has been requested.
if targets.any() {
let (tx, rx) = oneshot::channel();
self.task_spawner.spawn_critical_blocking(
"static_file_producer task",
Box::pin(async move {
let result = static_file_producer.run(targets);
let result = locked_static_file_producer.run(targets);
let _ = tx.send((static_file_producer, result));
}),
);
Expand Down
4 changes: 2 additions & 2 deletions crates/node-builder/src/builder.rs
Expand Up @@ -548,12 +548,12 @@ where
let max_block = config.max_block(&network_client, provider_factory.clone()).await?;
let mut hooks = EngineHooks::new();

let mut static_file_producer = StaticFileProducer::new(
let static_file_producer = StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
prune_config.clone().unwrap_or_default().segments,
);
let static_file_producer_events = static_file_producer.events();
let static_file_producer_events = static_file_producer.lock().events();
hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone())));
info!(target: "reth::cli", "StaticFileProducer initialized");

Expand Down
8 changes: 4 additions & 4 deletions crates/revm/src/stack.rs
Expand Up @@ -131,7 +131,7 @@ where
) -> Option<CallOutcome> {
call_inspectors!([&mut self.custom_print_tracer], |inspector| {
if let Some(outcome) = inspector.call(context, inputs) {
return Some(outcome);
Copy link
Member

@rkrasiuk rkrasiuk Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do these keep making their way back into the codebase

return Some(outcome)
}
});

Expand All @@ -151,7 +151,7 @@ where
// If the inspector returns a different ret or a revert with a non-empty message,
// we assume it wants to tell us something
if new_ret != outcome {
return new_ret;
return new_ret
}
});

Expand All @@ -166,7 +166,7 @@ where
) -> Option<CreateOutcome> {
call_inspectors!([&mut self.custom_print_tracer], |inspector| {
if let Some(out) = inspector.create(context, inputs) {
return Some(out);
return Some(out)
}
});

Expand All @@ -186,7 +186,7 @@ where
// If the inspector returns a different ret or a revert with a non-empty message,
// we assume it wants to tell us something
if new_ret != outcome {
return new_ret;
return new_ret
}
});

Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc/src/eth/bundle.rs
Expand Up @@ -82,7 +82,7 @@ where
{
return Err(EthApiError::InvalidParams(
EthBundleError::Eip4844BlobGasExceeded.to_string(),
));
))
}

let block_id: reth_rpc_types::BlockId = state_block_number.into();
Expand Down
9 changes: 7 additions & 2 deletions crates/stages/src/pipeline/mod.rs
Expand Up @@ -229,9 +229,14 @@ where
/// -> [StageId::Execution]
/// - [StaticFileSegment::Transactions](reth_primitives::static_file::StaticFileSegment::Transactions)
/// -> [StageId::Bodies]
///
/// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
/// lock is occupied.
fn produce_static_files(&mut self) -> RethResult<()> {
let mut static_file_producer = self.static_file_producer.lock();

let provider = self.provider_factory.provider()?;
let targets = self.static_file_producer.get_static_file_targets(HighestStaticFiles {
let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: provider
.get_stage_checkpoint(StageId::Headers)?
.map(|checkpoint| checkpoint.block_number),
Expand All @@ -242,7 +247,7 @@ where
.get_stage_checkpoint(StageId::Bodies)?
.map(|checkpoint| checkpoint.block_number),
})?;
self.static_file_producer.run(targets)?;
static_file_producer.run(targets)?;

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/static-file/Cargo.toml
Expand Up @@ -27,6 +27,7 @@ tokio-stream.workspace = true
tracing.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
rayon.workspace = true
parking_lot = { workspace = true, features = ["send_guard", "arc_lock"] }
shekhirin marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
reth-db = { workspace = true, features = ["test-utils"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/static-file/src/lib.rs
Expand Up @@ -13,5 +13,6 @@ mod static_file_producer;

pub use event::StaticFileProducerEvent;
pub use static_file_producer::{
StaticFileProducer, StaticFileProducerResult, StaticFileProducerWithResult, StaticFileTargets,
StaticFileProducer, StaticFileProducerInner, StaticFileProducerResult,
StaticFileProducerWithResult, StaticFileTargets,
};
123 changes: 106 additions & 17 deletions crates/static-file/src/static_file_producer.rs
@@ -1,6 +1,7 @@
//! Support for producing static files.

use crate::{segments, segments::Segment, StaticFileProducerEvent};
use parking_lot::Mutex;
use rayon::prelude::*;
use reth_db::database::Database;
use reth_interfaces::RethResult;
Expand All @@ -10,26 +11,58 @@ use reth_provider::{
ProviderFactory,
};
use reth_tokio_util::EventListeners;
use std::{ops::RangeInclusive, time::Instant};
use std::{
ops::{Deref, RangeInclusive},
sync::Arc,
time::Instant,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};

/// Result of [StaticFileProducer::run] execution.
/// Result of [StaticFileProducerInner::run] execution.
pub type StaticFileProducerResult = RethResult<StaticFileTargets>;

/// The [StaticFileProducer] instance itself with the result of [StaticFileProducer::run]
/// The [StaticFileProducer] instance itself with the result of [StaticFileProducerInner::run]
pub type StaticFileProducerWithResult<DB> = (StaticFileProducer<DB>, StaticFileProducerResult);

/// Static File producer routine. See [StaticFileProducer::run] for more detailed description.
/// Static File producer. It's a wrapper around [StaticFileProducer] that allows to share it
/// between threads.
#[derive(Debug, Clone)]
pub struct StaticFileProducer<DB> {
pub struct StaticFileProducer<DB>(Arc<Mutex<StaticFileProducerInner<DB>>>);

impl<DB: Database> StaticFileProducer<DB> {
/// Creates a new [StaticFileProducer].
pub fn new(
provider_factory: ProviderFactory<DB>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
) -> Self {
Self(Arc::new(Mutex::new(StaticFileProducerInner::new(
provider_factory,
static_file_provider,
prune_modes,
))))
}
}

impl<DB> Deref for StaticFileProducer<DB> {
type Target = Arc<Mutex<StaticFileProducerInner<DB>>>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Static File producer routine. See [StaticFileProducerInner::run] for more detailed description.
#[derive(Debug)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good!

pub struct StaticFileProducerInner<DB> {
/// Provider factory
provider_factory: ProviderFactory<DB>,
/// Static File provider
static_file_provider: StaticFileProvider,
/// Pruning configuration for every part of the data that can be pruned. Set by user, and
/// needed in [StaticFileProducer] to prevent attempting to move prunable data to static files.
/// See [StaticFileProducer::get_static_file_targets].
/// needed in [StaticFileProducerInner] to prevent attempting to move prunable data to static
/// files. See [StaticFileProducerInner::get_static_file_targets].
prune_modes: PruneModes,
listeners: EventListeners<StaticFileProducerEvent>,
}
Expand Down Expand Up @@ -68,9 +101,8 @@ impl StaticFileTargets {
}
}

impl<DB: Database> StaticFileProducer<DB> {
/// Creates a new [StaticFileProducer].
pub fn new(
impl<DB: Database> StaticFileProducerInner<DB> {
fn new(
provider_factory: ProviderFactory<DB>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
Expand Down Expand Up @@ -200,9 +232,11 @@ impl<DB: Database> StaticFileProducer<DB> {

#[cfg(test)]
mod tests {
use crate::{static_file_producer::StaticFileTargets, StaticFileProducer};
use crate::static_file_producer::{
StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
};
use assert_matches::assert_matches;
use reth_db::{database::Database, transaction::DbTx};
use reth_db::{database::Database, test_utils::TempDatabase, transaction::DbTx, DatabaseEnv};
use reth_interfaces::{
provider::ProviderError,
test_utils::{
Expand All @@ -214,13 +248,18 @@ mod tests {
use reth_primitives::{
static_file::HighestStaticFiles, PruneModes, StaticFileSegment, B256, U256,
};
use reth_provider::providers::StaticFileWriter;
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
ProviderFactory,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{
sync::{mpsc::channel, Arc},
time::Duration,
};

#[test]
fn run() {
fn setup() -> (ProviderFactory<Arc<TempDatabase<DatabaseEnv>>>, StaticFileProvider) {
let mut rng = generators::rng();

let db = TestStageDB::default();

let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
Expand Down Expand Up @@ -251,8 +290,14 @@ mod tests {

let provider_factory = db.factory;
let static_file_provider = provider_factory.static_file_provider();
(provider_factory, static_file_provider)
}

let mut static_file_producer = StaticFileProducer::new(
#[test]
fn run() {
let (provider_factory, static_file_provider) = setup();

let mut static_file_producer = StaticFileProducerInner::new(
provider_factory,
static_file_provider.clone(),
PruneModes::default(),
Expand Down Expand Up @@ -324,4 +369,48 @@ mod tests {
HighestStaticFiles { headers: Some(3), receipts: Some(3), transactions: Some(3) }
);
}

/// Tests that a cloneable [`StaticFileProducer`] type is not susceptible to any race condition.
#[test]
fn only_one() {
let (provider_factory, static_file_provider) = setup();

let static_file_producer = StaticFileProducer::new(
provider_factory,
static_file_provider.clone(),
PruneModes::default(),
);

let (tx, rx) = channel();

for i in 0..5 {
let producer = static_file_producer.clone();
let tx = tx.clone();

std::thread::spawn(move || {
let mut locked_producer = producer.lock();
if i == 0 {
// Let other threads spawn as well.
std::thread::sleep(Duration::from_millis(100));
}
let targets = locked_producer
.get_static_file_targets(HighestStaticFiles {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
})
.expect("get static file targets");
assert_matches!(locked_producer.run(targets.clone()), Ok(_));
tx.send(targets).unwrap();
});
}

drop(tx);

let mut only_one = Some(());
for target in rx {
// Only the first spawn should have any meaningful target.
assert!(only_one.take().is_some_and(|_| target.any()) || !target.any())
}
}
}