
More workers than partitions cause re-balance to hang #134

Closed
amitripshtos opened this issue Aug 8, 2018 · 9 comments

@amitripshtos

Hey guys, first of all - this is an awesome library! It gives a solution to a problem a lot of developers have in data-heavy applications. :)

Steps to reproduce

Create a Kafka topic with 1 partition, and start 3 agents for that topic.
You will see that the agents hang when they try to re-balance.
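
A minimal reproducer would look something like this (the app and topic names are hypothetical, and the broker URL assumes a local Kafka):

import faust

# Hypothetical app name; broker URL assumes a local Kafka.
app = faust.App('test-app', broker='kafka://localhost:9092')

# Declare the source topic with a single partition.
topic = app.topic('test-topic', partitions=1)

@app.agent(topic)
async def process(stream):
    # Do nothing but print each received event.
    async for event in stream:
        print(event)

Then start three workers for the same app (each worker needs its own web port, e.g. faust -A myapp worker --web-port 6066), stop one, and watch the resulting rebalance.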

Expected behavior

They should re-balance, and only one should consume messages, since there are 3 workers but only 1 partition.

Actual behavior

All agents freeze upon re-balance.

Versions

  • Python version 3.7.0 (without any extensions)
  • Faust version 1.0.27
  • Operating system alpine3.8
  • Kafka version 1.1.1
@vineetgoel
Contributor

This is weird. Are you seeing any errors, or are the agents just hanging? We typically run with Python 3.6; could you also try with that?

@ask
Contributor

ask commented Aug 9, 2018

I'm testing with Python 3.7 and it should work fine.

Do you have an example app that can reproduce the issue?

@vineet, notice he says he has three workers attempting to consume from a topic with only one partition.
I'm not sure how we handle that, and I doubt we have a test for it.

@ask
Contributor

ask commented Aug 9, 2018

Please also paste logs if you have any (but make sure they don't contain sensitive data).

@ask
Contributor

ask commented Aug 9, 2018

Some notes after testing with 3 workers on a single-partition topic:

  • Topic is autocreated by Kafka with 1 partition

  • Internal topics (table changelogs, repartitioned topics) are created with 4 partitions
    (app argument topic_partitions=4)

What works:

  • Having a simple agent that does nothing but print each event it receives works:

    • One worker is assigned the single partition
    • The remaining two workers are assigned no partitions and sit idle

What does not work:

  1. Having an agent that repartitions the stream with groupby

     • The repartitioned stream in my example is auto-created with 4 partitions
     • This means the source topic has 1 partition, but the repartitioned topic has 4
     • I think Kafka Streams raises an error in this case, but we don't

  2. Having the agent also use a table

     • The changelog topic is created with 4 partitions, which could be related to the error
     • The logs hang on Waiting for restore to finish...:

[2018-08-08 18:29:44,847: INFO]: [^---ChangelogReader: country_to_total]: Highwater for changelog partitions:
┌Highwater──────────────────────────────────────┬───────────┬───────────┐
│ topic                                         │ partition │ highwater │
├───────────────────────────────────────────────┼───────────┼───────────┤
│ faust-withdrawals4-country_to_total-changelog │ 3         │ -1        │
│ faust-withdrawals4-country_to_total-changelog │ 2         │ -1        │
└───────────────────────────────────────────────┴───────────┴───────────┘
[2018-08-08 18:29:46,411: INFO]: [^---ChangelogReader: country_to_total]: Updated offsets at start of reading:
┌Reading Starts At──────────────────────────────┬───────────┬────────┐
│ topic                                         │ partition │ offset │
├───────────────────────────────────────────────┼───────────┼────────┤
│ faust-withdrawals4-country_to_total-changelog │ 3         │ -1     │
│ faust-withdrawals4-country_to_total-changelog │ 2         │ -1     │
└───────────────────────────────────────────────┴───────────┴────────┘
[2018-08-08 18:29:46,412: INFO]: [^---ChangelogReader: country_to_total]: No updates needed
[2018-08-08 18:29:46,412: INFO]: [^---ChangelogReader: country_to_total]: Setting stop event
[2018-08-08 18:29:46,412: INFO]: [^--TableManager]: Started restoring: ChangelogReader: country_to_total
[2018-08-08 18:29:46,412: INFO]: [^--Fetcher]: Starting...
[2018-08-08 18:29:46,412: INFO]: [^--TableManager]: Waiting for restore to finish...

As you can see, the highwater is -1, so the recovery shouldn't have anything to recover,
but we are still hanging on it.
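
For reference, the failing case corresponds to an agent shaped roughly like this sketch (the Withdrawal fields and broker URL are assumptions; the app id, topic_partitions setting, and table name match the logs above):

import faust

class Withdrawal(faust.Record):
    user: str
    country: str
    amount: float

# topic_partitions=4 makes internal topics default to 4 partitions.
app = faust.App('withdrawals4', broker='kafka://localhost:9092',
                topic_partitions=4)

# Source topic: auto-created by Kafka with 1 partition.
withdrawals_topic = app.topic('withdrawals', value_type=Withdrawal)

# Table whose changelog topic is created with 4 partitions.
country_to_total = app.Table('country_to_total', default=int)

@app.agent(withdrawals_topic)
async def track(withdrawals):
    # group_by repartitions the stream into an auto-created 4-partition topic.
    async for withdrawal in withdrawals.group_by(Withdrawal.country):
        country_to_total[withdrawal.country] += withdrawal.amount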

@amitripshtos
Author

Thanks for the quick response!
I'm not near my laptop so I can't send logs, but I wasn't getting any logs; the app just got stuck and I couldn't Ctrl+C it.

I saw the issue without using groupby or a table, just a regular stream that prints something.

I started a worker and it worked fine. I started 2 more and they worked fine. Then I stopped one of them and all of them got stuck.

I will try to reproduce it again today.

@ask
Contributor

ask commented Aug 9, 2018

Ah, it only logs warnings and errors by default.

To get additional logging, start the worker with -l info.
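
For example (assuming the app is defined in a module named myapp):

faust -A myapp worker -l info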

ask added a commit that referenced this issue Aug 9, 2018
@ask
Contributor

ask commented Aug 15, 2018

This should be fixed in the latest Faust.

ask closed this as completed Aug 15, 2018
@amitripshtos
Author

Awesome, thanks!

@mainTAP

mainTAP commented Jan 28, 2020

It seems I'm experiencing the same or a very similar issue.
The workers don't always hang when they try to take over, but it happens very often.
I have 2 workers reading from a topic with 1 partition, and when 1 worker is killed, the other hangs when trying to re-balance. Looking at the logs:

[2020-01-28 14:23:05,691] [14748] [WARNING]: Heartbeat failed for group test_kafka because it is rebalancing
[2020-01-28 14:23:05,691] [14748] [INFO]: Revoking previously assigned partitions frozenset() for group test_kafka
[2020-01-28 14:23:05,693] [14748] [DEBUG]: Stopping heartbeat task

  • followed by just the Timer messages

Whereas if the worker does take over, it actually re-joins the group:

[2020-01-28 15:22:30,884] [19864] [WARNING]: Heartbeat failed for group test_kafka because it is rebalancing
[2020-01-28 15:22:30,884] [19864] [INFO]: Revoking previously assigned partitions frozenset() for group test_kafka
[2020-01-28 15:22:30,886] [19864] [DEBUG]: Stopping heartbeat task
[2020-01-28 15:22:30,886] [19864] [INFO]: (Re-)joining group test_kafka
[2020-01-28 15:22:30,886] [19864] [DEBUG]: Sending JoinGroup ..

I'm using Python 3.6.8 and Faust 1.10.1.
