Continue calling S3SinkService::output even if records is empty to flush stale batches #3187
Looks good.
There is an unnecessary lock that we can remove.
Also, I think you will need to rebase this with #3186 which provides the buckets/keys to the buffer via bucket/key suppliers.
@@ -91,59 +90,48 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS);
numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED);
s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);

reentrantLock.lock();
I don't think we need a lock here. The constructor is called once by one thread.
Makes sense, I wasn't sure. I'll remove this with the rebase
Signed-off-by: Chase Engelbrecht <engechas@amazon.com>
7a472d5 to 2ea59c6
Description
Any data left in the S3 sink buffer after traffic stops won't be flushed to S3, since the threshold check is never reached again. This PR changes the behavior to continue calling S3SinkService::output even if the current processing batch is empty, so that it can check whether previously buffered data should now be flushed.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.
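For illustration, the behavior described in the Description can be sketched roughly as below. This is not the actual S3SinkService code: the class BufferingSink, its two thresholds (event count and buffer age), and the explicit nowMillis parameter are all assumptions made to keep the example self-contained and deterministic. The point is only that the threshold check runs on every call to output, including calls with an empty batch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class StaleBatchFlushSketch {

    /** Hypothetical stand-in for a buffering sink; not the real S3SinkService. */
    static class BufferingSink {
        private final int maxEvents;        // assumed threshold: flush when this many records are buffered
        private final long maxAgeMillis;    // assumed threshold: flush when the buffer is at least this old
        private final List<String> buffer = new ArrayList<>();
        private final List<List<String>> flushed = new ArrayList<>();
        private long firstWriteMillis = -1;

        BufferingSink(final int maxEvents, final long maxAgeMillis) {
            this.maxEvents = maxEvents;
            this.maxAgeMillis = maxAgeMillis;
        }

        /** Called for every processing batch, including empty ones. */
        void output(final Collection<String> records, final long nowMillis) {
            for (final String record : records) {
                if (buffer.isEmpty()) {
                    firstWriteMillis = nowMillis;
                }
                buffer.add(record);
                flushIfThresholdMet(nowMillis);
            }
            // Runs even when records is empty, so data buffered by an
            // earlier call can still be flushed once it becomes stale.
            flushIfThresholdMet(nowMillis);
        }

        private void flushIfThresholdMet(final long nowMillis) {
            if (buffer.isEmpty()) {
                return;
            }
            final boolean full = buffer.size() >= maxEvents;
            final boolean stale = nowMillis - firstWriteMillis >= maxAgeMillis;
            if (full || stale) {
                // Stand-in for the upload; the sketch only records the batch.
                flushed.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }

        List<List<String>> flushedBatches() {
            return flushed;
        }
    }

    public static void main(final String[] args) {
        final BufferingSink sink = new BufferingSink(3, 1000L);
        sink.output(List.of("a", "b"), 0L);              // below both thresholds, nothing flushed
        System.out.println(sink.flushedBatches().size());
        sink.output(Collections.emptyList(), 1500L);     // empty batch, but the buffer is now stale
        System.out.println(sink.flushedBatches());
    }
}
```

If the caller skipped output whenever the batch was empty, the second call above would never happen and the two buffered records would stay in memory until new traffic arrived.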