Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(file source): correct an error with line aggregation in continue_through and halt_before #3262

Merged
merged 10 commits into from Jul 31, 2020
150 changes: 140 additions & 10 deletions src/sources/file/line_agg.rs
Expand Up @@ -79,6 +79,11 @@ pub(super) struct LineAgg<T, K> {
/// Key is usually a filename or other line source identifier.
buffers: HashMap<K, BytesMut>,

/// Stashed line. When line aggregation results in more than one line being
/// emitted, we have to stash the extra line and return it into the stream
/// right after that, before doing any other work.
stashed: Option<(Bytes, K)>,

/// Draining queue. We switch to draining mode when we get `None` from
/// the inner stream. In this mode we stop polling `inner` for new lines
/// and just flush all the buffered data.
Expand All @@ -102,6 +107,7 @@ where
config,

draining: None,
stashed: None,
buffers: HashMap::new(),
timeouts: DelayQueue::new(),
expired: VecDeque::new(),
Expand All @@ -120,6 +126,18 @@ where

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// If we have a stashed line, process it before doing anything else.
if let Some((line, src)) = self.stashed.take() {
// Handle the stashed line. If the handler gave us something -
// return it, otherwise restart the loop iteration to start
// anew. Handler could've stashed another value, continuing to
// the new loop iteration handles that.
if let Some(val) = self.handle_line_and_stashing(line, src) {
return Ok(Async::Ready(Some(val)));
}
continue;
}

// If we're in draining mode, short circuit here.
if let Some(to_drain) = &mut self.draining {
if let Some((line, src)) = to_drain.pop() {
Expand All @@ -139,7 +157,7 @@ where
// Handle the incoming line we got from `inner`. If the
// handler gave us something - return it, otherwise continue
// with the flow.
if let Some(val) = self.handle_line(line, src) {
if let Some(val) = self.handle_line_and_stashing(line, src) {
return Ok(Async::Ready(Some(val)));
}
}
Expand All @@ -166,13 +184,22 @@ where
}
}

/// Describes how many lines have to be emitted in response to a single input
/// line. The answer is always either exactly one line or exactly two lines.
enum Emit {
/// Emit a single line.
One(Bytes),
/// Emit two lines, in the order they are listed here: the first field is
/// emitted first, the second field is emitted after it.
Two(Bytes, Bytes),
}

impl<T, K> LineAgg<T, K>
where
T: Stream<Item = (Bytes, K), Error = ()>,
K: Hash + Eq + Clone,
{
/// Handle line, if we have something to output - return it.
fn handle_line(&mut self, line: Bytes, src: K) -> Option<(Bytes, K)> {
fn handle_line(&mut self, line: Bytes, src: K) -> Option<(Emit, K)> {
// Check if we already have the buffered data for the source.
match self.buffers.entry(src) {
Entry::Occupied(mut entry) => {
Expand All @@ -186,8 +213,8 @@ where
add_next_line(buffered, line);
None
} else {
let buffered = entry.insert(line.as_ref().into());
Some((buffered.freeze(), entry.key().clone()))
let (src, buffered) = entry.remove_entry();
Some((Emit::Two(buffered.freeze(), line), src))
}
}
// All consecutive lines matching this pattern, plus one
Expand All @@ -200,15 +227,15 @@ where
} else {
let (src, mut buffered) = entry.remove_entry();
add_next_line(&mut buffered, line);
Some((buffered.freeze(), src))
Some((Emit::One(buffered.freeze()), src))
}
}
// All consecutive lines not matching this pattern are included
// in the group.
Mode::HaltBefore => {
if condition_matched {
let buffered = entry.insert(line.as_ref().into());
Some((buffered.freeze(), entry.key().clone()))
let (src, buffered) = entry.remove_entry();
Some((Emit::Two(buffered.freeze(), line), src))
} else {
let buffered = entry.get_mut();
add_next_line(buffered, line);
Expand All @@ -221,7 +248,7 @@ where
if condition_matched {
let (src, mut buffered) = entry.remove_entry();
add_next_line(&mut buffered, line);
Some((buffered.freeze(), src))
Some((Emit::One(buffered.freeze()), src))
} else {
let buffered = entry.get_mut();
add_next_line(buffered, line);
Expand All @@ -241,11 +268,36 @@ where
None
} else {
// It's just a regular line we don't really care about.
Some((line, entry.into_key()))
Some((Emit::One(line), entry.into_key()))
}
}
}
}

/// Feeds a line through `handle_line` and reduces the result to at most one
/// item to return right away, parking a second emitted line (if any) in
/// `stashed`.
///
/// The `stashed` slot must be empty when this is called; upholding that
/// invariant is the caller's responsibility.
fn handle_line_and_stashing(&mut self, line: Bytes, src: K) -> Option<(Bytes, K)> {
    // `poll` takes the stashed line out at the top of its loop before any
    // line reaches this method, so a filled slot here indicates a bug.
    debug_assert!(self.stashed.is_none());

    self.handle_line(line, src).map(|(emit, key)| match emit {
        // A single line needs no bookkeeping - hand it straight back.
        Emit::One(only) => (only, key),
        // With two lines, the first one is returned now, while the second
        // one is parked under the same key so that the next stream poll
        // picks it up.
        Emit::Two(first, second) => {
            self.stashed = Some((second, key.clone()));
            (first, key)
        }
    })
}
}

fn add_next_line(buffered: &mut BytesMut, line: Bytes) {
Expand All @@ -256,6 +308,7 @@ fn add_next_line(buffered: &mut BytesMut, line: Bytes) {
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I'll have to remember that one for my own tests. Too bad we can't turn that on across all the sources at once.

Copy link
Contributor Author

@MOZGIII MOZGIII Jul 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed it being used at the file/mod.rs and it looked really good.

Can't we import it at /lib.rs with an old #[macro_use] extern crate pretty_assertions;?


#[test]
fn mode_continue_through_1() {
Expand Down Expand Up @@ -425,6 +478,80 @@ mod tests {
run_and_assert(&lines, config, &expected);
}

/// See https://github.com/timberio/vector/issues/3237
///
/// Checks `ContinueThrough` mode on input where the line that ends a merged
/// group has to be emitted as a separate line of its own.
#[test]
fn two_lines_emit_with_continue_through() {
    let input = [
        "not merged 1", // passed through as is, NOT stashed
        " merged 1",
        " merged 2",
        "not merged 2", // goes through the stash
        " merged 3",
        " merged 4",
        "not merged 3", // goes through the stash
        "not merged 4", // passed through as is, NOT stashed
        " merged 5",
        "not merged 5", // goes through the stash
        " merged 6",
        " merged 7",
        " merged 8",
        "not merged 6", // goes through the stash
    ];
    let want = [
        "not merged 1",
        " merged 1\n merged 2",
        "not merged 2",
        " merged 3\n merged 4",
        "not merged 3",
        "not merged 4",
        " merged 5",
        "not merged 5",
        " merged 6\n merged 7\n merged 8",
        "not merged 6",
    ];
    let config = Config {
        start_pattern: Regex::new("^\\s").unwrap(),
        condition_pattern: Regex::new("^\\s").unwrap(),
        mode: Mode::ContinueThrough,
        timeout: Duration::from_millis(10),
    };
    run_and_assert(&input, config, &want);
}

/// Checks `HaltBefore` mode: every line matching `^START ` closes the group
/// buffered so far and has to be carried over as the first line of the next
/// group.
#[test]
fn two_lines_emit_with_halt_before() {
    let input = [
        "part 0.1",
        "part 0.2",
        "START msg 1", // goes through the stash
        "part 1.1",
        "part 1.2",
        "START msg 2", // goes through the stash
        "START msg 3", // goes through the stash
        "part 3.1",
        "START msg 4", // goes through the stash
        "part 4.1",
        "part 4.2",
        "part 4.3",
        "START msg 5", // goes through the stash
    ];
    let want = [
        "part 0.1\npart 0.2",
        "START msg 1\npart 1.1\npart 1.2",
        "START msg 2",
        "START msg 3\npart 3.1",
        "START msg 4\npart 4.1\npart 4.2\npart 4.3",
        "START msg 5",
    ];
    let config = Config {
        start_pattern: Regex::new("").unwrap(),
        condition_pattern: Regex::new("^START ").unwrap(),
        mode: Mode::HaltBefore,
        timeout: Duration::from_millis(10),
    };
    run_and_assert(&input, config, &want);
}

#[test]
fn legacy() {
let lines = vec![
Expand Down Expand Up @@ -490,7 +617,10 @@ mod tests {
.map(|line| (Bytes::from_static(line.as_bytes()), "test.log".to_owned()))
.collect();

assert_eq!(actual, expected_mapped);
assert_eq!(
actual, expected_mapped,
"actual on the left, expected on the right",
);
}

fn run_and_assert(lines: &[&'static str], config: Config, expected: &[&'static str]) {
Expand Down