Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new encoding): add pretty json encoding #20384

Merged
merged 9 commits into from
Jun 10, 2024
3 changes: 3 additions & 0 deletions changelog.d/20384_pretty_json_config.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add the `pretty_json` encoding codec, which outputs events as pretty-printed (indented, multi-line) JSON.

authors: lsampras
2 changes: 1 addition & 1 deletion config/vector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ sinks:
type: "console"
inputs: ["parse_logs"]
encoding:
codec: "json"
codec: "pretty_json"

# Vector's GraphQL API (disabled by default)
# Uncomment to try it out with the `vector top` command or
Expand Down
2 changes: 2 additions & 0 deletions lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod json;
mod logfmt;
mod native;
mod native_json;
mod pretty_json;
mod protobuf;
mod raw_message;
mod text;
Expand All @@ -25,6 +26,7 @@ pub use json::{JsonSerializer, JsonSerializerConfig};
pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig};
pub use native::{NativeSerializer, NativeSerializerConfig};
pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig};
pub use pretty_json::{PrettyJsonSerializer, PrettyJsonSerializerConfig};
pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions};
pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig};
pub use text::{TextSerializer, TextSerializerConfig};
Expand Down
239 changes: 239 additions & 0 deletions lib/codecs/src/encoding/format/pretty_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
use bytes::{BufMut, BytesMut};
use tokio_util::codec::Encoder;
use vector_core::{config::DataType, event::Event, schema};

use crate::MetricTagValues;

/// Config used to build a `PrettyJsonSerializer`.
#[crate::configurable_component]
#[derive(Debug, Clone, Default)]
pub struct PrettyJsonSerializerConfig {
/// Controls how metric tag values are encoded.
///
/// When set to `single`, only the last non-bare value of each tag is displayed with the
/// metric. When set to `full`, all metric tags are exposed as separate assignments.
// `default` lets the field be absent on input (falling back to `Default`), and
// `skip_serializing_if` leaves it out of serialized output whenever
// `vector_core::serde::is_default` returns true for it.
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub metric_tag_values: MetricTagValues,
}

impl PrettyJsonSerializerConfig {
/// Creates a new `PrettyJsonSerializerConfig`.
pub const fn new(metric_tag_values: MetricTagValues) -> Self {
Self { metric_tag_values }
}

/// Build the `PrettyJsonSerializer` from this configuration.
pub const fn build(&self) -> PrettyJsonSerializer {
PrettyJsonSerializer::new(self.metric_tag_values)
}

/// The data type of events that are accepted by `PrettyJsonSerializer`.
pub fn input_type(&self) -> DataType {
DataType::all()
}

/// The schema required by the serializer.
pub fn schema_requirement(&self) -> schema::Requirement {
// While technically we support `Value` variants that can't be losslessly serialized to
// PrettyJson, we don't want to enforce that limitation to users yet.
schema::Requirement::empty()
}
}

/// Serializer that converts an `Event` to bytes using the PrettyJson format.
///
/// "PrettyJson" is the pretty-printed JSON produced by `serde_json::to_writer_pretty`,
/// which the `Encoder<Event>` implementation for this type writes into the output buffer.
#[derive(Debug, Clone)]
pub struct PrettyJsonSerializer {
// How metric tag values are rendered; checked when a metric event is encoded.
metric_tag_values: MetricTagValues,
}

impl PrettyJsonSerializer {
/// Creates a new `PrettyJsonSerializer`.
pub const fn new(metric_tag_values: MetricTagValues) -> Self {
Self { metric_tag_values }
}

/// Encode event and represent it as PrettyJson value.
pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
match event {
Event::Log(log) => serde_json::to_value(&log),
Event::Metric(metric) => serde_json::to_value(&metric),
Event::Trace(trace) => serde_json::to_value(&trace),
}
.map_err(|e| e.to_string().into())
}
}

impl Encoder<Event> for PrettyJsonSerializer {
    type Error = vector_common::Error;

    /// Writes `event` into `buffer` as pretty-printed JSON.
    ///
    /// Metric events have their tags reduced to a single value per tag first when the
    /// serializer is in `MetricTagValues::Single` mode. Any `serde_json` failure is
    /// converted into `Self::Error`.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let sink = buffer.writer();

        match event {
            Event::Log(log) => serde_json::to_writer_pretty(sink, &log)?,
            Event::Trace(trace) => serde_json::to_writer_pretty(sink, &trace)?,
            Event::Metric(mut metric) => {
                if matches!(self.metric_tag_values, MetricTagValues::Single) {
                    metric.reduce_tags_to_single();
                }
                serde_json::to_writer_pretty(sink, &metric)?
            }
        }

        Ok(())
    }
}

// Unit tests for `PrettyJsonSerializer`.
//
// Expected values are spelled out in the layout written by `serde_json::to_writer_pretty`
// (two-space indentation, one member or element per line, `": "` between key and value),
// because that is what the serializer emits; compact JSON strings would never match.
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};
    use chrono::{TimeZone, Timelike, Utc};
    use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};
    use vector_core::metric_tags;
    use vrl::btreemap;

    use super::*;

    #[test]
    fn serialize_json_log() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "x" => Value::from("23"),
            "z" => Value::from(25),
            "a" => Value::from("0"),
        }));
        let bytes = serialize(PrettyJsonSerializerConfig::default(), event);

        assert_eq!(
            bytes,
            r#"{
  "a": "0",
  "x": "23",
  "z": 25
}"#
        );
    }

    #[test]
    fn serialize_json_metric_counter() {
        let event = Event::Metric(
            Metric::new(
                "foos",
                MetricKind::Incremental,
                MetricValue::Counter { value: 100.0 },
            )
            .with_namespace(Some("vector"))
            .with_tags(Some(metric_tags!(
                "key2" => "value2",
                "key1" => "value1",
                "Key3" => "Value3",
            )))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
                    .single()
                    .and_then(|t| t.with_nanosecond(11))
                    .expect("invalid timestamp"),
            )),
        );

        let bytes = serialize(PrettyJsonSerializerConfig::default(), event);

        assert_eq!(
            bytes,
            r#"{
  "name": "foos",
  "namespace": "vector",
  "tags": {
    "Key3": "Value3",
    "key1": "value1",
    "key2": "value2"
  },
  "timestamp": "2018-11-14T08:09:10.000000011Z",
  "kind": "incremental",
  "counter": {
    "value": 100.0
  }
}"#
        );
    }

    #[test]
    fn serialize_json_metric_set() {
        let event = Event::Metric(Metric::new(
            "users",
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["bob".into()].into_iter().collect(),
            },
        ));

        let bytes = serialize(PrettyJsonSerializerConfig::default(), event);

        assert_eq!(
            bytes,
            r#"{
  "name": "users",
  "kind": "incremental",
  "set": {
    "values": [
      "bob"
    ]
  }
}"#
        );
    }

    #[test]
    fn serialize_json_metric_histogram_without_timestamp() {
        let event = Event::Metric(Metric::new(
            "glork",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_core::samples![10.0 => 1],
                statistic: StatisticKind::Histogram,
            },
        ));

        let bytes = serialize(PrettyJsonSerializerConfig::default(), event);

        assert_eq!(
            bytes,
            r#"{
  "name": "glork",
  "kind": "incremental",
  "distribution": {
    "samples": [
      {
        "value": 10.0,
        "rate": 1
      }
    ],
    "statistic": "histogram"
  }
}"#
        );
    }

    #[test]
    fn serialize_equals_to_json_value() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        }));
        let mut serializer = PrettyJsonSerializerConfig::default().build();
        let mut bytes = BytesMut::new();

        serializer.encode(event.clone(), &mut bytes).unwrap();

        let json = serializer.to_json_value(event).unwrap();

        assert_eq!(bytes.freeze(), serde_json::to_string_pretty(&json).unwrap());
    }

    #[test]
    fn serialize_metric_tags_full() {
        let bytes = serialize(
            PrettyJsonSerializerConfig {
                metric_tag_values: MetricTagValues::Full,
            },
            metric2(),
        );

        assert_eq!(
            bytes,
            r#"{
  "name": "counter",
  "tags": {
    "a": [
      "first",
      null,
      "second"
    ]
  },
  "kind": "incremental",
  "counter": {
    "value": 1.0
  }
}"#
        );
    }

    #[test]
    fn serialize_metric_tags_single() {
        let bytes = serialize(
            PrettyJsonSerializerConfig {
                metric_tag_values: MetricTagValues::Single,
            },
            metric2(),
        );

        assert_eq!(
            bytes,
            r#"{
  "name": "counter",
  "tags": {
    "a": "second"
  },
  "kind": "incremental",
  "counter": {
    "value": 1.0
  }
}"#
        );
    }

    /// Builds a counter metric whose tag `a` carries two string values and one bare value.
    fn metric2() -> Event {
        Event::Metric(
            Metric::new(
                "counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags! (
                "a" => "first",
                "a" => None,
                "a" => "second",
            ))),
        )
    }

    /// Encodes `input` with a serializer built from `config` and returns the written bytes.
    fn serialize(config: PrettyJsonSerializerConfig, input: Event) -> Bytes {
        let mut buffer = BytesMut::new();
        config.build().encode(input, &mut buffer).unwrap();
        buffer.freeze()
    }
}
20 changes: 19 additions & 1 deletion lib/codecs/src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub use framing::{
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};

use self::format::{PrettyJsonSerializer, PrettyJsonSerializerConfig};

/// An error that occurred while building an encoder.
pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand Down Expand Up @@ -213,6 +215,11 @@ pub enum SerializerConfig {
/// [json]: https://www.json.org/
Json(JsonSerializerConfig),

/// Encodes an event as [JSON][json] in pretty format.
///
/// [json]: https://www.json.org/
PrettyJson(PrettyJsonSerializerConfig),

/// Encodes an event as a [logfmt][logfmt] message.
///
/// [logfmt]: https://brandur.org/logfmt
Expand Down Expand Up @@ -329,6 +336,7 @@ impl SerializerConfig {
SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
SerializerConfig::Gelf => Ok(Serializer::Gelf(GelfSerializerConfig::new().build())),
SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
SerializerConfig::PrettyJson(config) => Ok(Serializer::PrettyJson(config.build())),
SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
SerializerConfig::NativeJson => {
Expand Down Expand Up @@ -364,6 +372,7 @@ impl SerializerConfig {
SerializerConfig::Csv(_)
| SerializerConfig::Gelf
| SerializerConfig::Json(_)
| SerializerConfig::PrettyJson(_)
| SerializerConfig::Logfmt
| SerializerConfig::NativeJson
| SerializerConfig::RawMessage
Expand All @@ -380,6 +389,7 @@ impl SerializerConfig {
SerializerConfig::Csv(config) => config.input_type(),
SerializerConfig::Gelf { .. } => GelfSerializerConfig::input_type(),
SerializerConfig::Json(config) => config.input_type(),
SerializerConfig::PrettyJson(config) => config.input_type(),
SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
SerializerConfig::Native => NativeSerializerConfig.input_type(),
SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
Expand All @@ -398,6 +408,7 @@ impl SerializerConfig {
SerializerConfig::Csv(config) => config.schema_requirement(),
SerializerConfig::Gelf { .. } => GelfSerializerConfig::schema_requirement(),
SerializerConfig::Json(config) => config.schema_requirement(),
SerializerConfig::PrettyJson(config) => config.schema_requirement(),
SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
Expand All @@ -419,6 +430,8 @@ pub enum Serializer {
Gelf(GelfSerializer),
/// Uses a `JsonSerializer` for serialization.
Json(JsonSerializer),
/// Uses a `PrettyJsonSerializer` for serialization.
PrettyJson(PrettyJsonSerializer),
/// Uses a `LogfmtSerializer` for serialization.
Logfmt(LogfmtSerializer),
/// Uses a `NativeSerializer` for serialization.
Expand All @@ -437,7 +450,10 @@ impl Serializer {
/// Check if the serializer supports encoding an event to JSON via `Serializer::to_json_value`.
pub fn supports_json(&self) -> bool {
match self {
Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
Serializer::Json(_)
| Self::PrettyJson(_)
| Serializer::NativeJson(_)
| Serializer::Gelf(_) => true,
Serializer::Avro(_)
| Serializer::Csv(_)
| Serializer::Logfmt(_)
Expand All @@ -458,6 +474,7 @@ impl Serializer {
match self {
Serializer::Gelf(serializer) => serializer.to_json_value(event),
Serializer::Json(serializer) => serializer.to_json_value(event),
Serializer::PrettyJson(serializer) => serializer.to_json_value(event),
Serializer::NativeJson(serializer) => serializer.to_json_value(event),
Serializer::Avro(_)
| Serializer::Csv(_)
Expand Down Expand Up @@ -541,6 +558,7 @@ impl tokio_util::codec::Encoder<Event> for Serializer {
Serializer::Csv(serializer) => serializer.encode(event, buffer),
Serializer::Gelf(serializer) => serializer.encode(event, buffer),
Serializer::Json(serializer) => serializer.encode(event, buffer),
Serializer::PrettyJson(serializer) => serializer.encode(event, buffer),
Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
Serializer::Native(serializer) => serializer.encode(event, buffer),
Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
Expand Down
2 changes: 1 addition & 1 deletion src/codecs/encoding/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl EncodingConfigWithFraming {

let framer = match (framer, &serializer) {
(Some(framer), _) => framer,
(None, Serializer::Json(_)) => match sink_type {
(None, Serializer::Json(_) | Serializer::PrettyJson(_)) => match sink_type {
SinkType::StreamBased => NewlineDelimitedEncoder::new().into(),
SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
},
Expand Down
13 changes: 9 additions & 4 deletions src/codecs/encoding/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,15 @@ impl Encoder<Framer> {
/// Get the HTTP content type.
pub const fn content_type(&self) -> &'static str {
match (&self.serializer, &self.framer) {
(Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
"application/x-ndjson"
}
(
Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
Serializer::Json(_) | Serializer::PrettyJson(_) | Serializer::NativeJson(_),
Framer::NewlineDelimited(_),
) => "application/x-ndjson",
(
Serializer::Gelf(_)
| Serializer::Json(_)
| Serializer::PrettyJson(_)
| Serializer::NativeJson(_),
Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
) => "application/json",
(Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
Expand All @@ -121,6 +125,7 @@ impl Encoder<Framer> {
| Serializer::Json(_)
| Serializer::Logfmt(_)
| Serializer::NativeJson(_)
| Serializer::PrettyJson(_)
| Serializer::RawMessage(_)
| Serializer::Text(_),
_,
Expand Down