From eec5bbc2aef94ac25dada56ebdf60de95790ede4 Mon Sep 17 00:00:00 2001 From: Shrijeet Paliwal Date: Tue, 23 Sep 2025 13:55:37 -0700 Subject: [PATCH] Fix azure multipart upload data corruption Blocks were added to BlockList in completion order rather than sequential order, causing data corruption. Zero pad block IDs and sort before committing to ensure correct blob reconstruction. Also improve the existing test that was only checking file, the test would fail on main if integrity check was turned on. This PR passes the unit test, also has been runnin in for last two days in production without data corruption, prior to this PR - split was getting corrupted every 45 minutes. --- quickwit/quickwit-storage/src/lib.rs | 19 +++++++++++- .../src/object_storage/azure_blob_storage.rs | 31 ++++++++++++------- .../quickwit-storage/tests/azure_storage.rs | 2 +- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index a02d7b56524..d2c2c5444d0 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -383,9 +383,26 @@ pub(crate) mod test_suite { #[cfg(feature = "integration-testsuite")] pub async fn storage_test_multi_part_upload(storage: &mut dyn Storage) -> anyhow::Result<()> { let test_path = Path::new("hello_large.txt"); - let test_buffer = vec![0u8; 15_000_000]; + + let mut test_buffer = Vec::with_capacity(15_000_000); + for i in 0..15_000_000u32 { + test_buffer.push((i % 256) as u8); + } + + let expected_hash = md5::compute(&test_buffer); + storage.put(test_path, Box::new(test_buffer)).await?; + assert_eq!(storage.file_num_bytes(test_path).await?, 15_000_000); + + let downloaded_data = storage.get_all(test_path).await?; + let actual_hash = md5::compute(&downloaded_data); + + assert_eq!( + expected_hash, actual_hash, + "Content hash mismatch - data corruption detected!" + ); + Ok(()) } } diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index 007a78b3c8a..d4c9bd67d84 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -274,7 +274,7 @@ impl AzureBlobStorage { chunk_range(0..total_len as usize, part_len as usize).map(into_u64_range); let blob_client = self.container_client.blob_client(name); - let mut upload_blocks_stream_result = tokio_stream::iter(multipart_ranges.enumerate()) + let upload_blocks_stream = tokio_stream::iter(multipart_ranges.enumerate()) .map(|(num, range)| { let moved_blob_client = blob_client.clone(); let moved_payload = payload.clone(); @@ -284,7 +284,8 @@ impl AzureBlobStorage { .inc_by(range.end - range.start); async move { retry(&self.retry_params, || async { - let block_id = format!("block:{num}"); + // zero pad block ids to make them sortable as strings + let block_id = format!("block:{:05}", num); let (data, hash_digest) = extract_range_data_and_hash(moved_payload.box_clone(), range.clone()) .await?; @@ -301,16 +302,22 @@ impl AzureBlobStorage { }) .buffer_unordered(self.multipart_policy.max_concurrent_uploads()); - // Concurrently upload block with limit. - let mut block_list = BlockList::default(); - while let Some(put_block_result) = upload_blocks_stream_result.next().await { - match put_block_result { - Ok(block_id) => block_list - .blocks - .push(BlobBlockType::new_uncommitted(block_id)), - Err(error) => return Err(error.into()), - } - } + // Collect and sort block ids to preserve part order for put_block_list. + // Azure docs: "The put block list operation enforces the order in which blocks + // are to be combined to create a blob". + // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list + let mut block_ids: Vec = upload_blocks_stream + .try_collect() + .await + .map_err(StorageError::from)?; + block_ids.sort_unstable(); + + let block_list = BlockList { + blocks: block_ids + .into_iter() + .map(BlobBlockType::new_uncommitted) + .collect(), + }; // Commit all uploaded blocks. blob_client diff --git a/quickwit/quickwit-storage/tests/azure_storage.rs b/quickwit/quickwit-storage/tests/azure_storage.rs index c54720d5bbd..a7f838b1070 100644 --- a/quickwit/quickwit-storage/tests/azure_storage.rs +++ b/quickwit/quickwit-storage/tests/azure_storage.rs @@ -45,7 +45,7 @@ async fn azure_storage_test_suite() -> anyhow::Result<()> { object_storage.set_policy(MultiPartPolicy { // On azure, block size is limited between 64KB and 100MB. - target_part_num_bytes: 5 * 1_024 * 1_024, // 5MB + target_part_num_bytes: 2 * 1_024 * 1_024, // 2MB max_num_parts: 10_000, multipart_threshold_num_bytes: 10_000_000, max_object_num_bytes: 5_000_000_000_000,