Skip to content

Commit

Permalink
feat: add kill signal to cucumber nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Dec 16, 2022
1 parent 92b044a commit 4cb21dc
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 39 deletions.
12 changes: 12 additions & 0 deletions integration_tests/tests/cucumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ impl TariWorld {
pub fn all_seed_nodes(&self) -> &[String] {
self.seed_nodes.as_slice()
}

pub async fn after(&mut self, scenario: &Scenario) {
for node in self.seed_nodes.iter() {
let mut bnode = self.base_nodes.get_mut(node).unwrap();
bnode.kill().await;
}
}
}

#[given(expr = "I have a seed node {word}")]
Expand Down Expand Up @@ -236,6 +243,11 @@ async fn main() {
.summarized()
.assert_normalized(),
)
.after(|feature,rule,scenario,_ev,maybe_world| {
Box::pin(async move {
maybe_world.unwrap().after(scenario).await;
})
})
.run_and_exit("tests/features/")
.await;
}
87 changes: 48 additions & 39 deletions integration_tests/tests/utils/base_node_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct BaseNodeProcess {
pub identity: NodeIdentity,
pub temp_dir_path: String,
pub is_seed_node: bool,
pub kill_signal: Option<tokio::sync::oneshot::Sender<()>>,
}

// NOTE: implemented to skip `cx`, because BaseNodeContext doesn't implement Debug
Expand Down Expand Up @@ -76,13 +77,15 @@ pub async fn spawn_base_node(world: &mut TariWorld, is_seed_node: bool, bn_name:
let temp_dir = tempdir().unwrap();
let temp_dir_path = temp_dir.path().display().to_string();

let (kill_signal_sender, kill_signal_receiver) = tokio::sync::oneshot::channel::<()>();
let process = BaseNodeProcess {
name: bn_name.clone(),
port,
grpc_port,
identity,
temp_dir_path,
is_seed_node,
kill_signal: Some(kill_signal_sender),
};

let name_cloned = bn_name.clone();
Expand All @@ -98,44 +101,46 @@ pub async fn spawn_base_node(world: &mut TariWorld, is_seed_node: bool, bn_name:
}

let mut common_config = CommonConfig::default();
common_config.base_path = temp_dir.as_ref().to_path_buf();

task::spawn(async move {
let mut base_node_config = tari_base_node::ApplicationConfig {
common: common_config,
auto_update: AutoUpdateConfig::default(),
base_node: BaseNodeConfig::default(),
metrics: MetricsConfig::default(),
peer_seeds: PeerSeedsConfig {
peer_seeds: peer_addresses.into(),
..Default::default()
},
};

println!("Using base_node temp_dir: {}", temp_dir.path().display());
base_node_config.base_node.network = Network::LocalNet;
base_node_config.base_node.grpc_enabled = true;
base_node_config.base_node.grpc_address = Some(format!("/ip4/127.0.0.1/tcp/{}", grpc_port).parse().unwrap());
base_node_config.base_node.report_grpc_error = true;

base_node_config.base_node.data_dir = temp_dir.path().to_path_buf();
base_node_config.base_node.identity_file = temp_dir.path().join("base_node_id.json");
base_node_config.base_node.tor_identity_file = temp_dir.path().join("base_node_tor_id.json");

base_node_config.base_node.lmdb_path = temp_dir.path().to_path_buf();
base_node_config.base_node.p2p.transport.transport_type = TransportType::Tcp;
base_node_config.base_node.p2p.transport.tcp.listener_address =
format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap();
base_node_config.base_node.p2p.public_address =
Some(base_node_config.base_node.p2p.transport.tcp.listener_address.clone());
base_node_config.base_node.p2p.datastore_path = temp_dir.path().to_path_buf();
base_node_config.base_node.p2p.dht = DhtConfig::default_local_test();
base_node_config.base_node.p2p.allow_test_addresses = true;

println!(
"Initializing base node: name={}; port={}; grpc_port={}; is_seed_node={}",
name_cloned, port, grpc_port, is_seed_node
);
common_config.base_path = temp_dir.clone();
task::spawn(futures::future::select(
kill_signal_receiver,
Box::pin(async move {
let mut base_node_config = tari_base_node::ApplicationConfig {
common: common_config,
auto_update: AutoUpdateConfig::default(),
base_node: BaseNodeConfig::default(),
metrics: MetricsConfig::default(),
peer_seeds: PeerSeedsConfig {
peer_seeds: peer_addresses.into(),
..Default::default()
},
};

println!("Using base_node temp_dir: {}", temp_dir.as_path().display());
base_node_config.base_node.network = Network::LocalNet;
base_node_config.base_node.grpc_enabled = true;
base_node_config.base_node.grpc_address =
Some(format!("/ip4/127.0.0.1/tcp/{}", grpc_port).parse().unwrap());
base_node_config.base_node.report_grpc_error = true;

base_node_config.base_node.data_dir = temp_dir.clone();
base_node_config.base_node.identity_file = temp_dir.join("base_node_id.json");
base_node_config.base_node.tor_identity_file = temp_dir.join("base_node_tor_id.json");

base_node_config.base_node.lmdb_path = temp_dir.clone();
base_node_config.base_node.p2p.transport.transport_type = TransportType::Tcp;
base_node_config.base_node.p2p.transport.tcp.listener_address =
format!("/ip4/0.0.0.0/tcp/{}", port).parse().unwrap();
base_node_config.base_node.p2p.public_address =
Some(format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap());
base_node_config.base_node.p2p.datastore_path = temp_dir.join("p2p");
base_node_config.base_node.p2p.dht = DhtConfig::default_testnet();
base_node_config.base_node.p2p.allow_test_addresses = true;

println!(
"Initializing base node: name={}; port={}; grpc_port={}; is_seed_node={}",
name_cloned, port, grpc_port, is_seed_node
);

let result = run_base_node(Arc::new(base_node_identity), Arc::new(base_node_config)).await;
if let Err(e) = result {
Expand All @@ -148,7 +153,6 @@ pub async fn spawn_base_node(world: &mut TariWorld, is_seed_node: bool, bn_name:
if is_seed_node {
world.seed_nodes.push(bn_name);
}

// We need to give it time for the base node to startup
// TODO: it would be better to scan the base node to detect when it has started
tokio::time::sleep(Duration::from_secs(5)).await;
Expand All @@ -164,4 +168,9 @@ impl BaseNodeProcess {
pub async fn get_grpc_client(&self) -> anyhow::Result<BaseNodeGrpcClient<Channel>> {
Ok(BaseNodeGrpcClient::connect(format!("http://127.0.0.1:{}", self.grpc_port)).await?)
}

pub async fn kill(&mut self) {
self.kill_signal.take().unwrap().send(());
sleep_until(Instant::now() + Duration::from_secs(5)).await;
}
}

0 comments on commit 4cb21dc

Please sign in to comment.