Skip to content

Commit

Permalink
enhancement(docker source): Multiline support (#3607)
Browse files Browse the repository at this point in the history
* enhancement(docker source): Multiline support

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Share the multiline docs among the file and docker source

Signed-off-by: MOZGIII <mike-n@narod.ru>

* Add test

Signed-off-by: MOZGIII <mike-n@narod.ru>
  • Loading branch information
MOZGIII committed Aug 31, 2020
1 parent c00e4e5 commit 1a43593
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 83 deletions.
79 changes: 79 additions & 0 deletions .meta/_partials/fields/_multiline_options.toml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[<%= namespace %>.options.multiline]
type = "table"
category = "Multiline"
description = """\
Multiline parsing configuration.
If not speicified, multiline parsing is disabled.\
"""

[<%= namespace %>.options.multiline.children.start_pattern]
type = "string"
category = "Multiline"
examples = ["^[^\\s]", "\\\\$", "^(INFO|ERROR) ", "[^;]$"]
required = true
sort = 1
description = """\
Start regex pattern to look for as a beginning of the message.\
"""

[<%= namespace %>.options.multiline.children.condition_pattern]
type = "string"
category = "Multiline"
examples = ["^[\\s]+", "\\\\$", "^(INFO|ERROR) ", ";$"]
required = true
sort = 3
description = """\
Condition regex pattern to look for. Exact behavior is configured via `mode`.\
"""

[<%= namespace %>.options.multiline.children.mode]
type = "string"
category = "Multiline"
required = true
sort = 2
description = """\
Mode of operation, specifies how the `condition_pattern` is interpreted.\
"""

[<%= namespace %>.options.multiline.children.mode.enum]
continue_through = """\
All consecutive lines matching this pattern are included in the group. \
The first line (the line that matched the start pattern) does not need \
to match the `ContinueThrough` pattern. \
This is useful in cases such as a Java stack trace, where some indicator \
in the line (such as leading whitespace) indicates that it is an \
extension of the preceeding line.\
"""
continue_past = """\
All consecutive lines matching this pattern, plus one additional line, \
are included in the group. \
This is useful in cases where a log message ends with a continuation \
marker, such as a backslash, indicating that the following line is part \
of the same message.\
"""
halt_before = """\
All consecutive lines not matching this pattern are included in the \
group. \
This is useful where a log line contains a marker indicating that it \
begins a new message.\
"""
halt_with = """\
All consecutive lines, up to and including the first line matching this \
pattern, are included in the group. \
This is useful where a log line ends with a termination marker, such as \
a semicolon.\
"""

[<%= namespace %>.options.multiline.children.timeout_ms]
type = "uint"
category = "Multiline"
examples = [1000, 600000]
unit = "milliseconds"
common = true
required = true
sort = 4
description = """\
The maximum time to wait for the continuation. Once this timeout is \
reached, the buffered message is guaraneed to be flushed, even if \
incomplete.\
"""
2 changes: 2 additions & 0 deletions .meta/sources/docker.toml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,5 @@ required = true
description = """\
The UTC timestamp extracted from the Docker log event.\
"""

<%= render("_partials/fields/_multiline_options.toml", namespace: "source.docker") %>
80 changes: 1 addition & 79 deletions .meta/sources/file.toml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -150,85 +150,7 @@ An approximate limit on the amount of data read from a single file at a given \
time.\
"""

[sources.file.options.multiline]
type = "table"
category = "Multiline"
description = """\
Multiline parsing configuration (per file).
If not speicified, multiline parsing is disabled.\
"""

[sources.file.options.multiline.children.start_pattern]
type = "string"
category = "Multiline"
examples = ["^[^\\s]", "\\\\$", "^(INFO|ERROR) ", "[^;]$"]
required = true
sort = 1
description = """\
Start regex pattern to look for as a beginning of the message.\
"""

[sources.file.options.multiline.children.condition_pattern]
type = "string"
category = "Multiline"
examples = ["^[\\s]+", "\\\\$", "^(INFO|ERROR) ", ";$"]
required = true
sort = 3
description = """\
Condition regex pattern to look for. Exact behavior is configured via `mode`.\
"""

[sources.file.options.multiline.children.mode]
type = "string"
category = "Multiline"
required = true
sort = 2
description = """\
Mode of operation, specifies how the `condition_pattern` is interpreted.\
"""

[sources.file.options.multiline.children.mode.enum]
continue_through = """\
All consecutive lines matching this pattern are included in the group. \
The first line (the line that matched the start pattern) does not need \
to match the `ContinueThrough` pattern. \
This is useful in cases such as a Java stack trace, where some indicator \
in the line (such as leading whitespace) indicates that it is an \
extension of the preceeding line.\
"""
continue_past = """\
All consecutive lines matching this pattern, plus one additional line, \
are included in the group. \
This is useful in cases where a log message ends with a continuation \
marker, such as a backslash, indicating that the following line is part \
of the same message.\
"""
halt_before = """\
All consecutive lines not matching this pattern are included in the \
group. \
This is useful where a log line contains a marker indicating that it \
begins a new message.\
"""
halt_with = """\
All consecutive lines, up to and including the first line matching this \
pattern, are included in the group. \
This is useful where a log line ends with a termination marker, such as \
a semicolon.\
"""

[sources.file.options.multiline.children.timeout_ms]
type = "uint"
category = "Multiline"
examples = [1000, 600000]
unit = "milliseconds"
common = true
required = true
sort = 4
description = """\
The maximum time to wait for the continuation. Once this timeout is \
reached, the buffered message is guaraneed to be flushed, even if \
incomplete.\
"""
<%= render("_partials/fields/_multiline_options.toml", namespace: "source.file") %>

[sources.file.options.oldest_first]
type = "bool"
Expand Down
116 changes: 112 additions & 4 deletions src/sources/docker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::util::MultilineConfig;
use crate::{
config::{DataType, GlobalOptions, SourceConfig, SourceDescription},
event::merge_state::LogEventMergeState,
Expand All @@ -7,6 +8,7 @@ use crate::{
DockerContainerUnwatch, DockerContainerWatch, DockerEventReceived,
DockerLoggingDriverUnsupported, DockerTimestampParseFailed,
},
line_agg::{self, LineAgg},
shutdown::ShutdownSignal,
Pipeline,
};
Expand All @@ -29,7 +31,7 @@ use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::sync::Arc;
use std::{collections::HashMap, env};
use std::{collections::HashMap, convert::TryFrom, env};
use string_cache::DefaultAtom as Atom;
use tokio::sync::mpsc;
use tracing::field;
Expand All @@ -55,6 +57,7 @@ pub struct DockerConfig {
include_images: Option<Vec<String>>,
partial_event_marker_field: Option<Atom>,
auto_partial_merge: bool,
multiline: Option<MultilineConfig>,
}

impl Default for DockerConfig {
Expand All @@ -65,6 +68,7 @@ impl Default for DockerConfig {
include_images: None,
partial_event_marker_field: Some(event::PARTIAL.clone()),
auto_partial_merge: true,
multiline: None,
}
}
}
Expand Down Expand Up @@ -155,6 +159,7 @@ impl SourceConfig for DockerConfig {

struct DockerSourceCore {
config: DockerConfig,
line_agg_config: Option<line_agg::Config>,
docker: Docker,
/// Only logs created at, or after this moment are logged.
now_timestamp: DateTime<Utc>,
Expand All @@ -173,8 +178,15 @@ impl DockerSourceCore {
now = %now.to_rfc3339()
);

let line_agg_config = if let Some(ref multiline_config) = config.multiline {
Some(line_agg::Config::try_from(multiline_config)?)
} else {
None
};

Ok(DockerSourceCore {
config,
line_agg_config,
docker,
now_timestamp: now.into(),
})
Expand Down Expand Up @@ -549,7 +561,7 @@ impl EventStreamBuilder {
// Create event streamer
let mut partial_event_merge_state = None;

stream
let events_stream = stream
.map(|value| {
match value {
Ok(message) => Ok(info.new_event(
Expand Down Expand Up @@ -580,8 +592,21 @@ impl EventStreamBuilder {
}
})
.take_while(|v| future::ready(v.is_ok()))
.filter_map(|v| future::ready(v.transpose()))
.take_until(self.shutdown.clone().compat())
.filter_map(|v| future::ready(v.unwrap()))
.take_until(self.shutdown.clone().compat());

let events_stream: Box<dyn Stream<Item = Event> + Unpin + Send> =
if let Some(ref line_agg_config) = self.core.line_agg_config {
Box::new(line_agg_adapter(
events_stream,
line_agg::Logic::new(line_agg_config.clone()),
))
} else {
Box::new(events_stream)
};

events_stream
.map(Ok)
.forward(self.out.clone().sink_compat().sink_map_err(|_| ()))
.map(|_| {})
.await;
Expand Down Expand Up @@ -927,6 +952,31 @@ fn docker() -> Result<Docker, DockerError> {
}
}

fn line_agg_adapter(
inner: impl Stream<Item = Event> + Unpin,
logic: line_agg::Logic<Bytes, LogEvent>,
) -> impl Stream<Item = Event> {
let line_agg_in = inner.map(|event| {
let mut log_event = event.into_log();

let message_value = log_event
.remove(event::log_schema().message_key())
.expect("message must exist in the event");
let stream_value = log_event
.get(&STREAM)
.expect("stream must exist in the event");

let stream = stream_value.as_bytes();
let message = message_value.into_bytes();
(stream, message, log_event)
});
let line_agg_out = LineAgg::<_, Bytes, LogEvent>::new(line_agg_in, logic);
line_agg_out.map(|(_, message, mut log_event)| {
log_event.insert(event::log_schema().message_key(), message);
Event::Log(log_event)
})
}

#[cfg(all(test, feature = "docker-integration-tests"))]
mod tests {
use super::*;
Expand Down Expand Up @@ -1401,4 +1451,62 @@ mod tests {
let log = events[0].as_log();
assert_eq!(log[&event::log_schema().message_key()], message.into());
}

#[tokio::test]
async fn merge_multiline() {
trace_init();

let emitted_messages = vec![
"java.lang.Exception",
" at com.foo.bar(bar.java:123)",
" at com.foo.baz(baz.java:456)",
];
let expected_messages = vec![concat!(
"java.lang.Exception\n",
" at com.foo.bar(bar.java:123)\n",
" at com.foo.baz(baz.java:456)",
)];
let name = "vector_test_merge_multiline";
let config = DockerConfig {
include_containers: Some(vec![name.to_owned()]),
include_images: Some(vec!["busybox".to_owned()]),
multiline: Some(MultilineConfig {
start_pattern: "^[^\\s]".to_owned(),
condition_pattern: "^[\\s]+at".to_owned(),
mode: line_agg::Mode::ContinueThrough,
timeout_ms: 10,
}),
..DockerConfig::default()
};

let out = source_with_config(config);

let docker = docker().unwrap();

let command = emitted_messages
.into_iter()
.map(|message| format!("echo {:?}", message))
.collect::<Box<_>>()
.join(" && ");

let id = cmd_container(name, None, vec!["sh", "-c", &command], &docker).await;
if let Err(error) = container_run(&id, &docker).await {
container_remove(&id, &docker).await;
panic!("Container failed to start with error: {:?}", error);
}
let events = collect_n(out, expected_messages.len()).await.unwrap();
container_remove(&id, &docker).await;

let actual_messages = events
.into_iter()
.map(|event| {
event
.into_log()
.remove(event::log_schema().message_key())
.unwrap()
.to_string_lossy()
})
.collect::<Vec<_>>();
assert_eq!(actual_messages, expected_messages);
}
}

0 comments on commit 1a43593

Please sign in to comment.