diff --git a/library/std/src/fs.rs b/library/std/src/fs.rs index c256f556b3c8f..a4123cc15b876 100644 --- a/library/std/src/fs.rs +++ b/library/std/src/fs.rs @@ -1656,7 +1656,7 @@ pub fn rename, Q: AsRef>(from: P, to: Q) -> io::Result<()> /// the length of the `to` file as reported by `metadata`. /// /// If you’re wanting to copy the contents of one file to another and you’re -/// working with [`File`]s, see the [`io::copy`] function. +/// working with [`File`]s, see the [`io::copy()`] function. /// /// # Platform-specific behavior /// diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs new file mode 100644 index 0000000000000..b88bca2f2b4ff --- /dev/null +++ b/library/std/src/io/copy.rs @@ -0,0 +1,88 @@ +use crate::io::{self, ErrorKind, Read, Write}; +use crate::mem::MaybeUninit; + +/// Copies the entire contents of a reader into a writer. +/// +/// This function will continuously read data from `reader` and then +/// write it into `writer` in a streaming fashion until `reader` +/// returns EOF. +/// +/// On success, the total number of bytes that were copied from +/// `reader` to `writer` is returned. +/// +/// If you’re wanting to copy the contents of one file to another and you’re +/// working with filesystem paths, see the [`fs::copy`] function. +/// +/// [`fs::copy`]: crate::fs::copy +/// +/// # Errors +/// +/// This function will return an error immediately if any call to [`read`] or +/// [`write`] returns an error. All instances of [`ErrorKind::Interrupted`] are +/// handled by this function and the underlying operation is retried. +/// +/// [`read`]: Read::read +/// [`write`]: Write::write +/// +/// # Examples +/// +/// ``` +/// use std::io; +/// +/// fn main() -> io::Result<()> { +/// let mut reader: &[u8] = b"hello"; +/// let mut writer: Vec = vec![]; +/// +/// io::copy(&mut reader, &mut writer)?; +/// +/// assert_eq!(&b"hello"[..], &writer[..]); +/// Ok(()) +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub fn copy(reader: &mut R, writer: &mut W) -> io::Result +where + R: Read, + W: Write, +{ + cfg_if::cfg_if! { + if #[cfg(any(target_os = "linux", target_os = "android"))] { + crate::sys::kernel_copy::copy_spec(reader, writer) + } else { + generic_copy(reader, writer) + } + } +} + +/// The general read-write-loop implementation of +/// `io::copy` that is used when specializations are not available or not applicable. +pub(crate) fn generic_copy(reader: &mut R, writer: &mut W) -> io::Result +where + R: Read, + W: Write, +{ + let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit(); + // FIXME: #42788 + // + // - This creates a (mut) reference to a slice of + // _uninitialized_ integers, which is **undefined behavior** + // + // - Only the standard library gets to soundly "ignore" this, + // based on its privileged knowledge of unstable rustc + // internals; + unsafe { + reader.initializer().initialize(buf.assume_init_mut()); + } + + let mut written = 0; + loop { + let len = match reader.read(unsafe { buf.assume_init_mut() }) { + Ok(0) => return Ok(written), + Ok(len) => len, + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + }; + writer.write_all(unsafe { &buf.assume_init_ref()[..len] })?; + written += len as u64; + } +} diff --git a/library/std/src/io/mod.rs b/library/std/src/io/mod.rs index e6efe6ec57eea..57413f9bc4063 100644 --- a/library/std/src/io/mod.rs +++ b/library/std/src/io/mod.rs @@ -266,6 +266,8 @@ pub use self::buffered::IntoInnerError; #[stable(feature = "rust1", since = "1.0.0")] pub use self::buffered::{BufReader, BufWriter, LineWriter}; #[stable(feature = "rust1", since = "1.0.0")] +pub use self::copy::copy; +#[stable(feature = "rust1", since = "1.0.0")] pub use self::cursor::Cursor; #[stable(feature = "rust1", since = "1.0.0")] pub use self::error::{Error, ErrorKind, Result}; @@ -279,11 +281,12 @@ pub use self::stdio::{_eprint, _print}; #[doc(no_inline, hidden)] pub use self::stdio::{set_panic, set_print, LocalOutput}; #[stable(feature = "rust1", since = "1.0.0")] -pub use self::util::{copy, empty, repeat, sink, Empty, Repeat, Sink}; +pub use self::util::{empty, repeat, sink, Empty, Repeat, Sink}; pub(crate) use self::stdio::clone_io; mod buffered; +pub(crate) mod copy; mod cursor; mod error; mod impls; diff --git a/library/std/src/io/stdio.rs b/library/std/src/io/stdio.rs index 2eb5fb4528620..8fbce09dd6362 100644 --- a/library/std/src/io/stdio.rs +++ b/library/std/src/io/stdio.rs @@ -409,6 +409,14 @@ impl Read for Stdin { } } +// only used by platform-dependent io::copy specializations, i.e. unused on some platforms +#[cfg(any(target_os = "linux", target_os = "android"))] +impl StdinLock<'_> { + pub(crate) fn as_mut_buf(&mut self) -> &mut BufReader { + &mut self.inner + } +} + #[stable(feature = "rust1", since = "1.0.0")] impl Read for StdinLock<'_> { fn read(&mut self, buf: &mut [u8]) -> io::Result { diff --git a/library/std/src/io/tests.rs b/library/std/src/io/tests.rs index 913b28538b7c4..f176c2f088cb3 100644 --- a/library/std/src/io/tests.rs +++ b/library/std/src/io/tests.rs @@ -1,7 +1,7 @@ use super::{repeat, Cursor, SeekFrom}; use crate::cmp::{self, min}; -use crate::io::prelude::*; use crate::io::{self, IoSlice, IoSliceMut}; +use crate::io::{BufRead, Read, Seek, Write}; use crate::ops::Deref; #[test] diff --git a/library/std/src/io/util.rs b/library/std/src/io/util.rs index 2b1f371129eaf..db845457c9672 100644 --- a/library/std/src/io/util.rs +++ b/library/std/src/io/util.rs @@ -4,78 +4,7 @@ mod tests; use crate::fmt; -use crate::io::{self, BufRead, ErrorKind, Initializer, IoSlice, IoSliceMut, Read, Write}; -use crate::mem::MaybeUninit; - -/// Copies the entire contents of a reader into a writer. -/// -/// This function will continuously read data from `reader` and then -/// write it into `writer` in a streaming fashion until `reader` -/// returns EOF. -/// -/// On success, the total number of bytes that were copied from -/// `reader` to `writer` is returned. -/// -/// If you’re wanting to copy the contents of one file to another and you’re -/// working with filesystem paths, see the [`fs::copy`] function. -/// -/// [`fs::copy`]: crate::fs::copy -/// -/// # Errors -/// -/// This function will return an error immediately if any call to [`read`] or -/// [`write`] returns an error. All instances of [`ErrorKind::Interrupted`] are -/// handled by this function and the underlying operation is retried. -/// -/// [`read`]: Read::read -/// [`write`]: Write::write -/// -/// # Examples -/// -/// ``` -/// use std::io; -/// -/// fn main() -> io::Result<()> { -/// let mut reader: &[u8] = b"hello"; -/// let mut writer: Vec = vec![]; -/// -/// io::copy(&mut reader, &mut writer)?; -/// -/// assert_eq!(&b"hello"[..], &writer[..]); -/// Ok(()) -/// } -/// ``` -#[stable(feature = "rust1", since = "1.0.0")] -pub fn copy(reader: &mut R, writer: &mut W) -> io::Result -where - R: Read, - W: Write, -{ - let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit(); - // FIXME: #42788 - // - // - This creates a (mut) reference to a slice of - // _uninitialized_ integers, which is **undefined behavior** - // - // - Only the standard library gets to soundly "ignore" this, - // based on its privileged knowledge of unstable rustc - // internals; - unsafe { - reader.initializer().initialize(buf.assume_init_mut()); - } - - let mut written = 0; - loop { - let len = match reader.read(unsafe { buf.assume_init_mut() }) { - Ok(0) => return Ok(written), - Ok(len) => len, - Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, - Err(e) => return Err(e), - }; - writer.write_all(unsafe { &buf.assume_init_ref()[..len] })?; - written += len as u64; - } -} +use crate::io::{self, BufRead, Initializer, IoSlice, IoSliceMut, Read, Write}; /// A reader which is always at EOF. /// diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index fa2608e6fb6d3..ffc9cf3f2eb14 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -317,6 +317,7 @@ #![feature(toowned_clone_into)] #![feature(total_cmp)] #![feature(trace_macros)] +#![feature(try_blocks)] #![feature(try_reserve)] #![feature(unboxed_closures)] #![feature(unsafe_block_in_unsafe_fn)] diff --git a/library/std/src/sys/unix/fs.rs b/library/std/src/sys/unix/fs.rs index 96594095cc36d..13cf930379cbc 100644 --- a/library/std/src/sys/unix/fs.rs +++ b/library/std/src/sys/unix/fs.rs @@ -1204,88 +1204,19 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { #[cfg(any(target_os = "linux", target_os = "android"))] pub fn copy(from: &Path, to: &Path) -> io::Result { - use crate::cmp; - use crate::sync::atomic::{AtomicBool, Ordering}; - - // Kernel prior to 4.5 don't have copy_file_range - // We store the availability in a global to avoid unnecessary syscalls - static HAS_COPY_FILE_RANGE: AtomicBool = AtomicBool::new(true); - - unsafe fn copy_file_range( - fd_in: libc::c_int, - off_in: *mut libc::loff_t, - fd_out: libc::c_int, - off_out: *mut libc::loff_t, - len: libc::size_t, - flags: libc::c_uint, - ) -> libc::c_long { - libc::syscall(libc::SYS_copy_file_range, fd_in, off_in, fd_out, off_out, len, flags) - } - let (mut reader, reader_metadata) = open_from(from)?; let max_len = u64::MAX; let (mut writer, _) = open_to_and_set_permissions(to, reader_metadata)?; - let has_copy_file_range = HAS_COPY_FILE_RANGE.load(Ordering::Relaxed); - let mut written = 0u64; - while written < max_len { - let copy_result = if has_copy_file_range { - let bytes_to_copy = cmp::min(max_len - written, usize::MAX as u64) as usize; - let copy_result = unsafe { - // We actually don't have to adjust the offsets, - // because copy_file_range adjusts the file offset automatically - cvt(copy_file_range( - reader.as_raw_fd(), - ptr::null_mut(), - writer.as_raw_fd(), - ptr::null_mut(), - bytes_to_copy, - 0, - )) - }; - if let Err(ref copy_err) = copy_result { - match copy_err.raw_os_error() { - Some(libc::ENOSYS | libc::EPERM | libc::EOPNOTSUPP) => { - HAS_COPY_FILE_RANGE.store(false, Ordering::Relaxed); - } - _ => {} - } - } - copy_result - } else { - Err(io::Error::from_raw_os_error(libc::ENOSYS)) - }; - match copy_result { - Ok(0) if written == 0 => { - // fallback to work around several kernel bugs where copy_file_range will fail to - // copy any bytes and return 0 instead of an error if - // - reading virtual files from the proc filesystem which appear to have 0 size - // but are not empty. noted in coreutils to affect kernels at least up to 5.6.19. - // - copying from an overlay filesystem in docker. reported to occur on fedora 32. - return io::copy(&mut reader, &mut writer); - } - Ok(0) => return Ok(written), // reached EOF - Ok(ret) => written += ret as u64, - Err(err) => { - match err.raw_os_error() { - Some( - libc::ENOSYS | libc::EXDEV | libc::EINVAL | libc::EPERM | libc::EOPNOTSUPP, - ) => { - // Try fallback io::copy if either: - // - Kernel version is < 4.5 (ENOSYS) - // - Files are mounted on different fs (EXDEV) - // - copy_file_range is broken in various ways on RHEL/CentOS 7 (EOPNOTSUPP) - // - copy_file_range is disallowed, for example by seccomp (EPERM) - // - copy_file_range cannot be used with pipes or device nodes (EINVAL) - assert_eq!(written, 0); - return io::copy(&mut reader, &mut writer); - } - _ => return Err(err), - } - } - } + use super::kernel_copy::{copy_regular_files, CopyResult}; + + match copy_regular_files(reader.as_raw_fd(), writer.as_raw_fd(), max_len) { + CopyResult::Ended(result) => result, + CopyResult::Fallback(written) => match io::copy::generic_copy(&mut reader, &mut writer) { + Ok(bytes) => Ok(bytes + written), + Err(e) => Err(e), + }, } - Ok(written) } #[cfg(any(target_os = "macos", target_os = "ios"))] diff --git a/library/std/src/sys/unix/kernel_copy.rs b/library/std/src/sys/unix/kernel_copy.rs new file mode 100644 index 0000000000000..ac2fcfcb53f72 --- /dev/null +++ b/library/std/src/sys/unix/kernel_copy.rs @@ -0,0 +1,603 @@ +//! This module contains specializations that can offload `io::copy()` operations on file descriptor +//! containing types (`File`, `TcpStream`, etc.) to more efficient syscalls than `read(2)` and `write(2)`. +//! +//! Specialization is only applied to wholly std-owned types so that user code can't observe +//! that the `Read` and `Write` traits are not used. +//! +//! Since a copy operation involves a reader and writer side where each can consist of different types +//! and also involve generic wrappers (e.g. `Take`, `BufReader`) it is not practical to specialize +//! a single method on all possible combinations. +//! +//! Instead readers and writers are handled separately by the `CopyRead` and `CopyWrite` specialization +//! traits and then specialized on by the `Copier::copy` method. +//! +//! `Copier` uses the specialization traits to unpack the underlying file descriptors and +//! additional prerequisites and constraints imposed by the wrapper types. +//! +//! Once it has obtained all necessary pieces and brought any wrapper types into a state where they +//! can be safely bypassed it will attempt to use the `copy_file_range(2)`, +//! `sendfile(2)` or `splice(2)` syscalls to move data directly between file descriptors. +//! Since those syscalls have requirements that cannot be fully checked in advance and +//! gathering additional information about file descriptors would require additional syscalls +//! anyway it simply attempts to use them one after another (guided by inaccurate hints) to +//! figure out which one works and and falls back to the generic read-write copy loop if none of them +//! does. +//! Once a working syscall is found for a pair of file descriptors it will be called in a loop +//! until the copy operation is completed. +//! +//! Advantages of using these syscalls: +//! +//! * fewer context switches since reads and writes are coalesced into a single syscall +//! and more bytes are transferred per syscall. This translates to higher throughput +//! and fewer CPU cycles, at least for sufficiently large transfers to amortize the initial probing. +//! * `copy_file_range` creates reflink copies on CoW filesystems, thus moving less data and +//! consuming less disk space +//! * `sendfile` and `splice` can perform zero-copy IO under some circumstances while +//! a naive copy loop would move every byte through the CPU. +//! +//! Drawbacks: +//! +//! * copy operations smaller than the default buffer size can under some circumstances, especially +//! on older kernels, incur more syscalls than the naive approach would. As mentioned above +//! the syscall selection is guided by hints to minimize this possibility but they are not perfect. +//! * optimizations only apply to std types. If a user adds a custom wrapper type, e.g. to report +//! progress, they can hit a performance cliff. +//! * complexity + +use crate::cmp::min; +use crate::convert::TryInto; +use crate::fs::{File, Metadata}; +use crate::io::copy::generic_copy; +use crate::io::{ + BufRead, BufReader, BufWriter, Error, Read, Result, StderrLock, StdinLock, StdoutLock, Take, + Write, +}; +use crate::mem::ManuallyDrop; +use crate::net::TcpStream; +use crate::os::unix::fs::FileTypeExt; +use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use crate::process::{ChildStderr, ChildStdin, ChildStdout}; +use crate::ptr; +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sys::cvt; + +#[cfg(test)] +mod tests; + +pub(crate) fn copy_spec( + read: &mut R, + write: &mut W, +) -> Result { + let copier = Copier { read, write }; + SpecCopy::copy(copier) +} + +/// This type represents either the inferred `FileType` of a `RawFd` based on the source +/// type from which it was extracted or the actual metadata +/// +/// The methods on this type only provide hints, due to `AsRawFd` and `FromRawFd` the inferred +/// type may be wrong. +enum FdMeta { + /// We obtained the FD from a type that can contain any type of `FileType` and queried the metadata + /// because it is cheaper than probing all possible syscalls (reader side) + Metadata(Metadata), + Socket, + Pipe, + /// We don't have any metadata, e.g. because the original type was `File` which can represent + /// any `FileType` and we did not query the metadata either since it did not seem beneficial + /// (writer side) + NoneObtained, +} + +impl FdMeta { + fn maybe_fifo(&self) -> bool { + match self { + FdMeta::Metadata(meta) => meta.file_type().is_fifo(), + FdMeta::Socket => false, + FdMeta::Pipe => true, + FdMeta::NoneObtained => true, + } + } + + fn potential_sendfile_source(&self) -> bool { + match self { + // procfs erronously shows 0 length on non-empty readable files. + // and if a file is truly empty then a `read` syscall will determine that and skip the write syscall + // thus there would be benefit from attempting sendfile + FdMeta::Metadata(meta) + if meta.file_type().is_file() && meta.len() > 0 + || meta.file_type().is_block_device() => + { + true + } + _ => false, + } + } + + fn copy_file_range_candidate(&self) -> bool { + match self { + // copy_file_range will fail on empty procfs files. `read` can determine whether EOF has been reached + // without extra cost and skip the write, thus there is no benefit in attempting copy_file_range + FdMeta::Metadata(meta) if meta.is_file() && meta.len() > 0 => true, + FdMeta::NoneObtained => true, + _ => false, + } + } +} + +struct CopyParams(FdMeta, Option); + +struct Copier<'a, 'b, R: Read + ?Sized, W: Write + ?Sized> { + read: &'a mut R, + write: &'b mut W, +} + +trait SpecCopy { + fn copy(self) -> Result; +} + +impl SpecCopy for Copier<'_, '_, R, W> { + default fn copy(self) -> Result { + generic_copy(self.read, self.write) + } +} + +impl SpecCopy for Copier<'_, '_, R, W> { + fn copy(self) -> Result { + let (reader, writer) = (self.read, self.write); + let r_cfg = reader.properties(); + let w_cfg = writer.properties(); + + // before direct operations on file descriptors ensure that all source and sink buffers are empty + let mut flush = || -> crate::io::Result { + let bytes = reader.drain_to(writer, u64::MAX)?; + // BufWriter buffered bytes have already been accounted for in earlier write() calls + writer.flush()?; + Ok(bytes) + }; + + let mut written = 0u64; + + if let (CopyParams(input_meta, Some(readfd)), CopyParams(output_meta, Some(writefd))) = + (r_cfg, w_cfg) + { + written += flush()?; + let max_write = reader.min_limit(); + + if input_meta.copy_file_range_candidate() && output_meta.copy_file_range_candidate() { + let result = copy_regular_files(readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), + CopyResult::Ended(err) => return err, + CopyResult::Fallback(bytes) => written += bytes, + } + } + + // on modern kernels sendfile can copy from any mmapable type (some but not all regular files and block devices) + // to any writable file descriptor. On older kernels the writer side can only be a socket. + // So we just try and fallback if needed. + // If current file offsets + write sizes overflow it may also fail, we do not try to fix that and instead + // fall back to the generic copy loop. + if input_meta.potential_sendfile_source() { + let result = sendfile_splice(SpliceMode::Sendfile, readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), + CopyResult::Ended(err) => return err, + CopyResult::Fallback(bytes) => written += bytes, + } + } + + if input_meta.maybe_fifo() || output_meta.maybe_fifo() { + let result = sendfile_splice(SpliceMode::Splice, readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), + CopyResult::Ended(err) => return err, + CopyResult::Fallback(0) => { /* use the fallback below */ } + CopyResult::Fallback(_) => { + unreachable!("splice should not return > 0 bytes on the fallback path") + } + } + } + } + + // fallback if none of the more specialized syscalls wants to work with these file descriptors + match generic_copy(reader, writer) { + Ok(bytes) => Ok(bytes + written), + err => err, + } + } +} + +#[rustc_specialization_trait] +trait CopyRead: Read { + /// Implementations that contain buffers (i.e. `BufReader`) must transfer data from their internal + /// buffers into `writer` until either the buffers are emptied or `limit` bytes have been + /// transferred, whichever occurs sooner. + /// If nested buffers are present the outer buffers must be drained first. + /// + /// This is necessary to directly bypass the wrapper types while preserving the data order + /// when operating directly on the underlying file descriptors. + fn drain_to(&mut self, _writer: &mut W, _limit: u64) -> Result { + Ok(0) + } + + /// The minimum of the limit of all `Take<_>` wrappers, `u64::MAX` otherwise. + /// This method does not account for data `BufReader` buffers and would underreport + /// the limit of a `Take>>` type. Thus its result is only valid + /// after draining the buffers via `drain_to`. + fn min_limit(&self) -> u64 { + u64::MAX + } + + /// Extracts the file descriptor and hints/metadata, delegating through wrappers if necessary. + fn properties(&self) -> CopyParams; +} + +#[rustc_specialization_trait] +trait CopyWrite: Write { + /// Extracts the file descriptor and hints/metadata, delegating through wrappers if necessary. + fn properties(&self) -> CopyParams; +} + +impl CopyRead for &mut T +where + T: CopyRead, +{ + fn drain_to(&mut self, writer: &mut W, limit: u64) -> Result { + (**self).drain_to(writer, limit) + } + + fn min_limit(&self) -> u64 { + (**self).min_limit() + } + + fn properties(&self) -> CopyParams { + (**self).properties() + } +} + +impl CopyWrite for &mut T +where + T: CopyWrite, +{ + fn properties(&self) -> CopyParams { + (**self).properties() + } +} + +impl CopyRead for File { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } +} + +impl CopyRead for &File { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(*self), Some(self.as_raw_fd())) + } +} + +impl CopyWrite for File { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for &File { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) + } +} + +impl CopyRead for TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } +} + +impl CopyRead for &TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for &TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for ChildStdin { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } +} + +impl CopyRead for ChildStdout { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } +} + +impl CopyRead for ChildStderr { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } +} + +impl CopyRead for StdinLock<'_> { + fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let buf_reader = self.as_mut_buf(); + let buf = buf_reader.buffer(); + let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))]; + let bytes_drained = buf.len(); + writer.write_all(buf)?; + buf_reader.consume(bytes_drained); + + Ok(bytes_drained as u64) + } + + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } +} + +impl CopyWrite for StdoutLock<'_> { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for StderrLock<'_> { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) + } +} + +impl CopyRead for Take { + fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let local_limit = self.limit(); + let combined_limit = min(outer_limit, local_limit); + let bytes_drained = self.get_mut().drain_to(writer, combined_limit)?; + // update limit since read() was bypassed + self.set_limit(local_limit - bytes_drained); + + Ok(bytes_drained) + } + + fn min_limit(&self) -> u64 { + min(Take::limit(self), self.get_ref().min_limit()) + } + + fn properties(&self) -> CopyParams { + self.get_ref().properties() + } +} + +impl CopyRead for BufReader { + fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let buf = self.buffer(); + let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))]; + let bytes = buf.len(); + writer.write_all(buf)?; + self.consume(bytes); + + let remaining = outer_limit - bytes as u64; + + // in case of nested bufreaders we also need to drain the ones closer to the source + let inner_bytes = self.get_mut().drain_to(writer, remaining)?; + + Ok(bytes as u64 + inner_bytes) + } + + fn min_limit(&self) -> u64 { + self.get_ref().min_limit() + } + + fn properties(&self) -> CopyParams { + self.get_ref().properties() + } +} + +impl CopyWrite for BufWriter { + fn properties(&self) -> CopyParams { + self.get_ref().properties() + } +} + +fn fd_to_meta(fd: &T) -> FdMeta { + let fd = fd.as_raw_fd(); + let file: ManuallyDrop = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) }); + match file.metadata() { + Ok(meta) => FdMeta::Metadata(meta), + Err(_) => FdMeta::NoneObtained, + } +} + +pub(super) enum CopyResult { + Ended(Result), + Fallback(u64), +} + +/// linux-specific implementation that will attempt to use copy_file_range for copy offloading +/// as the name says, it only works on regular files +/// +/// Callers must handle fallback to a generic copy loop. +/// `Fallback` may indicate non-zero number of bytes already written +/// if one of the files' cursor +`max_len` would exceed u64::MAX (`EOVERFLOW`). +pub(super) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult { + use crate::cmp; + + // Kernel prior to 4.5 don't have copy_file_range + // We store the availability in a global to avoid unnecessary syscalls + static HAS_COPY_FILE_RANGE: AtomicBool = AtomicBool::new(true); + + unsafe fn copy_file_range( + fd_in: libc::c_int, + off_in: *mut libc::loff_t, + fd_out: libc::c_int, + off_out: *mut libc::loff_t, + len: libc::size_t, + flags: libc::c_uint, + ) -> libc::c_long { + libc::syscall(libc::SYS_copy_file_range, fd_in, off_in, fd_out, off_out, len, flags) + } + + let has_copy_file_range = HAS_COPY_FILE_RANGE.load(Ordering::Relaxed); + let mut written = 0u64; + while written < max_len { + let copy_result = if has_copy_file_range { + let bytes_to_copy = cmp::min(max_len - written, usize::MAX as u64); + // cap to 1GB chunks in case u64::MAX is passed as max_len and the file has a non-zero seek position + // this allows us to copy large chunks without hitting EOVERFLOW, + // unless someone sets a file offset close to u64::MAX - 1GB, in which case a fallback would be required + let bytes_to_copy = cmp::min(bytes_to_copy as usize, 0x4000_0000usize); + let copy_result = unsafe { + // We actually don't have to adjust the offsets, + // because copy_file_range adjusts the file offset automatically + cvt(copy_file_range( + reader, + ptr::null_mut(), + writer, + ptr::null_mut(), + bytes_to_copy, + 0, + )) + }; + if let Err(ref copy_err) = copy_result { + match copy_err.raw_os_error() { + Some(libc::ENOSYS | libc::EPERM | libc::EOPNOTSUPP) => { + HAS_COPY_FILE_RANGE.store(false, Ordering::Relaxed); + } + _ => {} + } + } + copy_result + } else { + Err(Error::from_raw_os_error(libc::ENOSYS)) + }; + match copy_result { + Ok(0) if written == 0 => { + // fallback to work around several kernel bugs where copy_file_range will fail to + // copy any bytes and return 0 instead of an error if + // - reading virtual files from the proc filesystem which appear to have 0 size + // but are not empty. noted in coreutils to affect kernels at least up to 5.6.19. + // - copying from an overlay filesystem in docker. reported to occur on fedora 32. + return CopyResult::Fallback(0); + } + Ok(0) => return CopyResult::Ended(Ok(written)), // reached EOF + Ok(ret) => written += ret as u64, + Err(err) => { + return match err.raw_os_error() { + // when file offset + max_length > u64::MAX + Some(libc::EOVERFLOW) => CopyResult::Fallback(written), + Some( + libc::ENOSYS | libc::EXDEV | libc::EINVAL | libc::EPERM | libc::EOPNOTSUPP, + ) => { + // Try fallback io::copy if either: + // - Kernel version is < 4.5 (ENOSYS) + // - Files are mounted on different fs (EXDEV) + // - copy_file_range is broken in various ways on RHEL/CentOS 7 (EOPNOTSUPP) + // - copy_file_range is disallowed, for example by seccomp (EPERM) + // - copy_file_range cannot be used with pipes or device nodes (EINVAL) + assert_eq!(written, 0); + CopyResult::Fallback(0) + } + _ => CopyResult::Ended(Err(err)), + }; + } + } + } + CopyResult::Ended(Ok(written)) +} + +#[derive(PartialEq)] +enum SpliceMode { + Sendfile, + Splice, +} + +/// performs splice or sendfile between file descriptors +/// Does _not_ fall back to a generic copy loop. +fn sendfile_splice(mode: SpliceMode, reader: RawFd, writer: RawFd, len: u64) -> CopyResult { + static HAS_SENDFILE: AtomicBool = AtomicBool::new(true); + static HAS_SPLICE: AtomicBool = AtomicBool::new(true); + + syscall! { + fn splice( + srcfd: libc::c_int, + src_offset: *const i64, + dstfd: libc::c_int, + dst_offset: *const i64, + len: libc::size_t, + flags: libc::c_int + ) -> libc::ssize_t + } + + match mode { + SpliceMode::Sendfile if !HAS_SENDFILE.load(Ordering::Relaxed) => { + return CopyResult::Fallback(0); + } + SpliceMode::Splice if !HAS_SPLICE.load(Ordering::Relaxed) => { + return CopyResult::Fallback(0); + } + _ => (), + } + + let mut written = 0u64; + while written < len { + // according to its manpage that's the maximum size sendfile() will copy per invocation + let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize; + + let result = match mode { + SpliceMode::Sendfile => { + cvt(unsafe { libc::sendfile(writer, reader, ptr::null_mut(), chunk_size) }) + } + SpliceMode::Splice => cvt(unsafe { + splice(reader, ptr::null_mut(), writer, ptr::null_mut(), chunk_size, 0) + }), + }; + + match result { + Ok(0) => break, // EOF + Ok(ret) => written += ret as u64, + Err(err) => { + return match err.raw_os_error() { + Some(libc::ENOSYS | libc::EPERM) => { + // syscall not supported (ENOSYS) + // syscall is disallowed, e.g. by seccomp (EPERM) + match mode { + SpliceMode::Sendfile => HAS_SENDFILE.store(false, Ordering::Relaxed), + SpliceMode::Splice => HAS_SPLICE.store(false, Ordering::Relaxed), + } + assert_eq!(written, 0); + CopyResult::Fallback(0) + } + Some(libc::EINVAL) => { + // splice/sendfile do not support this particular file descriptor (EINVAL) + assert_eq!(written, 0); + CopyResult::Fallback(0) + } + Some(os_err) if mode == SpliceMode::Sendfile && os_err == libc::EOVERFLOW => { + CopyResult::Fallback(written) + } + _ => CopyResult::Ended(Err(err)), + }; + } + } + } + CopyResult::Ended(Ok(written)) +} diff --git a/library/std/src/sys/unix/kernel_copy/tests.rs b/library/std/src/sys/unix/kernel_copy/tests.rs new file mode 100644 index 0000000000000..21b121c26fffc --- /dev/null +++ b/library/std/src/sys/unix/kernel_copy/tests.rs @@ -0,0 +1,213 @@ +use crate::env::temp_dir; +use crate::fs::OpenOptions; +use crate::io; +use crate::io::Result; +use crate::io::SeekFrom; +use crate::io::{BufRead, Read, Seek, Write}; +use crate::os::unix::io::AsRawFd; + +#[test] +fn copy_specialization() -> Result<()> { + use crate::io::{BufReader, BufWriter}; + + let path = crate::env::temp_dir(); + let source_path = path.join("copy-spec.source"); + let sink_path = path.join("copy-spec.sink"); + + let result: Result<()> = try { + let mut source = crate::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&source_path)?; + source.write_all(b"abcdefghiklmnopqr")?; + source.seek(SeekFrom::Start(8))?; + let mut source = BufReader::with_capacity(8, source.take(5)); + source.fill_buf()?; + assert_eq!(source.buffer(), b"iklmn"); + source.get_mut().set_limit(6); + source.get_mut().get_mut().seek(SeekFrom::Start(1))?; // "bcdefg" + let mut source = source.take(10); // "iklmnbcdef" + + let mut sink = crate::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&sink_path)?; + sink.write_all(b"000000")?; + let mut sink = BufWriter::with_capacity(5, sink); + sink.write_all(b"wxyz")?; + assert_eq!(sink.buffer(), b"wxyz"); + + let copied = crate::io::copy(&mut source, &mut sink)?; + assert_eq!(copied, 10); + assert_eq!(sink.buffer().len(), 0); + + let mut sink = sink.into_inner()?; + sink.seek(SeekFrom::Start(0))?; + let mut copied = Vec::new(); + sink.read_to_end(&mut copied)?; + assert_eq!(&copied, b"000000wxyziklmnbcdef"); + }; + + let rm1 = crate::fs::remove_file(source_path); + let rm2 = crate::fs::remove_file(sink_path); + + result.and(rm1).and(rm2) +} + +#[bench] +fn bench_file_to_file_copy(b: &mut test::Bencher) { + const BYTES: usize = 128 * 1024; + let src_path = temp_dir().join("file-copy-bench-src"); + let mut src = crate::fs::OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(src_path) + .unwrap(); + src.write(&vec![0u8; BYTES]).unwrap(); + + let sink_path = temp_dir().join("file-copy-bench-sink"); + let mut sink = crate::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(sink_path) + .unwrap(); + + b.bytes = BYTES as u64; + b.iter(|| { + src.seek(SeekFrom::Start(0)).unwrap(); + sink.seek(SeekFrom::Start(0)).unwrap(); + assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); + }); +} + +#[bench] +fn bench_file_to_socket_copy(b: &mut test::Bencher) { + const BYTES: usize = 128 * 1024; + let src_path = temp_dir().join("pipe-copy-bench-src"); + let mut src = OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(src_path) + .unwrap(); + src.write(&vec![0u8; BYTES]).unwrap(); + + let sink_drainer = crate::net::TcpListener::bind("localhost:0").unwrap(); + let mut sink = crate::net::TcpStream::connect(sink_drainer.local_addr().unwrap()).unwrap(); + let mut sink_drainer = sink_drainer.accept().unwrap().0; + + crate::thread::spawn(move || { + let mut sink_buf = vec![0u8; 1024 * 1024]; + loop { + sink_drainer.read(&mut sink_buf[..]).unwrap(); + } + }); + + b.bytes = BYTES as u64; + b.iter(|| { + src.seek(SeekFrom::Start(0)).unwrap(); + assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); + }); +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +#[bench] +fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { + use super::CopyResult; + use crate::io::ErrorKind; + use crate::process::{ChildStdin, ChildStdout}; + use crate::sys_common::FromInner; + + let (read_end, write_end) = crate::sys::pipe::anon_pipe().unwrap(); + + let mut read_end = ChildStdout::from_inner(read_end); + let write_end = ChildStdin::from_inner(write_end); + + let acceptor = crate::net::TcpListener::bind("localhost:0").unwrap(); + let mut remote_end = crate::net::TcpStream::connect(acceptor.local_addr().unwrap()).unwrap(); + + let local_end = crate::sync::Arc::new(acceptor.accept().unwrap().0); + + // the data flow in this benchmark: + // + // socket(tx) local_source + // remote_end (write) +--------> (splice to) + // write_end + // + + // | + // | pipe + // v + // read_end + // remote_end (read) <---------+ (splice to) * + // socket(rx) local_end + // + // * benchmark loop using io::copy + + crate::thread::spawn(move || { + let mut sink_buf = vec![0u8; 1024 * 1024]; + remote_end.set_nonblocking(true).unwrap(); + loop { + match remote_end.write(&mut sink_buf[..]) { + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Ok(_) => {} + err => { + err.expect("write failed"); + } + }; + match remote_end.read(&mut sink_buf[..]) { + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Ok(_) => {} + err => { + err.expect("read failed"); + } + }; + } + }); + + // check that splice works, otherwise the benchmark would hang + let probe = super::sendfile_splice( + super::SpliceMode::Splice, + local_end.as_raw_fd(), + write_end.as_raw_fd(), + 1, + ); + + match probe { + CopyResult::Ended(Ok(1)) => { + // splice works + } + _ => { + eprintln!("splice failed, skipping benchmark"); + return; + } + } + + let local_source = local_end.clone(); + crate::thread::spawn(move || { + loop { + super::sendfile_splice( + super::SpliceMode::Splice, + local_source.as_raw_fd(), + write_end.as_raw_fd(), + u64::MAX, + ); + } + }); + + const BYTES: usize = 128 * 1024; + b.bytes = BYTES as u64; + b.iter(|| { + assert_eq!( + BYTES as u64, + io::copy(&mut (&mut read_end).take(BYTES as u64), &mut &*local_end).unwrap() + ); + }); +} diff --git a/library/std/src/sys/unix/mod.rs b/library/std/src/sys/unix/mod.rs index b28c6d85b7c72..7609afbdd76ac 100644 --- a/library/std/src/sys/unix/mod.rs +++ b/library/std/src/sys/unix/mod.rs @@ -51,6 +51,8 @@ pub mod fd; pub mod fs; pub mod futex; pub mod io; +#[cfg(any(target_os = "linux", target_os = "android"))] +pub mod kernel_copy; #[cfg(target_os = "l4re")] mod l4re; pub mod memchr;