Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Allow TaskExecutor to be used in async tests #3178

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ execution_layer = { path = "../execution_layer" }
sensitive_url = { path = "../../common/sensitive_url" }
superstruct = "0.5.0"
hex = "0.4.2"
exit-future = "0.2.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this guy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use him in beacon_chain::test_utils.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is just testing? Can it be a dev-dep then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just for testing, but it's not hidden behind a #[test] compilation flag so it must be a major dependency. We can't hide it behind a #[test] flag since it's used across multiple crates and Cargo doesn't play nicely with that.


[[test]]
name = "beacon_chain_tests"
Expand Down
14 changes: 13 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::ShutdownReason;
use task_executor::{ShutdownReason, TaskExecutor};
use types::{
BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
Signature, SignedBeaconBlock, Slot,
Expand Down Expand Up @@ -91,6 +91,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
// Pending I/O batch that is constructed during building and should be executed atomically
// alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called.
pending_io_batch: Vec<KeyValueStoreOp>,
task_executor: Option<TaskExecutor>,
}

impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
Expand Down Expand Up @@ -129,6 +130,7 @@ where
slasher: None,
validator_monitor: None,
pending_io_batch: vec![],
task_executor: None,
}
}

Expand Down Expand Up @@ -182,6 +184,13 @@ where
self.log = Some(log);
self
}

/// Sets the task executor.
pub fn task_executor(mut self, task_executor: TaskExecutor) -> Self {
self.task_executor = Some(task_executor);
self
}

/// Attempt to load an existing eth1 cache from the builder's `Store`.
pub fn get_persisted_eth1_backend(&self) -> Result<Option<SszEth1>, String> {
let store = self
Expand Down Expand Up @@ -919,6 +928,7 @@ mod test {
use std::time::Duration;
use store::config::StoreConfig;
use store::{HotColdDB, MemoryStore};
use task_executor::test_utils::TestRuntime;
use types::{EthSpec, MinimalEthSpec, Slot};

type TestEthSpec = MinimalEthSpec;
Expand Down Expand Up @@ -952,10 +962,12 @@ mod test {
.expect("should create interop genesis state");

let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let runtime = TestRuntime::default();

let chain = BeaconChainBuilder::new(MinimalEthSpec)
.logger(log.clone())
.store(Arc::new(store))
.task_executor(runtime.task_executor.clone())
.genesis_state(genesis_state)
.expect("should build state using recent genesis")
.dummy_eth1_backend()
Expand Down
31 changes: 15 additions & 16 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@ use crate::{
};
use bls::get_withdrawal_credentials;
use execution_layer::{
test_utils::{
ExecutionBlockGenerator, ExecutionLayerRuntime, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK,
},
test_utils::{ExecutionBlockGenerator, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK},
ExecutionLayer,
};
use futures::channel::mpsc::Receiver;
pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
use int_to_bytes::int_to_bytes32;
use logging::test_logger;
use merkle_proof::MerkleTree;
use parking_lot::Mutex;
use parking_lot::RwLockWriteGuard;
Expand All @@ -41,7 +38,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore};
use task_executor::ShutdownReason;
use task_executor::{test_utils::TestRuntime, ShutdownReason};
use tree_hash::TreeHash;
use types::sync_selection_proof::SyncSelectionProof;
pub use types::test_utils::generate_deterministic_keypairs;
Expand Down Expand Up @@ -151,8 +148,8 @@ pub struct Builder<T: BeaconChainTypes> {
initial_mutator: Option<BoxedMutator<T::EthSpec, T::HotStore, T::ColdStore>>,
store_mutator: Option<BoxedMutator<T::EthSpec, T::HotStore, T::ColdStore>>,
execution_layer: Option<ExecutionLayer>,
execution_layer_runtime: Option<ExecutionLayerRuntime>,
mock_execution_layer: Option<MockExecutionLayer<T::EthSpec>>,
runtime: TestRuntime,
log: Logger,
}

Expand Down Expand Up @@ -255,6 +252,9 @@ where
Cold: ItemStore<E>,
{
pub fn new(eth_spec_instance: E) -> Self {
let runtime = TestRuntime::default();
let log = runtime.log.clone();

Self {
eth_spec_instance,
spec: None,
Expand All @@ -266,8 +266,8 @@ where
store_mutator: None,
execution_layer: None,
mock_execution_layer: None,
execution_layer_runtime: None,
log: test_logger(),
runtime,
log,
}
}

Expand Down Expand Up @@ -330,8 +330,6 @@ where
"execution layer already defined"
);

let el_runtime = ExecutionLayerRuntime::default();

let urls: Vec<SensitiveUrl> = urls
.iter()
.map(|s| SensitiveUrl::parse(*s))
Expand All @@ -346,19 +344,19 @@ where
};
let execution_layer = ExecutionLayer::from_config(
config,
el_runtime.task_executor.clone(),
el_runtime.log.clone(),
self.runtime.task_executor.clone(),
self.log.clone(),
)
.unwrap();

self.execution_layer = Some(execution_layer);
self.execution_layer_runtime = Some(el_runtime);
self
}

pub fn mock_execution_layer(mut self) -> Self {
let spec = self.spec.clone().expect("cannot build without spec");
let mock = MockExecutionLayer::new(
self.runtime.task_executor.clone(),
spec.terminal_total_difficulty,
DEFAULT_TERMINAL_BLOCK,
spec.terminal_block_hash,
Expand All @@ -383,7 +381,7 @@ where
pub fn build(self) -> BeaconChainHarness<BaseHarnessType<E, Hot, Cold>> {
let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1);

let log = test_logger();
let log = self.log;
let spec = self.spec.expect("cannot build without spec");
let seconds_per_slot = spec.seconds_per_slot;
let validator_keypairs = self
Expand All @@ -395,6 +393,7 @@ where
.custom_spec(spec)
.store(self.store.expect("cannot build without store"))
.store_migrator_config(MigratorConfig::default().blocking())
.task_executor(self.runtime.task_executor.clone())
.execution_layer(self.execution_layer)
.dummy_eth1_backend()
.expect("should build dummy backend")
Expand Down Expand Up @@ -434,8 +433,8 @@ where
chain: Arc::new(chain),
validator_keypairs,
shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)),
runtime: self.runtime,
mock_execution_layer: self.mock_execution_layer,
execution_layer_runtime: self.execution_layer_runtime,
rng: make_rng(),
}
}
Expand All @@ -451,9 +450,9 @@ pub struct BeaconChainHarness<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
pub spec: ChainSpec,
pub shutdown_receiver: Arc<Mutex<Receiver<ShutdownReason>>>,
pub runtime: TestRuntime,

pub mock_execution_layer: Option<MockExecutionLayer<T::EthSpec>>,
pub execution_layer_runtime: Option<ExecutionLayerRuntime>,

pub rng: Mutex<StdRng>,
}
Expand Down
1 change: 1 addition & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ where
let builder = BeaconChainBuilder::new(eth_spec_instance)
.logger(context.log().clone())
.store(store)
.task_executor(context.executor.clone())
.custom_spec(spec.clone())
.chain_config(chain_config)
.graffiti(graffiti)
Expand Down
28 changes: 13 additions & 15 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,7 @@ impl ExecutionLayer {
T: Fn(&'a Self) -> U,
U: Future<Output = Result<V, Error>>,
{
let runtime = self
.executor()
.runtime()
.upgrade()
.ok_or(Error::ShuttingDown)?;
let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?;
// TODO(merge): respect the shutdown signal.
runtime.block_on(generate_future(self))
}
Expand All @@ -322,11 +318,7 @@ impl ExecutionLayer {
T: Fn(&'a Self) -> U,
U: Future<Output = V>,
{
let runtime = self
.executor()
.runtime()
.upgrade()
.ok_or(Error::ShuttingDown)?;
let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?;
// TODO(merge): respect the shutdown signal.
Ok(runtime.block_on(generate_future(self)))
}
Expand Down Expand Up @@ -1263,13 +1255,15 @@ impl ExecutionLayer {
mod test {
use super::*;
use crate::test_utils::MockExecutionLayer as GenericMockExecutionLayer;
use task_executor::test_utils::TestRuntime;
use types::MainnetEthSpec;

type MockExecutionLayer = GenericMockExecutionLayer<MainnetEthSpec>;

#[tokio::test]
async fn produce_three_valid_pos_execution_blocks() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.produce_valid_execution_payload_on_head()
.await
Expand All @@ -1281,7 +1275,8 @@ mod test {

#[tokio::test]
async fn finds_valid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_block_prior_to_terminal_block()
.with_terminal_block(|spec, el, _| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
Expand All @@ -1300,7 +1295,8 @@ mod test {

#[tokio::test]
async fn verifies_valid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, terminal_block| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
Expand All @@ -1316,7 +1312,8 @@ mod test {

#[tokio::test]
async fn rejects_invalid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, terminal_block| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
Expand All @@ -1334,7 +1331,8 @@ mod test {

#[tokio::test]
async fn rejects_unknown_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, _| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
Expand Down
Loading