Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new encoding): add pretty json encoding #20384

Merged
merged 9 commits into from
Jun 10, 2024
3 changes: 3 additions & 0 deletions changelog.d/20384_pretty_json_config.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add `use_pretty_json` option to json coded to output a prettified json format.

authors: lsampras
2 changes: 2 additions & 0 deletions config/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ sinks:
inputs: ["parse_logs"]
encoding:
codec: "json"
json:
use_pretty_json: true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use_pretty_json: true
pretty: true

I think we can shorten this up a bit.


# Vector's GraphQL API (disabled by default)
# Uncomment to try it out with the `vector top` command or
Expand Down
265 changes: 252 additions & 13 deletions lib/codecs/src/encoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,33 @@ pub struct JsonSerializerConfig {
/// metric. When set to `full`, all metric tags are exposed as separate assignments.
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub metric_tag_values: MetricTagValues,

/// Options for the JsonSerializer.
#[serde(default, rename = "json")]
pub options: JsonSerializerOptions,
}

/// Options for the JsonSerializer.
#[crate::configurable_component]
#[derive(Debug, Clone, Default)]
pub struct JsonSerializerOptions {
/// Whether to use pretty JSON formatting.
#[serde(default)]
pub use_pretty_json: bool,
}

impl JsonSerializerConfig {
/// Creates a new `JsonSerializerConfig`.
pub const fn new(metric_tag_values: MetricTagValues) -> Self {
Self { metric_tag_values }
pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
Self {
metric_tag_values,
options,
}
}

/// Build the `JsonSerializer` from this configuration.
pub const fn build(&self) -> JsonSerializer {
JsonSerializer::new(self.metric_tag_values)
pub fn build(&self) -> JsonSerializer {
JsonSerializer::new(self.metric_tag_values, self.options.clone())
}

/// The data type of events that are accepted by `JsonSerializer`.
Expand All @@ -44,12 +60,16 @@ impl JsonSerializerConfig {
#[derive(Debug, Clone)]
pub struct JsonSerializer {
metric_tag_values: MetricTagValues,
options: JsonSerializerOptions,
}

impl JsonSerializer {
/// Creates a new `JsonSerializer`.
pub const fn new(metric_tag_values: MetricTagValues) -> Self {
Self { metric_tag_values }
pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
Self {
metric_tag_values,
options,
}
}

/// Encode event and represent it as JSON value.
Expand All @@ -68,15 +88,28 @@ impl Encoder<Event> for JsonSerializer {

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let writer = buffer.writer();
match event {
Event::Log(log) => serde_json::to_writer(writer, &log),
Event::Metric(mut metric) => {
if self.metric_tag_values == MetricTagValues::Single {
metric.reduce_tags_to_single();
if self.options.use_pretty_json {
match event {
Event::Log(log) => serde_json::to_writer_pretty(writer, &log),
Event::Metric(mut metric) => {
if self.metric_tag_values == MetricTagValues::Single {
metric.reduce_tags_to_single();
}
serde_json::to_writer_pretty(writer, &metric)
}
serde_json::to_writer(writer, &metric)
Event::Trace(trace) => serde_json::to_writer_pretty(writer, &trace),
}
} else {
match event {
Event::Log(log) => serde_json::to_writer(writer, &log),
Event::Metric(mut metric) => {
if self.metric_tag_values == MetricTagValues::Single {
metric.reduce_tags_to_single();
}
serde_json::to_writer(writer, &metric)
}
Event::Trace(trace) => serde_json::to_writer(writer, &trace),
}
Event::Trace(trace) => serde_json::to_writer(writer, &trace),
}
.map_err(Into::into)
}
Expand Down Expand Up @@ -191,6 +224,7 @@ mod tests {
let bytes = serialize(
JsonSerializerConfig {
metric_tag_values: MetricTagValues::Full,
options: JsonSerializerOptions::default(),
},
metric2(),
);
Expand All @@ -206,6 +240,7 @@ mod tests {
let bytes = serialize(
JsonSerializerConfig {
metric_tag_values: MetricTagValues::Single,
options: JsonSerializerOptions::default(),
},
metric2(),
);
Expand Down Expand Up @@ -236,4 +271,208 @@ mod tests {
config.build().encode(input, &mut buffer).unwrap();
buffer.freeze()
}

mod pretty_json {

use super::*;
use bytes::{Bytes, BytesMut};
use chrono::{TimeZone, Timelike, Utc};
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};
use vector_core::metric_tags;
use vrl::btreemap;

fn get_pretty_json_config() -> JsonSerializerConfig {
JsonSerializerConfig {
options: JsonSerializerOptions {
use_pretty_json: true,
},
..Default::default()
}
}

#[test]
fn serialize_json_log() {
let event = Event::Log(LogEvent::from(
btreemap! {"x" => Value::from("23"),"z" => Value::from(25),"a" => Value::from("0"),},
));
let bytes = serialize(get_pretty_json_config(), event);
assert_eq!(
bytes,
r#"{
"a": "0",
"x": "23",
"z": 25
}"#
);
}
#[test]
fn serialize_json_metric_counter() {
let event = Event::Metric(
Metric::new(
"foos",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_namespace(Some("vector"))
.with_tags(Some(
metric_tags!("key2" => "value2","key1" => "value1","Key3" => "Value3",),
))
.with_timestamp(Some(
Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
.single()
.and_then(|t| t.with_nanosecond(11))
.expect("invalid timestamp"),
)),
);
let bytes = serialize(get_pretty_json_config(), event);
assert_eq!(
bytes,
r#"{
"name": "foos",
"namespace": "vector",
"tags": {
"Key3": "Value3",
"key1": "value1",
"key2": "value2"
},
"timestamp": "2018-11-14T08:09:10.000000011Z",
"kind": "incremental",
"counter": {
"value": 100.0
}
}"#
);
}
#[test]
fn serialize_json_metric_set() {
let event = Event::Metric(Metric::new(
"users",
MetricKind::Incremental,
MetricValue::Set {
values: vec!["bob".into()].into_iter().collect(),
},
));
let bytes = serialize(get_pretty_json_config(), event);
assert_eq!(
bytes,
r#"{
"name": "users",
"kind": "incremental",
"set": {
"values": [
"bob"
]
}
}"#
);
}
#[test]
fn serialize_json_metric_histogram_without_timestamp() {
let event = Event::Metric(Metric::new(
"glork",
MetricKind::Incremental,
MetricValue::Distribution {
samples: vector_core::samples![10.0 => 1],
statistic: StatisticKind::Histogram,
},
));
let bytes = serialize(get_pretty_json_config(), event);
assert_eq!(
bytes,
r#"{
"name": "glork",
"kind": "incremental",
"distribution": {
"samples": [
{
"value": 10.0,
"rate": 1
}
],
"statistic": "histogram"
}
}"#
);
}
#[test]
fn serialize_equals_to_json_value() {
let event = Event::Log(LogEvent::from(btreemap! {"foo" => Value::from("bar")}));
let mut serializer = get_pretty_json_config().build();
let mut bytes = BytesMut::new();
serializer.encode(event.clone(), &mut bytes).unwrap();
let json = serializer.to_json_value(event).unwrap();
assert_eq!(bytes.freeze(), serde_json::to_string_pretty(&json).unwrap());
}
#[test]
fn serialize_metric_tags_full() {
let bytes = serialize(
JsonSerializerConfig {
metric_tag_values: MetricTagValues::Full,
options: JsonSerializerOptions {
use_pretty_json: true,
},
},
metric2(),
);
assert_eq!(
bytes,
r#"{
"name": "counter",
"tags": {
"a": [
"first",
null,
"second"
]
},
"kind": "incremental",
"counter": {
"value": 1.0
}
}"#
);
}
#[test]
fn serialize_metric_tags_single() {
let bytes = serialize(
JsonSerializerConfig {
metric_tag_values: MetricTagValues::Single,
options: JsonSerializerOptions {
use_pretty_json: true,
},
},
metric2(),
);
assert_eq!(
bytes,
r#"{
"name": "counter",
"tags": {
"a": "second"
},
"kind": "incremental",
"counter": {
"value": 1.0
}
}"#
);
}
fn metric2() -> Event {
Event::Metric(
Metric::new(
"counter",
MetricKind::Incremental,
MetricValue::Counter { value: 1.0 },
)
.with_tags(Some(
metric_tags! ("a" => "first","a" => None,"a" => "second",),
)),
)
}
fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
let mut buffer = BytesMut::new();
config.build().encode(input, &mut buffer).unwrap();
buffer.freeze()
}
}
}
2 changes: 1 addition & 1 deletion lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use self::csv::{CsvSerializer, CsvSerializerConfig};
pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
use dyn_clone::DynClone;
pub use gelf::{GelfSerializer, GelfSerializerConfig};
pub use json::{JsonSerializer, JsonSerializerConfig};
pub use json::{JsonSerializer, JsonSerializerConfig, JsonSerializerOptions};
pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig};
pub use native::{NativeSerializer, NativeSerializerConfig};
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
Expand Down
4 changes: 4 additions & 0 deletions lib/k8s-e2e-tests/tests/vector-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,8 @@ async fn custom_selectors() -> Result<(), Box<dyn std::error::Error>> {
inputs: [kubernetes_logs]
encoding:
codec: json
json:
use_pretty_json: false
"#};

let vector = framework
Expand Down Expand Up @@ -1395,6 +1397,8 @@ async fn glob_pattern_filtering() -> Result<(), Box<dyn std::error::Error>> {
inputs: [kubernetes_logs]
encoding:
codec: json
json:
use_pretty_json: false
"#};

let vector = framework
Expand Down
7 changes: 6 additions & 1 deletion src/components/validation/resources/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::Deserialize;
use serde_json::Value;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::encoding::format::JsonSerializerOptions;

use crate::codecs::Encoder;
use vector_lib::codecs::{
Expand Down Expand Up @@ -179,7 +180,11 @@ pub fn encode_test_event(
} else {
Encoder::<encoding::Framer>::new(
NewlineDelimitedEncoder::new().into(),
JsonSerializer::new(MetricTagValues::default()).into(),
JsonSerializer::new(
MetricTagValues::default(),
JsonSerializerOptions::default(),
)
.into(),
)
};

Expand Down
Loading
Loading