diff --git a/src/uu/wc/src/count_fast.rs b/src/uu/wc/src/count_fast.rs index 5de13bb034..5bc1f5b560 100644 --- a/src/uu/wc/src/count_fast.rs +++ b/src/uu/wc/src/count_fast.rs @@ -27,7 +27,7 @@ const FILE_ATTRIBUTE_NORMAL: u32 = 128; #[cfg(any(target_os = "linux", target_os = "android"))] use libc::S_IFIFO; #[cfg(any(target_os = "linux", target_os = "android"))] -use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact}; +use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice}; const BUF_SIZE: usize = 64 * 1024; @@ -61,7 +61,9 @@ fn count_bytes_using_splice(fd: &impl AsFd) -> Result { Ok(0) => return Ok(byte_count), Ok(res) => { byte_count += res; - splice_exact(&pipe_rd, &null_file, res).map_err(|_| byte_count)?; + // pipe to null is not blocked. So this returns res at most cases + // next splice does not hang if we discarded 1+ pages + splice(&pipe_rd, &null_file, res).map_err(|_| byte_count)?; } Err(_) => return Err(byte_count), } diff --git a/src/uucore/src/lib/features/pipes.rs b/src/uucore/src/lib/features/pipes.rs index f70beb4734..3edc7191e0 100644 --- a/src/uucore/src/lib/features/pipes.rs +++ b/src/uucore/src/lib/features/pipes.rs @@ -5,12 +5,14 @@ //! Thin zero-copy-related wrappers around functions from the `rustix` crate. +#[cfg(any(target_os = "linux", target_os = "android"))] +use crate::io::{RawReader, RawWriter}; #[cfg(any(target_os = "linux", target_os = "android"))] use rustix::pipe::{SpliceFlags, fcntl_setpipe_size}; #[cfg(any(target_os = "linux", target_os = "android"))] use std::{ fs::File, - io::{PipeReader, PipeWriter, Read, Write}, + io::{PipeReader, PipeWriter, Read, copy}, os::fd::AsFd, sync::OnceLock, }; @@ -54,21 +56,19 @@ pub fn splice(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io: rustix::pipe::splice(source, None, target, None, len, SpliceFlags::empty()) } -/// Splice wrapper which fully finishes the write. -/// -/// Exactly `len` bytes are moved from `source` into `target`. -/// -/// Panics if `source` runs out of data before `len` bytes have been moved. +/// Splice wrapper tries to send `len` from `source` into `target`. +/// return bytes failed to send #[inline] #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn splice_exact(source: &impl AsFd, target: &impl AsFd, len: usize) -> std::io::Result<()> { +pub fn splice_exact(source: &impl AsFd, target: &impl AsFd, len: usize) -> usize { let mut left = len; while left > 0 { - let written = splice(source, target, left)?; - debug_assert_ne!(written, 0, "unexpected end of data"); - left -= written; + match splice(source, target, left) { + Ok(s) => left -= s, + Err(_) => return left, + } } - Ok(()) + left } /// check that source is FUSE @@ -106,11 +106,7 @@ pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Re /// This should not be used if one of them are pipe to save resources #[inline] #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn splice_unbounded_broker(source: &R, dest: &mut S) -> std::io::Result -where - R: Read + AsFd, - S: AsFd, -{ +pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result { static PIPE_CACHE: OnceLock> = OnceLock::new(); let Some((pipe_rd, pipe_wr)) = PIPE_CACHE .get_or_init(|| pipe::(MAX_ROOTLESS_PIPE_SIZE).ok()) @@ -128,16 +124,15 @@ where match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) { Ok(0) => return Ok(false), Ok(n) => { - if splice_exact(&pipe_rd, dest, n).is_err() { + let lost = splice_exact(&pipe_rd, dest, n); + if lost > 0 { // If the first splice manages to copy to the intermediate - // pipe, but the second splice to stdout fails for some reason + // pipe, but the N-th splice to stdout fails for some reason // we can recover by copying the data that we have from the - // intermediate pipe to stdout using unbuffered read/write. Then + // intermediate pipe to stdout using read/write. Then // we tell the caller to fall back. - debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); - let mut drain = Vec::with_capacity(n); - pipe_rd.take(n as u64).read_to_end(&mut drain)?; - crate::io::RawWriter(&dest).write_all(&drain)?; + // This should not be buffered, or order of output would be wrong + copy(&mut pipe_rd.take(lost as u64), &mut RawWriter(&dest))?; return Ok(true); } } @@ -150,11 +145,7 @@ where /// return actually sent bytes #[inline] #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn send_n_bytes( - input: impl Read + AsFd, - mut target: impl Write + AsFd, - n: u64, -) -> std::io::Result { +pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Result { static PIPE_CACHE: OnceLock> = OnceLock::new(); let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize); let mut n = n; @@ -194,20 +185,22 @@ pub fn send_n_bytes( match splice(&input, &broker_w, n as usize) { Ok(0) => break might_fuse(&input), Ok(s) => { - if splice_exact(&broker_r, &target, s).is_ok() { - n -= s as u64; - bytes_written += s as u64; - if n == 0 { - // avoid unnecessary splice for small input - break false; + match splice_exact(&broker_r, &target, s) { + 0 => { + n -= s as u64; + bytes_written += s as u64; + if n == 0 { + // avoid unnecessary splice for small input + break false; + } + } + lost => { + bytes_written += (s - lost) as u64; + // drain content of pipe before fallback to raw write + bytes_written += + copy(&mut broker_r.take(lost as u64), &mut RawWriter(&target))?; + break true; } - } else { - debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); - // drain pipe before fallback to raw write - let mut drain = Vec::with_capacity(s); - broker_r.take(s as u64).read_to_end(&mut drain)?; - crate::io::RawWriter(&target).write_all(&drain)?; - break true; } } _ => break true, @@ -220,17 +213,8 @@ pub fn send_n_bytes( if !fallback { return Ok(bytes_written); } - let mut reader = input.take(n); - let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation - loop { - match reader.read(&mut buf)? { - 0 => return Ok(bytes_written), - n => { - target.write_all(&buf[..n])?; - bytes_written += n as u64; - } - } - } + bytes_written += copy(&mut RawReader(input).take(n), &mut RawWriter(target))?; + Ok(bytes_written) } /// Return verified /dev/null