-
Notifications
You must be signed in to change notification settings - Fork 197
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sqs Source initial changes #2786
Sqs Source initial changes #2786
Conversation
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Codecov Report
@@ Coverage Diff @@
## main #2786 +/- ##
============================================
- Coverage 94.15% 94.14% -0.02%
+ Complexity 2442 2440 -2
============================================
Files 278 278
Lines 6759 6744 -15
Branches 551 551
============================================
- Hits 6364 6349 -15
Misses 254 254
Partials 141 141 |
.../sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSource.java
Outdated
Show resolved
Hide resolved
...s-sqs-common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/SqsService.java
Outdated
Show resolved
Hide resolved
...s-sqs-common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/SqsService.java
Outdated
Show resolved
Hide resolved
...ommon/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/BufferAccumulator.java
Outdated
Show resolved
Hide resolved
.../java/org/opensearch/dataprepper/plugins/aws/sqs/common/config/AwsAuthenticationOptions.java
Outdated
Show resolved
Hide resolved
.../sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSource.java
Show resolved
Hide resolved
...-sqs-common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/codec/Codec.java
Outdated
Show resolved
Hide resolved
...common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/model/SqsOptions.java
Outdated
Show resolved
Hide resolved
...common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/model/SqsOptions.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! This is looking a lot better.
...-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/ClientFactory.java
Outdated
Show resolved
Hide resolved
...s-sqs-common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/SqsService.java
Show resolved
Hide resolved
...n/java/org/opensearch/dataprepper/plugins/source/sqssource/handler/RawSqsMessageHandler.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Uday Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Chintala <udaych20@gmail.com>
...-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSourceTask.java
Outdated
Show resolved
Hide resolved
One test is failing.
Please fix it. |
Signed-off-by: Uday Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
...s-sqs-common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/SqsService.java
Show resolved
Hide resolved
...qs-common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/ClientFactory.java
Show resolved
Hide resolved
...mmon/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/metrics/SqsMetrics.java
Outdated
Show resolved
Hide resolved
...s-sqs-common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/SqsService.java
Outdated
Show resolved
Hide resolved
.../sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSource.java
Outdated
Show resolved
Hide resolved
Done. |
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
...n/java/org/opensearch/dataprepper/plugins/source/sqssource/handler/RawSqsMessageHandler.java
Outdated
Show resolved
Hide resolved
...s-sqs-common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/SqsService.java
Outdated
Show resolved
Hide resolved
...common/src/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/model/SqsOptions.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/config/QueuesOptions.java
Outdated
Show resolved
Hide resolved
...n/java/org/opensearch/dataprepper/plugins/source/sqssource/handler/RawSqsMessageHandler.java
Outdated
Show resolved
Hide resolved
...-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSourceTask.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
}); | ||
try { | ||
if(!events.isEmpty()) | ||
buffer.writeAll(events, bufferTimeOut); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@graytaylor0 do you if this is the best approach here to write to buffer with a small set of events (max of 10)? Or we could use similar concept as S3 source where we accumulate records in BufferAccumulator
and flush configurable batches.
...rc/main/java/org/opensearch/dataprepper/plugins/source/sqssource/config/SqsSourceConfig.java
Show resolved
Hide resolved
...c/main/java/org/opensearch/dataprepper/plugins/aws/sqs/common/handler/SqsMessageHandler.java
Outdated
Show resolved
Hide resolved
...-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqssource/SqsSourceTask.java
Outdated
Show resolved
Hide resolved
...ava/org/opensearch/dataprepper/plugins/source/sqssource/config/AwsAuthenticationOptions.java
Show resolved
Hide resolved
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com>
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(10); | ||
|
||
static final int NO_OF_RECORDS_TO_ACCUMULATE = 100; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want this to be configurable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted, It will be available in the incremental PR.
All the feedback by David is addressed
* Sqs Source implementation Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com> --------- Signed-off-by: Uday Kumar Chintala <udaych20@gmail.com> Signed-off-by: Uday Chintala <udaych20@gmail.com> Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Description
Sqs Source plugin implementation including junit test cases for #2679
Issues Resolved
Resolves the #2679
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.