fix: initialize inflightAcks channel to not nil channel #1548

Merged
merged 2 commits into from
Mar 8, 2024

afugazzotto
Contributor

@afugazzotto afugazzotto commented Mar 7, 2024

Closes #1529

Initialize the inflightAcks channel to a non-nil channel so that the Cleanup function does not hang when there are no messages to acknowledge.

How to Test/Reproduce:

  1. Set up a Kafka client and a Kafka topic with 10 partitions.
  2. In a bash script, have the Kafka client send 10 messages per second.
  3. Set up a Numaflow pipeline with a source vertex that consumes from the Kafka client (disable autoscaling).
  4. Manually scale the source vertex to 12 or more pods (kubectl scale vtx <vtx_name> --replicas=12).
  5. Manually delete pod-0.
  6. Wait for a new pod-0 to start.
  7. Verify via the kafka_source_read_total metric that the new pod-0 has no partitions assigned to it.
  8. Scale down to 1 pod.
  9. At this point, before the fix, the flow of messages to the log sink would stop. After the fix, pod-0 is not stuck in the cleanup state and can resume consuming messages (kafka_source_read_total should show all 10 partitions).

…to avoid hanging on cleanup function when there are no messages to acknowledge.

Signed-off-by: Antonino Fugazzotto <antonino_fugazzotto@intuit.com>
@afugazzotto afugazzotto requested review from jy4096, yhl25 and a team March 7, 2024 19:11
@afugazzotto afugazzotto marked this pull request as ready for review March 7, 2024 19:24
Member

@KeranYang KeranYang left a comment

Why do we need inflightAcks in the first place? My understanding is that Ack is a blocking call, hence inflightAcks should be either nil or closed. Am I missing anything? cc: @vigith

@afugazzotto
Contributor Author

afugazzotto commented Mar 7, 2024

> Why do we need inflightAcks in the first place? My understanding is that Ack is a blocking call, hence inflightAcks should be either nil or closed. Am I missing anything? cc: @vigith

Based on this question, would it make more sense (or be preferred) to remove the inflight ack "check" in the Cleanup function (https://github.com/numaproj/numaflow/blob/main/pkg/sources/kafka/handler.go#L59)?

@KeranYang
Member

> would it make more sense (or be preferred) to remove the inflight ack "check" in the Cleanup function

or completely remove the var inflightAcks?

@vigith
Contributor

vigith commented Mar 7, 2024

> Why do we need inflightAcks in the first place? My understanding is that Ack is a blocking call, hence inflightAcks should be either nil or closed. Am I missing anything? cc: @vigith

The Cleanup function is not called synchronously with the Ack function, so there can be a state where Ack is still running when Cleanup is called.

Signed-off-by: Vigith Maurice <vigith@gmail.com>
@vigith vigith changed the title from "Fixes #1541: initialize inflightAcks channel to not nil channel" to "fix: initialize inflightAcks channel to not nil channel" Mar 7, 2024
@afugazzotto afugazzotto merged commit 1844575 into main Mar 8, 2024
20 checks passed
@afugazzotto afugazzotto deleted the fixIssue1541 branch March 8, 2024 00:52
whynowy pushed a commit that referenced this pull request Mar 15, 2024
Signed-off-by: Antonino Fugazzotto <antonino_fugazzotto@intuit.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
Development

Successfully merging this pull request may close these issues.

Autoscaling for Kafka not able to stabilize causing too many server-side rebalancing
3 participants