From c71bda277b4484eca8134bf74dc453ac89cbb783 Mon Sep 17 00:00:00 2001 From: Shrijeet Paliwal Date: Tue, 23 Sep 2025 13:55:37 -0700 Subject: [PATCH] Fix azure multipart upload data corruption Blocks were added to BlockList in completion order rather than sequential order, causing data corruption. Zero pad block IDs and sort before committing to ensure correct blob reconstruction. Also improve the existing test that was only checking file, the test would fail on main if integrity check was turned on. This PR passes the unit test, also has been runnin in for last two days in production without data corruption, prior to this PR - split was getting corrupted every 45 minutes. --- quickwit/quickwit-storage/src/lib.rs | 23 ++++++++++++-- .../src/object_storage/azure_blob_storage.rs | 31 ++++++++++++------- .../quickwit-storage/tests/azure_storage.rs | 2 +- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index a02d7b56524..f808ac83286 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -383,9 +383,28 @@ pub(crate) mod test_suite { #[cfg(feature = "integration-testsuite")] pub async fn storage_test_multi_part_upload(storage: &mut dyn Storage) -> anyhow::Result<()> { let test_path = Path::new("hello_large.txt"); - let test_buffer = vec![0u8; 15_000_000]; - storage.put(test_path, Box::new(test_buffer)).await?; + + let mut test_buffer = Vec::with_capacity(15_000_000); + for i in 0..15_000_000u32 { + test_buffer.push((i % 256) as u8); + } + + storage + .put(test_path, Box::new(test_buffer.clone())) + .await?; + assert_eq!(storage.file_num_bytes(test_path).await?, 15_000_000); + + let downloaded_data = storage.get_all(test_path).await?; + + assert_eq!(test_buffer.len(), downloaded_data.len(), "Length mismatch"); + // dont use assert_eq since we dont want large buffers to be printed + // if assert fails + assert!( + test_buffer.as_slice() == downloaded_data.as_slice(), + "Content mismatch - data corruption detected!" + ); + Ok(()) } } diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index 007a78b3c8a..d4c9bd67d84 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -274,7 +274,7 @@ impl AzureBlobStorage { chunk_range(0..total_len as usize, part_len as usize).map(into_u64_range); let blob_client = self.container_client.blob_client(name); - let mut upload_blocks_stream_result = tokio_stream::iter(multipart_ranges.enumerate()) + let upload_blocks_stream = tokio_stream::iter(multipart_ranges.enumerate()) .map(|(num, range)| { let moved_blob_client = blob_client.clone(); let moved_payload = payload.clone(); @@ -284,7 +284,8 @@ impl AzureBlobStorage { .inc_by(range.end - range.start); async move { retry(&self.retry_params, || async { - let block_id = format!("block:{num}"); + // zero pad block ids to make them sortable as strings + let block_id = format!("block:{:05}", num); let (data, hash_digest) = extract_range_data_and_hash(moved_payload.box_clone(), range.clone()) .await?; @@ -301,16 +302,22 @@ impl AzureBlobStorage { }) .buffer_unordered(self.multipart_policy.max_concurrent_uploads()); - // Concurrently upload block with limit. - let mut block_list = BlockList::default(); - while let Some(put_block_result) = upload_blocks_stream_result.next().await { - match put_block_result { - Ok(block_id) => block_list - .blocks - .push(BlobBlockType::new_uncommitted(block_id)), - Err(error) => return Err(error.into()), - } - } + // Collect and sort block ids to preserve part order for put_block_list. + // Azure docs: "The put block list operation enforces the order in which blocks + // are to be combined to create a blob". + // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list + let mut block_ids: Vec = upload_blocks_stream + .try_collect() + .await + .map_err(StorageError::from)?; + block_ids.sort_unstable(); + + let block_list = BlockList { + blocks: block_ids + .into_iter() + .map(BlobBlockType::new_uncommitted) + .collect(), + }; // Commit all uploaded blocks. blob_client diff --git a/quickwit/quickwit-storage/tests/azure_storage.rs b/quickwit/quickwit-storage/tests/azure_storage.rs index c54720d5bbd..c5d4937220a 100644 --- a/quickwit/quickwit-storage/tests/azure_storage.rs +++ b/quickwit/quickwit-storage/tests/azure_storage.rs @@ -45,7 +45,7 @@ async fn azure_storage_test_suite() -> anyhow::Result<()> { object_storage.set_policy(MultiPartPolicy { // On azure, block size is limited between 64KB and 100MB. - target_part_num_bytes: 5 * 1_024 * 1_024, // 5MB + target_part_num_bytes: 5 * 1_024 * 1_024, // 5MiB max_num_parts: 10_000, multipart_threshold_num_bytes: 10_000_000, max_object_num_bytes: 5_000_000_000_000,