New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhancement(aws_s3 source): batch SQS deletes #7992
Conversation
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
c2b7e1a
to
cbb1975
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! 🎉
src/internal_events/aws_s3.rs
Outdated
|
||
#[derive(Debug)] | ||
pub(crate) struct SqsMessageDeleteFailed { | ||
state: MessageDeleteFailureState, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code paths for Complete
and Partial
below seem to be completely disjoint. What is the benefit of making this an enum instead of two separate event structs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Continuity in downstream systems that currently look at these events was the thought... although maybe that doesn't really matter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure how. There are already two separate code paths that emit the same warning and metric (albeit in the same function). I don't think separating them into separate methods/structs will make that big a deal (assuming I understand your concern correctly).
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
You're totally right. I swear I ran |
src/internal_events/aws_s3.rs
Outdated
|
||
#[derive(Debug)] | ||
pub(crate) struct SqsMessageDeleteFailed { | ||
state: MessageDeleteFailureState, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure how. There are already two separate code paths that emit the same warning and metric (albeit in the same function). I don't think separating them into separate methods/structs will make that big a deal (assuming I understand your concern correctly).
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
let cloned_entries = delete_entries.clone(); | ||
match self.delete_messages(delete_entries).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Too bad you can't pass this by reference. This clone is otherwise pretty pointless (since the internal events can be written to use a reference and not need a clone).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's annoying. :(
This PR introduces batching to the delete phase of the core S3 source loop. After reading a batch of messages from SQS, we would process them one-by-one, and then, if enabled, delete them from SQS to mark them as completed. Instead, we now batch up the message IDs to delete as we retrieve/process the files from S3, and only once a batch of SQS messages has been processed in that way do we delete them from SQS.
In a synthetic benchmark of many small files, such that the time spent reading the files from S3/sending them to the pipeline is small, we can observe a 40-50% increase in throughput to the overall source.
Signed-off-by: Toby Lawrence toby@nuclearfurnace.com