Isolate shotover's tokio runtime in integration tests #912

Closed
wants to merge 5 commits into from
52 changes: 18 additions & 34 deletions shotover-proxy/src/runner.rs
@@ -8,7 +8,7 @@ use clap::{crate_version, Parser};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::env;
use std::net::SocketAddr;
use tokio::runtime::{self, Handle as RuntimeHandle, Runtime};
use tokio::runtime::{self, Runtime};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::watch;
use tokio::task::JoinHandle;
@@ -63,8 +63,7 @@ impl Default for ConfigOpts {
}

pub struct Runner {
runtime: Option<Runtime>,
runtime_handle: RuntimeHandle,
runtime: Runtime,
topology: Topology,
config: Config,
tracing: TracingState,
@@ -77,11 +76,10 @@ impl Runner {

let tracing = TracingState::new(config.main_log_level.as_str(), params.log_format)?;

let (runtime_handle, runtime) = Runner::get_runtime(params.stack_size, params.core_threads);
let runtime = Runner::get_runtime(params.stack_size, params.core_threads);

Ok(Runner {
runtime,
runtime_handle,
topology,
config,
tracing,
@@ -96,20 +94,19 @@ impl Runner {
let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter = LogFilterHttpExporter::new(handle, socket, self.tracing.handle.clone());

self.runtime_handle.spawn(exporter.async_run());
self.runtime.spawn(exporter.async_run());

Ok(self)
}

pub fn run_spawn(self) -> RunnerSpawned {
let (trigger_shutdown_tx, trigger_shutdown_rx) = watch::channel(false);

let join_handle =
self.runtime_handle
.spawn(run(self.topology, self.config, trigger_shutdown_rx));
let join_handle = self
.runtime
.spawn(run(self.topology, self.config, trigger_shutdown_rx));

RunnerSpawned {
runtime_handle: self.runtime_handle,
runtime: self.runtime,
tracing_guard: self.tracing.guard,
trigger_shutdown_tx,
@@ -120,7 +117,7 @@ impl Runner {
pub fn run_block(self) -> Result<()> {
let (trigger_shutdown_tx, trigger_shutdown_rx) = watch::channel(false);

self.runtime_handle.spawn(async move {
self.runtime.spawn(async move {
let mut interrupt = signal(SignalKind::interrupt()).unwrap();
let mut terminate = signal(SignalKind::terminate()).unwrap();

@@ -136,31 +133,19 @@
trigger_shutdown_tx.send(true).unwrap();
});

self.runtime_handle
self.runtime
.block_on(run(self.topology, self.config, trigger_shutdown_rx))
}

/// Get handle for an existing runtime or create one
fn get_runtime(stack_size: usize, core_threads: usize) -> (RuntimeHandle, Option<Runtime>) {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
// Using block_in_place to trigger a panic in case the runtime is set up in single-threaded mode.
// Shotover does not function correctly in single threaded mode (currently hangs)
// and block_in_place gives an error message explaining to setup the runtime in multi-threaded mode.
// This does not protect us when calling Runtime::enter() or when no runtime is set up at all.
tokio::task::block_in_place(|| {});

(handle, None)
} else {
let runtime = runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("Shotover-Proxy-Thread")
.thread_stack_size(stack_size)
.worker_threads(core_threads)
.build()
.unwrap();

(runtime.handle().clone(), Some(runtime))
}
fn get_runtime(stack_size: usize, core_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("Shotover-Proxy-Thread")
.thread_stack_size(stack_size)
.worker_threads(core_threads)
.build()
.unwrap()
}
}

@@ -259,8 +244,7 @@ impl ReloadHandle {
}

pub struct RunnerSpawned {
pub runtime: Option<Runtime>,
pub runtime_handle: RuntimeHandle,
pub runtime: Runtime,
pub join_handle: JoinHandle<Result<()>>,
pub tracing_guard: WorkerGuard,
pub trigger_shutdown_tx: watch::Sender<bool>,
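The runner.rs change above removes the "reuse the caller's runtime if one exists" branch: get_runtime now unconditionally builds a dedicated multi-thread runtime. A minimal sketch of that pattern, using a placeholder thread name, stack size and worker count rather than shotover's configured values:

```rust
use tokio::runtime;

fn main() {
    // Always build an owned multi-thread runtime instead of borrowing whatever
    // runtime the caller happens to be running inside.
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()                        // IO + time drivers
        .thread_name("example-worker")       // placeholder name
        .thread_stack_size(2 * 1024 * 1024)  // placeholder 2 MiB stack
        .worker_threads(4)                   // placeholder worker count
        .build()
        .expect("failed to build runtime");

    // Background tasks (e.g. a metrics exporter) run on the owned runtime...
    rt.spawn(async {
        // long-running task here
    });

    // ...and the caller can still drive a future to completion on it.
    rt.block_on(async {
        println!("running on an isolated multi-thread runtime");
    });
}
```

Because the Runner now always owns its Runtime, the Option<Runtime>/RuntimeHandle pair in Runner and RunnerSpawned collapses into a single Runtime field.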
31 changes: 24 additions & 7 deletions shotover-proxy/tests/helpers/mod.rs
@@ -1,7 +1,7 @@
use anyhow::Result;
use shotover_proxy::runner::{ConfigOpts, Runner};
use std::sync::mpsc;
use tokio::runtime::{Handle as RuntimeHandle, Runtime};
use tokio::runtime::Runtime;
use tokio::sync::watch;
use tokio::task::JoinHandle;

@@ -11,7 +11,6 @@ pub mod redis_connection;
#[must_use]
pub struct ShotoverManager {
pub runtime: Option<Runtime>,
pub runtime_handle: RuntimeHandle,
pub join_handle: Option<JoinHandle<Result<()>>>,
pub trigger_shutdown_tx: watch::Sender<bool>,
panic_occured_rx: mpsc::Receiver<()>,
@@ -64,8 +63,7 @@ impl ShotoverManager {
std::mem::forget(spawn.tracing_guard);

ShotoverManager {
runtime: spawn.runtime,
runtime_handle: spawn.runtime_handle,
runtime: Some(spawn.runtime),
join_handle: Some(spawn.join_handle),
trigger_shutdown_tx: spawn.trigger_shutdown_tx,
panic_occured_rx,
@@ -74,9 +72,28 @@

fn shutdown_shotover(&mut self) -> Result<()> {
self.trigger_shutdown_tx.send(true)?;
let _enter_guard = self.runtime_handle.enter();
let _enter_guard = self.runtime.as_ref().unwrap().enter();
futures::executor::block_on(self.join_handle.take().unwrap())?
}

fn shutdown_tokio(&mut self) -> Result<()> {
// Even if shutdown fails we still need to drop the runtime this way,
// otherwise we will double panic if the regular Runtime drop impl runs
let result = self.shutdown_shotover();

// tokio has some per thread global state that sets the current runtime that is being executed in.
// If we try to drop a runtime while executing in a thread with a current runtime, the runtimes drop logic will panic.
// We can avoid this by sending the runtime off to a brand new thread and performing the drop there.
if let Some(runtime) = self.runtime.take() {
std::thread::spawn(move || {
std::mem::drop(runtime);
})
.join()
.unwrap();
}

result
}
}

impl Drop for ShotoverManager {
@@ -90,11 +107,11 @@ impl Drop for ShotoverManager {

if std::thread::panicking() {
// If already panicking do not panic while attempting to shutdown shotover in order to avoid a double panic.
if let Err(err) = self.shutdown_shotover() {
if let Err(err) = self.shutdown_tokio() {
println!("Failed to shutdown shotover: {err}")
}
} else {
self.shutdown_shotover().unwrap();
self.shutdown_tokio().unwrap();

// When a panic occurs in a shotover tokio task that isnt joined on, tokio will catch the panic, print the panic message and shotover will continue running happily.
// This behaviour is reasonable and makes shotover more robust but in our integration tests we want to ensure that panics never ever occur.
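The new shutdown_tokio encodes the constraint spelled out in its comment: a tokio Runtime must not be dropped on a thread whose thread-local tokio state still points at a runtime, so the runtime is shipped off to a fresh OS thread and dropped there. A minimal sketch of that pattern; the function name and the surrounding main are illustrative, not part of the diff:

```rust
use tokio::runtime::Runtime;

// Hand the runtime to a brand-new thread and drop it there, so the drop logic
// never runs on a thread that has a current runtime set.
fn drop_runtime_on_new_thread(runtime: Runtime) {
    std::thread::spawn(move || {
        std::mem::drop(runtime);
    })
    .join()
    .expect("runtime drop thread panicked");
}

fn main() {
    let rt = Runtime::new().expect("failed to build runtime");
    rt.block_on(async { /* run shotover / test traffic here */ });
    drop_runtime_on_new_thread(rt);
}
```

Note that shutdown_shotover still runs first and its result is kept, so a failed shutdown is reported without double panicking when the runtime is subsequently dropped.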
25 changes: 0 additions & 25 deletions shotover-proxy/tests/runner/runner_int_tests.rs
@@ -1,33 +1,8 @@
use serial_test::serial;
use std::any::Any;

use crate::helpers::ShotoverManager;
use test_helpers::shotover_process::ShotoverProcess;

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_runtime_use_existing() {
let shotover_manager =
ShotoverManager::from_topology_file("example-configs/null-redis/topology.yaml");

// Assert that shotover is using the test runtime
let handle = tokio::runtime::Handle::current();
assert_eq!(handle.type_id(), shotover_manager.runtime_handle.type_id());

// Assert that shotover did not create a runtime for itself
assert!(shotover_manager.runtime.is_none());
}

#[test]
#[serial]
fn test_runtime_create() {
let shotover_manager =
ShotoverManager::from_topology_file("example-configs/null-redis/topology.yaml");

// Assert that shotover created a runtime for itself
assert!(shotover_manager.runtime.is_some());
}

#[test]
#[serial]
fn test_early_shutdown_cassandra_source() {
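With the fallback path removed, the two runtime-detection tests deleted above (test_runtime_use_existing and test_runtime_create) no longer apply. A hypothetical sketch, not part of this diff, of what an integration test looks like after the change, assuming the existing ShotoverManager helper and the null-redis example topology:

```rust
use crate::helpers::ShotoverManager;
use serial_test::serial;

// Plain #[test], no #[tokio::test]: ShotoverManager now always owns its own
// isolated multi-thread runtime.
#[test]
#[serial]
fn test_against_shotover() {
    let _shotover_manager =
        ShotoverManager::from_topology_file("example-configs/null-redis/topology.yaml");

    // ...drive client traffic through shotover here...

    // On drop, shutdown_tokio shuts shotover down and drops the isolated
    // runtime on a separate thread.
}
```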