From 3690a7dab25ccfd63c679a9291c4fbd989a205a2 Mon Sep 17 00:00:00 2001 From: Seunghun Lee Date: Mon, 22 Sep 2025 02:40:52 +0900 Subject: [PATCH] Fix Kinesis source panic on resharding --- .../src/source/kinesis/kinesis_source.rs | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs index 0889e7785df..5d2e2ec00c9 100644 --- a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs +++ b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs @@ -137,7 +137,14 @@ impl KinesisSource { shard_id: ShardId, checkpoint: &SourceCheckpoint, ) { - assert!(!self.state.shard_consumers.contains_key(&shard_id)); + if self.state.shard_consumers.contains_key(&shard_id) { + info!( + stream_name = %self.stream_name, + shard_id = %shard_id, + "Shard consumer already exists, skipping creation." + ); + return; + } let partition_id = PartitionId::from(shard_id.as_str()); let from_position = checkpoint @@ -149,6 +156,12 @@ impl KinesisSource { Position::Offset(offset) => Some(offset.to_string()), Position::Eof(_) => panic!("position of a Kinesis shard should never be EOF"), }; + info!( + stream_name = %self.stream_name, + shard_id = %shard_id, + start_position = ?from_position, + "Spawning new shard consumer" + ); let shard_consumer = ShardConsumer::new( self.stream_name.clone(), shard_id.clone(), @@ -384,6 +397,61 @@ mod tests { Ok(merged_batch) } + #[ignore] + #[tokio::test] + async fn test_kinesis_source_handles_resharding_with_split() { + use crate::source::kinesis::api::tests::split_shard; + use crate::source::kinesis::helpers::tests::wait_for_active_stream; + + let universe = Universe::with_accelerated_time(); + let (doc_processor_mailbox, _doc_processor_inbox) = universe.create_test_mailbox(); + let (kinesis_client, stream_name) = setup("test-resharding-split", 1).await.unwrap(); + let index_id = "test-kinesis-resharding-index"; + let index_uid = IndexUid::new_with_random_ulid(index_id); + + // Split the shard (1 -> 2 shards) + let shard_id_0 = make_shard_id(0); + split_shard( + &kinesis_client, + &stream_name, + &shard_id_0, + "85070591730234615865843651857942052864", + ) + .await + .unwrap(); + + // Wait for stream to be active after split + let _ = wait_for_active_stream(&kinesis_client, &stream_name) + .await + .unwrap(); + + // Initialize source after split + let kinesis_params = KinesisSourceParams { + stream_name: stream_name.clone(), + region_or_endpoint: Some(RegionOrEndpoint::Endpoint( + "http://localhost:4566".to_string(), + )), + enable_backfill_mode: true, + }; + let source_params = SourceParams::Kinesis(kinesis_params.clone()); + let source_config = SourceConfig::for_test("test-kinesis-resharding", source_params); + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + + let kinesis_source = KinesisSource::try_new(source_runtime, kinesis_params) + .await + .unwrap(); + + let actor = SourceActor { + source: Box::new(kinesis_source), + doc_processor_mailbox: doc_processor_mailbox.clone(), + }; + let (_mailbox, handle) = universe.spawn_builder().spawn(actor); + let (exit_status, _exit_state) = handle.join().await; + assert!(exit_status.is_success()); + + teardown(&kinesis_client, &stream_name).await; + } + #[ignore] #[tokio::test] async fn test_kinesis_source() {