From 9ddfb49646ce8dd840805d69e423458f45c7d891 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 13 May 2022 13:51:46 +1000 Subject: [PATCH 1/6] Unify task_executor Introduce harness to test harness Thread task_executor into BeaconChain --- Cargo.lock | 2 + beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/builder.rs | 14 ++++- beacon_node/beacon_chain/src/test_utils.rs | 52 ++++++++++------ beacon_node/client/src/builder.rs | 1 + beacon_node/execution_layer/src/lib.rs | 16 +++-- .../src/test_utils/mock_execution_layer.rs | 60 +++---------------- .../execution_layer/src/test_utils/mod.rs | 2 +- beacon_node/http_api/tests/tests.rs | 4 +- common/task_executor/Cargo.toml | 3 +- common/task_executor/src/lib.rs | 3 +- common/task_executor/src/test_utils.rs | 49 +++++++++++++++ lighthouse/environment/src/lib.rs | 13 +--- 13 files changed, 129 insertions(+), 91 deletions(-) create mode 100644 common/task_executor/src/test_utils.rs diff --git a/Cargo.lock b/Cargo.lock index f8016d99407..73253467055 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,7 @@ dependencies = [ "eth2_ssz_derive", "eth2_ssz_types", "execution_layer", + "exit-future", "fork_choice", "futures", "genesis", @@ -6028,6 +6029,7 @@ dependencies = [ "lazy_static", "lighthouse_metrics", "slog", + "sloggers", "tokio", ] diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 022b85fa7e1..c8b82e3d28a 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -61,6 +61,7 @@ execution_layer = { path = "../execution_layer" } sensitive_url = { path = "../../common/sensitive_url" } superstruct = "0.5.0" hex = "0.4.2" +exit-future = "0.2.0" [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 98dcce9d2ae..2efc972ed56 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -27,7 +27,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; -use task_executor::ShutdownReason; +use task_executor::{ShutdownReason, TaskExecutor}; use types::{ BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, @@ -91,6 +91,7 @@ pub struct BeaconChainBuilder { // Pending I/O batch that is constructed during building and should be executed atomically // alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called. pending_io_batch: Vec, + task_executor: Option, } impl @@ -129,6 +130,7 @@ where slasher: None, validator_monitor: None, pending_io_batch: vec![], + task_executor: None, } } @@ -182,6 +184,13 @@ where self.log = Some(log); self } + + /// Sets the task executor. + pub fn task_executor(mut self, task_executor: TaskExecutor) -> Self { + self.task_executor = Some(task_executor); + self + } + /// Attempt to load an existing eth1 cache from the builder's `Store`. pub fn get_persisted_eth1_backend(&self) -> Result, String> { let store = self @@ -919,6 +928,7 @@ mod test { use std::time::Duration; use store::config::StoreConfig; use store::{HotColdDB, MemoryStore}; + use task_executor::test_utils::TestRuntime; use types::{EthSpec, MinimalEthSpec, Slot}; type TestEthSpec = MinimalEthSpec; @@ -952,10 +962,12 @@ mod test { .expect("should create interop genesis state"); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let runtime = TestRuntime::default(); let chain = BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .store(Arc::new(store)) + .task_executor(runtime.task_executor.clone()) .genesis_state(genesis_state) .expect("should build state using recent genesis") .dummy_eth1_backend() diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 3c8299f1650..3e4bc9adbd7 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -12,9 +12,7 @@ use crate::{ }; use bls::get_withdrawal_credentials; use execution_layer::{ - test_utils::{ - ExecutionBlockGenerator, ExecutionLayerRuntime, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK, - }, + test_utils::{ExecutionBlockGenerator, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK}, ExecutionLayer, }; use futures::channel::mpsc::Receiver; @@ -41,7 +39,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; -use task_executor::ShutdownReason; +use task_executor::{Runtime, ShutdownReason, TaskExecutor}; use tree_hash::TreeHash; use types::sync_selection_proof::SyncSelectionProof; pub use types::test_utils::generate_deterministic_keypairs; @@ -151,8 +149,10 @@ pub struct Builder { initial_mutator: Option>, store_mutator: Option>, execution_layer: Option, - execution_layer_runtime: Option, mock_execution_layer: Option>, + runtime: Arc, + task_executor: TaskExecutor, + runtime_shutdown: exit_future::Signal, log: Logger, } @@ -255,6 +255,18 @@ where Cold: ItemStore, { pub fn new(eth_spec_instance: E) -> Self { + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let (runtime_shutdown, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let log = test_logger(); + let task_executor = + TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); + Self { eth_spec_instance, spec: None, @@ -266,8 +278,10 @@ where store_mutator: None, execution_layer: None, mock_execution_layer: None, - execution_layer_runtime: None, - log: test_logger(), + runtime, + task_executor, + runtime_shutdown, + log, } } @@ -330,8 +344,6 @@ where "execution layer already defined" ); - let el_runtime = ExecutionLayerRuntime::default(); - let urls: Vec = urls .iter() .map(|s| SensitiveUrl::parse(*s)) @@ -344,21 +356,18 @@ where suggested_fee_recipient: Some(Address::repeat_byte(42)), ..Default::default() }; - let execution_layer = ExecutionLayer::from_config( - config, - el_runtime.task_executor.clone(), - el_runtime.log.clone(), - ) - .unwrap(); + let execution_layer = + ExecutionLayer::from_config(config, self.task_executor.clone(), self.log.clone()) + .unwrap(); self.execution_layer = Some(execution_layer); - self.execution_layer_runtime = Some(el_runtime); self } pub fn mock_execution_layer(mut self) -> Self { let spec = self.spec.clone().expect("cannot build without spec"); let mock = MockExecutionLayer::new( + self.task_executor.clone(), spec.terminal_total_difficulty, DEFAULT_TERMINAL_BLOCK, spec.terminal_block_hash, @@ -383,7 +392,7 @@ where pub fn build(self) -> BeaconChainHarness> { let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); - let log = test_logger(); + let log = self.log; let spec = self.spec.expect("cannot build without spec"); let seconds_per_slot = spec.seconds_per_slot; let validator_keypairs = self @@ -395,6 +404,7 @@ where .custom_spec(spec) .store(self.store.expect("cannot build without store")) .store_migrator_config(MigratorConfig::default().blocking()) + .task_executor(self.task_executor.clone()) .execution_layer(self.execution_layer) .dummy_eth1_backend() .expect("should build dummy backend") @@ -434,8 +444,10 @@ where chain: Arc::new(chain), validator_keypairs, shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)), + runtime: self.runtime, + task_executor: self.task_executor, + runtime_shutdown: self.runtime_shutdown, mock_execution_layer: self.mock_execution_layer, - execution_layer_runtime: self.execution_layer_runtime, rng: make_rng(), } } @@ -451,9 +463,11 @@ pub struct BeaconChainHarness { pub chain: Arc>, pub spec: ChainSpec, pub shutdown_receiver: Arc>>, + pub runtime: Arc, + pub task_executor: TaskExecutor, + pub runtime_shutdown: exit_future::Signal, pub mock_execution_layer: Option>, - pub execution_layer_runtime: Option, pub rng: Mutex, } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 353b174a02b..59f1bebdb41 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -166,6 +166,7 @@ where let builder = BeaconChainBuilder::new(eth_spec_instance) .logger(context.log().clone()) .store(store) + .task_executor(context.executor.clone()) .custom_spec(spec.clone()) .chain_config(chain_config) .graffiti(graffiti) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 023cfa6e324..b33b96cd7c5 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -1263,13 +1263,15 @@ impl ExecutionLayer { mod test { use super::*; use crate::test_utils::MockExecutionLayer as GenericMockExecutionLayer; + use task_executor::test_utils::TestRuntime; use types::MainnetEthSpec; type MockExecutionLayer = GenericMockExecutionLayer; #[tokio::test] async fn produce_three_valid_pos_execution_blocks() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .produce_valid_execution_payload_on_head() .await @@ -1281,7 +1283,8 @@ mod test { #[tokio::test] async fn finds_valid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_block_prior_to_terminal_block() .with_terminal_block(|spec, el, _| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1300,7 +1303,8 @@ mod test { #[tokio::test] async fn verifies_valid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, terminal_block| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1316,7 +1320,8 @@ mod test { #[tokio::test] async fn rejects_invalid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, terminal_block| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1334,7 +1339,8 @@ mod test { #[tokio::test] async fn rejects_unknown_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, _| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index f5a7313395b..8c6519011bd 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -2,61 +2,22 @@ use crate::{ test_utils::{MockServer, DEFAULT_TERMINAL_BLOCK, DEFAULT_TERMINAL_DIFFICULTY, JWT_SECRET}, Config, *, }; -use environment::null_logger; use sensitive_url::SensitiveUrl; -use std::sync::Arc; use task_executor::TaskExecutor; use tempfile::NamedTempFile; use types::{Address, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, Uint256}; -pub struct ExecutionLayerRuntime { - pub runtime: Option>, - pub _runtime_shutdown: exit_future::Signal, - pub task_executor: TaskExecutor, - pub log: Logger, -} - -impl Default for ExecutionLayerRuntime { - fn default() -> Self { - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(), - ); - let (runtime_shutdown, exit) = exit_future::signal(); - let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let log = null_logger().unwrap(); - let task_executor = - TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); - - Self { - runtime: Some(runtime), - _runtime_shutdown: runtime_shutdown, - task_executor, - log, - } - } -} - -impl Drop for ExecutionLayerRuntime { - fn drop(&mut self) { - if let Some(runtime) = self.runtime.take() { - Arc::try_unwrap(runtime).unwrap().shutdown_background() - } - } -} - pub struct MockExecutionLayer { pub server: MockServer, pub el: ExecutionLayer, - pub el_runtime: ExecutionLayerRuntime, + pub executor: TaskExecutor, pub spec: ChainSpec, } impl MockExecutionLayer { - pub fn default_params() -> Self { + pub fn default_params(executor: TaskExecutor) -> Self { Self::new( + executor, DEFAULT_TERMINAL_DIFFICULTY.into(), DEFAULT_TERMINAL_BLOCK, ExecutionBlockHash::zero(), @@ -65,13 +26,14 @@ impl MockExecutionLayer { } pub fn new( + executor: TaskExecutor, terminal_total_difficulty: Uint256, terminal_block: u64, terminal_block_hash: ExecutionBlockHash, terminal_block_hash_activation_epoch: Epoch, ) -> Self { - let el_runtime = ExecutionLayerRuntime::default(); - let handle = el_runtime.runtime.as_ref().unwrap().handle(); + let runtime = executor.runtime().upgrade().unwrap(); + let handle = runtime.handle(); let mut spec = T::default_spec(); spec.terminal_total_difficulty = terminal_total_difficulty; @@ -97,17 +59,13 @@ impl MockExecutionLayer { suggested_fee_recipient: Some(Address::repeat_byte(42)), ..Default::default() }; - let el = ExecutionLayer::from_config( - config, - el_runtime.task_executor.clone(), - el_runtime.log.clone(), - ) - .unwrap(); + let el = + ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap(); Self { server, el, - el_runtime, + executor, spec, } } diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index b3dd80d6c5f..805f6716fbf 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -22,7 +22,7 @@ use types::{EthSpec, ExecutionBlockHash, Uint256}; use warp::{http::StatusCode, Filter, Rejection}; pub use execution_block_generator::{generate_pow_block, ExecutionBlockGenerator}; -pub use mock_execution_layer::{ExecutionLayerRuntime, MockExecutionLayer}; +pub use mock_execution_layer::MockExecutionLayer; pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; pub const DEFAULT_TERMINAL_BLOCK: u64 = 64; diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index b153e9a2749..43e96df1b66 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -186,7 +186,7 @@ impl ApiTester { external_peer_id, } = create_api_server(chain.clone(), log).await; - tokio::spawn(server); + harness.task_executor.spawn(server, "api_server"); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -265,7 +265,7 @@ impl ApiTester { external_peer_id, } = create_api_server(chain.clone(), log).await; - tokio::spawn(server); + harness.task_executor.spawn(server, "api_server"); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( diff --git a/common/task_executor/Cargo.toml b/common/task_executor/Cargo.toml index 660cc1ca011..5d7f9d49b4a 100644 --- a/common/task_executor/Cargo.toml +++ b/common/task_executor/Cargo.toml @@ -5,9 +5,10 @@ authors = ["Sigma Prime "] edition = "2021" [dependencies] -tokio = { version = "1.14.0", features = ["rt"] } +tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread"] } slog = "2.5.2" futures = "0.3.7" exit-future = "0.2.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../lighthouse_metrics" } +sloggers = { version = "2.1.1", features = ["json"] } diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 2d3e941a3eb..608620a658b 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -1,10 +1,11 @@ mod metrics; +pub mod test_utils; use futures::channel::mpsc::Sender; use futures::prelude::*; use slog::{crit, debug, o, trace}; use std::sync::Weak; -use tokio::runtime::Runtime; +pub use tokio::runtime::Runtime; /// Provides a reason when Lighthouse is shut down. #[derive(Copy, Clone, Debug, PartialEq)] diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs new file mode 100644 index 00000000000..e13e42f64ec --- /dev/null +++ b/common/task_executor/src/test_utils.rs @@ -0,0 +1,49 @@ +use crate::TaskExecutor; +use slog::Logger; +use sloggers::{null::NullLoggerBuilder, Build}; +use std::sync::Arc; + +pub struct TestRuntime { + pub runtime: Option>, + pub _runtime_shutdown: exit_future::Signal, + pub task_executor: TaskExecutor, + pub log: Logger, +} + +impl Default for TestRuntime { + fn default() -> Self { + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let (runtime_shutdown, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let log = null_logger().unwrap(); + let task_executor = + TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); + + Self { + runtime: Some(runtime), + _runtime_shutdown: runtime_shutdown, + task_executor, + log, + } + } +} + +impl Drop for TestRuntime { + fn drop(&mut self) { + if let Some(runtime) = self.runtime.take() { + Arc::try_unwrap(runtime).unwrap().shutdown_background() + } + } +} + +pub fn null_logger() -> Result { + let log_builder = NullLoggerBuilder; + log_builder + .build() + .map_err(|e| format!("Failed to start null logger: {:?}", e)) +} diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 91feef5b058..160f696542d 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -13,9 +13,7 @@ use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::{future, StreamExt}; use slog::{error, info, o, warn, Drain, Duplicate, Level, Logger}; -use sloggers::{ - file::FileLoggerBuilder, null::NullLoggerBuilder, types::Format, types::Severity, Build, -}; +use sloggers::{file::FileLoggerBuilder, types::Format, types::Severity, Build}; use std::fs::create_dir_all; use std::path::PathBuf; use std::sync::Arc; @@ -33,6 +31,8 @@ use { #[cfg(not(target_family = "unix"))] use {futures::channel::oneshot, std::cell::RefCell}; +pub use task_executor::test_utils::null_logger; + const LOG_CHANNEL_SIZE: usize = 2048; /// The maximum time in seconds the client will wait for all internal tasks to shutdown. const MAXIMUM_SHUTDOWN_TIME: u64 = 15; @@ -506,13 +506,6 @@ impl Environment { } } -pub fn null_logger() -> Result { - let log_builder = NullLoggerBuilder; - log_builder - .build() - .map_err(|e| format!("Failed to start null logger: {:?}", e)) -} - #[cfg(target_family = "unix")] struct SignalFuture { signal: Signal, From 1319ce23b6436c748b0c14b4da9040f02484e74e Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 13 May 2022 16:16:20 +1000 Subject: [PATCH 2/6] Implement handle enum --- Cargo.lock | 1 + beacon_node/beacon_chain/src/test_utils.rs | 41 +++------ beacon_node/execution_layer/src/lib.rs | 12 +-- .../src/test_utils/mock_execution_layer.rs | 5 +- beacon_node/http_api/Cargo.toml | 1 + beacon_node/http_api/tests/tests.rs | 8 +- common/task_executor/src/lib.rs | 64 +++++++++----- common/task_executor/src/test_utils.rs | 26 ++++-- validator_client/src/http_api/keystores.rs | 33 ++++---- validator_client/src/http_api/mod.rs | 84 +++++++++---------- validator_client/src/http_api/remotekeys.rs | 36 ++++---- validator_client/src/lib.rs | 2 +- 12 files changed, 164 insertions(+), 149 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73253467055..be6e844dc90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2532,6 +2532,7 @@ dependencies = [ "slot_clock", "state_processing", "store", + "task_executor", "tokio", "tokio-stream", "tree_hash", diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 3e4bc9adbd7..2dc1d0301db 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -18,7 +18,6 @@ use execution_layer::{ use futures::channel::mpsc::Receiver; pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use int_to_bytes::int_to_bytes32; -use logging::test_logger; use merkle_proof::MerkleTree; use parking_lot::Mutex; use parking_lot::RwLockWriteGuard; @@ -39,7 +38,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; -use task_executor::{Runtime, ShutdownReason, TaskExecutor}; +use task_executor::{test_utils::TestRuntime, ShutdownReason}; use tree_hash::TreeHash; use types::sync_selection_proof::SyncSelectionProof; pub use types::test_utils::generate_deterministic_keypairs; @@ -150,9 +149,7 @@ pub struct Builder { store_mutator: Option>, execution_layer: Option, mock_execution_layer: Option>, - runtime: Arc, - task_executor: TaskExecutor, - runtime_shutdown: exit_future::Signal, + runtime: TestRuntime, log: Logger, } @@ -255,17 +252,8 @@ where Cold: ItemStore, { pub fn new(eth_spec_instance: E) -> Self { - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(), - ); - let (runtime_shutdown, exit) = exit_future::signal(); - let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let log = test_logger(); - let task_executor = - TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); + let runtime = TestRuntime::default(); + let log = runtime.log.clone(); Self { eth_spec_instance, @@ -279,8 +267,6 @@ where execution_layer: None, mock_execution_layer: None, runtime, - task_executor, - runtime_shutdown, log, } } @@ -356,9 +342,12 @@ where suggested_fee_recipient: Some(Address::repeat_byte(42)), ..Default::default() }; - let execution_layer = - ExecutionLayer::from_config(config, self.task_executor.clone(), self.log.clone()) - .unwrap(); + let execution_layer = ExecutionLayer::from_config( + config, + self.runtime.task_executor.clone(), + self.log.clone(), + ) + .unwrap(); self.execution_layer = Some(execution_layer); self @@ -367,7 +356,7 @@ where pub fn mock_execution_layer(mut self) -> Self { let spec = self.spec.clone().expect("cannot build without spec"); let mock = MockExecutionLayer::new( - self.task_executor.clone(), + self.runtime.task_executor.clone(), spec.terminal_total_difficulty, DEFAULT_TERMINAL_BLOCK, spec.terminal_block_hash, @@ -404,7 +393,7 @@ where .custom_spec(spec) .store(self.store.expect("cannot build without store")) .store_migrator_config(MigratorConfig::default().blocking()) - .task_executor(self.task_executor.clone()) + .task_executor(self.runtime.task_executor.clone()) .execution_layer(self.execution_layer) .dummy_eth1_backend() .expect("should build dummy backend") @@ -445,8 +434,6 @@ where validator_keypairs, shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)), runtime: self.runtime, - task_executor: self.task_executor, - runtime_shutdown: self.runtime_shutdown, mock_execution_layer: self.mock_execution_layer, rng: make_rng(), } @@ -463,9 +450,7 @@ pub struct BeaconChainHarness { pub chain: Arc>, pub spec: ChainSpec, pub shutdown_receiver: Arc>>, - pub runtime: Arc, - pub task_executor: TaskExecutor, - pub runtime_shutdown: exit_future::Signal, + pub runtime: TestRuntime, pub mock_execution_layer: Option>, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index b33b96cd7c5..3b9e94aabf5 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -304,11 +304,7 @@ impl ExecutionLayer { T: Fn(&'a Self) -> U, U: Future>, { - let runtime = self - .executor() - .runtime() - .upgrade() - .ok_or(Error::ShuttingDown)?; + let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?; // TODO(merge): respect the shutdown signal. runtime.block_on(generate_future(self)) } @@ -322,11 +318,7 @@ impl ExecutionLayer { T: Fn(&'a Self) -> U, U: Future, { - let runtime = self - .executor() - .runtime() - .upgrade() - .ok_or(Error::ShuttingDown)?; + let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?; // TODO(merge): respect the shutdown signal. Ok(runtime.block_on(generate_future(self))) } diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index 8c6519011bd..5770a8a3821 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -32,8 +32,7 @@ impl MockExecutionLayer { terminal_block_hash: ExecutionBlockHash, terminal_block_hash_activation_epoch: Epoch, ) -> Self { - let runtime = executor.runtime().upgrade().unwrap(); - let handle = runtime.handle(); + let handle = executor.handle().unwrap(); let mut spec = T::default_spec(); spec.terminal_total_difficulty = terminal_total_difficulty; @@ -41,7 +40,7 @@ impl MockExecutionLayer { spec.terminal_block_hash_activation_epoch = terminal_block_hash_activation_epoch; let server = MockServer::new( - handle, + &handle, terminal_total_difficulty, terminal_block, terminal_block_hash, diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 0e20f5c8b8d..f982f0d0229 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -30,6 +30,7 @@ futures = "0.3.8" execution_layer = {path = "../execution_layer"} parking_lot = "0.12.0" safe_arith = {path = "../../consensus/safe_arith"} +task_executor = { path = "../../common/task_executor" } [dev-dependencies] diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 43e96df1b66..b05231e9a66 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -20,6 +20,7 @@ use slot_clock::SlotClock; use state_processing::per_slot_processing; use std::convert::TryInto; use std::sync::Arc; +use task_executor::test_utils::TestRuntime; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use tree_hash::TreeHash; @@ -63,6 +64,7 @@ struct ApiTester { network_rx: mpsc::UnboundedReceiver>, local_enr: Enr, external_peer_id: PeerId, + _runtime: TestRuntime, } impl ApiTester { @@ -186,7 +188,7 @@ impl ApiTester { external_peer_id, } = create_api_server(chain.clone(), log).await; - harness.task_executor.spawn(server, "api_server"); + harness.runtime.task_executor.spawn(server, "api_server"); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -213,6 +215,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + _runtime: harness.runtime, } } @@ -265,7 +268,7 @@ impl ApiTester { external_peer_id, } = create_api_server(chain.clone(), log).await; - harness.task_executor.spawn(server, "api_server"); + harness.runtime.task_executor.spawn(server, "api_server"); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -292,6 +295,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + _runtime: harness.runtime, } } diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 608620a658b..752c999ca7c 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -5,7 +5,7 @@ use futures::channel::mpsc::Sender; use futures::prelude::*; use slog::{crit, debug, o, trace}; use std::sync::Weak; -pub use tokio::runtime::Runtime; +use tokio::runtime; /// Provides a reason when Lighthouse is shut down. #[derive(Copy, Clone, Debug, PartialEq)] @@ -25,11 +25,40 @@ impl ShutdownReason { } } +#[derive(Clone)] +pub enum Handle { + Runtime(Weak), + Handle(runtime::Handle), +} + +impl From for Handle { + fn from(handle: runtime::Handle) -> Self { + Handle::Handle(handle) + } +} + +impl From> for Handle { + fn from(weak_runtime: Weak) -> Self { + Handle::Runtime(weak_runtime) + } +} + +impl Handle { + pub fn handle(&self) -> Option { + match self { + Handle::Runtime(weak_runtime) => weak_runtime + .upgrade() + .map(|runtime| runtime.handle().clone()), + Handle::Handle(handle) => Some(handle.clone()), + } + } +} + /// A wrapper over a runtime handle which can spawn async and blocking tasks. #[derive(Clone)] pub struct TaskExecutor { /// The handle to the runtime on which tasks are spawned - runtime: Weak, + handle: Handle, /// The receiver exit future which on receiving shuts down the task exit: exit_future::Exit, /// Sender given to tasks, so that if they encounter a state in which execution cannot @@ -43,17 +72,14 @@ pub struct TaskExecutor { impl TaskExecutor { /// Create a new task executor. - /// - /// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from - /// a [`RuntimeContext`](struct.RuntimeContext.html) - pub fn new( - runtime: Weak, + pub fn new>( + handle: T, exit: exit_future::Exit, log: slog::Logger, signal_tx: Sender, ) -> Self { Self { - runtime, + handle: handle.into(), exit, signal_tx, log, @@ -63,7 +89,7 @@ impl TaskExecutor { /// Clones the task executor adding a service name. pub fn clone_with_name(&self, service_name: String) -> Self { TaskExecutor { - runtime: self.runtime.clone(), + handle: self.handle.clone(), exit: self.exit.clone(), signal_tx: self.signal_tx.clone(), log: self.log.new(o!("service" => service_name)), @@ -95,8 +121,8 @@ impl TaskExecutor { let mut shutdown_sender = self.shutdown_sender(); let log = self.log.clone(); - if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn(async move { + if let Some(handle) = self.handle.handle() { + handle.spawn(async move { let timer = metrics::start_timer_vec(&metrics::TASKS_HISTOGRAM, &[name]); if let Err(join_error) = task_handle.await { if let Ok(panic) = join_error.try_into_panic() { @@ -161,8 +187,8 @@ impl TaskExecutor { }); int_gauge.inc(); - if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn(future); + if let Some(handle) = self.handle.handle() { + handle.spawn(future); } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); } @@ -212,8 +238,8 @@ impl TaskExecutor { }); int_gauge.inc(); - if let Some(runtime) = self.runtime.upgrade() { - Some(runtime.spawn(future)) + if let Some(handle) = self.handle.handle() { + Some(handle.spawn(future)) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); None @@ -243,8 +269,8 @@ impl TaskExecutor { let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]); metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]); - let join_handle = if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn_blocking(task) + let join_handle = if let Some(handle) = self.handle.handle() { + handle.spawn_blocking(task) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); return None; @@ -269,8 +295,8 @@ impl TaskExecutor { Some(future) } - pub fn runtime(&self) -> Weak { - self.runtime.clone() + pub fn handle(&self) -> Option { + self.handle.handle() } /// Returns a copy of the `exit_future::Exit`. diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs index e13e42f64ec..10d6bd47bb0 100644 --- a/common/task_executor/src/test_utils.rs +++ b/common/task_executor/src/test_utils.rs @@ -2,6 +2,7 @@ use crate::TaskExecutor; use slog::Logger; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; +use tokio::runtime; pub struct TestRuntime { pub runtime: Option>, @@ -12,20 +13,27 @@ pub struct TestRuntime { impl Default for TestRuntime { fn default() -> Self { - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(), - ); let (runtime_shutdown, exit) = exit_future::signal(); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let log = null_logger().unwrap(); - let task_executor = - TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); + + let (runtime, handle) = if let Ok(handle) = runtime::Handle::try_current() { + (None, handle) + } else { + let runtime = Arc::new( + runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let handle = runtime.handle().clone(); + (Some(runtime), handle) + }; + + let task_executor = TaskExecutor::new(handle, exit, log.clone(), shutdown_tx); Self { - runtime: Some(runtime), + runtime, _runtime_shutdown: runtime_shutdown, task_executor, log, diff --git a/validator_client/src/http_api/keystores.rs b/validator_client/src/http_api/keystores.rs index 63cd9460639..f88aacfca8d 100644 --- a/validator_client/src/http_api/keystores.rs +++ b/validator_client/src/http_api/keystores.rs @@ -14,8 +14,8 @@ use slog::{info, warn, Logger}; use slot_clock::SlotClock; use std::path::PathBuf; use std::sync::Arc; -use std::sync::Weak; -use tokio::runtime::Runtime; +use task_executor::TaskExecutor; +use tokio::runtime::Handle; use types::{EthSpec, PublicKeyBytes}; use validator_dir::Builder as ValidatorDirBuilder; use warp::Rejection; @@ -59,7 +59,7 @@ pub fn import( request: ImportKeystoresRequest, validator_dir: PathBuf, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { // Check request validity. This is the only cases in which we should return a 4xx code. @@ -122,14 +122,14 @@ pub fn import( ImportKeystoreStatus::Error, format!("slashing protection import failed: {:?}", e), ) - } else if let Some(runtime) = runtime.upgrade() { + } else if let Some(handle) = task_executor.handle() { // Import the keystore. match import_single_keystore( keystore, password, validator_dir.clone(), &validator_store, - runtime, + handle, ) { Ok(status) => Status::ok(status), Err(e) => { @@ -159,7 +159,7 @@ fn import_single_keystore( password: ZeroizeString, validator_dir_path: PathBuf, validator_store: &ValidatorStore, - runtime: Arc, + handle: Handle, ) -> Result { // Check if the validator key already exists, erroring if it is a remote signer validator. let pubkey = keystore @@ -198,7 +198,7 @@ fn import_single_keystore( let voting_keystore_path = validator_dir.voting_keystore_path(); drop(validator_dir); - runtime + handle .block_on(validator_store.add_validator_keystore( voting_keystore_path, password, @@ -214,7 +214,7 @@ fn import_single_keystore( pub fn delete( request: DeleteKeystoresRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { // Remove from initialized validators. @@ -225,8 +225,11 @@ pub fn delete( .pubkeys .iter() .map(|pubkey_bytes| { - match delete_single_keystore(pubkey_bytes, &mut initialized_validators, runtime.clone()) - { + match delete_single_keystore( + pubkey_bytes, + &mut initialized_validators, + task_executor.clone(), + ) { Ok(status) => Status::ok(status), Err(error) => { warn!( @@ -244,8 +247,8 @@ pub fn delete( // Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out // of date as it resets when it can't be decrypted. We update it just a single time to avoid // continually resetting it after each key deletion. - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(initialized_validators.update_validators()) .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; } @@ -278,14 +281,14 @@ pub fn delete( fn delete_single_keystore( pubkey_bytes: &PublicKeyBytes, initialized_validators: &mut InitializedValidators, - runtime: Weak, + task_executor: TaskExecutor, ) -> Result { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let pubkey = pubkey_bytes .decompress() .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; - match runtime.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true)) + match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true)) { Ok(_) => Ok(DeleteKeystoreStatus::Deleted), Err(e) => match e { diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index 1207ed3b088..650dbd24763 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -22,8 +22,8 @@ use std::future::Future; use std::marker::PhantomData; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; -use std::sync::{Arc, Weak}; -use tokio::runtime::Runtime; +use std::sync::Arc; +use task_executor::TaskExecutor; use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; use warp::{ @@ -59,7 +59,7 @@ impl From for Error { /// /// The server will gracefully handle the case where any fields are `None`. pub struct Context { - pub runtime: Weak, + pub task_executor: TaskExecutor, pub api_secret: ApiSecret, pub validator_store: Option>>, pub validator_dir: Option, @@ -161,8 +161,8 @@ pub fn serve( }) }); - let inner_runtime = ctx.runtime.clone(); - let runtime_filter = warp::any().map(move || inner_runtime.clone()); + let inner_task_executor = ctx.task_executor.clone(); + let task_executor_filter = warp::any().map(move || inner_task_executor.clone()); let inner_validator_dir = ctx.validator_dir.clone(); let validator_dir_filter = warp::any() @@ -290,18 +290,18 @@ pub fn serve( .and(validator_store_filter.clone()) .and(spec_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: Vec, validator_dir: PathBuf, validator_store: Arc>, spec: Arc, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let (validators, mnemonic) = - runtime.block_on(create_validators_mnemonic( + handle.block_on(create_validators_mnemonic( None, None, &body, @@ -316,7 +316,7 @@ pub fn serve( Ok(api_types::GenericResponse::from(response)) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "TaskExecutor shutdown".into(), )) } }) @@ -333,16 +333,16 @@ pub fn serve( .and(validator_store_filter.clone()) .and(spec_filter) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: api_types::CreateValidatorsMnemonicRequest, validator_dir: PathBuf, validator_store: Arc>, spec: Arc, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let mnemonic = mnemonic_from_phrase(body.mnemonic.as_str()).map_err(|e| { warp_utils::reject::custom_bad_request(format!( @@ -351,7 +351,7 @@ pub fn serve( )) })?; let (validators, _mnemonic) = - runtime.block_on(create_validators_mnemonic( + handle.block_on(create_validators_mnemonic( Some(mnemonic), Some(body.key_derivation_path_offset), &body.validators, @@ -362,7 +362,7 @@ pub fn serve( Ok(api_types::GenericResponse::from(validators)) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "TaskExecutor shutdown".into(), )) } }) @@ -378,13 +378,13 @@ pub fn serve( .and(validator_dir_filter.clone()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: api_types::KeystoreValidatorsPostRequest, validator_dir: PathBuf, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { // Check to ensure the password is correct. let keypair = body @@ -416,8 +416,8 @@ pub fn serve( let suggested_fee_recipient = body.suggested_fee_recipient; let validator_def = { - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(validator_store.add_validator_keystore( voting_keystore_path, voting_password, @@ -433,7 +433,7 @@ pub fn serve( })? } else { return Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "TaskExecutor shutdown".into(), )); } }; @@ -455,14 +455,14 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: Vec, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let web3signers: Vec = body .into_iter() .map(|web3signer| ValidatorDefinition { @@ -478,14 +478,14 @@ pub fn serve( }, }) .collect(); - runtime.block_on(create_validators_web3signer( + handle.block_on(create_validators_web3signer( web3signers, &validator_store, ))?; Ok(()) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "TaskExecutor shutdown".into(), )) } }) @@ -500,13 +500,13 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |validator_pubkey: PublicKey, body: api_types::ValidatorPatchRequest, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { let initialized_validators_rw_lock = validator_store.initialized_validators(); let mut initialized_validators = initialized_validators_rw_lock.write(); @@ -518,8 +518,8 @@ pub fn serve( ))), Some(enabled) if enabled == body.enabled => Ok(()), Some(_) => { - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on( initialized_validators .set_validator_status(&validator_pubkey, body.enabled), @@ -533,7 +533,7 @@ pub fn serve( Ok(()) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "TaskExecutor shutdown".into(), )) } } @@ -574,12 +574,12 @@ pub fn serve( .and(signer.clone()) .and(validator_dir_filter) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) .and_then( - |request, signer, validator_dir, validator_store, runtime, log| { + |request, signer, validator_dir, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - keystores::import(request, validator_dir, validator_store, runtime, log) + keystores::import(request, validator_dir, validator_store, task_executor, log) }) }, ); @@ -589,11 +589,11 @@ pub fn serve( .and(warp::body::json()) .and(signer.clone()) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - keystores::delete(request, validator_store, runtime, log) + keystores::delete(request, validator_store, task_executor, log) }) }); @@ -610,11 +610,11 @@ pub fn serve( .and(warp::body::json()) .and(signer.clone()) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - remotekeys::import(request, validator_store, runtime, log) + remotekeys::import(request, validator_store, task_executor, log) }) }); @@ -623,11 +623,11 @@ pub fn serve( .and(warp::body::json()) .and(signer) .and(validator_store_filter) - .and(runtime_filter) + .and(task_executor_filter) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - remotekeys::delete(request, validator_store, runtime, log) + remotekeys::delete(request, validator_store, task_executor, log) }) }); diff --git a/validator_client/src/http_api/remotekeys.rs b/validator_client/src/http_api/remotekeys.rs index b3702a028a9..5c3ec73de3e 100644 --- a/validator_client/src/http_api/remotekeys.rs +++ b/validator_client/src/http_api/remotekeys.rs @@ -8,8 +8,9 @@ use eth2::lighthouse_vc::std_types::{ }; use slog::{info, warn, Logger}; use slot_clock::SlotClock; -use std::sync::{Arc, Weak}; -use tokio::runtime::Runtime; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::runtime::Handle; use types::{EthSpec, PublicKeyBytes}; use url::Url; use warp::Rejection; @@ -45,7 +46,7 @@ pub fn list( pub fn import( request: ImportRemotekeysRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { info!( @@ -57,14 +58,10 @@ pub fn import( let mut statuses = Vec::with_capacity(request.remote_keys.len()); for remotekey in request.remote_keys { - let status = if let Some(runtime) = runtime.upgrade() { + let status = if let Some(handle) = task_executor.handle() { // Import the keystore. - match import_single_remotekey( - remotekey.pubkey, - remotekey.url, - &validator_store, - runtime, - ) { + match import_single_remotekey(remotekey.pubkey, remotekey.url, &validator_store, handle) + { Ok(status) => Status::ok(status), Err(e) => { warn!( @@ -91,7 +88,7 @@ fn import_single_remotekey( pubkey: PublicKeyBytes, url: String, validator_store: &ValidatorStore, - runtime: Arc, + handle: Handle, ) -> Result { if let Err(url_err) = Url::parse(&url) { return Err(format!("failed to parse remotekey URL: {}", url_err)); @@ -129,7 +126,7 @@ fn import_single_remotekey( request_timeout_ms: None, }, }; - runtime + handle .block_on(validator_store.add_validator(web3signer_validator)) .map_err(|e| format!("failed to initialize validator: {:?}", e))?; @@ -139,7 +136,7 @@ fn import_single_remotekey( pub fn delete( request: DeleteRemotekeysRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { info!( @@ -158,7 +155,7 @@ pub fn delete( match delete_single_remotekey( pubkey_bytes, &mut initialized_validators, - runtime.clone(), + task_executor.clone(), ) { Ok(status) => Status::ok(status), Err(error) => { @@ -177,8 +174,8 @@ pub fn delete( // Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out // of date as it resets when it can't be decrypted. We update it just a single time to avoid // continually resetting it after each key deletion. - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(initialized_validators.update_validators()) .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; } @@ -189,15 +186,14 @@ pub fn delete( fn delete_single_remotekey( pubkey_bytes: &PublicKeyBytes, initialized_validators: &mut InitializedValidators, - runtime: Weak, + task_executor: TaskExecutor, ) -> Result { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let pubkey = pubkey_bytes .decompress() .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; - match runtime - .block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false)) + match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false)) { Ok(_) => Ok(DeleteRemotekeyStatus::Deleted), Err(e) => match e { diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 039b54496cf..43f88b54f04 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -498,7 +498,7 @@ impl ProductionValidatorClient { self.http_api_listen_addr = if self.config.http_api.enabled { let ctx = Arc::new(http_api::Context { - runtime: self.context.executor.runtime(), + task_executor: self.context.executor.clone(), api_secret, validator_store: Some(self.validator_store.clone()), validator_dir: Some(self.config.validator_dir.clone()), From 5d527b411a00db5603db38ba5833cbb012289685 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 16 May 2022 09:15:27 +1000 Subject: [PATCH 3/6] Add comments --- common/task_executor/src/test_utils.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs index 10d6bd47bb0..d9dd1dd098c 100644 --- a/common/task_executor/src/test_utils.rs +++ b/common/task_executor/src/test_utils.rs @@ -4,9 +4,19 @@ use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; use tokio::runtime; +/// Whilst the `TestRuntime` is not necessarily useful in itself, it provides the necessary +/// components for creating a `TaskExecutor` during tests. +/// +/// If created *inside* an existing runtime, it will use a handle to that. If created *outside* any +/// existing runtime, it will create a new `Runtime` and keep it alive until the `TestRuntime` is +/// dropped. +/// +/// ## Warning +/// +/// This struct should never be used in production, only testing. pub struct TestRuntime { - pub runtime: Option>, - pub _runtime_shutdown: exit_future::Signal, + runtime: Option>, + _runtime_shutdown: exit_future::Signal, pub task_executor: TaskExecutor, pub log: Logger, } From a01f81e7cfc678119d28e5f351b2266b574837b7 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 16 May 2022 09:41:18 +1000 Subject: [PATCH 4/6] Fix tests, tidy --- beacon_node/network/src/beacon_processor/tests.rs | 13 ++++++------- common/task_executor/src/test_utils.rs | 7 ++++--- validator_client/src/http_api/tests.rs | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index 0f97bc79443..1c9d323576d 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -20,7 +20,7 @@ use std::cmp; use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Runtime; +use tokio::runtime::Handle; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, EthSpec, MainnetEthSpec, ProposerSlashing, SignedBeaconBlock, @@ -324,20 +324,19 @@ impl TestRig { .unwrap(); } - fn runtime(&mut self) -> Arc { + fn handle(&mut self) -> Handle { self.environment .as_mut() .unwrap() .core_context() .executor - .runtime() - .upgrade() + .handle() .unwrap() } /// Assert that the `BeaconProcessor` doesn't produce any events in the given `duration`. pub fn assert_no_events_for(&mut self, duration: Duration) { - self.runtime().block_on(async { + self.handle().block_on(async { tokio::select! { _ = tokio::time::sleep(duration) => (), event = self.work_journal_rx.recv() => panic!( @@ -360,7 +359,7 @@ impl TestRig { .iter() .all(|ev| ev != &WORKER_FREED && ev != &NOTHING_TO_DO)); - let (events, worker_freed_remaining) = self.runtime().block_on(async { + let (events, worker_freed_remaining) = self.handle().block_on(async { let mut events = Vec::with_capacity(expected.len()); let mut worker_freed_remaining = expected.len(); @@ -415,7 +414,7 @@ impl TestRig { /// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense /// to use the `NOTHING_TO_DO` event to ensure that execution has completed. pub fn assert_event_journal_with_timeout(&mut self, expected: &[&str], timeout: Duration) { - let events = self.runtime().block_on(async { + let events = self.handle().block_on(async { let mut events = Vec::with_capacity(expected.len()); let drain_future = async { diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs index d9dd1dd098c..bd8fad3f335 100644 --- a/common/task_executor/src/test_utils.rs +++ b/common/task_executor/src/test_utils.rs @@ -7,9 +7,7 @@ use tokio::runtime; /// Whilst the `TestRuntime` is not necessarily useful in itself, it provides the necessary /// components for creating a `TaskExecutor` during tests. /// -/// If created *inside* an existing runtime, it will use a handle to that. If created *outside* any -/// existing runtime, it will create a new `Runtime` and keep it alive until the `TestRuntime` is -/// dropped. +/// May create its own runtime or use an existing one. /// /// ## Warning /// @@ -22,6 +20,9 @@ pub struct TestRuntime { } impl Default for TestRuntime { + /// If called *inside* an existing runtime, instantiates `Self` using handle to that runtime. If + /// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the + /// `Self` is dropped. fn default() -> Self { let (runtime_shutdown, exit) = exit_future::signal(); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index eef76eb3630..da9c8dc534f 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -102,7 +102,7 @@ impl ApiTester { spec, Some(Arc::new(DoppelgangerService::new(log.clone()))), slot_clock, - executor, + executor.clone(), log.clone(), )); @@ -113,7 +113,7 @@ impl ApiTester { let initialized_validators = validator_store.initialized_validators(); let context = Arc::new(Context { - runtime, + task_executor: executor, api_secret, validator_dir: Some(validator_dir.path().into()), validator_store: Some(validator_store.clone()), From 378e926a49d9fb3e71367d4cf0305bd5eaf0aa97 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 16 May 2022 10:21:18 +1000 Subject: [PATCH 5/6] Tidy --- common/task_executor/Cargo.toml | 2 +- common/task_executor/src/lib.rs | 66 ++++++++++++++++++---------- validator_client/src/http_api/mod.rs | 10 ++--- 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/common/task_executor/Cargo.toml b/common/task_executor/Cargo.toml index 5d7f9d49b4a..f344dc47354 100644 --- a/common/task_executor/Cargo.toml +++ b/common/task_executor/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Sigma Prime "] edition = "2021" [dependencies] -tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread"] } +tokio = { version = "1.14.0", features = ["rt-multi-thread"] } slog = "2.5.2" futures = "0.3.7" exit-future = "0.2.0" diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 752c999ca7c..dd525bea504 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -5,7 +5,7 @@ use futures::channel::mpsc::Sender; use futures::prelude::*; use slog::{crit, debug, o, trace}; use std::sync::Weak; -use tokio::runtime; +use tokio::runtime::{Handle, Runtime}; /// Provides a reason when Lighthouse is shut down. #[derive(Copy, Clone, Debug, PartialEq)] @@ -25,31 +25,42 @@ impl ShutdownReason { } } +/// Provides a `Handle` by either: +/// +/// 1. Holding a `Weak` and calling `Runtime::handle`. +/// 2. Directly holding a `Handle` and cloning it. +/// +/// This enum allows the `TaskExecutor` to work in production where a `Weak` is directly +/// accessible and in testing where the `Runtime` is hidden outside our scope. #[derive(Clone)] -pub enum Handle { - Runtime(Weak), - Handle(runtime::Handle), +pub enum HandleProvider { + Runtime(Weak), + Handle(Handle), } -impl From for Handle { - fn from(handle: runtime::Handle) -> Self { - Handle::Handle(handle) +impl From for HandleProvider { + fn from(handle: Handle) -> Self { + HandleProvider::Handle(handle) } } -impl From> for Handle { - fn from(weak_runtime: Weak) -> Self { - Handle::Runtime(weak_runtime) +impl From> for HandleProvider { + fn from(weak_runtime: Weak) -> Self { + HandleProvider::Runtime(weak_runtime) } } -impl Handle { - pub fn handle(&self) -> Option { +impl HandleProvider { + /// Returns a `Handle` to a `Runtime`. + /// + /// May return `None` if the weak reference to the `Runtime` has been dropped (this generally + /// means Lighthouse is shutting down). + pub fn handle(&self) -> Option { match self { - Handle::Runtime(weak_runtime) => weak_runtime + HandleProvider::Runtime(weak_runtime) => weak_runtime .upgrade() .map(|runtime| runtime.handle().clone()), - Handle::Handle(handle) => Some(handle.clone()), + HandleProvider::Handle(handle) => Some(handle.clone()), } } } @@ -58,7 +69,7 @@ impl Handle { #[derive(Clone)] pub struct TaskExecutor { /// The handle to the runtime on which tasks are spawned - handle: Handle, + handle_provider: HandleProvider, /// The receiver exit future which on receiving shuts down the task exit: exit_future::Exit, /// Sender given to tasks, so that if they encounter a state in which execution cannot @@ -72,14 +83,20 @@ pub struct TaskExecutor { impl TaskExecutor { /// Create a new task executor. - pub fn new>( + /// + /// ## Note + /// + /// This function should only be used during testing. In production, prefer to obtain an + /// instance of `Self` via a `environment::RuntimeContext` (see the `lighthouse/environment` + /// crate). + pub fn new>( handle: T, exit: exit_future::Exit, log: slog::Logger, signal_tx: Sender, ) -> Self { Self { - handle: handle.into(), + handle_provider: handle.into(), exit, signal_tx, log, @@ -89,7 +106,7 @@ impl TaskExecutor { /// Clones the task executor adding a service name. pub fn clone_with_name(&self, service_name: String) -> Self { TaskExecutor { - handle: self.handle.clone(), + handle_provider: self.handle_provider.clone(), exit: self.exit.clone(), signal_tx: self.signal_tx.clone(), log: self.log.new(o!("service" => service_name)), @@ -121,7 +138,7 @@ impl TaskExecutor { let mut shutdown_sender = self.shutdown_sender(); let log = self.log.clone(); - if let Some(handle) = self.handle.handle() { + if let Some(handle) = self.handle() { handle.spawn(async move { let timer = metrics::start_timer_vec(&metrics::TASKS_HISTOGRAM, &[name]); if let Err(join_error) = task_handle.await { @@ -187,7 +204,7 @@ impl TaskExecutor { }); int_gauge.inc(); - if let Some(handle) = self.handle.handle() { + if let Some(handle) = self.handle() { handle.spawn(future); } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); @@ -238,7 +255,7 @@ impl TaskExecutor { }); int_gauge.inc(); - if let Some(handle) = self.handle.handle() { + if let Some(handle) = self.handle() { Some(handle.spawn(future)) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); @@ -269,7 +286,7 @@ impl TaskExecutor { let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]); metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]); - let join_handle = if let Some(handle) = self.handle.handle() { + let join_handle = if let Some(handle) = self.handle() { handle.spawn_blocking(task) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); @@ -295,8 +312,9 @@ impl TaskExecutor { Some(future) } - pub fn handle(&self) -> Option { - self.handle.handle() + /// Returns a `Handle` to the current runtime. + pub fn handle(&self) -> Option { + self.handle_provider.handle() } /// Returns a copy of the `exit_future::Exit`. diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index 650dbd24763..bf7261a271b 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -316,7 +316,7 @@ pub fn serve( Ok(api_types::GenericResponse::from(response)) } else { Err(warp_utils::reject::custom_server_error( - "TaskExecutor shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -362,7 +362,7 @@ pub fn serve( Ok(api_types::GenericResponse::from(validators)) } else { Err(warp_utils::reject::custom_server_error( - "TaskExecutor shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -433,7 +433,7 @@ pub fn serve( })? } else { return Err(warp_utils::reject::custom_server_error( - "TaskExecutor shutdown".into(), + "Lighthouse shutting down".into(), )); } }; @@ -485,7 +485,7 @@ pub fn serve( Ok(()) } else { Err(warp_utils::reject::custom_server_error( - "TaskExecutor shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -533,7 +533,7 @@ pub fn serve( Ok(()) } else { Err(warp_utils::reject::custom_server_error( - "TaskExecutor shutdown".into(), + "Lighthouse shutting down".into(), )) } } From ff5fd8df603d101a25991d01fc6afa1c4ca9d061 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 16 May 2022 18:34:49 +1000 Subject: [PATCH 6/6] Update common/task_executor/src/test_utils.rs Co-authored-by: Mac L --- common/task_executor/src/test_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs index bd8fad3f335..7d59cdf022c 100644 --- a/common/task_executor/src/test_utils.rs +++ b/common/task_executor/src/test_utils.rs @@ -20,7 +20,7 @@ pub struct TestRuntime { } impl Default for TestRuntime { - /// If called *inside* an existing runtime, instantiates `Self` using handle to that runtime. If + /// If called *inside* an existing runtime, instantiates `Self` using a handle to that runtime. If /// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the /// `Self` is dropped. fn default() -> Self {