From dd6dddc96bc7c8f10c81207bf956f613321c3068 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 10 Jul 2023 09:53:28 -0400 Subject: [PATCH 01/10] feat(core): Migrate `LogSchema` `source_type_key` to new lookup code --- Cargo.toml | 1 + lib/opentelemetry-proto/src/convert.rs | 2 +- lib/vector-core/src/config/log_schema.rs | 25 ++++++++--------- lib/vector-core/src/config/mod.rs | 2 +- lib/vector-core/src/event/log_event.rs | 10 ++++--- lib/vector-core/src/schema/definition.rs | 6 ++-- .../src/lookup_v2/optional_path.rs | 5 ++++ src/sinks/datadog/events/sink.rs | 4 ++- src/sinks/influxdb/logs.rs | 23 ++++++++------- src/sources/amqp.rs | 4 +-- src/sources/aws_kinesis_firehose/handlers.rs | 2 +- src/sources/aws_s3/sqs.rs | 2 +- src/sources/datadog_agent/mod.rs | 3 +- src/sources/datadog_agent/tests.rs | 16 +++++------ src/sources/dnstap/mod.rs | 10 +++---- src/sources/docker_logs/mod.rs | 8 ++---- src/sources/docker_logs/tests.rs | 6 ++-- src/sources/exec/mod.rs | 6 ++-- src/sources/file.rs | 8 +++--- src/sources/fluent/mod.rs | 4 +-- src/sources/heroku_logs.rs | 8 +++--- src/sources/http_client/client.rs | 20 +++++++------ src/sources/http_client/integration_tests.rs | 4 +-- src/sources/http_server.rs | 14 ++++++---- src/sources/journald.rs | 4 +-- src/sources/kafka.rs | 7 +++-- src/sources/kubernetes_logs/mod.rs | 2 +- src/sources/logstash.rs | 2 +- src/sources/redis/mod.rs | 4 +-- src/sources/socket/mod.rs | 14 +++++----- src/sources/splunk_hec/mod.rs | 28 +++++++++---------- src/sources/syslog.rs | 25 +++++++++++++---- src/sources/util/framestream.rs | 13 +++++---- src/sources/util/message_decoding.rs | 2 +- src/sources/vector/mod.rs | 4 ++- 35 files changed, 167 insertions(+), 131 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5ec07aaff16ba..7c4a145eca43c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,6 +116,7 @@ members = [ [workspace.dependencies] vrl = { version = "0.5.0", features = ["cli", "test", "test_framework", "arbitrary"] } +#vrl = { path = "../vrl", features = ["cli", "test", "test_framework", "arbitrary"] } [dependencies] vrl.workspace = true diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index 4ccfda0b726ba..057561df755bb 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -208,7 +208,7 @@ impl ResourceLog { log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(SOURCE_NAME.as_bytes()), ); diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index d54575f89f252..f4921a9bceeb7 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -1,6 +1,7 @@ -use lookup::lookup_v2::{parse_target_path, OptionalValuePath}; -use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath}; +use lookup::lookup_v2::{OptionalValuePath}; +use lookup::{OwnedTargetPath, OwnedValuePath}; use once_cell::sync::{Lazy, OnceCell}; +use vrl::path::parse_target_path; use vector_config::configurable_component; static LOG_SCHEMA: OnceCell = OnceCell::new(); @@ -60,7 +61,7 @@ pub struct LogSchema { /// /// This field will be set by the Vector source that the event was created in. #[serde(default = "LogSchema::default_source_type_key")] - source_type_key: String, + source_type_key: OptionalValuePath, /// The name of the event field to set the event metadata in. /// @@ -88,17 +89,15 @@ impl LogSchema { } fn default_timestamp_key() -> OptionalValuePath { - OptionalValuePath { - path: Some(owned_value_path!("timestamp")), - } + OptionalValuePath::new("timestamp") } fn default_host_key() -> String { String::from("host") } - fn default_source_type_key() -> String { - String::from("source_type") + fn default_source_type_key() -> OptionalValuePath { + OptionalValuePath::new("source_type") } fn default_metadata_key() -> String { @@ -126,8 +125,8 @@ impl LogSchema { &self.host_key } - pub fn source_type_key(&self) -> &str { - &self.source_type_key + pub fn source_type_key(&self) -> Option<&OwnedValuePath> { + self.source_type_key.path.as_ref() } pub fn metadata_key(&self) -> &str { @@ -146,8 +145,8 @@ impl LogSchema { self.host_key = v; } - pub fn set_source_type_key(&mut self, v: String) { - self.source_type_key = v; + pub fn set_source_type_key(&mut self, path: Option) { + self.source_type_key = OptionalValuePath { path }; } pub fn set_metadata_key(&mut self, v: String) { @@ -191,7 +190,7 @@ impl LogSchema { { errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned()); } else { - self.set_source_type_key(other.source_type_key().to_string()); + self.set_source_type_key(other.source_type_key().cloned()); } if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key() && self.metadata_key() != other.metadata_key() diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 71786155d1d8f..609bf59b5b58f 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -476,7 +476,7 @@ impl LogNamespace { ) { self.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(source_name.as_bytes()), ); diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index e5755f12d7e66..e11423e122d13 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -466,10 +466,10 @@ impl LogEvent { /// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace). // TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the // "Global Log Schema" are updated to the new path lookup code - pub fn source_type_path(&self) -> &'static str { + pub fn source_type_path(&self) -> Option { match self.namespace() { - LogNamespace::Vector => "%vector.source_type", - LogNamespace::Legacy => log_schema().source_type_key(), + LogNamespace::Vector => Some("%vector.source_type".to_string()), + LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string), } } @@ -514,7 +514,9 @@ impl LogEvent { pub fn get_source_type(&self) -> Option<&Value> { match self.namespace() { LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")), - LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().source_type_key())), + LogNamespace::Legacy => log_schema() + .source_type_key() + .and_then(|key| self.get((PathPrefix::Event, key))), } } } diff --git a/lib/vector-core/src/schema/definition.rs b/lib/vector-core/src/schema/definition.rs index a3c5afc034cb4..421b0b91a043a 100644 --- a/lib/vector-core/src/schema/definition.rs +++ b/lib/vector-core/src/schema/definition.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, BTreeSet}; use crate::config::{log_schema, LegacyKey, LogNamespace}; -use lookup::lookup_v2::{parse_value_path, TargetPath}; +use lookup::lookup_v2::TargetPath; use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix}; use vrl::value::{kind::Collection, Kind}; @@ -144,9 +144,7 @@ impl Definition { #[must_use] pub fn with_standard_vector_source_metadata(self) -> Self { self.with_vector_metadata( - parse_value_path(log_schema().source_type_key()) - .ok() - .as_ref(), + log_schema().source_type_key(), &owned_value_path!("source_type"), Kind::bytes(), None, diff --git a/lib/vector-lookup/src/lookup_v2/optional_path.rs b/lib/vector-lookup/src/lookup_v2/optional_path.rs index 9328aa8a2f138..6d77216c59628 100644 --- a/lib/vector-lookup/src/lookup_v2/optional_path.rs +++ b/lib/vector-lookup/src/lookup_v2/optional_path.rs @@ -1,3 +1,4 @@ +use vrl::owned_value_path; use vector_config::configurable_component; use crate::lookup_v2::PathParseError; @@ -56,6 +57,10 @@ impl OptionalValuePath { pub fn none() -> Self { Self { path: None } } + + pub fn new(path: &str) -> Self { + Self { path: Some(owned_value_path!(path)) } + } } impl TryFrom for OptionalValuePath { diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs index 0ee8538763268..19a7646c658b8 100644 --- a/src/sinks/datadog/events/sink.rs +++ b/src/sinks/datadog/events/sink.rs @@ -75,7 +75,9 @@ async fn ensure_required_fields(event: Event) -> Option { } if !log.contains("source_type_name") { - log.rename_key(log.source_type_path(), "source_type_name") + if let Some(source_type_path) = log.source_type_path() { + log.rename_key(source_type_path.as_str(), "source_type_name") + } } Some(Event::from(log)) diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index ef901ff4e66aa..8d46907f5d7d5 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -206,11 +206,7 @@ impl SinkConfig for InfluxDbLogsConfig { let source_type_key = self .source_type_key .clone() - .and_then(|k| k.path) - .unwrap_or_else(|| { - parse_value_path(log_schema().source_type_key()) - .expect("global log_schema.source_type_key to be valid path") - }); + .and_then(|k| k.path).or(log_schema().source_type_key().cloned()).unwrap(); let sink = InfluxDbLogsSink { uri, @@ -280,11 +276,18 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { self.tags.replace(host_path.clone()); log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key)); } - self.tags.replace(log.source_type_path().to_string()); - log.rename_key( - log.source_type_path(), - (PathPrefix::Event, &self.source_type_key), - ); + + if let Some(source_type_path) = log.source_type_path() { + self.tags.replace(source_type_path); + } + + if let Some(source_type_key) = log_schema().source_type_key() { + log.rename_key( + (PathPrefix::Event, source_type_key), + (PathPrefix::Event, &self.source_type_key), + ); + } + self.tags.replace("metric_type".to_string()); log.insert("metric_type", "logs"); diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index f5cf22f493a61..445e38b6afd82 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -278,7 +278,7 @@ fn populate_event( log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()), ); @@ -713,7 +713,7 @@ mod integration_test { trace!("{:?}", log); assert_eq!(log[log_schema().message_key()], "my message".into()); assert_eq!(log["routing"], routing_key.into()); - assert_eq!(log[log_schema().source_type_key()], "amqp".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "amqp".into()); let log_ts = log[log_schema().timestamp_key().unwrap().to_string()] .as_timestamp() .unwrap(); diff --git a/src/sources/aws_kinesis_firehose/handlers.rs b/src/sources/aws_kinesis_firehose/handlers.rs index 11bc526b0648c..388e6187cd20f 100644 --- a/src/sources/aws_kinesis_firehose/handlers.rs +++ b/src/sources/aws_kinesis_firehose/handlers.rs @@ -93,7 +93,7 @@ pub(super) async fn firehose( if let Event::Log(ref mut log) = event { log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()), ); diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 3074a8f383d6e..508f6abbaed68 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -691,7 +691,7 @@ fn handle_single_log( log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(AwsS3Config::NAME.as_bytes()), ); diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 2e4dbf6656597..095d6222d2ee6 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -334,7 +334,8 @@ impl DatadogAgentSource { .expect("static regex always compiles"), }, log_schema_host_key: log_schema().host_key(), - log_schema_source_type_key: log_schema().source_type_key(), + log_schema_source_type_key: log_schema().source_type_key() + .map_or("", |key| Box::leak(key.to_string().into_boxed_str())), decoder, protocol, logs_schema_definition: Arc::new(logs_schema_definition), diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index 4fc3ef4321ee1..beda54fc6738e 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -238,7 +238,7 @@ async fn full_payload_v1() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -300,7 +300,7 @@ async fn full_payload_v2() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -362,7 +362,7 @@ async fn no_api_key() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -423,7 +423,7 @@ async fn api_key_in_url() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -488,7 +488,7 @@ async fn api_key_in_query_params() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -559,7 +559,7 @@ async fn api_key_in_header() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -706,7 +706,7 @@ async fn ignores_api_key() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); assert!(event.metadata().datadog_api_key().is_none()); assert_eq!( event.metadata().schema_definition(), @@ -1398,7 +1398,7 @@ async fn split_outputs() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" diff --git a/src/sources/dnstap/mod.rs b/src/sources/dnstap/mod.rs index 69a9d13728805..231c5fd096245 100644 --- a/src/sources/dnstap/mod.rs +++ b/src/sources/dnstap/mod.rs @@ -212,7 +212,7 @@ pub struct DnstapFrameHandler { socket_send_buffer_size: Option, host_key: Option, timestamp_key: Option, - source_type_key: String, + source_type_key: Option, bytes_received: Registered, log_namespace: LogNamespace, } @@ -242,7 +242,7 @@ impl DnstapFrameHandler { socket_send_buffer_size: config.socket_send_buffer_size, host_key, timestamp_key: timestamp_key.cloned(), - source_type_key: source_type_key.to_string(), + source_type_key: source_type_key.cloned(), bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))), log_namespace, } @@ -307,7 +307,7 @@ impl FrameHandler for DnstapFrameHandler { self.log_namespace.insert_vector_metadata( &mut log_event, - Some(self.source_type_key()), + self.source_type_key(), path!("source_type"), DnstapConfig::NAME, ); @@ -343,8 +343,8 @@ impl FrameHandler for DnstapFrameHandler { &self.host_key } - fn source_type_key(&self) -> &str { - self.source_type_key.as_str() + fn source_type_key(&self) -> Option<&OwnedValuePath> { + self.source_type_key.as_ref() } fn timestamp_key(&self) -> Option<&OwnedValuePath> { diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 9dc7ffeeedd09..79be761ddcaec 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -14,7 +14,7 @@ use chrono::{DateTime, FixedOffset, Local, ParseError, Utc}; use codecs::{BytesDeserializer, BytesDeserializerConfig}; use futures::{Stream, StreamExt}; use lookup::{ - lookup_v2::{parse_value_path, OptionalValuePath}, + lookup_v2::{OptionalValuePath}, metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix, }; use once_cell::sync::Lazy; @@ -338,9 +338,7 @@ impl SourceConfig for DockerLogsConfig { Some("timestamp"), ) .with_vector_metadata( - parse_value_path(log_schema().source_type_key()) - .ok() - .as_ref(), + log_schema().source_type_key(), &owned_value_path!("source_type"), Kind::bytes(), None, @@ -1119,7 +1117,7 @@ impl ContainerLogInfo { log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(DockerLogsConfig::NAME.as_bytes()), ); diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs index c9569209074ba..d1f55d885e996 100644 --- a/src/sources/docker_logs/tests.rs +++ b/src/sources/docker_logs/tests.rs @@ -447,7 +447,7 @@ mod integration_tests { assert!(log.get(format!("label.{}", label).as_str()).is_some()); assert_eq!(events[0].as_log()[&NAME], name.into()); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], DockerLogsConfig::NAME.into() ); }) @@ -654,7 +654,7 @@ mod integration_tests { assert!(log.get(format!("label.{}", label).as_str()).is_some()); assert_eq!(events[0].as_log()[&NAME], name.into()); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], DockerLogsConfig::NAME.into() ); }) @@ -795,7 +795,7 @@ mod integration_tests { .is_some()); assert_eq!(events[0].as_log()[&NAME], name.into()); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], DockerLogsConfig::NAME.into() ); }) diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 8af71919a3d76..9093039d2ffcb 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -761,7 +761,7 @@ mod tests { assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key()], "exec".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "exec".into()); assert!(log .get(( lookup::PathPrefix::Event, @@ -842,7 +842,7 @@ mod tests { assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key()], "exec".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "exec".into()); assert!(log .get(( lookup::PathPrefix::Event, @@ -1041,7 +1041,7 @@ mod tests { let log = event.as_log(); assert_eq!(log[COMMAND_KEY], config.command.clone().into()); assert_eq!(log[STREAM_KEY], STDOUT.into()); - assert_eq!(log[log_schema().source_type_key()], "exec".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "exec".into()); assert_eq!(log[log_schema().message_key()], "Hello World!".into()); assert_eq!(log[log_schema().host_key()], "Some.Machine".into()); assert!(log.get(PID_KEY).is_some()); diff --git a/src/sources/file.rs b/src/sources/file.rs index 1c48a2cefe298..3c7e58a5dfe09 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -751,7 +751,7 @@ fn create_event( log_namespace.insert_vector_metadata( &mut event, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(FileConfig::NAME.as_bytes()), ); @@ -1037,7 +1037,7 @@ mod tests { assert_eq!(log["host"], "Some.Machine".into()); assert_eq!(log["offset"], 0.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key()], "file".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "file".into()); assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()); } @@ -1059,7 +1059,7 @@ mod tests { assert_eq!(log["hostname"], "Some.Machine".into()); assert_eq!(log["off"], 0.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key()], "file".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "file".into()); assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()); } @@ -1466,7 +1466,7 @@ mod tests { log_schema().host_key().to_string(), log_schema().message_key().to_string(), log_schema().timestamp_key().unwrap().to_string(), - log_schema().source_type_key().to_string() + log_schema().source_type_key().unwrap().to_string() ] .into_iter() .collect::>() diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index f25f413590324..e6fbb45b31d75 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -587,7 +587,7 @@ impl From> for LogEvent { log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(FluentConfig::NAME.as_bytes()), ); @@ -665,7 +665,7 @@ mod tests { Event::Log(LogEvent::from(BTreeMap::from([ (String::from("message"), Value::from(name)), ( - String::from(log_schema().source_type_key()), + log_schema().source_type_key().unwrap().to_string(), Value::from(FluentConfig::NAME), ), (String::from("tag"), Value::from("tag.name")), diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 2cdf7d7d44424..1369f570fbb18 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -531,7 +531,7 @@ mod tests { .into() ); assert_eq!(log[&log_schema().host_key()], "host".into()); - assert_eq!(log[log_schema().source_type_key()], "heroku_logs".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); assert_eq!(log["appname"], "lumberjack-store".into()); assert_eq!(log["absent"], Value::Null); }).await; @@ -614,7 +614,7 @@ mod tests { .into() ); assert_eq!(log[log_schema().host_key()], "host".into()); - assert_eq!(log[log_schema().source_type_key()], "heroku_logs".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); } #[test] @@ -634,7 +634,7 @@ mod tests { log_schema().timestamp_key().unwrap() )) .is_some()); - assert_eq!(log[log_schema().source_type_key()], "heroku_logs".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); } #[test] @@ -653,7 +653,7 @@ mod tests { .into() ); assert_eq!(log[log_schema().host_key()], "host".into()); - assert_eq!(log[log_schema().source_type_key()], "heroku_logs".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); } #[test] diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 55036ccde446b..fc2a39e0e5743 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -328,16 +328,20 @@ impl http_client::HttpClientContext for HttpClientContext { ); } Event::Metric(ref mut metric) => { - metric.replace_tag( - log_schema().source_type_key().to_string(), - HttpClientConfig::NAME.to_string(), - ); + if let Some(source_type_key) = log_schema().source_type_key() { + metric.replace_tag( + source_type_key.to_string(), + HttpClientConfig::NAME.to_string(), + ); + } } Event::Trace(ref mut trace) => { - trace.insert( - log_schema().source_type_key(), - Bytes::from(HttpClientConfig::NAME), - ); + if let Some(source_type_key) = log_schema().source_type_key() { + trace.insert( + source_type_key.to_string(), + Bytes::from(HttpClientConfig::NAME), + ); + } } } } diff --git a/src/sources/http_client/integration_tests.rs b/src/sources/http_client/integration_tests.rs index f7b04403e45d3..8b3f8c76d3ee4 100644 --- a/src/sources/http_client/integration_tests.rs +++ b/src/sources/http_client/integration_tests.rs @@ -84,7 +84,7 @@ async fn collected_logs_bytes() { // panics if not log event let log = events[0].as_log(); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], HttpClientConfig::NAME.into() ); } @@ -108,7 +108,7 @@ async fn collected_logs_json() { // panics if not log event let log = events[0].as_log(); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], HttpClientConfig::NAME.into() ); } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 2bd2afecee34f..d3460085490c6 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -662,7 +662,7 @@ mod tests { )) .is_some()); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], SimpleHttpConfig::NAME.into() ); assert_eq!(log["http_path"], "/".into()); @@ -958,9 +958,11 @@ mod tests { log_schema().timestamp_key().unwrap() )) .is_some()); + + let source_type_key_value = log.get((lookup::PathPrefix::Event, log_schema().source_type_key().unwrap())).unwrap(); assert_eq!( - log[log_schema().source_type_key()], - SimpleHttpConfig::NAME.into() + source_type_key_value.to_string(), + SimpleHttpConfig::NAME ); assert_eq!(log["http_path"], "/".into()); } @@ -1131,7 +1133,7 @@ mod tests { )) .is_some()); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], SimpleHttpConfig::NAME.into() ); } @@ -1184,7 +1186,7 @@ mod tests { )) .is_some()); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], SimpleHttpConfig::NAME.into() ); } @@ -1200,7 +1202,7 @@ mod tests { )) .is_some()); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], SimpleHttpConfig::NAME.into() ); } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 3ec938bda0b47..c28faf4318e24 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -756,7 +756,7 @@ fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) { // Add source type. log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), JournaldConfig::NAME, ); @@ -1165,7 +1165,7 @@ mod tests { Value::Bytes("System Initialization".into()) ); assert_eq!( - received[0].as_log()[log_schema().source_type_key()], + received[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "journald".into() ); assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000)); diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 8ba6511c53557..ede1fe78fc60d 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -23,6 +23,7 @@ use rdkafka::{ use serde_with::serde_as; use snafu::{ResultExt, Snafu}; use tokio_util::codec::FramedRead; +use vrl::path::PathPrefix; use vector_common::finalizer::OrderedFinalizer; use vector_config::configurable_component; @@ -602,7 +603,9 @@ impl ReceivedMessage { ); } LogNamespace::Legacy => { - log.insert(log_schema().source_type_key(), KafkaSourceConfig::NAME); + if let Some(source_type_key) = log_schema().source_type_key() { + log.insert((PathPrefix::Event, source_type_key), KafkaSourceConfig::NAME); + } } } @@ -1054,7 +1057,7 @@ mod integration_test { format!("{} {}", KEY, i).into() ); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "kafka".into() ); assert_eq!( diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index bd004a778881b..5b22564625cb6 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -910,7 +910,7 @@ fn create_event( log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from(Config::NAME), ); diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 8d58f09a10c42..2cbabbfacbc4c 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -209,7 +209,7 @@ impl TcpSource for LogstashSource { self.log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(LogstashConfig::NAME.as_bytes()), ); diff --git a/src/sources/redis/mod.rs b/src/sources/redis/mod.rs index 03f4da5fe5dd8..1239a04c60915 100644 --- a/src/sources/redis/mod.rs +++ b/src/sources/redis/mod.rs @@ -256,7 +256,7 @@ impl InputHandler { if let Event::Log(ref mut log) = event { self.log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from(RedisSourceConfig::NAME), ); @@ -482,7 +482,7 @@ mod integration_test { for event in events { assert_eq!(event.as_log()[log_schema().message_key()], text.into()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], RedisSourceConfig::NAME.into() ); } diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index d381b65cecfdd..910bf6617a697 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -497,7 +497,7 @@ mod test { let event = rx.next().await.unwrap(); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) @@ -1121,7 +1121,7 @@ mod test { let events = collect_n(rx, 1).await; assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) @@ -1327,7 +1327,7 @@ mod test { "test".into() ); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into()); @@ -1419,7 +1419,7 @@ mod test { "foo\nbar".into() ); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) @@ -1506,7 +1506,7 @@ mod test { "test".into() ); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) @@ -1546,12 +1546,12 @@ mod test { assert_eq!(events.len(), 2); assert_eq!(events[0].as_log()[log_schema().message_key()], "foo".into()); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); assert_eq!(events[1].as_log()[log_schema().message_key()], "bar".into()); assert_eq!( - events[1].as_log()[log_schema().source_type_key()], + events[1].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 222ffc2487041..be8506c9b1b81 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -661,7 +661,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> { // Add source type self.log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), lookup::path!("source_type"), SplunkConfig::NAME, ); @@ -1420,7 +1420,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1447,7 +1447,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1478,7 +1478,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1506,7 +1506,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1537,7 +1537,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1567,7 +1567,7 @@ mod tests { log_schema().timestamp_key().unwrap() )) .is_some()); - assert_eq!(event[log_schema().source_type_key()], "splunk_hec".into()); + assert_eq!(event[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into()); assert!(event.metadata().splunk_hec_token().is_none()); } @@ -1608,7 +1608,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1635,7 +1635,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1874,7 +1874,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert_eq!( @@ -1951,7 +1951,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); }) @@ -1981,7 +1981,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); }) @@ -2009,7 +2009,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); }) @@ -2038,7 +2038,7 @@ mod tests { assert_eq!(event.as_log()["bool"], true.into()); assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); }).await; diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 728cda24a6c46..2d88d5a26560e 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -811,7 +811,10 @@ mod test { .single() .expect("invalid timestamp"), ); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert(( + PathPrefix::Event, + log_schema().source_type_key().unwrap(), + ), "syslog"); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); @@ -863,7 +866,10 @@ mod test { ); expected.insert(log_schema().host_key(), "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert(( + PathPrefix::Event, + log_schema().source_type_key().unwrap(), + ), "syslog"); expected.insert("severity", "notice"); expected.insert("facility", "user"); expected.insert("version", 1); @@ -1003,7 +1009,10 @@ mod test { expected_date, ); expected.insert(log_schema().host_key(), "74794bfb6795"); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert(( + PathPrefix::Event, + log_schema().source_type_key().unwrap(), + ), "syslog"); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "notice"); expected.insert("facility", "user"); @@ -1048,7 +1057,10 @@ mod test { ), expected_date, ); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert(( + PathPrefix::Event, + log_schema().source_type_key().unwrap(), + ), "syslog"); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "info"); @@ -1085,7 +1097,10 @@ mod test { .and_then(|t| t.with_nanosecond(605_850 * 1000)) .expect("invalid timestamp"), ); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert(( + PathPrefix::Event, + log_schema().source_type_key().unwrap(), + ), "syslog"); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "info"); diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 8bae468a85241..16c9944725822 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -350,7 +350,7 @@ pub trait FrameHandler { fn socket_send_buffer_size(&self) -> Option; fn host_key(&self) -> &Option; fn timestamp_key(&self) -> Option<&OwnedValuePath>; - fn source_type_key(&self) -> &str; + fn source_type_key(&self) -> Option<&OwnedValuePath>; } /** @@ -603,6 +603,7 @@ mod test { time::{Duration, Instant}, }; use tokio_util::codec::{length_delimited, Framed}; + use vrl::path::PathPrefix; use vector_core::config::{LegacyKey, LogNamespace}; use super::{ @@ -630,7 +631,7 @@ mod test { extra_task_handling_routine: F, host_key: Option, timestamp_key: Option, - source_type_key: String, + source_type_key: Option, log_namespace: LogNamespace, } @@ -648,7 +649,7 @@ mod test { extra_task_handling_routine: extra_routine, host_key: Some(owned_value_path!("test_framestream")), timestamp_key: Some(owned_value_path!("my_timestamp")), - source_type_key: "source_type".to_string(), + source_type_key: Some(owned_value_path!("source_type")), log_namespace: LogNamespace::Legacy, } } @@ -665,7 +666,7 @@ mod test { fn handle_event(&self, received_from: Option, frame: Bytes) -> Option { let mut log_event = LogEvent::from(frame); - log_event.insert(log_schema().source_type_key(), "framestream"); + log_event.insert((PathPrefix::Event, log_schema().source_type_key().unwrap()), "framestream"); if let Some(host) = received_from { self.log_namespace.insert_source_metadata( "framestream", @@ -711,8 +712,8 @@ mod test { self.timestamp_key.as_ref() } - fn source_type_key(&self) -> &str { - self.source_type_key.as_str() + fn source_type_key(&self) -> Option<&OwnedValuePath> { + self.source_type_key.as_ref() } } diff --git a/src/sources/util/message_decoding.rs b/src/sources/util/message_decoding.rs index 3d9b959db678f..379f8c1b09583 100644 --- a/src/sources/util/message_decoding.rs +++ b/src/sources/util/message_decoding.rs @@ -33,7 +33,7 @@ pub fn decode_message<'a>( if let Event::Log(ref mut log) = event { log_namespace.insert_vector_metadata( log, - Some(schema.source_type_key()), + schema.source_type_key(), path!("source_type"), Bytes::from(source_type), ); diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index 44b0921d75bcd..9a141a2e1851e 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -273,6 +273,7 @@ mod test { #[cfg(feature = "sinks-vector")] #[cfg(test)] mod tests { + use vrl::path::PathPrefix; use vector_common::assert_event_data_eq; use vector_core::config::log_schema; @@ -305,10 +306,11 @@ mod tests { let (mut events, stream) = test_util::random_events_with_stream(100, 100, None); sink.run(stream).await.unwrap(); + let source_type_key = log_schema().source_type_key().unwrap(); for event in &mut events { event .as_mut_log() - .insert(log_schema().source_type_key(), "vector"); + .insert((PathPrefix::Event, source_type_key), "vector"); } let output = test_util::collect_ready(rx).await; From 21d9e313e0c4af1fd7b99c98c99ed687bebb61fe Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Jul 2023 09:28:24 -0400 Subject: [PATCH 02/10] remove debugging line --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7c4a145eca43c..5ec07aaff16ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,7 +116,6 @@ members = [ [workspace.dependencies] vrl = { version = "0.5.0", features = ["cli", "test", "test_framework", "arbitrary"] } -#vrl = { path = "../vrl", features = ["cli", "test", "test_framework", "arbitrary"] } [dependencies] vrl.workspace = true From 6651128aabc8bd6f7470da81ff184a59ab0080fe Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Jul 2023 09:59:49 -0400 Subject: [PATCH 03/10] ran cargo vdev fmt --- lib/vector-core/src/config/log_schema.rs | 4 +- lib/vector-core/src/config/mod.rs | 5 ++- .../src/lookup_v2/optional_path.rs | 6 ++- src/sinks/influxdb/logs.rs | 6 ++- src/sources/amqp.rs | 5 ++- src/sources/datadog_agent/mod.rs | 3 +- src/sources/datadog_agent/tests.rs | 40 +++++++++++++++---- src/sources/docker_logs/mod.rs | 3 +- src/sources/exec/mod.rs | 15 +++++-- src/sources/file.rs | 10 ++++- src/sources/heroku_logs.rs | 15 +++++-- src/sources/http_server.rs | 12 +++--- src/sources/kafka.rs | 5 ++- src/sources/splunk_hec/mod.rs | 5 ++- src/sources/syslog.rs | 40 +++++++++---------- src/sources/util/framestream.rs | 7 +++- src/sources/vector/mod.rs | 2 +- 17 files changed, 125 insertions(+), 58 deletions(-) diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index f4921a9bceeb7..9f9eab7cbc519 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -1,8 +1,8 @@ -use lookup::lookup_v2::{OptionalValuePath}; +use lookup::lookup_v2::OptionalValuePath; use lookup::{OwnedTargetPath, OwnedValuePath}; use once_cell::sync::{Lazy, OnceCell}; -use vrl::path::parse_target_path; use vector_config::configurable_component; +use vrl::path::parse_target_path; static LOG_SCHEMA: OnceCell = OnceCell::new(); static LOG_SCHEMA_DEFAULT: Lazy = Lazy::new(LogSchema::default); diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 609bf59b5b58f..ed3dde2bae112 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -551,14 +551,15 @@ mod test { use chrono::Utc; use lookup::{event_path, owned_value_path, OwnedTargetPath}; use vector_common::btreemap; + use vrl::path::OwnedValuePath; use vrl::value::Kind; #[test] fn test_insert_standard_vector_source_metadata() { - let nested_path = "a.b.c.d"; + let nested_path = "a.b.c.d".to_string(); let mut schema = LogSchema::default(); - schema.set_source_type_key(nested_path.to_owned()); + schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap())); init_log_schema(schema, false); let namespace = LogNamespace::Legacy; diff --git a/lib/vector-lookup/src/lookup_v2/optional_path.rs b/lib/vector-lookup/src/lookup_v2/optional_path.rs index 6d77216c59628..ee15ed3509cf6 100644 --- a/lib/vector-lookup/src/lookup_v2/optional_path.rs +++ b/lib/vector-lookup/src/lookup_v2/optional_path.rs @@ -1,5 +1,5 @@ -use vrl::owned_value_path; use vector_config::configurable_component; +use vrl::owned_value_path; use crate::lookup_v2::PathParseError; use crate::{OwnedTargetPath, OwnedValuePath}; @@ -59,7 +59,9 @@ impl OptionalValuePath { } pub fn new(path: &str) -> Self { - Self { path: Some(owned_value_path!(path)) } + Self { + path: Some(owned_value_path!(path)), + } } } diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 8d46907f5d7d5..5e8f5b33dc6e8 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -206,7 +206,9 @@ impl SinkConfig for InfluxDbLogsConfig { let source_type_key = self .source_type_key .clone() - .and_then(|k| k.path).or(log_schema().source_type_key().cloned()).unwrap(); + .and_then(|k| k.path) + .or(log_schema().source_type_key().cloned()) + .unwrap(); let sink = InfluxDbLogsSink { uri, @@ -281,7 +283,7 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { self.tags.replace(source_type_path); } - if let Some(source_type_key) = log_schema().source_type_key() { + if let Some(source_type_key) = log_schema().source_type_key() { log.rename_key( (PathPrefix::Event, source_type_key), (PathPrefix::Event, &self.source_type_key), diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index 445e38b6afd82..bbd93d1c81aeb 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -713,7 +713,10 @@ mod integration_test { trace!("{:?}", log); assert_eq!(log[log_schema().message_key()], "my message".into()); assert_eq!(log["routing"], routing_key.into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "amqp".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "amqp".into() + ); let log_ts = log[log_schema().timestamp_key().unwrap().to_string()] .as_timestamp() .unwrap(); diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 095d6222d2ee6..ba2f8c8bc7db9 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -334,7 +334,8 @@ impl DatadogAgentSource { .expect("static regex always compiles"), }, log_schema_host_key: log_schema().host_key(), - log_schema_source_type_key: log_schema().source_type_key() + log_schema_source_type_key: log_schema() + .source_type_key() .map_or("", |key| Box::leak(key.to_string().into_boxed_str())), decoder, protocol, diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index beda54fc6738e..9647688279027 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -238,7 +238,10 @@ async fn full_payload_v1() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -300,7 +303,10 @@ async fn full_payload_v2() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -362,7 +368,10 @@ async fn no_api_key() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -423,7 +432,10 @@ async fn api_key_in_url() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -488,7 +500,10 @@ async fn api_key_in_query_params() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -559,7 +574,10 @@ async fn api_key_in_header() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -706,7 +724,10 @@ async fn ignores_api_key() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert!(event.metadata().datadog_api_key().is_none()); assert_eq!( event.metadata().schema_definition(), @@ -1398,7 +1419,10 @@ async fn split_outputs() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 79be761ddcaec..a37ae3260946c 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -14,8 +14,7 @@ use chrono::{DateTime, FixedOffset, Local, ParseError, Utc}; use codecs::{BytesDeserializer, BytesDeserializerConfig}; use futures::{Stream, StreamExt}; use lookup::{ - lookup_v2::{OptionalValuePath}, - metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix, + lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix, }; use once_cell::sync::Lazy; use serde_with::serde_as; diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 9093039d2ffcb..627a4f123bbf9 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -761,7 +761,10 @@ mod tests { assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "exec".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "exec".into() + ); assert!(log .get(( lookup::PathPrefix::Event, @@ -842,7 +845,10 @@ mod tests { assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "exec".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "exec".into() + ); assert!(log .get(( lookup::PathPrefix::Event, @@ -1041,7 +1047,10 @@ mod tests { let log = event.as_log(); assert_eq!(log[COMMAND_KEY], config.command.clone().into()); assert_eq!(log[STREAM_KEY], STDOUT.into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "exec".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "exec".into() + ); assert_eq!(log[log_schema().message_key()], "Hello World!".into()); assert_eq!(log[log_schema().host_key()], "Some.Machine".into()); assert!(log.get(PID_KEY).is_some()); diff --git a/src/sources/file.rs b/src/sources/file.rs index 3c7e58a5dfe09..230e8aac42ffb 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -1037,7 +1037,10 @@ mod tests { assert_eq!(log["host"], "Some.Machine".into()); assert_eq!(log["offset"], 0.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "file".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "file".into() + ); assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()); } @@ -1059,7 +1062,10 @@ mod tests { assert_eq!(log["hostname"], "Some.Machine".into()); assert_eq!(log["off"], 0.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "file".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "file".into() + ); assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()); } diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 1369f570fbb18..c2a6b7959ac44 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -614,7 +614,10 @@ mod tests { .into() ); assert_eq!(log[log_schema().host_key()], "host".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "heroku_logs".into() + ); } #[test] @@ -634,7 +637,10 @@ mod tests { log_schema().timestamp_key().unwrap() )) .is_some()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "heroku_logs".into() + ); } #[test] @@ -653,7 +659,10 @@ mod tests { .into() ); assert_eq!(log[log_schema().host_key()], "host".into()); - assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "heroku_logs".into() + ); } #[test] diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index d3460085490c6..faa2e51f3003f 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -959,11 +959,13 @@ mod tests { )) .is_some()); - let source_type_key_value = log.get((lookup::PathPrefix::Event, log_schema().source_type_key().unwrap())).unwrap(); - assert_eq!( - source_type_key_value.to_string(), - SimpleHttpConfig::NAME - ); + let source_type_key_value = log + .get(( + lookup::PathPrefix::Event, + log_schema().source_type_key().unwrap(), + )) + .unwrap(); + assert_eq!(source_type_key_value.to_string(), SimpleHttpConfig::NAME); assert_eq!(log["http_path"], "/".into()); } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index ede1fe78fc60d..fb64dd8a033f6 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -604,7 +604,10 @@ impl ReceivedMessage { } LogNamespace::Legacy => { if let Some(source_type_key) = log_schema().source_type_key() { - log.insert((PathPrefix::Event, source_type_key), KafkaSourceConfig::NAME); + log.insert( + (PathPrefix::Event, source_type_key), + KafkaSourceConfig::NAME, + ); } } } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index be8506c9b1b81..1b7a228023f69 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1567,7 +1567,10 @@ mod tests { log_schema().timestamp_key().unwrap() )) .is_some()); - assert_eq!(event[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into()); + assert_eq!( + event[log_schema().source_type_key().unwrap().to_string()], + "splunk_hec".into() + ); assert!(event.metadata().splunk_hec_token().is_none()); } diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 2d88d5a26560e..69b12a7ee3368 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -811,10 +811,10 @@ mod test { .single() .expect("invalid timestamp"), ); - expected.insert(( - PathPrefix::Event, - log_schema().source_type_key().unwrap(), - ), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); @@ -866,10 +866,10 @@ mod test { ); expected.insert(log_schema().host_key(), "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); - expected.insert(( - PathPrefix::Event, - log_schema().source_type_key().unwrap(), - ), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("severity", "notice"); expected.insert("facility", "user"); expected.insert("version", 1); @@ -1009,10 +1009,10 @@ mod test { expected_date, ); expected.insert(log_schema().host_key(), "74794bfb6795"); - expected.insert(( - PathPrefix::Event, - log_schema().source_type_key().unwrap(), - ), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "notice"); expected.insert("facility", "user"); @@ -1057,10 +1057,10 @@ mod test { ), expected_date, ); - expected.insert(( - PathPrefix::Event, - log_schema().source_type_key().unwrap(), - ), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "info"); @@ -1097,10 +1097,10 @@ mod test { .and_then(|t| t.with_nanosecond(605_850 * 1000)) .expect("invalid timestamp"), ); - expected.insert(( - PathPrefix::Event, - log_schema().source_type_key().unwrap(), - ), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "info"); diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 16c9944725822..e2a82ab74ce0a 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -603,8 +603,8 @@ mod test { time::{Duration, Instant}, }; use tokio_util::codec::{length_delimited, Framed}; - use vrl::path::PathPrefix; use vector_core::config::{LegacyKey, LogNamespace}; + use vrl::path::PathPrefix; use super::{ build_framestream_unix_source, spawn_event_handling_tasks, ControlField, ControlHeader, @@ -666,7 +666,10 @@ mod test { fn handle_event(&self, received_from: Option, frame: Bytes) -> Option { let mut log_event = LogEvent::from(frame); - log_event.insert((PathPrefix::Event, log_schema().source_type_key().unwrap()), "framestream"); + log_event.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "framestream", + ); if let Some(host) = received_from { self.log_namespace.insert_source_metadata( "framestream", diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index 9a141a2e1851e..4c4a746f5d886 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -273,9 +273,9 @@ mod test { #[cfg(feature = "sinks-vector")] #[cfg(test)] mod tests { - use vrl::path::PathPrefix; use vector_common::assert_event_data_eq; use vector_core::config::log_schema; + use vrl::path::PathPrefix; use super::*; use crate::{ From 4419cdb878c62390f64c400d68f05afe91dc19ff Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Jul 2023 10:26:24 -0400 Subject: [PATCH 04/10] update usages in http_client/integration_tests.rs --- src/sources/http_client/integration_tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sources/http_client/integration_tests.rs b/src/sources/http_client/integration_tests.rs index 8b3f8c76d3ee4..0e791d57355ff 100644 --- a/src/sources/http_client/integration_tests.rs +++ b/src/sources/http_client/integration_tests.rs @@ -136,7 +136,7 @@ async fn collected_metrics_native_json() { metric .tags() .unwrap() - .get(log_schema().source_type_key()) + .get(log_schema().source_type_key().unwrap().to_string().as_str()) .map(AsRef::as_ref), Some(HttpClientConfig::NAME) ); @@ -161,7 +161,7 @@ async fn collected_trace_native_json() { let trace = events[0].as_trace(); assert_eq!( - trace.as_map()[log_schema().source_type_key()], + trace.as_map()[log_schema().source_type_key().unwrap().to_string()], HttpClientConfig::NAME.into() ); } From b621e4d5179074a67fe641c75bb65388136ecfbb Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Jul 2023 10:58:08 -0400 Subject: [PATCH 05/10] make happy --- src/sources/http_client/integration_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/http_client/integration_tests.rs b/src/sources/http_client/integration_tests.rs index 0e791d57355ff..4bd5552ccaf67 100644 --- a/src/sources/http_client/integration_tests.rs +++ b/src/sources/http_client/integration_tests.rs @@ -161,7 +161,7 @@ async fn collected_trace_native_json() { let trace = events[0].as_trace(); assert_eq!( - trace.as_map()[log_schema().source_type_key().unwrap().to_string()], + trace.as_map()[log_schema().source_type_key().unwrap().to_string().as_str()], HttpClientConfig::NAME.into() ); } From 3569542ac23e3763f905e74a545b52b3291b5800 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Jul 2023 11:39:18 -0400 Subject: [PATCH 06/10] fix http_server tests --- src/sources/http_server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index faa2e51f3003f..1ad581b0d7136 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -964,8 +964,8 @@ mod tests { lookup::PathPrefix::Event, log_schema().source_type_key().unwrap(), )) - .unwrap(); - assert_eq!(source_type_key_value.to_string(), SimpleHttpConfig::NAME); + .unwrap().as_str().unwrap(); + assert_eq!(source_type_key_value, SimpleHttpConfig::NAME); assert_eq!(log["http_path"], "/".into()); } From 07307a4d74c78f02f6d433cf696085864da7d578 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Jul 2023 13:10:52 -0400 Subject: [PATCH 07/10] fix typo in 'sinks/influxdb/logs.rs' --- src/sinks/influxdb/logs.rs | 7 ++----- src/sources/http_server.rs | 4 +++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 5e8f5b33dc6e8..91668ca41ec91 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -280,12 +280,9 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { } if let Some(source_type_path) = log.source_type_path() { - self.tags.replace(source_type_path); - } - - if let Some(source_type_key) = log_schema().source_type_key() { + self.tags.replace(source_type_path.clone()); log.rename_key( - (PathPrefix::Event, source_type_key), + (PathPrefix::Event, source_type_path.as_str()), (PathPrefix::Event, &self.source_type_key), ); } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 1ad581b0d7136..cadd154fc8795 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -964,7 +964,9 @@ mod tests { lookup::PathPrefix::Event, log_schema().source_type_key().unwrap(), )) - .unwrap().as_str().unwrap(); + .unwrap() + .as_str() + .unwrap(); assert_eq!(source_type_key_value, SimpleHttpConfig::NAME); assert_eq!(log["http_path"], "/".into()); } From d596ae2283fcbb320ab866bac8e52eca2a481236 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 13 Jul 2023 09:01:17 -0400 Subject: [PATCH 08/10] use string --- src/sources/datadog_agent/mod.rs | 4 ++-- src/sources/datadog_agent/traces.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index ba2f8c8bc7db9..18536acf3adac 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -284,7 +284,7 @@ pub struct ApiKeyQueryParams { pub(crate) struct DatadogAgentSource { pub(crate) api_key_extractor: ApiKeyExtractor, pub(crate) log_schema_host_key: &'static str, - pub(crate) log_schema_source_type_key: &'static str, + pub(crate) log_schema_source_type_key: String, pub(crate) log_namespace: LogNamespace, pub(crate) decoder: Decoder, protocol: &'static str, @@ -336,7 +336,7 @@ impl DatadogAgentSource { log_schema_host_key: log_schema().host_key(), log_schema_source_type_key: log_schema() .source_type_key() - .map_or("", |key| Box::leak(key.to_string().into_boxed_str())), + .map_or("".to_string(), |key| key.to_string()), decoder, protocol, logs_schema_definition: Arc::new(logs_schema_definition), diff --git a/src/sources/datadog_agent/traces.rs b/src/sources/datadog_agent/traces.rs index 3fb55de8508e6..889da554ba9ca 100644 --- a/src/sources/datadog_agent/traces.rs +++ b/src/sources/datadog_agent/traces.rs @@ -142,7 +142,7 @@ fn handle_dd_trace_payload_v1( .set_datadog_api_key(Arc::clone(k)); } trace_event.insert( - source.log_schema_source_type_key, + source.log_schema_source_type_key.as_str(), Bytes::from("datadog_agent"), ); trace_event.insert("payload_version", "v2".to_string()); @@ -255,7 +255,7 @@ fn handle_dd_trace_payload_v0( trace_event.insert("language_name", lang.clone()); } trace_event.insert( - source.log_schema_source_type_key, + source.log_schema_source_type_key.as_str(), Bytes::from("datadog_agent"), ); trace_event.insert("payload_version", "v1".to_string()); From 9d4aa7d6ef7c6144a773c873ab0c6788b71dd15f Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 13 Jul 2023 09:31:17 -0400 Subject: [PATCH 09/10] potential fix for influxdb2_logs_put_data --- src/sinks/influxdb/logs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 91668ca41ec91..ca12946f00bad 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -282,7 +282,7 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { if let Some(source_type_path) = log.source_type_path() { self.tags.replace(source_type_path.clone()); log.rename_key( - (PathPrefix::Event, source_type_path.as_str()), + source_type_path.as_str(), (PathPrefix::Event, &self.source_type_key), ); } From 74278a4da85c967a2a5cfd177433ccb8884af466 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 13 Jul 2023 12:19:36 -0400 Subject: [PATCH 10/10] remove another PathPrefix --- src/sources/util/framestream.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index e2a82ab74ce0a..87708f3866d32 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -604,7 +604,6 @@ mod test { }; use tokio_util::codec::{length_delimited, Framed}; use vector_core::config::{LegacyKey, LogNamespace}; - use vrl::path::PathPrefix; use super::{ build_framestream_unix_source, spawn_event_handling_tasks, ControlField, ControlHeader, @@ -667,7 +666,7 @@ mod test { let mut log_event = LogEvent::from(frame); log_event.insert( - (PathPrefix::Event, log_schema().source_type_key().unwrap()), + log_schema().source_type_key().unwrap().to_string().as_str(), "framestream", ); if let Some(host) = received_from {