Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

container: Drop async_compression + support zstd:chunked #622

Merged
merged 2 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ rust-version = "1.74.0"
[dependencies]
anyhow = "1.0"
containers-image-proxy = "0.5.5"
async-compression = { version = "0.4", features = ["gzip", "tokio", "zstd"] }
camino = "1.0.4"
chrono = "0.4.19"
olpc-cjson = "0.1.1"
Expand Down Expand Up @@ -43,6 +42,7 @@ tokio = { features = ["io-std", "time", "process", "rt", "net"], version = ">= 1
tokio-util = { features = ["io-util"], version = "0.7" }
tokio-stream = { features = ["sync"], version = "0.1.8" }
tracing = "0.1"
zstd = "0.13.1"

indoc = { version = "2", optional = true }
xshell = { version = "0.2", optional = true }
Expand Down
88 changes: 72 additions & 16 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::container::store::LayerProgress;
use super::*;
use containers_image_proxy::{ImageProxy, OpenedImage};
use fn_error_context::context;
use futures_util::{Future, FutureExt};
use futures_util::{Future, FutureExt, TryFutureExt as _};
use oci_spec::image as oci_image;
use std::sync::{Arc, Mutex};
use tokio::{
Expand Down Expand Up @@ -189,22 +189,76 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
importer.unencapsulate().await
}

/// Take an async AsyncBufRead and handle decompression for it, returning
/// a wrapped AsyncBufRead implementation.
///
/// `src` is the compressed input stream. When `is_zstd` is true the stream is
/// decoded as zstd; otherwise it is decoded as gzip.
///
/// This is implemented with a background thread using a pipe-to-self,
/// and so there is an additional Future object returned that is a "driver"
/// task and must also be checked for errors.
///
/// # Errors
///
/// Returns an error immediately if the pipe cannot be created. Failures on
/// the worker thread (converting the pipe's write half to a blocking file
/// descriptor, initializing the zstd decoder, or copying the decompressed
/// data) are reported through the returned driver future instead.
pub(crate) fn decompress_bridge(
    src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
    is_zstd: bool,
) -> Result<(
    // This one is the input reader
    impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
    // And this represents the worker thread doing copying
    impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
    // We use a plain unix pipe() because it's just a very convenient
    // way to bridge arbitrarily between sync and async with a worker
    // thread. Yes, it involves going through the kernel, but
    // eventually we'll replace all this logic with podman anyways.
    let (tx, rx) = tokio::net::unix::pipe::pipe()?;
    let task = tokio::task::spawn_blocking(move || -> Result<()> {
        // Convert the write half of the pipe() into a regular blocking file descriptor
        let tx = tx.into_blocking_fd()?;
        let mut tx = std::fs::File::from(tx);
        // Convert the async input back to synchronous.
        let src = tokio_util::io::SyncIoBridge::new(src);
        let bufr = std::io::BufReader::new(src);
        // Wrap the input in a decompressor. The two decoders have different
        // concrete types, so they are unified behind a boxed trait object;
        // making this function take a function pointer instead was tried
        // and turned out to be painful with the type system.
        let mut src: Box<dyn std::io::Read> = if is_zstd {
            Box::new(zstd::stream::read::Decoder::new(bufr)?)
        } else {
            Box::new(flate2::bufread::GzDecoder::new(bufr))
        };
        // We don't care about the number of bytes copied
        let _n: u64 = std::io::copy(&mut src, &mut tx)?;
        Ok(())
    })
    // Flatten the nested Result<Result<>>
    .map(crate::tokio_util::flatten_anyhow);
    // Return the decompressed reader together with the driver future
    Ok((tokio::io::BufReader::new(rx), task))
}

/// Create a decompressor for this MIME type, given a stream of input.
fn new_async_decompressor<'a>(
media_type: &oci_image::MediaType,
src: impl AsyncBufRead + Send + Unpin + 'a,
) -> Result<Box<dyn AsyncBufRead + Send + Unpin + 'a>> {
match media_type {
oci_image::MediaType::ImageLayerGzip => Ok(Box::new(tokio::io::BufReader::new(
async_compression::tokio::bufread::GzipDecoder::new(src),
))),
oci_image::MediaType::ImageLayerZstd => Ok(Box::new(tokio::io::BufReader::new(
async_compression::tokio::bufread::ZstdDecoder::new(src),
))),
oci_image::MediaType::ImageLayer => Ok(Box::new(src)),
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Ok(Box::new(src)),
o => Err(anyhow::anyhow!("Unhandled layer type: {}", o)),
}
src: impl AsyncBufRead + Send + Unpin + 'static,
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
let r: (
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
Box<dyn Future<Output = Result<()>> + Send + Unpin + 'static>,
) = match media_type {
m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd);
let (r, driver) = decompress_bridge(src, is_zstd)?;
(Box::new(r), Box::new(driver) as _)
}
oci_image::MediaType::ImageLayer => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
}
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
}
o => anyhow::bail!("Unhandled layer type: {}", o),
};
Ok(r)
}

/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
Expand Down Expand Up @@ -262,11 +316,13 @@ pub(crate) async fn fetch_layer_decompress<'a>(
progress.send_replace(Some(status));
}
};
let reader = new_async_decompressor(media_type, readprogress)?;
let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?;
let driver = driver.and_then(|()| compression_driver);
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
Ok((reader, Either::Left(driver)))
} else {
let blob = new_async_decompressor(media_type, blob)?;
let (blob, compression_driver) = new_async_decompressor(media_type, blob)?;
let driver = driver.and_then(|()| compression_driver);
Ok((blob, Either::Right(driver)))
}
}
29 changes: 20 additions & 9 deletions lib/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,11 @@ async fn test_tar_write() -> Result<()> {
#[tokio::test]
async fn test_tar_write_tar_layer() -> Result<()> {
let fixture = Fixture::new_v1()?;
let uncompressed_tar = tokio::io::BufReader::new(
async_compression::tokio::bufread::GzipDecoder::new(EXAMPLE_TAR_LAYER),
);
ostree_ext::tar::write_tar(fixture.destrepo(), uncompressed_tar, "test", None).await?;
let mut v = Vec::new();
let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER));
let _n = std::io::copy(&mut dec, &mut v)?;
let r = tokio::io::BufReader::new(std::io::Cursor::new(v));
ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?;
Ok(())
}

Expand Down Expand Up @@ -1261,10 +1262,8 @@ async fn test_container_write_derive() -> Result<()> {
Ok(())
}

/// Test for zstd
/// We need to handle the case of modified hardlinks into /sysroot
#[tokio::test]
async fn test_container_zstd() -> Result<()> {
/// Implementation of a test case for non-gzip (i.e. zstd or zstd:chunked) compression
async fn test_non_gzip(format: &str) -> Result<()> {
let fixture = Fixture::new_v1()?;
let baseimg = &fixture.export_container().await?.0;
let basepath = &match baseimg.transport {
Expand All @@ -1276,7 +1275,7 @@ async fn test_container_zstd() -> Result<()> {
let st = tokio::process::Command::new("skopeo")
.args([
"copy",
"--dest-compress-format=zstd",
&format!("--dest-compress-format={format}"),
baseimg_ref.as_str(),
&format!("oci:{zstd_image_path}"),
])
Expand All @@ -1302,6 +1301,18 @@ async fn test_container_zstd() -> Result<()> {
Ok(())
}

/// Run the shared non-gzip container test with the `zstd` compression format.
#[tokio::test]
async fn test_container_zstd() -> Result<()> {
    const FORMAT: &str = "zstd";
    test_non_gzip(FORMAT).await
}

/// Run the shared non-gzip container test with the `zstd:chunked` compression format.
#[tokio::test]
async fn test_container_zstd_chunked() -> Result<()> {
    const FORMAT: &str = "zstd:chunked";
    test_non_gzip(FORMAT).await
}

/// Test for https://github.com/ostreedev/ostree-rs-ext/issues/405
/// We need to handle the case of modified hardlinks into /sysroot
#[tokio::test]
Expand Down
Loading