Skip to content

Commit

Permalink
fix(kubernetes_logs source): Fix events being empty when log namespac…
Browse files Browse the repository at this point in the history
…ing is enabled (#18244)

* save

* save

* save

* added tests

* cleanup

* cleanup

* clippy cleanup

* fix formatting inside macro
  • Loading branch information
fuchsnj authored and neuronull committed Aug 16, 2023
1 parent a701dd8 commit 54ad661
Show file tree
Hide file tree
Showing 8 changed files with 490 additions and 113 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/vector-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ publish = false

[dependencies]
async-graphql = { version = "5.0.10", default-features = false, features = ["playground" ], optional = true }
async-stream = { version = "0.3.5", default-features = false }
async-trait = { version = "0.1", default-features = false }
bitmask-enum = { version = "2.2.2", default-features = false }
bytes = { version = "1.4.0", default-features = false, features = ["serde"] }
Expand Down
132 changes: 132 additions & 0 deletions lib/vector-core/src/stream/expiration_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use async_stream::stream;
use futures::{Stream, StreamExt};
use std::time::Duration;

#[derive(Default)]
pub struct Emitter<T> {
values: Vec<T>,
}

impl<T> Emitter<T> {
pub fn new() -> Self {
Self { values: vec![] }
}
pub fn emit(&mut self, value: T) {
self.values.push(value);
}
}

/// Similar to `stream.filter_map(..).flatten(..)` but also allows checking for expired events
/// and flushing when the input stream ends.
pub fn map_with_expiration<S, T, M, E, F>(
initial_state: S,
input: impl Stream<Item = T> + 'static,
expiration_interval: Duration,
// called for each event
mut map_fn: M,
// called periodically to allow expiring internal state
mut expiration_fn: E,
// called once at the end of the input stream
mut flush_fn: F,
) -> impl Stream<Item = T>
where
M: FnMut(&mut S, T, &mut Emitter<T>),
E: FnMut(&mut S, &mut Emitter<T>),
F: FnMut(&mut S, &mut Emitter<T>),
{
let mut state = initial_state;
let mut flush_stream = tokio::time::interval(expiration_interval);

Box::pin(stream! {
futures_util::pin_mut!(input);
loop {
let mut emitter = Emitter::<T>::new();
let done = tokio::select! {
_ = flush_stream.tick() => {
expiration_fn(&mut state, &mut emitter);
false
}
maybe_event = input.next() => {
match maybe_event {
None => {
flush_fn(&mut state, &mut emitter);
true
}
Some(event) => {
map_fn(&mut state, event, &mut emitter);
false
}
}
}
};
yield futures::stream::iter(emitter.values.into_iter());
if done { break }
}

})
.flatten()
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
async fn test_simple() {
let input = futures::stream::iter([1, 2, 3]);

let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
*state += event;
emitter.emit(*state);
};
let expiration_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
// do nothing
};
let flush_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
emitter.emit(*state);
};
let stream: Vec<i32> = map_with_expiration(
0_i32,
input,
Duration::from_secs(100),
map_fn,
expiration_fn,
flush_fn,
)
.take(4)
.collect()
.await;

assert_eq!(vec![1, 3, 6, 6], stream);
}

#[tokio::test]
async fn test_expiration() {
// an input that never ends (to test expiration)
let input = futures::stream::iter([1, 2, 3]).chain(futures::stream::pending());

let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
*state += event;
emitter.emit(*state);
};
let expiration_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
emitter.emit(*state);
};
let flush_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
// do nothing
};
let stream: Vec<i32> = map_with_expiration(
0_i32,
input,
Duration::from_secs(1),
map_fn,
expiration_fn,
flush_fn,
)
.take(4)
.collect()
.await;

assert_eq!(vec![1, 3, 6, 6], stream);
}
}
1 change: 1 addition & 0 deletions lib/vector-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod batcher;
mod concurrent_map;
mod driver;
pub mod expiration_map;
mod futures_unordered_count;
mod partitioned_batcher;

Expand Down
26 changes: 12 additions & 14 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//! running inside the cluster as a DaemonSet.

#![deny(missing_docs)]

use std::{path::PathBuf, time::Duration};

use bytes::Bytes;
Expand Down Expand Up @@ -32,11 +31,10 @@ use vector_common::{
TimeZone,
};
use vector_config::configurable_component;
use vector_core::{
config::LegacyKey, config::LogNamespace, transform::TaskTransform, EstimatedJsonEncodedSizeOf,
};
use vector_core::{config::LegacyKey, config::LogNamespace, EstimatedJsonEncodedSizeOf};
use vrl::value::{kind::Collection, Kind};

use crate::sources::kubernetes_logs::partial_events_merger::merge_partial_events;
use crate::{
config::{
log_schema, ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig,
Expand Down Expand Up @@ -72,9 +70,6 @@ use self::node_metadata_annotator::NodeMetadataAnnotator;
use self::parser::Parser;
use self::pod_metadata_annotator::PodMetadataAnnotator;

/// The key we use for `file` field.
const FILE_KEY: &str = "file";

/// The `self_node_name` value env var key.
const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME";

Expand Down Expand Up @@ -781,12 +776,6 @@ impl Source {

let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);

let mut parser = Parser::new(log_namespace);
let partial_events_merger = Box::new(partial_events_merger::build(
auto_partial_merge,
log_namespace,
));

let checkpoints = checkpointer.view();
let events = file_source_rx.flat_map(futures::stream::iter);
let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
Expand All @@ -800,6 +789,7 @@ impl Source {
ingestion_timestamp_field.as_ref(),
log_namespace,
);

let file_info = annotator.annotate(&mut event, &line.filename);

emit!(KubernetesLogsEventsReceived {
Expand Down Expand Up @@ -834,14 +824,22 @@ impl Source {
checkpoints.update(line.file_id, line.end_offset);
event
});

let mut parser = Parser::new(log_namespace);
let events = events.flat_map(move |event| {
let mut buf = OutputBuffer::with_capacity(1);
parser.transform(&mut buf, event);
futures::stream::iter(buf.into_events())
});

let (events_count, _) = events.size_hint();

let mut stream = partial_events_merger.transform(Box::pin(events));
let mut stream = if auto_partial_merge {
merge_partial_events(events, log_namespace).left_stream()
} else {
events.right_stream()
};

let event_processing_loop = out.send_event_stream(&mut stream);

let mut lifecycle = Lifecycle::new();
Expand Down
Loading

0 comments on commit 54ad661

Please sign in to comment.