Github-issue#1048 : s3-sink with in_memory buffer implementation. #2623
Conversation
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
numEvents = s3SinkConfig.getThresholdOptions().getEventCount();
byteCapacity = s3SinkConfig.getThresholdOptions().getMaximumSize();
duration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut().getSeconds();
Please update these variable names for better readability, maybe maxEvents, maxBytes, maxCollectionDuration.
Updated.
if (isUploadedToS3) {
    LOG.info("Snapshot uploaded successfully");
} else {
    LOG.info("Snapshot upload failed");
}
Are we dropping the records that fail to upload to S3?
I think metrics are more helpful here than logs.
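To illustrate the reviewer's point, a minimal sketch of counting upload outcomes instead of only logging them. The counter and method names here are assumptions for illustration, not the plugin's actual metric names (Data Prepper plugins would register these through their metrics facility rather than raw atomics):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: count snapshot upload successes and failures so they can be
// exposed as metrics, rather than relying on log lines alone.
public class UploadMetricsSketch {
    final AtomicLong snapshotsUploaded = new AtomicLong();
    final AtomicLong snapshotFailures = new AtomicLong();

    // Record the outcome of one flush attempt.
    void record(boolean isUploadedToS3) {
        if (isUploadedToS3) {
            snapshotsUploaded.incrementAndGet();
        } else {
            snapshotFailures.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        UploadMetricsSketch m = new UploadMetricsSketch();
        m.record(true);
        m.record(false);
        m.record(true);
        System.out.println(m.snapshotsUploaded.get()); // prints 2
        System.out.println(m.snapshotFailures.get());  // prints 1
    }
}
```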
We need to have retry or send the data to DLQ.
Yes, records will be dropped if the upload fails even after reaching max retries.
As suggested, added metrics.
We need to have retry or send the data to DLQ.
Added retry functionality.
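A minimal sketch of what a bounded retry loop around the S3 flush could look like. The names here (`uploadWithRetry`, `MAX_RETRIES`, the flush supplier) are assumptions for illustration, not the PR's actual code:

```java
import java.util.function.BooleanSupplier;

// Sketch: retry a flush a fixed number of times before giving up, at which
// point the caller can drop the records or route them to a DLQ.
public class RetrySketch {
    static final int MAX_RETRIES = 3;

    static boolean uploadWithRetry(BooleanSupplier flush) {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            if (flush.getAsBoolean()) {
                return true; // upload succeeded
            }
            // In real code: log the failure reason and back off before retrying.
        }
        return false; // retries exhausted
    }

    public static void main(String[] args) {
        int[] failures = {2}; // fail twice, then succeed
        System.out.println(uploadWithRetry(() -> failures[0]-- <= 0)); // prints true
        System.out.println(uploadWithRetry(() -> false));              // prints false
    }
}
```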
private final StopWatch watch;

InMemoryBuffer() {
    byteArrayOutputStream = new ByteArrayOutputStream();
Can we reset the stream instead of creating a new stream object every time?
Modified the code to reset the stream instead of creating a new stream object every time.
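For reference, `ByteArrayOutputStream.reset()` discards the accumulated bytes while keeping the internal backing array, which avoids a fresh allocation per buffer cycle. A small standalone demonstration (not the PR's code):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Demonstrates reusing one ByteArrayOutputStream across buffer cycles via
// reset() instead of constructing a new stream object each time.
public class StreamResetDemo {
    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        out.writeBytes("first batch".getBytes(StandardCharsets.UTF_8));
        System.out.println(out.size()); // prints 11

        out.reset(); // count returns to 0; backing array is retained
        System.out.println(out.size()); // prints 0

        out.writeBytes("second".getBytes(StandardCharsets.UTF_8));
        System.out.println(out.size()); // prints 6
    }
}
```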
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}");
S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec);
assertNotNull(s3SinkService);
s3SinkService.output(generateRandomStringEventRecord());
You have to assert some metrics here.
Added metrics.
S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec);
assertNotNull(s3SinkService);
s3SinkService.output(generateRandomStringEventRecord());
assertThat(s3SinkService, instanceOf(S3SinkService.class));
Please move this one line up. There is no point in asserting this after calling output().
Resolved.
reentrantLock.lock();
final String bucket = s3SinkConfig.getBucketOptions().getBucketName();
if (currentBuffer == null) {
    currentBuffer = bufferFactory.getBuffer();
Why initialize the currentBuffer here?
This follows the approach suggested by David.
        " Event_count = {} Records & Event_collection_duration = {} Sec",
        maxBytes.getBytes(), currentBuffer.getEventCount(), currentBuffer.getDuration());
boolean isUploadedToS3 = currentBuffer.flushToS3(s3Client, bucket, generateKey());
if (isUploadedToS3) {
We should retry a few times before failing, and log the failure reason clearly.
Resolved.
final String encodedEvent;
encodedEvent = codec.parse(event);
final byte[] encodedBytes = encodedEvent.getBytes();
if (willExceedThreshold()) {
What if this condition never becomes true? It will never be uploaded to S3?
event_count & event_collect_timeout are mandatory attributes, so the condition will eventually become true.
…data-prepper into s3-sink-in_memory
@dlvenable, Thanks for your review suggestions.
Resolved.
encodedEvent = codec.parse(event);
final byte[] encodedBytes = encodedEvent.getBytes();

if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
If maxEvents is 10, only one event is received, and no other event arrives for an hour, I don't see how that one event is flushed to S3.
This is handled by the event collection duration, i.e. event_collect_timeout. When the duration is met, the buffer will be flushed to S3.
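The OR-combined threshold logic being discussed can be sketched as follows. The signature is a simplified assumption for illustration; the PR's `ThresholdCheck.checkThresholdExceed` takes the buffer itself, not primitives:

```java
// Sketch: flush when ANY threshold is crossed, so a partially filled buffer
// is still flushed once the collection timeout elapses.
public class ThresholdCheckSketch {
    static boolean checkThresholdExceed(int eventCount, long bufferedBytes, long elapsedSeconds,
                                        int maxEvents, long maxBytes, long maxCollectionSeconds) {
        return eventCount >= maxEvents
                || bufferedBytes >= maxBytes
                || elapsedSeconds >= maxCollectionSeconds;
    }

    public static void main(String[] args) {
        // One event buffered, far below maxEvents/maxBytes, but the timeout elapsed:
        System.out.println(checkThresholdExceed(1, 128, 3600, 10, 50_000_000L, 60)); // prints true
        // Nothing crossed yet:
        System.out.println(checkThresholdExceed(1, 128, 5, 10, 50_000_000L, 60));    // prints false
    }
}
```

Note this only works if something re-evaluates the check even when no new event arrives, which is the gap the next comment raises.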
As mentioned in the email, this is not handled today. We need a way to handle this in all sinks. We will tackle this in a separate PR (either sink specific or at data prepper core level)
Looks good
Looks good.
Overall this is good. But, the build is currently failing:
> Task :data-prepper-plugins:s3-sink:checkstyleTest
FAILURE: Build failed with an exception.
> Task :data-prepper-plugins:s3-sink:spotlessMarkdownCheck FAILED
* What went wrong:
Execution failed for task ':data-prepper-plugins:s3-sink:spotlessMarkdownCheck'.
> The following files had format violations:
README.md
@@ -63,4 +63,4 @@
This·plugin·is·compatible·with·Java·8.·See
-·[CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
--·[monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
+-·[monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
Run './gradlew :data-prepper-plugins:s3-sink:spotlessApply' to fix these violations.
You can run checkstyle locally to verify:
./gradlew -p data-prepper-plugins/s3-sink check
Addressed.
@deepaksahu562 , I see the same error still: https://github.com/opensearch-project/data-prepper/actions/runs/4948572707/jobs/8849475477?pr=2623
I checked out your work, and the suggestion seemed to get me past the error.
However, I do see failing unit tests.
159663c
Codecov Report
@@ Coverage Diff @@
## main #2623 +/- ##
============================================
+ Coverage 93.52% 93.54% +0.01%
- Complexity 2238 2250 +12
============================================
Files 261 262 +1
Lines 6275 6291 +16
Branches 519 520 +1
============================================
+ Hits 5869 5885 +16
Misses 268 268
Partials 138 138
@dlvenable @ashoktelukuntla Thanks for your suggestions. The build issue is resolved and all checks have passed. Could you please review?
Looks good.
…ensearch-project#2623)

* Github-issue#1048 : s3-sink with in-memory buffer implementation.
* Github-issue#1048 : s3-sink with in-memory buffer implementation.
* Github-issue#1048 : s3-sink with in-memory buffer implementation.
* Github-issue#1048 : s3-sink - added JUnit test classes.
* Github-issue#1048 : s3-sink - incorporated review comment.
* Github-issue#1048 : s3-sink - incorporated review comment.
* Github-issue#1048 : s3-sink - local-file buffer implementation.
* Github-issue#1048 : s3-sink - in-memory buffer implementation.
* Github-issue#1048 : resolved - checkstyle error.
* Github-issue#1048 : incorporated review comment.
* Github-issue#1048 : incorporated review comment.
* GitHub-issue#1048 : Incorporated review comments.
* GitHub-issue#1048 : Incorporated review comments.
* GitHub-issue#1048 : Incorporated review comments.
* GitHub-issue#1048 : Resolved javadoc issues.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…ion. (opensearch-project#2623)" This reverts commit e71f866.
Description
Issues Resolved
GitHub-issue #1048
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.