Skip to content

Commit

Permalink
fix(file source): correct an error with line aggregation in `continue…
Browse files Browse the repository at this point in the history
…_through` and `halt_before` (#3262)

* Hint which data is where

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Add real life test case

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Fix the issue with emitting multiple lines

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Rename real_life_use_case_1 into two_lines_emit_with_continue_through

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Adjust the samples at two_lines_emit_with_continue_through for better formatting

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Add a two_lines_emit_with_halt_before test

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Fix an issue with emitting two messages at HaltBefore mode

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Add a comment on debug_assert

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Use pretty_assertions

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Fix another set of bugs and corrected two_lines_emit_with_halt_before test

- two_lines_emit_with_halt_before had hard to grasp example, which
  resulted in incorrect semantics, and effective a *wrong test*; the
  test samples were rewritten to ensure
- added more sample for the stashing logic tests, and hinted which test
  lines are going to be stashed for easier correspondence of the test logic
  with the code

Signed-off-by: MOZGIII <mike-n@narod.ru>
  • Loading branch information
MOZGIII committed Jul 31, 2020
1 parent bafb7c3 commit 0d497ec
Showing 1 changed file with 140 additions and 10 deletions.
150 changes: 140 additions & 10 deletions src/sources/file/line_agg.rs
Expand Up @@ -79,6 +79,11 @@ pub(super) struct LineAgg<T, K> {
/// Key is usually a filename or other line source identifier.
buffers: HashMap<K, BytesMut>,

/// Stashed lines. When line aggreation results in more than one line being
/// emitted, we have to stash lines and return them into the stream after
/// that before doing any other work.
stashed: Option<(Bytes, K)>,

/// Draining queue. We switch to draining mode when we get `None` from
/// the inner stream. In this mode we stop polling `inner` for new lines
/// and just flush all the buffered data.
Expand All @@ -102,6 +107,7 @@ where
config,

draining: None,
stashed: None,
buffers: HashMap::new(),
timeouts: DelayQueue::new(),
expired: VecDeque::new(),
Expand All @@ -120,6 +126,18 @@ where

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// If we have a stashed line, process it before doing anything else.
if let Some((line, src)) = self.stashed.take() {
// Handle the stashed line. If the handler gave us something -
// return it, otherwise restart the loop iteration to start
// anew. Handler could've stashed another value, continuing to
// the new loop iteration handles that.
if let Some(val) = self.handle_line_and_stashing(line, src) {
return Ok(Async::Ready(Some(val)));
}
continue;
}

// If we're in draining mode, short circut here.
if let Some(to_drain) = &mut self.draining {
if let Some((line, src)) = to_drain.pop() {
Expand All @@ -139,7 +157,7 @@ where
// Handle the incoming line we got from `inner`. If the
// handler gave us something - return it, otherwise continue
// with the flow.
if let Some(val) = self.handle_line(line, src) {
if let Some(val) = self.handle_line_and_stashing(line, src) {
return Ok(Async::Ready(Some(val)));
}
}
Expand All @@ -166,13 +184,22 @@ where
}
}

/// Specifies the amount of lines to emit in response to a single input line.
/// We have to emit either one or two lines.
enum Emit {
/// Emit one line.
One(Bytes),
/// Emit two lines, in the order they're specified.
Two(Bytes, Bytes),
}

impl<T, K> LineAgg<T, K>
where
T: Stream<Item = (Bytes, K), Error = ()>,
K: Hash + Eq + Clone,
{
/// Handle line, if we have something to output - return it.
fn handle_line(&mut self, line: Bytes, src: K) -> Option<(Bytes, K)> {
fn handle_line(&mut self, line: Bytes, src: K) -> Option<(Emit, K)> {
// Check if we already have the buffered data for the source.
match self.buffers.entry(src) {
Entry::Occupied(mut entry) => {
Expand All @@ -186,8 +213,8 @@ where
add_next_line(buffered, line);
None
} else {
let buffered = entry.insert(line.as_ref().into());
Some((buffered.freeze(), entry.key().clone()))
let (src, buffered) = entry.remove_entry();
Some((Emit::Two(buffered.freeze(), line), src))
}
}
// All consecutive lines matching this pattern, plus one
Expand All @@ -200,15 +227,15 @@ where
} else {
let (src, mut buffered) = entry.remove_entry();
add_next_line(&mut buffered, line);
Some((buffered.freeze(), src))
Some((Emit::One(buffered.freeze()), src))
}
}
// All consecutive lines not matching this pattern are included
// in the group.
Mode::HaltBefore => {
if condition_matched {
let buffered = entry.insert(line.as_ref().into());
Some((buffered.freeze(), entry.key().clone()))
let (src, buffered) = entry.remove_entry();
Some((Emit::Two(buffered.freeze(), line), src))
} else {
let buffered = entry.get_mut();
add_next_line(buffered, line);
Expand All @@ -221,7 +248,7 @@ where
if condition_matched {
let (src, mut buffered) = entry.remove_entry();
add_next_line(&mut buffered, line);
Some((buffered.freeze(), src))
Some((Emit::One(buffered.freeze()), src))
} else {
let buffered = entry.get_mut();
add_next_line(buffered, line);
Expand All @@ -241,11 +268,36 @@ where
None
} else {
// It's just a regular line we don't really care about.
Some((line, entry.into_key()))
Some((Emit::One(line), entry.into_key()))
}
}
}
}

/// Handle line and do stashing of extra emitted lines.
/// Requires that the `stashed` item is empty (i.e. entry is vacant). This
/// invariant has to be taken care of by the caller.
fn handle_line_and_stashing(&mut self, line: Bytes, src: K) -> Option<(Bytes, K)> {
// Stashed line is always consumed at the start of the `poll`
// loop before entering this line processing logic. If it's
// non-empty here - it's a bug.
debug_assert!(self.stashed.is_none());
let val = self.handle_line(line, src)?;
let val = match val {
// If we have to emit just one line - that's easy,
// we just return it.
(Emit::One(line), src) => (line, src),
// If we have to emit two lines - take the second
// one and stash it, then return the first one.
// This way, the stashed line will be returned
// on the next stream poll.
(Emit::Two(line, to_stash), src) => {
self.stashed = Some((to_stash, src.clone()));
(line, src)
}
};
Some(val)
}
}

fn add_next_line(buffered: &mut BytesMut, line: Bytes) {
Expand All @@ -256,6 +308,7 @@ fn add_next_line(buffered: &mut BytesMut, line: Bytes) {
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;

#[test]
fn mode_continue_through_1() {
Expand Down Expand Up @@ -425,6 +478,80 @@ mod tests {
run_and_assert(&lines, config, &expected);
}

/// https://github.com/timberio/vector/issues/3237
#[test]
fn two_lines_emit_with_continue_through() {
let lines = vec![
"not merged 1", // will NOT be stashed, but passthroughed
" merged 1",
" merged 2",
"not merged 2", // will be stashed
" merged 3",
" merged 4",
"not merged 3", // will be stashed
"not merged 4", // will NOT be stashed, but passthroughed
" merged 5",
"not merged 5", // will be stashed
" merged 6",
" merged 7",
" merged 8",
"not merged 6", // will be stashed
];
let config = Config {
start_pattern: Regex::new("^\\s").unwrap(),
condition_pattern: Regex::new("^\\s").unwrap(),
mode: Mode::ContinueThrough,
timeout: Duration::from_millis(10),
};
let expected = vec![
"not merged 1",
" merged 1\n merged 2",
"not merged 2",
" merged 3\n merged 4",
"not merged 3",
"not merged 4",
" merged 5",
"not merged 5",
" merged 6\n merged 7\n merged 8",
"not merged 6",
];
run_and_assert(&lines, config, &expected);
}

#[test]
fn two_lines_emit_with_halt_before() {
let lines = vec![
"part 0.1",
"part 0.2",
"START msg 1", // will be stashed
"part 1.1",
"part 1.2",
"START msg 2", // will be stashed
"START msg 3", // will be stashed
"part 3.1",
"START msg 4", // will be stashed
"part 4.1",
"part 4.2",
"part 4.3",
"START msg 5", // will be stashed
];
let config = Config {
start_pattern: Regex::new("").unwrap(),
condition_pattern: Regex::new("^START ").unwrap(),
mode: Mode::HaltBefore,
timeout: Duration::from_millis(10),
};
let expected = vec![
"part 0.1\npart 0.2",
"START msg 1\npart 1.1\npart 1.2",
"START msg 2",
"START msg 3\npart 3.1",
"START msg 4\npart 4.1\npart 4.2\npart 4.3",
"START msg 5",
];
run_and_assert(&lines, config, &expected);
}

#[test]
fn legacy() {
let lines = vec![
Expand Down Expand Up @@ -490,7 +617,10 @@ mod tests {
.map(|line| (Bytes::from_static(line.as_bytes()), "test.log".to_owned()))
.collect();

assert_eq!(actual, expected_mapped);
assert_eq!(
actual, expected_mapped,
"actual on the left, expected on the right",
);
}

fn run_and_assert(lines: &[&'static str], config: Config, expected: &[&'static str]) {
Expand Down

0 comments on commit 0d497ec

Please sign in to comment.