New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhancement(syslog source): Syslog source as a (socket+remap) composition #7046
enhancement(syslog source): Syslog source as a (socket+remap) composition #7046
Conversation
65661c2
to
ab167b6
Compare
cc @FungusHumungus on the structural differences. What do you think the best path forward is to preserving backward compatibility? |
The differences between the Syslog source and the Socket sources is that they use different decoders. Tcp uses a BytesDelimitedCodec (each of the socket sources are actually slightly different - but I think the principles are still largely the same), Syslog uses a custom build SyslogDecoder. The syslog decoder handles octet counting. If the first bytes of the incoming message are numerical it uses that number to determine how long the message will be. This allows for messages containing newlines - otherwise the message will be considered only up to the first newline. To maintain backward compatibility we need to be able to pass a parameter into the Socket source to allow you to specify the decoder to use. Possible this should be a hidden option? The Syslog source can pass the Syslog decoder to the Socket source. A slight complication is that the Decoder is an associated type for the source, so it's not just a simple matter of being able to pass in the Decoder directly. We would likely need to create a wrapper struct that points to a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @FungusHumungus points out, this needs to handle the syslog protocol octet framing like SyslogDecoder
does, in addition to tweaking the output event for field compatibility.
src/sources/syslog_remap.rs
Outdated
|
||
impl GenerateConfig for SyslogRemapConfig { | ||
fn generate_config() -> toml::Value { | ||
return SocketConfig::generate_config(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return SocketConfig::generate_config(); | |
SocketConfig::generate_config() |
src/sources/syslog_remap.rs
Outdated
source: r#" | ||
structured = parse_syslog!(.message) | ||
. = merge(., structured) | ||
"# |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've started using the indoc!
macro for things like this so they don't have to break the indentation flow like this.
So regarding the octet framing, we will have to keep the |
I feel like I might be missing something. Doesn't the Remap |
From my understanding the Socket source, when listening on a TCP socket, breaks the stream and generate a new event on every newline character (https://github.com/timberio/vector/blob/master/src/sources/socket/tcp.rs#L84-L86) however a single syslog message can contain one or many newline characters, thus the |
There are two layers to this issue. You are correct that both the |
Signed-off-by: prognant <pierre.rognant@datadoghq.com>
Signed-off-by: prognant <pierre.rognant@datadoghq.com>
Signed-off-by: prognant <pierre.rognant@datadoghq.com>
a2ff05b
to
ffe44bf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good! Re: octet framing, we could add that functionality as an option to the socket source itself and simply configure it here. I'm not sure if there are other use cases for it outside of syslog or not.
src/sources/syslog_remap.rs
Outdated
let (to_transform, rx) = Pipeline::new_with_buffer(100, vec![Box::new(tf)]); | ||
let out = cx.out; | ||
cx.out = to_transform; | ||
tokio::spawn(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of creating a new pipeline and spawning an additional task, I think we can just do something like:
cx.out.inlines.push(Box::new(tf));
We may want a nicer API than just making inlines
a public field, but the idea is the same.
src/sources/syslog_remap.rs
Outdated
rx.map(|mut event| { | ||
event | ||
.as_mut_log() | ||
.insert(log_schema().source_type_key(), Bytes::from("syslog_remap")); | ||
Ok(event) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an interesting bit I hadn't thought of. Maybe we should make this kind of tagging something that happens from the source context and/or pipeline as well? That way we could again just modify the context and pass it down to keep this simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we do it for every source it would makes sense to do it on a general basis. From what I can tell if we want to leverage the pipeline logic we could use something like the AddFields
transform in the inlines
vec.
Signed-off-by: prognant <pierre.rognant@datadoghq.com>
Signed-off-by: prognant <pierre.rognant@datadoghq.com>
After doing some test, digging into the current
|
Thanks for the review, regarding the syslog decoder I don't really know where to move it, right now I relocated it with the |
#[derive(Debug, Clone)] | ||
pub enum TcpDecoder { | ||
BytesDecoder(BytesDelimitedCodec), | ||
SyslogDecoder(SyslogDecoder), | ||
} | ||
|
||
impl Decoder for TcpDecoder { | ||
type Item = Bytes; | ||
type Error = io::Error; | ||
|
||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { | ||
match self { | ||
TcpDecoder::BytesDecoder(d) => d.decode(src), | ||
TcpDecoder::SyslogDecoder(d) => d.decode(src), | ||
} | ||
} | ||
|
||
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { | ||
match self { | ||
TcpDecoder::BytesDecoder(d) => d.decode_eof(buf), | ||
TcpDecoder::SyslogDecoder(d) => d.decode_eof(buf), | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is probably a way more idiomatic version of that.
Signed-off-by: prognant <pierre.rognant@datadoghq.com>
f26ca66
to
bf6557a
Compare
As agreed let's postponed this and wait for the codec work to be completed as this seems a good usecase for this. |
Signed-off-by: Pablo Sichert <mail@pablosichert.com>
The "Framing and Codecs" RFC (ongoing work) can be found here #7352. In any case, this PR is very helpful to understand where I need to take a closer look. |
Closing this since @pablosichert will be revisiting with the codec work. I'll leave the branch though. |
Working PoC, I kept the original source to have both sources side by side to easily compare behaviour, early tests show that all socket family (tcp/udp/af_unix) works as expected.
Right now we have slight differences in generated events:
e.g. for
<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
The original
syslog
source will produce:while the one based on the remap transform will produce:
I don't think the way
meta.sequenceId
is serialized really matters as both are ultimately equivalent.However the
host
&source_ip
values may require some additional work to kept the exact same behaviour if deemed mandatory.Todo : port tests/syslog.rs to this new source for early assessment