Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/uu/tail/src/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ fn print_target_section<
}
} else {
#[cfg(any(target_os = "linux", target_os = "android"))]
if uucore::pipes::splice_unbounded_broker(file, &mut stdout)? {
if uucore::pipes::splice_unbounded_broker(file, &mut stdout)?.is_err() {
io::copy(file, &mut stdout)?;
}
#[cfg(not(any(target_os = "linux", target_os = "android")))]
Expand Down
35 changes: 9 additions & 26 deletions src/uucore/src/lib/features/buf_copy/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,7 @@
// file that was distributed with this source code.

//! Buffer-based copying implementation for Linux and Android.

use crate::error::UResult;

/// Buffer-based copying utilities for unix (excluding Linux).
use std::{
io::{Read, Write},
os::fd::AsFd,
};
use std::os::fd::AsFd;

/// Copy data from `Read` implementor `source` into a `Write` implementor
/// `dest`. This works by reading a chunk of data from `source` and writing the
Expand All @@ -24,26 +17,16 @@ use std::{
/// # Arguments
/// * `source` - `Read` implementor to copy data from.
/// * `dest` - `Write` implementor to copy data to.
pub fn copy_stream<R, S>(src: &mut R, dest: &mut S) -> UResult<()>
where
R: Read + AsFd,
S: Write + AsFd,
{
pub fn copy_stream(
src: &mut (impl std::io::Read + AsFd),
dest: &mut impl AsFd,
) -> crate::error::UResult<()> {
// If we're on Linux or Android, try to use the splice() system call
// for faster writing. If it works, we're done.
// todo: bypass broker pipe this if input or output is pipe. We use this mostly for stream.
if !crate::pipes::splice_unbounded_broker(src, dest)? {
return Ok(());
if crate::pipes::splice_unbounded_auto(src, dest)? {
// If the splice() call failed, fall back on writing "without buffering", or order of output would be wrong
// unrelated for cp /dev/stdin since cp does not have multiple input? <https://github.com/uutils/coreutils/issues/5186>
std::io::copy(src, &mut crate::io::RawWriter(dest))?;
}

// If the splice() call failed, fall back on slower writing.
std::io::copy(src, dest)?;

// If the splice() call failed and there has been some data written to
// stdout via while loop above AND there will be second splice() call
// that will succeed, data pushed through splice will be output before
// the data buffered in stdout.lock. Therefore additional explicit flush
// is required here.
dest.flush()?;
Ok(())
}
27 changes: 16 additions & 11 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,24 @@ pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> rustix::io:
Ok(())
}

/// force-splice source to dest even both of them are not pipe
/// return true if we need read/write fallback
/// force-splice source to dest even both of them are not pipe via broker pipe
/// returns Ok(Ok(())) if splice succeeds
/// returns Ok(Err()) if splice failed, but you can fallback to read/write
/// returns std::io::Result if splice from broker failed and read/write fallback from broker failed
///
/// Thus, ?.is_err() returns serious error at early stage and checks that you can fallback
/// This should not be used if one of them are pipe to save resources
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<bool> {
pub fn splice_unbounded_broker(
source: &impl AsFd,
dest: &mut impl AsFd,
) -> std::io::Result<Result<(), ()>> {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
.as_ref()
let Some((pipe_rd, pipe_wr)) =
PIPE_CACHE.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
else {
return Ok(true);
return Ok(Err(()));
};
// improve throughput
// no need to increase pipe size of input fd since
Expand All @@ -122,7 +127,7 @@ pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std:

loop {
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(false),
Ok(0) => return Ok(Ok(())),
Ok(n) => {
if splice_exact(&pipe_rd, dest, n).is_err() {
// If the first splice manages to copy to the intermediate
Expand All @@ -135,10 +140,10 @@ pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std:
let mut drain = Vec::with_capacity(n);
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
RawWriter(&dest).write_all(&drain)?;
return Ok(true);
return Ok(Err(()));
}
}
Err(_) => return Ok(true),
Err(_) => return Ok(Err(())),
}
}
}
Expand All @@ -153,7 +158,7 @@ pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> std::i
// use splice to check that input or output is pipe which is efficient
let fallback = match splice(&source, dest, MAX_ROOTLESS_PIPE_SIZE) {
Ok(_) => splice_unbounded(source, dest).is_err(),
_ => splice_unbounded_broker(source, dest)?,
_ => splice_unbounded_broker(source, dest)?.is_err(),
};
Ok(fallback)
}
Expand Down
Loading