feat(core): Migrate LogSchema source_type_key to new lookup code
pront committed Jul 11, 2023
1 parent bc5822c commit 8b69240
Showing 35 changed files with 167 additions and 130 deletions.
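At a glance, the migration turns `LogSchema::source_type_key` from a plain `String` into an optional, pre-parsed path, so the getter now returns `Option<&OwnedValuePath>`. A minimal, stub-typed sketch of that before/after shape (hypothetical stand-in types, not Vector's real definitions):

```rust
// Stand-in for lookup::OwnedValuePath; the real type models a parsed event path.
#[derive(Clone, Debug)]
struct OwnedValuePathStub(String);

// Before this commit: the key was stored and handed out as a bare string,
// and call sites parsed it themselves whenever they needed a path.
struct LegacyLogSchema {
    source_type_key: String,
}

impl LegacyLogSchema {
    fn source_type_key(&self) -> &str {
        &self.source_type_key
    }
}

// After: the key is stored already parsed and may be absent, so call sites
// handle the `None` case explicitly instead of re-parsing strings.
struct NewLogSchema {
    source_type_key: Option<OwnedValuePathStub>,
}

impl NewLogSchema {
    fn source_type_key(&self) -> Option<&OwnedValuePathStub> {
        self.source_type_key.as_ref()
    }
}

fn main() {
    let old = LegacyLogSchema { source_type_key: "source_type".into() };
    let new = NewLogSchema {
        source_type_key: Some(OwnedValuePathStub("source_type".into())),
    };
    println!("old key: {}", old.source_type_key());
    if let Some(key) = new.source_type_key() {
        println!("new key: {:?}", key);
    }
}
```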
1 change: 1 addition & 0 deletions Cargo.toml
@@ -116,6 +116,7 @@ members = [

[workspace.dependencies]
vrl = { version = "0.5.0", features = ["cli", "test", "test_framework", "arbitrary"] }
#vrl = { path = "../vrl", features = ["cli", "test", "test_framework", "arbitrary"] }

[dependencies]
vrl.workspace = true
2 changes: 1 addition & 1 deletion lib/opentelemetry-proto/src/convert.rs
@@ -208,7 +208,7 @@ impl ResourceLog {

log_namespace.insert_vector_metadata(
&mut log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(SOURCE_NAME.as_bytes()),
);
25 changes: 12 additions & 13 deletions lib/vector-core/src/config/log_schema.rs
@@ -1,6 +1,7 @@
use lookup::lookup_v2::{parse_target_path, OptionalValuePath};
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath};
use lookup::lookup_v2::{OptionalValuePath};
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vrl::path::parse_target_path;
use vector_config::configurable_component;

static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
@@ -60,7 +61,7 @@ pub struct LogSchema {
///
/// This field will be set by the Vector source that the event was created in.
#[serde(default = "LogSchema::default_source_type_key")]
source_type_key: String,
source_type_key: OptionalValuePath,

/// The name of the event field to set the event metadata in.
///
@@ -88,17 +89,15 @@ impl LogSchema {
}

fn default_timestamp_key() -> OptionalValuePath {
OptionalValuePath {
path: Some(owned_value_path!("timestamp")),
}
OptionalValuePath::new("timestamp")
}

fn default_host_key() -> String {
String::from("host")
}

fn default_source_type_key() -> String {
String::from("source_type")
fn default_source_type_key() -> OptionalValuePath {
OptionalValuePath::new("source_type")
}

fn default_metadata_key() -> String {
@@ -126,8 +125,8 @@ impl LogSchema {
&self.host_key
}

pub fn source_type_key(&self) -> &str {
&self.source_type_key
pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
self.source_type_key.path.as_ref()
}

pub fn metadata_key(&self) -> &str {
@@ -146,8 +145,8 @@ impl LogSchema {
self.host_key = v;
}

pub fn set_source_type_key(&mut self, v: String) {
self.source_type_key = v;
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
self.source_type_key = OptionalValuePath { path };
}

pub fn set_metadata_key(&mut self, v: String) {
@@ -191,7 +190,7 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
} else {
self.set_source_type_key(other.source_type_key().to_string());
self.set_source_type_key(other.source_type_key().cloned());
}
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
&& self.metadata_key() != other.metadata_key()
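Within `LogSchema` itself, the setter and the config-merge step now move `Option<OwnedValuePath>` values around instead of `String`s, which is why the merge calls `.cloned()` on the getter's `Option`. A small stub-typed sketch of the pattern (simplified; not the actual Vector code):

```rust
// Stand-in for lookup::OwnedValuePath.
#[derive(Clone, Debug, PartialEq)]
struct PathStub(&'static str);

struct Schema {
    source_type_key: Option<PathStub>,
}

impl Schema {
    fn source_type_key(&self) -> Option<&PathStub> {
        self.source_type_key.as_ref()
    }

    fn set_source_type_key(&mut self, path: Option<PathStub>) {
        self.source_type_key = path;
    }

    fn merge_source_type_key(&mut self, other: &Schema) {
        // Option<&PathStub> -> Option<PathStub>: clone through the Option
        // instead of calling to_string() on a &str as before.
        self.set_source_type_key(other.source_type_key().cloned());
    }
}

fn main() {
    let mut ours = Schema { source_type_key: None };
    let theirs = Schema { source_type_key: Some(PathStub("source_type")) };
    ours.merge_source_type_key(&theirs);
    assert_eq!(ours.source_type_key(), Some(&PathStub("source_type")));
}
```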
2 changes: 1 addition & 1 deletion lib/vector-core/src/config/mod.rs
@@ -476,7 +476,7 @@ impl LogNamespace {
) {
self.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(source_name.as_bytes()),
);
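The `Some(...)` wrappers disappear at call sites because `insert_vector_metadata` already takes the legacy key as an `Option`, and the global accessor now returns an `Option` itself, so it is forwarded as-is. The same one-line edit repeats in the opentelemetry-proto converter above and in the amqp, aws_kinesis_firehose, aws_s3, docker_logs, and dnstap sources below. A loose, stub-typed sketch of the idea (not the real signature):

```rust
// Very rough stand-in for LogNamespace::insert_vector_metadata: when a legacy
// key is configured, write to that event field; otherwise fall back to the
// metadata path. The real function also branches on the log namespace.
fn insert_vector_metadata(legacy_key: Option<&str>, metadata_key: &str, value: &str) {
    match legacy_key {
        Some(key) => println!("event[{key}] = {value:?}"),
        None => println!("metadata[{metadata_key}] = {value:?}"),
    }
}

fn main() {
    // Before: insert_vector_metadata(Some(log_schema().source_type_key()), ...)
    // After:  insert_vector_metadata(log_schema().source_type_key(), ...)
    let source_type_key: Option<&str> = Some("source_type");
    insert_vector_metadata(source_type_key, "source_type", "datadog_agent");
    insert_vector_metadata(None, "source_type", "datadog_agent");
}
```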
10 changes: 6 additions & 4 deletions lib/vector-core/src/event/log_event.rs
@@ -466,10 +466,10 @@ impl LogEvent {
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn source_type_path(&self) -> &'static str {
pub fn source_type_path(&self) -> Option<String> {
match self.namespace() {
LogNamespace::Vector => "%vector.source_type",
LogNamespace::Legacy => log_schema().source_type_key(),
LogNamespace::Vector => Some("%vector.source_type".to_string()),
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
}
}

@@ -514,7 +514,7 @@ impl LogEvent {
pub fn get_source_type(&self) -> Option<&Value> {
match self.namespace() {
LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().source_type_key())),
LogNamespace::Legacy => log_schema()
.source_type_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
}
}
}
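`LogEvent::source_type_path` now reports `Option<String>`: under the Vector namespace the metadata path is always there, but under the legacy namespace the schema may have no `source_type` key at all. A simplified sketch of that shape and how a caller guards it, mirroring the sink changes below (hypothetical free function, not the real method):

```rust
// Simplified model of LogEvent::source_type_path after this change.
fn source_type_path(vector_namespace: bool, legacy_key: Option<&str>) -> Option<String> {
    if vector_namespace {
        // Vector namespace: the metadata path is always present.
        Some("%vector.source_type".to_string())
    } else {
        // Legacy namespace: only present if the schema has a key configured.
        legacy_key.map(ToString::to_string)
    }
}

fn main() {
    assert_eq!(
        source_type_path(true, None).as_deref(),
        Some("%vector.source_type")
    );
    // Call sites now guard the legacy case instead of assuming a key exists.
    match source_type_path(false, None) {
        Some(path) => println!("rename {path} to source_type_name"),
        None => println!("no source_type key configured; nothing to rename"),
    }
}
```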
6 changes: 2 additions & 4 deletions lib/vector-core/src/schema/definition.rs
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};

use crate::config::{log_schema, LegacyKey, LogNamespace};
use lookup::lookup_v2::{parse_value_path, TargetPath};
use lookup::lookup_v2::TargetPath;
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use vrl::value::{kind::Collection, Kind};

@@ -144,9 +144,7 @@ impl Definition {
#[must_use]
pub fn with_standard_vector_source_metadata(self) -> Self {
self.with_vector_metadata(
parse_value_path(log_schema().source_type_key())
.ok()
.as_ref(),
log_schema().source_type_key(),
&owned_value_path!("source_type"),
Kind::bytes(),
None,
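Because the schema key is already a parsed path, `with_standard_vector_source_metadata` (and the similar `docker_logs` hunk below) can hand the `Option` straight to `with_vector_metadata` instead of round-tripping through `parse_value_path`. A stub-typed sketch of why the parse step becomes redundant (simplified signatures, not Vector's):

```rust
// Stand-ins for the parsed-path type and a parse step that can fail.
#[derive(Clone, Debug)]
struct PathStub(String);

fn parse_value_path(s: &str) -> Result<PathStub, String> {
    if s.is_empty() { Err("empty path".into()) } else { Ok(PathStub(s.into())) }
}

// Rough stand-in for Definition::with_vector_metadata's legacy-key parameter.
fn with_vector_metadata(legacy_key: Option<&PathStub>) {
    println!("legacy key: {legacy_key:?}");
}

fn main() {
    // Before: the key was a &str, so it had to be parsed at the call site.
    let key_str = "source_type";
    with_vector_metadata(parse_value_path(key_str).ok().as_ref());

    // After: the schema already stores an optional parsed path, passed as-is.
    let key_path: Option<PathStub> = Some(PathStub("source_type".into()));
    with_vector_metadata(key_path.as_ref());
}
```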
5 changes: 5 additions & 0 deletions lib/vector-lookup/src/lookup_v2/optional_path.rs
@@ -1,3 +1,4 @@
use vrl::owned_value_path;
use vector_config::configurable_component;

use crate::lookup_v2::PathParseError;
@@ -56,6 +57,10 @@ impl OptionalValuePath {
pub fn none() -> Self {
Self { path: None }
}

pub fn new(path: &str) -> Self {
Self { path: Some(owned_value_path!(path)) }
}
}

impl TryFrom<String> for OptionalValuePath {
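The new `OptionalValuePath::new` helper is what the `LogSchema` defaults above switch to: it wraps a single path segment in `Some(..)` so callers stop building the struct by hand. A stub-typed sketch of the constructor's behavior (the real one goes through the `owned_value_path!` macro; this stand-in just stores the segment):

```rust
#[derive(Clone, Debug, PartialEq)]
struct ValuePathStub(String); // stand-in for OwnedValuePath

#[derive(Clone, Debug, PartialEq)]
struct OptionalValuePathStub {
    path: Option<ValuePathStub>,
}

impl OptionalValuePathStub {
    fn none() -> Self {
        Self { path: None }
    }

    // Convenience constructor mirroring the one added in this hunk.
    fn new(path: &str) -> Self {
        Self { path: Some(ValuePathStub(path.to_string())) }
    }
}

fn main() {
    assert_eq!(
        OptionalValuePathStub::new("source_type").path,
        Some(ValuePathStub("source_type".into()))
    );
    assert!(OptionalValuePathStub::none().path.is_none());
}
```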
4 changes: 3 additions & 1 deletion src/sinks/datadog/events/sink.rs
@@ -75,7 +75,9 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
}

if !log.contains("source_type_name") {
log.rename_key(log.source_type_path(), "source_type_name")
if let Some(source_type_path) = log.source_type_path() {
log.rename_key(source_type_path.as_str(), "source_type_name")
}
}

Some(Event::from(log))
22 changes: 13 additions & 9 deletions src/sinks/influxdb/logs.rs
@@ -207,10 +207,7 @@ impl SinkConfig for InfluxDbLogsConfig {
.source_type_key
.clone()
.and_then(|k| k.path)
.unwrap_or_else(|| {
parse_value_path(log_schema().source_type_key())
.expect("global log_schema.source_type_key to be valid path")
});
.unwrap_or(log_schema().source_type_key());

Check failure on line 210 in src/sinks/influxdb/logs.rs (GitHub Actions / Build - x86_64-unknown-linux-gnu): mismatched types

let sink = InfluxDbLogsSink {
uri,
@@ -280,11 +277,18 @@ impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
self.tags.replace(host_path.clone());
log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key));
}
self.tags.replace(log.source_type_path().to_string());
log.rename_key(
log.source_type_path(),
(PathPrefix::Event, &self.source_type_key),
);

if let Some(source_type_path) = log.source_type_path() {
self.tags.replace(source_type_path);
}

if let Some(source_type_key) = log_schema().source_type_key() {
log.rename_key(
(PathPrefix::Event, source_type_key),
(PathPrefix::Event, &self.source_type_key),
);
}

self.tags.replace("metric_type".to_string());
log.insert("metric_type", "logs");

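The build annotation above is a plain type mismatch: `.unwrap_or` on the `Option<OwnedValuePath>` produced by `.and_then(|k| k.path)` expects an `OwnedValuePath` argument, but `log_schema().source_type_key()` now yields `Option<&OwnedValuePath>`. A stub-typed sketch of one way the fallback could stay well-typed (a sketch only, not necessarily the fix that later landed):

```rust
// Stand-in for lookup::OwnedValuePath.
#[derive(Clone, Debug, PartialEq)]
struct PathStub(String);

// Stand-in for log_schema().source_type_key(), which now yields an Option.
fn global_source_type_key() -> Option<PathStub> {
    Some(PathStub("source_type".into()))
}

fn resolve_source_type_key(configured: Option<PathStub>) -> Option<PathStub> {
    // `configured.unwrap_or(global_source_type_key())` is the shape that fails:
    // unwrap_or wants a PathStub, but the global accessor hands back an Option.
    // Keeping the whole expression optional sidesteps the mismatch.
    configured.or_else(global_source_type_key)
}

fn main() {
    // Sink config without an explicit key: fall back to the global schema key.
    assert_eq!(
        resolve_source_type_key(None),
        Some(PathStub("source_type".into()))
    );
    // An explicitly configured key wins.
    assert_eq!(
        resolve_source_type_key(Some(PathStub("custom".into()))),
        Some(PathStub("custom".into()))
    );
}
```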
4 changes: 2 additions & 2 deletions src/sources/amqp.rs
@@ -278,7 +278,7 @@ fn populate_event(

log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
);
@@ -713,7 +713,7 @@ mod integration_test {
trace!("{:?}", log);
assert_eq!(log[log_schema().message_key()], "my message".into());
assert_eq!(log["routing"], routing_key.into());
assert_eq!(log[log_schema().source_type_key()], "amqp".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "amqp".into());
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
.as_timestamp()
.unwrap();
2 changes: 1 addition & 1 deletion src/sources/aws_kinesis_firehose/handlers.rs
@@ -93,7 +93,7 @@ pub(super) async fn firehose(
if let Event::Log(ref mut log) = event {
log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
);
2 changes: 1 addition & 1 deletion src/sources/aws_s3/sqs.rs
@@ -691,7 +691,7 @@ fn handle_single_log(

log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AwsS3Config::NAME.as_bytes()),
);
3 changes: 2 additions & 1 deletion src/sources/datadog_agent/mod.rs
@@ -334,7 +334,8 @@ impl DatadogAgentSource {
.expect("static regex always compiles"),
},
log_schema_host_key: log_schema().host_key(),
log_schema_source_type_key: log_schema().source_type_key(),
log_schema_source_type_key: log_schema().source_type_key()
.map_or("", |key| key.to_string().as_str()),
decoder,
protocol,
logs_schema_definition: Arc::new(logs_schema_definition),
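The `map_or("", |key| key.to_string().as_str())` shape in this hunk builds a `String` inside the closure and then tries to hand out a `&str` borrowed from it, which the borrow checker normally rejects as a reference to a temporary. A stub-typed sketch of the issue and of an owned-`String` alternative (hypothetical; this diff does not show what the field type ultimately became):

```rust
// Stand-in for lookup::OwnedValuePath with a path-to-string rendering step.
struct PathStub(String);

impl PathStub {
    fn render(&self) -> String {
        self.0.clone()
    }
}

// Returning an owned String means nothing borrows from a closure-local temporary.
// (A closure like |key| key.render().as_str() would try to return a reference
// to the String it just created, which does not compile.)
fn source_type_key_string(key: Option<&PathStub>) -> String {
    key.map_or(String::new(), |k| k.render())
}

fn main() {
    let key = PathStub("source_type".to_string());
    assert_eq!(source_type_key_string(Some(&key)), "source_type");
    assert_eq!(source_type_key_string(None), "");
}
```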
16 changes: 8 additions & 8 deletions src/sources/datadog_agent/tests.rs
@@ -238,7 +238,7 @@ async fn full_payload_v1() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
@@ -300,7 +300,7 @@ async fn full_payload_v2() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
@@ -362,7 +362,7 @@ async fn no_api_key() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
@@ -423,7 +423,7 @@ async fn api_key_in_url() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
@@ -488,7 +488,7 @@ async fn api_key_in_query_params() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
@@ -559,7 +559,7 @@ async fn api_key_in_header() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
@@ -706,7 +706,7 @@ async fn ignores_api_key() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
event.metadata().schema_definition(),
@@ -1398,7 +1398,7 @@ async fn split_outputs() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
10 changes: 5 additions & 5 deletions src/sources/dnstap/mod.rs
@@ -212,7 +212,7 @@ pub struct DnstapFrameHandler {
socket_send_buffer_size: Option<usize>,
host_key: Option<OwnedValuePath>,
timestamp_key: Option<OwnedValuePath>,
source_type_key: String,
source_type_key: Option<OwnedValuePath>,
bytes_received: Registered<BytesReceived>,
log_namespace: LogNamespace,
}
@@ -242,7 +242,7 @@ impl DnstapFrameHandler {
socket_send_buffer_size: config.socket_send_buffer_size,
host_key,
timestamp_key: timestamp_key.cloned(),
source_type_key: source_type_key.to_string(),
source_type_key: source_type_key.cloned(),
bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))),
log_namespace,
}
@@ -307,7 +307,7 @@ impl FrameHandler for DnstapFrameHandler {

self.log_namespace.insert_vector_metadata(
&mut log_event,
Some(self.source_type_key()),
self.source_type_key(),
path!("source_type"),
DnstapConfig::NAME,
);
@@ -343,8 +343,8 @@ impl FrameHandler for DnstapFrameHandler {
&self.host_key
}

fn source_type_key(&self) -> &str {
self.source_type_key.as_str()
fn source_type_key(&self) -> Option<&OwnedValuePath> {
self.source_type_key.as_ref()
}

fn timestamp_key(&self) -> Option<&OwnedValuePath> {
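The dnstap handler follows the same convention: the `FrameHandler`-style accessor now returns `Option<&OwnedValuePath>`, so the handler stores a cloned `Option<OwnedValuePath>` and returns `as_ref()`. A small stub-typed sketch of that trait shape (simplified, not the actual `FrameHandler` trait):

```rust
#[derive(Clone, Debug)]
struct PathStub(String); // stand-in for lookup::OwnedValuePath

// Simplified stand-in for the FrameHandler trait's accessor.
trait FrameHandlerLike {
    fn source_type_key(&self) -> Option<&PathStub>;
}

struct Handler {
    // Stored as an owned Option, cloned from the global schema at build time.
    source_type_key: Option<PathStub>,
}

impl FrameHandlerLike for Handler {
    fn source_type_key(&self) -> Option<&PathStub> {
        self.source_type_key.as_ref()
    }
}

fn main() {
    let handler = Handler {
        source_type_key: Some(PathStub("source_type".into())),
    };
    assert_eq!(
        handler.source_type_key().map(|p| p.0.as_str()),
        Some("source_type")
    );
}
```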
8 changes: 3 additions & 5 deletions src/sources/docker_logs/mod.rs
@@ -14,7 +14,7 @@ use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
use codecs::{BytesDeserializer, BytesDeserializerConfig};
use futures::{Stream, StreamExt};
use lookup::{
lookup_v2::{parse_value_path, OptionalValuePath},
lookup_v2::{OptionalValuePath},
metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix,
};
use once_cell::sync::Lazy;
@@ -338,9 +338,7 @@ impl SourceConfig for DockerLogsConfig {
Some("timestamp"),
)
.with_vector_metadata(
parse_value_path(log_schema().source_type_key())
.ok()
.as_ref(),
log_schema().source_type_key(),
&owned_value_path!("source_type"),
Kind::bytes(),
None,
@@ -1119,7 +1117,7 @@ impl ContainerLogInfo {

log_namespace.insert_vector_metadata(
&mut log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
);
