diff --git a/src/uucore/src/lib/features/pipes.rs b/src/uucore/src/lib/features/pipes.rs index f70beb4734..166c79c416 100644 --- a/src/uucore/src/lib/features/pipes.rs +++ b/src/uucore/src/lib/features/pipes.rs @@ -5,12 +5,14 @@ //! Thin zero-copy-related wrappers around functions from the `rustix` crate. +#[cfg(any(target_os = "linux", target_os = "android"))] +use crate::io::{RawReader, RawWriter}; #[cfg(any(target_os = "linux", target_os = "android"))] use rustix::pipe::{SpliceFlags, fcntl_setpipe_size}; #[cfg(any(target_os = "linux", target_os = "android"))] use std::{ fs::File, - io::{PipeReader, PipeWriter, Read, Write}, + io::{PipeReader, PipeWriter, Read, copy}, os::fd::AsFd, sync::OnceLock, }; @@ -106,11 +108,7 @@ pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Re /// This should not be used if one of them are pipe to save resources #[inline] #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn splice_unbounded_broker(source: &R, dest: &mut S) -> std::io::Result -where - R: Read + AsFd, - S: AsFd, -{ +pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result { static PIPE_CACHE: OnceLock> = OnceLock::new(); let Some((pipe_rd, pipe_wr)) = PIPE_CACHE .get_or_init(|| pipe::(MAX_ROOTLESS_PIPE_SIZE).ok()) @@ -132,12 +130,10 @@ where // If the first splice manages to copy to the intermediate // pipe, but the second splice to stdout fails for some reason // we can recover by copying the data that we have from the - // intermediate pipe to stdout using unbuffered read/write. Then + // intermediate pipe to stdout using read/write. Then // we tell the caller to fall back. - debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); - let mut drain = Vec::with_capacity(n); - pipe_rd.take(n as u64).read_to_end(&mut drain)?; - crate::io::RawWriter(&dest).write_all(&drain)?; + // This should not be buffered, or order of output would be wrong + copy(&mut pipe_rd.take(n as u64), &mut RawWriter(&dest))?; return Ok(true); } } @@ -150,11 +146,7 @@ where /// return actually sent bytes #[inline] #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn send_n_bytes( - input: impl Read + AsFd, - mut target: impl Write + AsFd, - n: u64, -) -> std::io::Result { +pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Result { static PIPE_CACHE: OnceLock> = OnceLock::new(); let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize); let mut n = n; @@ -204,9 +196,7 @@ pub fn send_n_bytes( } else { debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); // drain pipe before fallback to raw write - let mut drain = Vec::with_capacity(s); - broker_r.take(s as u64).read_to_end(&mut drain)?; - crate::io::RawWriter(&target).write_all(&drain)?; + copy(&mut broker_r.take(s as u64), &mut RawWriter(&target))?; break true; } } @@ -220,17 +210,8 @@ pub fn send_n_bytes( if !fallback { return Ok(bytes_written); } - let mut reader = input.take(n); - let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation - loop { - match reader.read(&mut buf)? { - 0 => return Ok(bytes_written), - n => { - target.write_all(&buf[..n])?; - bytes_written += n as u64; - } - } - } + bytes_written += copy(&mut RawReader(input).take(n), &mut RawWriter(target))?; + Ok(bytes_written) } /// Return verified /dev/null