Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config): add wildcard expansion to inputs #6170

Merged
merged 5 commits into from Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/config/builder.rs
Expand Up @@ -39,6 +39,21 @@ impl Clone for ConfigBuilder {
}
}

impl From<Config> for ConfigBuilder {
fn from(c: Config) -> Self {
ConfigBuilder {
global: c.global,
#[cfg(feature = "api")]
api: c.api,
healthchecks: c.healthchecks,
sources: c.sources,
sinks: c.sinks,
transforms: c.transforms,
tests: c.tests,
}
}
}

impl ConfigBuilder {
pub fn build(self) -> Result<Config, Vec<String>> {
self.build_with(false)
Expand Down
181 changes: 159 additions & 22 deletions src/config/compiler.rs
@@ -1,49 +1,51 @@
use super::{builder::ConfigBuilder, handle_warnings, validation, Config, TransformOuter};
use indexmap::IndexMap;

pub fn compile(raw: ConfigBuilder, deny_warnings: bool) -> Result<Config, Vec<String>> {
let mut config = Config {
global: raw.global,
#[cfg(feature = "api")]
api: raw.api,
healthchecks: raw.healthchecks,
sources: raw.sources,
sinks: raw.sinks,
transforms: raw.transforms,
tests: raw.tests,
expansions: Default::default(),
};

pub fn compile(mut builder: ConfigBuilder, deny_warnings: bool) -> Result<Config, Vec<String>> {
let mut errors = Vec::new();

expand_macros(&mut config)?;
expand_wildcards(&mut builder);

let expansions = expand_macros(&mut builder)?;

if let Err(warn) = handle_warnings(validation::warnings(&config), deny_warnings) {
if let Err(warn) = handle_warnings(validation::warnings(&builder), deny_warnings) {
errors.extend(warn);
}

if let Err(type_errors) = validation::check_shape(&config) {
if let Err(type_errors) = validation::check_shape(&builder) {
errors.extend(type_errors);
}

if let Err(type_errors) = validation::typecheck(&config) {
if let Err(type_errors) = validation::typecheck(&builder) {
errors.extend(type_errors);
}

if let Err(type_errors) = validation::check_resources(&config) {
if let Err(type_errors) = validation::check_resources(&builder) {
errors.extend(type_errors);
}

if errors.is_empty() {
Ok(config)
Ok(Config {
global: builder.global,
#[cfg(feature = "api")]
api: builder.api,
healthchecks: builder.healthchecks,
sources: builder.sources,
sinks: builder.sinks,
transforms: builder.transforms,
tests: builder.tests,
expansions,
})
} else {
Err(errors)
}
}

/// Some component configs can act like macros and expand themselves into multiple replacement
/// configs. Performs those expansions and records the relevant metadata.
pub(super) fn expand_macros(config: &mut Config) -> Result<(), Vec<String>> {
pub(super) fn expand_macros(
config: &mut ConfigBuilder,
) -> Result<IndexMap<String, Vec<String>>, Vec<String>> {
let mut expanded_transforms = IndexMap::new();
let mut expansions = IndexMap::new();
let mut errors = Vec::new();
Expand Down Expand Up @@ -78,7 +80,142 @@ pub(super) fn expand_macros(config: &mut Config) -> Result<(), Vec<String>> {
if !errors.is_empty() {
Err(errors)
} else {
config.expansions = expansions;
Ok(())
Ok(expansions)
}
}

/// Expand trailing `*` wildcards in input lists
fn expand_wildcards(config: &mut ConfigBuilder) {
let candidates = config
.sources
.keys()
.chain(config.transforms.keys())
.cloned()
.collect::<Vec<String>>();

for (name, transform) in config.transforms.iter_mut() {
expand_wildcards_inner(&mut transform.inputs, name, &candidates);
}

for (name, sink) in config.sinks.iter_mut() {
expand_wildcards_inner(&mut sink.inputs, name, &candidates);
}
}

fn expand_wildcards_inner(inputs: &mut Vec<String>, name: &str, candidates: &[String]) {
let raw_inputs = std::mem::take(inputs);
for raw_input in raw_inputs {
if raw_input.ends_with('*') {
let prefix = &raw_input[0..raw_input.len() - 1];
for input in candidates {
if input.starts_with(prefix) && input != name {
inputs.push(input.clone())
}
}
} else {
inputs.push(raw_input);
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::{
config::{DataType, GlobalOptions, SinkConfig, SinkContext, SourceConfig, TransformConfig},
shutdown::ShutdownSignal,
sinks::{Healthcheck, VectorSink},
sources::Source,
transforms::Transform,
Pipeline,
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct MockSourceConfig;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct MockTransformConfig;

#[derive(Debug, Serialize, Deserialize)]
struct MockSinkConfig;

#[async_trait]
#[typetag::serde(name = "mock")]
impl SourceConfig for MockSourceConfig {
async fn build(
&self,
_name: &str,
_globals: &GlobalOptions,
_shutdown: ShutdownSignal,
_out: Pipeline,
) -> crate::Result<Source> {
unimplemented!()
}

fn source_type(&self) -> &'static str {
"mock"
}

fn output_type(&self) -> DataType {
DataType::Any
}
}

#[async_trait]
#[typetag::serde(name = "mock")]
impl TransformConfig for MockTransformConfig {
async fn build(&self) -> crate::Result<Transform> {
unimplemented!()
}

fn transform_type(&self) -> &'static str {
"mock"
}

fn input_type(&self) -> DataType {
DataType::Any
}

fn output_type(&self) -> DataType {
DataType::Any
}
}

#[async_trait]
#[typetag::serde(name = "mock")]
impl SinkConfig for MockSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
unimplemented!()
}

fn sink_type(&self) -> &'static str {
"mock"
}

fn input_type(&self) -> DataType {
DataType::Any
}
}

#[test]
fn wildcard_expansion() {
let mut builder = ConfigBuilder::default();
builder.add_source("foo1", MockSourceConfig);
builder.add_source("foo2", MockSourceConfig);
builder.add_source("bar", MockSourceConfig);
builder.add_transform("foos", &["foo*"], MockTransformConfig);
builder.add_sink("baz", &["foos*", "b*"], MockSinkConfig);
builder.add_sink("quux", &["*"], MockSinkConfig);

let config = builder.build().expect("build should succeed");

assert_eq!(config.transforms["foos"].inputs, vec!["foo1", "foo2"]);
assert_eq!(config.sinks["baz"].inputs, vec!["foos", "bar"]);
assert_eq!(
config.sinks["quux"].inputs,
vec!["foo1", "foo2", "bar", "foos"]
);
}
}
10 changes: 5 additions & 5 deletions src/config/unit_test.rs
Expand Up @@ -22,12 +22,14 @@ pub async fn build_unit_tests_main(
build_unit_tests(config).await
}

async fn build_unit_tests(builder: ConfigBuilder) -> Result<Vec<UnitTest>, Vec<String>> {
async fn build_unit_tests(mut builder: ConfigBuilder) -> Result<Vec<UnitTest>, Vec<String>> {
let mut tests = vec![];
let mut errors = vec![];

let expansions = super::compiler::expand_macros(&mut builder)?;

// Don't let this escape since it's not validated
let mut config = Config {
let config = Config {
global: builder.global,
#[cfg(feature = "api")]
api: builder.api,
Expand All @@ -36,11 +38,9 @@ async fn build_unit_tests(builder: ConfigBuilder) -> Result<Vec<UnitTest>, Vec<S
sinks: builder.sinks,
transforms: builder.transforms,
tests: builder.tests,
expansions: Default::default(),
expansions,
};

super::compiler::expand_macros(&mut config)?;

for test in &config.tests {
match build_unit_test(test, &config).await {
Ok(t) => tests.push(t),
Expand Down
14 changes: 7 additions & 7 deletions src/config/validation.rs
@@ -1,7 +1,7 @@
use super::{Config, DataType, Resource};
use super::{builder::ConfigBuilder, DataType, Resource};
use std::collections::HashMap;

pub fn check_shape(config: &Config) -> Result<(), Vec<String>> {
pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
let mut errors = vec![];

if config.sources.is_empty() {
Expand Down Expand Up @@ -46,7 +46,7 @@ pub fn check_shape(config: &Config) -> Result<(), Vec<String>> {
}
}

pub fn check_resources(config: &Config) -> Result<(), Vec<String>> {
pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
let source_resources = config
.sources
.iter()
Expand All @@ -73,7 +73,7 @@ pub fn check_resources(config: &Config) -> Result<(), Vec<String>> {
}
}

pub fn warnings(config: &Config) -> Vec<String> {
pub fn warnings(config: &ConfigBuilder) -> Vec<String> {
let mut warnings = vec![];

let source_names = config.sources.keys().map(|name| ("source", name.clone()));
Expand Down Expand Up @@ -102,7 +102,7 @@ pub fn warnings(config: &Config) -> Vec<String> {
warnings
}

pub fn typecheck(config: &Config) -> Result<(), Vec<String>> {
pub fn typecheck(config: &ConfigBuilder) -> Result<(), Vec<String>> {
Graph::from(config).typecheck()
}

Expand Down Expand Up @@ -223,8 +223,8 @@ impl Graph {
}
}

impl From<&Config> for Graph {
fn from(config: &Config) -> Self {
impl From<&ConfigBuilder> for Graph {
fn from(config: &ConfigBuilder) -> Self {
let mut graph = Graph::default();

// TODO: validate that node names are unique across sources/transforms/sinks?
Expand Down
3 changes: 2 additions & 1 deletion tests/config.rs
Expand Up @@ -8,8 +8,9 @@ async fn load(config: &str, format: config::FormatHint) -> Result<Vec<String>, V
match config::load_from_str(config, format) {
Ok(c) => {
let diff = ConfigDiff::initial(&c);
let c2 = config::load_from_str(config, format).unwrap();
match (
config::warnings(&c),
config::warnings(&c2.into()),
topology::builder::build_pieces(&c, &diff, HashMap::new()).await,
) {
(warnings, Ok(_pieces)) => Ok(warnings),
Expand Down