Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Migrate LogSchema source_type_key to new lookup code #17947

Merged
merged 10 commits into from
Jul 14, 2023
2 changes: 1 addition & 1 deletion lib/opentelemetry-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl ResourceLog {

log_namespace.insert_vector_metadata(
&mut log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(SOURCE_NAME.as_bytes()),
);
Expand Down
25 changes: 12 additions & 13 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use lookup::lookup_v2::{parse_target_path, OptionalValuePath};
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath};
use lookup::lookup_v2::OptionalValuePath;
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vector_config::configurable_component;
use vrl::path::parse_target_path;

static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
Expand Down Expand Up @@ -60,7 +61,7 @@ pub struct LogSchema {
///
/// This field will be set by the Vector source that the event was created in.
#[serde(default = "LogSchema::default_source_type_key")]
source_type_key: String,
source_type_key: OptionalValuePath,

/// The name of the event field to set the event metadata in.
///
Expand Down Expand Up @@ -88,17 +89,15 @@ impl LogSchema {
}

fn default_timestamp_key() -> OptionalValuePath {
OptionalValuePath {
path: Some(owned_value_path!("timestamp")),
}
OptionalValuePath::new("timestamp")
}

fn default_host_key() -> String {
String::from("host")
}

fn default_source_type_key() -> String {
String::from("source_type")
fn default_source_type_key() -> OptionalValuePath {
OptionalValuePath::new("source_type")
}

fn default_metadata_key() -> String {
Expand Down Expand Up @@ -126,8 +125,8 @@ impl LogSchema {
&self.host_key
}

pub fn source_type_key(&self) -> &str {
&self.source_type_key
pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
self.source_type_key.path.as_ref()
}

pub fn metadata_key(&self) -> &str {
Expand All @@ -146,8 +145,8 @@ impl LogSchema {
self.host_key = v;
}

pub fn set_source_type_key(&mut self, v: String) {
self.source_type_key = v;
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
self.source_type_key = OptionalValuePath { path };
}

pub fn set_metadata_key(&mut self, v: String) {
Expand Down Expand Up @@ -191,7 +190,7 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
} else {
self.set_source_type_key(other.source_type_key().to_string());
self.set_source_type_key(other.source_type_key().cloned());
}
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
&& self.metadata_key() != other.metadata_key()
Expand Down
7 changes: 4 additions & 3 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl LogNamespace {
) {
self.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(source_name.as_bytes()),
);
Expand Down Expand Up @@ -551,14 +551,15 @@ mod test {
use chrono::Utc;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use vector_common::btreemap;
use vrl::path::OwnedValuePath;
use vrl::value::Kind;

#[test]
fn test_insert_standard_vector_source_metadata() {
let nested_path = "a.b.c.d";
let nested_path = "a.b.c.d".to_string();

let mut schema = LogSchema::default();
schema.set_source_type_key(nested_path.to_owned());
schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap()));
init_log_schema(schema, false);

let namespace = LogNamespace::Legacy;
Expand Down
10 changes: 6 additions & 4 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,10 @@ impl LogEvent {
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will tackle this in a follow-up PR.

// "Global Log Schema" are updated to the new path lookup code
pub fn source_type_path(&self) -> &'static str {
pub fn source_type_path(&self) -> Option<String> {
match self.namespace() {
LogNamespace::Vector => "%vector.source_type",
LogNamespace::Legacy => log_schema().source_type_key(),
LogNamespace::Vector => Some("%vector.source_type".to_string()),
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
}
}

Expand Down Expand Up @@ -514,7 +514,9 @@ impl LogEvent {
pub fn get_source_type(&self) -> Option<&Value> {
match self.namespace() {
LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().source_type_key())),
LogNamespace::Legacy => log_schema()
.source_type_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions lib/vector-core/src/schema/definition.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};

use crate::config::{log_schema, LegacyKey, LogNamespace};
use lookup::lookup_v2::{parse_value_path, TargetPath};
use lookup::lookup_v2::TargetPath;
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use vrl::value::{kind::Collection, Kind};

Expand Down Expand Up @@ -144,9 +144,7 @@ impl Definition {
#[must_use]
pub fn with_standard_vector_source_metadata(self) -> Self {
self.with_vector_metadata(
parse_value_path(log_schema().source_type_key())
.ok()
.as_ref(),
log_schema().source_type_key(),
&owned_value_path!("source_type"),
Kind::bytes(),
None,
Expand Down
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/optional_path.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use vector_config::configurable_component;
use vrl::owned_value_path;

use crate::lookup_v2::PathParseError;
use crate::{OwnedTargetPath, OwnedValuePath};
Expand Down Expand Up @@ -56,6 +57,12 @@ impl OptionalValuePath {
pub fn none() -> Self {
Self { path: None }
}

pub fn new(path: &str) -> Self {
Self {
path: Some(owned_value_path!(path)),
}
}
}

impl TryFrom<String> for OptionalValuePath {
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/datadog/events/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
}

if !log.contains("source_type_name") {
log.rename_key(log.source_type_path(), "source_type_name")
if let Some(source_type_path) = log.source_type_path() {
log.rename_key(source_type_path.as_str(), "source_type_name")
}
}

Some(Event::from(log))
Expand Down
20 changes: 11 additions & 9 deletions src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,8 @@ impl SinkConfig for InfluxDbLogsConfig {
.source_type_key
.clone()
.and_then(|k| k.path)
.unwrap_or_else(|| {
parse_value_path(log_schema().source_type_key())
.expect("global log_schema.source_type_key to be valid path")
});
.or(log_schema().source_type_key().cloned())
.unwrap();

let sink = InfluxDbLogsSink {
uri,
Expand Down Expand Up @@ -280,11 +278,15 @@ impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
self.tags.replace(host_path.clone());
log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key));
}
self.tags.replace(log.source_type_path().to_string());
log.rename_key(
log.source_type_path(),
(PathPrefix::Event, &self.source_type_key),
);

if let Some(source_type_path) = log.source_type_path() {
self.tags.replace(source_type_path.clone());
log.rename_key(
source_type_path.as_str(),
(PathPrefix::Event, &self.source_type_key),
);
}

self.tags.replace("metric_type".to_string());
log.insert("metric_type", "logs");

Expand Down
7 changes: 5 additions & 2 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ fn populate_event(

log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
);
Expand Down Expand Up @@ -713,7 +713,10 @@ mod integration_test {
trace!("{:?}", log);
assert_eq!(log[log_schema().message_key()], "my message".into());
assert_eq!(log["routing"], routing_key.into());
assert_eq!(log[log_schema().source_type_key()], "amqp".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"amqp".into()
);
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
.as_timestamp()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_kinesis_firehose/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub(super) async fn firehose(
if let Event::Log(ref mut log) = event {
log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
);
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ fn handle_single_log(

log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AwsS3Config::NAME.as_bytes()),
);
Expand Down
6 changes: 4 additions & 2 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub struct ApiKeyQueryParams {
pub(crate) struct DatadogAgentSource {
pub(crate) api_key_extractor: ApiKeyExtractor,
pub(crate) log_schema_host_key: &'static str,
pub(crate) log_schema_source_type_key: &'static str,
pub(crate) log_schema_source_type_key: String,
pub(crate) log_namespace: LogNamespace,
pub(crate) decoder: Decoder,
protocol: &'static str,
Expand Down Expand Up @@ -334,7 +334,9 @@ impl DatadogAgentSource {
.expect("static regex always compiles"),
},
log_schema_host_key: log_schema().host_key(),
log_schema_source_type_key: log_schema().source_type_key(),
log_schema_source_type_key: log_schema()
.source_type_key()
.map_or("".to_string(), |key| key.to_string()),
decoder,
protocol,
logs_schema_definition: Arc::new(logs_schema_definition),
Expand Down
40 changes: 32 additions & 8 deletions src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ async fn full_payload_v1() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -300,7 +303,10 @@ async fn full_payload_v2() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -362,7 +368,10 @@ async fn no_api_key() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -423,7 +432,10 @@ async fn api_key_in_url() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -488,7 +500,10 @@ async fn api_key_in_query_params() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -559,7 +574,10 @@ async fn api_key_in_header() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -706,7 +724,10 @@ async fn ignores_api_key() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
event.metadata().schema_definition(),
Expand Down Expand Up @@ -1398,7 +1419,10 @@ async fn split_outputs() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down
4 changes: 2 additions & 2 deletions src/sources/datadog_agent/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ fn handle_dd_trace_payload_v1(
.set_datadog_api_key(Arc::clone(k));
}
trace_event.insert(
source.log_schema_source_type_key,
source.log_schema_source_type_key.as_str(),
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v2".to_string());
Expand Down Expand Up @@ -255,7 +255,7 @@ fn handle_dd_trace_payload_v0(
trace_event.insert("language_name", lang.clone());
}
trace_event.insert(
source.log_schema_source_type_key,
source.log_schema_source_type_key.as_str(),
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v1".to_string());
Expand Down