With 100 indexers, Kafka keeps rebalancing partitions and prevents Quickwit from reaching a stable indexing throughput #2993

@fmassot

With 100 indexers and a Kafka topic with 200 partitions, I observed that Kafka keeps rebalancing partitions every 1-3 minutes. As a result, the indexing throughput never stabilizes.

I did not observe this pattern with 50-80 nodes.

[Image: quickwit-indexing-throughput-100-nodes]

Logs from one indexer:

2023-03-13T17:18:57.473Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[8, 9]
2023-03-13T17:18:57.653Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZPZJXBNAHKXN9MQ62C015" partition_id=0
2023-03-13T17:19:42.502Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[8, 9]
2023-03-13T17:19:42.678Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZRBJ2J7P1G3ZDHJBFSJRS" partition_id=0
2023-03-13T17:20:27.677Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[10, 11]
2023-03-13T17:20:28.043Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZSQVK341SV1NN3AD3B5YK" partition_id=0
2023-03-13T17:20:58.072Z  INFO quickwit_indexing::actors::indexer: send-to-index-serializer commit_trigger=Timeout split_ids=01GVDZSQVK341SV1NN3AD3B5YK num_docs=2536849
2023-03-13T17:20:58.568Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZTNN34V8HCX29RM296B7P" partition_id=0
2023-03-13T17:20:59.478Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:packager: quickwit_indexing::actors::packager: start-packaging-splits split_ids=["01GVDZSQVK341SV1NN3AD3B5YK"]
2023-03-13T17:20:59.478Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:packager: quickwit_indexing::actors::packager: create-packaged-split split_id="01GVDZSQVK341SV1NN3AD3B5YK"
2023-03-13T17:20:59.479Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader: quickwit_indexing::actors::uploader: start-stage-and-store-splits split_ids=["01GVDZSQVK341SV1NN3AD3B5YK"]
2023-03-13T17:21:00.405Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader:upload{split=01GVDZSQVK341SV1NN3AD3B5YK}:store_split: quickwit_indexing::split_store::indexing_split_store: store-split-remote-success split_size_in_megabytes=80.21098 num_docs=1033947 elapsed_secs=0.9190389 throughput_mb_s=87.27703 is_mature=false
2023-03-13T17:21:00.405Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader:upload{split=01GVDZSQVK341SV1NN3AD3B5YK}:store_split: quickwit_indexing::split_store::indexing_split_store: store-in-cache
2023-03-13T17:21:00.416Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:publisher{split_update=SplitsUpdate { index_id: "hdfs-logs-partitioned", new_splits: "01GVDZSQVK341SV1NN3AD3B5YK", checkpoint_delta: Some(kafka-source:∆(00000000000000000010:(00000000000001121963..00000000000001500555] 00000000000000000011:(00000000000001347263..00000000000002002618])) }}: quickwit_indexing::actors::publisher: publish-new-splits new_splits=["01GVDZSQVK341SV1NN3AD3B5YK"] checkpoint_delta=Some(kafka-source:∆(00000000000000000010:(00000000000001121963..00000000000001500555] 00000000000000000011:(00000000000001347263..00000000000002002618]))
2023-03-13T17:21:00.416Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:publisher{split_update=SplitsUpdate { index_id: "hdfs-logs-partitioned", new_splits: "01GVDZSQVK341SV1NN3AD3B5YK", checkpoint_delta: Some(kafka-source:∆(00000000000000000010:(00000000000001121963..00000000000001500555] 00000000000000000011:(00000000000001347263..00000000000002002618])) }}: quickwit_metastore::checkpoint: delta=∆(00000000000000000010:(00000000000001121963..00000000000001500555] 00000000000000000011:(00000000000001347263..00000000000002002618]) checkpoint=Ckpt()
2023-03-13T17:21:28.532Z  INFO quickwit_indexing::actors::indexer: send-to-index-serializer commit_trigger=Timeout split_ids=01GVDZTNN34V8HCX29RM296B7P num_docs=862976
2023-03-13T17:21:28.821Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZVK68EQ2AA1NSSSQBAJAF" partition_id=0
2023-03-13T17:21:29.744Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:packager: quickwit_indexing::actors::packager: start-packaging-splits split_ids=["01GVDZTNN34V8HCX29RM296B7P"]
2023-03-13T17:21:29.745Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:packager: quickwit_indexing::actors::packager: create-packaged-split split_id="01GVDZTNN34V8HCX29RM296B7P"
2023-03-13T17:21:29.745Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader: quickwit_indexing::actors::uploader: start-stage-and-store-splits split_ids=["01GVDZTNN34V8HCX29RM296B7P"]
2023-03-13T17:21:30.553Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader:upload{split=01GVDZTNN34V8HCX29RM296B7P}:store_split: quickwit_indexing::split_store::indexing_split_store: store-split-remote-success split_size_in_megabytes=67.45608 num_docs=862976 elapsed_secs=0.8005289 throughput_mb_s=84.26439 is_mature=false
2023-03-13T17:21:30.553Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader:upload{split=01GVDZTNN34V8HCX29RM296B7P}:store_split: quickwit_indexing::split_store::indexing_split_store: store-in-cache
2023-03-13T17:21:30.566Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:publisher{split_update=SplitsUpdate { index_id: "hdfs-logs-partitioned", new_splits: "01GVDZTNN34V8HCX29RM296B7P", checkpoint_delta: Some(kafka-source:∆(00000000000000000010:(00000000000001500555..00000000000001827063] 00000000000000000011:(00000000000002002618..00000000000002539086])) }}: quickwit_indexing::actors::publisher: publish-new-splits new_splits=["01GVDZTNN34V8HCX29RM296B7P"] checkpoint_delta=Some(kafka-source:∆(00000000000000000010:(00000000000001500555..00000000000001827063] 00000000000000000011:(00000000000002002618..00000000000002539086]))
2023-03-13T17:21:30.566Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:publisher{split_update=SplitsUpdate { index_id: "hdfs-logs-partitioned", new_splits: "01GVDZTNN34V8HCX29RM296B7P", checkpoint_delta: Some(kafka-source:∆(00000000000000000010:(00000000000001500555..00000000000001827063] 00000000000000000011:(00000000000002002618..00000000000002539086])) }}: quickwit_metastore::checkpoint: delta=∆(00000000000000000010:(00000000000001500555..00000000000001827063] 00000000000000000011:(00000000000002002618..00000000000002539086]) checkpoint=Ckpt()
2023-03-13T17:21:43.730Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[10, 11]
2023-03-13T17:21:43.894Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZW1Y2WX9RZ1ECFXKJP8F7" partition_id=0
2023-03-13T17:22:13.875Z  INFO quickwit_indexing::actors::indexer: send-to-index-serializer commit_trigger=Timeout split_ids=01GVDZW1Y2WX9RZ1ECFXKJP8F7 num_docs=2176709
2023-03-13T17:22:14.055Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZWZCK40X5RPWRBY8WSCBP" partition_id=0
2023-03-13T17:22:16.392Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:packager: quickwit_indexing::actors::packager: start-packaging-splits split_ids=["01GVDZW1Y2WX9RZ1ECFXKJP8F7"]
2023-03-13T17:22:16.392Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:packager: quickwit_indexing::actors::packager: create-packaged-split split_id="01GVDZW1Y2WX9RZ1ECFXKJP8F7"
2023-03-13T17:22:16.394Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader: quickwit_indexing::actors::uploader: start-stage-and-store-splits split_ids=["01GVDZW1Y2WX9RZ1ECFXKJP8F7"]
2023-03-13T17:22:17.856Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader:upload{split=01GVDZW1Y2WX9RZ1ECFXKJP8F7}:store_split: quickwit_indexing::split_store::indexing_split_store: store-split-remote-success split_size_in_megabytes=147.02968 num_docs=1939505 elapsed_secs=1.4513462 throughput_mb_s=101.305725 is_mature=false
2023-03-13T17:22:17.856Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:uploader:upload{split=01GVDZW1Y2WX9RZ1ECFXKJP8F7}:store_split: quickwit_indexing::split_store::indexing_split_store: store-in-cache
2023-03-13T17:22:17.867Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:publisher{split_update=SplitsUpdate { index_id: "hdfs-logs-partitioned", new_splits: "01GVDZW1Y2WX9RZ1ECFXKJP8F7", checkpoint_delta: Some(kafka-source:∆(00000000000000000010:(00000000000001827063..00000000000002603546] 00000000000000000011:(00000000000002539086..00000000000003702108])) }}: quickwit_indexing::actors::publisher: publish-new-splits new_splits=["01GVDZW1Y2WX9RZ1ECFXKJP8F7"] checkpoint_delta=Some(kafka-source:∆(00000000000000000010:(00000000000001827063..00000000000002603546] 00000000000000000011:(00000000000002539086..00000000000003702108]))
2023-03-13T17:22:17.867Z  INFO index_batch{index_id=hdfs-logs-partitioned source_id=kafka-source pipeline_ord=0}:publisher{split_update=SplitsUpdate { index_id: "hdfs-logs-partitioned", new_splits: "01GVDZW1Y2WX9RZ1ECFXKJP8F7", checkpoint_delta: Some(kafka-source:∆(00000000000000000010:(00000000000001827063..00000000000002603546] 00000000000000000011:(00000000000002539086..00000000000003702108])) }}: quickwit_metastore::checkpoint: delta=∆(00000000000000000010:(00000000000001827063..00000000000002603546] 00000000000000000011:(00000000000002539086..00000000000003702108]) checkpoint=Ckpt()
2023-03-13T17:22:35.012Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[12, 13]
2023-03-13T17:22:35.799Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZXMKYRBMXX4MMEX72SNFJ" partition_id=0
2023-03-13T17:22:38.298Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[10, 11]
2023-03-13T17:22:38.508Z  INFO quickwit_indexing::actors::indexer: new-split split_id="01GVDZXQ8MW3F70FXNVNGT0NP2" partition_id=0
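
To quantify how often this indexer is rebalanced, the log lines above can be scanned for the `New partition assignment after rebalance` message. The following is a minimal sketch, not part of Quickwit: the function names (`parse_rebalances`, `rebalance_intervals`, `assignment_changes`) are hypothetical, and it assumes the log format stays exactly as shown above. The sample lines are the six rebalance events copied from the log excerpt.

```python
import re
from datetime import datetime

# Message emitted by quickwit_indexing::source::kafka_source in the logs above.
REBALANCE_MARKER = "New partition assignment after rebalance"
TIMESTAMP_RE = re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})Z")
PARTITIONS_RE = re.compile(r"partitions=\[([^\]]*)\]")

# The six rebalance lines from the excerpt above, copied verbatim.
SAMPLE_LOG = """\
2023-03-13T17:18:57.473Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[8, 9]
2023-03-13T17:19:42.502Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[8, 9]
2023-03-13T17:20:27.677Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[10, 11]
2023-03-13T17:21:43.730Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[10, 11]
2023-03-13T17:22:35.012Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[12, 13]
2023-03-13T17:22:38.298Z  INFO quickwit_indexing::source::kafka_source: New partition assignment after rebalance. index_id=hdfs-logs-partitioned source_id=kafka-source topic=hdfs-logs-partitioned partitions=[10, 11]
""".splitlines()


def parse_rebalances(lines):
    """Return a list of (timestamp, partitions) for every rebalance log line."""
    events = []
    for line in lines:
        if REBALANCE_MARKER not in line:
            continue
        ts_match = TIMESTAMP_RE.match(line)
        parts_match = PARTITIONS_RE.search(line)
        if ts_match is None or parts_match is None:
            continue  # Skip lines that do not follow the assumed format.
        timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%dT%H:%M:%S.%f")
        partitions = [int(p) for p in parts_match.group(1).split(",") if p.strip()]
        events.append((timestamp, partitions))
    return events


def rebalance_intervals(events):
    """Seconds elapsed between consecutive rebalance events."""
    return [
        round((later[0] - earlier[0]).total_seconds(), 3)
        for earlier, later in zip(events, events[1:])
    ]


def assignment_changes(events):
    """Number of rebalances after which the assigned partitions differ."""
    return sum(1 for a, b in zip(events, events[1:]) if a[1] != b[1])


if __name__ == "__main__":
    events = parse_rebalances(SAMPLE_LOG)
    print("rebalances:", len(events))
    print("intervals (s):", rebalance_intervals(events))
    print("assignment changes:", assignment_changes(events))
```

Running the same functions over the full log of each indexer would show whether the rebalances are cluster-wide and whether the interval correlates with the number of nodes.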
