From 694d8968e317122ff9b5ef8a3f7c03f537421232 Mon Sep 17 00:00:00 2001 From: oech3 <79379754+oech3@users.noreply.github.com> Date: Thu, 21 May 2026 19:03:12 +0900 Subject: [PATCH] pipes.rs: simplify by io::copy & doc the case it cannot be used --- src/uucore/src/lib/features/pipes.rs | 41 ++++++++-------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/src/uucore/src/lib/features/pipes.rs b/src/uucore/src/lib/features/pipes.rs index c5fa252ce2..6fd3366d55 100644 --- a/src/uucore/src/lib/features/pipes.rs +++ b/src/uucore/src/lib/features/pipes.rs @@ -5,6 +5,8 @@ //! Thin zero-copy-related wrappers around functions. +#[cfg(any(target_os = "linux", target_os = "android"))] +use crate::io::{RawReader, RawWriter}; #[cfg(any(target_os = "linux", target_os = "android"))] use rustix::pipe::{SpliceFlags, fcntl_setpipe_size}; #[cfg(any(target_os = "linux", target_os = "android"))] @@ -106,11 +108,7 @@ pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Re /// This should not be used if one of them are pipe to save resources #[inline] #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn splice_unbounded_broker(source: &R, dest: &mut S) -> std::io::Result -where - R: Read + AsFd, - S: AsFd, -{ +pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result { static PIPE_CACHE: OnceLock> = OnceLock::new(); let Some((pipe_rd, pipe_wr)) = PIPE_CACHE .get_or_init(|| pipe::(MAX_ROOTLESS_PIPE_SIZE).ok()) @@ -134,10 +132,11 @@ where // we can recover by copying the data that we have from the // intermediate pipe to stdout using unbuffered read/write. Then // we tell the caller to fall back. + // use read_to_end to drain pipe for the case write failed debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); let mut drain = Vec::with_capacity(n); pipe_rd.take(n as u64).read_to_end(&mut drain)?; - crate::io::RawWriter(&dest).write_all(&drain)?; + RawWriter(&dest).write_all(&drain)?; return Ok(true); } } @@ -152,11 +151,7 @@ where /// (the fallback will be embedded to this function in the future) #[inline] #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn splice_unbounded_auto(source: &R, dest: &mut S) -> std::io::Result -where - R: Read + AsFd, - S: AsFd, -{ +pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result { // use splice to check that input or output is pipe which is efficient let fallback = match splice(&source, dest, MAX_ROOTLESS_PIPE_SIZE) { Ok(_) => splice_unbounded(source, dest)?, @@ -169,11 +164,7 @@ where /// return actually sent bytes #[inline] #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn send_n_bytes( - input: impl Read + AsFd, - mut target: impl Write + AsFd, - n: u64, -) -> std::io::Result { +pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Result { static PIPE_CACHE: OnceLock> = OnceLock::new(); let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize); let mut n = n; @@ -222,10 +213,10 @@ pub fn send_n_bytes( } } else { debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); - // drain pipe before fallback to raw write + // use read_to_end to drain pipe at this fallback for the case write failed let mut drain = Vec::with_capacity(s); broker_r.take(s as u64).read_to_end(&mut drain)?; - crate::io::RawWriter(&target).write_all(&drain)?; + RawWriter(&target).write_all(&drain)?; break true; } } @@ -239,17 +230,9 @@ pub fn send_n_bytes( if !fallback { return Ok(bytes_written); } - let mut reader = input.take(n); - let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation - loop { - match reader.read(&mut buf)? { - 0 => return Ok(bytes_written), - n => { - target.write_all(&buf[..n])?; - bytes_written += n as u64; - } - } - } + // do not buffer at this fallback, or order of output would be wrong with multiple input + bytes_written += std::io::copy(&mut RawReader(input).take(n), &mut RawWriter(target))?; + Ok(bytes_written) } /// Return verified /dev/null