[test] revert task group change (MystenLabs#812)
Revert "Use `TaskGroup` to ensure all primary / worker tasks are cancelled on error and panic (MystenLabs#707)

This reverts commit 693e87979d6be32d29414f1639735760d55c0b21.
akichidis committed Aug 18, 2022
1 parent 8214707 commit deca260
Showing 9 changed files with 88 additions and 67 deletions.
narwhal/executor/src/lib.rs (7 changes: 2 additions & 5 deletions)
@@ -95,7 +95,7 @@ impl Executor {
tx_output: Sender<ExecutorOutput<State>>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
registry: &Registry,
-     ) -> SubscriberResult<Vec<(&'static str, JoinHandle<()>)>>
+     ) -> SubscriberResult<Vec<JoinHandle<()>>>
where
State: ExecutionState + Send + Sync + 'static,
State::Outcome: Send + 'static,
@@ -133,9 +133,6 @@ impl Executor {
// Return the handle.
info!("Consensus subscriber successfully started");

-         Ok(vec![
-             ("executor_subscriber", subscriber_handle),
-             ("executor", executor_handle),
-         ])
+         Ok(vec![subscriber_handle, executor_handle])
}
}
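With the revert, `Executor::spawn` again returns bare `Vec<JoinHandle<()>>`, with no task names attached. A minimal sketch of how such handles are typically consumed (the `spawn_tasks` helper here is hypothetical; `main.rs` below does the equivalent with `futures::future::join_all`):

    use futures::future::join_all;
    use tokio::task::JoinHandle;

    // Hypothetical helper mirroring the reverted signature: bare handles, no names.
    fn spawn_tasks() -> Vec<JoinHandle<()>> {
        (0..2)
            .map(|i| tokio::spawn(async move { println!("task {} done", i) }))
            .collect()
    }

    #[tokio::main]
    async fn main() {
        // Await every handle. A panicked task surfaces as a JoinError here,
        // but its siblings keep running until they finish on their own.
        for result in join_all(spawn_tasks()).await {
            if let Err(e) = result {
                eprintln!("task failed: {:?}", e);
            }
        }
    }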
narwhal/node/Cargo.toml (1 change: 0 additions & 1 deletion)
@@ -28,7 +28,6 @@ tracing-log = "0.1.3"
tracing-subscriber = { version = "0.3.15", features = ["time", "env-filter"] }
url = "2.2.2"
axum = "0.5.15"
- task-group = "0.2.2"

config = { path = "../config" }
consensus = { path = "../consensus" }
narwhal/node/src/lib.rs (41 changes: 15 additions & 26 deletions)
@@ -20,10 +20,9 @@ use store::{
rocks::{open_cf, DBMap},
Store,
};
- use task_group::{TaskGroup, TaskManager};
use tokio::{
sync::{mpsc::Sender, watch},
-     task::{JoinError, JoinHandle},
+     task::JoinHandle,
};
use tracing::debug;
use types::{
@@ -138,7 +137,7 @@ impl Node {
tx_confirmation: Sender<ExecutorOutput<State>>,
// A prometheus exporter Registry to use for the metrics
registry: &Registry,
-     ) -> SubscriberResult<TaskManager<JoinError>>
+     ) -> SubscriberResult<Vec<JoinHandle<()>>>
where
State: ExecutionState + Send + Sync + 'static,
State::Outcome: Send + 'static,
@@ -182,7 +181,7 @@ impl Node {
let consensus_metrics = Arc::new(ConsensusMetrics::new(registry));
let (handle, dag) = Dag::new(&committee.load(), rx_new_certificates, consensus_metrics);

handles.push(("dag", handle));
handles.push(handle);

(Some(Arc::new(dag)), NetworkModel::Asynchronous)
} else {
@@ -256,13 +255,7 @@ impl Node {
});
}

-         let (task_group, task_manager) = TaskGroup::new();
-         for (name, handle) in handles {
-             // The tasks will be awaited with the `task_manager`, so the task handles / futures can be dropped.
-             let _ = task_group.spawn(name, handle);
-         }
-
-         Ok(task_manager)
+         Ok(handles)
}

/// Spawn the consensus core and the client executing transactions.
@@ -280,7 +273,7 @@
)>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
registry: &Registry,
-     ) -> SubscriberResult<Vec<(&'static str, JoinHandle<()>)>>
+     ) -> SubscriberResult<Vec<JoinHandle<()>>>
where
PublicKey: VerifyingKey,
State: ExecutionState + Send + Sync + 'static,
@@ -293,15 +286,13 @@
let (tx_sequence, rx_sequence) =
metered_channel::channel(Self::CHANNEL_CAPACITY, &channel_metrics.tx_sequence);

-         let mut handles = Vec::new();
-
// Spawn the consensus core who only sequences transactions.
let ordering_engine = Bullshark::new(
(**committee.load()).clone(),
store.consensus_store.clone(),
parameters.gc_depth,
);
-         let consensus_handle = Consensus::spawn(
+         let consensus_handles = Consensus::spawn(
(**committee.load()).clone(),
store.consensus_store.clone(),
store.certificate_store.clone(),
@@ -313,7 +304,6 @@
consensus_metrics.clone(),
parameters.gc_depth,
);
handles.push(("consensus", consensus_handle));

// Spawn the client executing the transactions. It can also synchronize with the
// subscriber handler if it missed some transactions.
@@ -327,9 +317,11 @@
registry,
)
.await?;
-         handles.extend(executor_handles);

-         Ok(handles)
+         Ok(executor_handles
+             .into_iter()
+             .chain(std::iter::once(consensus_handles))
+             .collect())
}

/// Spawn a specified number of workers.
@@ -346,10 +338,11 @@
parameters: Parameters,
// The prometheus metrics Registry
registry: &Registry,
-     ) -> TaskManager<JoinError> {
+     ) -> Vec<JoinHandle<()>> {
+         let mut handles = Vec::new();

let metrics = initialise_metrics(registry);

-         let (task_group, task_manager) = TaskGroup::new();
for id in ids {
let worker_handles = Worker::spawn(
name.clone(),
@@ -359,12 +352,8 @@
store.batch_store.clone(),
metrics.clone(),
);
-             // TODO(narwhal/727): propagate worker task names.
-             for (i, h) in worker_handles.into_iter().enumerate() {
-                 let _ = task_group.spawn(format!("worker_{}_{}", id, i), h);
-             }
+             handles.extend(worker_handles);
}

-         task_manager
+         handles
}
}
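For context, the supervision pattern deleted above looked roughly as follows (a sketch reconstructed from the removed lines; the `task-group` crate's manager resolves as soon as any registered task errors or panics, which cancels the remaining tasks):

    use task_group::{TaskGroup, TaskManager};
    use tokio::task::{JoinError, JoinHandle};

    // Sketch of the removed pattern: register each named handle with the
    // group and hand back the manager, which the caller then awaits.
    fn supervise(handles: Vec<(&'static str, JoinHandle<()>)>) -> TaskManager<JoinError> {
        let (task_group, task_manager) = TaskGroup::new();
        for (name, handle) in handles {
            // The tasks are awaited through the manager, so the spawn
            // result can be dropped.
            let _ = task_group.spawn(name, handle);
        }
        task_manager
    }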
narwhal/node/src/main.rs (15 changes: 7 additions & 8 deletions)
@@ -13,7 +13,8 @@ use clap::{crate_name, crate_version, App, AppSettings, ArgMatches, SubCommand};
use config::{Committee, Import, Parameters, WorkerId};
use crypto::{generate_production_keypair, traits::KeyPair as _, KeyPair};
use executor::{SerializedTransaction, SubscriberResult};
- use eyre::{eyre, Context};
+ use eyre::Context;
+ use futures::future::join_all;
use node::{
execution_state::SimpleExecutionState,
metrics::{primary_metrics_registry, start_prometheus_server, worker_metrics_registry},
@@ -155,7 +156,7 @@ async fn run(matches: &ArgMatches<'_>) -> Result<(), eyre::Report> {
let registry;

// Check whether to run a primary, a worker, or an entire authority.
-     let task_manager = match matches.subcommand() {
+     let node_handles = match matches.subcommand() {
// Spawn the primary and consensus core.
("primary", Some(sub_matches)) => {
registry = primary_metrics_registry(keypair.public().clone());
@@ -208,12 +209,10 @@ async fn run(matches: &ArgMatches<'_>) -> Result<(), eyre::Report> {
analyze(rx_transaction_confirmation).await;

// Await on the completion handles of all the nodes we have launched
-     task_manager.await.map_err(|err| match err {
-         task_group::RuntimeError::Panic { name: n, panic: p } => eyre!("{} paniced: {:?}", n, p),
-         task_group::RuntimeError::Application { name: n, error: e } => {
-             eyre!("{} error: {:?}", n, e)
-         }
-     })
+     join_all(node_handles).await;
+
+     // If this expression is reached, the program ends and all other tasks terminate.
+     Ok(())
}

/// Receives an ordered list of certificates and applies any application-specific logic.
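Note that `join_all(node_handles).await` discards each task's `Result<(), JoinError>`, so a panic in one node task is no longer reported or propagated the way the removed `TaskManager` reported it. If reporting is still wanted, the results can be inspected, e.g. (a sketch):

    // Sketch: surface JoinErrors instead of silently discarding them.
    for result in join_all(node_handles).await {
        if let Err(e) = result {
            tracing::error!("node task terminated abnormally: {:?}", e);
        }
    }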
narwhal/node/src/restarter.rs (12 changes: 6 additions & 6 deletions)
@@ -35,7 +35,7 @@ impl NodeRestarter {
let mut name = keypair.public().clone();
let mut committee = committee.clone();

-         let mut task_managers = Vec::new();
+         let mut handles = Vec::new();
let mut primary_network = WorkerToPrimaryNetwork::default();
let mut worker_network = PrimaryToWorkerNetwork::default();

@@ -49,7 +49,7 @@
let store = NodeStorage::reopen(store_path);

// Restart the relevant components.
-             let primary = Node::spawn_primary(
+             let primary_handles = Node::spawn_primary(
keypair,
Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
&store,
@@ -62,7 +62,7 @@
.await
.unwrap();

-             let workers = Node::spawn_workers(
+             let worker_handles = Node::spawn_workers(
name.clone(),
/* worker_ids */ vec![0],
Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
@@ -71,8 +71,8 @@
registry,
);

-             task_managers.push(primary);
-             task_managers.push(workers);
+             handles.extend(primary_handles);
+             handles.extend(worker_handles);

// Wait for a committee change.
let (new_keypair, new_committee) = match rx_reconfigure.recv().await {
@@ -111,7 +111,7 @@
worker_network.cleanup(committee.network_diff(&new_committee));

// Wait for the components to shut down.
-             join_all(task_managers.drain(..)).await;
+             join_all(handles.drain(..)).await;
tracing::debug!("All tasks successfully exited");

// Give it an extra second in case the last task to exit is a network server. The OS
narwhal/node/tests/reconfigure.rs (2 changes: 1 addition & 1 deletion)
@@ -299,7 +299,7 @@ async fn epoch_change() {
}
});

-         let _primary = Node::spawn_primary(
+         let _primary_handles = Node::spawn_primary(
keypair,
Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
&store,
narwhal/primary/src/primary.rs (28 changes: 14 additions & 14 deletions)
@@ -79,7 +79,7 @@ impl Primary {
tx_reconfigure: watch::Sender<ReconfigureNotification>,
tx_committed_certificates: Sender<Certificate>,
registry: &Registry,
-     ) -> Vec<(&str, JoinHandle<()>)> {
+     ) -> Vec<JoinHandle<()>> {
// Write the parameters to the logs.
parameters.tracing();

@@ -447,22 +447,22 @@ impl Primary {
);

let mut handles = vec![
("primary_receiver", primary_receiver_handle),
("worker_receiver", worker_receiver_handle),
("core", core_handle),
("payload_receiver", payload_receiver_handle),
("block_synchronizer", block_synchronizer_handle),
("block_waiter", block_waiter_handle),
("block_remover", block_remover_handle),
("header_waiter", header_waiter_handle),
("certificate_waiter", certificate_waiter_handle),
("proposer", proposer_handle),
("helper", helper_handle),
("state_handler", state_handler_handle),
primary_receiver_handle,
worker_receiver_handle,
core_handle,
payload_receiver_handle,
block_synchronizer_handle,
block_waiter_handle,
block_remover_handle,
header_waiter_handle,
certificate_waiter_handle,
proposer_handle,
helper_handle,
state_handler_handle,
];

if let Some(h) = consensus_api_handle {
handles.push(("consensus_api", h));
handles.push(h);
}

handles
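The revert also drops the task names that `Primary::spawn` attached to each handle. If names are still wanted for diagnostics without the `task-group` dependency, one option is a thin wrapper task (a hypothetical sketch, not part of this commit):

    use tokio::task::JoinHandle;

    // Hypothetical helper: log under a human-readable name while keeping
    // the public Vec<JoinHandle<()>> shape unchanged.
    fn named(name: &'static str, handle: JoinHandle<()>) -> JoinHandle<()> {
        tokio::spawn(async move {
            if let Err(e) = handle.await {
                tracing::error!("task '{}' failed: {:?}", name, e);
            }
        })
    }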
narwhal/primary/tests/epoch_change.rs (12 changes: 6 additions & 6 deletions)
@@ -321,7 +321,7 @@ async fn test_restart_with_new_committee_change() {
let (tx_reconfigure, _rx_reconfigure) = watch::channel(initial_committee);

let store = NodeStorage::reopen(temp_dir());
-         let registry = Registry::new();
+
let primary_handles = Primary::spawn(
name,
signer,
@@ -338,9 +338,9 @@
NetworkModel::Asynchronous,
tx_reconfigure,
/* tx_committed_certificates */ tx_feedback,
-             &registry,
+             &Registry::new(),
);
-         handles.extend(primary_handles.into_iter().map(|(_n, j)| j));
+         handles.extend(primary_handles);
}

// Run for a while in epoch 0.
@@ -398,7 +398,7 @@ async fn test_restart_with_new_committee_change() {
test_utils::test_get_block_commands!(1);

let store = NodeStorage::reopen(temp_dir());
-         let registry = Registry::new();
+
let primary_handles = Primary::spawn(
name,
signer,
@@ -415,9 +415,9 @@
NetworkModel::Asynchronous,
tx_reconfigure,
/* tx_committed_certificates */ tx_feedback,
-             &registry,
+             &Registry::new(),
);
-         handles.extend(primary_handles.into_iter().map(|(_n, j)| j));
+         handles.extend(primary_handles);
}

// Run for a while.
narwhal/primary/tests/nodes_bootstrapping_tests.rs (37 changes: 37 additions & 0 deletions)
@@ -4,6 +4,43 @@ use std::time::Duration;
use telemetry_subscribers::TelemetryGuards;
use test_utils::cluster::Cluster;

+ use types::{PublicKeyProto, RoundsRequest};
+
+ #[tokio::test]
+ async fn test_shutdown_bug() {
+     // Enable debug tracing so we can easily observe the nodes' logs.
+     let _guard = setup_tracing();

+     let delay = Duration::from_secs(10); // 10 seconds
+
+     // A cluster of 4 nodes will be created
+     let cluster = Cluster::new(None, None, false);
+
+     // ==== Start first authority ====
+     let authority = cluster.authority(0);
+     authority.start(false, Some(1)).await;
+
+     tokio::time::sleep(delay).await;
+
+     authority.stop_all().await;
+
+     tokio::time::sleep(delay).await;
+
+     let mut client = authority.new_proposer_client().await;
+
+     // send a sample rounds request
+     let request = tonic::Request::new(RoundsRequest {
+         public_key: Some(PublicKeyProto::from(authority.name.clone())),
+     });
+     let response = client.rounds(request).await;
+
+     // We should get back an error response; however, this test currently fails
+     // because we keep getting an OK response back, which should not happen since
+     // we stopped the node.
+     assert!(response.is_err());
+ }

/// Nodes will be started in a staggered fashion. This simulates
/// a real-world scenario where nodes across validators will not start
/// at the same time.
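Once the shutdown bug is fixed, the final assertion in `test_shutdown_bug` could be tightened beyond `is_err()` (a sketch; exactly which tonic error surfaces depends on how the server socket is torn down):

    // Sketch: distinguish "node still serving" from the expected
    // connection-level failure after stop_all().
    match client.rounds(request).await {
        Err(status) => println!("request failed after shutdown, as expected: {}", status),
        Ok(_) => panic!("node is still serving requests after stop_all()"),
    }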
