From 1eb775047ec20a15dca57b2fcd922d383a616348 Mon Sep 17 00:00:00 2001
From: Nathan Fox
Date: Thu, 29 Jun 2023 11:37:10 -0400
Subject: [PATCH] feat: track runtime schema definitions for log events (#17692)

closes https://github.com/vectordotdev/vector/issues/16732

In order for sinks to use semantic meaning, they need a mapping of meanings to fields. This is included in the schema definition of events, but the exact definition that applies depends on the path the event took to reach the sink, so the schema definition of an event is now tracked at runtime. An `upstream_id` was added to event metadata to track the previous component an event came from, which lets the topology select the correct schema definition to attach to each event.

For sources, there is only one definition that can be attached (for each output port). This is attached automatically in the topology layer (after an event is emitted by a source), so there is no additional work in each source to support this.

For transforms, it's slightly more complicated: the schema definition depends on both the output port _and_ the component the event came from. A map from upstream `OutputId` to definition is generated at Vector startup, and the correct definition is looked up in that map at runtime. This also happens in the topology layer, so transforms don't need to handle it themselves. Previously the `remap` transform had custom code to support runtime schema definitions (for the VRL meaning functions); this was removed since it's now handled automatically.

The `reduce` and `lua` transforms are special cases, since there is no clear "path" that an event takes through the topology: multiple events (from different inputs) can be merged in `reduce`, and for `lua`, output events may not be related to input events at all. In these cases the schema definition map has the same value for all inputs (they are all merged), and the topology arbitrarily picks one, since they are all the same.
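To make the runtime selection concrete, here is a minimal, self-contained sketch of the lookup described above. The types (`OutputId`, `Definition`, `Metadata`) are simplified stand-ins for the real vector-core types, not the code in this patch; the actual implementation is `update_runtime_schema_definition` in `lib/vector-core/src/transform/mod.rs` in the diff below.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-ins for vector-core's `OutputId` and `schema::Definition`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct OutputId {
    component: String,
    port: Option<String>,
}

#[derive(Debug, PartialEq)]
struct Definition(&'static str);

// Event metadata carrying the upstream component id and the active definition.
struct Metadata {
    upstream_id: Option<Arc<OutputId>>,
    schema_definition: Option<Arc<Definition>>,
}

// Select the definition keyed by the event's upstream id. If no upstream id
// is set (e.g. events produced by `reduce` or `lua`), every map value is
// identical by construction, so any entry works.
fn update_runtime_schema_definition(
    meta: &mut Metadata,
    output_id: &Arc<OutputId>,
    definitions: &HashMap<OutputId, Arc<Definition>>,
) {
    let definition = meta
        .upstream_id
        .as_ref()
        .and_then(|id| definitions.get(id.as_ref()))
        .or_else(|| definitions.values().next());
    if let Some(definition) = definition {
        meta.schema_definition = Some(Arc::clone(definition));
    }
    // Record this output as the upstream of the next component in the chain.
    meta.upstream_id = Some(Arc::clone(output_id));
}

fn main() {
    let source = OutputId { component: "in".into(), port: None };
    let transform = Arc::new(OutputId { component: "remap0".into(), port: None });

    let mut definitions = HashMap::new();
    definitions.insert(source.clone(), Arc::new(Definition("definition from `in`")));

    let mut meta = Metadata {
        upstream_id: Some(Arc::new(source)),
        schema_definition: None,
    };
    update_runtime_schema_definition(&mut meta, &transform, &definitions);

    assert_eq!(meta.schema_definition.as_deref(), Some(&Definition("definition from `in`")));
    assert_eq!(meta.upstream_id.as_deref(), Some(&*transform));
}
```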
--------- Signed-off-by: Stephen Wakely Co-authored-by: Stephen Wakely --- lib/vector-core/Cargo.toml | 2 +- lib/vector-core/src/config/mod.rs | 28 +- lib/vector-core/src/event/metadata.rs | 27 +- lib/vector-core/src/event/mod.rs | 14 +- lib/vector-core/src/transform/mod.rs | 155 +++++----- src/config/transform.rs | 3 + src/source_sender/mod.rs | 90 ++++-- src/sources/kafka.rs | 2 +- src/sources/opentelemetry/tests.rs | 8 +- src/sources/socket/mod.rs | 2 +- src/sources/statsd/mod.rs | 4 +- src/test_util/mock/mod.rs | 12 +- src/topology/builder.rs | 29 +- src/topology/test/compliance.rs | 14 + src/topology/test/mod.rs | 54 ++++ src/transforms/aggregate.rs | 9 +- src/transforms/dedupe.rs | 69 +++++ src/transforms/filter.rs | 5 + src/transforms/log_to_metric.rs | 52 +++- src/transforms/lua/v2/mod.rs | 5 +- src/transforms/metric_to_log.rs | 274 +++++++++--------- src/transforms/reduce/mod.rs | 242 ++++++++++------ src/transforms/remap.rs | 113 ++------ src/transforms/route.rs | 4 +- src/transforms/tag_cardinality_limit/tests.rs | 58 ++++ 25 files changed, 847 insertions(+), 428 deletions(-) diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 5326901b16bc0..7317a47e0ea70 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -94,7 +94,7 @@ rand = "0.8.5" rand_distr = "0.4.3" tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] } vector-common = { path = "../vector-common", default-features = false, features = ["test"] } -vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua"] } +vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua", "test"] } [features] api = ["dep:async-graphql"] diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 3ff5152a293a7..71786155d1d8f 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, fmt, num::NonZeroUsize}; use bitmask_enum::bitmask; @@ -111,7 +112,7 @@ pub struct SourceOutput { // NOTE: schema definitions are only implemented/supported for log-type events. There is no // inherent blocker to support other types as well, but it'll require additional work to add // the relevant schemas, and store them separately in this type. - pub schema_definition: Option, + pub schema_definition: Option>, } impl SourceOutput { @@ -129,7 +130,7 @@ impl SourceOutput { Self { port: None, ty, - schema_definition: Some(schema_definition), + schema_definition: Some(Arc::new(schema_definition)), } } @@ -168,17 +169,15 @@ impl SourceOutput { /// Schema enabled is set in the users configuration. #[must_use] pub fn schema_definition(&self, schema_enabled: bool) -> Option { + use std::ops::Deref; + self.schema_definition.as_ref().map(|definition| { if schema_enabled { - definition.clone() + definition.deref().clone() } else { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); - - if definition.log_namespaces().contains(&LogNamespace::Vector) { - new_definition.add_meanings(definition.meanings()); - } - + new_definition.add_meanings(definition.meanings()); new_definition } }) @@ -203,7 +202,7 @@ pub struct TransformOutput { /// enabled, at least one definition should be output. If the transform /// has multiple connected sources, it is possible to have multiple output /// definitions - one for each input. 
- log_schema_definitions: HashMap, + pub log_schema_definitions: HashMap, } impl TransformOutput { @@ -245,11 +244,7 @@ impl TransformOutput { .map(|(output, definition)| { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); - - if definition.log_namespaces().contains(&LogNamespace::Vector) { - new_definition.add_meanings(definition.meanings()); - } - + new_definition.add_meanings(definition.meanings()); (output.clone(), new_definition) }) .collect() @@ -606,7 +601,10 @@ mod test { // There should be the default legacy definition without schemas enabled. assert_eq!( - Some(schema::Definition::default_legacy_namespace()), + Some( + schema::Definition::default_legacy_namespace() + .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork") + ), output.schema_definition(false) ); } diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index f13bee6a5e009..d86884be7582c 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -7,7 +7,10 @@ use vector_common::{config::ComponentKey, EventDataEq}; use vrl::value::{Kind, Secrets, Value}; use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus}; -use crate::{config::LogNamespace, schema, ByteSizeOf}; +use crate::{ + config::{LogNamespace, OutputId}, + schema, ByteSizeOf, +}; const DATADOG_API_KEY: &str = "datadog_api_key"; const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token"; @@ -30,8 +33,15 @@ pub struct EventMetadata { /// The id of the source source_id: Option>, + /// The id of the component this event originated from. This is used to + /// determine which schema definition to attach to an event in transforms. + /// This should always have a value set for events in transforms. It will always be `None` + /// in a source, and there is currently no use-case for reading the value in a sink. + upstream_id: Option>, + /// An identifier for a globally registered schema definition which provides information about /// the event shape (type information, and semantic meaning of fields). + /// This definition is only currently valid for logs, and shouldn't be used for other event types. /// /// TODO(Jean): must not skip serialization to track schemas across restarts. #[serde(default = "default_schema_definition", skip)] @@ -71,17 +81,29 @@ impl EventMetadata { &mut self.secrets } - /// Returns a reference to the metadata source. + /// Returns a reference to the metadata source id. #[must_use] pub fn source_id(&self) -> Option<&Arc> { self.source_id.as_ref() } + /// Returns a reference to the metadata parent id. This is the `OutputId` + /// of the previous component the event was sent through (if any). + #[must_use] + pub fn upstream_id(&self) -> Option<&OutputId> { + self.upstream_id.as_deref() + } + /// Sets the `source_id` in the metadata to the provided value. pub fn set_source_id(&mut self, source_id: Arc) { self.source_id = Some(source_id); } + /// Sets the `upstream_id` in the metadata to the provided value. 
+ pub fn set_upstream_id(&mut self, upstream_id: Arc) { + self.upstream_id = Some(upstream_id); + } + /// Return the datadog API key, if it exists pub fn datadog_api_key(&self) -> Option> { self.secrets.get(DATADOG_API_KEY).cloned() @@ -111,6 +133,7 @@ impl Default for EventMetadata { finalizers: Default::default(), schema_definition: default_schema_definition(), source_id: None, + upstream_id: None, } } } diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index ae2e51e8a23a8..9547f58dc5ed3 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use crate::ByteSizeOf; +use crate::{config::OutputId, ByteSizeOf}; pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray}; pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf; pub use finalization::{ @@ -309,12 +309,24 @@ impl Event { self.metadata_mut().set_source_id(source_id); } + /// Sets the `upstream_id` in the event metadata to the provided value. + pub fn set_upstream_id(&mut self, upstream_id: Arc) { + self.metadata_mut().set_upstream_id(upstream_id); + } + /// Sets the `source_id` in the event metadata to the provided value. #[must_use] pub fn with_source_id(mut self, source_id: Arc) -> Self { self.metadata_mut().set_source_id(source_id); self } + + /// Sets the `upstream_id` in the event metadata to the provided value. + #[must_use] + pub fn with_upstream_id(mut self, upstream_id: Arc) -> Self { + self.metadata_mut().set_upstream_id(upstream_id); + self + } } impl EventDataEq for Event { diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index a60cd85c8200a..af81c51aa69a1 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, error, pin::Pin}; use futures::{Stream, StreamExt}; @@ -7,13 +8,16 @@ use vector_common::internal_event::{ use vector_common::json_size::JsonSize; use vector_common::EventDataEq; +use crate::config::{ComponentKey, OutputId}; +use crate::event::EventMutRef; +use crate::schema::Definition; use crate::{ config, event::{ into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef, }, fanout::{self, Fanout}, - ByteSizeOf, + schema, ByteSizeOf, }; #[cfg(any(feature = "lua"))] @@ -178,6 +182,8 @@ impl SyncTransform for Box { struct TransformOutput { fanout: Fanout, events_sent: Registered, + log_schema_definitions: HashMap>, + output_id: Arc, } pub struct TransformOutputs { @@ -189,6 +195,7 @@ pub struct TransformOutputs { impl TransformOutputs { pub fn new( outputs_in: Vec, + component_key: &ComponentKey, ) -> (Self, HashMap, fanout::ControlChannel>) { let outputs_spec = outputs_in.clone(); let mut primary_output = None; @@ -197,6 +204,13 @@ impl TransformOutputs { for output in outputs_in { let (fanout, control) = Fanout::new(); + + let log_schema_definitions = output + .log_schema_definitions + .into_iter() + .map(|(id, definition)| (id, Arc::new(definition))) + .collect(); + match output.port { None => { primary_output = Some(TransformOutput { @@ -204,6 +218,11 @@ impl TransformOutputs { events_sent: register(EventsSent::from(internal_event::Output(Some( DEFAULT_OUTPUT.into(), )))), + log_schema_definitions, + output_id: Arc::new(OutputId { + component: component_key.clone(), + port: None, + }), }); controls.insert(None, control); } @@ -215,6 +234,11 @@ impl TransformOutputs { events_sent: 
register(EventsSent::from(internal_event::Output(Some( name.clone().into(), )))), + log_schema_definitions, + output_id: Arc::new(OutputId { + component: component_key.clone(), + port: Some(name.clone()), + }), }, ); controls.insert(Some(name.clone()), control); @@ -246,31 +270,61 @@ impl TransformOutputs { buf: &mut TransformOutputsBuf, ) -> Result<(), Box<dyn error::Error + Send + Sync>> { if let Some(primary) = self.primary_output.as_mut() { - let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len); - let byte_size = buf.primary_buffer.as_ref().map_or( - JsonSize::new(0), - EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of, - ); - buf.primary_buffer - .as_mut() - .expect("mismatched outputs") - .send(&mut primary.fanout) - .await?; - primary.events_sent.emit(CountByteSize(count, byte_size)); + let buf = buf.primary_buffer.as_mut().expect("mismatched outputs"); + Self::send_single_buffer(buf, primary).await?; } - for (key, buf) in &mut buf.named_buffers { - let count = buf.len(); - let byte_size = buf.estimated_json_encoded_size_of(); let output = self.named_outputs.get_mut(key).expect("unknown output"); - buf.send(&mut output.fanout).await?; - output.events_sent.emit(CountByteSize(count, byte_size)); + Self::send_single_buffer(buf, output).await?; } + Ok(()) + } + async fn send_single_buffer( buf: &mut OutputBuffer, output: &mut TransformOutput, ) -> Result<(), Box<dyn error::Error + Send + Sync>> { + for event in buf.events_mut() { + update_runtime_schema_definition( + event, + &output.output_id, + &output.log_schema_definitions, + ); + } + let count = buf.len(); + let byte_size = buf.estimated_json_encoded_size_of(); + buf.send(&mut output.fanout).await?; + output.events_sent.emit(CountByteSize(count, byte_size)); Ok(()) } } +#[allow(clippy::implicit_hasher)] +/// `event`: The event that will be updated +/// `output_id`: The `output_id` that the current event is being sent to (will be used as the new `upstream_id`) +/// `log_schema_definitions`: A mapping of upstream `OutputId` to definitions, used to look up the new runtime definition of the event +pub fn update_runtime_schema_definition( + mut event: EventMutRef, + output_id: &Arc<OutputId>, + log_schema_definitions: &HashMap<OutputId, Arc<schema::Definition>>, +) { + if let EventMutRef::Log(log) = &mut event { + if let Some(parent_component_id) = log.metadata().upstream_id() { + if let Some(definition) = log_schema_definitions.get(parent_component_id) { + log.metadata_mut().set_schema_definition(definition); + } + } else { + // there is no upstream id defined. That means this event originated from a component that + // isn't able to track the source, such as `reduce` or `lua`.
In these cases, all of the + // schema definitions _must_ be the same, so the first one is picked + if let Some(definition) = log_schema_definitions.values().next() { + log.metadata_mut().set_schema_definition(definition); + } + } + } + event.metadata_mut().set_upstream_id(Arc::clone(output_id)); +} + #[derive(Debug, Clone)] pub struct TransformOutputsBuf { primary_buffer: Option, @@ -299,34 +353,17 @@ impl TransformOutputsBuf { } } - pub fn push(&mut self, event: Event) { - self.primary_buffer - .as_mut() - .expect("no default output") - .push(event); - } - - pub fn push_named(&mut self, name: &str, event: Event) { - self.named_buffers - .get_mut(name) - .expect("unknown output") - .push(event); - } - - pub fn append(&mut self, slice: &mut Vec) { - self.primary_buffer - .as_mut() - .expect("no default output") - .append(slice); - } - - pub fn append_named(&mut self, name: &str, slice: &mut Vec) { - self.named_buffers - .get_mut(name) - .expect("unknown output") - .append(slice); + /// Adds a new event to the transform output buffer + pub fn push(&mut self, name: Option<&str>, event: Event) { + match name { + Some(name) => self.named_buffers.get_mut(name), + None => self.primary_buffer.as_mut(), + } + .expect("unknown output") + .push(event); } + #[cfg(any(feature = "test", test))] pub fn drain(&mut self) -> impl Iterator + '_ { self.primary_buffer .as_mut() @@ -334,6 +371,7 @@ impl TransformOutputsBuf { .drain() } + #[cfg(any(feature = "test", test))] pub fn drain_named(&mut self, name: &str) -> impl Iterator + '_ { self.named_buffers .get_mut(name) @@ -341,33 +379,15 @@ impl TransformOutputsBuf { .drain() } - pub fn extend(&mut self, events: impl Iterator) { - self.primary_buffer - .as_mut() - .expect("no default output") - .extend(events); - } - + #[cfg(any(feature = "test", test))] pub fn take_primary(&mut self) -> OutputBuffer { std::mem::take(self.primary_buffer.as_mut().expect("no default output")) } + #[cfg(any(feature = "test", test))] pub fn take_all_named(&mut self) -> HashMap { std::mem::take(&mut self.named_buffers) } - - pub fn len(&self) -> usize { - self.primary_buffer.as_ref().map_or(0, OutputBuffer::len) - + self - .named_buffers - .values() - .map(OutputBuffer::len) - .sum::() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } } impl ByteSizeOf for TransformOutputsBuf { @@ -439,6 +459,7 @@ impl OutputBuffer { }) } + #[cfg(any(feature = "test", test))] pub fn drain(&mut self) -> impl Iterator + '_ { self.0.drain(..).flat_map(EventArray::into_events) } @@ -458,12 +479,12 @@ impl OutputBuffer { self.0.iter().flat_map(EventArray::iter_events) } - pub fn into_events(self) -> impl Iterator { - self.0.into_iter().flat_map(EventArray::into_events) + fn events_mut(&mut self) -> impl Iterator { + self.0.iter_mut().flat_map(EventArray::iter_events_mut) } - pub fn take_events(&mut self) -> Vec { - std::mem::take(&mut self.0) + pub fn into_events(self) -> impl Iterator { + self.0.into_iter().flat_map(EventArray::into_events) } } diff --git a/src/config/transform.rs b/src/config/transform.rs index c2be848d53361..1b9f442ef0786 100644 --- a/src/config/transform.rs +++ b/src/config/transform.rs @@ -195,6 +195,9 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + &self, enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], + + // This only exists for transforms that create logs from non-logs, to know which namespace + // to use, such as `metric_to_log` global_log_namespace: LogNamespace, ) -> Vec; 
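For reference, the per-input map that `outputs()` produces (and that the topology later consults at runtime) has roughly the following shape. This is a sketch with simplified stand-in types; `passthrough_outputs` is a hypothetical helper for illustration, not an API added by this patch.

```rust
use std::collections::HashMap;

// Simplified stand-ins for vector's `OutputId` and `schema::Definition`.
type OutputId = String;

#[derive(Clone)]
struct Definition(String);

struct TransformOutput {
    port: Option<String>,
    // One definition per connected input; this becomes the map consulted
    // by the runtime schema-definition lookup in the topology layer.
    log_schema_definitions: HashMap<OutputId, Definition>,
}

// A passthrough-style transform forwards each input definition unchanged,
// keyed by the upstream output it came from.
fn passthrough_outputs(input_definitions: &[(OutputId, Definition)]) -> Vec<TransformOutput> {
    vec![TransformOutput {
        port: None, // default output only
        log_schema_definitions: input_definitions.iter().cloned().collect(),
    }]
}

fn main() {
    let inputs = vec![
        ("in1".to_string(), Definition("definition from in1".into())),
        ("in2".to_string(), Definition("definition from in2".into())),
    ];
    let outputs = passthrough_outputs(&inputs);
    assert!(outputs[0].port.is_none());
    // Two upstream components yield two entries in the runtime lookup map.
    assert_eq!(outputs[0].log_schema_definitions.len(), 2);
}
```

For `reduce` and `lua`, `outputs()` would instead insert the same merged definition under every input key, which is what makes the topology's arbitrary pick safe.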
diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index a4f4eaae3b751..fea4a3980b64d 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -1,4 +1,5 @@ #![allow(missing_docs)] +use std::sync::Arc; use std::{collections::HashMap, fmt}; use chrono::Utc; @@ -19,6 +20,8 @@ use vrl::value::Value; mod errors; +use crate::config::{ComponentKey, OutputId}; +use crate::schema::Definition; pub use errors::{ClosedError, StreamSendError}; use lookup::PathPrefix; @@ -48,17 +51,37 @@ impl Builder { } } - pub fn add_source_output(&mut self, output: SourceOutput) -> LimitedReceiver { + pub fn add_source_output( + &mut self, + output: SourceOutput, + component_key: ComponentKey, + ) -> LimitedReceiver { let lag_time = self.lag_time.clone(); + let log_definition = output.schema_definition.clone(); + let output_id = OutputId { + component: component_key, + port: output.port.clone(), + }; match output.port { None => { - let (inner, rx) = - Inner::new_with_buffer(self.buf_size, DEFAULT_OUTPUT.to_owned(), lag_time); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + DEFAULT_OUTPUT.to_owned(), + lag_time, + log_definition, + output_id, + ); self.inner = Some(inner); rx } Some(name) => { - let (inner, rx) = Inner::new_with_buffer(self.buf_size, name.clone(), lag_time); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + name.clone(), + lag_time, + log_definition, + output_id, + ); self.named_inners.insert(name, inner); rx } @@ -91,9 +114,15 @@ impl SourceSender { } } - pub fn new_with_buffer(n: usize) -> (Self, LimitedReceiver) { + #[cfg(test)] + pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver) { let lag_time = Some(register_histogram!(LAG_TIME_NAME)); - let (inner, rx) = Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time); + let output_id = OutputId { + component: "test".to_string().into(), + port: None, + }; + let (inner, rx) = + Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id); ( Self { inner: Some(inner), @@ -105,14 +134,14 @@ impl SourceSender { #[cfg(test)] pub fn new_test() -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); let recv = recv.into_stream().flat_map(into_event_stream); (pipe, recv) } #[cfg(test)] pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -131,7 +160,7 @@ impl SourceSender { pub fn new_test_errors( error_at: impl Fn(usize) -> bool, ) -> (Self, impl Stream + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -161,7 +190,11 @@ impl SourceSender { ) -> impl Stream + Unpin { // The lag_time parameter here will need to be filled in if this function is ever used for // non-test situations. 
- let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None); + let output_id = OutputId { + component: "test".to_string().into(), + port: Some(name.clone()), + }; + let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None, None, output_id); let recv = recv.into_stream().map(move |mut events| { events.iter_events_mut().for_each(|mut event| { let metadata = event.metadata_mut(); @@ -225,6 +258,11 @@ struct Inner { output: String, lag_time: Option, events_sent: Registered, + /// The schema definition that will be attached to Log events sent through here + log_definition: Option>, + /// The OutputId related to this source sender. This is set as the `upstream_id` in + /// `EventMetadata` for all event sent through here. + output_id: Arc, } impl fmt::Debug for Inner { @@ -242,6 +280,8 @@ impl Inner { n: usize, output: String, lag_time: Option, + log_definition: Option>, + output_id: OutputId, ) -> (Self, LimitedReceiver) { let (tx, rx) = channel::limited(n); ( @@ -252,16 +292,29 @@ impl Inner { events_sent: register!(EventsSent::from(internal_event::Output(Some( output.into() )))), + log_definition, + output_id: Arc::new(output_id), }, rx, ) } - async fn send(&mut self, events: EventArray) -> Result<(), ClosedError> { + async fn send(&mut self, mut events: EventArray) -> Result<(), ClosedError> { let reference = Utc::now().timestamp_millis(); events .iter_events() .for_each(|event| self.emit_lag_time(event, reference)); + + events.iter_events_mut().for_each(|mut event| { + // attach runtime schema definitions from the source + if let Some(log_definition) = &self.log_definition { + event.metadata_mut().set_schema_definition(log_definition); + } + event + .metadata_mut() + .set_upstream_id(Arc::clone(&self.output_id)); + }); + let byte_size = events.estimated_json_encoded_size_of(); let count = events.len(); self.inner.send(events).await.map_err(|_| ClosedError)?; @@ -290,23 +343,10 @@ impl Inner { E: Into + ByteSizeOf, I: IntoIterator, { - let reference = Utc::now().timestamp_millis(); let events = events.into_iter().map(Into::into); for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { - events - .iter_events() - .for_each(|event| self.emit_lag_time(event, reference)); - let cbs = CountByteSize(events.len(), events.estimated_json_encoded_size_of()); - match self.inner.send(events).await { - Ok(()) => { - self.events_sent.emit(cbs); - } - Err(error) => { - return Err(error.into()); - } - } + self.send(events).await?; } - Ok(()) } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index ca904de314a62..8ba6511c53557 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1121,7 +1121,7 @@ mod integration_test { delay: Duration, status: EventStatus, ) -> (SourceSender, impl Stream + Unpin) { - let (pipe, recv) = SourceSender::new_with_buffer(100); + let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100); let recv = BufferReceiver::new(recv.into()).into_stream(); let recv = recv.then(move |mut events| async move { events.iter_logs_mut().for_each(|log| { diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 798759fa1138d..fc538efad199d 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -10,10 +10,12 @@ use opentelemetry_proto::proto::{ }; use similar_asserts::assert_eq; use std::collections::BTreeMap; +use std::sync::Arc; use tonic::Request; use vector_core::config::LogNamespace; use vrl::value; +use crate::config::OutputId; use crate::{ config::{SourceConfig, 
SourceContext}, event::{into_event_stream, Event, EventStatus, LogEvent, Value}, @@ -269,7 +271,11 @@ async fn receive_grpc_logs_legacy_namespace() { ("observed_timestamp", Utc.timestamp_nanos(2).into()), ("source_type", "opentelemetry".into()), ]); - let expect_event = Event::from(LogEvent::from(expect_vec)); + let mut expect_event = Event::from(LogEvent::from(expect_vec)); + expect_event.set_upstream_id(Arc::new(OutputId { + component: "test".into(), + port: Some("logs".into()), + })); assert_eq!(actual_event, expect_event); }) .await; diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 93366629f3420..58ef30c3fcf99 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -727,7 +727,7 @@ mod test { // shutdown. let addr = next_addr(); - let (source_tx, source_rx) = SourceSender::new_with_buffer(10_000); + let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000); let source_key = ComponentKey::from("tcp_shutdown_infinite_stream"); let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx); diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index 467dae41fec30..a1ae446c56d36 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -453,7 +453,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, rx) = SourceSender::new_with_buffer(4096); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) @@ -547,7 +547,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. 
let component_key = ComponentKey::from("statsd"); - let (tx, _rx) = SourceSender::new_with_buffer(4096); + let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs index 62b0d96d76f10..3fb594b677e7a 100644 --- a/src/test_util/mock/mod.rs +++ b/src/test_util/mock/mod.rs @@ -30,12 +30,12 @@ pub fn backpressure_source(counter: &Arc) -> BackpressureSourceConf } pub fn basic_source() -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); (tx, BasicSourceConfig::new(rx)) } pub fn basic_source_with_data(data: &str) -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); (tx, BasicSourceConfig::new_with_data(rx, data)) } @@ -43,7 +43,7 @@ pub fn basic_source_with_event_counter( force_shutdown: bool, ) -> (SourceSender, BasicSourceConfig, Arc) { let event_counter = Arc::new(AtomicUsize::new(0)); - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(1); let mut source = BasicSourceConfig::new_with_event_counter(rx, Arc::clone(&event_counter)); source.set_force_shutdown(force_shutdown); @@ -75,7 +75,7 @@ pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig } pub fn basic_sink(channel_size: usize) -> (impl Stream, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new(tx, true); (rx.into_stream(), sink) } @@ -84,7 +84,7 @@ pub fn basic_sink_with_data( channel_size: usize, data: &str, ) -> (impl Stream, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new_with_data(tx, true, data); (rx.into_stream(), sink) } @@ -92,7 +92,7 @@ pub fn basic_sink_with_data( pub fn basic_sink_failing_healthcheck( channel_size: usize, ) -> (impl Stream, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size); let sink = BasicSinkConfig::new(tx, false); (rx.into_stream(), sink) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 4a858acb7d113..b7ace14acd57b 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -20,6 +20,7 @@ use vector_common::internal_event::{ self, CountByteSize, EventsSent, InternalEventHandle as _, Registered, }; use vector_core::config::LogNamespace; +use vector_core::transform::update_runtime_schema_definition; use vector_core::{ buffers::{ topology::{ @@ -242,7 +243,7 @@ impl<'a> Builder<'a> { let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); for output in source_outputs.into_iter() { - let mut rx = builder.add_source_output(output.clone()); + let mut rx = builder.add_source_output(output.clone(), key.clone()); let (mut fanout, control) = Fanout::new(); let source = Arc::new(key.clone()); @@ -735,6 +736,7 @@ fn build_transform( node.input_details.data_type(), node.typetag, &node.key, + &node.outputs, ), } } @@ -744,7 +746,7 @@ fn build_sync_transform( node: TransformNode, input_rx: BufferReceiver, ) -> (Task, HashMap) { 
- let (outputs, controls) = TransformOutputs::new(node.outputs); + let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key); let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs); let transform = if node.enable_concurrency { @@ -926,6 +928,7 @@ fn build_task_transform( input_type: DataType, typetag: &str, key: &ComponentKey, + outputs: &[TransformOutput], ) -> (Task, HashMap) { let (mut fanout, control) = Fanout::new(); @@ -941,8 +944,30 @@ fn build_task_transform( )) }); let events_sent = register!(EventsSent::from(internal_event::Output(None))); + let output_id = Arc::new(OutputId { + component: key.clone(), + port: None, + }); + + // Task transforms can only write to the default output, so only a single schema def map is needed + let schema_definition_map = outputs + .iter() + .find(|x| x.port.is_none()) + .expect("output for default port required for task transforms") + .log_schema_definitions + .clone() + .into_iter() + .map(|(key, value)| (key, Arc::new(value))) + .collect(); + let stream = t .transform(Box::pin(filtered)) + .map(move |mut events| { + for event in events.iter_events_mut() { + update_runtime_schema_definition(event, &output_id, &schema_definition_map); + } + events + }) .inspect(move |events: &EventArray| { events_sent.emit(CountByteSize( events.len(), diff --git a/src/topology/test/compliance.rs b/src/topology/test/compliance.rs index a716d29593998..8f4602aa1bba3 100644 --- a/src/topology/test/compliance.rs +++ b/src/topology/test/compliance.rs @@ -2,8 +2,10 @@ use std::sync::Arc; use tokio::sync::oneshot::{channel, Receiver}; use vector_common::config::ComponentKey; +use vector_core::config::OutputId; use vector_core::event::{Event, EventArray, EventContainer, LogEvent}; +use crate::config::schema::Definition; use crate::{ config::{unit_test::UnitTestSourceConfig, ConfigBuilder}, test_util::{ @@ -57,6 +59,10 @@ async fn test_function_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(ComponentKey::from("in"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); + original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); @@ -78,6 +84,10 @@ async fn test_sync_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(ComponentKey::from("in"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); + original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); @@ -98,6 +108,10 @@ async fn test_task_transform_single_event() { assert_eq!(events.len(), 1); original_event.set_source_id(Arc::new(ComponentKey::from("in"))); + original_event.set_upstream_id(Arc::new(OutputId::from("transform"))); + original_event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); let event = events.remove(0); assert_eq!(original_event, event); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index aa5720382e96c..b8b9c3a0fd5d0 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -7,6 +7,7 @@ use std::{ }, }; +use crate::schema::Definition; use crate::{ config::{Config, ConfigDiff, SinkOuter}, event::{into_event_stream, Event, EventArray, EventContainer, LogEvent}, @@ -27,6 +28,7 @@ use tokio::{ }; use vector_buffers::{BufferConfig, 
BufferType, WhenFull}; use vector_common::config::ComponentKey; +use vector_core::config::OutputId; mod backpressure; mod compliance; @@ -149,6 +151,10 @@ async fn topology_source_and_sink() { let res = out1.flat_map(into_event_stream).collect::>().await; event.set_source_id(Arc::new(ComponentKey::from("in1"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event], res); } @@ -184,6 +190,16 @@ async fn topology_multiple_sources() { event1.set_source_id(Arc::new(ComponentKey::from("in1"))); event2.set_source_id(Arc::new(ComponentKey::from("in2"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + + event2.set_upstream_id(Arc::new(OutputId::from("test"))); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(out_event1, Some(event1.into())); assert_eq!(out_event2, Some(event2.into())); } @@ -218,6 +234,12 @@ async fn topology_multiple_sinks() { // We should see that both sinks got the exact same event: event.set_source_id(Arc::new(ComponentKey::from("in1"))); + + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + let expected = vec![event]; assert_eq!(expected, res1); assert_eq!(expected, res2); @@ -293,6 +315,11 @@ async fn topology_remove_one_source() { event1.set_source_id(Arc::new(ComponentKey::from("in1"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + let res = h_out1.await.unwrap(); assert_eq!(vec![event1], res); } @@ -332,6 +359,11 @@ async fn topology_remove_one_sink() { event.set_source_id(Arc::new(ComponentKey::from("in1"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event], res1); assert_eq!(Vec::::new(), res2); } @@ -442,6 +474,11 @@ async fn topology_swap_source() { assert_eq!(Vec::::new(), res1); event2.set_source_id(Arc::new(ComponentKey::from("in2"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event2], res2); } @@ -554,6 +591,10 @@ async fn topology_swap_sink() { assert_eq!(Vec::::new(), res1); event1.set_source_id(Arc::new(ComponentKey::from("in1"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event1], res2); } @@ -663,6 +704,15 @@ async fn topology_rebuild_connected() { event1.set_source_id(Arc::new(ComponentKey::from("in1"))); event2.set_source_id(Arc::new(ComponentKey::from("in1"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event1, event2], res); } @@ -715,6 +765,10 @@ async fn topology_rebuild_connected_transform() { 
assert_eq!(Vec::::new(), res1); event.set_source_id(Arc::new(ComponentKey::from("in1"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event], res2); } diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index a591305764df1..ca5a7ae8679cb 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -156,8 +156,10 @@ mod tests { use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use vector_common::config::ComponentKey; + use vrl::value::Kind; use super::*; + use crate::schema::Definition; use crate::{ event::{metric, Event, Metric}, test_util::components::assert_transform_compliance, @@ -174,8 +176,13 @@ mod tests { kind: metric::MetricKind, value: metric::MetricValue, ) -> Event { - Event::Metric(Metric::new(name, kind, value)) + let mut event = Event::Metric(Metric::new(name, kind, value)) .with_source_id(Arc::new(ComponentKey::from("in"))) + .with_upstream_id(Arc::new(OutputId::from("transform"))); + event.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event } #[test] diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index 4a6497628d78a..513a91ce9115e 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -289,7 +289,9 @@ mod tests { use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use vector_common::config::ComponentKey; + use vector_core::config::OutputId; + use crate::config::schema::Definition; use crate::{ event::{Event, LogEvent, Value}, test_util::components::assert_transform_compliance, @@ -363,6 +365,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event differs in matched field so should be output even though it @@ -371,6 +378,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); // Third event has the same value for "matched" as first event, so it should be dropped. 
@@ -413,6 +425,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event has a different matched field name with the same value, @@ -421,6 +438,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -466,6 +488,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event is the same just with different field order, so it @@ -511,6 +538,12 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event gets output because it's not a dupe. 
This causes the first @@ -519,6 +552,12 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(new_event, event2); // Third event is a dupe but gets output anyway because the first @@ -568,6 +607,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -576,6 +620,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -621,6 +670,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -629,6 +683,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -667,6 +726,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through as null is different than @@ -675,6 +739,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); diff --git a/src/transforms/filter.rs b/src/transforms/filter.rs index 95e8877bee255..e14f0c7347ab7 100644 --- a/src/transforms/filter.rs +++ b/src/transforms/filter.rs @@ -104,6 +104,7 @@ mod test { use vector_core::event::{Metric, MetricKind, MetricValue}; use super::*; + use crate::config::schema::Definition; use crate::{ conditions::ConditionConfig, event::{Event, LogEvent}, @@ -129,6 +130,10 @@ mod test { 
tx.send(log.clone()).await.unwrap(); log.set_source_id(Arc::new(ComponentKey::from("in"))); + log.set_upstream_id(Arc::new(OutputId::from("transform"))); + log.metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(out.recv().await.unwrap(), log); let metric = Event::from(Metric::new( diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index ad44b0a9e6d55..cb99cb186de8b 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, num::ParseFloatError}; use chrono::Utc; @@ -5,6 +6,7 @@ use indexmap::IndexMap; use vector_config::configurable_component; use vector_core::config::LogNamespace; +use crate::config::schema::Definition; use crate::{ config::{ DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, @@ -256,7 +258,10 @@ fn to_metric(config: &MetricConfig, event: &Event) -> Result Vec { let log_namespace = global_log_namespace.merge(self.log_namespace); - let mut schema_definition = - Definition::default_for_namespace(&BTreeSet::from([log_namespace])) - .with_event_field(&owned_value_path!("name"), Kind::bytes(), None) - .with_event_field( - &owned_value_path!("namespace"), - Kind::bytes().or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("tags"), - Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), - None, - ) - .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None) - .with_event_field( - &owned_value_path!("counter"), - Kind::object(Collection::empty().with_known("value", Kind::float())) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("gauge"), - Kind::object(Collection::empty().with_known("value", Kind::float())) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("set"), - Kind::object(Collection::empty().with_known( - "values", - Kind::array(Collection::empty().with_unknown(Kind::bytes())), - )) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("distribution"), - Kind::object( - Collection::empty() - .with_known( - "samples", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("value", Kind::float()) - .with_known("rate", Kind::integer()), - )), - ), - ) - .with_known("statistic", Kind::bytes()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("aggregated_histogram"), - Kind::object( - Collection::empty() - .with_known( - "buckets", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("upper_limit", Kind::float()) - .with_known("count", Kind::integer()), - )), - ), - ) - .with_known("count", Kind::integer()) - .with_known("sum", Kind::float()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("aggregated_summary"), - Kind::object( - Collection::empty() - .with_known( - "quantiles", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("quantile", Kind::float()) - .with_known("value", Kind::float()), - )), - ), - ) - .with_known("count", Kind::integer()) - .with_known("sum", Kind::float()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("sketch"), - Kind::any().or_undefined(), - None, - ); - - match log_namespace { - LogNamespace::Vector => { - // from serializing the Metric (Legacy moves it to another field) - schema_definition = 
schema_definition.with_event_field( - &owned_value_path!("timestamp"), - Kind::bytes().or_undefined(), - None, - ); - - // This is added as a "marker" field to determine which namespace is being used at runtime. - // This is normally handled automatically by sources, but this is a special case. - schema_definition = schema_definition.with_metadata_field( - &owned_value_path!("vector"), - Kind::object(Collection::empty()), - None, - ); - } - LogNamespace::Legacy => { - if let Some(timestamp_key) = log_schema().timestamp_key() { - schema_definition = - schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None); - } - - schema_definition = schema_definition.with_event_field( - &parse_value_path(log_schema().host_key()).expect("valid host key"), - Kind::bytes().or_undefined(), - None, - ); - } - } + let schema_definition = schema_definition(log_namespace); vec![TransformOutput::new( DataType::Log, @@ -249,6 +120,137 @@ impl TransformConfig for MetricToLogConfig { } } +fn schema_definition(log_namespace: LogNamespace) -> Definition { + let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace])) + .with_event_field(&owned_value_path!("name"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("namespace"), + Kind::bytes().or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("tags"), + Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), + None, + ) + .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("counter"), + Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("gauge"), + Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("set"), + Kind::object(Collection::empty().with_known( + "values", + Kind::array(Collection::empty().with_unknown(Kind::bytes())), + )) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("distribution"), + Kind::object( + Collection::empty() + .with_known( + "samples", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("value", Kind::float()) + .with_known("rate", Kind::integer()), + )), + ), + ) + .with_known("statistic", Kind::bytes()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("aggregated_histogram"), + Kind::object( + Collection::empty() + .with_known( + "buckets", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("upper_limit", Kind::float()) + .with_known("count", Kind::integer()), + )), + ), + ) + .with_known("count", Kind::integer()) + .with_known("sum", Kind::float()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("aggregated_summary"), + Kind::object( + Collection::empty() + .with_known( + "quantiles", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("quantile", Kind::float()) + .with_known("value", Kind::float()), + )), + ), + ) + .with_known("count", Kind::integer()) + .with_known("sum", Kind::float()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("sketch"), + Kind::any().or_undefined(), + None, + ); + + match log_namespace { + LogNamespace::Vector => { + // from serializing the Metric (Legacy moves it to another field) + schema_definition = 
schema_definition.with_event_field( + &owned_value_path!("timestamp"), + Kind::bytes().or_undefined(), + None, + ); + + // This is added as a "marker" field to determine which namespace is being used at runtime. + // This is normally handled automatically by sources, but this is a special case. + schema_definition = schema_definition.with_metadata_field( + &owned_value_path!("vector"), + Kind::object(Collection::empty()), + None, + ); + } + LogNamespace::Legacy => { + if let Some(timestamp_key) = log_schema().timestamp_key() { + schema_definition = + schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None); + } + + schema_definition = schema_definition.with_event_field( + &parse_value_path(log_schema().host_key()).expect("valid host key"), + Kind::bytes().or_undefined(), + None, + ); + } + } + schema_definition +} + #[derive(Clone, Debug)] pub struct MetricToLog { host_tag: String, @@ -412,6 +414,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = counter.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(counter).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -440,6 +444,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = gauge.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(gauge).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -468,6 +474,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = set.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(set).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -498,6 +506,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = distro.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(distro).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -547,6 +557,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = histo.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(histo).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -594,6 +606,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = summary.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(summary).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 
455a4b142e4d6..90c9294b0cb63 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -26,6 +26,7 @@ use crate::{ mod merge_strategy; +use crate::config::schema::Definition; use crate::event::Value; pub use merge_strategy::*; use vector_core::config::LogNamespace; @@ -133,94 +134,101 @@ impl TransformConfig for ReduceConfig { input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { - let mut output_definitions = HashMap::new(); - - for (output, input) in input_definitions { - let mut schema_definition = input.clone(); - - for (key, merge_strategy) in self.merge_strategies.iter() { - let key = if let Ok(key) = parse_target_path(key) { - key - } else { - continue; - }; - - let input_kind = match key.prefix { - PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), - PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), - }; - - let new_kind = match merge_strategy { - MergeStrategy::Discard | MergeStrategy::Retain => { - /* does not change the type */ - input_kind.clone() + // Events may be combined, so there isn't a true single "source" for events. + // All of the definitions must be merged. + let merged_definition: Definition = input_definitions + .iter() + .map(|(_output, definition)| definition.clone()) + .reduce(Definition::merge) + .unwrap_or_else(Definition::any); + + let mut schema_definition = merged_definition; + + for (key, merge_strategy) in self.merge_strategies.iter() { + let key = if let Ok(key) = parse_target_path(key) { + key + } else { + continue; + }; + + let input_kind = match key.prefix { + PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), + PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), + }; + + let new_kind = match merge_strategy { + MergeStrategy::Discard | MergeStrategy::Retain => { + /* does not change the type */ + input_kind.clone() + } + MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => { + // only keeps integer / float values + match (input_kind.contains_integer(), input_kind.contains_float()) { + (true, true) => Kind::float().or_integer(), + (true, false) => Kind::integer(), + (false, true) => Kind::float(), + (false, false) => Kind::undefined(), } - MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => { - // only keeps integer / float values - match (input_kind.contains_integer(), input_kind.contains_float()) { - (true, true) => Kind::float().or_integer(), - (true, false) => Kind::integer(), - (false, true) => Kind::float(), - (false, false) => Kind::undefined(), - } + } + MergeStrategy::Array => { + let unknown_kind = input_kind.clone(); + Kind::array(Collection::empty().with_unknown(unknown_kind)) + } + MergeStrategy::Concat => { + let mut new_kind = Kind::never(); + + if input_kind.contains_bytes() { + new_kind.add_bytes(); } - MergeStrategy::Array => { - let unknown_kind = input_kind.clone(); - Kind::array(Collection::empty().with_unknown(unknown_kind)) + if let Some(array) = input_kind.as_array() { + // array elements can be either any type that the field can be, or any + // element of the array + let array_elements = array.reduced_kind().union(input_kind.without_array()); + new_kind.add_array(Collection::empty().with_unknown(array_elements)); } - MergeStrategy::Concat => { - let mut new_kind = Kind::never(); - - if input_kind.contains_bytes() { - new_kind.add_bytes(); - } - if let Some(array) = input_kind.as_array() { - // array elements can be either any type that the field can be, or any - 
@@ -474,12 +482,15 @@ impl TaskTransform<Event> for Reduce {
 
 #[cfg(test)]
 mod test {
+    use enrichment::TableRegistry;
     use serde_json::json;
+    use std::sync::Arc;
     use tokio::sync::mpsc;
     use tokio_stream::wrappers::ReceiverStream;
     use vrl::value::Kind;
 
     use super::*;
+    use crate::config::schema::Definition;
     use crate::event::{LogEvent, Value};
     use crate::test_util::components::assert_transform_compliance;
     use crate::transforms::test::create_topology;
@@ -528,18 +539,33 @@ group_by = [ "request_id" ]
             .schema_definitions(true)
             .clone();
 
+        let new_schema_definition = reduce_config.outputs(
+            TableRegistry::default(),
+            &[(OutputId::from("in"), Definition::default_legacy_namespace())],
+            LogNamespace::Legacy,
+        )[0]
+        .clone()
+        .log_schema_definitions
+        .get(&OutputId::from("in"))
+        .unwrap()
+        .clone();
+
         let (tx, rx) = mpsc::channel(1);
 
         let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
 
         let mut e_1 = LogEvent::from("test message 1");
         e_1.insert("counter", 1);
         e_1.insert("request_id", "1");
-        let metadata_1 = e_1.metadata().clone();
+        let mut metadata_1 = e_1.metadata().clone();
+        metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
+        metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
 
         let mut e_2 = LogEvent::from("test message 2");
         e_2.insert("counter", 2);
         e_2.insert("request_id", "2");
-        let metadata_2 = e_2.metadata().clone();
+        let mut metadata_2 = e_2.metadata().clone();
+        metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
+        metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));
 
         let mut e_3 = LogEvent::from("test message 3");
         e_3.insert("counter", 3);
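
Each updated test derives the definition it expects by calling `outputs()` directly, then tags its input events the way the topology layer would before comparing metadata. The pattern, reduced to its two essential calls (sketch; `new_schema_definition` as computed above):

    // The topology normally stamps both of these on an event after the upstream
    // component emits it; the tests do it by hand since events are fed in directly.
    metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
    metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
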
@@ -603,6 +629,18 @@ merge_strategies.baz = "max"
 
         assert_transform_compliance(async move {
             let (tx, rx) = mpsc::channel(1);
+
+            let new_schema_definition = reduce_config.outputs(
+                TableRegistry::default(),
+                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
+                LogNamespace::Legacy,
+            )[0]
+            .clone()
+            .log_schema_definitions
+            .get(&OutputId::from("in"))
+            .unwrap()
+            .clone();
+
             let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
 
             let mut e_1 = LogEvent::from("test message 1");
@@ -610,7 +648,9 @@ merge_strategies.baz = "max"
             e_1.insert("bar", "first bar");
             e_1.insert("baz", 2);
             e_1.insert("request_id", "1");
-            let metadata = e_1.metadata().clone();
+            let mut metadata = e_1.metadata().clone();
+            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
             tx.send(e_1.into()).await.unwrap();
 
             let mut e_2 = LogEvent::from("test message 2");
@@ -660,17 +700,32 @@ group_by = [ "request_id" ]
 
         assert_transform_compliance(async move {
             let (tx, rx) = mpsc::channel(1);
 
+            let new_schema_definition = reduce_config.outputs(
+                TableRegistry::default(),
+                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
+                LogNamespace::Legacy,
+            )[0]
+            .clone()
+            .log_schema_definitions
+            .get(&OutputId::from("in"))
+            .unwrap()
+            .clone();
+
             let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
 
             let mut e_1 = LogEvent::from("test message 1");
             e_1.insert("counter", 1);
             e_1.insert("request_id", "1");
-            let metadata_1 = e_1.metadata().clone();
+            let mut metadata_1 = e_1.metadata().clone();
+            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
             tx.send(e_1.into()).await.unwrap();
 
             let mut e_2 = LogEvent::from("test message 2");
             e_2.insert("counter", 2);
-            let metadata_2 = e_2.metadata().clone();
+            let mut metadata_2 = e_2.metadata().clone();
+            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
             tx.send(e_2.into()).await.unwrap();
 
             let mut e_3 = LogEvent::from("test message 3");
@@ -852,20 +907,37 @@ merge_strategies.bar = "concat"
 
         assert_transform_compliance(async move {
             let (tx, rx) = mpsc::channel(1);
+
+            let new_schema_definition = reduce_config.outputs(
+                TableRegistry::default(),
+                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
+                LogNamespace::Legacy,
+            )[0]
+            .clone()
+            .log_schema_definitions
+            .get(&OutputId::from("in"))
+            .unwrap()
+            .clone();
+
             let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
 
             let mut e_1 = LogEvent::from("test message 1");
             e_1.insert("foo", json!([1, 3]));
             e_1.insert("bar", json!([1, 3]));
             e_1.insert("request_id", "1");
-            let metadata_1 = e_1.metadata().clone();
+            let mut metadata_1 = e_1.metadata().clone();
+            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
+
             tx.send(e_1.into()).await.unwrap();
 
             let mut e_2 = LogEvent::from("test message 2");
             e_2.insert("foo", json!([2, 4]));
             e_2.insert("bar", json!([2, 4]));
             e_2.insert("request_id", "2");
-            let metadata_2 = e_2.metadata().clone();
+            let mut metadata_2 = e_2.metadata().clone();
+            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
+            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
             tx.send(e_2.into()).await.unwrap();
 
             let mut e_3 = LogEvent::from("test message 3");
diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs
index a6b01dbc8844d..0cbd2af1c119e 100644
--- a/src/transforms/remap.rs
+++ b/src/transforms/remap.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::sync::Arc;
 use std::{
     collections::BTreeMap,
     fs::File,
@@ -376,8 +375,6 @@ where
     drop_on_error: bool,
     drop_on_abort: bool,
     reroute_dropped: bool,
-    default_schema_definition: Arc<schema::Definition>,
-    dropped_schema_definition: Arc<schema::Definition>,
     runner: Runner,
     metric_tag_values: MetricTagValues,
 }
@@ -444,28 +441,6 @@ where
         program: Program,
         runner: Runner,
     ) -> crate::Result<Self> {
-        let default_schema_definition = context
-            .schema_definitions
-            .get(&None)
-            .expect("default schema required")
-            // TODO we can now have multiple possible definitions.
-            // This is going to need to be updated to store these possible definitions and then
-            // choose the correct one based on the input the event has come from.
-            .iter()
-            .map(|(_output, definition)| definition.clone())
-            .next()
-            .unwrap_or_else(Definition::any);
-
-        let dropped_schema_definition = context
-            .schema_definitions
-            .get(&Some(DROPPED.to_owned()))
-            .or_else(|| context.schema_definitions.get(&None))
-            .expect("dropped schema required")
-            .iter()
-            .map(|(_output, definition)| definition.clone())
-            .next()
-            .unwrap_or_else(Definition::any);
-
         Ok(Remap {
             component_key: context.key.clone(),
             program,
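
With the two cached definitions gone, `remap` no longer decides which definition an event carries; the topology attaches the correct one for the path the event took. Anything downstream can still read it off the event itself (sketch; `schema_definition()` is the accessor the tests later in this patch use):

    // Assumes `event` has already passed through the topology layer, which
    // stamped a definition onto its metadata.
    let definition = event.metadata().schema_definition();
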
@@ -475,8 +450,6 @@ where
             drop_on_error: config.drop_on_error,
             drop_on_abort: config.drop_on_abort,
             reroute_dropped: config.reroute_dropped,
-            default_schema_definition: Arc::new(default_schema_definition),
-            dropped_schema_definition: Arc::new(dropped_schema_definition),
             runner,
             metric_tag_values: config.metric_tag_values,
         })
@@ -587,13 +560,11 @@ where
 
         match result {
             Ok(_) => match target.into_events() {
-                TargetEvents::One(event) => {
-                    push_default(event, output, &self.default_schema_definition)
+                TargetEvents::One(event) => push_default(event, output),
+                TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)),
+                TargetEvents::Traces(events) => {
+                    events.for_each(|event| push_default(event, output))
                 }
-                TargetEvents::Logs(events) => events
-                    .for_each(|event| push_default(event, output, &self.default_schema_definition)),
-                TargetEvents::Traces(events) => events
-                    .for_each(|event| push_default(event, output, &self.default_schema_definition)),
             },
             Err(reason) => {
                 let (reason, error, drop) = match reason {
@@ -617,12 +588,12 @@ where
 
                 if !drop {
                     let event = original_event.expect("event will be set");
-                    push_default(event, output, &self.default_schema_definition);
+                    push_default(event, output);
                 } else if self.reroute_dropped {
                     let mut event = original_event.expect("event will be set");
                     self.annotate_dropped(&mut event, reason, error);
-                    push_dropped(event, output, &self.dropped_schema_definition);
+                    push_dropped(event, output);
                 }
             }
         }
@@ -630,29 +601,13 @@ where
 }
 
 #[inline]
-fn push_default(
-    mut event: Event,
-    output: &mut TransformOutputsBuf,
-    schema_definition: &Arc<schema::Definition>,
-) {
-    event
-        .metadata_mut()
-        .set_schema_definition(schema_definition);
-
-    output.push(event)
+fn push_default(event: Event, output: &mut TransformOutputsBuf) {
+    output.push(None, event)
 }
 
 #[inline]
-fn push_dropped(
-    mut event: Event,
-    output: &mut TransformOutputsBuf,
-    schema_definition: &Arc<schema::Definition>,
-) {
-    event
-        .metadata_mut()
-        .set_schema_definition(schema_definition);
-
-    output.push_named(DROPPED, event)
+fn push_dropped(event: Event, output: &mut TransformOutputsBuf) {
+    output.push(Some(DROPPED), event);
 }
 
 /// If the VRL returns a value that is not an array (see [`merge_array_definitions`]),
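
Both helpers now funnel into the single `push` entry point on `TransformOutputsBuf`, which takes the output port as an `Option` (calling convention exactly as used in this patch):

    // None selects the default output...
    output.push(None, event);
    // ...while Some(port) selects a named output, e.g. the dropped-events port:
    // output.push(Some(DROPPED), event);
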
@@ -721,6 +676,7 @@ pub enum BuildError {
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
+    use std::sync::Arc;
 
     use indoc::{formatdoc, indoc};
     use vector_core::{config::GlobalOptions, event::EventMetadata, metric_tags};
@@ -841,10 +797,6 @@ mod tests {
         let result1 = transform_one(&mut tform, event1).unwrap();
         assert_eq!(get_field_string(&result1, "message"), "event1");
         assert_eq!(get_field_string(&result1, "foo"), "bar");
-        assert_eq!(
-            result1.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
         assert!(tform.runner().runtime.is_empty());
 
         let event2 = {
 
         let result2 = transform_one(&mut tform, event2).unwrap();
         assert_eq!(get_field_string(&result2, "message"), "event2");
         assert_eq!(result2.as_log().get("foo"), Some(&Value::Null));
-        assert_eq!(
-            result2.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
         assert!(tform.runner().runtime.is_empty());
     }
 
@@ -889,11 +837,6 @@ mod tests {
         assert_eq!(get_field_string(&result, "foo"), "bar");
         assert_eq!(get_field_string(&result, "bar"), "baz");
         assert_eq!(get_field_string(&result, "copy"), "buz");
-
-        assert_eq!(
-            result.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
     }
 
     #[test]
@@ -927,17 +870,8 @@ mod tests {
         let r = result.next().unwrap();
         assert_eq!(get_field_string(&r, "message"), "foo");
-        assert_eq!(
-            r.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
 
         let r = result.next().unwrap();
         assert_eq!(get_field_string(&r, "message"), "bar");
-
-        assert_eq!(
-            r.metadata().schema_definition(),
-            &test_default_schema_definition()
-        );
     }
 
     #[test]
@@ -1103,7 +1037,9 @@ mod tests {
                 "zork",
                 MetricKind::Incremental,
                 MetricValue::Counter { value: 1.0 },
-                metadata.with_schema_definition(&Arc::new(test_default_schema_definition())),
+                // The schema definition is set in the topology, which isn't used in this test. Setting the definition
+                // to the actual value to skip the assertion here
+                metadata
             )
             .with_namespace(Some("zerk"))
             .with_tags(Some(metric_tags! {
@@ -1313,8 +1249,11 @@ mod tests {
                 "counter",
                 MetricKind::Absolute,
                 MetricValue::Counter { value: 1.0 },
-                EventMetadata::default()
-                    .with_schema_definition(&Arc::new(test_default_schema_definition())),
+                // The schema definition is set in the topology, which isn't used in this test. Setting the definition
+                // to the actual value to skip the assertion here
+                EventMetadata::default().with_schema_definition(&Arc::new(
+                    output.metadata().schema_definition().clone()
+                )),
             )
             .with_tags(Some(metric_tags! {
                 "hello" => "world",
@@ -1331,8 +1270,11 @@ mod tests {
                 "counter",
                 MetricKind::Absolute,
                 MetricValue::Counter { value: 1.0 },
-                EventMetadata::default()
-                    .with_schema_definition(&Arc::new(test_dropped_schema_definition())),
+                // The schema definition is set in the topology, which isn't used in this test. Setting the definition
+                // to the actual value to skip the assertion here
+                EventMetadata::default().with_schema_definition(&Arc::new(
+                    output.metadata().schema_definition().clone()
+                )),
            )
            .with_tags(Some(metric_tags! {
                "hello" => "goodbye",
@@ -1352,8 +1294,11 @@ mod tests {
                "counter",
                MetricKind::Absolute,
                MetricValue::Counter { value: 1.0 },
-                EventMetadata::default()
-                    .with_schema_definition(&Arc::new(test_dropped_schema_definition())),
+                // The schema definition is set in the topology, which isn't used in this test. Setting the definition
+                // to the actual value to skip the assertion here
+                EventMetadata::default().with_schema_definition(&Arc::new(
+                    output.metadata().schema_definition().clone()
+                )),
            )
            .with_tags(Some(metric_tags! {
{ "not_hello" => "oops", diff --git a/src/transforms/route.rs b/src/transforms/route.rs index adcac43ff504c..e410277914a8f 100644 --- a/src/transforms/route.rs +++ b/src/transforms/route.rs @@ -42,13 +42,13 @@ impl SyncTransform for Route { for (output_name, condition) in &self.conditions { let (result, event) = condition.check(event.clone()); if result { - output.push_named(output_name, event); + output.push(Some(output_name), event); } else { check_failed += 1; } } if check_failed == self.conditions.len() { - output.push_named(UNMATCHED_ROUTE, event); + output.push(Some(UNMATCHED_ROUTE), event); } } } diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index 8488658e8ea55..5753d0176dd3b 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1,9 +1,12 @@ use std::sync::Arc; use vector_common::config::ComponentKey; +use vector_core::config::OutputId; use vector_core::metric_tags; use super::*; +use crate::config::schema::Definition; +use crate::config::LogNamespace; use crate::event::metric::TagValue; use crate::event::{metric, Event, Metric, MetricTags}; use crate::test_util::components::assert_transform_compliance; @@ -13,6 +16,7 @@ use crate::transforms::tag_cardinality_limit::config::{ use crate::transforms::test::create_topology; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use vrl::compiler::prelude::Kind; #[test] fn generate_config() { @@ -88,6 +92,16 @@ async fn drop_event(config: TagCardinalityLimitConfig) { event1.set_source_id(Arc::new(ComponentKey::from("in"))); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); // Third value rejected since value_limit is 2. 
@@ -135,6 +149,20 @@ async fn drop_tag(config: TagCardinalityLimitConfig) {
     event2.set_source_id(Arc::new(ComponentKey::from("in")));
     event3.set_source_id(Arc::new(ComponentKey::from("in")));
 
+    event1.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event2.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event3.set_upstream_id(Arc::new(OutputId::from("transform")));
+
+    event1.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event2.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event3.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+
     assert_eq!(new_event1, Some(event1));
     assert_eq!(new_event2, Some(event2));
 
     // The third event should have been modified to remove "tag1"
@@ -207,6 +235,21 @@ async fn drop_tag_multi_value(config: TagCardinalityLimitConfig) {
     event2.set_source_id(Arc::new(ComponentKey::from("in")));
     event3.set_source_id(Arc::new(ComponentKey::from("in")));
 
+    event1.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event2.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event3.set_upstream_id(Arc::new(OutputId::from("transform")));
+
+    // definitions aren't valid for metrics yet, it's just set to the default (anything).
+    event1.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event2.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event3.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+
     drop(tx);
     topology.stop().await;
@@ -257,6 +300,21 @@ async fn separate_value_limit_per_tag(config: TagCardinalityLimitConfig) {
     event2.set_source_id(Arc::new(ComponentKey::from("in")));
     event3.set_source_id(Arc::new(ComponentKey::from("in")));
 
+    event1.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event2.set_upstream_id(Arc::new(OutputId::from("transform")));
+    event3.set_upstream_id(Arc::new(OutputId::from("transform")));
+
+    // definitions aren't valid for metrics yet, it's just set to the default (anything).
+    event1.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event2.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+    event3.metadata_mut().set_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
+    ));
+
     assert_eq!(new_event1, Some(event1));
     assert_eq!(new_event2, Some(event2));
     assert_eq!(new_event3, Some(event3));