Skip to content

Conversation

@earlbread
Copy link
Contributor

@earlbread earlbread commented Sep 22, 2025

Description

Currently, the Kinesis source would panic when encountering resharding scenarios (shard splits/merges) because it attempted to create duplicate consumers for the same shard.

This occurred when:

  1. The source initialized and created consumers for all shards (including closed parent shards)
  2. Parent shard consumers would send ChildShards messages when reaching the end
  3. The source would try to spawn consumers for child shards that were already created

This replaces the assertion with a check that safely skips duplicate consumer creation.

Fixes #5896

How was this PR tested?

  • Added the test_kinesis_source_handles_resharding_with_split test
cargo test -p quickwit-indexing --features kinesis,kinesis-localstack-tests test_kinesis_source_handles_resharding_with_split -- --ignored
  • Verified with an actual Kinesis stream that source creation and ingestion work successfully with the build from this branch.

@earlbread
Copy link
Contributor Author

cc @guilload

@earlbread
Copy link
Contributor Author

earlbread commented Sep 29, 2025

I fixed the lint failure.

Copy link
Member

@guilload guilload left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good. The source could use a logging statement when we spawn a new shard consumer with stream name, shard_id, start position.

@earlbread
Copy link
Contributor Author

earlbread commented Sep 29, 2025

@guilload Thanks for your review. I've added the logging statement you mentioned.

2025-09-29T16:50:49.000Z  INFO quickwit_indexing::source::kinesis::kinesis_source: Spawning new shard consumer stream_name=earlbread-test-stream-2 shard_id=shardId-000000000000 start_position=Position::Beginning
2025-09-29T16:50:49.000Z  INFO quickwit_indexing::source::kinesis::kinesis_source: Spawning new shard consumer stream_name=earlbread-test-stream-2 shard_id=shardId-000000000001 start_position=Position::Beginning
2025-09-29T16:50:49.000Z  INFO quickwit_indexing::source::kinesis::kinesis_source: Spawning new shard consumer stream_name=earlbread-test-stream-2 shard_id=shardId-000000000002 start_position=Position::Offset(49667547358012113504468154075506442543022530173843013666)
2025-09-29T16:50:49.000Z  INFO quickwit_indexing::source::kinesis::kinesis_source: Starting Kinesis source. stream_name=earlbread-test-stream-2 assigned_shards=shardId-000000000000, shardId-000000000001, shardId-000000000002
2025-09-29T16:50:49.049Z  INFO quickwit_indexing::source::kinesis::kinesis_source: Shard consumer already exists, skipping creation. stream_name=earlbread-test-stream-2 shard_id=shardId-000000000001
2025-09-29T16:50:49.049Z  INFO quickwit_indexing::source::kinesis::kinesis_source: Shard consumer already exists, skipping creation. stream_name=earlbread-test-stream-2 shard_id=shardId-000000000002
2025-09-29T16:50:49.260Z  INFO quickwit_actors::spawn_builder: actor-exit actor_id="KinesisShardConsumer-dawn-mBjC" phase=handling(quickwit_indexing::source::kinesis::shard_consumer::Loop) exit_status=Success
2025-09-29T16:50:49.260Z  INFO quickwit_indexing::source::kinesis::kinesis_source: Shard is closed. stream_name=earlbread-test-stream-2 shard_id=shardId-000000000000 num_active_shards=3

@guilload
Copy link
Member

Awesome. I will merge. Thanks, @earlbread!

@earlbread
Copy link
Contributor Author

@guilload Thank you!

@guilload guilload merged commit 607ffbd into quickwit-oss:main Sep 30, 2025
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kinesis source fails with "channel closed" errors after shard resharding

2 participants