Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(codecs): Implement additional Encoder/EncodingConfig without framing #12156

Merged
merged 1 commit into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use hyper::{
use tokio::runtime::Runtime;
use vector::{
config, sinks,
sinks::util::{encoding::EncodingConfigAdapter, BatchConfig, Compression},
sinks::util::{encoding::EncodingConfigWithFramingAdapter, BatchConfig, Compression},
sources,
test_util::{next_addr, random_lines, runtime, send_lines, start_topology, wait_for_tcp},
Error,
Expand Down Expand Up @@ -53,7 +53,7 @@ fn benchmark_http(c: &mut Criterion) {
auth: Default::default(),
headers: Default::default(),
batch,
encoding: EncodingConfigAdapter::legacy(
encoding: EncodingConfigWithFramingAdapter::legacy(
sinks::http::Encoding::Text.into(),
),
request: Default::default(),
Expand Down
101 changes: 61 additions & 40 deletions src/codecs/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ use crate::{
};
use bytes::BytesMut;
use codecs::{
encoding::{Error, Framer, FramingConfig, Serializer, SerializerConfig},
encoding::{Error, Framer, Serializer},
NewlineDelimitedEncoder, RawMessageSerializer,
};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder as _;

#[derive(Debug, Clone)]
/// An encoder that can encode structured events into byte frames.
pub struct Encoder {
pub struct Encoder<Framer>
where
Framer: Clone,
Comment on lines -14 to +16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why generics instead of just making framer an Option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because generics let us provide two separate implementations — `impl tokio_util::codec::Encoder<Event> for Encoder<Framer>` and `impl tokio_util::codec::Encoder<Event> for Encoder<()>` — that are selected at compile time, so neither of them has to branch on whether a framer is present:

/// Framed encoding: serializes the event, then lets the framer wrap the
/// serialized bytes into a frame.
impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
    type Error = Error;

    /// Appends the framed encoding of `event` to `buffer`.
    ///
    /// Returns early with the error from `serialize` if serialization fails.
    /// If framing fails, `EncoderFramingFailed` is emitted and the error is
    /// returned as `Error::FramingError`.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        // Split at the current end of `buffer`, so `payload` starts out empty
        // and holds only the bytes produced for this event.
        let len = buffer.len();
        let mut payload = buffer.split_off(len);

        self.serialize(event, &mut payload)?;

        // Frame the serialized event.
        self.framer.encode((), &mut payload).map_err(|error| {
            emit!(EncoderFramingFailed { error: &error });
            Error::FramingError(error)
        })?;

        // Only reached on success: both `?` above return early, in which case
        // `payload` is dropped without being re-attached to `buffer`.
        buffer.unsplit(payload);

        Ok(())
    }
}
/// Unframed encoding: the serialized bytes are appended as-is, with no framer
/// involved.
impl tokio_util::codec::Encoder<Event> for Encoder<()> {
    type Error = Error;

    /// Appends the serialized `event` to `buffer`, propagating any error
    /// returned by `serialize`.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        // Split at the current end of `buffer`, so `payload` starts out empty
        // and receives only this event's bytes.
        let len = buffer.len();
        let mut payload = buffer.split_off(len);

        self.serialize(event, &mut payload)?;

        // Re-attach the serialized bytes to the end of `buffer`.
        buffer.unsplit(payload);

        Ok(())
    }
}

This means that when we know no framer is used, we never need to check for its absence at runtime; conversely, an encoder that is known to have a framer never needs to check for its presence. Since this is a very hot path, the difference can be on the order of 5-10% of throughput, as can also be seen in the benchmarks in #10684 (comment).

{
framer: Framer,
serializer: Serializer,
}

impl Default for Encoder {
impl Default for Encoder<Framer> {
fn default() -> Self {
Self {
framer: Framer::NewlineDelimited(NewlineDelimitedEncoder::new()),
Expand All @@ -25,7 +28,29 @@ impl Default for Encoder {
}
}

impl Encoder {
impl Default for Encoder<()> {
    /// Returns an unframed `Encoder` whose serializer is a
    /// `RawMessageSerializer`.
    fn default() -> Self {
        let serializer = Serializer::RawMessage(RawMessageSerializer::new());

        Self {
            framer: (),
            serializer,
        }
    }
}

impl<Framer> Encoder<Framer>
where
Framer: Clone,
{
// Serialize the event.
fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
self.serializer.encode(event, buffer).map_err(|error| {
emit!(EncoderSerializeFailed { error: &error });
Error::SerializingError(error)
})
}
}

impl Encoder<Framer> {
/// Creates a new `Encoder` with the specified `Serializer` to produce bytes
/// from a structured event, and the `Framer` to wrap these into a byte
/// frame.
Expand All @@ -44,21 +69,30 @@ impl Encoder {
}
}

impl tokio_util::codec::Encoder<Event> for Encoder {
impl Encoder<()> {
/// Creates a new `Encoder` with the specified `Serializer` to produce bytes
/// from a structured event.
pub const fn new(serializer: Serializer) -> Self {
Self {
framer: (),
serializer,
}
}

/// Get the serializer.
pub const fn serializer(&self) -> &Serializer {
&self.serializer
}
}

impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
type Error = Error;

fn encode(&mut self, item: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let len = buffer.len();

let mut payload = buffer.split_off(len);

// Serialize the event.
self.serializer
.encode(item, &mut payload)
.map_err(|error| {
emit!(EncoderSerializeFailed { error: &error });
Error::SerializingError(error)
})?;
self.serialize(event, &mut payload)?;

// Frame the serialized event.
self.framer.encode((), &mut payload).map_err(|error| {
Expand All @@ -72,31 +106,18 @@ impl tokio_util::codec::Encoder<Event> for Encoder {
}
}

/// Config used to build an `Encoder`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct EncodingConfig {
/// The framing config.
framing: FramingConfig,
/// The encoding config.
encoding: SerializerConfig,
}
impl tokio_util::codec::Encoder<Event> for Encoder<()> {
type Error = Error;

impl EncodingConfig {
/// Creates a new `EncodingConfig` with the provided `FramingConfig` and
/// `SerializerConfig`.
pub const fn new(framing: FramingConfig, encoding: SerializerConfig) -> Self {
Self { framing, encoding }
}
fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let len = buffer.len();
let mut payload = buffer.split_off(len);

/// Builds an `Encoder` from the provided configuration.
pub const fn build(self) -> Encoder {
// Build the framer.
let framer = self.framing.build();
self.serialize(event, &mut payload)?;

// Build the serializer.
let serializer = self.encoding.build();
buffer.unsplit(payload);

Encoder::new(framer, serializer)
Ok(())
}
}

Expand Down Expand Up @@ -164,7 +185,7 @@ mod tests {

#[tokio::test]
async fn test_encode_events_sink_empty() {
let encoder = Encoder::new(
let encoder = Encoder::<Framer>::new(
Framer::Boxed(Box::new(ParenEncoder::new())),
Serializer::RawMessage(RawMessageSerializer::new()),
);
Expand All @@ -183,7 +204,7 @@ mod tests {

#[tokio::test]
async fn test_encode_events_sink_non_empty() {
let encoder = Encoder::new(
let encoder = Encoder::<Framer>::new(
Framer::Boxed(Box::new(ParenEncoder::new())),
Serializer::RawMessage(RawMessageSerializer::new()),
);
Expand All @@ -202,7 +223,7 @@ mod tests {

#[tokio::test]
async fn test_encode_events_sink_empty_handle_framing_error() {
let encoder = Encoder::new(
let encoder = Encoder::<Framer>::new(
Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), 1))),
Serializer::RawMessage(RawMessageSerializer::new()),
);
Expand All @@ -222,7 +243,7 @@ mod tests {

#[tokio::test]
async fn test_encode_events_sink_non_empty_handle_framing_error() {
let encoder = Encoder::new(
let encoder = Encoder::<Framer>::new(
Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), 1))),
Serializer::RawMessage(RawMessageSerializer::new()),
);
Expand Down
2 changes: 1 addition & 1 deletion src/codecs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ mod encoder;
mod ready_frames;

pub use decoder::{Decoder, DecodingConfig};
pub use encoder::{Encoder, EncodingConfig};
pub use encoder::Encoder;
pub use ready_frames::ReadyFrames;
24 changes: 14 additions & 10 deletions src/sinks/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use crate::{
http::{Auth, HttpClient, MaybeAuth},
sinks::util::{
self,
encoding::{EncodingConfig, EncodingConfigAdapter, EncodingConfigMigrator, Transformer},
encoding::{
EncodingConfig, EncodingConfigWithFramingAdapter, EncodingConfigWithFramingMigrator,
Transformer,
},
http::{BatchedHttpSink, HttpEventEncoder, RequestConfig},
BatchConfig, Buffer, Compression, RealtimeSizeBasedDefaultBatchSettings,
TowerRequestConfig, UriSerde,
Expand All @@ -52,7 +55,7 @@ enum BuildError {
/// Unit type that carries the `migrate` implementation for this sink's
/// `Encoding` codec; it holds no data of its own.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Migrator;

impl EncodingConfigMigrator for Migrator {
impl EncodingConfigWithFramingMigrator for Migrator {
type Codec = Encoding;

fn migrate(codec: &Self::Codec) -> (Option<FramingConfig>, SerializerConfig) {
Expand Down Expand Up @@ -80,7 +83,7 @@ pub struct HttpSinkConfig {
#[serde(default)]
pub compression: Compression,
#[serde(flatten)]
pub encoding: EncodingConfigAdapter<EncodingConfig<Encoding>, Migrator>,
pub encoding: EncodingConfigWithFramingAdapter<EncodingConfig<Encoding>, Migrator>,
#[serde(default)]
pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
#[serde(default)]
Expand Down Expand Up @@ -144,21 +147,22 @@ struct HttpSink {
pub auth: Option<Auth>,
pub compression: Compression,
pub transformer: Transformer,
pub encoder: Encoder,
pub encoder: Encoder<Framer>,
pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
pub request: RequestConfig,
}

#[cfg(test)]
fn default_sink(encoding: Encoding) -> HttpSink {
let encoding =
EncodingConfigAdapter::<EncodingConfig<Encoding>, Migrator>::legacy(encoding.into())
.encoding();
let encoding = EncodingConfigWithFramingAdapter::<EncodingConfig<Encoding>, Migrator>::legacy(
encoding.into(),
)
.encoding();
let framing = encoding
.0
.unwrap_or_else(|| NewlineDelimitedEncoder::new().into());
let serializer = encoding.1;
let encoder = Encoder::new(framing, serializer);
let encoder = Encoder::<Framer>::new(framing, serializer);

HttpSink {
uri: Default::default(),
Expand Down Expand Up @@ -197,7 +201,7 @@ impl SinkConfig for HttpSinkConfig {
.0
.unwrap_or_else(|| NewlineDelimitedEncoder::new().into());
let serializer = encoding.1;
let encoder = Encoder::new(framing, serializer);
let encoder = Encoder::<Framer>::new(framing, serializer);

let sink = HttpSink {
uri: self.uri.with_default_parts(),
Expand Down Expand Up @@ -245,7 +249,7 @@ impl SinkConfig for HttpSinkConfig {
}

pub struct HttpSinkEventEncoder {
encoder: Encoder,
encoder: Encoder<Framer>,
transformer: Transformer,
}

Expand Down
Loading