Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 11 additions & 30 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

//! Thin zero-copy-related wrappers around functions from the `rustix` crate.

#[cfg(any(target_os = "linux", target_os = "android"))]
use crate::io::{RawReader, RawWriter};
#[cfg(any(target_os = "linux", target_os = "android"))]
use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
#[cfg(any(target_os = "linux", target_os = "android"))]
use std::{
fs::File,
io::{PipeReader, PipeWriter, Read, Write},
io::{PipeReader, PipeWriter, Read, copy},
os::fd::AsFd,
sync::OnceLock,
};
Expand Down Expand Up @@ -106,11 +108,7 @@ pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Re
/// This should not be used if one of them are pipe to save resources
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
where
R: Read + AsFd,
S: AsFd,
{
pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<bool> {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
Expand All @@ -132,12 +130,10 @@ where
// If the first splice manages to copy to the intermediate
// pipe, but the second splice to stdout fails for some reason
// we can recover by copying the data that we have from the
// intermediate pipe to stdout using unbuffered read/write. Then
// intermediate pipe to stdout using read/write. Then
// we tell the caller to fall back.
debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
let mut drain = Vec::with_capacity(n);
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
crate::io::RawWriter(&dest).write_all(&drain)?;
// This should not be buffered, or order of output would be wrong
copy(&mut pipe_rd.take(n as u64), &mut RawWriter(&dest))?;
return Ok(true);
}
}
Expand All @@ -150,11 +146,7 @@ where
/// return actually sent bytes
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn send_n_bytes(
input: impl Read + AsFd,
mut target: impl Write + AsFd,
n: u64,
) -> std::io::Result<u64> {
pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Result<u64> {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
let mut n = n;
Expand Down Expand Up @@ -204,9 +196,7 @@ pub fn send_n_bytes(
} else {
debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
// drain pipe before fallback to raw write
let mut drain = Vec::with_capacity(s);
broker_r.take(s as u64).read_to_end(&mut drain)?;
crate::io::RawWriter(&target).write_all(&drain)?;
copy(&mut broker_r.take(s as u64), &mut RawWriter(&target))?;
break true;
}
}
Expand All @@ -220,17 +210,8 @@ pub fn send_n_bytes(
if !fallback {
return Ok(bytes_written);
}
let mut reader = input.take(n);
let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation
loop {
match reader.read(&mut buf)? {
0 => return Ok(bytes_written),
n => {
target.write_all(&buf[..n])?;
bytes_written += n as u64;
}
}
}
bytes_written += copy(&mut RawReader(input).take(n), &mut RawWriter(target))?;
Ok(bytes_written)
}

/// Return verified /dev/null
Expand Down
Loading