Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ConsumerRebalanceListener from ConsumerVerticleContext #3207

Conversation

debasishbsws
Copy link
Member

A clean conflict-free PR of #3195

Proposed Changes

  • Put the ConsumerRebalanceListener into ConsumerVerticle

@knative-prow
Copy link

knative-prow bot commented Jul 11, 2023

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@knative-prow knative-prow bot added do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. size/M Denotes a PR that changes 30-99 lines, ignoring generated files. area/data-plane labels Jul 11, 2023
@knative-prow knative-prow bot requested review from aliok and pierDipi July 11, 2023 17:32
@codecov
Copy link

codecov bot commented Jul 11, 2023

Codecov Report

Merging #3207 (7ace30c) into main (c713604) will decrease coverage by 0.05%.
The diff coverage is 100.00%.

@@             Coverage Diff              @@
##               main    #3207      +/-   ##
============================================
- Coverage     63.42%   63.38%   -0.05%     
+ Complexity      752      746       -6     
============================================
  Files           167      167              
  Lines         11815    11815              
  Branches        246      246              
============================================
- Hits           7494     7489       -5     
+ Misses         3753     3752       -1     
- Partials        568      574       +6     
Flag Coverage Δ
java-unittests 79.55% <100.00%> (-0.19%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...roker/dispatcher/main/ConsumerVerticleContext.java 93.10% <ø> (-0.23%) ⬇️
...ker/dispatcher/impl/consumer/ConsumerVerticle.java 87.50% <100.00%> (+1.29%) ⬆️
...patcher/impl/consumer/OrderedConsumerVerticle.java 66.35% <100.00%> (ø)
...tcher/impl/consumer/UnorderedConsumerVerticle.java 68.00% <100.00%> (ø)
...roker/dispatcher/main/ConsumerVerticleBuilder.java 62.18% <100.00%> (ø)

... and 2 files with indirect coverage changes

@debasishbsws
Copy link
Member Author

debasishbsws commented Jul 11, 2023

@pierDipi
"Did you consider passing to the consumer verticle the rebalance listener instead of passing all revoke handlers?

consumerVerticle.setRebalanceListener(createRebalanceListener(partitionRevokedHandlers));
That would decouple the consumer verticle from the rebalance logic, in my opinion, creating and coordinating the build of the rebalance listener is more a responsibility of the builder rather than a responsibility of the consumer verticle (consumer verticle is responsible for the lifecycle of the consumer)"(https://github.com/knative-sandbox/eventing-kafka-broker/pull/3195/files#r1259704553)

In this approach, some tests are failing as the consumerVerticleListener is not getting initialised.
I can put a check like

protected ConsumerRebalanceListener getConsumerRebalanceListener() {
        if(consumerRebalanceListener != null){
            return consumerRebalanceListener;
        }
        return new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                getPartitionRevokedHandler().partitionRevoked(partitions).onComplete(
                        r -> logger.info("Partitions revoked {}", consumerVerticleContext.getLoggingKeyValue()));
            }

            @Override
            public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                logger.info("Partitions assigned {}", consumerVerticleContext.getLoggingKeyValue());
            }
        };
    }

But I think it is like same as Building the ConsumerRebalanceListener inside it.
OR
I can create a new ConsumerRebalanceListenerImpl and move the logic there to decouple the consumer verticle from the rebalance logic as I Previously did in Old PR old commit aa9b139

@pierDipi
Copy link
Member

In this approach, some tests are failing as the consumerVerticleListener is not getting initialised. I can put a check like

protected ConsumerRebalanceListener getConsumerRebalanceListener() {
        if(consumerRebalanceListener != null){
            return consumerRebalanceListener;
        }
        return new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                getPartitionRevokedHandler().partitionRevoked(partitions).onComplete(
                        r -> logger.info("Partitions revoked {}", consumerVerticleContext.getLoggingKeyValue()));
            }

            @Override
            public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                logger.info("Partitions assigned {}", consumerVerticleContext.getLoggingKeyValue());
            }
        };
    }

But I think it is like same as Building the ConsumerRebalanceListener inside it. OR I can create a new ConsumerRebalanceListenerImpl and move the logic there to decouple the consumer verticle from the rebalance logic as I Previously did in Old PR old commit aa9b139

I didn't get what's the problem, can you expand a bit more?

@debasishbsws
Copy link
Member Author

I have moved the ConsumerRebalanceLitener creation Logic out from the ConsumerVerticle and use consumerVerticle.setRebalanceListener(... ); instead. But now ConsumerRebalanceLitener is not getting initialised while running some tests like OrderedConsumerVeticleTest, UnordereConsumerVerticleTest.
So I was asking that to resolve that I can put a condition like if the ConsumerRebalanceLitener is null then create a new one and return it to make sure we never get a null.

@debasishbsws
Copy link
Member Author

debasishbsws commented Jul 12, 2023

Oh wait I can put

consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {....})

into the tests. and leave the code logic the same as now.

@debasishbsws debasishbsws marked this pull request as ready for review July 12, 2023 09:36
@knative-prow knative-prow bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jul 12, 2023
@debasishbsws
Copy link
Member Author

/retest

@debasishbsws
Copy link
Member Author

@pierDipi would you take a look now

Copy link
Member

@pierDipi pierDipi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!

/lgtm
/approve

@knative-prow knative-prow bot added the lgtm Indicates that a PR is ready to be merged. label Jul 13, 2023
@knative-prow
Copy link

knative-prow bot commented Jul 13, 2023

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: debasishbsws, pierDipi

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow knative-prow bot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jul 13, 2023
@knative-prow knative-prow bot merged commit c649ec4 into knative-extensions:main Jul 13, 2023
29 of 32 checks passed
@debasishbsws debasishbsws deleted the remove-ConsumerRebalanceListener-from-context branch July 13, 2023 15:28
Rahul-Kumar-prog pushed a commit to Rahul-Kumar-prog/eventing-kafka-broker that referenced this pull request Jul 31, 2023
…ative-extensions#3207)

* Implemented

* Test fixed acording

* Change the Tests to set RebalanceListener
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. area/data-plane lgtm Indicates that a PR is ready to be merged. size/M Denotes a PR that changes 30-99 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants