diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index a02d7b56524..f808ac83286 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -383,9 +383,28 @@ pub(crate) mod test_suite { #[cfg(feature = "integration-testsuite")] pub async fn storage_test_multi_part_upload(storage: &mut dyn Storage) -> anyhow::Result<()> { let test_path = Path::new("hello_large.txt"); - let test_buffer = vec![0u8; 15_000_000]; - storage.put(test_path, Box::new(test_buffer)).await?; + + let mut test_buffer = Vec::with_capacity(15_000_000); + for i in 0..15_000_000u32 { + test_buffer.push((i % 256) as u8); + } + + storage + .put(test_path, Box::new(test_buffer.clone())) + .await?; + assert_eq!(storage.file_num_bytes(test_path).await?, 15_000_000); + + let downloaded_data = storage.get_all(test_path).await?; + + assert_eq!(test_buffer.len(), downloaded_data.len(), "Length mismatch"); + // don't use assert_eq since we don't want large buffers to be printed + // if assert fails + assert!( + test_buffer.as_slice() == downloaded_data.as_slice(), + "Content mismatch - data corruption detected!" 
+ ); + Ok(()) } } diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index 007a78b3c8a..d4c9bd67d84 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -274,7 +274,7 @@ impl AzureBlobStorage { chunk_range(0..total_len as usize, part_len as usize).map(into_u64_range); let blob_client = self.container_client.blob_client(name); - let mut upload_blocks_stream_result = tokio_stream::iter(multipart_ranges.enumerate()) + let upload_blocks_stream = tokio_stream::iter(multipart_ranges.enumerate()) .map(|(num, range)| { let moved_blob_client = blob_client.clone(); let moved_payload = payload.clone(); @@ -284,7 +284,8 @@ impl AzureBlobStorage { .inc_by(range.end - range.start); async move { retry(&self.retry_params, || async { - let block_id = format!("block:{num}"); + // zero pad block ids to make them sortable as strings + let block_id = format!("block:{:05}", num); let (data, hash_digest) = extract_range_data_and_hash(moved_payload.box_clone(), range.clone()) .await?; @@ -301,16 +302,22 @@ impl AzureBlobStorage { }) .buffer_unordered(self.multipart_policy.max_concurrent_uploads()); - // Concurrently upload block with limit. - let mut block_list = BlockList::default(); - while let Some(put_block_result) = upload_blocks_stream_result.next().await { - match put_block_result { - Ok(block_id) => block_list - .blocks - .push(BlobBlockType::new_uncommitted(block_id)), - Err(error) => return Err(error.into()), - } - } + // Collect and sort block ids to preserve part order for put_block_list. + // Azure docs: "The put block list operation enforces the order in which blocks + // are to be combined to create a blob". 
+ // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list + let mut block_ids: Vec<String> = upload_blocks_stream + .try_collect() + .await + .map_err(StorageError::from)?; + block_ids.sort_unstable(); + + let block_list = BlockList { + blocks: block_ids + .into_iter() + .map(BlobBlockType::new_uncommitted) + .collect(), + }; // Commit all uploaded blocks. blob_client diff --git a/quickwit/quickwit-storage/tests/azure_storage.rs b/quickwit/quickwit-storage/tests/azure_storage.rs index c54720d5bbd..c5d4937220a 100644 --- a/quickwit/quickwit-storage/tests/azure_storage.rs +++ b/quickwit/quickwit-storage/tests/azure_storage.rs @@ -45,7 +45,7 @@ async fn azure_storage_test_suite() -> anyhow::Result<()> { object_storage.set_policy(MultiPartPolicy { // On azure, block size is limited between 64KB and 100MB. - target_part_num_bytes: 5 * 1_024 * 1_024, // 5MB + target_part_num_bytes: 5 * 1_024 * 1_024, // 5MiB max_num_parts: 10_000, multipart_threshold_num_bytes: 10_000_000, max_object_num_bytes: 5_000_000_000_000,