Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,26 @@ pub(crate) mod test_suite {
/// Round-trips a large payload through `storage` and checks that it comes back intact.
///
/// The test writes a 15,000,000-byte buffer to `hello_large.txt`, asserts the size
/// reported by `file_num_bytes`, then reads the object back with `get_all` and
/// compares the MD5 digest of the downloaded bytes with the digest of the bytes that
/// were uploaded.
///
/// # Errors
///
/// Propagates any error returned by `put`, `file_num_bytes` or `get_all`.
///
/// # Panics
///
/// Panics if the reported size or the content digest does not match.
#[cfg(feature = "integration-testsuite")]
pub async fn storage_test_multi_part_upload(storage: &mut dyn Storage) -> anyhow::Result<()> {
    // Period of the repeating byte pattern that fills the payload. It is prime on
    // purpose: with a period of 256 (or any other divisor of a power-of-two part size
    // such as 2 MiB or 5 MiB) every full-size part would hold exactly the same bytes,
    // and an implementation that commits the parts in the wrong order would still
    // produce a matching digest. A period of 251 gives each part a different phase.
    const PATTERN_PERIOD: u8 = 251;

    let test_path = Path::new("hello_large.txt");

    // `take` provides an exact size hint, so `collect` allocates the buffer once.
    let test_buffer: Vec<u8> = (0..PATTERN_PERIOD).cycle().take(15_000_000).collect();

    // Hash before uploading: `put` takes ownership of the buffer.
    let expected_hash = md5::compute(&test_buffer);

    storage.put(test_path, Box::new(test_buffer)).await?;

    assert_eq!(storage.file_num_bytes(test_path).await?, 15_000_000);

    let downloaded_data = storage.get_all(test_path).await?;
    let actual_hash = md5::compute(&downloaded_data);

    assert_eq!(
        expected_hash, actual_hash,
        "Content hash mismatch - data corruption detected!"
    );

    Ok(())
}
}
31 changes: 19 additions & 12 deletions quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl AzureBlobStorage {
chunk_range(0..total_len as usize, part_len as usize).map(into_u64_range);

let blob_client = self.container_client.blob_client(name);
let mut upload_blocks_stream_result = tokio_stream::iter(multipart_ranges.enumerate())
let upload_blocks_stream = tokio_stream::iter(multipart_ranges.enumerate())
.map(|(num, range)| {
let moved_blob_client = blob_client.clone();
let moved_payload = payload.clone();
Expand All @@ -284,7 +284,8 @@ impl AzureBlobStorage {
.inc_by(range.end - range.start);
async move {
retry(&self.retry_params, || async {
let block_id = format!("block:{num}");
// zero pad block ids to make them sortable as strings
let block_id = format!("block:{:05}", num);
let (data, hash_digest) =
extract_range_data_and_hash(moved_payload.box_clone(), range.clone())
.await?;
Expand All @@ -301,16 +302,22 @@ impl AzureBlobStorage {
})
.buffer_unordered(self.multipart_policy.max_concurrent_uploads());

// Concurrently upload block with limit.
let mut block_list = BlockList::default();
while let Some(put_block_result) = upload_blocks_stream_result.next().await {
match put_block_result {
Ok(block_id) => block_list
.blocks
.push(BlobBlockType::new_uncommitted(block_id)),
Err(error) => return Err(error.into()),
}
}
// Collect and sort block ids to preserve part order for put_block_list.
// Azure docs: "The put block list operation enforces the order in which blocks
// are to be combined to create a blob".
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
let mut block_ids: Vec<String> = upload_blocks_stream
.try_collect()
.await
.map_err(StorageError::from)?;
block_ids.sort_unstable();

let block_list = BlockList {
blocks: block_ids
.into_iter()
.map(BlobBlockType::new_uncommitted)
.collect(),
};

// Commit all uploaded blocks.
blob_client
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-storage/tests/azure_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn azure_storage_test_suite() -> anyhow::Result<()> {

object_storage.set_policy(MultiPartPolicy {
// On azure, block size is limited between 64KB and 100MB.
target_part_num_bytes: 5 * 1_024 * 1_024, // 5MB
target_part_num_bytes: 2 * 1_024 * 1_024, // 2MB
max_num_parts: 10_000,
multipart_threshold_num_bytes: 10_000_000,
max_object_num_bytes: 5_000_000_000_000,
Expand Down
Loading