/
builder.rs
154 lines (136 loc) · 4.87 KB
/
builder.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#[cfg(feature = "api")]
use super::api;
use super::{
compiler, default_data_dir, Config, GlobalOptions, HealthcheckOptions, SinkConfig, SinkOuter,
SourceConfig, TestDefinition, TransformConfig, TransformOuter,
};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
/// Serializable, mutable description of a configuration that has not yet been
/// compiled into a [`Config`].
///
/// A builder can be deserialized, created from an existing [`Config`], or
/// filled in programmatically with `add_source` / `add_sink` /
/// `add_transform`; `build` / `build_with` hand it to `compiler::compile` to
/// obtain the final [`Config`].
///
/// Unknown fields are rejected during deserialization
/// (`#[serde(deny_unknown_fields)]`).
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in
// combination with `#[serde(flatten)]` (used on `global` below) — confirm that
// unknown top-level keys are actually rejected as intended.
#[derive(Deserialize, Serialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ConfigBuilder {
/// Global options. Flattened, so these fields are (de)serialized at the same
/// level as the other `ConfigBuilder` fields rather than under a `global` key.
#[serde(flatten)]
pub global: GlobalOptions,
/// API options; only present when the `api` feature is enabled.
#[cfg(feature = "api")]
#[serde(default)]
pub api: api::Options,
/// Healthcheck options; falls back to `Default` when absent from the input.
#[serde(default)]
pub healthchecks: HealthcheckOptions,
/// Source configurations keyed by name, stored as boxed trait objects.
#[serde(default)]
pub sources: IndexMap<String, Box<dyn SourceConfig>>,
/// Sink configurations keyed by name.
#[serde(default)]
pub sinks: IndexMap<String, SinkOuter>,
/// Transform configurations keyed by name.
#[serde(default)]
pub transforms: IndexMap<String, TransformOuter>,
/// Test definitions; empty when absent from the input.
#[serde(default)]
pub tests: Vec<TestDefinition>,
}
impl Clone for ConfigBuilder {
    /// Produces an independent copy of the builder by round-tripping it
    /// through a `serde_json` value.
    ///
    /// # Panics
    ///
    /// Panics if the builder cannot be serialized to a JSON value, or if that
    /// value cannot be deserialized back into a `ConfigBuilder`.
    fn clone(&self) -> Self {
        // Workaround: the boxed trait objects held by the builder cannot be
        // cloned directly, so the copy is made via serialization instead.
        // JSON is used rather than TOML because TOML cannot serialize `None`.
        serde_json::to_value(self)
            .and_then(serde_json::from_value::<Self>)
            .unwrap()
    }
}
impl From<Config> for ConfigBuilder {
fn from(c: Config) -> Self {
ConfigBuilder {
global: c.global,
#[cfg(feature = "api")]
api: c.api,
healthchecks: c.healthchecks,
sources: c.sources,
sinks: c.sinks,
transforms: c.transforms,
tests: c.tests,
}
}
}
impl ConfigBuilder {
    /// Compiles the builder into a [`Config`] with `deny_warnings` set to
    /// `false`.
    ///
    /// # Errors
    ///
    /// Returns the error messages reported by `compiler::compile`.
    pub fn build(self) -> Result<Config, Vec<String>> {
        let deny_warnings = false;
        self.build_with(deny_warnings)
    }

    /// Compiles the builder into a [`Config`], forwarding `deny_warnings` to
    /// `compiler::compile`.
    ///
    /// # Errors
    ///
    /// Returns the error messages reported by `compiler::compile`.
    pub fn build_with(self, deny_warnings: bool) -> Result<Config, Vec<String>> {
        compiler::compile(self, deny_warnings)
    }

    /// Registers `source` under `name`.
    ///
    /// The source is boxed and inserted into `sources`; an entry already
    /// stored under the same name is replaced by the map's `insert`.
    pub fn add_source<S: SourceConfig + 'static, T: Into<String>>(&mut self, name: T, source: S) {
        let key = name.into();
        let boxed = Box::new(source);
        self.sources.insert(key, boxed);
    }

    /// Registers `sink` under `name`, wired to the given `inputs`.
    ///
    /// `inputs` is copied into owned strings and handed, together with the
    /// boxed sink, to `SinkOuter::new`. An entry already stored under the same
    /// name is replaced by the map's `insert`.
    pub fn add_sink<S: SinkConfig + 'static, T: Into<String>>(
        &mut self,
        name: T,
        inputs: &[&str],
        sink: S,
    ) {
        let owned_inputs: Vec<String> = inputs
            .iter()
            .map(|input| (*input).to_owned())
            .collect();
        let outer = SinkOuter::new(owned_inputs, Box::new(sink));
        self.sinks.insert(name.into(), outer);
    }

    /// Registers `transform` under `name`, wired to the given `inputs`.
    ///
    /// `inputs` is copied into owned strings and stored next to the boxed
    /// transform in a `TransformOuter`. An entry already stored under the same
    /// name is replaced by the map's `insert`.
    pub fn add_transform<T: TransformConfig + 'static, S: Into<String>>(
        &mut self,
        name: S,
        inputs: &[&str],
        transform: T,
    ) {
        let owned_inputs: Vec<String> = inputs
            .iter()
            .map(|input| (*input).to_owned())
            .collect();
        let outer = TransformOuter {
            inner: Box::new(transform),
            inputs: owned_inputs,
        };
        self.transforms.insert(name.into(), outer);
    }

    /// Merges another builder into this one.
    ///
    /// API options (with the `api` feature), `data_dir`, the log schema and
    /// the healthcheck options are merged first. Then the component and test
    /// names of `with` are checked against the ones already present, and only
    /// if no problem was recorded are the sources, sinks, transforms and tests
    /// of `with` appended.
    ///
    /// # Errors
    ///
    /// Returns every problem found: failed API or log-schema merges,
    /// conflicting `data_dir` values, and duplicate source, sink, transform or
    /// test names. The option merges above run before the error check, so
    /// `self` may already have been modified when `Err` is returned; the
    /// component maps and the test list are left untouched in that case.
    pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
        let mut errors = Vec::new();

        #[cfg(feature = "api")]
        if let Err(api_error) = self.api.merge(with.api) {
            errors.push(api_error);
        }

        // A `data_dir` that is unset or still equal to the default counts as
        // "not chosen" and is simply overwritten by the incoming value.
        let own_dir_not_chosen =
            self.global.data_dir.is_none() || self.global.data_dir == default_data_dir();
        if own_dir_not_chosen {
            self.global.data_dir = with.global.data_dir;
        } else if with.global.data_dir != default_data_dir()
            && self.global.data_dir != with.global.data_dir
        {
            // Both configs picked a non-default 'data_dir' and the values
            // disagree: report it instead of silently choosing one.
            errors.push("conflicting values for 'data_dir' found".to_owned());
        }

        // With multiple config files the log schemas are merged for as long
        // as they agree; only an actual conflict is reported as an error.
        if let Err(schema_errors) = self.global.log_schema.merge(with.global.log_schema) {
            errors.extend(schema_errors);
        }

        self.healthchecks.merge(with.healthchecks);

        for name in with.sources.keys() {
            if self.sources.contains_key(name) {
                errors.push(format!("duplicate source name found: {}", name));
            }
        }
        for name in with.sinks.keys() {
            if self.sinks.contains_key(name) {
                errors.push(format!("duplicate sink name found: {}", name));
            }
        }
        for name in with.transforms.keys() {
            if self.transforms.contains_key(name) {
                errors.push(format!("duplicate transform name found: {}", name));
            }
        }
        for incoming in with.tests.iter() {
            let already_defined = self
                .tests
                .iter()
                .any(|existing| existing.name == incoming.name);
            if already_defined {
                errors.push(format!("duplicate test name found: {}", incoming.name));
            }
        }

        if errors.is_empty() {
            self.sources.extend(with.sources);
            self.sinks.extend(with.sinks);
            self.transforms.extend(with.transforms);
            self.tests.extend(with.tests);
            Ok(())
        } else {
            Err(errors)
        }
    }
}