Skip to content

Commit

Permalink
feat(config): add wildcard expansion to inputs (#6170)
Browse files Browse the repository at this point in the history
Closes #2550

This allows for very simple wildcards in input lists. The `*` must be
the last character in the string and we do a simple prefix match against
all valid inputs (i.e. self is excluded). We could choose to expand this
to a more full-featured glob syntax in the future, but there were enough
questions there that this seemed like a better start. It is also my
expectation that this should cover >95% of valuable use cases, so it
may well be all we ever need.

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
  • Loading branch information
lukesteensen committed Jan 27, 2021
1 parent 39571a9 commit e777720
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 35 deletions.
15 changes: 15 additions & 0 deletions src/config/builder.rs
Expand Up @@ -39,6 +39,21 @@ impl Clone for ConfigBuilder {
}
}

impl From<Config> for ConfigBuilder {
fn from(c: Config) -> Self {
ConfigBuilder {
global: c.global,
#[cfg(feature = "api")]
api: c.api,
healthchecks: c.healthchecks,
sources: c.sources,
sinks: c.sinks,
transforms: c.transforms,
tests: c.tests,
}
}
}

impl ConfigBuilder {
pub fn build(self) -> Result<Config, Vec<String>> {
self.build_with(false)
Expand Down
181 changes: 159 additions & 22 deletions src/config/compiler.rs
@@ -1,49 +1,51 @@
use super::{builder::ConfigBuilder, handle_warnings, validation, Config, TransformOuter};
use indexmap::IndexMap;

pub fn compile(raw: ConfigBuilder, deny_warnings: bool) -> Result<Config, Vec<String>> {
let mut config = Config {
global: raw.global,
#[cfg(feature = "api")]
api: raw.api,
healthchecks: raw.healthchecks,
sources: raw.sources,
sinks: raw.sinks,
transforms: raw.transforms,
tests: raw.tests,
expansions: Default::default(),
};

pub fn compile(mut builder: ConfigBuilder, deny_warnings: bool) -> Result<Config, Vec<String>> {
let mut errors = Vec::new();

expand_macros(&mut config)?;
expand_wildcards(&mut builder);

let expansions = expand_macros(&mut builder)?;

if let Err(warn) = handle_warnings(validation::warnings(&config), deny_warnings) {
if let Err(warn) = handle_warnings(validation::warnings(&builder), deny_warnings) {
errors.extend(warn);
}

if let Err(type_errors) = validation::check_shape(&config) {
if let Err(type_errors) = validation::check_shape(&builder) {
errors.extend(type_errors);
}

if let Err(type_errors) = validation::typecheck(&config) {
if let Err(type_errors) = validation::typecheck(&builder) {
errors.extend(type_errors);
}

if let Err(type_errors) = validation::check_resources(&config) {
if let Err(type_errors) = validation::check_resources(&builder) {
errors.extend(type_errors);
}

if errors.is_empty() {
Ok(config)
Ok(Config {
global: builder.global,
#[cfg(feature = "api")]
api: builder.api,
healthchecks: builder.healthchecks,
sources: builder.sources,
sinks: builder.sinks,
transforms: builder.transforms,
tests: builder.tests,
expansions,
})
} else {
Err(errors)
}
}

/// Some component configs can act like macros and expand themselves into multiple replacement
/// configs. Performs those expansions and records the relevant metadata.
pub(super) fn expand_macros(config: &mut Config) -> Result<(), Vec<String>> {
pub(super) fn expand_macros(
config: &mut ConfigBuilder,
) -> Result<IndexMap<String, Vec<String>>, Vec<String>> {
let mut expanded_transforms = IndexMap::new();
let mut expansions = IndexMap::new();
let mut errors = Vec::new();
Expand Down Expand Up @@ -78,7 +80,142 @@ pub(super) fn expand_macros(config: &mut Config) -> Result<(), Vec<String>> {
if !errors.is_empty() {
Err(errors)
} else {
config.expansions = expansions;
Ok(())
Ok(expansions)
}
}

/// Expand trailing `*` wildcards in input lists
fn expand_wildcards(config: &mut ConfigBuilder) {
let candidates = config
.sources
.keys()
.chain(config.transforms.keys())
.cloned()
.collect::<Vec<String>>();

for (name, transform) in config.transforms.iter_mut() {
expand_wildcards_inner(&mut transform.inputs, name, &candidates);
}

for (name, sink) in config.sinks.iter_mut() {
expand_wildcards_inner(&mut sink.inputs, name, &candidates);
}
}

fn expand_wildcards_inner(inputs: &mut Vec<String>, name: &str, candidates: &[String]) {
let raw_inputs = std::mem::take(inputs);
for raw_input in raw_inputs {
if raw_input.ends_with('*') {
let prefix = &raw_input[0..raw_input.len() - 1];
for input in candidates {
if input.starts_with(prefix) && input != name {
inputs.push(input.clone())
}
}
} else {
inputs.push(raw_input);
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::{
config::{DataType, GlobalOptions, SinkConfig, SinkContext, SourceConfig, TransformConfig},
shutdown::ShutdownSignal,
sinks::{Healthcheck, VectorSink},
sources::Source,
transforms::Transform,
Pipeline,
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct MockSourceConfig;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct MockTransformConfig;

#[derive(Debug, Serialize, Deserialize)]
struct MockSinkConfig;

#[async_trait]
#[typetag::serde(name = "mock")]
impl SourceConfig for MockSourceConfig {
async fn build(
&self,
_name: &str,
_globals: &GlobalOptions,
_shutdown: ShutdownSignal,
_out: Pipeline,
) -> crate::Result<Source> {
unimplemented!()
}

fn source_type(&self) -> &'static str {
"mock"
}

fn output_type(&self) -> DataType {
DataType::Any
}
}

#[async_trait]
#[typetag::serde(name = "mock")]
impl TransformConfig for MockTransformConfig {
async fn build(&self) -> crate::Result<Transform> {
unimplemented!()
}

fn transform_type(&self) -> &'static str {
"mock"
}

fn input_type(&self) -> DataType {
DataType::Any
}

fn output_type(&self) -> DataType {
DataType::Any
}
}

#[async_trait]
#[typetag::serde(name = "mock")]
impl SinkConfig for MockSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
unimplemented!()
}

fn sink_type(&self) -> &'static str {
"mock"
}

fn input_type(&self) -> DataType {
DataType::Any
}
}

#[test]
fn wildcard_expansion() {
let mut builder = ConfigBuilder::default();
builder.add_source("foo1", MockSourceConfig);
builder.add_source("foo2", MockSourceConfig);
builder.add_source("bar", MockSourceConfig);
builder.add_transform("foos", &["foo*"], MockTransformConfig);
builder.add_sink("baz", &["foos*", "b*"], MockSinkConfig);
builder.add_sink("quux", &["*"], MockSinkConfig);

let config = builder.build().expect("build should succeed");

assert_eq!(config.transforms["foos"].inputs, vec!["foo1", "foo2"]);
assert_eq!(config.sinks["baz"].inputs, vec!["foos", "bar"]);
assert_eq!(
config.sinks["quux"].inputs,
vec!["foo1", "foo2", "bar", "foos"]
);
}
}
10 changes: 5 additions & 5 deletions src/config/unit_test.rs
Expand Up @@ -21,12 +21,14 @@ pub async fn build_unit_tests_main(
build_unit_tests(config).await
}

async fn build_unit_tests(builder: ConfigBuilder) -> Result<Vec<UnitTest>, Vec<String>> {
async fn build_unit_tests(mut builder: ConfigBuilder) -> Result<Vec<UnitTest>, Vec<String>> {
let mut tests = vec![];
let mut errors = vec![];

let expansions = super::compiler::expand_macros(&mut builder)?;

// Don't let this escape since it's not validated
let mut config = Config {
let config = Config {
global: builder.global,
#[cfg(feature = "api")]
api: builder.api,
Expand All @@ -35,11 +37,9 @@ async fn build_unit_tests(builder: ConfigBuilder) -> Result<Vec<UnitTest>, Vec<S
sinks: builder.sinks,
transforms: builder.transforms,
tests: builder.tests,
expansions: Default::default(),
expansions,
};

super::compiler::expand_macros(&mut config)?;

for test in &config.tests {
match build_unit_test(test, &config).await {
Ok(t) => tests.push(t),
Expand Down
14 changes: 7 additions & 7 deletions src/config/validation.rs
@@ -1,7 +1,7 @@
use super::{Config, DataType, Resource};
use super::{builder::ConfigBuilder, DataType, Resource};
use std::collections::HashMap;

pub fn check_shape(config: &Config) -> Result<(), Vec<String>> {
pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
let mut errors = vec![];

if config.sources.is_empty() {
Expand Down Expand Up @@ -46,7 +46,7 @@ pub fn check_shape(config: &Config) -> Result<(), Vec<String>> {
}
}

pub fn check_resources(config: &Config) -> Result<(), Vec<String>> {
pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
let source_resources = config
.sources
.iter()
Expand All @@ -73,7 +73,7 @@ pub fn check_resources(config: &Config) -> Result<(), Vec<String>> {
}
}

pub fn warnings(config: &Config) -> Vec<String> {
pub fn warnings(config: &ConfigBuilder) -> Vec<String> {
let mut warnings = vec![];

let source_names = config.sources.keys().map(|name| ("source", name.clone()));
Expand Down Expand Up @@ -102,7 +102,7 @@ pub fn warnings(config: &Config) -> Vec<String> {
warnings
}

pub fn typecheck(config: &Config) -> Result<(), Vec<String>> {
pub fn typecheck(config: &ConfigBuilder) -> Result<(), Vec<String>> {
Graph::from(config).typecheck()
}

Expand Down Expand Up @@ -223,8 +223,8 @@ impl Graph {
}
}

impl From<&Config> for Graph {
fn from(config: &Config) -> Self {
impl From<&ConfigBuilder> for Graph {
fn from(config: &ConfigBuilder) -> Self {
let mut graph = Graph::default();

// TODO: validate that node names are unique across sources/transforms/sinks?
Expand Down
3 changes: 2 additions & 1 deletion tests/config.rs
Expand Up @@ -8,8 +8,9 @@ async fn load(config: &str, format: config::FormatHint) -> Result<Vec<String>, V
match config::load_from_str(config, format) {
Ok(c) => {
let diff = ConfigDiff::initial(&c);
let c2 = config::load_from_str(config, format).unwrap();
match (
config::warnings(&c),
config::warnings(&c2.into()),
topology::builder::build_pieces(&c, &diff, HashMap::new()).await,
) {
(warnings, Ok(_pieces)) => Ok(warnings),
Expand Down

0 comments on commit e777720

Please sign in to comment.