Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use tracing::{error, info, info_span, instrument, trace, warn};
use tracing_futures::Instrument as _;
use walkdir::WalkDir;

const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00];
const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";

type FileRange = RangeInclusive<u64>;
Expand Down Expand Up @@ -876,12 +875,9 @@ impl AsyncStorage {
// download the compressed raw blob first.
// Like this we can first check if it's worth recompressing & re-uploading.
let mut compressed_blob = compressed_stream.materialize(usize::MAX).await?;
if compressed_blob
.content
.last_chunk::<{ ZSTD_EOF_BYTES.len() }>()
== Some(&ZSTD_EOF_BYTES)
{
info!(path, "Archive already has correct zstd ending, skipping");

if decompress(compressed_blob.content.as_slice(), alg, usize::MAX).is_ok() {
info!(path, "Archive can be decompressed, skipping");
continue;
}

Expand All @@ -891,20 +887,14 @@ impl AsyncStorage {
let mut decompressed = Vec::new();
{
// old async-compression can read the broken zstd stream
let mut reader = wrap_reader_for_decompression(
io::Cursor::new(compressed_blob.content.clone()),
alg,
);
let mut reader =
wrap_reader_for_decompression(compressed_blob.content.as_slice(), alg);

tokio::io::copy(&mut reader, &mut decompressed).await?;
}

let mut buf = Vec::with_capacity(decompressed.len());
compress_async(&mut io::Cursor::new(&decompressed), &mut buf, alg).await?;
debug_assert_eq!(
buf.last_chunk::<{ ZSTD_EOF_BYTES.len() }>(),
Some(&ZSTD_EOF_BYTES)
);
compress_async(decompressed.as_slice(), &mut buf, alg).await?;
compressed_blob.content = buf;
compressed_blob.compression = Some(alg);

Expand Down Expand Up @@ -1176,12 +1166,13 @@ pub(crate) fn source_archive_path(name: &str, version: &Version) -> String {

#[cfg(test)]
mod test {
use crate::test::{TestEnvironment, V0_1};

use super::*;
use crate::test::{TestEnvironment, V0_1};
use std::env;
use test_case::test_case;

const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00];

fn streaming_blob(
content: impl Into<Vec<u8>>,
alg: Option<CompressionAlgorithm>,
Expand Down
Loading