Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send acknowledgements to source when events are forwarded to remote peer #4305

Merged
merged 5 commits into from
Mar 21, 2024

Conversation

kkondaka
Copy link
Collaborator

Description

Send acknowledgements to source when events are forwarded to remote peer
Also, propagate the acknowledgements enabled flag in pipeline correctly when ByteBuffer is used.

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X ] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@@ -59,6 +59,7 @@ public class PipelineTransformer {
private final EventFactory eventFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private final SourceCoordinatorFactory sourceCoordinatorFactory;
private boolean acknowledgementsEnabled;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to have some basic unit test coverage

@@ -117,6 +119,8 @@ private void buildPipelineFromConfiguration(
LOG.info("Building buffer for the pipeline [{}]", pipelineName);
final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder());

if (pipelineDefinedBuffer.isByteBuffer())
acknowledgementsEnabled = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be set to true for all sub-pipelines just because one sub-pipeline uses persistent buffer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I have added comment and the test case also validates this.

Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the need for unit tests.

@@ -50,7 +50,7 @@ public ProcessWorker(
this.pipeline = pipeline;
this.pluginMetrics = PluginMetrics.fromNames("ProcessWorker", pipeline.getName());
this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES);
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled();
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled() || readBuffer.isByteBuffer();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this logic is right. A byte buffer in itself does not require acknowledgements. Can we add a new default method on Buffer to express the actual need?

boolean isPersistentBuffer();

Or some other boolean?

Krishna Kondaka added 4 commits March 21, 2024 06:00
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@@ -78,6 +79,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
this.eventFactory = eventFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceCoordinatorFactory = sourceCoordinatorFactory;
this.acknowledgementsEnabled = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to @graytaylor0 's comment below. This should probably not be a field variable and should be set for each pipeline independently.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It should be for all pipelines.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kkondaka , If I understand correctly, then a downstream pipeline needs to have acknowledgements enabled. This makes sense.

But, the current code ends up changing all pipelines. Some pipelines can run fully independently of each other without any direct relationship. Those would have acknowledgements enabled incorrectly.

@@ -78,6 +79,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
this.eventFactory = eventFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceCoordinatorFactory = sourceCoordinatorFactory;
this.acknowledgementsEnabled = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kkondaka , If I understand correctly, then a downstream pipeline needs to have acknowledgements enabled. This makes sense.

But, the current code ends up changing all pipelines. Some pipelines can run fully independently of each other without any direct relationship. Those would have acknowledgements enabled incorrectly.

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice clean solution for that last request. Thank you!

@dlvenable dlvenable added this to the v2.7 milestone Mar 21, 2024
@dlvenable dlvenable merged commit 2a02080 into opensearch-project:main Mar 21, 2024
43 of 50 checks passed
@kkondaka kkondaka deleted the ack-fixes branch May 13, 2024 05:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants