use multi-threaded XZ decompression via liblzma #13

bennyz merged 2 commits into jumpstarter-dev:main
Conversation
📝 Walkthrough

Replaces subprocess XZ handling with an in-process decompression pipeline (liblzma), adds URL-based compression detection, threads an XZ memory-limit option through callers, streams decompressed chunks via channels with progress, and swaps the xz2 dependency for liblzma.
Sequence Diagram

```mermaid
sequenceDiagram
    participant DL as Downloader
    participant BUF as Buffer\n(ByteBoundedReceiver)
    participant DE as Decompressor\nThread
    participant CONS as Consumer\n(Writer Task)
    participant DST as Destination
    DL->>BUF: send Bytes chunks
    rect rgba(100,200,100,0.5)
        BUF->>DE: ChannelReader blocks & reads Bytes
        DE->>DE: decode chunk (liblzma / flate2 / passthrough)
        DE->>CONS: send decompressed Vec<u8>
        DE->>CONS: send consumed progress via consumed_tx
    end
    CONS->>CONS: detect format & parse sparse data
    CONS->>DST: write to block device
    CONS->>DE: stop/close when done
```
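The Buffer→Decompressor hand-off in the diagram relies on a blocking `Read` adapter over a channel. A minimal std-only sketch of that idea (the PR's `ChannelReader` wraps a tokio receiver of `Bytes`; the type name, fields, and the std `mpsc` channel here are illustrative assumptions):

```rust
use std::io::Read;
use std::sync::mpsc::Receiver;

/// Illustrative stand-in for the PR's ChannelReader: a blocking `Read`
/// adapter over a channel of byte chunks. (The real type wraps a tokio
/// receiver of `Bytes`; names here are assumptions.)
pub struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    pending: Vec<u8>, // current chunk being drained
    pos: usize,       // read offset into `pending`
}

impl ChannelReader {
    pub fn new(rx: Receiver<Vec<u8>>) -> Self {
        Self { rx, pending: Vec::new(), pos: 0 }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Refill from the channel once the current chunk is exhausted.
        while self.pos == self.pending.len() {
            match self.rx.recv() {
                Ok(chunk) => {
                    self.pending = chunk;
                    self.pos = 0;
                }
                // All senders dropped: signal EOF to the decoder.
                Err(_) => return Ok(0),
            }
        }
        let n = buf.len().min(self.pending.len() - self.pos);
        buf[..n].copy_from_slice(&self.pending[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}
```

An adapter like this is what lets a synchronous decoder such as `XzDecoder` pull compressed bytes from an async download loop without a subprocess in between.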
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/fls/decompress.rs`:
- Around line 127-130: The Compression::Zstd branch in decompress.rs currently
returns a static "install zstdcat" error but get_compression_from_url routes
.zst/.zstd URLs here, so the message is misleading and no external fallback is
used; either implement an external fallback by invoking zstdcat (use
std::process::Command in the function handling decompression, detect absence of
the binary and surface a clear error, stream input/output and propagate exit
status) or update the error text in the Compression::Zstd return to explicitly
state "URL Zstd flashing is unsupported" (and keep a comment referencing
get_compression_from_url), choosing one approach and ensuring the symbol
Compression::Zstd in decompress.rs and the caller used by
get_compression_from_url are updated accordingly.
- Around line 83-90: get_compression_from_url currently takes the full URL
including query strings/fragments, so filenames like
"image.xz?X-Amz-Signature=..." yield Compression::None; update
get_compression_from_url to first strip any query string or fragment by
splitting the input on '?' and '#' and using the part before them, then extract
the extension (rsplit('.') etc.) and match to Compression (Gzip, Xz, Zstd, None)
as before; this ensures signed URLs and URLs with fragments are detected
correctly.
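The suggested fix can be sketched as a small pure function (the `Compression` variants mirror those named in the review; exact enum shape is an assumption):

```rust
/// Illustrative sketch of the suggested fix: strip the query string and
/// fragment, keep only the final path segment, then match the lowercased
/// extension. The `Compression` enum mirrors the one referenced in the
/// review; exact variants are assumptions.
#[derive(Debug, PartialEq)]
pub enum Compression {
    Gzip,
    Xz,
    Zstd,
    None,
}

pub fn get_compression_from_url(url: &str) -> Compression {
    // Drop "?query" and "#fragment" so signed URLs like
    // "image.xz?X-Amz-Signature=..." are detected correctly.
    let path = url.split('?').next().unwrap_or(url);
    let path = path.split('#').next().unwrap_or(path);
    // Only the final path segment may contribute an extension.
    let filename = path.rsplit('/').next().unwrap_or(path);
    let extension = filename.rsplit('.').next().unwrap_or("").to_lowercase();
    match extension.as_str() {
        "gz" | "gzip" => Compression::Gzip,
        "xz" => Compression::Xz,
        "zst" | "zstd" => Compression::Zstd,
        _ => Compression::None,
    }
}
```

Taking the last `/` segment before extracting the extension also covers the dotted-path cases raised later in the review (e.g. `https://host/v1.0/image`).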
- Around line 117-123: The MT XZ decoder currently disables memory limits by
calling memlimit_threading(u64::MAX) and memlimit_stop(u64::MAX) on
liblzma::stream::MtStreamBuilder, which is unsafe for untrusted firmware; change
this to use a conservative bounded limit instead (e.g., a constant
DEFAULT_XZ_MEMLIMIT) or read a configurable limit from BlockFlashOptions (add an
xz_memlimit or xz_max_memory field), validate it, and pass that value into
memlimit_threading(...) and memlimit_stop(...); leave the rest of the
MtStreamBuilder/decoder creation and the XzDecoder::new_stream(channel_reader,
stream) call intact, and ensure the option has a sensible default so callers
that don’t opt-in get safe behavior.
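A hedged sketch of the bounded-limit plumbing suggested above (the constant name, option shape, and default are assumptions; the computed value would feed the `memlimit_threading(...)`/`memlimit_stop(...)` calls on the `MtStreamBuilder`):

```rust
/// Assumed default; the review asks for "a sensible default" without
/// fixing a number, so 256 MB here is an illustrative choice.
pub const DEFAULT_XZ_MEMLIMIT_MB: u64 = 256;

/// Convert an optional CLI/option value in MB into the byte limit handed
/// to liblzma. saturating_mul avoids silent wraparound on absurd inputs,
/// which a later review comment flags for the real code.
pub fn xz_memlimit_bytes(xz_memlimit_mb: Option<u64>) -> u64 {
    xz_memlimit_mb
        .unwrap_or(DEFAULT_XZ_MEMLIMIT_MB)
        .saturating_mul(1024 * 1024)
}
```

The result would then replace the `u64::MAX` arguments flagged above, e.g. `.memlimit_threading(limit).memlimit_stop(limit)`, so callers that don't opt in still get a bounded decoder.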
In `@src/fls/oci/from_oci.rs`:
- Line 16: The OCI tar-entry XZ extraction paths still instantiate
single-threaded decoders with XzDecoder::new(); replace those constructions with
the multi-threaded builder pattern used in src/fls/decompress.rs by importing
liblzma::MtStreamBuilder and creating decoders via MtStreamBuilder::new_stream()
(mirroring the same options/unwrap/error handling used in decompress.rs),
updating the two places that call XzDecoder::new() to use the
MtStreamBuilder::new_stream() stream wrapper instead so OCI extraction uses the
MT decoder.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d5b19a2c-d622-443f-9e43-05306f3326b8
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
Cargo.toml, src/fls/decompress.rs, src/fls/from_url.rs, src/fls/magic_bytes.rs, src/fls/oci/from_oci.rs, src/fls/stream_utils.rs, tests/common/mod.rs
Force-pushed from 60f5446 to dc76f94
```rust
    compression: Compression,
    consumed_progress_tx: mpsc::UnboundedSender<u64>,
) -> Result<DecompressorResult, Box<dyn std::error::Error>> {
    let (decompressed_tx, decompressed_rx) = mpsc::channel::<Vec<u8>>(8);
```
Isn't Vec a bit unbounded?
LLMs always hit us with their unbound usage of memory, we need to make sure that our memory consumption does not grow unbounded, for example if we have a small block that decompresses as a lot of 0's or similar (unlikely for simg, but likely for non-simg)
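The bounded-channel property at stake can be demonstrated std-only: a `sync_channel(cap)` blocks the producer once `cap` chunks are queued, so in-flight memory is capped at cap × chunk size no matter how much a small compressed block inflates (the PR uses tokio's `mpsc::channel::<Vec<u8>>(8)`; the function and names below are illustrative):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Std-only sketch: the bounded channel gives backpressure, so a fast
/// decompressor can never hold more than `cap` chunks in flight while a
/// slower writer drains them. (Illustrative stand-in for the PR's
/// tokio `mpsc::channel::<Vec<u8>>(8)`.)
pub fn run_bounded_pipeline(chunks: usize, chunk_len: usize, cap: usize) -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(cap);
    let producer = thread::spawn(move || {
        for i in 0..chunks {
            // Blocks when the channel is full: backpressure, not growth.
            tx.send(vec![(i % 256) as u8; chunk_len]).unwrap();
        }
    });
    // Consumer drains at its own pace; the producer never overruns it.
    let total: usize = rx.iter().map(|c| c.len()).sum();
    producer.join().unwrap();
    total
}
```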
mangelajo left a comment:

nice, only a comment on the decompression channel.
Actionable comments posted: 2
🧹 Nitpick comments (4)
src/fls/decompress.rs (2)
83-93: Edge case: dots in non-filename path segments.
`path.rsplit('.').next()` is applied to the whole URL path, so a URL like `https://host/v1.0/image` yields `extension = "0/image"` — harmless here (falls through to `Compression::None`), but a pathological `https://host/a.xz/file` would resolve as `Compression::None` despite an intent-ambiguous URL, and more importantly `https://host/dir.tar/file.XZ` already works only because the final segment is taken by `rsplit('.')`. Splitting on `/` first (taking only the last path segment) would make this more robust and predictable.

♻️ Proposed refinement

```diff
 pub(crate) fn get_compression_from_url(url: &str) -> Compression {
     let path = url.split('?').next().unwrap_or(url);
     let path = path.split('#').next().unwrap_or(path);
-    let extension = path.rsplit('.').next().unwrap_or("").to_lowercase();
+    let filename = path.rsplit('/').next().unwrap_or("");
+    let extension = filename.rsplit('.').next().unwrap_or("").to_lowercase();
     match extension.as_str() {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/decompress.rs` around lines 83 - 93, get_compression_from_url currently looks at the entire path when extracting an extension, which can misidentify extensions in intermediate path segments; after stripping query/hash, take only the final path segment (e.g., via path.rsplit('/').next().unwrap_or(path)) and then extract the extension from that segment with rsplit('.').next().unwrap_or("").to_lowercase() before matching to Compression::Gzip/Xz/Zstd/None to ensure only the filename tail is considered.
139-150: Per-chunk `Vec<u8>` allocation and 8 MB read buffer. Two small refinements worth considering:

- `buf[..n].to_vec()` allocates a fresh `Vec` for every read (up to 8 MB each), and the `mpsc::channel::<Vec<u8>>(8)` means up to ~64 MB of decompressed data held in flight — bounded, but allocation churn on the hot path is avoidable. Consider sending `bytes::Bytes` (with `BytesMut`/`split_to`) to avoid the copy, which also aligns the decompressed-side type with the already-`Bytes`-based compressed side.
- `vec![0u8; 8 * 1024 * 1024]` zero-initializes 8 MB up-front; that memset is wasted work since `Read::read` will overwrite. Not a correctness issue, just a minor cost.

Addresses the earlier concern from @mangelajo about `Vec<u8>` growth patterns — note that the channel being bounded to 8 items already caps total in-flight memory, but per-chunk allocation remains.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/decompress.rs` around lines 139 - 150, The loop currently creates a zeroed 8 MiB Vec per read and clones each filled slice via buf[..n].to_vec(), causing allocation churn; change to allocate a single bytes::BytesMut buffer once (e.g., let mut buf = BytesMut::with_capacity(8*1024*1024)), reuse it for each decoder.read by temporarily setting its length (unsafe set_len or advance_mut pattern), call decoder.read(&mut buf[..]) to fill n bytes, then extract the filled region with buf.split_to(n).freeze() (or split_to(n) into Bytes) and send that Bytes on decompressed_tx; this removes per-chunk Vec allocations and avoids the upfront zero-initialize cost while preserving the bounded channel behavior.

src/fls/stream_utils.rs (1)
42-49: Progress reported before the chunk is actually consumed.
`tx.send(data.len())` fires as soon as the chunk is received from `rx`, not as the bytes are read out via `buf`. For the decompressor path this is fine (the chunk is fully "consumed from the buffer" at that point), but note that the variable name `consumed_progress_tx` in the caller suggests "bytes the decompressor consumed" — which matches this semantics. Worth a one-line doc comment on `with_progress` clarifying that progress counts bytes pulled from the channel, not bytes returned by `read`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/stream_utils.rs` around lines 42 - 49, The progress send in the read loop (when match self.rx.blocking_recv() yields Some(data) and calls progress_tx.send(data.len() as u64)) reports bytes pulled from the channel rather than bytes returned by Read; add a one-line doc comment on the with_progress wrapper (and/or the consumed_progress_tx parameter) clarifying that progress counts bytes received from the internal channel (i.e., bytes pulled via rx.blocking_recv()) not bytes actually returned by StreamUtils::read, and reference the symbols with_progress, consumed_progress_tx, rx.blocking_recv, and progress_tx so future readers know the intended semantics.

src/main.rs (1)
75-77: Consider clarifying the help text."exceeds = single-thread fallback, then error" is terse and only partially accurate for liblzma:
`memlimit_threading` triggers MT→ST fallback, and `memlimit_stop` aborts. Since both are set to the same value here, the fallback path is effectively skipped. Consider rephrasing to: "XZ decompression memory limit in MB; streams exceeding this limit will fail (default: 256)".
Verify each finding against the current code and only fix it if needed. In `@src/main.rs` around lines 75 - 77, Update the help text for the xz_memlimit argument to be clearer and accurate: replace the terse "exceeds = single-thread fallback, then error" with a concise description like "XZ decompression memory limit in MB; streams exceeding this limit will fail (default: 256)". Ensure the change is applied to the #[arg(...)] doc/long help for the xz_memlimit field so the CLI help and default remain correct; mention xz_memlimit by name so reviewers can locate the field and confirm the wording.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/fls/decompress.rs`:
- Around line 119-123: The code multiplies the CLI-provided xz_memlimit_mb by
1024*1024 without overflow checks causing silent wrap; replace the unchecked
multiplication with a checked/saturating operation (e.g. use
xz_memlimit_mb.saturating_mul(1024 * 1024)) when computing memlimit so the value
cannot wrap, keep the variable name memlimit as u64 and preserve the existing
eprintln logging of xz_memlimit_mb and num_threads.
- Around line 100-158: In start_inprocess_decompressor: detect the specific
blocking_send failure from decompressed_tx.blocking_send (the "Writer task
closed, stopping decompression" case) and treat it as a normal/clean shutdown
instead of returning an Err string; change the send error branch to return
Ok(()) (or log at debug level) so the thread exits cleanly, while other
decompression errors from decoder.read(...) still return Err; reference
decompressed_tx.blocking_send, the "Writer task closed" message, and
start_inprocess_decompressor to locate and update the logic.
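The shutdown rule described in that last comment can be sketched std-only: a failing `send` means the writer/consumer is gone and is a clean stop, while a failing `decoder.read` is a real error (the std `mpsc` channel stands in for the tokio channel used by the PR; function and names are illustrative):

```rust
use std::io::Read;
use std::sync::mpsc::SyncSender;

/// Sketch of the suggested shutdown handling for the decompressor thread.
/// Assumptions: std mpsc instead of tokio, and a hypothetical free
/// function in place of the PR's start_inprocess_decompressor body.
pub fn decompress_loop<R: Read>(
    mut decoder: R,
    decompressed_tx: SyncSender<Vec<u8>>,
) -> Result<(), String> {
    let mut buf = vec![0u8; 64 * 1024];
    loop {
        let n = decoder
            .read(&mut buf)
            .map_err(|e| format!("Decompression error: {}", e))?; // real failure
        if n == 0 {
            return Ok(()); // end of stream
        }
        if decompressed_tx.send(buf[..n].to_vec()).is_err() {
            // Consumer closed the channel: normal shutdown, not an error.
            return Ok(());
        }
    }
}
```

Treating the closed-channel case as `Ok(())` keeps an early writer exit from being reported as a decompression failure at `join()`.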
---
Nitpick comments:
In `@src/fls/decompress.rs`:
- Around line 83-93: get_compression_from_url currently looks at the entire path
when extracting an extension, which can misidentify extensions in intermediate
path segments; after stripping query/hash, take only the final path segment
(e.g., via path.rsplit('/').next().unwrap_or(path)) and then extract the
extension from that segment with rsplit('.').next().unwrap_or("").to_lowercase()
before matching to Compression::Gzip/Xz/Zstd/None to ensure only the filename
tail is considered.
- Around line 139-150: The loop currently creates a zeroed 8 MiB Vec per read
and clones each filled slice via buf[..n].to_vec(), causing allocation churn;
change to allocate a single bytes::BytesMut buffer once (e.g., let mut buf =
BytesMut::with_capacity(8*1024*1024)), reuse it for each decoder.read by
temporarily setting its length (unsafe set_len or advance_mut pattern), call
decoder.read(&mut buf[..]) to fill n bytes, then extract the filled region with
buf.split_to(n).freeze() (or split_to(n) into Bytes) and send that Bytes on
decompressed_tx; this removes per-chunk Vec allocations and avoids the upfront
zero-initialize cost while preserving the bounded channel behavior.
In `@src/fls/stream_utils.rs`:
- Around line 42-49: The progress send in the read loop (when match
self.rx.blocking_recv() yields Some(data) and calls progress_tx.send(data.len()
as u64)) reports bytes pulled from the channel rather than bytes returned by
Read; add a one-line doc comment on the with_progress wrapper (and/or the
consumed_progress_tx parameter) clarifying that progress counts bytes received
from the internal channel (i.e., bytes pulled via rx.blocking_recv()) not bytes
actually returned by StreamUtils::read, and reference the symbols with_progress,
consumed_progress_tx, rx.blocking_recv, and progress_tx so future readers know
the intended semantics.
In `@src/main.rs`:
- Around line 75-77: Update the help text for the xz_memlimit argument to be
clearer and accurate: replace the terse "exceeds = single-thread fallback, then
error" with a concise description like "XZ decompression memory limit in MB;
streams exceeding this limit will fail (default: 256)". Ensure the change is
applied to the #[arg(...)] doc/long help for the xz_memlimit field so the CLI
help and default remain correct; mention xz_memlimit by name so reviewers can
locate the field and confirm the wording.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f92a9c46-b3e0-4f3f-8967-58dd50cbaf4f
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
Cargo.toml, src/fls/decompress.rs, src/fls/from_url.rs, src/fls/magic_bytes.rs, src/fls/oci/from_oci.rs, src/fls/options.rs, src/fls/stream_utils.rs, src/main.rs, tests/common/mod.rs
✅ Files skipped from review due to trivial changes (2)
- src/fls/oci/from_oci.rs
- tests/common/mod.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- src/fls/magic_bytes.rs
- Cargo.toml
- src/fls/from_url.rs
need to test OCI with xz
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/fls/from_url.rs (1)
494-538: ⚠️ Potential issue | 🟡 Minor

Final drain only covers `decompressed_progress_rx`; `decompressor_written_progress_rx` and `written_progress_rx` can still hold tail samples.

After the poll loop exits at line 519 and the thread is joined, you re-drain only `decompressed_progress_rx` at lines 536–538. Any updates that landed in `decompressor_written_progress_rx` or `written_progress_rx` between the last `try_recv` inside the loop and the `is_finished()` break will not be reflected in `progress.bytes_sent_to_decompressor`/`progress.bytes_written` when `progress.decompress_duration` and `final_decompress_rate` are computed just below. Worth draining all three for consistent final stats:

♻️ Suggested fix

```diff
     while let Ok(byte_count) = decompressed_progress_rx.try_recv() {
         progress.bytes_decompressed += byte_count;
     }
+    while let Ok(written_len) = decompressor_written_progress_rx.try_recv() {
+        progress.bytes_sent_to_decompressor += written_len;
+    }
+    while let Ok(written_bytes) = written_progress_rx.try_recv() {
+        progress.bytes_written = written_bytes;
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/from_url.rs` around lines 494 - 538, The final drain only consumes decompressed_progress_rx so any leftover messages in decompressor_written_progress_rx or written_progress_rx after the loop can be lost; after joining the decompressor (decompressor_handle.join()) drain both decompressor_written_progress_rx and written_progress_rx the same way you drain decompressed_progress_rx so progress.bytes_sent_to_decompressor and progress.bytes_written reflect any tail samples—i.e., add while let Ok(...) try_recv loops for decompressor_written_progress_rx and written_progress_rx (updating progress.bytes_sent_to_decompressor and progress.bytes_written respectively) immediately after the decompressor join and before computing final duration/rates.
🧹 Nitpick comments (3)
src/fls/from_url.rs (3)
169-173: Minor nit: use the imported `Compression` rather than the fully-qualified path.

You already pull `get_compression_from_url` in via the `use` at line 9; importing `Compression` alongside it avoids the inline `crate::fls::compression::Compression::None` on line 170 and reads a bit nicer.

♻️ Suggested

```diff
-use crate::fls::decompress::{get_compression_from_url, start_inprocess_decompressor};
+use crate::fls::compression::Compression;
+use crate::fls::decompress::{get_compression_from_url, start_inprocess_decompressor};
 ...
-    let is_compressed = compression != crate::fls::compression::Compression::None;
+    let is_compressed = compression != Compression::None;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/from_url.rs` around lines 169 - 173, Import Compression alongside get_compression_from_url and use it in the comparison instead of the fully-qualified path; specifically, add Compression to the existing use that brings in get_compression_from_url and change the check `compression != crate::fls::compression::Compression::None` to `compression != Compression::None` (the variables/functions to update are get_compression_from_url, Compression, and the existing is_compressed/compression comparison).
209-215: Reject unsupported Zstd URLs up-front instead of from inside the decompressor thread.
`start_inprocess_decompressor` only returns the Zstd-not-supported error from inside the spawned thread (per the `src/fls/decompress.rs` snippet). That means when a user points `fls` at a `.zst` URL, you will:

- Successfully set up the HTTP client and buffer channel.
- Start the download loop and begin pushing bytes into `buffer_tx`.
- Only surface the error later, after `buffer_tx.send()` starts failing (because `buffer_rx` was dropped when the thread exited) or at `decompressor_handle.join()`.

That wastes bandwidth and produces a confusing error path ("Buffer channel closed" vs. the real cause). Consider checking `compression` against the supported set before downloading, e.g.:

♻️ Proposed early rejection

```diff
 let compression = get_compression_from_url(url);
+if matches!(compression, crate::fls::compression::Compression::Zstd) {
+    return Err("Zstd in-process decompression is not supported".into());
+}
 let is_compressed = compression != crate::fls::compression::Compression::None;
 if is_compressed {
     eprintln!("Using decompressor: {} (in-process)", compression);
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/from_url.rs` around lines 209 - 215, Reject unsupported compression types (e.g., Zstd) before spawning the in-process decompressor and starting the download: check the compression value (the same enum/variant used when calling start_inprocess_decompressor) early in the from_url flow and return an explicit unsupported-compression error if it's Zstd (or any other unsupported variant) so the caller sees the real error immediately instead of after buffer channel failures or decompressor_handle.join(); update the logic around the start_inprocess_decompressor call to perform this pre-check on compression.
517-534: Wrap `join()` in `spawn_blocking` to avoid blocking the tokio worker thread.

`decompressor_handle.join()` at line 524 is a synchronous blocking call executed directly on the tokio runtime. Calling `std::thread::JoinHandle::join()` in an async context blocks the worker thread, which on a `current_thread` runtime can stall the entire runtime, and on a multi-thread runtime parks a worker inefficiently.

Use `tokio::task::spawn_blocking` to offload the join operation to tokio's dedicated blocking thread pool. This also eliminates the need for the 100 ms polling loop above:

♻️ Proposed change

```diff
-        if decompressor_handle.is_finished() {
-            break;
-        }
-
-        tokio::time::sleep(Duration::from_millis(100)).await;
-    }
-
-    match decompressor_handle.join() {
-        Ok(Ok(())) => {}
-        Ok(Err(e)) => {
+        tokio::time::sleep(Duration::from_millis(100)).await;
+        if decompressor_handle.is_finished() {
+            break;
+        }
+    }
+
+    match tokio::task::spawn_blocking(move || decompressor_handle.join()).await {
+        Ok(Ok(Ok(()))) => {}
+        Ok(Ok(Err(e))) => {
             eprintln!();
             return Err(e.into());
         }
-        Err(_) => {
+        Ok(Err(_)) | Err(_) => {
             eprintln!();
             return Err("Decompressor thread panicked".into());
         }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/from_url.rs` around lines 517 - 534, The code currently calls decompressor_handle.join() directly (and uses the 100ms polling loop while waiting), which blocks the tokio worker; replace this by offloading the join to tokio's blocking pool: call tokio::task::spawn_blocking(move || decompressor_handle.join()) and await that JoinHandle, then match the nested Result as before (handle Ok(Ok(())), Ok(Err(e)) and Err(_) cases). Remove the polling loop that checks decompressor_handle.is_finished() and the sleep, and ensure the variable name decompressor_handle is moved into the spawn_blocking closure so the synchronous join runs on the blocking threadpool instead of the async worker.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/fls/from_url.rs`:
- Around line 494-538: The final drain only consumes decompressed_progress_rx so
any leftover messages in decompressor_written_progress_rx or written_progress_rx
after the loop can be lost; after joining the decompressor
(decompressor_handle.join()) drain both decompressor_written_progress_rx and
written_progress_rx the same way you drain decompressed_progress_rx so
progress.bytes_sent_to_decompressor and progress.bytes_written reflect any tail
samples—i.e., add while let Ok(...) try_recv loops for
decompressor_written_progress_rx and written_progress_rx (updating
progress.bytes_sent_to_decompressor and progress.bytes_written respectively)
immediately after the decompressor join and before computing final
duration/rates.
---
Nitpick comments:
In `@src/fls/from_url.rs`:
- Around line 169-173: Import Compression alongside get_compression_from_url and
use it in the comparison instead of the fully-qualified path; specifically, add
Compression to the existing use that brings in get_compression_from_url and
change the check `compression != crate::fls::compression::Compression::None` to
`compression != Compression::None` (the variables/functions to update are
get_compression_from_url, Compression, and the existing
is_compressed/compression comparison).
- Around line 209-215: Reject unsupported compression types (e.g., Zstd) before
spawning the in-process decompressor and starting the download: check the
compression value (the same enum/variant used when calling
start_inprocess_decompressor) early in the from_url flow and return an explicit
unsupported-compression error if it's Zstd (or any other unsupported variant) so
the caller sees the real error immediately instead of after buffer channel
failures or decompressor_handle.join(); update the logic around the
start_inprocess_decompressor call to perform this pre-check on compression.
- Around line 517-534: The code currently calls decompressor_handle.join()
directly (and uses the 100ms polling loop while waiting), which blocks the tokio
worker; replace this by offloading the join to tokio's blocking pool: call
tokio::task::spawn_blocking(move || decompressor_handle.join()) and await that
JoinHandle, then match the nested Result as before (handle Ok(Ok(())),
Ok(Err(e)) and Err(_) cases). Remove the polling loop that checks
decompressor_handle.is_finished() and the sleep, and ensure the variable name
decompressor_handle is moved into the spawn_blocking closure so the synchronous
join runs on the blocking threadpool instead of the async worker.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d45be885-98d0-40c5-8707-f0ff3d091872
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
Cargo.toml, src/fls/decompress.rs, src/fls/from_url.rs, src/fls/magic_bytes.rs, src/fls/oci/from_oci.rs, src/fls/options.rs, src/fls/stream_utils.rs, src/main.rs, tests/common/mod.rs
✅ Files skipped from review due to trivial changes (2)
- Cargo.toml
- tests/common/mod.rs
🚧 Files skipped from review as they are similar to previous changes (5)
- src/fls/magic_bytes.rs
- src/fls/options.rs
- src/main.rs
- src/fls/decompress.rs
- src/fls/oci/from_oci.rs
Replace xz2 crate with liblzma 0.4 (parallel feature) to enable lzma_stream_decoder_mt(). This helps remove the decompression bottleneck from XZ decompression, speeding flashing up by ~4x.

Signed-off-by: Benny Zlotnik <bzlotnik@redhat.com>
Assisted-by: claude-opus-4.6
Actionable comments posted: 2
🧹 Nitpick comments (3)
src/fls/from_url.rs (2)
517-534: `JoinHandle::join()` inside async context — acceptable here, worth a brief note.

`decompressor_handle.join()` is a blocking call on an `std::thread::JoinHandle`, but the preceding loop polls `is_finished()` before breaking, so `join()` returns immediately without parking the tokio worker. This is fine as-is; a small comment would help future readers avoid "fixing" it to `spawn_blocking`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/from_url.rs` around lines 517 - 534, Add a short clarifying comment above the decompressor wait loop / the match on decompressor_handle.join() explaining that decompressor_handle is an std::thread::JoinHandle and that the loop polls decompressor_handle.is_finished() before breaking, so the subsequent decompressor_handle.join() is guaranteed to return immediately and not block the Tokio worker, hence spawn_blocking is unnecessary; reference the decompressor_handle.is_finished() loop and the decompressor_handle.join() match to locate where to add the comment.
169-215: Fail fast for unsupported URL compressions (e.g. Zstd) before starting the download.
`get_compression_from_url` can return `Compression::Zstd`, and `start_inprocess_decompressor` hands that branch a hard `Err("Zstd in-process decompression is not supported")` inside the decompressor thread. The main HTTP loop only observes that failure through `writer_handle.is_finished()` between chunks, so a user pointing `fls` at a `.zst` URL may wait for substantial download and buffering before the error surfaces.

A cheap guard before setting up channels/writer avoids that:

```diff
 let compression = get_compression_from_url(url);
+if compression == crate::fls::compression::Compression::Zstd {
+    return Err(
+        "Zstd URL decompression is not supported by the in-process pipeline".into(),
+    );
+}
 let is_compressed = compression != crate::fls::compression::Compression::None;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/from_url.rs` around lines 169 - 215, Detect unsupported in-process compression early by calling get_compression_from_url and checking for Compression::Zstd (or other unsupported variants) immediately after computing compression and before allocating channels/writer/buffers; if unsupported, return an Err (or surface a user-facing error) instead of proceeding. Specifically, in from_url.rs after let compression = get_compression_from_url(url) and before creating channels/writer/buffer/start_inprocess_decompressor, add a guard that rejects Compression::Zstd with a clear error message so start_inprocess_decompressor is never invoked for unsupported formats.

src/fls/decompress.rs (1)
157-168: Optional: reuse the read buffer instead of `to_vec()` per chunk.

Each loop iteration allocates a fresh `Vec<u8>` up to 8 MiB. With the `mpsc::channel::<Vec<u8>>(8)` capacity that's bounded (~64 MiB peak), so it's not a leak, but it is avoidable churn on the hot decompression path. Consider sending `Bytes` (cheap clone) or pooling the buffer if profiling shows this matters.

♻️ Sketch

```diff
-    let mut buf = vec![0u8; 8 * 1024 * 1024];
+    let mut buf = vec![0u8; 8 * 1024 * 1024];
     loop {
         let n = decoder
             .read(&mut buf)
             .map_err(|e| format!("Decompression error: {}", e))?;
         if n == 0 {
             break;
         }
-        if decompressed_tx.blocking_send(buf[..n].to_vec()).is_err() {
+        // e.g. change channel to mpsc::Sender<bytes::Bytes> and:
+        if decompressed_tx.blocking_send(bytes::Bytes::copy_from_slice(&buf[..n])).is_err() {
             return Err("Writer task closed, stopping decompression".to_string());
         }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/decompress.rs` around lines 157 - 168, The loop currently clones each chunk with buf[..n].to_vec(), causing allocation churn; instead take ownership of the filled Vec and send it without copying (e.g. change channel element type or use bytes::Bytes). After decoder.read(&mut buf) returns n, call buf.truncate(n), convert the Vec into a Bytes without copying via Bytes::from(std::mem::take(&mut buf)) (or send the Vec directly if you change the mpsc type), recreate buf as a fresh vec with the original capacity for the next iteration, and then call decompressed_tx.blocking_send on the owned buffer; update imports (bytes::Bytes) and the channel element type accordingly to avoid per-chunk to_vec() allocations while keeping the decoder.read / decompressed_tx usage the same.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/fls/decompress.rs`:
- Around line 137-148: The code sets memlimit_threading and memlimit_stop to the
same value, which removes the soft fallback to fewer threads; update the
MtStreamBuilder usage in src/fls/decompress.rs so memlimit_threading uses
mb_to_bytes(xz_memlimit_mb) but memlimit_stop is larger (e.g., u64::MAX or a
reasonable multiplier) to allow thread-reduction before error, or alternatively
change the CLI help text to remove the "single-thread fallback" wording if you
prefer a hard stop; adjust the calls to .memlimit_threading(memlimit) and
.memlimit_stop(...) on the MtStreamBuilder that feeds
liblzma::read::XzDecoder::new_stream accordingly.
In `@src/main.rs`:
- Around line 75-77: The CLI help for xz_memlimit (argument xz_memlimit)
promises a "single-thread fallback" but the decompressor config currently sets
memlimit_threading and memlimit_stop to the same value in src/fls/decompress.rs,
which causes an immediate error instead of downgrading to single-thread; fix by
either (A) updating the help text in main.rs to remove the
single-thread-fallback wording to match current behavior, or (B) implement the
intended fallback by configuring memlimit_threading and memlimit_stop
appropriately in the decoder setup (ensure memlimit_threading < memlimit_stop
and the code path in the decompression logic uses memlimit_threading for
threaded attempts and memlimit_stop as the hard stop before error), referencing
the xz_memlimit CLI arg and the memlimit_threading / memlimit_stop variables to
locate and change the code.
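Under option (B), the intended limit split can be sketched with plain integer arithmetic; `mb_to_bytes` is redeclared below as a stand-in for the helper in `src/fls/decompress.rs`, and the 4x hard-stop multiplier is an arbitrary choice for this sketch, not a value from the PR:

```rust
// Stand-in for the mb_to_bytes helper in src/fls/decompress.rs.
fn mb_to_bytes(mb: u64) -> u64 {
    mb * 1024 * 1024
}

// Compute a (soft, hard) memory-limit pair for the MT decoder:
// the soft limit lets liblzma fall back to fewer threads, while the
// hard limit is the point where decoding actually stops with an error.
fn xz_memlimits(xz_memlimit_mb: u64) -> (u64, u64) {
    let soft = mb_to_bytes(xz_memlimit_mb);
    // 4x is an arbitrary multiplier for this sketch; passing u64::MAX
    // here would disable the hard stop entirely.
    let hard = soft.saturating_mul(4);
    (soft, hard)
}

fn main() {
    let (soft, hard) = xz_memlimits(64);
    println!("threading={} stop={}", soft, hard);
}
```

The pair would then feed `.memlimit_threading(soft)` and `.memlimit_stop(hard)` on the builder, keeping the soft limit strictly below the hard one so threaded attempts can degrade before erroring.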
---
Nitpick comments:
In `@src/fls/decompress.rs`:
- Around line 157-168: The loop currently clones each chunk with
buf[..n].to_vec(), causing allocation churn; instead take ownership of the
filled Vec and send it without copying (e.g. change channel element type or use
bytes::Bytes). After decoder.read(&mut buf) returns n, call buf.truncate(n),
convert the Vec into a Bytes without copying via Bytes::from(std::mem::take(&mut
buf)) (or send the Vec directly if you change the mpsc type), recreate buf as a
fresh vec with the original capacity for the next iteration, and then call
decompressed_tx.blocking_send on the owned buffer; update imports (bytes::Bytes)
and the channel element type accordingly to avoid per-chunk to_vec() allocations
while keeping the decoder.read / decompressed_tx usage the same.
In `@src/fls/from_url.rs`:
- Around line 517-534: Add a short clarifying comment above the decompressor
wait loop / the match on decompressor_handle.join() explaining that
decompressor_handle is an std::thread::JoinHandle and that the loop polls
decompressor_handle.is_finished() before breaking, so the subsequent
decompressor_handle.join() is guaranteed to return immediately and not block the
Tokio worker, hence spawn_blocking is unnecessary; reference the
decompressor_handle.is_finished() loop and the decompressor_handle.join() match
to locate where to add the comment.
- Around line 169-215: Detect unsupported in-process compression early by
calling get_compression_from_url and checking for Compression::Zstd (or other
unsupported variants) immediately after computing compression and before
allocating channels/writer/buffers; if unsupported, return an Err (or surface a
user-facing error) instead of proceeding. Specifically, in from_url.rs after let
compression = get_compression_from_url(url) and before creating
channels/writer/buffer/start_inprocess_decompressor, add a guard that rejects
Compression::Zstd with a clear error message so start_inprocess_decompressor is
never invoked for unsupported formats.
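The early guard described above might look like the following; the `Compression` enum and `get_compression_from_url` here are simplified, hypothetical stand-ins for the real items in the crate's `fls` module:

```rust
// Simplified stand-ins for the types in src/fls/ (hypothetical shapes).
#[derive(Debug, PartialEq)]
enum Compression {
    None,
    Gz,
    Xz,
    Zstd,
}

fn get_compression_from_url(url: &str) -> Compression {
    if url.ends_with(".xz") {
        Compression::Xz
    } else if url.ends_with(".gz") {
        Compression::Gz
    } else if url.ends_with(".zst") {
        Compression::Zstd
    } else {
        Compression::None
    }
}

// Reject unsupported formats before any channels, writer tasks, or
// buffers are allocated.
fn check_supported(url: &str) -> Result<Compression, String> {
    match get_compression_from_url(url) {
        Compression::Zstd => Err(format!(
            "zstd is not supported for in-process decompression: {}",
            url
        )),
        c => Ok(c),
    }
}

fn main() {
    println!("{:?}", check_supported("https://example.com/disk.raw.xz"));
    println!("{:?}", check_supported("https://example.com/disk.raw.zst"));
}
```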
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6f918d5f-b33d-47b6-9d45-0cee5a11fff6
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (9)
- Cargo.toml
- src/fls/decompress.rs
- src/fls/from_url.rs
- src/fls/magic_bytes.rs
- src/fls/oci/from_oci.rs
- src/fls/options.rs
- src/fls/stream_utils.rs
- src/main.rs
- tests/common/mod.rs
✅ Files skipped from review due to trivial changes (1)
- Cargo.toml
🚧 Files skipped from review as they are similar to previous changes (3)
- tests/common/mod.rs
- src/fls/magic_bytes.rs
- src/fls/options.rs
🧹 Nitpick comments (1)
src/fls/oci/from_oci.rs (1)
1443-1461: Extract MT XZ decoder construction into a shared helper. Lines 1443-1459 are an almost verbatim copy of `start_inprocess_decompressor` in `src/fls/decompress.rs` (lines 132-149) — same thread sizing, same `eprintln!`, same `MtStreamBuilder` configuration. Adding a `create_mt_xz_decoder<R: Read>(reader: R, xz_memlimit_mb: u64) -> Result<XzDecoder<R>, String>` next to the existing `create_xz_decoder` would eliminate the drift risk and make any future change to memlimit semantics a one-line fix.

Minor, in the same match: the `_ =>` arm at line 1460 silently lumps `Compression::Zstd` with `Compression::None`, so a Zstd stream would pass through as raw bytes if `flash_raw_disk_image_directly`'s upstream Zstd guard (line 2387) is ever bypassed. Consider matching `Compression::None` explicitly and keeping `Compression::Zstd` as a hard error here too.

♻️ Sketch of the shared helper

In `src/fls/decompress.rs`:

```rust
pub(crate) fn create_mt_xz_decoder<R: Read>(
    reader: R,
    xz_memlimit_mb: u64,
) -> Result<liblzma::read::XzDecoder<R>, String> {
    let num_threads = std::thread::available_parallelism()
        .map(|n| n.get() as u32)
        .unwrap_or(2);
    let memlimit = mb_to_bytes(xz_memlimit_mb);
    eprintln!(
        "XZ decompression: {} threads, memory limit {}MB",
        num_threads, xz_memlimit_mb
    );
    let stream = liblzma::stream::MtStreamBuilder::new()
        .threads(num_threads)
        .memlimit_threading(memlimit)
        .memlimit_stop(memlimit)
        .decoder()
        .map_err(|e| format!("Failed to create MT XZ decoder: {}", e))?;
    Ok(liblzma::read::XzDecoder::new_stream(reader, stream))
}
```

Then at the call sites:

```diff
-            Compression::Xz => {
-                let num_threads = std::thread::available_parallelism()
-                    .map(|n| n.get() as u32)
-                    .unwrap_or(2);
-                eprintln!(
-                    "XZ decompression: {} threads, memory limit {}MB",
-                    num_threads, xz_memlimit_mb
-                );
-                let memlimit = crate::fls::decompress::mb_to_bytes(xz_memlimit_mb);
-                let stream = liblzma::stream::MtStreamBuilder::new()
-                    .threads(num_threads)
-                    .memlimit_threading(memlimit)
-                    .memlimit_stop(memlimit)
-                    .decoder()
-                    .map_err(|e| format!("Failed to create MT XZ decoder: {}", e))?;
-                Box::new(liblzma::read::XzDecoder::new_stream(reader, stream))
-            }
+            Compression::Xz => Box::new(
+                crate::fls::decompress::create_mt_xz_decoder(reader, xz_memlimit_mb)?,
+            ),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/fls/oci/from_oci.rs` around lines 1443 - 1461, Extract the duplicated MT XZ decoder construction into a shared helper function (e.g., create_mt_xz_decoder<R: Read>(reader: R, xz_memlimit_mb: u64) -> Result<liblzma::read::XzDecoder<R>, String>) and replace the inlined builder in from_oci::Compression::Xz branch and start_inprocess_decompressor with calls to that helper; ensure the helper contains the thread sizing, eprintln!, Mb-to-bytes conversion, MtStreamBuilder configuration and error mapping (refer to create_xz_decoder and start_inprocess_decompressor to mirror behavior). Also change the match in from_oci (currently using `_ => Box::new(reader)`) to explicitly match Compression::None and return a hard error for Compression::Zstd so Zstd is not silently passed through (use the same error-reporting pattern used elsewhere).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/fls/oci/from_oci.rs`:
- Around line 1443-1461: Extract the duplicated MT XZ decoder construction into
a shared helper function (e.g., create_mt_xz_decoder<R: Read>(reader: R,
xz_memlimit_mb: u64) -> Result<liblzma::read::XzDecoder<R>, String>) and replace
the inlined builder in from_oci::Compression::Xz branch and
start_inprocess_decompressor with calls to that helper; ensure the helper
contains the thread sizing, eprintln!, Mb-to-bytes conversion, MtStreamBuilder
configuration and error mapping (refer to create_xz_decoder and
start_inprocess_decompressor to mirror behavior). Also change the match in
from_oci (currently using `_ => Box::new(reader)`) to explicitly match
Compression::None and return a hard error for Compression::Zstd so Zstd is not
silently passed through (use the same error-reporting pattern used elsewhere).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 445a2b9a-a532-4973-88bb-2e3cf799479e
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (9)
- Cargo.toml
- src/fls/decompress.rs
- src/fls/from_url.rs
- src/fls/magic_bytes.rs
- src/fls/oci/from_oci.rs
- src/fls/options.rs
- src/fls/stream_utils.rs
- src/main.rs
- tests/common/mod.rs
✅ Files skipped from review due to trivial changes (1)
- src/fls/options.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- src/fls/magic_bytes.rs
- tests/common/mod.rs
- Cargo.toml
Signed-off-by: Benny Zlotnik <bzlotnik@redhat.com>
Assisted-by: claude-opus-4.6
Replace xz2 crate with liblzma 0.4 (parallel feature) to enable
lzma_stream_decoder_mt()
This helps remove the XZ decompression bottleneck,
speeding up flashing by ~4x.
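The streaming pipeline this change plugs into can be sketched with std-only channels; a pass-through loop stands in for the multi-threaded `XzDecoder`, and the bounded `sync_channel` capacity of 8 (a value chosen for this sketch) mirrors the back-pressure the real download/decompress/write stages rely on:

```rust
use std::sync::mpsc;
use std::thread;

// Std-only sketch of the streaming pipeline: a decompressor thread pulls
// chunks off a bounded channel and forwards output to the writer side.
// A pass-through stands in for the real liblzma XzDecoder.
fn run_pipeline(chunks: Vec<Vec<u8>>) -> usize {
    // Bounded capacity gives back-pressure between pipeline stages.
    let (chunk_tx, chunk_rx) = mpsc::sync_channel::<Vec<u8>>(8);
    let (out_tx, out_rx) = mpsc::sync_channel::<Vec<u8>>(8);

    let decompressor = thread::spawn(move || {
        for chunk in chunk_rx {
            // Real code would feed `chunk` through the MT XZ decoder here.
            if out_tx.send(chunk).is_err() {
                break; // writer side closed; stop decompressing
            }
        }
    });

    for c in chunks {
        chunk_tx.send(c).expect("decompressor thread alive");
    }
    drop(chunk_tx); // end-of-stream: the decompressor's loop exits

    let total = out_rx.iter().map(|c| c.len()).sum();
    decompressor.join().expect("decompressor thread panicked");
    total
}

fn main() {
    let total = run_pipeline(vec![vec![0u8; 4]; 3]);
    println!("decompressed {} bytes", total);
}
```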