Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(file source): correct an error with line aggregation in continue_through and halt_before #3262

Merged
merged 10 commits into from Jul 31, 2020
112 changes: 103 additions & 9 deletions src/sources/file/line_agg.rs
Expand Up @@ -79,6 +79,11 @@ pub(super) struct LineAgg<T, K> {
/// Key is usually a filename or other line source identifier.
buffers: HashMap<K, BytesMut>,

/// Stashed lines. When line aggregation results in more than one line being
/// emitted, we have to stash lines and return them into the stream after
/// that before doing any other work.
stashed: Option<(Bytes, K)>,

/// Draining queue. We switch to draining mode when we get `None` from
/// the inner stream. In this mode we stop polling `inner` for new lines
/// and just flush all the buffered data.
Expand All @@ -102,6 +107,7 @@ where
config,

draining: None,
stashed: None,
buffers: HashMap::new(),
timeouts: DelayQueue::new(),
expired: VecDeque::new(),
Expand All @@ -120,6 +126,11 @@ where

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// If we have a stashed line, short-circuit here.
if let Some(val) = self.stashed.take() {
return Ok(Async::Ready(Some(val)));
}

// If we're in draining mode, short-circuit here.
if let Some(to_drain) = &mut self.draining {
if let Some((line, src)) = to_drain.pop() {
Expand All @@ -140,6 +151,23 @@ where
// handler gave us something - return it, otherwise continue
// with the flow.
if let Some(val) = self.handle_line(line, src) {
let val = match val {
// If we have to emit just one line - that's easy,
// we just return it.
(Emit::One(val), src) => (val, src),
// If we have to emit two lines - take the second
// one and stash it, then return the first one.
// This way, the stashed line will be returned
// on the next stream poll.
(Emit::Two(val, to_stash), src) => {
// Stashed line is always consumed at the start
// of the `poll` loop. If it's non-empty here -
// it's a bug.
debug_assert!(self.stashed.is_none());
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
self.stashed = Some((to_stash, src.clone()));
(val, src)
}
};
return Ok(Async::Ready(Some(val)));
}
}
Expand All @@ -166,13 +194,22 @@ where
}
}

/// Specifies the number of lines to emit in response to a single input line.
///
/// Handling one input line produces either one or two output lines; there is
/// one variant for each case. The value is returned from `handle_line` paired
/// with the key of the line source, and is unpacked by the stream's `poll`.
enum Emit {
/// Emit one line.
///
/// `poll` returns this line directly.
One(Bytes),
/// Emit two lines, in the order they're specified.
///
/// `poll` returns the first line immediately and stashes the second one so
/// that it is returned on the next poll, before any other work is done.
Two(Bytes, Bytes),
}

impl<T, K> LineAgg<T, K>
where
T: Stream<Item = (Bytes, K), Error = ()>,
K: Hash + Eq + Clone,
{
/// Handle line, if we have something to output - return it.
fn handle_line(&mut self, line: Bytes, src: K) -> Option<(Bytes, K)> {
fn handle_line(&mut self, line: Bytes, src: K) -> Option<(Emit, K)> {
// Check if we already have the buffered data for the source.
match self.buffers.entry(src) {
Entry::Occupied(mut entry) => {
Expand All @@ -186,8 +223,8 @@ where
add_next_line(buffered, line);
None
} else {
let buffered = entry.insert(line.as_ref().into());
Some((buffered.freeze(), entry.key().clone()))
let (src, buffered) = entry.remove_entry();
Some((Emit::Two(buffered.freeze(), line), src))
}
}
// All consecutive lines matching this pattern, plus one
Expand All @@ -200,15 +237,15 @@ where
} else {
let (src, mut buffered) = entry.remove_entry();
add_next_line(&mut buffered, line);
Some((buffered.freeze(), src))
Some((Emit::One(buffered.freeze()), src))
}
}
// All consecutive lines not matching this pattern are included
// in the group.
Mode::HaltBefore => {
if condition_matched {
let buffered = entry.insert(line.as_ref().into());
Some((buffered.freeze(), entry.key().clone()))
let (src, buffered) = entry.remove_entry();
Some((Emit::Two(buffered.freeze(), line), src))
} else {
let buffered = entry.get_mut();
add_next_line(buffered, line);
Expand All @@ -221,7 +258,7 @@ where
if condition_matched {
let (src, mut buffered) = entry.remove_entry();
add_next_line(&mut buffered, line);
Some((buffered.freeze(), src))
Some((Emit::One(buffered.freeze()), src))
} else {
let buffered = entry.get_mut();
add_next_line(buffered, line);
Expand All @@ -241,7 +278,7 @@ where
None
} else {
// It's just a regular line we don't really care about.
Some((line, entry.into_key()))
Some((Emit::One(line), entry.into_key()))
}
}
}
Expand All @@ -256,6 +293,7 @@ fn add_next_line(buffered: &mut BytesMut, line: Bytes) {
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I'll have to remember that one for my own tests. Too bad we can't turn that on across all the sources at once.

Copy link
Contributor Author

@MOZGIII MOZGIII Jul 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed it being used at the file/mod.rs and it looked really good.

Can't we import it at /lib.rs with an old #[macro_use] extern crate pretty_assertions;?


#[test]
fn mode_continue_through_1() {
Expand Down Expand Up @@ -425,6 +463,59 @@ mod tests {
run_and_assert(&lines, config, &expected);
}

/// https://github.com/timberio/vector/issues/3237
#[test]
fn two_lines_emit_with_continue_through() {
let lines = vec![
"not merged 1",
" merged 1",
" merged 2",
"not merged 2",
" merged 3",
" merged 4",
"not merged 3",
];
let config = Config {
start_pattern: Regex::new("^\\s").unwrap(),
condition_pattern: Regex::new("^\\s").unwrap(),
mode: Mode::ContinueThrough,
timeout: Duration::from_millis(10),
};
let expected = vec![
"not merged 1",
" merged 1\n merged 2",
"not merged 2",
" merged 3\n merged 4",
"not merged 3",
];
run_and_assert(&lines, config, &expected);
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
}

#[test]
fn two_lines_emit_with_halt_before() {
let lines = vec![
"merged 1",
"merged 2",
" splitter 1",
"merged 3",
"merged 4",
" splitter 2",
];
let config = Config {
start_pattern: Regex::new("").unwrap(),
condition_pattern: Regex::new("^\\s").unwrap(),
mode: Mode::HaltBefore,
timeout: Duration::from_millis(10),
};
let expected = vec![
"merged 1\nmerged 2",
" splitter 1",
"merged 3\nmerged 4",
" splitter 2",
];
run_and_assert(&lines, config, &expected);
MOZGIII marked this conversation as resolved.
Show resolved Hide resolved
}

#[test]
fn legacy() {
let lines = vec![
Expand Down Expand Up @@ -490,7 +581,10 @@ mod tests {
.map(|line| (Bytes::from_static(line.as_bytes()), "test.log".to_owned()))
.collect();

assert_eq!(actual, expected_mapped);
assert_eq!(
actual, expected_mapped,
"actual on the left, expected on the right",
);
}

fn run_and_assert(lines: &[&'static str], config: Config, expected: &[&'static str]) {
Expand Down