fix: actually batch TCP source decoder outputs #10506
Conversation
✔️ Deploy Preview for vector-project canceled. 🔨 Explore the source changes: 14c246d 🔍 Inspect the deploy log: https://app.netlify.com/sites/vector-project/deploys/61c0ffe9aadc8000086f343d
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
8d9d9ea to f8b2054 (Compare)
Soak Test Results (Baseline: bd8f097)
Explanation: A soak test is an integrated performance test for vector in a repeatable rig, with varying configuration for vector. What follows is a statistical summary of a brief vector run for each configuration across the SHAs given above. The goal of these tests is to determine, quickly, if vector performance is changed, and to what degree, by a pull request. Test units below are bytes/second/CPU, except for "skewness". The further "skewness" is from 0.0, the stronger the indication that vector lacks consistency in behavior, making predictions of fitness in the field challenging. The abbreviated table below, if present, lists those experiments that have experienced a statistically significant change in their throughput performance between the baseline and comparison SHAs, with 90.0% confidence. Negative values mean that the baseline is faster; positive values mean the comparison is. Results that do not exhibit more than a ±8.87% change in mean throughput are discarded. The abbreviated table will be omitted if no statistically interesting changes are observed.
Fine details of change detection per experiment.
Fine details of each soak run.
Overall very happy with this. I have some comments around constants mostly but this strikes me as very close to merging.
src/codecs/ready_frames.rs
Outdated
Poll::Ready(Some(Ok((frame, size)))) => {
    self.enqueued.push(frame);
    self.enqueued_size += size;
    if self.enqueued.len() >= 1024 {
I'd like to see this constant pulled in as a struct member, maybe not configurable? Or maybe we allow the max enqueued size to be set by the user and feed that to Vec::with_capacity?
I added it as a member and made it match the passed capacity. I'm now slightly meh about the name capacity, since it's also a soft limit, but will probably leave it unless someone has a better idea.
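For illustration, the shape being discussed could look roughly like the sketch below. This is not the PR's actual code: the real ReadyFrames wraps an async stream and carries more state, and the types here are hypothetical simplifications. It only shows the idea of feeding one passed-in capacity to both Vec::with_capacity and the soft batch limit, replacing the bare 1024 literal.

```rust
// Sketch: the batch limit as a struct member fed from the constructor's
// capacity, doubling as the Vec's initial allocation size. Assumed types.
struct ReadyFrames<T> {
    enqueued: Vec<T>,
    enqueued_size: usize,
    // Soft limit on enqueued frames; also used for `Vec::with_capacity`.
    capacity: usize,
}

impl<T> ReadyFrames<T> {
    fn new(capacity: usize) -> Self {
        Self {
            enqueued: Vec::with_capacity(capacity),
            enqueued_size: 0,
            capacity,
        }
    }

    /// Enqueue a frame; returns true once the soft limit is reached,
    /// signaling that the accumulated batch should be flushed.
    fn push(&mut self, frame: T, size: usize) -> bool {
        self.enqueued.push(frame);
        self.enqueued_size += size;
        self.enqueued.len() >= self.capacity
    }
}

fn main() {
    let mut frames = ReadyFrames::new(2);
    assert!(!frames.push("a", 1)); // below the soft limit
    assert!(frames.push("b", 1)); // limit hit: time to flush
    assert_eq!(frames.enqueued_size, 2);
}
```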
src/sources/logstash.rs
Outdated
    }
}

struct LogstashAcker {
    protocol: LogstashProtocolVersion,
    sequence_number: u32,
    // TODO: this is very likely overkill, since there are only two protocol versions and it seems
Something you intend to resolve in this PR? If not, I'd be satisfied to see an issue created, referenced in this comment block.
Good question. I don't think it's necessary to address, but I was hoping that someone with more knowledge of the logstash protocol would be able to offer an opinion (/cc @jszwedko, since you're on much of the blame). My goal with this version was to be as defensive as possible, but that doesn't lead to the most straightforward implementation and I'd prefer not to leave something that's misleading (i.e. implies that we'll ever actually have mixed protocol versions, if we won't).
I agree it'd be unlikely, but is possible. I'd be OK disallowing mixed protocols on a single connection if it would simplify things here, but we should have an explicit assertion that it never changes.
I think you could probably simplify this to just be a tuple of version number and sequence number rather than storing the sequence numbers per protocol version. I think we can assume, even if multiple protocol versions are being used, that a single TCP stream represents one stream of events (rather than a stream per protocol version).
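The simplification described above might be sketched as follows. This is a hypothetical illustration, not code from the PR: the enum variants and method names are assumptions, and it only demonstrates storing a single (version, sequence number) pair per connection with an explicit assertion that the protocol never changes mid-stream.

```rust
// Sketch: one (version, sequence) pair per TCP stream instead of
// per-protocol-version sequence tracking. Names are illustrative.
#[derive(Clone, Copy, PartialEq, Debug)]
enum LogstashProtocolVersion {
    V1,
    V2,
}

struct LogstashAcker {
    // Protocol version and latest sequence number seen on this stream.
    state: Option<(LogstashProtocolVersion, u32)>,
}

impl LogstashAcker {
    fn new() -> Self {
        Self { state: None }
    }

    fn record(&mut self, version: LogstashProtocolVersion, sequence_number: u32) {
        match self.state {
            // Explicitly disallow mixed protocol versions on one connection.
            Some((existing, _)) => {
                assert_eq!(existing, version, "protocol version changed mid-stream");
                self.state = Some((version, sequence_number));
            }
            None => self.state = Some((version, sequence_number)),
        }
    }
}

fn main() {
    let mut acker = LogstashAcker::new();
    acker.record(LogstashProtocolVersion::V1, 1);
    acker.record(LogstashProtocolVersion::V1, 2);
    assert_eq!(acker.state, Some((LogstashProtocolVersion::V1, 2)));
}
```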
src/sources/logstash.rs
Outdated
bytes.push(self.protocol.into());
bytes.push(LogstashFrameType::Ack.into());
bytes.extend(self.sequence_number.to_be_bytes().iter());
let mut bytes: Vec<u8> = Vec::with_capacity(6 * self.sequence_numbers.len());
Oh hmm, why 6 in the first place?
I believe 6 bytes is for protocol (u8) + frame type (u8) + sequence number (u32), so a known fixed size of an ack frame in this protocol.
Ah, gotcha. Thank you.
A future reader would probably appreciate having a constant along the lines of const ACK_FRAME_SIZE: usize = 6; // protocol (u8) + frame type (u8) + sequence number (u32), or using std::mem::size_of::<Protocol>() + std::mem::size_of::<FrameType>() + std::mem::size_of::<SequenceNumber>() for that matter 😄
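Spelled out, the suggestion might look like the sketch below. The type aliases are hypothetical stand-ins (the real code uses its own protocol and frame-type enums), and the encoding function is only an illustration of how the constant would replace the bare 6.

```rust
// Assumed aliases for the ack frame's fields; the real types are enums
// that convert into these widths.
type Protocol = u8;
type FrameType = u8;
type SequenceNumber = u32;

// Size of one ack frame, derived from its fields rather than hard-coded.
const ACK_FRAME_SIZE: usize = std::mem::size_of::<Protocol>()
    + std::mem::size_of::<FrameType>()
    + std::mem::size_of::<SequenceNumber>(); // 1 + 1 + 4 = 6

fn encode_ack(protocol: Protocol, frame_type: FrameType, seq: SequenceNumber) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(ACK_FRAME_SIZE);
    bytes.push(protocol);
    bytes.push(frame_type);
    // Sequence number is big-endian on the wire.
    bytes.extend_from_slice(&seq.to_be_bytes());
    bytes
}

fn main() {
    assert_eq!(ACK_FRAME_SIZE, 6);
    assert_eq!(encode_ack(2, b'A', 42).len(), ACK_FRAME_SIZE);
}
```

(Note the standard-library function is `std::mem::size_of`, not `sizeof`.)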
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
Soak Test Results (Baseline: 21e2525)
Explanation: A soak test is an integrated performance test for vector in a repeatable rig, with varying configuration for vector. What follows is a statistical summary of a brief vector run for each configuration across the SHAs given above. The goal of these tests is to determine, quickly, if vector performance is changed, and to what degree, by a pull request. Test units below are bytes/second/CPU, except for "skewness". The further "skewness" is from 0.0, the stronger the indication that vector lacks consistency in behavior, making predictions of fitness in the field challenging. The abbreviated table below, if present, lists those experiments that have experienced a statistically significant change in their throughput performance between the baseline and comparison SHAs, with 90.0% confidence. Negative values mean that the baseline is faster; positive values mean the comparison is. Results that do not exhibit more than a ±8.87% change in mean throughput are discarded. The abbreviated table will be omitted if no statistically interesting changes are observed.
Fine details of change detection per experiment.
Fine details of each soak run.
Looks good! I left a couple of comments. Nice bump to performance for these TCP sources.
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
Awesome catch and deep dive, @lukesteensen!
This wouldn't have been a terribly surprising finding if we'd looked at the code and seen that we were obviously sending single events at a time, but at a glance the code here did look like it was doing the right thing with respect to batching.
Fully agree, this has been quite a pitfall. I'm assuming it'll be worth surveying whether other sources share the same flaw?
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
Nice work!
Yes, excellent find. Impressive performance improvements.
Soak Test Results (Baseline: 03416da)
Explanation: A soak test is an integrated performance test for vector in a repeatable rig, with varying configuration for vector. What follows is a statistical summary of a brief vector run for each configuration across the SHAs given above. The goal of these tests is to determine, quickly, if vector performance is changed, and to what degree, by a pull request. Test units below are bytes/second/CPU, except for "skewness". The further "skewness" is from 0.0, the stronger the indication that vector lacks consistency in behavior, making predictions of fitness in the field challenging. The abbreviated table below, if present, lists those experiments that have experienced a statistically significant change in their throughput performance between the baseline and comparison SHAs, with 90.0% confidence. Negative values mean that the baseline is faster; positive values mean the comparison is. Results that do not exhibit more than a ±8.87% change in mean throughput are discarded. The abbreviated table will be omitted if no statistically interesting changes are observed.
Fine details of change detection per experiment.
Fine details of each soak run.
I'm hoping that last set of numbers post-auto-merge are just a weird artifact of something else landing on master, but I'm rerunning soaks locally between the merge commit and its parent to be sure.
Here are those local results, for the record:
Soak Test Results (Baseline: 03416da)
Explanation: A soak test is an integrated performance test for vector in a repeatable rig, with varying configuration for vector. What follows is a statistical summary of a brief vector run for each configuration across the SHAs given above. The goal of these tests is to determine, quickly, if vector performance is changed, and to what degree, by a pull request. Test units below are bytes/second/CPU, except for "skewness". The further "skewness" is from 0.0, the stronger the indication that vector lacks consistency in behavior, making predictions of fitness in the field challenging. The abbreviated table below, if present, lists those experiments that have experienced a statistically significant change in their throughput performance between the baseline and comparison SHAs, with 95.0% confidence. Negative values mean that the baseline is faster; positive values mean the comparison is. Results that do not exhibit more than a ±8.87% change in mean throughput are discarded. The abbreviated table will be omitted if no statistically interesting changes are observed.
Fine details of change detection per experiment.
Fine details of each soak run.
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
This came out of some strange soak test behavior in #10432. Essentially, we had reduced the change down to what should have been insignificant performance-wise (roughly, one additional allocation per batch via `ready_chunks` vs. `enqueued`), but were still seeing a ~10% penalty on certain soaks.

The theory was that for those tests to be so hyper-sensitive to the performance of `send_all`, they must be heavily bottlenecked by it. Given that it's not supposed to be a method that things should bottleneck on, I looked into why that might be. They were all based on the `TcpSource` trait and utilized our new codec work to implement their respective protocols.

The interesting bit ended up being roughly here, where we treat the output of the decoder stream as a "batch", setting up acks, annotating events, emitting metrics, and sending upstream for each "batch". The problem is that these are often (always, in the case of syslog) not actually batches at all, but single events wrapped in a `SmallVec`, which means we're doing all of the work mentioned previously for every single event as it comes through. This is significantly less efficient than amortizing those costs over an actual good-sized batch of events.

The solution ended up being quite a bit more complicated than I'd have liked, primarily due to the way that `TcpSource` has grown into a bit of a nightmarish piece of complexity with all of our small additions over the years. The biggest hurdle was the way that it ties acknowledgements to the generic frame type, such that we can't accumulate events across frames without keeping those frames accessible to later build acks. This was addressed by tweaking the acking trait to build acks from groups of frames instead of individual frames, allowing us to accumulate the frames themselves in a new stream combinator (`ReadyFrames`) before passing them into the existing logic. This leaves the basic structure intact, but ensures that we're actually trying to group up a significant number of events per batch when we're under load.

I do want to emphasize that this is not a design flaw in our codec system, and the `SmallVec` pattern still seems to me like a good one. It's also very likely that we had essentially equivalent behavior prior to the introduction of codecs, so I don't believe they introduced a performance regression. The problem is really that the way we integrated codecs into sources like these gives the impression that batching is happening when it's not. This wouldn't have been a terribly surprising finding if we'd looked at the code and seen that we were obviously sending single events at a time, but at a glance the code here did look like it was doing the right thing with respect to batching.

It's likely that there's more relatively low-hanging performance fruit in some of the sources that have been around longest, and we'll just need to keep this pattern in mind as we look for it, since it's not as obvious as it would otherwise be. I'm thinking about patterns and tools for detecting situations like this automatically, but a lot of it would be significantly improved by simply revisiting some of these sources and straightening out some of the old-style code that's been added incrementally over quite a long period.
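The accumulation idea behind the ReadyFrames combinator can be sketched with a synchronous Iterator analogy. The real combinator in the PR works over an async Stream and flushes when the inner stream returns Pending; this simplified version only shows the core logic of draining available frames into one batch (up to a soft limit) so that acks, event annotation, and metrics can be amortized per batch instead of per frame.

```rust
// Iterator analogy for ReadyFrames: batch up available (frame, size) pairs.
// Assumed shape; the actual implementation is an async stream combinator.
struct ReadyFrames<I: Iterator<Item = (String, usize)>> {
    inner: I,
    capacity: usize, // soft limit on frames per batch
}

impl<I: Iterator<Item = (String, usize)>> Iterator for ReadyFrames<I> {
    // One batch of frames plus their total byte size, so per-batch work
    // (acks, metrics) happens once rather than once per frame.
    type Item = (Vec<String>, usize);

    fn next(&mut self) -> Option<Self::Item> {
        let mut frames = Vec::with_capacity(self.capacity);
        let mut total = 0;
        // Drain frames until the soft limit is hit or the source runs dry.
        while frames.len() < self.capacity {
            match self.inner.next() {
                Some((frame, size)) => {
                    frames.push(frame);
                    total += size;
                }
                None => break,
            }
        }
        if frames.is_empty() {
            None
        } else {
            Some((frames, total))
        }
    }
}

fn main() {
    let input = vec![
        ("a".to_string(), 1),
        ("b".to_string(), 2),
        ("c".to_string(), 3),
    ];
    let batches: Vec<_> = ReadyFrames { inner: input.into_iter(), capacity: 2 }.collect();
    assert_eq!(batches.len(), 2); // [a, b] then [c]
    assert_eq!(batches[0].1, 3); // total size of the first batch
}
```

The key design point mirrors the PR description: the frames themselves are accumulated (not just the decoded events), so the downstream acking logic can still build acks from the whole group of frames at once.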