Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup shotover::Runner api #1103

Merged
merged 4 commits into from Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 3 additions & 31 deletions shotover-proxy/src/main.rs
@@ -1,33 +1,5 @@
#![warn(rust_2018_idioms)]
#![recursion_limit = "256"]
use shotover::runner::Shotover;

use anyhow::{Context, Result};
use clap::Parser;
use shotover::runner::{ConfigOpts, Runner, TracingState};
use tokio::runtime::Runtime;

/// Entry point: parses the command line options and hands them to `run`.
///
/// If `run` returns an error, a fallback runtime and `TracingState` (at "error" level, using the
/// log format requested on the command line) are created so the failure can be logged via
/// `tracing::error!`, after which the process exits with status code 1.
///
/// # Panics
/// Panics if the fallback runtime or `TracingState` cannot be created; the panic message
/// includes the original startup error.
fn main() -> Result<()> {
    let opts = ConfigOpts::parse();
    let log_format = opts.log_format.clone();
    match run(opts) {
        Ok(()) => Ok(()),
        Err(err) => {
            // Keep the fallback runtime + tracing in their own scope so they are dropped before
            // `process::exit`, which does not run destructors of values still alive.
            {
                // `context` takes its message verbatim, so the original error has to be
                // formatted into the message explicitly via `with_context` + `format!`.
                let rt = Runtime::new()
                    .with_context(|| {
                        format!("Failed to create runtime while trying to report {err:?}")
                    })
                    .unwrap();
                let _guard = rt.enter();
                let _tracing_state = TracingState::new("error", log_format)
                    .with_context(|| {
                        format!("Failed to create TracingState while trying to report {err:?}")
                    })
                    .unwrap();

                tracing::error!("{:?}", err.context("Failed to start shotover"));
            }
            std::process::exit(1);
        }
    }
}

fn run(opts: ConfigOpts) -> Result<()> {
Runner::new(opts)?
.with_observability_interface()?
.run_block()
fn main() {
Shotover::new().run_block();
}
53 changes: 30 additions & 23 deletions shotover-proxy/tests/runner/runner_int_tests.rs
Expand Up @@ -59,12 +59,13 @@ async fn test_shotover_shutdown_when_invalid_topology_non_terminating_last() {
.assert_fails_to_start(&[EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover::runner")
.with_message(
"Topology errors
redis_chain:
Non-terminating transform \"DebugPrinter\" is last in chain. Last transform must be terminating.
",
)])
.with_message("Failed to start shotover

Caused by:
Topology errors
redis_chain:
Non-terminating transform \"DebugPrinter\" is last in chain. Last transform must be terminating.
")])
.await;
}

Expand All @@ -77,10 +78,13 @@ async fn test_shotover_shutdown_when_invalid_topology_terminating_not_last() {
.assert_fails_to_start(&[EventMatcher::new()
.with_level(Level::Error)
.with_target("shotover::runner")
.with_message("Topology errors
redis_chain:
Terminating transform \"NullSink\" is not last in chain. Terminating transform must be last in chain.
")])
.with_message("Failed to start shotover

Caused by:
Topology errors
redis_chain:
Terminating transform \"NullSink\" is not last in chain. Terminating transform must be last in chain.
")])
.await;
}

Expand All @@ -93,23 +97,26 @@ async fn test_shotover_shutdown_when_topology_invalid_topology_subchains() {
&[
EventMatcher::new().with_level(Level::Error)
.with_target("shotover::runner")
.with_message(r#"Topology errors
a_first_chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
b_second_chain:
TuneableConsistencyScatter:
a_chain_1:
.with_message(r#"Failed to start shotover

Caused by:
Topology errors
a_first_chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
b_chain_2:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
c_chain_3:
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
b_second_chain:
TuneableConsistencyScatter:
sub_chain_2:
a_chain_1:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
b_chain_2:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
"#),
c_chain_3:
TuneableConsistencyScatter:
sub_chain_2:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
"#),
EventMatcher::new().with_level(Level::Warn)
.with_target("shotover::transforms::distributed::tuneable_consistency_scatter")
.with_message("Using this transform is considered unstable - Does not work with REDIS pipelines")
Expand Down
86 changes: 64 additions & 22 deletions shotover/src/runner.rs
Expand Up @@ -3,6 +3,7 @@ use crate::config::Config;
use crate::observability::LogFilterHttpExporter;
use crate::transforms::Transforms;
use crate::transforms::Wrapper;
use anyhow::Context;
use anyhow::{anyhow, Result};
use clap::{crate_version, Parser};
use metrics_exporter_prometheus::PrometheusBuilder;
Expand Down Expand Up @@ -46,7 +47,7 @@ pub struct ConfigOpts {
pub log_format: LogFormat,
}

#[derive(clap::ValueEnum, Clone)]
#[derive(clap::ValueEnum, Clone, Copy)]
pub enum LogFormat {
Human,
Json,
Expand All @@ -64,44 +65,76 @@ impl Default for ConfigOpts {
}
}

pub struct Runner {
pub struct Shotover {
runtime: Runtime,
topology: Topology,
config: Config,
tracing: TracingState,
}

impl Runner {
pub fn new(params: ConfigOpts) -> Result<Self> {
impl Shotover {
/// Parses the command line options via `ConfigOpts::parse` and builds a `Shotover` from them.
///
/// This constructor does not return an error. If `new_inner` fails, the error is logged with
/// `tracing::error!` through a freshly created fallback runtime and `TracingState`, and the
/// process then exits with status code 1.
///
/// # Panics
/// Panics if the fallback runtime or `TracingState` cannot be created while reporting a startup
/// failure; the panic message includes the original startup error.
// NOTE(review): `new` reads process arguments and may exit the process, which is presumably why
// no `Default` impl is provided and the lint is silenced — confirm.
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
    let opts = ConfigOpts::parse();
    let log_format = opts.log_format;

    match Shotover::new_inner(opts) {
        Ok(x) => x,
        Err(err) => {
            // If initialization failed then we have no tokio runtime or tracing to use.
            // Create the simplest runtime + tracing so we can write out an `error!`, if even that fails then just panic.
            // Put it all in its own scope so we drop it (and therefore perform tracing log flushing) before we exit
            {
                // `context` takes its message verbatim, so the original error has to be
                // formatted into the message explicitly via `with_context` + `format!`.
                let rt = Runtime::new()
                    .with_context(|| {
                        format!("Failed to create runtime while trying to report {err:?}")
                    })
                    .unwrap();
                let _guard = rt.enter();
                let _tracing_state = TracingState::new("error", log_format)
                    .with_context(|| {
                        format!("Failed to create TracingState while trying to report {err:?}")
                    })
                    .unwrap();

                tracing::error!("{:?}", err.context("Failed to start shotover"));
            }
            std::process::exit(1);
        }
    }
}

fn new_inner(params: ConfigOpts) -> Result<Self> {
let config = Config::from_file(params.config_file)?;
let topology = Topology::from_file(&params.topology_file)?;

let tracing = TracingState::new(config.main_log_level.as_str(), params.log_format)?;
let runtime = Shotover::create_runtime(params.stack_size, params.core_threads);

let runtime = Runner::create_runtime(params.stack_size, params.core_threads);
Shotover::start_observability_interface(&runtime, &config, &tracing)?;

Ok(Runner {
Ok(Shotover {
runtime,
topology,
config,
tracing,
})
}

pub fn with_observability_interface(self) -> Result<Self> {
fn start_observability_interface(
runtime: &Runtime,
config: &Config,
tracing: &TracingState,
) -> Result<()> {
let recorder = PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();
metrics::set_boxed_recorder(Box::new(recorder))?;

let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter = LogFilterHttpExporter::new(handle, socket, self.tracing.handle.clone());
let socket: SocketAddr = config.observability_interface.parse()?;
let exporter = LogFilterHttpExporter::new(handle, socket, tracing.handle.clone());

self.runtime.spawn(exporter.async_run());

Ok(self)
runtime.spawn(exporter.async_run());
Ok(())
}

pub fn run_block(self) -> Result<()> {
/// Begins running shotover, permanently handing control of the application over to shotover.
/// As such this method never returns.
pub fn run_block(self) -> ! {
let (trigger_shutdown_tx, trigger_shutdown_rx) = watch::channel(false);

// We need to block on this part to ensure that we immediately register these signals.
Expand All @@ -125,8 +158,23 @@ impl Runner {
trigger_shutdown_tx.send(true).unwrap();
});

self.runtime
let code = match self
.runtime
.block_on(run(self.topology, self.config, trigger_shutdown_rx))
{
Ok(()) => {
info!("Shotover was shutdown cleanly.");
0
}
Err(err) => {
error!("{:?}", err.context("Failed to start shotover"));
1
}
};
// Ensure tracing is flushed by dropping before exiting
std::mem::drop(self.tracing);
std::mem::drop(self.runtime);
std::process::exit(code);
}

fn create_runtime(stack_size: usize, worker_threads: Option<usize>) -> Runtime {
Expand Down Expand Up @@ -264,15 +312,9 @@ pub async fn run(
match topology.run_chains(trigger_shutdown_rx).await {
Ok(sources) => {
futures::future::join_all(sources.into_iter().map(|x| x.into_join_handle())).await;
info!("Shotover was shutdown cleanly.");
Ok(())
}
Err(error) => {
error!("{:?}", error);
Err(anyhow!(
"Shotover failed to initialize, the fatal error was logged."
))
}
Err(err) => Err(err),
}
}

Expand Down