Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/uu/wc/src/count_fast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const FILE_ATTRIBUTE_NORMAL: u32 = 128;
#[cfg(any(target_os = "linux", target_os = "android"))]
use libc::S_IFIFO;
#[cfg(any(target_os = "linux", target_os = "android"))]
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact};
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice};

const BUF_SIZE: usize = 64 * 1024;

Expand Down Expand Up @@ -61,7 +61,9 @@ fn count_bytes_using_splice(fd: &impl AsFd) -> Result<usize, usize> {
Ok(0) => return Ok(byte_count),
Ok(res) => {
byte_count += res;
splice_exact(&pipe_rd, &null_file, res).map_err(|_| byte_count)?;
// pipe to null is not blocked. So this returns res at most cases
// next splice does not hang if we discarded 1+ pages
splice(&pipe_rd, &null_file, res).map_err(|_| byte_count)?;
}
Err(_) => return Err(byte_count),
}
Expand Down
88 changes: 36 additions & 52 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

//! Thin zero-copy-related wrappers around functions from the `rustix` crate.

#[cfg(any(target_os = "linux", target_os = "android"))]
use crate::io::{RawReader, RawWriter};
#[cfg(any(target_os = "linux", target_os = "android"))]
use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
#[cfg(any(target_os = "linux", target_os = "android"))]
use std::{
fs::File,
io::{PipeReader, PipeWriter, Read, Write},
io::{PipeReader, PipeWriter, Read, copy},
os::fd::AsFd,
sync::OnceLock,
};
Expand Down Expand Up @@ -54,21 +56,19 @@ pub fn splice(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io:
rustix::pipe::splice(source, None, target, None, len, SpliceFlags::empty())
}

/// Splice wrapper which fully finishes the write.
///
/// Exactly `len` bytes are moved from `source` into `target`.
///
/// Panics if `source` runs out of data before `len` bytes have been moved.
/// Splice wrapper tries to send `len` from `source` into `target`.
/// return bytes failed to send
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_exact(source: &impl AsFd, target: &impl AsFd, len: usize) -> std::io::Result<()> {
pub fn splice_exact(source: &impl AsFd, target: &impl AsFd, len: usize) -> usize {
let mut left = len;
while left > 0 {
let written = splice(source, target, left)?;
debug_assert_ne!(written, 0, "unexpected end of data");
left -= written;
match splice(source, target, left) {
Ok(s) => left -= s,
Err(_) => return left,
}
}
Ok(())
left
}

/// check that source is FUSE
Expand Down Expand Up @@ -106,11 +106,7 @@ pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Re
/// This should not be used if one of them are pipe to save resources
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
where
R: Read + AsFd,
S: AsFd,
{
pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<bool> {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
Expand All @@ -128,16 +124,15 @@ where
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(false),
Ok(n) => {
if splice_exact(&pipe_rd, dest, n).is_err() {
let lost = splice_exact(&pipe_rd, dest, n);
if lost > 0 {
// If the first splice manages to copy to the intermediate
// pipe, but the second splice to stdout fails for some reason
// pipe, but the N-th splice to stdout fails for some reason
// we can recover by copying the data that we have from the
// intermediate pipe to stdout using unbuffered read/write. Then
// intermediate pipe to stdout using read/write. Then
// we tell the caller to fall back.
debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
let mut drain = Vec::with_capacity(n);
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
crate::io::RawWriter(&dest).write_all(&drain)?;
// This should not be buffered, or order of output would be wrong
copy(&mut pipe_rd.take(lost as u64), &mut RawWriter(&dest))?;
return Ok(true);
}
}
Expand All @@ -150,11 +145,7 @@ where
/// return actually sent bytes
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn send_n_bytes(
input: impl Read + AsFd,
mut target: impl Write + AsFd,
n: u64,
) -> std::io::Result<u64> {
pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Result<u64> {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
let mut n = n;
Expand Down Expand Up @@ -194,20 +185,22 @@ pub fn send_n_bytes(
match splice(&input, &broker_w, n as usize) {
Ok(0) => break might_fuse(&input),
Ok(s) => {
if splice_exact(&broker_r, &target, s).is_ok() {
n -= s as u64;
bytes_written += s as u64;
if n == 0 {
// avoid unnecessary splice for small input
break false;
match splice_exact(&broker_r, &target, s) {
0 => {
n -= s as u64;
bytes_written += s as u64;
if n == 0 {
// avoid unnecessary splice for small input
break false;
}
}
lost => {
bytes_written += (s - lost) as u64;
// drain content of pipe before fallback to raw write
bytes_written +=
copy(&mut broker_r.take(lost as u64), &mut RawWriter(&target))?;
break true;
}
} else {
debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
// drain pipe before fallback to raw write
let mut drain = Vec::with_capacity(s);
broker_r.take(s as u64).read_to_end(&mut drain)?;
crate::io::RawWriter(&target).write_all(&drain)?;
break true;
}
}
_ => break true,
Expand All @@ -220,17 +213,8 @@ pub fn send_n_bytes(
if !fallback {
return Ok(bytes_written);
}
let mut reader = input.take(n);
let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation
loop {
match reader.read(&mut buf)? {
0 => return Ok(bytes_written),
n => {
target.write_all(&buf[..n])?;
bytes_written += n as u64;
}
}
}
bytes_written += copy(&mut RawReader(input).take(n), &mut RawWriter(target))?;
Ok(bytes_written)
}

/// Return verified /dev/null
Expand Down
Loading