[Worker] Check if worker allocator is terminated in static allocation mode #3105
Conversation
```diff
@@ -360,6 +359,7 @@ func (k *kafka) drainOnRebalance(session sarama.ConsumerGroupSession,

 	wg.Wait()
 	readyForRebalanceChan <- true
+	close(readyForRebalanceChan)
```
In that case, if we run out of waiting time (in the select statement), we will never close the chan, because writing to the chan is a blocking operation. So I would leave it as is.
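To make the concern concrete, here is a minimal runnable sketch (all names are stand-ins, not the actual nuclio code) of what happens when the close is gated behind a blocking send and the receiver has already timed out; in a long-lived process the sender goroutine simply leaks:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Unbuffered stand-in for readyForRebalanceChan: a send blocks
	// until someone receives.
	readyCh := make(chan bool)

	go func() {
		time.Sleep(2 * time.Second) // simulate a slow drain handler
		readyCh <- true             // blocks: the receiver already gave up
		close(readyCh)              // never reached, so the channel stays open
	}()

	select {
	case <-readyCh:
		fmt.Println("drain finished in time")
	case <-time.After(1 * time.Second): // stand-in for maxWaitHandlerDuringRebalance
		fmt.Println("timed out; the sender goroutine is now stuck on the send")
	}
}
```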
The goroutine will still run even when the timeout has passed, and the select doesn't wait for this channel.
The issue is that if the timeout has passed, the function exits and closes the `readyForRebalanceChan`. The goroutine continues to run and tries to write to the closed channel.
My thought was to not close the channel before the goroutine is done.
@TomerShor Yes, but if the goroutine is still running when the timeout has passed, this change will result in the channel never being closed. We won't read from `readyForRebalanceChan`, leaving a zombie goroutine stuck on the send indefinitely.
Currently, it's possible to attempt writing to a closed channel, causing a panic and the goroutine to exit. To address this properly and prevent the panic, we should notify the goroutine from the main function body that the timeout has passed and there's no need to write anything to the channel. But as for me, panicking in this goroutine is not a big issue.
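One way to implement that "notify the goroutine" idea is a second done channel that the main body closes on timeout, so the sender can select between delivering the value and giving up. A minimal sketch with assumed names (`waitForDrain`, `readyCh`, `done` are illustrative, not the actual code):

```go
package main

import (
	"fmt"
	"time"
)

// waitForDrain sketches the "notify the goroutine" idea: the caller
// closes done on timeout, so the sender can give up instead of
// blocking on (or panicking on) a channel nobody reads anymore.
func waitForDrain(drain func(), maxWait time.Duration) bool {
	readyCh := make(chan bool)
	done := make(chan struct{})

	go func() {
		drain() // e.g. wg.Wait() in the real drain flow
		select {
		case readyCh <- true: // caller is still waiting
		case <-done: // timeout fired; skip the send entirely
		}
	}()

	select {
	case <-readyCh:
		return true
	case <-time.After(maxWait):
		close(done) // tell the goroutine nobody will read readyCh
		return false
	}
}

func main() {
	ok := waitForDrain(func() { time.Sleep(50 * time.Millisecond) }, time.Second)
	fmt.Println("drained in time:", ok)
}
```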
@rokatyy We already have a recover for that case in this goroutine 🤦
I will revert this change.
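For context, the recover pattern referred to here looks roughly like this; a runnable sketch, not the actual nuclio code:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	readyForRebalanceChan := make(chan bool)

	// Simulate the caller having timed out and closed the channel already.
	close(readyForRebalanceChan)

	go func() {
		// A deferred recover turns the "send on closed channel" panic
		// into a quiet goroutine exit instead of crashing the process.
		defer func() {
			if r := recover(); r != nil {
				fmt.Println("recovered:", r)
			}
		}()
		readyForRebalanceChan <- true // panics: channel is closed
	}()

	time.Sleep(100 * time.Millisecond) // let the goroutine run (demo only)
}
```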
Overall looks good, nice catch with `SignalTermination`!
One question about closing the chan and we are done.
```diff
@@ -116,13 +119,18 @@ func (s *singleton) SignalDraining() error {
 }

 func (s *singleton) SignalTermination() error {
-	return s.worker.Drain()
+	s.isTerminated = true
```
👍
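As the PR title suggests, the point of this flag is to let the allocator refuse allocations once terminated. A minimal sketch of that shape, with assumed field and method names (the real singleton allocator differs):

```go
package main

import (
	"errors"
	"sync"
)

// singletonSketch mimics the shape of the change: SignalTermination
// flips a flag that Allocate consults. Names are assumed, not copied
// from the nuclio source.
type singletonSketch struct {
	mu           sync.Mutex
	isTerminated bool
}

func (s *singletonSketch) SignalTermination() error {
	s.mu.Lock()
	s.isTerminated = true
	s.mu.Unlock()
	// the real method also drains the worker, per the diff above
	return nil
}

func (s *singletonSketch) Allocate() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.isTerminated {
		return errors.New("worker allocator is terminated")
	}
	// ... hand out the single worker here ...
	return nil
}

func main() {
	s := &singletonSketch{}
	_ = s.SignalTermination()
	_ = s.Allocate() // returns an error once terminated
}
```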
Followup to #3092, where we are blocking worker allocation if the workers are terminated:

- `drainOnRebalance`: close the `readyForRebalanceChan` after sending a value on it. This helps cases where the `maxWaitHandlerDuringRebalance` times out before the drain handler finishes, and then the channel is closed before the drain-invoking goroutine passes a value on it.
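Putting the description together, the intended flow looks roughly like the sketch below. All names are illustrative; the channel is buffered here so the send cannot block, though the actual channel's capacity is not shown in the diff above:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// drainOnRebalanceSketch shows the flow the description outlines:
// wait for the drain handlers, send on readyForRebalanceChan, then
// close it. maxWait stands in for maxWaitHandlerDuringRebalance.
func drainOnRebalanceSketch(wg *sync.WaitGroup, maxWait time.Duration) {
	// Buffered so the send never blocks, even if the caller timed out.
	readyForRebalanceChan := make(chan bool, 1)

	go func() {
		wg.Wait()                     // wait for all drain handlers
		readyForRebalanceChan <- true // signal readiness
		close(readyForRebalanceChan)  // the change under discussion
	}()

	select {
	case <-readyForRebalanceChan:
		fmt.Println("drained before the rebalance deadline")
	case <-time.After(maxWait):
		fmt.Println("drain timed out; proceeding with rebalance")
	}
}

func main() {
	var wg sync.WaitGroup
	drainOnRebalanceSketch(&wg, time.Second)
}
```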