Skip to content

Commit

Permalink
Implement StandardEncodingsMigrator
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Sichert <mail@pablosichert.com>
  • Loading branch information
pablosichert committed Apr 11, 2022
1 parent c0862c5 commit 04f78f2
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
24 changes: 24 additions & 0 deletions src/sinks/util/encoding/codec.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::io;

#[cfg(feature = "codecs")]
use codecs::{encoding::SerializerConfig, JsonSerializerConfig, RawMessageSerializerConfig};
use serde::{Deserialize, Serialize};
use vector_core::config::log_schema;
use vector_core::event::{Event, LogEvent, TraceEvent};

use super::Encoder;
#[cfg(feature = "codecs")]
use super::EncodingConfigMigrator;

static DEFAULT_TEXT_ENCODER: StandardTextEncoding = StandardTextEncoding;
static DEFAULT_JSON_ENCODER: StandardJsonEncoding = StandardJsonEncoding;
Expand Down Expand Up @@ -154,6 +158,26 @@ impl Encoder<Vec<Event>> for StandardEncodings {
}
}

#[cfg(feature = "codecs")]
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Migrate the legacy `StandardEncodings` to the new `SerializerConfig` based
/// encoding system.
pub struct StandardEncodingsMigrator;

#[cfg(feature = "codecs")]
impl EncodingConfigMigrator for StandardEncodingsMigrator {
type Codec = StandardEncodings;

fn migrate(codec: &Self::Codec) -> SerializerConfig {
match codec {
StandardEncodings::Text => RawMessageSerializerConfig::new().into(),
StandardEncodings::Json | StandardEncodings::Ndjson => {
JsonSerializerConfig::new().into()
}
}
}
}

/// Standard implementation for encoding events as JSON.
///
/// All event types will be serialized to JSON, without pretty printing. Uses
Expand Down
32 changes: 25 additions & 7 deletions src/sinks/util/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ mod config;
mod fixed;
mod with_default;

use std::{fmt::Debug, io, sync::Arc};
use std::{fmt::Debug, io};

#[cfg(feature = "codecs")]
use bytes::BytesMut;
use serde::{Deserialize, Serialize};
#[cfg(feature = "codecs")]
use tokio_util::codec::Encoder as _;

use crate::{
event::{Event, LogEvent, MaybeAsLogMut, Value},
Expand All @@ -78,6 +82,8 @@ pub use adapter::{
EncodingConfigAdapter, EncodingConfigMigrator, EncodingConfigWithFramingAdapter,
EncodingConfigWithFramingMigrator, Transformer,
};
#[cfg(feature = "codecs")]
pub use codec::StandardEncodingsMigrator;
pub use codec::{as_tracked_write, StandardEncodings, StandardJsonEncoding, StandardTextEncoding};
pub use config::EncodingConfig;
pub use fixed::EncodingConfigFixed;
Expand All @@ -104,12 +110,24 @@ pub trait Encoder<T> {
}
}

impl<E, T> Encoder<T> for Arc<E>
where
E: Encoder<T>,
{
fn encode_input(&self, input: T, writer: &mut dyn io::Write) -> io::Result<usize> {
(**self).encode_input(input, writer)
#[cfg(feature = "codecs")]
impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<()>) {
fn encode_input(&self, events: Vec<Event>, writer: &mut dyn io::Write) -> io::Result<usize> {
let mut encoder = self.1.clone();

Ok(events
.into_iter()
.map(|mut event| {
self.0.transform(&mut event);
let mut bytes = BytesMut::new();
encoder
.encode(event, &mut bytes)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
writer.write(&bytes)
})
.collect::<io::Result<Vec<_>>>()?
.into_iter()
.sum())
}
}

Expand Down

0 comments on commit 04f78f2

Please sign in to comment.