-
Notifications
You must be signed in to change notification settings - Fork 16
/
runner.rs
235 lines (202 loc) · 7.26 KB
/
runner.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
use std::env;
use std::net::SocketAddr;
use anyhow::{anyhow, Result};
use clap::{crate_version, Clap};
use metrics_runtime::Receiver;
use tokio::runtime::{self, Runtime};
use tokio::signal;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::layer::Layered;
use tracing_subscriber::reload::Handle;
use tracing_subscriber::{EnvFilter, Registry};
use crate::admin::httpserver::LogFilterHttpExporter;
use crate::config::topology::Topology;
use crate::config::Config;
use crate::transforms::Transforms;
use crate::transforms::Wrapper;
#[derive(Clap, Clone)]
#[clap(version = crate_version!(), author = "Instaclustr")]
pub struct ConfigOpts {
#[clap(short, long, default_value = "config/topology.yaml")]
pub topology_file: String,
#[clap(short, long, default_value = "config/config.yaml")]
pub config_file: String,
#[clap(long, default_value = "4")]
pub core_threads: usize,
// 2,097,152 = 2 * 1024 * 1024 (2MiB)
#[clap(long, default_value = "2097152")]
pub stack_size: usize,
}
impl Default for ConfigOpts {
fn default() -> Self {
Self {
topology_file: "config/topology.yaml".into(),
config_file: "config/config.yaml".into(),
core_threads: 4,
stack_size: 2097152,
}
}
}
pub struct Runner {
runtime: Runtime,
topology: Topology,
config: Config,
tracing: TracingState,
}
impl Runner {
pub fn new(params: ConfigOpts) -> Result<Self> {
let config = Config::from_file(params.config_file.clone())?;
let topology = Topology::from_file(params.topology_file.clone())?;
let runtime = runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("RPProxy-Thread")
.thread_stack_size(params.stack_size)
.worker_threads(params.core_threads)
.build()
.unwrap();
let tracing = TracingState::new(config.main_log_level.as_str())?;
Ok(Runner {
runtime,
topology,
config,
tracing,
})
}
pub fn with_observability_interface(self) -> Result<Self> {
let receiver = Receiver::builder()
.build()
.expect("failed to create receiver");
let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter =
LogFilterHttpExporter::new(receiver.controller(), socket, self.tracing.handle.clone());
receiver.install();
self.runtime.spawn(exporter.async_run());
Ok(self)
}
pub fn run_spawn(self) -> RunnerSpawned {
let (trigger_shutdown_tx, _) = broadcast::channel(1);
let handle =
self.runtime
.spawn(run(self.topology, self.config, trigger_shutdown_tx.clone()));
RunnerSpawned {
runtime: self.runtime,
tracing_guard: self.tracing.guard,
trigger_shutdown_tx,
handle,
}
}
pub fn run_block(self) -> Result<()> {
let (trigger_shutdown_tx, _) = broadcast::channel(1);
let trigger_shutdown_tx_clone = trigger_shutdown_tx.clone();
self.runtime.spawn(async move {
signal::ctrl_c().await.unwrap();
trigger_shutdown_tx_clone.send(()).unwrap();
});
self.runtime
.block_on(run(self.topology, self.config, trigger_shutdown_tx))
}
}
struct TracingState {
/// Once this is dropped tracing logs are ignored
guard: WorkerGuard,
handle:
Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>,
}
/// Returns a new `EnvFilter` by parsing each directive string, or an error if any directive is invalid.
/// The parsing is robust to formatting, but will reject the first invalid directive (e.g. bad log level).
fn try_parse_log_directives(directives: &[Option<&str>]) -> Result<EnvFilter> {
let directives: Vec<Directive> = directives
.iter()
.flat_map(Option::as_deref)
.flat_map(|s| s.split(','))
.map(str::trim)
.filter(|s| !s.is_empty())
.map(|s| s.parse().map_err(|e| anyhow!("{}: {}", e, s)))
.collect::<Result<_>>()?;
let filter = directives
.into_iter()
.fold(EnvFilter::default(), |filter, directive| {
filter.add_directive(directive)
});
Ok(filter)
}
impl TracingState {
fn new(log_level: &str) -> Result<Self> {
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());
let builder = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter({
// Load log directives from shotover config and then from the RUST_LOG env var, with the latter taking priority.
// In the future we might be able to simplify the implementation if work is done on tokio-rs/tracing#1466.
let overrides = env::var(EnvFilter::DEFAULT_ENV).ok();
try_parse_log_directives(&[Some(log_level), overrides.as_deref()])?
})
.with_filter_reloading();
let handle = builder.reload_handle();
// To avoid unit tests that run in the same excutable from blowing up when they try to reinitialize tracing we ignore the result returned by try_init.
// Currently the implementation of try_init will only fail when it is called multiple times.
builder.try_init().ok();
Ok(TracingState { guard, handle })
}
}
pub struct RunnerSpawned {
pub runtime: Runtime,
pub handle: JoinHandle<Result<()>>,
pub tracing_guard: WorkerGuard,
pub trigger_shutdown_tx: broadcast::Sender<()>,
}
pub async fn run(
topology: Topology,
config: Config,
trigger_shutdown_tx: broadcast::Sender<()>,
) -> Result<()> {
info!("Starting Shotover {}", crate_version!());
info!(configuration = ?config);
info!(topology = ?topology);
debug!(
"Transform overhead size on stack is {}",
std::mem::size_of::<Transforms>()
);
debug!(
"Wrapper overhead size on stack is {}",
std::mem::size_of::<Wrapper<'_>>()
);
match topology.run_chains(trigger_shutdown_tx).await {
Ok((_, mut shutdown_complete_rx)) => {
shutdown_complete_rx.recv().await;
info!("Shotover was shutdown cleanly.");
Ok(())
}
Err(error) => {
error!("{:?}", error);
Err(anyhow!(
"Shotover failed to initialize, the fatal error was logged."
))
}
}
}
#[test]
fn test_try_parse_log_directives() {
assert_eq!(
try_parse_log_directives(&[
Some("info,short=warn,error"),
None,
Some("debug"),
Some("alongname=trace")
])
.unwrap()
.to_string(),
// Ordered by descending specificity.
"alongname=trace,short=warn,debug"
);
match try_parse_log_directives(&[Some("good=info,bad=blah,warn")]) {
Ok(_) => panic!(),
Err(e) => assert_eq!(e.to_string(), "invalid filter directive: bad=blah"),
}
}