Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] DLQ Serialization Doesn't Support Create Action #3040

Closed
engechas opened this issue Jul 18, 2023 · 0 comments
Closed

[BUG] DLQ Serialization Doesn't Support Create Action #3040

engechas opened this issue Jul 18, 2023 · 0 comments
Labels
bug Something isn't working

Comments

@engechas
Copy link
Collaborator

Describe the bug
DataPrepper supports create and index actions in the OpenSearch sink. Depending on which action is specified, the BulkRequest populates the respective parameter. This is accounted for in some places of the code, such as the JavaClientUncompressedAccumulatingBulkRequest, but is not accounted for in the DLQ serialization logic:
https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java#L65

This causes an exception that shuts down DataPrepper:

ERROR org.opensearch.dataprepper.pipeline.common.FutureHelper - FutureTask failed due to: 
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Cannot get 'Index' variant: current variant is 'Create'.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.opensearch.dataprepper.pipeline.common.FutureHelper.awaitFuturesIndefinitely(FutureHelper.java:29) ~[data-prepper-core-2.3.1.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.postToSink(ProcessWorker.java:140) ~[data-prepper-core-2.3.1.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:121) ~[data-prepper-core-2.3.1.jar:?]
    at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:50) ~[data-prepper-core-2.3.1.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalStateException: Cannot get 'Index' variant: current variant is 'Create'.
    at org.opensearch.client.util.TaggedUnionUtils.get(TaggedUnionUtils.java:47) ~[opensearch-java-2.5.0.jar:?]
    at org.opensearch.client.opensearch.core.bulk.BulkOperation.index(BulkOperation.java:139) ~[opensearch-java-2.5.0.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter.convertDocumentToGenericMap(FailedBulkOperationConverter.java:65) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter.convertToDlqObject(FailedBulkOperationConverter.java:40) ~[opensearch-2.3.1.jar:?]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) ~[?:?]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.logFailureForBulkRequests(OpenSearchSink.java:328) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleFailures(BulkRetryStrategy.java:334) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleFailures(BulkRetryStrategy.java:252) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:245) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:269) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.execute(BulkRetryStrategy.java:191) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$flushBatch$6(OpenSearchSink.java:314) ~[opensearch-2.3.1.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.flushBatch(OpenSearchSink.java:311) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:283) ~[opensearch-2.3.1.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:64) ~[data-prepper-api-2.3.1.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:64) ~[data-prepper-api-2.3.1.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$5(Pipeline.java:336) ~[data-prepper-core-2.3.1.jar:?]

There may be other areas where the assumption that action is index is made. The code should be audited to ensure all document accesses account for both supported actions.

Expected behavior
DLQ serialization works regardless of action

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
Archived in project
Development

No branches or pull requests

2 participants