Skip to content

Commit

Permalink
feat(new encoding): add pretty json encoding (#20384)
Browse files Browse the repository at this point in the history
* feat(new encoding): add pretty json encoding

* chore(lint): fix cargo fmt lints

* feat(new codec): update default console logger to use pretty json

* docs(new codec): update docs for pretty json

* chore(changelog): add changelog.d entry

* fix(tests): update tests for pretty_json codec

* refactor(pretty_json): adds a field to existing json codec instead of a new pretty_json codec

* refactor(codecs): rename pretty field name for json codec
  • Loading branch information
lsampras committed May 8, 2024
1 parent e1d1e85 commit f74d488
Show file tree
Hide file tree
Showing 37 changed files with 566 additions and 21 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20384_pretty_json_config.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add `pretty` option to json coded to output a prettified json format.

authors: lsampras
2 changes: 2 additions & 0 deletions config/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ sinks:
inputs: ["parse_logs"]
encoding:
codec: "json"
json:
pretty: true

# Vector's GraphQL API (disabled by default)
# Uncomment to try it out with the `vector top` command or
Expand Down
259 changes: 246 additions & 13 deletions lib/codecs/src/encoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,33 @@ pub struct JsonSerializerConfig {
/// metric. When set to `full`, all metric tags are exposed as separate assignments.
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub metric_tag_values: MetricTagValues,

/// Options for the JsonSerializer.
#[serde(default, rename = "json")]
pub options: JsonSerializerOptions,
}

/// Options for the JsonSerializer.
#[crate::configurable_component]
#[derive(Debug, Clone, Default)]
pub struct JsonSerializerOptions {
/// Whether to use pretty JSON formatting.
#[serde(default)]
pub pretty: bool,
}

impl JsonSerializerConfig {
/// Creates a new `JsonSerializerConfig`.
pub const fn new(metric_tag_values: MetricTagValues) -> Self {
Self { metric_tag_values }
pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
Self {
metric_tag_values,
options,
}
}

/// Build the `JsonSerializer` from this configuration.
pub const fn build(&self) -> JsonSerializer {
JsonSerializer::new(self.metric_tag_values)
pub fn build(&self) -> JsonSerializer {
JsonSerializer::new(self.metric_tag_values, self.options.clone())
}

/// The data type of events that are accepted by `JsonSerializer`.
Expand All @@ -44,12 +60,16 @@ impl JsonSerializerConfig {
#[derive(Debug, Clone)]
pub struct JsonSerializer {
metric_tag_values: MetricTagValues,
options: JsonSerializerOptions,
}

impl JsonSerializer {
/// Creates a new `JsonSerializer`.
pub const fn new(metric_tag_values: MetricTagValues) -> Self {
Self { metric_tag_values }
pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
Self {
metric_tag_values,
options,
}
}

/// Encode event and represent it as JSON value.
Expand All @@ -68,15 +88,28 @@ impl Encoder<Event> for JsonSerializer {

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let writer = buffer.writer();
match event {
Event::Log(log) => serde_json::to_writer(writer, &log),
Event::Metric(mut metric) => {
if self.metric_tag_values == MetricTagValues::Single {
metric.reduce_tags_to_single();
if self.options.pretty {
match event {
Event::Log(log) => serde_json::to_writer_pretty(writer, &log),
Event::Metric(mut metric) => {
if self.metric_tag_values == MetricTagValues::Single {
metric.reduce_tags_to_single();
}
serde_json::to_writer_pretty(writer, &metric)
}
serde_json::to_writer(writer, &metric)
Event::Trace(trace) => serde_json::to_writer_pretty(writer, &trace),
}
} else {
match event {
Event::Log(log) => serde_json::to_writer(writer, &log),
Event::Metric(mut metric) => {
if self.metric_tag_values == MetricTagValues::Single {
metric.reduce_tags_to_single();
}
serde_json::to_writer(writer, &metric)
}
Event::Trace(trace) => serde_json::to_writer(writer, &trace),
}
Event::Trace(trace) => serde_json::to_writer(writer, &trace),
}
.map_err(Into::into)
}
Expand Down Expand Up @@ -191,6 +224,7 @@ mod tests {
let bytes = serialize(
JsonSerializerConfig {
metric_tag_values: MetricTagValues::Full,
options: JsonSerializerOptions::default(),
},
metric2(),
);
Expand All @@ -206,6 +240,7 @@ mod tests {
let bytes = serialize(
JsonSerializerConfig {
metric_tag_values: MetricTagValues::Single,
options: JsonSerializerOptions::default(),
},
metric2(),
);
Expand Down Expand Up @@ -236,4 +271,202 @@ mod tests {
config.build().encode(input, &mut buffer).unwrap();
buffer.freeze()
}

mod pretty_json {

use super::*;
use bytes::{Bytes, BytesMut};
use chrono::{TimeZone, Timelike, Utc};
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};
use vector_core::metric_tags;
use vrl::btreemap;

fn get_pretty_json_config() -> JsonSerializerConfig {
JsonSerializerConfig {
options: JsonSerializerOptions { pretty: true },
..Default::default()
}
}

#[test]
fn serialize_json_log() {
let event = Event::Log(LogEvent::from(
btreemap! {"x" => Value::from("23"),"z" => Value::from(25),"a" => Value::from("0"),},
));
let bytes = serialize(get_pretty_json_config(), event);
assert_eq!(
bytes,
r#"{
"a": "0",
"x": "23",
"z": 25
}"#
);
}
#[test]
fn serialize_json_metric_counter() {
let event = Event::Metric(
Metric::new(
"foos",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_namespace(Some("vector"))
.with_tags(Some(
metric_tags!("key2" => "value2","key1" => "value1","Key3" => "Value3",),
))
.with_timestamp(Some(
Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
.single()
.and_then(|t| t.with_nanosecond(11))
.expect("invalid timestamp"),
)),
);
let bytes = serialize(get_pretty_json_config(), event);
assert_eq!(
bytes,
r#"{
"name": "foos",
"namespace": "vector",
"tags": {
"Key3": "Value3",
"key1": "value1",
"key2": "value2"
},
"timestamp": "2018-11-14T08:09:10.000000011Z",
"kind": "incremental",
"counter": {
"value": 100.0
}
}"#
);
}
#[test]
fn serialize_json_metric_set() {
let event = Event::Metric(Metric::new(
"users",
MetricKind::Incremental,
MetricValue::Set {
values: vec!["bob".into()].into_iter().collect(),
},
));
let bytes = serialize(get_pretty_json_config(), event);
assert_eq!(
bytes,
r#"{
"name": "users",
"kind": "incremental",
"set": {
"values": [
"bob"
]
}
}"#
);
}
#[test]
fn serialize_json_metric_histogram_without_timestamp() {
let event = Event::Metric(Metric::new(
"glork",
MetricKind::Incremental,
MetricValue::Distribution {
samples: vector_core::samples![10.0 => 1],
statistic: StatisticKind::Histogram,
},
));
let bytes = serialize(get_pretty_json_config(), event);
assert_eq!(
bytes,
r#"{
"name": "glork",
"kind": "incremental",
"distribution": {
"samples": [
{
"value": 10.0,
"rate": 1
}
],
"statistic": "histogram"
}
}"#
);
}
#[test]
fn serialize_equals_to_json_value() {
let event = Event::Log(LogEvent::from(btreemap! {"foo" => Value::from("bar")}));
let mut serializer = get_pretty_json_config().build();
let mut bytes = BytesMut::new();
serializer.encode(event.clone(), &mut bytes).unwrap();
let json = serializer.to_json_value(event).unwrap();
assert_eq!(bytes.freeze(), serde_json::to_string_pretty(&json).unwrap());
}
#[test]
fn serialize_metric_tags_full() {
let bytes = serialize(
JsonSerializerConfig {
metric_tag_values: MetricTagValues::Full,
options: JsonSerializerOptions { pretty: true },
},
metric2(),
);
assert_eq!(
bytes,
r#"{
"name": "counter",
"tags": {
"a": [
"first",
null,
"second"
]
},
"kind": "incremental",
"counter": {
"value": 1.0
}
}"#
);
}
#[test]
fn serialize_metric_tags_single() {
let bytes = serialize(
JsonSerializerConfig {
metric_tag_values: MetricTagValues::Single,
options: JsonSerializerOptions { pretty: true },
},
metric2(),
);
assert_eq!(
bytes,
r#"{
"name": "counter",
"tags": {
"a": "second"
},
"kind": "incremental",
"counter": {
"value": 1.0
}
}"#
);
}
fn metric2() -> Event {
Event::Metric(
Metric::new(
"counter",
MetricKind::Incremental,
MetricValue::Counter { value: 1.0 },
)
.with_tags(Some(
metric_tags! ("a" => "first","a" => None,"a" => "second",),
)),
)
}
fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
let mut buffer = BytesMut::new();
config.build().encode(input, &mut buffer).unwrap();
buffer.freeze()
}
}
}
2 changes: 1 addition & 1 deletion lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use self::csv::{CsvSerializer, CsvSerializerConfig};
pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
use dyn_clone::DynClone;
pub use gelf::{GelfSerializer, GelfSerializerConfig};
pub use json::{JsonSerializer, JsonSerializerConfig};
pub use json::{JsonSerializer, JsonSerializerConfig, JsonSerializerOptions};
pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig};
pub use native::{NativeSerializer, NativeSerializerConfig};
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
Expand Down
4 changes: 4 additions & 0 deletions lib/k8s-e2e-tests/tests/vector-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,8 @@ async fn custom_selectors() -> Result<(), Box<dyn std::error::Error>> {
inputs: [kubernetes_logs]
encoding:
codec: json
json:
pretty: false
"#};

let vector = framework
Expand Down Expand Up @@ -1395,6 +1397,8 @@ async fn glob_pattern_filtering() -> Result<(), Box<dyn std::error::Error>> {
inputs: [kubernetes_logs]
encoding:
codec: json
json:
pretty: false
"#};

let vector = framework
Expand Down
7 changes: 6 additions & 1 deletion src/components/validation/resources/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::Deserialize;
use serde_json::Value;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::encoding::format::JsonSerializerOptions;

use crate::codecs::Encoder;
use vector_lib::codecs::{
Expand Down Expand Up @@ -179,7 +180,11 @@ pub fn encode_test_event(
} else {
Encoder::<encoding::Framer>::new(
NewlineDelimitedEncoder::new().into(),
JsonSerializer::new(MetricTagValues::default()).into(),
JsonSerializer::new(
MetricTagValues::default(),
JsonSerializerOptions::default(),
)
.into(),
)
};

Expand Down

0 comments on commit f74d488

Please sign in to comment.