Skip to content

Commit

Permalink
Implement JsonSerializer
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Sichert <mail@pablosichert.com>
  • Loading branch information
pablosichert committed Jan 12, 2022
1 parent 9f48092 commit 597a2d2
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 6 deletions.
72 changes: 68 additions & 4 deletions src/codecs/format/json.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::convert::TryInto;

use bytes::Bytes;
use bytes::{BufMut, Bytes};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use tokio_util::codec::Encoder;

use crate::{
codecs::decoding::{BoxedDeserializer, Deserializer, DeserializerConfig},
codecs::{
decoding::{BoxedDeserializer, Deserializer, DeserializerConfig},
encoding::{BoxedSerializer, SerializerConfig},
},
config::log_schema,
event::Event,
};
Expand Down Expand Up @@ -80,13 +84,60 @@ impl From<&JsonDeserializerConfig> for JsonDeserializer {
}
}

/// Config used to build a `JsonSerializer`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct JsonSerializerConfig;

impl JsonSerializerConfig {
/// Creates a new `JsonSerializerConfig`.
pub const fn new() -> Self {
Self
}
}

#[typetag::serde(name = "json")]
impl SerializerConfig for JsonSerializerConfig {
fn build(&self) -> crate::Result<BoxedSerializer> {
Ok(Box::new(JsonSerializer))
}
}

/// Serializer that converts an `Event` to bytes using the JSON format.
#[derive(Debug, Clone)]
pub struct JsonSerializer;

impl JsonSerializer {
/// Creates a new `JsonSerializer`.
pub const fn new() -> Self {
Self
}
}

impl Encoder<Event> for JsonSerializer {
type Error = crate::Error;

fn encode(&mut self, event: Event, buffer: &mut bytes::BytesMut) -> Result<(), Self::Error> {
let json = match event {
Event::Log(log) => serde_json::to_vec(&log),
Event::Metric(metric) => serde_json::to_vec(&metric),
}?;

buffer.put(json.as_slice());

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::config::log_schema;
use crate::event::Value;
use bytes::BytesMut;
use shared::btreemap;

#[test]
fn parse_json() {
fn deserialize_json() {
let input = Bytes::from(r#"{ "foo": 123 }"#);
let deserializer = JsonDeserializer::new();

Expand All @@ -104,7 +155,7 @@ mod tests {
}

#[test]
fn parse_json_array() {
fn deserialize_json_array() {
let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);
let deserializer = JsonDeserializer::new();

Expand All @@ -128,6 +179,19 @@ mod tests {
assert_eq!(events.next(), None);
}

#[test]
fn serialize_json() {
let event = Event::from(btreemap! {
"foo" => Value::from("bar")
});
let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), r#"{"foo":"bar"}"#);
}

#[test]
fn skip_empty() {
let input = Bytes::from("");
Expand Down
2 changes: 1 addition & 1 deletion src/codecs/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ mod syslog;
pub use self::bytes::{BytesDeserializer, BytesDeserializerConfig};
#[cfg(feature = "sources-syslog")]
pub use self::syslog::{SyslogDeserializer, SyslogDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonSerializer, JsonSerializerConfig};
pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig};
2 changes: 1 addition & 1 deletion src/codecs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod ready_frames;
pub use decoding::Decoder;
pub use format::{
BytesDeserializer, BytesDeserializerConfig, JsonDeserializer, JsonDeserializerConfig,
RawMessageSerializer, RawMessageSerializerConfig,
JsonSerializer, JsonSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig,
};
#[cfg(feature = "sources-syslog")]
pub use format::{SyslogDeserializer, SyslogDeserializerConfig};
Expand Down

0 comments on commit 597a2d2

Please sign in to comment.