Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipelines transform): load and handle pipelines tranforms #9733

Merged
merged 20 commits into from
Nov 4, 2021

Conversation

jdrouet
Copy link
Contributor

@jdrouet jdrouet commented Oct 21, 2021

Resources

How it works

The pipelines transform will expand into the following graph

image

What's left to do

  • implement loading configuration
  • implement configuration expanding
  • add transform documentation

What's left to be done (other PR)

  • automatic namespacing
  • add filter at pipeline level
  • add route mode
  • add traces path

For those who want to change the chart

graph TD
    SOURCE_A[sources.a] --> FILTER_LOG
    SOURCE_B[sources.b] --> FILTER_LOG
    SOURCE_A --> FILTER_METRIC
    SOURCE_B --> FILTER_METRIC
    subgraph Pipelines Transform
    subgraph Log Route
    FILTER_LOG[transforms.foo.logs.filter] --> LOG_A_FILTER

    subgraph Log Pipeline A 
    LOG_A_FILTER[transforms.foo.logs.pipelines.a.filter] --> LOG_A_TRANSFORM_0
    LOG_A_TRANSFORM_0[transforms.foo.logs.pipelines.a.transforms.0] --> LOG_A_TRANSFORM_1
    LOG_A_TRANSFORM_1[transforms.foo.logs.pipelines.a.transforms.1] --> LOG_A_TRANSFORM_ALIAS
    LOG_A_TRANSFORM_ALIAS{{transforms.foo.logs.pipelines.a}}
    end

    subgraph Log Pipeline B
    LOG_A_TRANSFORM_ALIAS --> LOG_B_FILTER
    LOG_B_FILTER[transforms.foo.logs.pipelines.b.filter] --> LOG_B_TRANSFORM_0
    LOG_B_TRANSFORM_0[transforms.foo.logs.pipelines.b.transforms.1] --> LOG_B_TRANSFORM_1
    LOG_B_TRANSFORM_1[transforms.foo.logs.pipelines.b.transforms.1] --> LOG_B_TRANSFORM_ALIAS
    LOG_B_TRANSFORM_ALIAS{{transforms.foo.logs.pipelines.b}}
    end

    LOG_B_TRANSFORM_ALIAS --> LOG_ALIAS{{transforms.foo.logs}}
    end

    subgraph Metric Route
    FILTER_METRIC[transforms.foo.metrics.filter]--> METRIC_A_FILTER

    subgraph Metric Pipeline A 
    METRIC_A_FILTER[transforms.foo.metrics.pipelines.a.filter] --> METRIC_A_TRANSFORM_0
    METRIC_A_TRANSFORM_0[transforms.foo.metrics.pipelines.a.transforms.0] --> METRIC_A_TRANSFORM_1
    METRIC_A_TRANSFORM_1[transforms.foo.metrics.pipelines.a.transforms.1] --> METRIC_A_TRANSFORM_ALIAS
    METRIC_A_TRANSFORM_ALIAS{{transforms.foo.metrics.pipelines.a}}
    end
    subgraph Metric Pipeline B
    METRIC_A_TRANSFORM_ALIAS --> METRIC_B_FILTER
    METRIC_B_FILTER[transforms.foo.metrics.pipelines.b.filter] --> METRIC_B_TRANSFORM_0
    METRIC_B_TRANSFORM_0[transforms.foo.metrics.pipelines.b.transforms.0] --> METRIC_B_TRANSFORM_1
    METRIC_B_TRANSFORM_1[transforms.foo.metrics.pipelines.b.transforms.1] --> METRIC_B_TRANSFORM_ALIAS
    METRIC_B_TRANSFORM_ALIAS{{transforms.foo.metrics.pipelines.b}}
    end

    METRIC_B_TRANSFORM_ALIAS --> METRIC_ALIAS{{transforms.foo.metrics}}
    end

    LOG_ALIAS --> AGGREGATE
    METRIC_ALIAS --> AGGREGATE
    AGGREGATE{{transforms.foo}}
    end
Loading

@netlify
Copy link

netlify bot commented Oct 21, 2021

✔️ Deploy Preview for vector-project canceled.

🔨 Explore the source changes: 975d3ac

🔍 Inspect the deploy log: https://app.netlify.com/sites/vector-project/deploys/6183b343ad5373000715f3ba

@jdrouet jdrouet force-pushed the jdrouet/pipeline-structure branch 3 times, most recently from 5e23f55 to 6bb01bb Compare October 28, 2021 09:58
@jdrouet jdrouet changed the title feat(pipelines transform): create config structure feat(pipelines transform): load and handle pipelines tranforms Nov 2, 2021
@jdrouet jdrouet marked this pull request as ready for review November 2, 2021 09:34
jdrouet and others added 13 commits November 3, 2021 07:27
Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Copy link
Contributor

@tobz tobz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, this seems reasonable. 👍🏻

My biggest complaint is just a lack of documentation which made it hard to understand what is going on when looking only at the code. It's not a blocker, but I do think some more documentation should be added either in this PR or a subsequent one.

lib/vector-core/src/transform/config.rs Outdated Show resolved Hide resolved
src/transforms/pipelines/mod.rs Show resolved Hide resolved
src/transforms/pipelines/mod.rs Outdated Show resolved Hide resolved
Comment on lines +47 to +29
// This is a hack around the issue of cloning
// trait objects. So instead to clone the config
// we first serialize it into JSON, then back from
// JSON. Originally we used TOML here but TOML does not
// support serializing `None`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should already be possible, since we require that TransformConfig is a supertrait of dyn_clone::DynClone, and we use the dyn_clone::clone_trait_object! macro to define a Clone impl for Box<dyn TransformConfig>:

pub trait TransformConfig: core::fmt::Debug + Send + Sync + dyn_clone::DynClone {
async fn build(&self, globals: &TransformContext)
-> crate::Result<crate::transform::Transform>;
fn input_type(&self) -> DataType;
fn output_type(&self) -> DataType;
fn named_outputs(&self) -> Vec<String> {
Vec::new()
}
fn transform_type(&self) -> &'static str;
/// Allows a transform configuration to expand itself into multiple "child"
/// transformations to replace it. This allows a transform to act as a macro
/// for various patterns.
fn expand(
&mut self,
) -> crate::Result<Option<(IndexMap<String, Box<dyn TransformConfig>>, ExpandType)>> {
Ok(None)
}
}
dyn_clone::clone_trait_object!(TransformConfig);

Should be as simple as deriving Clone on PipelineConfig. Does doing that return a specific error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's how it is currently done in the ConfigBuilder struct. I didn't want to reinvent the wheel on this.
Maybe we should create an issue related to this and handle it in a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created this issue #9898

src/transforms/pipelines/mod.rs Outdated Show resolved Hide resolved
src/transforms/pipelines/mod.rs Outdated Show resolved Hide resolved
Copy link
Member

@leebenson leebenson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Echoing @tobz: it'd be useful to add some additional commentary around some of the struct changes and functions, to provide context for future readers.

I found myself jumping around the code a little, trying to figure out why a change was brought in - particularly around the transform Noop and the properties added to ExpandType.

I'd also like sign-off from @lukesteensen before merging.

Copy link
Member

@lukesteensen lukesteensen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely would echo the desire for some docs on how this all fits together. This takes our macro expansion quite a bit further than anything else, and it's difficult to keep straight how all of the new things fit together to result in the chart in the PR description.

In particular, with things like Expander, EventRouterConfig, and EventFilterConfig, it seems like there are things that exist purely as intermediate expansions. I would like very much to simplify this by directly representing more of these concepts in the topology, but it would currently be difficult to refactor without worrying that something isn't going to be expanded in exactly the same way. Documentation and tests for how things are meant to expand would help that quite a bit.

Overall though, this is very neat and seems like it should work!

src/transforms/noop.rs Show resolved Hide resolved
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
@bits-bot
Copy link

bits-bot commented Nov 3, 2021

CLA assistant check
All committers have signed the CLA.

Signed-off-by: Jérémie Drouet <jeremie.drouet@datadoghq.com>
Comment on lines +14 to +16
/// This way of expanding will duplicate the inputs for every expanded node.
/// If `aggregates` is set to `true`, then a `Noop` transform will be added
/// so that you can use the original component name as an input.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense! If I were to tweak the wording, I'd say something like

Duplicate the inputs onto every expanded node, fanning out so that each node receives inputs in parallel. If aggregates is set to true, then a Noop transform will be added such that each expanded node's output is fanned back in to pass through that node, which can then be used as an input for other components.

Comment on lines +18 to +21
/// This ways of expanding will take all the components and chain then in order.
/// The first node will be renamed `component_name.0` and so on.
/// If `alias` is set to `true, then a `Noop` transform will be added as the
/// last component and named `component_name` so that it can be used as an input.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, just some small tweaks:

Chain components together one after another. Components will be named according to this order (e.g. component_name.0 and so on). If alias is set to true, then a Noop transform will be added as the last component and given the raw component_name identifier so that it can be used as an input for other components.

Comment on lines 15 to 16
/// This represent the configuration of a single pipeline,
/// not the pipelines transform itself.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/represent/represents/ and I'd say "not the pipelines transform itself, which can contain multiple individual pipelines" just to be more clear.

@@ -43,6 +45,7 @@ impl PipelineConfig {
}
}

/// This represent an ordered list of pipelines depending on the event type.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could use a bit more elaboration. From the description alone I'm still not sure what exactly it does.

@@ -1,3 +1,61 @@
/// This pipelines transform is a bit complex and needs a simple example.
///
/// If we take the following example in consideration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would word this:

If we consider the following example:

Comment on lines +47 to +58
/// The pipelines transform will first expand into 2 parallel transforms for `logs` and
/// `metrics`. A `Noop` transform will be also added to aggregate `logs` and `metrics`
/// into a single transform and to be able to use the transform name (`my_pipelines`) as an input.
///
/// Then the `logs` group of pipelines will be expanded into a `EventFilter` followed by
/// a series `PipelineConfig` via the `EventRouter` transform. At the end, a `Noop` alias is added
/// to be able to refer `logs` as `my_pipelines.logs`.
/// Same thing for the `metrics` group of pipelines.
///
/// Each pipeline will then be expanded into a list of its transforms and at the end of each
/// expansion, a `Noop` transform will be added to use the `pipeline` name as an alias
/// (`my_pipelines.logs.transforms.foo`).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really helpful! Thanks for adding this.

examples: [
{
title: "Filter by log level and reformat"
configuration: """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting, this should be structured data, not a string. @jdrouet was there precedence in another component for using strings here? I can't find one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants