Skip to content

Commit

Permalink
chore(core): Add a EventCount trait to bufferable types (#11227)
Browse files Browse the repository at this point in the history
This will be used by future changes to the disk buffer to deal with
event arrays, which may have more than one event per record.
  • Loading branch information
bruceg committed Feb 7, 2022
1 parent 6bcfae1 commit 1eab953
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 7 deletions.
8 changes: 7 additions & 1 deletion lib/vector-buffers/benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vector_buffers::{
builder::TopologyBuilder,
channel::{BufferReceiver, BufferSender},
},
BufferType,
BufferType, EventCount,
};
use vector_common::byte_size_of::ByteSizeOf;

Expand All @@ -37,6 +37,12 @@ impl<const N: usize> ByteSizeOf for Message<N> {
}
}

impl<const N: usize> EventCount for Message<N> {
fn event_count(&self) -> usize {
1
}
}

#[derive(Debug)]
pub struct EncodeError;

Expand Down
8 changes: 7 additions & 1 deletion lib/vector-buffers/examples/buffer_perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use vector_buffers::{
builder::TopologyBuilder,
channel::{BufferReceiver, BufferSender},
},
Acker, BufferType, Bufferable, WhenFull,
Acker, BufferType, Bufferable, EventCount, WhenFull,
};
use vector_common::byte_size_of::ByteSizeOf;

Expand All @@ -48,6 +48,12 @@ impl ByteSizeOf for VariableMessage {
}
}

impl EventCount for VariableMessage {
fn event_count(&self) -> usize {
1
}
}

impl FixedEncodable for VariableMessage {
type EncodeError = EncodeError;
type DecodeError = DecodeError;
Expand Down
7 changes: 7 additions & 0 deletions lib/vector-buffers/src/disk_v2/tests/known_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
ReaderError,
},
encoding::{AsMetadata, Encodable},
EventCount,
};

#[tokio::test]
Expand Down Expand Up @@ -725,6 +726,12 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
}
}

impl EventCount for ControllableRecord {
fn event_count(&self) -> usize {
1
}
}

with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();

Expand Down
14 changes: 13 additions & 1 deletion lib/vector-buffers/src/disk_v2/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
buffer_usage_data::BufferUsageHandle,
disk_v2::{Buffer, DiskBufferConfig, Reader, Writer},
encoding::FixedEncodable,
Acker, Bufferable, WhenFull,
Acker, Bufferable, EventCount, WhenFull,
};

mod acknowledgements;
Expand Down Expand Up @@ -182,6 +182,12 @@ impl ByteSizeOf for SizedRecord {
}
}

impl EventCount for SizedRecord {
fn event_count(&self) -> usize {
1
}
}

impl FixedEncodable for SizedRecord {
type EncodeError = io::Error;
type DecodeError = io::Error;
Expand Down Expand Up @@ -221,6 +227,12 @@ impl ByteSizeOf for UndecodableRecord {
}
}

impl EventCount for UndecodableRecord {
fn event_count(&self) -> usize {
1
}
}

impl FixedEncodable for UndecodableRecord {
type EncodeError = io::Error;
type DecodeError = io::Error;
Expand Down
15 changes: 13 additions & 2 deletions lib/vector-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,23 @@ impl Arbitrary for WhenFull {
///
/// This supertrait serves as the base trait for any item that can be pushed into a buffer.
pub trait Bufferable:
ByteSizeOf + Encodable + Debug + Send + Sync + Unpin + Sized + 'static
ByteSizeOf + EventCount + Encodable + Debug + Send + Sync + Unpin + Sized + 'static
{
}

// Blanket implementation for anything that is already bufferable.
impl<T> Bufferable for T where
T: ByteSizeOf + Encodable + Debug + Send + Sync + Unpin + Sized + 'static
T: ByteSizeOf + EventCount + Encodable + Debug + Send + Sync + Unpin + Sized + 'static
{
}

pub trait EventCount {
fn event_count(&self) -> usize;
}

#[cfg(test)]
impl EventCount for u64 {
fn event_count(&self) -> usize {
1
}
}
8 changes: 7 additions & 1 deletion lib/vector-buffers/src/test/common/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::{Buf, BufMut};
use quickcheck::{Arbitrary, Gen};
use vector_common::byte_size_of::ByteSizeOf;

use crate::encoding::FixedEncodable;
use crate::{encoding::FixedEncodable, EventCount};

#[derive(Debug)]
pub struct EncodeError;
Expand Down Expand Up @@ -45,6 +45,12 @@ impl ByteSizeOf for Message {
}
}

impl EventCount for Message {
fn event_count(&self) -> usize {
1
}
}

impl Arbitrary for Message {
fn arbitrary(g: &mut Gen) -> Self {
Message {
Expand Down
8 changes: 7 additions & 1 deletion lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chrono::{DateTime, SecondsFormat, Utc};
use enumflags2::{bitflags, BitFlags, FromBitsError};
use prost::Message;
use snafu::Snafu;
use vector_buffers::encoding::{AsMetadata, Encodable};
use vector_buffers::{encoding::AsMetadata, encoding::Encodable, EventCount};
use vector_common::EventDataEq;

use crate::ByteSizeOf;
Expand Down Expand Up @@ -64,6 +64,12 @@ impl ByteSizeOf for Event {
}
}

impl EventCount for Event {
fn event_count(&self) -> usize {
1
}
}

impl Finalizable for Event {
fn take_finalizers(&mut self) -> EventFinalizers {
match self {
Expand Down

0 comments on commit 1eab953

Please sign in to comment.