From 16236470c1774f88374bab29d2b9d1875cb97246 Mon Sep 17 00:00:00 2001 From: The8472 Date: Wed, 29 Jul 2020 00:35:01 +0200 Subject: [PATCH 01/13] specialize io::copy to use copy_file_range, splice or sendfile Currently it only applies to linux systems. It can be extended to make use of similar syscalls on other unix systems. --- library/std/src/fs.rs | 2 +- library/std/src/io/copy.rs | 456 +++++++++++++++++++++++++++++++++ library/std/src/io/mod.rs | 7 +- library/std/src/io/stdio.rs | 6 + library/std/src/io/util.rs | 73 +----- library/std/src/lib.rs | 1 + library/std/src/sys/unix/fs.rs | 75 +++++- 7 files changed, 540 insertions(+), 80 deletions(-) create mode 100644 library/std/src/io/copy.rs diff --git a/library/std/src/fs.rs b/library/std/src/fs.rs index 161bfe3795c2c..c9fd20bd204b0 100644 --- a/library/std/src/fs.rs +++ b/library/std/src/fs.rs @@ -1656,7 +1656,7 @@ pub fn rename, Q: AsRef>(from: P, to: Q) -> io::Result<()> /// the length of the `to` file as reported by `metadata`. /// /// If you’re wanting to copy the contents of one file to another and you’re -/// working with [`File`]s, see the [`io::copy`] function. +/// working with [`File`]s, see the [`io::copy()`] function. /// /// # Platform-specific behavior /// diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs new file mode 100644 index 0000000000000..45291293655cb --- /dev/null +++ b/library/std/src/io/copy.rs @@ -0,0 +1,456 @@ +use crate::io::{self, ErrorKind, Read, Write}; +use crate::mem::MaybeUninit; + +/// Copies the entire contents of a reader into a writer. +/// +/// This function will continuously read data from `reader` and then +/// write it into `writer` in a streaming fashion until `reader` +/// returns EOF. +/// +/// On success, the total number of bytes that were copied from +/// `reader` to `writer` is returned. +/// +/// If you’re wanting to copy the contents of one file to another and you’re +/// working with filesystem paths, see the [`fs::copy`] function. 
+/// +/// [`fs::copy`]: crate::fs::copy +/// +/// # Errors +/// +/// This function will return an error immediately if any call to [`read`] or +/// [`write`] returns an error. All instances of [`ErrorKind::Interrupted`] are +/// handled by this function and the underlying operation is retried. +/// +/// [`read`]: Read::read +/// [`write`]: Write::write +/// +/// # Examples +/// +/// ``` +/// use std::io; +/// +/// fn main() -> io::Result<()> { +/// let mut reader: &[u8] = b"hello"; +/// let mut writer: Vec = vec![]; +/// +/// io::copy(&mut reader, &mut writer)?; +/// +/// assert_eq!(&b"hello"[..], &writer[..]); +/// Ok(()) +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub fn copy(reader: &mut R, writer: &mut W) -> io::Result +where + R: Read, + W: Write, +{ + #[cfg(any(target_os = "linux", target_os = "android"))] + { + kernel_copy::copy_spec(reader, writer) + } + + #[cfg(not(any(target_os = "linux", target_os = "android")))] + generic_copy(reader, writer) +} + +pub(crate) fn generic_copy(reader: &mut R, writer: &mut W) -> io::Result +where + R: Read, + W: Write, +{ + let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit(); + // FIXME: #42788 + // + // - This creates a (mut) reference to a slice of + // _uninitialized_ integers, which is **undefined behavior** + // + // - Only the standard library gets to soundly "ignore" this, + // based on its privileged knowledge of unstable rustc + // internals; + unsafe { + reader.initializer().initialize(buf.assume_init_mut()); + } + + let mut written = 0; + loop { + let len = match reader.read(unsafe { buf.assume_init_mut() }) { + Ok(0) => return Ok(written), + Ok(len) => len, + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + }; + writer.write_all(unsafe { &buf.assume_init_ref()[..len] })?; + written += len as u64; + } +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +mod kernel_copy { + + use crate::cmp::min; + use 
crate::convert::TryInto; + use crate::fs::{File, Metadata}; + use crate::io::{ + BufRead, BufReader, BufWriter, Read, Result, StderrLock, StdinLock, StdoutLock, Take, Write, + }; + use crate::mem::ManuallyDrop; + use crate::net::TcpStream; + use crate::os::unix::fs::FileTypeExt; + use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd}; + use crate::process::{ChildStderr, ChildStdin, ChildStdout}; + + pub(super) fn copy_spec( + read: &mut R, + write: &mut W, + ) -> Result { + let copier = Copier { read, write }; + SpecCopy::copy(copier) + } + + enum FdMeta { + Metadata(Metadata), + Socket, + Pipe, + None, + } + + impl FdMeta { + fn is_fifo(&self) -> bool { + match self { + FdMeta::Metadata(meta) => meta.file_type().is_fifo(), + FdMeta::Socket => false, + FdMeta::Pipe => true, + FdMeta::None => false, + } + } + } + + struct CopyParams(FdMeta, Option); + + struct Copier<'a, 'b, R: Read + ?Sized, W: Write + ?Sized> { + pub read: &'a mut R, + pub write: &'b mut W, + } + + trait SpecCopy { + fn copy(self) -> Result; + } + + impl SpecCopy for Copier<'_, '_, R, W> { + default fn copy(self) -> Result { + super::generic_copy(self.read, self.write) + } + } + + impl SpecCopy for Copier<'_, '_, R, W> { + fn copy(self) -> Result { + let (reader, writer) = (self.read, self.write); + let r_cfg = reader.properties(); + let w_cfg = writer.properties(); + + // before direct operations on file descriptors ensure that all source and sink buffers are emtpy + let mut flush = || -> crate::io::Result { + let bytes = reader.drain_to(writer, u64::MAX)?; + writer.flush()?; + Ok(bytes) + }; + + match (r_cfg, w_cfg) { + ( + CopyParams(FdMeta::Metadata(reader_meta), Some(readfd)), + CopyParams(FdMeta::Metadata(writer_meta), Some(writefd)), + ) if reader_meta.is_file() && writer_meta.is_file() => { + let bytes_flushed = flush()?; + let max_write = reader.min_limit(); + let (mut reader, mut writer) = + unsafe { (fd_as_file(readfd), fd_as_file(writefd)) }; + let len = reader_meta.len(); + 
crate::sys::fs::copy_regular_files( + &mut reader, + &mut writer, + min(len, max_write), + ) + .map(|bytes_copied| bytes_copied + bytes_flushed) + } + (CopyParams(reader_meta, Some(readfd)), CopyParams(writer_meta, Some(writefd))) + if reader_meta.is_fifo() || writer_meta.is_fifo() => + { + // splice + let bytes_flushed = flush()?; + let max_write = reader.min_limit(); + let (mut reader, mut writer) = + unsafe { (fd_as_file(readfd), fd_as_file(writefd)) }; + crate::sys::fs::sendfile_splice( + crate::sys::fs::SpliceMode::Splice, + &mut reader, + &mut writer, + max_write, + ) + .map(|bytes_sent| bytes_sent + bytes_flushed) + } + ( + CopyParams(FdMeta::Metadata(reader_meta), Some(readfd)), + CopyParams(_, Some(writefd)), + ) if reader_meta.is_file() => { + // try sendfile, most modern systems it should work with any target as long as the source is a mmapable file. + // in the rare cases where it's no supported the wrapper function will fall back to a normal copy loop + let bytes_flushed = flush()?; + let (mut reader, mut writer) = + unsafe { (fd_as_file(readfd), fd_as_file(writefd)) }; + let len = reader_meta.len(); + let max_write = reader.min_limit(); + crate::sys::fs::sendfile_splice( + crate::sys::fs::SpliceMode::Sendfile, + &mut reader, + &mut writer, + min(len, max_write), + ) + .map(|bytes_sent| bytes_sent + bytes_flushed) + } + _ => super::generic_copy(reader, writer), + } + } + } + + #[rustc_specialization_trait] + trait CopyRead: Read { + fn drain_to(&mut self, _writer: &mut W, _limit: u64) -> Result { + Ok(0) + } + + /// The minimum of the limit of all `Take<_>` wrappers, `u64::MAX` otherwise. + /// This method does not account for data `BufReader` buffers and would underreport + /// the limit of a `Take>>` type. Thus its result is only valid + /// after draining the buffers. 
+ fn min_limit(&self) -> u64 { + u64::MAX + } + + fn properties(&self) -> CopyParams; + } + + #[rustc_specialization_trait] + trait CopyWrite: Write { + fn properties(&self) -> CopyParams; + } + + impl CopyRead for File { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } + } + + impl CopyRead for &File { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(*self), Some(self.as_raw_fd())) + } + } + + impl CopyWrite for File { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } + } + + impl CopyWrite for &File { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(*self), Some(self.as_raw_fd())) + } + } + + impl CopyRead for TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } + } + + impl CopyRead for &TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } + } + + impl CopyWrite for TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } + } + + impl CopyWrite for &TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } + } + + impl CopyWrite for ChildStdin { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } + } + + impl CopyRead for ChildStdout { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } + } + + impl CopyRead for ChildStderr { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } + } + + impl CopyRead for StdinLock<'_> { + fn 
drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let buf_reader = self.as_mut_buf(); + let buf = buf_reader.buffer(); + let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))]; + let bytes_drained = buf.len(); + writer.write_all(buf)?; + buf_reader.consume(bytes_drained); + + Ok(bytes_drained as u64) + } + + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } + } + + impl CopyWrite for StdoutLock<'_> { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } + } + + impl CopyWrite for StderrLock<'_> { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } + } + + impl CopyRead for Take { + fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let local_limit = self.limit(); + let combined_limit = min(outer_limit, local_limit); + let bytes_drained = self.get_mut().drain_to(writer, combined_limit)?; + // update limit since read() was bypassed + self.set_limit(local_limit - bytes_drained); + + Ok(bytes_drained) + } + + fn min_limit(&self) -> u64 { + min(Take::limit(self), self.get_ref().min_limit()) + } + + fn properties(&self) -> CopyParams { + self.get_ref().properties() + } + } + + impl CopyRead for BufReader { + fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let buf = self.buffer(); + let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))]; + let bytes = buf.len(); + writer.write_all(buf)?; + self.consume(bytes); + + let remaining = outer_limit - bytes as u64; + + // in case of nested bufreaders we also need to drain the ones closer to the source + let inner_bytes = self.get_mut().drain_to(writer, remaining)?; + + Ok(bytes as u64 + inner_bytes) + } + + fn min_limit(&self) -> u64 { + self.get_ref().min_limit() + } + + fn properties(&self) -> CopyParams { + self.get_ref().properties() + } + } + + impl CopyWrite for BufWriter { + fn 
properties(&self) -> CopyParams { + self.get_ref().properties() + } + } + + fn fd_to_meta(fd: &T) -> FdMeta { + let fd = fd.as_raw_fd(); + let file: ManuallyDrop = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) }); + match file.metadata() { + Ok(meta) => FdMeta::Metadata(meta), + Err(_) => FdMeta::None, + } + } + + unsafe fn fd_as_file(fd: RawFd) -> ManuallyDrop { + ManuallyDrop::new(File::from_raw_fd(fd)) + } + + #[cfg(test)] + mod tests { + + use crate::io::{BufRead, BufReader, BufWriter, Read, Result, Seek, SeekFrom, Write}; + + #[test] + fn copy_specialization() -> Result<()> { + let path = crate::env::temp_dir(); + let source_path = path.join("copy-spec.source"); + let sink_path = path.join("copy-spec.sink"); + + let result: Result<()> = try { + let mut source = crate::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&source_path)?; + source.write_all(b"abcdefghiklmnopqr")?; + source.seek(SeekFrom::Start(8))?; + let mut source = BufReader::with_capacity(8, source.take(5)); + source.fill_buf()?; + assert_eq!(source.buffer(), b"iklmn"); + source.get_mut().set_limit(6); + source.get_mut().get_mut().seek(SeekFrom::Start(1))?; // "bcdefg" + let mut source = source.take(10); // "iklmnbcdef" + + let mut sink = crate::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&sink_path)?; + sink.write_all(b"000000")?; + let mut sink = BufWriter::with_capacity(5, sink); + sink.write_all(b"wxyz")?; + assert_eq!(sink.buffer(), b"wxyz"); + + let copied = crate::io::copy(&mut source, &mut sink)?; + assert_eq!(copied, 10); + assert_eq!(sink.buffer().len(), 0); + + let mut sink = sink.into_inner()?; + sink.seek(SeekFrom::Start(0))?; + let mut copied = Vec::new(); + sink.read_to_end(&mut copied)?; + assert_eq!(&copied, b"000000wxyziklmnbcdef"); + }; + + let rm1 = crate::fs::remove_file(source_path); + let rm2 = crate::fs::remove_file(sink_path); + + result.and(rm1).and(rm2) + } + } +} diff 
--git a/library/std/src/io/mod.rs b/library/std/src/io/mod.rs index d9d0380781925..0529b42347e23 100644 --- a/library/std/src/io/mod.rs +++ b/library/std/src/io/mod.rs @@ -266,6 +266,8 @@ pub use self::buffered::IntoInnerError; #[stable(feature = "rust1", since = "1.0.0")] pub use self::buffered::{BufReader, BufWriter, LineWriter}; #[stable(feature = "rust1", since = "1.0.0")] +pub use self::copy::copy; +#[stable(feature = "rust1", since = "1.0.0")] pub use self::cursor::Cursor; #[stable(feature = "rust1", since = "1.0.0")] pub use self::error::{Error, ErrorKind, Result}; @@ -279,9 +281,12 @@ pub use self::stdio::{_eprint, _print}; #[doc(no_inline, hidden)] pub use self::stdio::{set_panic, set_print}; #[stable(feature = "rust1", since = "1.0.0")] -pub use self::util::{copy, empty, repeat, sink, Empty, Repeat, Sink}; +pub use self::util::{empty, repeat, sink, Empty, Repeat, Sink}; + +pub(crate) use self::copy::generic_copy; mod buffered; +mod copy; mod cursor; mod error; mod impls; diff --git a/library/std/src/io/stdio.rs b/library/std/src/io/stdio.rs index 36b49401591f5..47ff13f529863 100644 --- a/library/std/src/io/stdio.rs +++ b/library/std/src/io/stdio.rs @@ -409,6 +409,12 @@ impl Read for Stdin { } } +impl StdinLock<'_> { + pub(crate) fn as_mut_buf(&mut self) -> &mut BufReader { + &mut self.inner + } +} + #[stable(feature = "rust1", since = "1.0.0")] impl Read for StdinLock<'_> { fn read(&mut self, buf: &mut [u8]) -> io::Result { diff --git a/library/std/src/io/util.rs b/library/std/src/io/util.rs index dc05b9648fd6b..ba80eb46599ba 100644 --- a/library/std/src/io/util.rs +++ b/library/std/src/io/util.rs @@ -4,78 +4,7 @@ mod tests; use crate::fmt; -use crate::io::{self, BufRead, ErrorKind, Initializer, IoSlice, IoSliceMut, Read, Write}; -use crate::mem::MaybeUninit; - -/// Copies the entire contents of a reader into a writer. 
-/// -/// This function will continuously read data from `reader` and then -/// write it into `writer` in a streaming fashion until `reader` -/// returns EOF. -/// -/// On success, the total number of bytes that were copied from -/// `reader` to `writer` is returned. -/// -/// If you’re wanting to copy the contents of one file to another and you’re -/// working with filesystem paths, see the [`fs::copy`] function. -/// -/// [`fs::copy`]: crate::fs::copy -/// -/// # Errors -/// -/// This function will return an error immediately if any call to [`read`] or -/// [`write`] returns an error. All instances of [`ErrorKind::Interrupted`] are -/// handled by this function and the underlying operation is retried. -/// -/// [`read`]: Read::read -/// [`write`]: Write::write -/// -/// # Examples -/// -/// ``` -/// use std::io; -/// -/// fn main() -> io::Result<()> { -/// let mut reader: &[u8] = b"hello"; -/// let mut writer: Vec = vec![]; -/// -/// io::copy(&mut reader, &mut writer)?; -/// -/// assert_eq!(&b"hello"[..], &writer[..]); -/// Ok(()) -/// } -/// ``` -#[stable(feature = "rust1", since = "1.0.0")] -pub fn copy(reader: &mut R, writer: &mut W) -> io::Result -where - R: Read, - W: Write, -{ - let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit(); - // FIXME: #42788 - // - // - This creates a (mut) reference to a slice of - // _uninitialized_ integers, which is **undefined behavior** - // - // - Only the standard library gets to soundly "ignore" this, - // based on its privileged knowledge of unstable rustc - // internals; - unsafe { - reader.initializer().initialize(buf.assume_init_mut()); - } - - let mut written = 0; - loop { - let len = match reader.read(unsafe { buf.assume_init_mut() }) { - Ok(0) => return Ok(written), - Ok(len) => len, - Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, - Err(e) => return Err(e), - }; - writer.write_all(unsafe { &buf.assume_init_ref()[..len] })?; - written += len as u64; - } -} +use crate::io::{self, 
BufRead, Initializer, IoSlice, IoSliceMut, Read, Write}; /// A reader which is always at EOF. /// diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index 96a7755c68821..bf4ec9c8b644b 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -316,6 +316,7 @@ #![feature(toowned_clone_into)] #![feature(total_cmp)] #![feature(trace_macros)] +#![feature(try_blocks)] #![feature(try_reserve)] #![feature(unboxed_closures)] #![feature(unsafe_block_in_unsafe_fn)] diff --git a/library/std/src/sys/unix/fs.rs b/library/std/src/sys/unix/fs.rs index d27d6e2c5659b..012e4a16b9f84 100644 --- a/library/std/src/sys/unix/fs.rs +++ b/library/std/src/sys/unix/fs.rs @@ -1191,6 +1191,21 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { #[cfg(any(target_os = "linux", target_os = "android"))] pub fn copy(from: &Path, to: &Path) -> io::Result { + let (mut reader, reader_metadata) = open_from(from)?; + let max_len = u64::MAX; + let (mut writer, _) = open_to_and_set_permissions(to, reader_metadata)?; + + copy_regular_files(&mut reader, &mut writer, max_len) +} + +/// linux-specific implementation that will attempt to use copy_file_range for copy offloading +/// as the name says, it only works on regular files +#[cfg(any(target_os = "linux", target_os = "android"))] +pub(crate) fn copy_regular_files( + reader: &mut crate::fs::File, + writer: &mut crate::fs::File, + max_len: u64, +) -> io::Result { use crate::cmp; use crate::sync::atomic::{AtomicBool, Ordering}; @@ -1209,10 +1224,6 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { libc::syscall(libc::SYS_copy_file_range, fd_in, off_in, fd_out, off_out, len, flags) } - let (mut reader, reader_metadata) = open_from(from)?; - let max_len = u64::MAX; - let (mut writer, _) = open_to_and_set_permissions(to, reader_metadata)?; - let has_copy_file_range = HAS_COPY_FILE_RANGE.load(Ordering::Relaxed); let mut written = 0u64; while written < max_len { @@ -1249,7 +1260,7 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { // 
- reading virtual files from the proc filesystem which appear to have 0 size // but are not empty. noted in coreutils to affect kernels at least up to 5.6.19. // - copying from an overlay filesystem in docker. reported to occur on fedora 32. - return io::copy(&mut reader, &mut writer); + return io::copy(reader, writer); } Ok(0) => return Ok(written), // reached EOF Ok(ret) => written += ret as u64, @@ -1265,7 +1276,59 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { // - copy_file_range is disallowed, for example by seccomp (EPERM) // - copy_file_range cannot be used with pipes or device nodes (EINVAL) assert_eq!(written, 0); - return io::copy(&mut reader, &mut writer); + return io::generic_copy(reader, writer); + } + _ => return Err(err), + } + } + } + } + Ok(written) +} + +pub(crate) enum SpliceMode { + Sendfile, + Splice, +} + +/// performs splice or sendfile between file descriptors +#[cfg(any(target_os = "linux", target_os = "android"))] +pub(crate) fn sendfile_splice( + mode: SpliceMode, + reader: &mut crate::fs::File, + writer: &mut crate::fs::File, + len: u64, +) -> io::Result { + let mut written = 0u64; + while written < len { + let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize; + + let result = match mode { + SpliceMode::Sendfile => cvt(unsafe { + libc::sendfile(writer.as_raw_fd(), reader.as_raw_fd(), ptr::null_mut(), chunk_size) + }), + SpliceMode::Splice => cvt(unsafe { + libc::splice( + reader.as_raw_fd(), + ptr::null_mut(), + writer.as_raw_fd(), + ptr::null_mut(), + chunk_size, + 0, + ) + }), + }; + + match result { + Ok(0) => break, // EOF + Ok(ret) => written += ret as u64, + Err(err) => { + match err.raw_os_error() { + Some(os_err) if os_err == libc::EINVAL => { + // Try fallback io::copy if splice/sendfile do not support this particular + // file descritor (EINVAL) + assert_eq!(written, 0); + return io::generic_copy(reader, writer); } _ => return Err(err), } From 5eb88fa5c72d0f9b5abe106881f2c5ffba9b073d Mon Sep 17 
00:00:00 2001 From: The8472 Date: Sat, 8 Aug 2020 01:30:03 +0200 Subject: [PATCH 02/13] hide unused exports on other platforms --- library/std/src/io/mod.rs | 4 +--- library/std/src/io/stdio.rs | 2 ++ library/std/src/sys/unix/fs.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/library/std/src/io/mod.rs b/library/std/src/io/mod.rs index 0529b42347e23..cc0e1172a533d 100644 --- a/library/std/src/io/mod.rs +++ b/library/std/src/io/mod.rs @@ -283,10 +283,8 @@ pub use self::stdio::{set_panic, set_print}; #[stable(feature = "rust1", since = "1.0.0")] pub use self::util::{empty, repeat, sink, Empty, Repeat, Sink}; -pub(crate) use self::copy::generic_copy; - mod buffered; -mod copy; +pub(crate) mod copy; mod cursor; mod error; mod impls; diff --git a/library/std/src/io/stdio.rs b/library/std/src/io/stdio.rs index 47ff13f529863..8a69e9ccd55bc 100644 --- a/library/std/src/io/stdio.rs +++ b/library/std/src/io/stdio.rs @@ -409,6 +409,8 @@ impl Read for Stdin { } } +// only used by platform-dependent io::copy specializations, i.e. 
unused on some platforms +#[cfg(any(target_os = "linux", target_os = "android"))] impl StdinLock<'_> { pub(crate) fn as_mut_buf(&mut self) -> &mut BufReader { &mut self.inner diff --git a/library/std/src/sys/unix/fs.rs b/library/std/src/sys/unix/fs.rs index 012e4a16b9f84..b106eb0a5b6da 100644 --- a/library/std/src/sys/unix/fs.rs +++ b/library/std/src/sys/unix/fs.rs @@ -1276,7 +1276,7 @@ pub(crate) fn copy_regular_files( // - copy_file_range is disallowed, for example by seccomp (EPERM) // - copy_file_range cannot be used with pipes or device nodes (EINVAL) assert_eq!(written, 0); - return io::generic_copy(reader, writer); + return io::copy::generic_copy(reader, writer); } _ => return Err(err), } @@ -1328,7 +1328,7 @@ pub(crate) fn sendfile_splice( // Try fallback io::copy if splice/sendfile do not support this particular // file descritor (EINVAL) assert_eq!(written, 0); - return io::generic_copy(reader, writer); + return io::copy::generic_copy(reader, writer); } _ => return Err(err), } From 67a6059aa5dab924a09194d5490d43da518a40a5 Mon Sep 17 00:00:00 2001 From: The8472 Date: Tue, 1 Sep 2020 23:25:31 +0200 Subject: [PATCH 03/13] move tests module into separate file --- library/std/src/io/copy.rs | 56 ------------------------------------- library/std/src/io/tests.rs | 53 ++++++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 57 deletions(-) diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index 45291293655cb..4604f2ab002d9 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -397,60 +397,4 @@ mod kernel_copy { unsafe fn fd_as_file(fd: RawFd) -> ManuallyDrop { ManuallyDrop::new(File::from_raw_fd(fd)) } - - #[cfg(test)] - mod tests { - - use crate::io::{BufRead, BufReader, BufWriter, Read, Result, Seek, SeekFrom, Write}; - - #[test] - fn copy_specialization() -> Result<()> { - let path = crate::env::temp_dir(); - let source_path = path.join("copy-spec.source"); - let sink_path = path.join("copy-spec.sink"); - 
- let result: Result<()> = try { - let mut source = crate::fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&source_path)?; - source.write_all(b"abcdefghiklmnopqr")?; - source.seek(SeekFrom::Start(8))?; - let mut source = BufReader::with_capacity(8, source.take(5)); - source.fill_buf()?; - assert_eq!(source.buffer(), b"iklmn"); - source.get_mut().set_limit(6); - source.get_mut().get_mut().seek(SeekFrom::Start(1))?; // "bcdefg" - let mut source = source.take(10); // "iklmnbcdef" - - let mut sink = crate::fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&sink_path)?; - sink.write_all(b"000000")?; - let mut sink = BufWriter::with_capacity(5, sink); - sink.write_all(b"wxyz")?; - assert_eq!(sink.buffer(), b"wxyz"); - - let copied = crate::io::copy(&mut source, &mut sink)?; - assert_eq!(copied, 10); - assert_eq!(sink.buffer().len(), 0); - - let mut sink = sink.into_inner()?; - sink.seek(SeekFrom::Start(0))?; - let mut copied = Vec::new(); - sink.read_to_end(&mut copied)?; - assert_eq!(&copied, b"000000wxyziklmnbcdef"); - }; - - let rm1 = crate::fs::remove_file(source_path); - let rm2 = crate::fs::remove_file(sink_path); - - result.and(rm1).and(rm2) - } - } } diff --git a/library/std/src/io/tests.rs b/library/std/src/io/tests.rs index 913b28538b7c4..3a983bc176e48 100644 --- a/library/std/src/io/tests.rs +++ b/library/std/src/io/tests.rs @@ -1,7 +1,7 @@ use super::{repeat, Cursor, SeekFrom}; use crate::cmp::{self, min}; -use crate::io::prelude::*; use crate::io::{self, IoSlice, IoSliceMut}; +use crate::io::{BufRead, BufReader, BufWriter, Read, Result, Seek, Write}; use crate::ops::Deref; #[test] @@ -492,3 +492,54 @@ fn test_write_all_vectored() { } } } + +#[test] +#[cfg(unix)] +fn copy_specialization() -> Result<()> { + let path = crate::env::temp_dir(); + let source_path = path.join("copy-spec.source"); + let sink_path = path.join("copy-spec.sink"); + + let result: Result<()> = try { 
+ let mut source = crate::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&source_path)?; + source.write_all(b"abcdefghiklmnopqr")?; + source.seek(SeekFrom::Start(8))?; + let mut source = BufReader::with_capacity(8, source.take(5)); + source.fill_buf()?; + assert_eq!(source.buffer(), b"iklmn"); + source.get_mut().set_limit(6); + source.get_mut().get_mut().seek(SeekFrom::Start(1))?; // "bcdefg" + let mut source = source.take(10); // "iklmnbcdef" + + let mut sink = crate::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&sink_path)?; + sink.write_all(b"000000")?; + let mut sink = BufWriter::with_capacity(5, sink); + sink.write_all(b"wxyz")?; + assert_eq!(sink.buffer(), b"wxyz"); + + let copied = crate::io::copy(&mut source, &mut sink)?; + assert_eq!(copied, 10); + assert_eq!(sink.buffer().len(), 0); + + let mut sink = sink.into_inner()?; + sink.seek(SeekFrom::Start(0))?; + let mut copied = Vec::new(); + sink.read_to_end(&mut copied)?; + assert_eq!(&copied, b"000000wxyziklmnbcdef"); + }; + + let rm1 = crate::fs::remove_file(source_path); + let rm2 = crate::fs::remove_file(sink_path); + + result.and(rm1).and(rm2) +} From cd3bddc044728e040d6a22c684f1b9f711566772 Mon Sep 17 00:00:00 2001 From: The8472 Date: Sun, 6 Sep 2020 22:47:58 +0200 Subject: [PATCH 04/13] prioritize sendfile over splice since it results in fewer context switches when sending to pipes splice returns to userspace when the pipe is full, sendfile just blocks until it's done, this can achieve much higher throughput --- library/std/src/io/copy.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index 4604f2ab002d9..f54a9b8b7649b 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -173,22 +173,6 @@ mod kernel_copy { ) .map(|bytes_copied| bytes_copied + bytes_flushed) } - 
(CopyParams(reader_meta, Some(readfd)), CopyParams(writer_meta, Some(writefd))) - if reader_meta.is_fifo() || writer_meta.is_fifo() => - { - // splice - let bytes_flushed = flush()?; - let max_write = reader.min_limit(); - let (mut reader, mut writer) = - unsafe { (fd_as_file(readfd), fd_as_file(writefd)) }; - crate::sys::fs::sendfile_splice( - crate::sys::fs::SpliceMode::Splice, - &mut reader, - &mut writer, - max_write, - ) - .map(|bytes_sent| bytes_sent + bytes_flushed) - } ( CopyParams(FdMeta::Metadata(reader_meta), Some(readfd)), CopyParams(_, Some(writefd)), @@ -205,6 +189,22 @@ mod kernel_copy { &mut reader, &mut writer, min(len, max_write), + ) + .map(|bytes_sent| bytes_sent + bytes_flushed) + } + (CopyParams(reader_meta, Some(readfd)), CopyParams(writer_meta, Some(writefd))) + if reader_meta.is_fifo() || writer_meta.is_fifo() => + { + // splice + let bytes_flushed = flush()?; + let max_write = reader.min_limit(); + let (mut reader, mut writer) = + unsafe { (fd_as_file(readfd), fd_as_file(writefd)) }; + crate::sys::fs::sendfile_splice( + crate::sys::fs::SpliceMode::Splice, + &mut reader, + &mut writer, + max_write, ) .map(|bytes_sent| bytes_sent + bytes_flushed) } From 0624730d9e9e2b6de974b6f4edd1ea48ab5f240c Mon Sep 17 00:00:00 2001 From: The8472 Date: Sun, 6 Sep 2020 22:50:35 +0200 Subject: [PATCH 05/13] add forwarding specializations for &mut variants `impl Write for &mut T where T: Write`, thus the same should apply to the specialization traits --- library/std/src/io/copy.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index f54a9b8b7649b..e8cbe6a7e715a 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -235,6 +235,27 @@ mod kernel_copy { fn properties(&self) -> CopyParams; } + impl CopyRead for &mut T where T: CopyRead { + fn drain_to(&mut self, writer: &mut W, limit: u64) -> Result { + (**self).drain_to(writer, limit) + } + + fn 
min_limit(&self) -> u64 { + (**self).min_limit() + } + + fn properties(&self) -> CopyParams { + (**self).properties() + } + } + + impl CopyWrite for &mut T where T: CopyWrite { + fn properties(&self) -> CopyParams { + (**self).properties() + } + } + + impl CopyRead for File { fn properties(&self) -> CopyParams { CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) From 46e7fbe60b53e486ff39d29c571428c8a345e925 Mon Sep 17 00:00:00 2001 From: The8472 Date: Thu, 10 Sep 2020 22:12:42 +0200 Subject: [PATCH 06/13] reduce syscalls by inferring FD types based on source struct instead of calling stat() also adds handling for edge-cases involving large sparse files where sendfile could fail with EOVERFLOW --- library/std/src/io/copy.rs | 168 ++++++++++++++++++++------------- library/std/src/sys/unix/fs.rs | 83 ++++++++++------ 2 files changed, 158 insertions(+), 93 deletions(-) diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index e8cbe6a7e715a..31bfdb63386b5 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -99,6 +99,7 @@ mod kernel_copy { use crate::os::unix::fs::FileTypeExt; use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use crate::process::{ChildStderr, ChildStdin, ChildStdout}; + use crate::sys::fs::{copy_regular_files, sendfile_splice, CopyResult, SpliceMode}; pub(super) fn copy_spec( read: &mut R, @@ -108,20 +109,55 @@ mod kernel_copy { SpecCopy::copy(copier) } + /// This type represents either the inferred `FileType` of a `RawFd` based on the source + /// type from which it was extracted or the actual metadata + /// + /// The methods on this type only provide hints, due to `AsRawFd` and `FromRawFd` the inferred + /// type may be wrong. enum FdMeta { + /// We obtained the FD from a type that can contain any type of `FileType` and queried the metadata + /// because it is cheaper than probing all possible syscalls (reader side) Metadata(Metadata), Socket, Pipe, - None, + /// We don't have any metadata, e.g. 
because the original type was `File` which can represent + /// any `FileType` and we did not query the metadata either since it did not seem beneficial + /// (writer side) + NoneObtained, } impl FdMeta { - fn is_fifo(&self) -> bool { + fn maybe_fifo(&self) -> bool { match self { FdMeta::Metadata(meta) => meta.file_type().is_fifo(), FdMeta::Socket => false, FdMeta::Pipe => true, - FdMeta::None => false, + FdMeta::NoneObtained => true, + } + } + + fn potential_sendfile_source(&self) -> bool { + match self { + // procfs erronously shows 0 length on non-empty readable files. + // and if a file is truly empty then a `read` syscall will determine that and skip the write syscall + // thus there would be benefit from attempting sendfile + FdMeta::Metadata(meta) + if meta.file_type().is_file() && meta.len() > 0 + || meta.file_type().is_block_device() => + { + true + } + _ => false, + } + } + + fn copy_file_range_candidate(&self) -> bool { + match self { + // copy_file_range will fail on empty procfs files. 
`read` can determine whether EOF has been reached + // without extra cost and skip the write, thus there is no benefit in attempting copy_file_range + FdMeta::Metadata(meta) if meta.is_file() && meta.len() > 0 => true, + FdMeta::NoneObtained => true, + _ => false, } } } @@ -149,66 +185,65 @@ mod kernel_copy { let r_cfg = reader.properties(); let w_cfg = writer.properties(); - // before direct operations on file descriptors ensure that all source and sink buffers are emtpy + // before direct operations on file descriptors ensure that all source and sink buffers are emtpy let mut flush = || -> crate::io::Result { let bytes = reader.drain_to(writer, u64::MAX)?; + // BufWriter buffered bytes have already been accounted for in earlier write() calls writer.flush()?; Ok(bytes) }; - match (r_cfg, w_cfg) { - ( - CopyParams(FdMeta::Metadata(reader_meta), Some(readfd)), - CopyParams(FdMeta::Metadata(writer_meta), Some(writefd)), - ) if reader_meta.is_file() && writer_meta.is_file() => { - let bytes_flushed = flush()?; - let max_write = reader.min_limit(); - let (mut reader, mut writer) = - unsafe { (fd_as_file(readfd), fd_as_file(writefd)) }; - let len = reader_meta.len(); - crate::sys::fs::copy_regular_files( - &mut reader, - &mut writer, - min(len, max_write), - ) - .map(|bytes_copied| bytes_copied + bytes_flushed) + let mut written = 0u64; + + if let (CopyParams(input_meta, Some(readfd)), CopyParams(output_meta, Some(writefd))) = + (r_cfg, w_cfg) + { + written += flush()?; + let max_write = reader.min_limit(); + + if input_meta.copy_file_range_candidate() && output_meta.copy_file_range_candidate() + { + let result = copy_regular_files(readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), + CopyResult::Ended(err) => return err, + CopyResult::Fallback(bytes) => written += bytes, + } } - ( - CopyParams(FdMeta::Metadata(reader_meta), Some(readfd)), - CopyParams(_, Some(writefd)), - ) if reader_meta.is_file() 
=> { - // try sendfile, most modern systems it should work with any target as long as the source is a mmapable file. - // in the rare cases where it's no supported the wrapper function will fall back to a normal copy loop - let bytes_flushed = flush()?; - let (mut reader, mut writer) = - unsafe { (fd_as_file(readfd), fd_as_file(writefd)) }; - let len = reader_meta.len(); - let max_write = reader.min_limit(); - crate::sys::fs::sendfile_splice( - crate::sys::fs::SpliceMode::Sendfile, - &mut reader, - &mut writer, - min(len, max_write), - ) - .map(|bytes_sent| bytes_sent + bytes_flushed) + + // on modern kernels sendfile can copy from any mmapable type (some but not all regular files and block devices) + // to any writable file descriptor. On older kernels the writer side can only be a socket. + // So we just try and fallback if needed. + // If current file offsets + write sizes overflow it may also fail, we do not try to fix that and instead + // fall back to the generic copy loop. + if input_meta.potential_sendfile_source() { + let result = sendfile_splice(SpliceMode::Sendfile, readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), + CopyResult::Ended(err) => return err, + CopyResult::Fallback(bytes) => written += bytes, + } } - (CopyParams(reader_meta, Some(readfd)), CopyParams(writer_meta, Some(writefd))) - if reader_meta.is_fifo() || writer_meta.is_fifo() => - { - // splice - let bytes_flushed = flush()?; - let max_write = reader.min_limit(); - let (mut reader, mut writer) = - unsafe { (fd_as_file(readfd), fd_as_file(writefd)) }; - crate::sys::fs::sendfile_splice( - crate::sys::fs::SpliceMode::Splice, - &mut reader, - &mut writer, - max_write, - ) - .map(|bytes_sent| bytes_sent + bytes_flushed) + + if input_meta.maybe_fifo() || output_meta.maybe_fifo() { + let result = sendfile_splice(SpliceMode::Splice, readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => 
return Ok(bytes_copied + written), + CopyResult::Ended(err) => return err, + CopyResult::Fallback(0) => { /* use fallback */ } + CopyResult::Fallback(_) => { + unreachable!("splice should not return > 0 bytes on the fallback path") + } + } } - _ => super::generic_copy(reader, writer), + } + + match super::generic_copy(reader, writer) { + Ok(bytes) => Ok(bytes + written), + err => err, } } } @@ -235,7 +270,10 @@ mod kernel_copy { fn properties(&self) -> CopyParams; } - impl CopyRead for &mut T where T: CopyRead { + impl CopyRead for &mut T + where + T: CopyRead, + { fn drain_to(&mut self, writer: &mut W, limit: u64) -> Result { (**self).drain_to(writer, limit) } @@ -249,13 +287,15 @@ mod kernel_copy { } } - impl CopyWrite for &mut T where T: CopyWrite { + impl CopyWrite for &mut T + where + T: CopyWrite, + { fn properties(&self) -> CopyParams { (**self).properties() } } - impl CopyRead for File { fn properties(&self) -> CopyParams { CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) @@ -270,13 +310,13 @@ mod kernel_copy { impl CopyWrite for File { fn properties(&self) -> CopyParams { - CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) } } impl CopyWrite for &File { fn properties(&self) -> CopyParams { - CopyParams(fd_to_meta(*self), Some(self.as_raw_fd())) + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) } } @@ -345,13 +385,13 @@ mod kernel_copy { impl CopyWrite for StdoutLock<'_> { fn properties(&self) -> CopyParams { - CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) } } impl CopyWrite for StderrLock<'_> { fn properties(&self) -> CopyParams { - CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) } } @@ -411,11 +451,7 @@ mod kernel_copy { let file: ManuallyDrop = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) }); match file.metadata() { Ok(meta) => 
FdMeta::Metadata(meta), - Err(_) => FdMeta::None, + Err(_) => FdMeta::NoneObtained, } } - - unsafe fn fd_as_file(fd: RawFd) -> ManuallyDrop { - ManuallyDrop::new(File::from_raw_fd(fd)) - } } diff --git a/library/std/src/sys/unix/fs.rs b/library/std/src/sys/unix/fs.rs index b106eb0a5b6da..0bab95053a499 100644 --- a/library/std/src/sys/unix/fs.rs +++ b/library/std/src/sys/unix/fs.rs @@ -1195,17 +1195,26 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { let max_len = u64::MAX; let (mut writer, _) = open_to_and_set_permissions(to, reader_metadata)?; - copy_regular_files(&mut reader, &mut writer, max_len) + return match copy_regular_files(reader.as_raw_fd(), writer.as_raw_fd(), max_len) { + CopyResult::Ended(result) => result, + CopyResult::Fallback(written) => { + // fallback is only > 0 on EOVERFLOW, which shouldn't happen + // because the copy loop starts at a file offset 0 and countns down from `len` + assert_eq!(0, written); + io::copy::generic_copy(&mut reader, &mut writer) + } + }; } /// linux-specific implementation that will attempt to use copy_file_range for copy offloading /// as the name says, it only works on regular files +/// +/// Callers must handle fallback to a generic copy loop. +/// `Fallback` may indicate non-zero number of bytes already written +/// if one of the files' cursor +`max_len` would exceed u64::MAX (`EOVERFLOW`). +/// If the initial file offset was 0 then `Fallback` will only contain `0`. 
#[cfg(any(target_os = "linux", target_os = "android"))] -pub(crate) fn copy_regular_files( - reader: &mut crate::fs::File, - writer: &mut crate::fs::File, - max_len: u64, -) -> io::Result { +pub(crate) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult { use crate::cmp; use crate::sync::atomic::{AtomicBool, Ordering}; @@ -1228,14 +1237,18 @@ pub(crate) fn copy_regular_files( let mut written = 0u64; while written < max_len { let copy_result = if has_copy_file_range { - let bytes_to_copy = cmp::min(max_len - written, usize::MAX as u64) as usize; + let bytes_to_copy = cmp::min(max_len - written, usize::MAX as u64); + // cap to 2GB chunks in case u64::MAX is passed in as file size and the file has a non-zero offset + // this allows us to copy large chunks without hitting the limit, + // unless someone sets a file offset close to u64::MAX - 2GB, in which case the fallback would kick in + let bytes_to_copy = cmp::min(bytes_to_copy as usize, 0x8000_0000usize); let copy_result = unsafe { // We actually don't have to adjust the offsets, // because copy_file_range adjusts the file offset automatically cvt(copy_file_range( - reader.as_raw_fd(), + reader, ptr::null_mut(), - writer.as_raw_fd(), + writer, ptr::null_mut(), bytes_to_copy, 0, @@ -1260,12 +1273,14 @@ pub(crate) fn copy_regular_files( // - reading virtual files from the proc filesystem which appear to have 0 size // but are not empty. noted in coreutils to affect kernels at least up to 5.6.19. // - copying from an overlay filesystem in docker. reported to occur on fedora 32. 
- return io::copy(reader, writer); + return CopyResult::Fallback(0); } - Ok(0) => return Ok(written), // reached EOF + Ok(0) => return CopyResult::Ended(Ok(written)), // reached EOF Ok(ret) => written += ret as u64, Err(err) => { match err.raw_os_error() { + // when file offset + max_length > u64::MAX + Some(libc::EOVERFLOW) => return CopyResult::Fallback(written), Some( libc::ENOSYS | libc::EXDEV | libc::EINVAL | libc::EPERM | libc::EOPNOTSUPP, ) => { @@ -1276,43 +1291,55 @@ pub(crate) fn copy_regular_files( // - copy_file_range is disallowed, for example by seccomp (EPERM) // - copy_file_range cannot be used with pipes or device nodes (EINVAL) assert_eq!(written, 0); - return io::copy::generic_copy(reader, writer); + return CopyResult::Fallback(0); } - _ => return Err(err), + _ => return CopyResult::Ended(Err(err)), } } } } - Ok(written) + CopyResult::Ended(Ok(written)) } +#[derive(PartialEq)] pub(crate) enum SpliceMode { Sendfile, Splice, } +pub(crate) enum CopyResult { + Ended(io::Result), + Fallback(u64), +} + /// performs splice or sendfile between file descriptors +/// Does _not_ fall back to a generic copy loop. #[cfg(any(target_os = "linux", target_os = "android"))] pub(crate) fn sendfile_splice( mode: SpliceMode, - reader: &mut crate::fs::File, - writer: &mut crate::fs::File, + reader: RawFd, + writer: RawFd, len: u64, -) -> io::Result { +) -> CopyResult { let mut written = 0u64; while written < len { let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize; let result = match mode { - SpliceMode::Sendfile => cvt(unsafe { - libc::sendfile(writer.as_raw_fd(), reader.as_raw_fd(), ptr::null_mut(), chunk_size) - }), + SpliceMode::Sendfile => { + cvt(unsafe { libc::sendfile(writer, reader, ptr::null_mut(), chunk_size) }) + } SpliceMode::Splice => cvt(unsafe { libc::splice( - reader.as_raw_fd(), + reader, ptr::null_mut(), - writer.as_raw_fd(), + writer, ptr::null_mut(), + // default pipe size is 64KiB. 
try to only fill/drain half of that capacity + // so that the next loop iteration won't be put to sleep. + // If reader and writer operate at the same pace they will experience fewer blocking waits. + // This is only needed for splice since sendfile stays in kernel space when it has to block. + //crate::cmp::min(32*1024, chunk_size), chunk_size, 0, ) @@ -1325,17 +1352,19 @@ pub(crate) fn sendfile_splice( Err(err) => { match err.raw_os_error() { Some(os_err) if os_err == libc::EINVAL => { - // Try fallback io::copy if splice/sendfile do not support this particular - // file descritor (EINVAL) + // splice/sendfile do not support this particular file descritor (EINVAL) assert_eq!(written, 0); - return io::copy::generic_copy(reader, writer); + return CopyResult::Fallback(0); + } + Some(os_err) if mode == SpliceMode::Sendfile && os_err == libc::EOVERFLOW => { + return CopyResult::Fallback(written); } - _ => return Err(err), + _ => return CopyResult::Ended(Err(err)), } } } } - Ok(written) + CopyResult::Ended(Ok(written)) } #[cfg(any(target_os = "macos", target_os = "ios"))] From ad9b07c7e5c1a24ec3b75a5bc70708dbed2e40f9 Mon Sep 17 00:00:00 2001 From: The8472 Date: Sat, 5 Sep 2020 00:34:16 +0200 Subject: [PATCH 07/13] add benchmarks --- library/std/src/io/tests.rs | 132 +++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/library/std/src/io/tests.rs b/library/std/src/io/tests.rs index 3a983bc176e48..9a0672e88b80f 100644 --- a/library/std/src/io/tests.rs +++ b/library/std/src/io/tests.rs @@ -1,8 +1,15 @@ use super::{repeat, Cursor, SeekFrom}; use crate::cmp::{self, min}; +use crate::env::temp_dir; +#[cfg(unix)] +use crate::fs::OpenOptions; +#[cfg(unix)] +use crate::io::Result; use crate::io::{self, IoSlice, IoSliceMut}; -use crate::io::{BufRead, BufReader, BufWriter, Read, Result, Seek, Write}; +use crate::io::{BufRead, Read, Seek, Write}; use crate::ops::Deref; +#[cfg(unix)] +use crate::os::unix::io::AsRawFd; #[test] 
#[cfg_attr(target_os = "emscripten", ignore)] @@ -496,6 +503,8 @@ fn test_write_all_vectored() { #[test] #[cfg(unix)] fn copy_specialization() -> Result<()> { + use crate::io::{BufReader, BufWriter}; + let path = crate::env::temp_dir(); let source_path = path.join("copy-spec.source"); let sink_path = path.join("copy-spec.sink"); @@ -543,3 +552,124 @@ fn copy_specialization() -> Result<()> { result.and(rm1).and(rm2) } + +#[bench] +fn bench_file_to_file_copy(b: &mut test::Bencher) { + const BYTES: usize = 128 * 1024; + let src_path = temp_dir().join("file-copy-bench-src"); + let mut src = crate::fs::OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(src_path) + .unwrap(); + src.write(&vec![0u8; BYTES]).unwrap(); + + let sink_path = temp_dir().join("file-copy-bench-sink"); + let mut sink = crate::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(sink_path) + .unwrap(); + + b.bytes = BYTES as u64; + b.iter(|| { + src.seek(SeekFrom::Start(0)).unwrap(); + sink.seek(SeekFrom::Start(0)).unwrap(); + assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); + }); +} + +#[cfg(unix)] +#[bench] +fn bench_file_to_socket_copy(b: &mut test::Bencher) { + const BYTES: usize = 128 * 1024; + let src_path = temp_dir().join("pipe-copy-bench-src"); + let mut src = OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(src_path) + .unwrap(); + src.write(&vec![0u8; BYTES]).unwrap(); + + let sink_drainer = crate::net::TcpListener::bind("localhost:0").unwrap(); + let mut sink = crate::net::TcpStream::connect(sink_drainer.local_addr().unwrap()).unwrap(); + let mut sink_drainer = sink_drainer.accept().unwrap().0; + + crate::thread::spawn(move || { + let mut sink_buf = vec![0u8; 1024 * 1024]; + loop { + sink_drainer.read(&mut sink_buf[..]).unwrap(); + } + }); + + b.bytes = BYTES as u64; + b.iter(|| { + src.seek(SeekFrom::Start(0)).unwrap(); + assert_eq!(BYTES as u64, io::copy(&mut 
src, &mut sink).unwrap()); + }); +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +#[bench] +fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { + use crate::io::ErrorKind; + use crate::process::{ChildStdin, ChildStdout}; + use crate::sys_common::FromInner; + + let (read_end, write_end) = crate::sys::pipe::anon_pipe().unwrap(); + + let mut read_end = ChildStdout::from_inner(read_end); + let write_end = ChildStdin::from_inner(write_end); + + let acceptor = crate::net::TcpListener::bind("localhost:0").unwrap(); + let mut remote_end = crate::net::TcpStream::connect(acceptor.local_addr().unwrap()).unwrap(); + + let local_end = crate::sync::Arc::new(acceptor.accept().unwrap().0); + + crate::thread::spawn(move || { + let mut sink_buf = vec![0u8; 1024 * 1024]; + remote_end.set_nonblocking(true).unwrap(); + loop { + match remote_end.write(&mut sink_buf[..]) { + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Ok(_) => {} + err => { + err.expect("write failed"); + } + }; + match remote_end.read(&mut sink_buf[..]) { + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Ok(_) => {} + err => { + err.expect("read failed"); + } + }; + } + }); + + let local_source = local_end.clone(); + crate::thread::spawn(move || { + loop { + crate::sys::fs::sendfile_splice( + crate::sys::fs::SpliceMode::Splice, + local_source.as_raw_fd(), + write_end.as_raw_fd(), + u64::MAX, + ); + } + }); + + const BYTES: usize = 128 * 1024; + b.bytes = BYTES as u64; + b.iter(|| { + assert_eq!( + BYTES as u64, + io::copy(&mut (&mut read_end).take(BYTES as u64), &mut &*local_end).unwrap() + ); + }); +} From 7f5d2722afea902e89e44062765733c4800b7d21 Mon Sep 17 00:00:00 2001 From: The8472 Date: Wed, 7 Oct 2020 01:01:12 +0200 Subject: [PATCH 08/13] move copy specialization into sys::unix module --- library/std/src/io/copy.rs | 385 +-------------------- library/std/src/sys/unix/fs.rs | 18 +- library/std/src/sys/unix/kernel_copy.rs | 422 ++++++++++++++++++++++++ 
library/std/src/sys/unix/mod.rs | 2 + 4 files changed, 435 insertions(+), 392 deletions(-) create mode 100644 library/std/src/sys/unix/kernel_copy.rs diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index 31bfdb63386b5..b88bca2f2b4ff 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -45,15 +45,17 @@ where R: Read, W: Write, { - #[cfg(any(target_os = "linux", target_os = "android"))] - { - kernel_copy::copy_spec(reader, writer) + cfg_if::cfg_if! { + if #[cfg(any(target_os = "linux", target_os = "android"))] { + crate::sys::kernel_copy::copy_spec(reader, writer) + } else { + generic_copy(reader, writer) + } } - - #[cfg(not(any(target_os = "linux", target_os = "android")))] - generic_copy(reader, writer) } +/// The general read-write-loop implementation of +/// `io::copy` that is used when specializations are not available or not applicable. pub(crate) fn generic_copy(reader: &mut R, writer: &mut W) -> io::Result where R: Read, @@ -84,374 +86,3 @@ where written += len as u64; } } - -#[cfg(any(target_os = "linux", target_os = "android"))] -mod kernel_copy { - - use crate::cmp::min; - use crate::convert::TryInto; - use crate::fs::{File, Metadata}; - use crate::io::{ - BufRead, BufReader, BufWriter, Read, Result, StderrLock, StdinLock, StdoutLock, Take, Write, - }; - use crate::mem::ManuallyDrop; - use crate::net::TcpStream; - use crate::os::unix::fs::FileTypeExt; - use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd}; - use crate::process::{ChildStderr, ChildStdin, ChildStdout}; - use crate::sys::fs::{copy_regular_files, sendfile_splice, CopyResult, SpliceMode}; - - pub(super) fn copy_spec( - read: &mut R, - write: &mut W, - ) -> Result { - let copier = Copier { read, write }; - SpecCopy::copy(copier) - } - - /// This type represents either the inferred `FileType` of a `RawFd` based on the source - /// type from which it was extracted or the actual metadata - /// - /// The methods on this type only provide hints, due to 
`AsRawFd` and `FromRawFd` the inferred - /// type may be wrong. - enum FdMeta { - /// We obtained the FD from a type that can contain any type of `FileType` and queried the metadata - /// because it is cheaper than probing all possible syscalls (reader side) - Metadata(Metadata), - Socket, - Pipe, - /// We don't have any metadata, e.g. because the original type was `File` which can represent - /// any `FileType` and we did not query the metadata either since it did not seem beneficial - /// (writer side) - NoneObtained, - } - - impl FdMeta { - fn maybe_fifo(&self) -> bool { - match self { - FdMeta::Metadata(meta) => meta.file_type().is_fifo(), - FdMeta::Socket => false, - FdMeta::Pipe => true, - FdMeta::NoneObtained => true, - } - } - - fn potential_sendfile_source(&self) -> bool { - match self { - // procfs erronously shows 0 length on non-empty readable files. - // and if a file is truly empty then a `read` syscall will determine that and skip the write syscall - // thus there would be benefit from attempting sendfile - FdMeta::Metadata(meta) - if meta.file_type().is_file() && meta.len() > 0 - || meta.file_type().is_block_device() => - { - true - } - _ => false, - } - } - - fn copy_file_range_candidate(&self) -> bool { - match self { - // copy_file_range will fail on empty procfs files. 
`read` can determine whether EOF has been reached - // without extra cost and skip the write, thus there is no benefit in attempting copy_file_range - FdMeta::Metadata(meta) if meta.is_file() && meta.len() > 0 => true, - FdMeta::NoneObtained => true, - _ => false, - } - } - } - - struct CopyParams(FdMeta, Option); - - struct Copier<'a, 'b, R: Read + ?Sized, W: Write + ?Sized> { - pub read: &'a mut R, - pub write: &'b mut W, - } - - trait SpecCopy { - fn copy(self) -> Result; - } - - impl SpecCopy for Copier<'_, '_, R, W> { - default fn copy(self) -> Result { - super::generic_copy(self.read, self.write) - } - } - - impl SpecCopy for Copier<'_, '_, R, W> { - fn copy(self) -> Result { - let (reader, writer) = (self.read, self.write); - let r_cfg = reader.properties(); - let w_cfg = writer.properties(); - - // before direct operations on file descriptors ensure that all source and sink buffers are emtpy - let mut flush = || -> crate::io::Result { - let bytes = reader.drain_to(writer, u64::MAX)?; - // BufWriter buffered bytes have already been accounted for in earlier write() calls - writer.flush()?; - Ok(bytes) - }; - - let mut written = 0u64; - - if let (CopyParams(input_meta, Some(readfd)), CopyParams(output_meta, Some(writefd))) = - (r_cfg, w_cfg) - { - written += flush()?; - let max_write = reader.min_limit(); - - if input_meta.copy_file_range_candidate() && output_meta.copy_file_range_candidate() - { - let result = copy_regular_files(readfd, writefd, max_write); - - match result { - CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), - CopyResult::Ended(err) => return err, - CopyResult::Fallback(bytes) => written += bytes, - } - } - - // on modern kernels sendfile can copy from any mmapable type (some but not all regular files and block devices) - // to any writable file descriptor. On older kernels the writer side can only be a socket. - // So we just try and fallback if needed. 
- // If current file offsets + write sizes overflow it may also fail, we do not try to fix that and instead - // fall back to the generic copy loop. - if input_meta.potential_sendfile_source() { - let result = sendfile_splice(SpliceMode::Sendfile, readfd, writefd, max_write); - - match result { - CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), - CopyResult::Ended(err) => return err, - CopyResult::Fallback(bytes) => written += bytes, - } - } - - if input_meta.maybe_fifo() || output_meta.maybe_fifo() { - let result = sendfile_splice(SpliceMode::Splice, readfd, writefd, max_write); - - match result { - CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), - CopyResult::Ended(err) => return err, - CopyResult::Fallback(0) => { /* use fallback */ } - CopyResult::Fallback(_) => { - unreachable!("splice should not return > 0 bytes on the fallback path") - } - } - } - } - - match super::generic_copy(reader, writer) { - Ok(bytes) => Ok(bytes + written), - err => err, - } - } - } - - #[rustc_specialization_trait] - trait CopyRead: Read { - fn drain_to(&mut self, _writer: &mut W, _limit: u64) -> Result { - Ok(0) - } - - /// The minimum of the limit of all `Take<_>` wrappers, `u64::MAX` otherwise. - /// This method does not account for data `BufReader` buffers and would underreport - /// the limit of a `Take>>` type. Thus its result is only valid - /// after draining the buffers. 
- fn min_limit(&self) -> u64 { - u64::MAX - } - - fn properties(&self) -> CopyParams; - } - - #[rustc_specialization_trait] - trait CopyWrite: Write { - fn properties(&self) -> CopyParams; - } - - impl CopyRead for &mut T - where - T: CopyRead, - { - fn drain_to(&mut self, writer: &mut W, limit: u64) -> Result { - (**self).drain_to(writer, limit) - } - - fn min_limit(&self) -> u64 { - (**self).min_limit() - } - - fn properties(&self) -> CopyParams { - (**self).properties() - } - } - - impl CopyWrite for &mut T - where - T: CopyWrite, - { - fn properties(&self) -> CopyParams { - (**self).properties() - } - } - - impl CopyRead for File { - fn properties(&self) -> CopyParams { - CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) - } - } - - impl CopyRead for &File { - fn properties(&self) -> CopyParams { - CopyParams(fd_to_meta(*self), Some(self.as_raw_fd())) - } - } - - impl CopyWrite for File { - fn properties(&self) -> CopyParams { - CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) - } - } - - impl CopyWrite for &File { - fn properties(&self) -> CopyParams { - CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) - } - } - - impl CopyRead for TcpStream { - fn properties(&self) -> CopyParams { - // avoid the stat syscall since we can be fairly sure it's a socket - CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) - } - } - - impl CopyRead for &TcpStream { - fn properties(&self) -> CopyParams { - // avoid the stat syscall since we can be fairly sure it's a socket - CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) - } - } - - impl CopyWrite for TcpStream { - fn properties(&self) -> CopyParams { - // avoid the stat syscall since we can be fairly sure it's a socket - CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) - } - } - - impl CopyWrite for &TcpStream { - fn properties(&self) -> CopyParams { - // avoid the stat syscall since we can be fairly sure it's a socket - CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) - } - } - - impl CopyWrite for 
ChildStdin { - fn properties(&self) -> CopyParams { - CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) - } - } - - impl CopyRead for ChildStdout { - fn properties(&self) -> CopyParams { - CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) - } - } - - impl CopyRead for ChildStderr { - fn properties(&self) -> CopyParams { - CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) - } - } - - impl CopyRead for StdinLock<'_> { - fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { - let buf_reader = self.as_mut_buf(); - let buf = buf_reader.buffer(); - let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))]; - let bytes_drained = buf.len(); - writer.write_all(buf)?; - buf_reader.consume(bytes_drained); - - Ok(bytes_drained as u64) - } - - fn properties(&self) -> CopyParams { - CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) - } - } - - impl CopyWrite for StdoutLock<'_> { - fn properties(&self) -> CopyParams { - CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) - } - } - - impl CopyWrite for StderrLock<'_> { - fn properties(&self) -> CopyParams { - CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) - } - } - - impl CopyRead for Take { - fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { - let local_limit = self.limit(); - let combined_limit = min(outer_limit, local_limit); - let bytes_drained = self.get_mut().drain_to(writer, combined_limit)?; - // update limit since read() was bypassed - self.set_limit(local_limit - bytes_drained); - - Ok(bytes_drained) - } - - fn min_limit(&self) -> u64 { - min(Take::limit(self), self.get_ref().min_limit()) - } - - fn properties(&self) -> CopyParams { - self.get_ref().properties() - } - } - - impl CopyRead for BufReader { - fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { - let buf = self.buffer(); - let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))]; - let bytes = buf.len(); - writer.write_all(buf)?; - 
self.consume(bytes); - - let remaining = outer_limit - bytes as u64; - - // in case of nested bufreaders we also need to drain the ones closer to the source - let inner_bytes = self.get_mut().drain_to(writer, remaining)?; - - Ok(bytes as u64 + inner_bytes) - } - - fn min_limit(&self) -> u64 { - self.get_ref().min_limit() - } - - fn properties(&self) -> CopyParams { - self.get_ref().properties() - } - } - - impl CopyWrite for BufWriter { - fn properties(&self) -> CopyParams { - self.get_ref().properties() - } - } - - fn fd_to_meta(fd: &T) -> FdMeta { - let fd = fd.as_raw_fd(); - let file: ManuallyDrop = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) }); - match file.metadata() { - Ok(meta) => FdMeta::Metadata(meta), - Err(_) => FdMeta::NoneObtained, - } - } -} diff --git a/library/std/src/sys/unix/fs.rs b/library/std/src/sys/unix/fs.rs index 0bab95053a499..81cc0a59eb61f 100644 --- a/library/std/src/sys/unix/fs.rs +++ b/library/std/src/sys/unix/fs.rs @@ -1195,7 +1195,7 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { let max_len = u64::MAX; let (mut writer, _) = open_to_and_set_permissions(to, reader_metadata)?; - return match copy_regular_files(reader.as_raw_fd(), writer.as_raw_fd(), max_len) { + match copy_regular_files(reader.as_raw_fd(), writer.as_raw_fd(), max_len) { CopyResult::Ended(result) => result, CopyResult::Fallback(written) => { // fallback is only > 0 on EOVERFLOW, which shouldn't happen @@ -1203,7 +1203,7 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { assert_eq!(0, written); io::copy::generic_copy(&mut reader, &mut writer) } - }; + } } /// linux-specific implementation that will attempt to use copy_file_range for copy offloading @@ -1330,19 +1330,7 @@ pub(crate) fn sendfile_splice( cvt(unsafe { libc::sendfile(writer, reader, ptr::null_mut(), chunk_size) }) } SpliceMode::Splice => cvt(unsafe { - libc::splice( - reader, - ptr::null_mut(), - writer, - ptr::null_mut(), - // default pipe size is 64KiB. 
try to only fill/drain half of that capacity - // so that the next loop iteration won't be put to sleep. - // If reader and writer operate at the same pace they will experience fewer blocking waits. - // This is only needed for splice since sendfile stays in kernel space when it has to block. - //crate::cmp::min(32*1024, chunk_size), - chunk_size, - 0, - ) + libc::splice(reader, ptr::null_mut(), writer, ptr::null_mut(), chunk_size, 0) }), }; diff --git a/library/std/src/sys/unix/kernel_copy.rs b/library/std/src/sys/unix/kernel_copy.rs new file mode 100644 index 0000000000000..4b7a5f508d9e1 --- /dev/null +++ b/library/std/src/sys/unix/kernel_copy.rs @@ -0,0 +1,422 @@ +//! This module contains specializations that can offload `io::copy()` operations on file descriptor +//! containing types (`File`, `TcpStream`, etc.) to more efficient syscalls than `read(2)` and `write(2)`. +//! +//! Specialization is only applied to wholly std-owned types so that user code can't observe +//! that the `Read` and `Write` traits are not used. +//! +//! Since a copy operation involves a reader and writer side where each can consist of different types +//! and also involve generic wrappers (e.g. `Take`, `BufReader`) it is not practical to specialize +//! a single method on all possible combinations. +//! +//! Instead readers and writers are handled separately by the `CopyRead` and `CopyWrite` specialization +//! traits and then specialized on by the `Copier::copy` method. +//! +//! `Copier` uses the specialization traits to unpack the underlying file descriptors and +//! additional prerequisites and constraints imposed by the wrapper types. +//! +//! Once it has obtained all necessary pieces and brought any wrapper types into a state where they +//! can be safely bypassed it will attempt to use the `copy_file_range(2)`, +//! `sendfile(2)` or `splice(2)` syscalls to move data directly between file descriptors. +//! 
Since those syscalls have requirements that cannot be fully checked in advance and +//! gathering additional information about file descriptors would require additional syscalls +//! anyway it simply attempts to use them one after another (guided by inaccurate hints) to +//! figure out which one works and falls back to the generic read-write copy loop if none of them +//! does. +//! Once a working syscall is found for a pair of file descriptors it will be called in a loop +//! until the copy operation is completed. +//! +//! Advantages of using these syscalls: +//! +//! * fewer context switches since reads and writes are coalesced into a single syscall +//! and more bytes are transferred per syscall. This translates to higher throughput +//! and fewer CPU cycles, at least for sufficiently large transfers to amortize the initial probing. +//! * `copy_file_range` creates reflink copies on CoW filesystems, thus moving less data and +//! consuming less disk space +//! * `sendfile` and `splice` can perform zero-copy IO under some circumstances while +//! a naive copy loop would move every byte through the CPU. +//! +//! Drawbacks: +//! +//! * copy operations smaller than the default buffer size can under some circumstances, especially +//! on older kernels, incur more syscalls than the naive approach would. As mentioned above +//! the syscall selection is guided by hints to minimize this possibility but they are not perfect. +//! * optimizations only apply to std types. If a user adds a custom wrapper type, e.g. to report +//! progress, they can hit a performance cliff. +//! 
* complexity + +use crate::cmp::min; +use crate::convert::TryInto; +use crate::fs::{File, Metadata}; +use crate::io::copy::generic_copy; +use crate::io::{ + BufRead, BufReader, BufWriter, Read, Result, StderrLock, StdinLock, StdoutLock, Take, Write, +}; +use crate::mem::ManuallyDrop; +use crate::net::TcpStream; +use crate::os::unix::fs::FileTypeExt; +use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use crate::process::{ChildStderr, ChildStdin, ChildStdout}; +use crate::sys::fs::{copy_regular_files, sendfile_splice, CopyResult, SpliceMode}; + +pub(crate) fn copy_spec( + read: &mut R, + write: &mut W, +) -> Result { + let copier = Copier { read, write }; + SpecCopy::copy(copier) +} + +/// This type represents either the inferred `FileType` of a `RawFd` based on the source +/// type from which it was extracted or the actual metadata +/// +/// The methods on this type only provide hints, due to `AsRawFd` and `FromRawFd` the inferred +/// type may be wrong. +enum FdMeta { + /// We obtained the FD from a type that can contain any type of `FileType` and queried the metadata + /// because it is cheaper than probing all possible syscalls (reader side) + Metadata(Metadata), + Socket, + Pipe, + /// We don't have any metadata, e.g. because the original type was `File` which can represent + /// any `FileType` and we did not query the metadata either since it did not seem beneficial + /// (writer side) + NoneObtained, +} + +impl FdMeta { + fn maybe_fifo(&self) -> bool { + match self { + FdMeta::Metadata(meta) => meta.file_type().is_fifo(), + FdMeta::Socket => false, + FdMeta::Pipe => true, + FdMeta::NoneObtained => true, + } + } + + fn potential_sendfile_source(&self) -> bool { + match self { + // procfs erroneously shows 0 length on non-empty readable files. 
+ // and if a file is truly empty then a `read` syscall will determine that and skip the write syscall + // thus there would be no benefit from attempting sendfile + FdMeta::Metadata(meta) + if meta.file_type().is_file() && meta.len() > 0 + || meta.file_type().is_block_device() => + { + true + } + _ => false, + } + } + + fn copy_file_range_candidate(&self) -> bool { + match self { + // copy_file_range will fail on empty procfs files. `read` can determine whether EOF has been reached + // without extra cost and skip the write, thus there is no benefit in attempting copy_file_range + FdMeta::Metadata(meta) if meta.is_file() && meta.len() > 0 => true, + FdMeta::NoneObtained => true, + _ => false, + } + } +} + +struct CopyParams(FdMeta, Option); + +struct Copier<'a, 'b, R: Read + ?Sized, W: Write + ?Sized> { + read: &'a mut R, + write: &'b mut W, +} + +trait SpecCopy { + fn copy(self) -> Result; +} + +impl SpecCopy for Copier<'_, '_, R, W> { + default fn copy(self) -> Result { + generic_copy(self.read, self.write) + } +} + +impl SpecCopy for Copier<'_, '_, R, W> { + fn copy(self) -> Result { + let (reader, writer) = (self.read, self.write); + let r_cfg = reader.properties(); + let w_cfg = writer.properties(); + + // before direct operations on file descriptors ensure that all source and sink buffers are empty + let mut flush = || -> crate::io::Result { + let bytes = reader.drain_to(writer, u64::MAX)?; + // BufWriter buffered bytes have already been accounted for in earlier write() calls + writer.flush()?; + Ok(bytes) + }; + + let mut written = 0u64; + + if let (CopyParams(input_meta, Some(readfd)), CopyParams(output_meta, Some(writefd))) = + (r_cfg, w_cfg) + { + written += flush()?; + let max_write = reader.min_limit(); + + if input_meta.copy_file_range_candidate() && output_meta.copy_file_range_candidate() { + let result = copy_regular_files(readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), + 
CopyResult::Ended(err) => return err, + CopyResult::Fallback(bytes) => written += bytes, + } + } + + // on modern kernels sendfile can copy from any mmapable type (some but not all regular files and block devices) + // to any writable file descriptor. On older kernels the writer side can only be a socket. + // So we just try and fallback if needed. + // If current file offsets + write sizes overflow it may also fail, we do not try to fix that and instead + // fall back to the generic copy loop. + if input_meta.potential_sendfile_source() { + let result = sendfile_splice(SpliceMode::Sendfile, readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), + CopyResult::Ended(err) => return err, + CopyResult::Fallback(bytes) => written += bytes, + } + } + + if input_meta.maybe_fifo() || output_meta.maybe_fifo() { + let result = sendfile_splice(SpliceMode::Splice, readfd, writefd, max_write); + + match result { + CopyResult::Ended(Ok(bytes_copied)) => return Ok(bytes_copied + written), + CopyResult::Ended(err) => return err, + CopyResult::Fallback(0) => { /* use the fallback below */ } + CopyResult::Fallback(_) => { + unreachable!("splice should not return > 0 bytes on the fallback path") + } + } + } + } + + // fallback if none of the more specialized syscalls wants to work with these file descriptors + match generic_copy(reader, writer) { + Ok(bytes) => Ok(bytes + written), + err => err, + } + } +} + +#[rustc_specialization_trait] +trait CopyRead: Read { + /// Implementations that contain buffers (i.e. `BufReader`) must transfer data from their internal + /// buffers into `writer` until either the buffers are emptied or `limit` bytes have been + /// transferred, whichever occurs sooner. + /// If nested buffers are present the outer buffers must be drained first. 
+ /// + /// This is necessary to directly bypass the wrapper types while preserving the data order + /// when operating directly on the underlying file descriptors. + fn drain_to(&mut self, _writer: &mut W, _limit: u64) -> Result { + Ok(0) + } + + /// The minimum of the limit of all `Take<_>` wrappers, `u64::MAX` otherwise. + /// This method does not account for data `BufReader` buffers and would underreport + /// the limit of a `Take>>` type. Thus its result is only valid + /// after draining the buffers via `drain_to`. + fn min_limit(&self) -> u64 { + u64::MAX + } + + /// Extracts the file descriptor and hints/metadata, delegating through wrappers if necessary. + fn properties(&self) -> CopyParams; +} + +#[rustc_specialization_trait] +trait CopyWrite: Write { + /// Extracts the file descriptor and hints/metadata, delegating through wrappers if necessary. + fn properties(&self) -> CopyParams; +} + +impl CopyRead for &mut T +where + T: CopyRead, +{ + fn drain_to(&mut self, writer: &mut W, limit: u64) -> Result { + (**self).drain_to(writer, limit) + } + + fn min_limit(&self) -> u64 { + (**self).min_limit() + } + + fn properties(&self) -> CopyParams { + (**self).properties() + } +} + +impl CopyWrite for &mut T +where + T: CopyWrite, +{ + fn properties(&self) -> CopyParams { + (**self).properties() + } +} + +impl CopyRead for File { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } +} + +impl CopyRead for &File { + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(*self), Some(self.as_raw_fd())) + } +} + +impl CopyWrite for File { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for &File { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) + } +} + +impl CopyRead for TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + 
CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } +} + +impl CopyRead for &TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for &TcpStream { + fn properties(&self) -> CopyParams { + // avoid the stat syscall since we can be fairly sure it's a socket + CopyParams(FdMeta::Socket, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for ChildStdin { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } +} + +impl CopyRead for ChildStdout { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } +} + +impl CopyRead for ChildStderr { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::Pipe, Some(self.as_raw_fd())) + } +} + +impl CopyRead for StdinLock<'_> { + fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let buf_reader = self.as_mut_buf(); + let buf = buf_reader.buffer(); + let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))]; + let bytes_drained = buf.len(); + writer.write_all(buf)?; + buf_reader.consume(bytes_drained); + + Ok(bytes_drained as u64) + } + + fn properties(&self) -> CopyParams { + CopyParams(fd_to_meta(self), Some(self.as_raw_fd())) + } +} + +impl CopyWrite for StdoutLock<'_> { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) + } +} + +impl CopyWrite for StderrLock<'_> { + fn properties(&self) -> CopyParams { + CopyParams(FdMeta::NoneObtained, Some(self.as_raw_fd())) + } +} + +impl CopyRead for Take { + fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let local_limit = self.limit(); + let combined_limit 
= min(outer_limit, local_limit); + let bytes_drained = self.get_mut().drain_to(writer, combined_limit)?; + // update limit since read() was bypassed + self.set_limit(local_limit - bytes_drained); + + Ok(bytes_drained) + } + + fn min_limit(&self) -> u64 { + min(Take::limit(self), self.get_ref().min_limit()) + } + + fn properties(&self) -> CopyParams { + self.get_ref().properties() + } +} + +impl CopyRead for BufReader { + fn drain_to(&mut self, writer: &mut W, outer_limit: u64) -> Result { + let buf = self.buffer(); + let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))]; + let bytes = buf.len(); + writer.write_all(buf)?; + self.consume(bytes); + + let remaining = outer_limit - bytes as u64; + + // in case of nested bufreaders we also need to drain the ones closer to the source + let inner_bytes = self.get_mut().drain_to(writer, remaining)?; + + Ok(bytes as u64 + inner_bytes) + } + + fn min_limit(&self) -> u64 { + self.get_ref().min_limit() + } + + fn properties(&self) -> CopyParams { + self.get_ref().properties() + } +} + +impl CopyWrite for BufWriter { + fn properties(&self) -> CopyParams { + self.get_ref().properties() + } +} + +fn fd_to_meta(fd: &T) -> FdMeta { + let fd = fd.as_raw_fd(); + let file: ManuallyDrop = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) }); + match file.metadata() { + Ok(meta) => FdMeta::Metadata(meta), + Err(_) => FdMeta::NoneObtained, + } +} diff --git a/library/std/src/sys/unix/mod.rs b/library/std/src/sys/unix/mod.rs index b28c6d85b7c72..7609afbdd76ac 100644 --- a/library/std/src/sys/unix/mod.rs +++ b/library/std/src/sys/unix/mod.rs @@ -51,6 +51,8 @@ pub mod fd; pub mod fs; pub mod futex; pub mod io; +#[cfg(any(target_os = "linux", target_os = "android"))] +pub mod kernel_copy; #[cfg(target_os = "l4re")] mod l4re; pub mod memchr; From 18bfe2a66beb79f912f8d1e344dda5f054cdd9c3 Mon Sep 17 00:00:00 2001 From: The8472 Date: Wed, 7 Oct 2020 23:07:04 +0200 Subject: [PATCH 09/13] move copy specialization tests to 
their own module --- library/std/src/io/copy.rs | 3 + library/std/src/io/copy/tests.rs | 179 ++++++++++++++++++++++++++++++ library/std/src/io/tests.rs | 181 ------------------------------- 3 files changed, 182 insertions(+), 181 deletions(-) create mode 100644 library/std/src/io/copy/tests.rs diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index b88bca2f2b4ff..db47ba97fe4b6 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -1,6 +1,9 @@ use crate::io::{self, ErrorKind, Read, Write}; use crate::mem::MaybeUninit; +#[cfg(all(test, unix))] +mod tests; + /// Copies the entire contents of a reader into a writer. /// /// This function will continuously read data from `reader` and then diff --git a/library/std/src/io/copy/tests.rs b/library/std/src/io/copy/tests.rs new file mode 100644 index 0000000000000..fcff5fa934d1c --- /dev/null +++ b/library/std/src/io/copy/tests.rs @@ -0,0 +1,179 @@ +use crate::env::temp_dir; +use crate::fs::OpenOptions; +use crate::io; +use crate::io::Result; +use crate::io::SeekFrom; +use crate::io::{BufRead, Read, Seek, Write}; +use crate::os::unix::io::AsRawFd; + +#[test] +fn copy_specialization() -> Result<()> { + use crate::io::{BufReader, BufWriter}; + + let path = crate::env::temp_dir(); + let source_path = path.join("copy-spec.source"); + let sink_path = path.join("copy-spec.sink"); + + let result: Result<()> = try { + let mut source = crate::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&source_path)?; + source.write_all(b"abcdefghiklmnopqr")?; + source.seek(SeekFrom::Start(8))?; + let mut source = BufReader::with_capacity(8, source.take(5)); + source.fill_buf()?; + assert_eq!(source.buffer(), b"iklmn"); + source.get_mut().set_limit(6); + source.get_mut().get_mut().seek(SeekFrom::Start(1))?; // "bcdefg" + let mut source = source.take(10); // "iklmnbcdef" + + let mut sink = crate::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) 
+ .truncate(true) + .open(&sink_path)?; + sink.write_all(b"000000")?; + let mut sink = BufWriter::with_capacity(5, sink); + sink.write_all(b"wxyz")?; + assert_eq!(sink.buffer(), b"wxyz"); + + let copied = crate::io::copy(&mut source, &mut sink)?; + assert_eq!(copied, 10); + assert_eq!(sink.buffer().len(), 0); + + let mut sink = sink.into_inner()?; + sink.seek(SeekFrom::Start(0))?; + let mut copied = Vec::new(); + sink.read_to_end(&mut copied)?; + assert_eq!(&copied, b"000000wxyziklmnbcdef"); + }; + + let rm1 = crate::fs::remove_file(source_path); + let rm2 = crate::fs::remove_file(sink_path); + + result.and(rm1).and(rm2) +} + +#[bench] +fn bench_file_to_file_copy(b: &mut test::Bencher) { + const BYTES: usize = 128 * 1024; + let src_path = temp_dir().join("file-copy-bench-src"); + let mut src = crate::fs::OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(src_path) + .unwrap(); + src.write(&vec![0u8; BYTES]).unwrap(); + + let sink_path = temp_dir().join("file-copy-bench-sink"); + let mut sink = crate::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(sink_path) + .unwrap(); + + b.bytes = BYTES as u64; + b.iter(|| { + src.seek(SeekFrom::Start(0)).unwrap(); + sink.seek(SeekFrom::Start(0)).unwrap(); + assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); + }); +} + +#[bench] +fn bench_file_to_socket_copy(b: &mut test::Bencher) { + const BYTES: usize = 128 * 1024; + let src_path = temp_dir().join("pipe-copy-bench-src"); + let mut src = OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(src_path) + .unwrap(); + src.write(&vec![0u8; BYTES]).unwrap(); + + let sink_drainer = crate::net::TcpListener::bind("localhost:0").unwrap(); + let mut sink = crate::net::TcpStream::connect(sink_drainer.local_addr().unwrap()).unwrap(); + let mut sink_drainer = sink_drainer.accept().unwrap().0; + + crate::thread::spawn(move || { + let mut sink_buf = vec![0u8; 1024 * 
1024]; + loop { + sink_drainer.read(&mut sink_buf[..]).unwrap(); + } + }); + + b.bytes = BYTES as u64; + b.iter(|| { + src.seek(SeekFrom::Start(0)).unwrap(); + assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); + }); +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +#[bench] +fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { + use crate::io::ErrorKind; + use crate::process::{ChildStdin, ChildStdout}; + use crate::sys_common::FromInner; + + let (read_end, write_end) = crate::sys::pipe::anon_pipe().unwrap(); + + let mut read_end = ChildStdout::from_inner(read_end); + let write_end = ChildStdin::from_inner(write_end); + + let acceptor = crate::net::TcpListener::bind("localhost:0").unwrap(); + let mut remote_end = crate::net::TcpStream::connect(acceptor.local_addr().unwrap()).unwrap(); + + let local_end = crate::sync::Arc::new(acceptor.accept().unwrap().0); + + crate::thread::spawn(move || { + let mut sink_buf = vec![0u8; 1024 * 1024]; + remote_end.set_nonblocking(true).unwrap(); + loop { + match remote_end.write(&mut sink_buf[..]) { + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Ok(_) => {} + err => { + err.expect("write failed"); + } + }; + match remote_end.read(&mut sink_buf[..]) { + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Ok(_) => {} + err => { + err.expect("read failed"); + } + }; + } + }); + + let local_source = local_end.clone(); + crate::thread::spawn(move || { + loop { + crate::sys::fs::sendfile_splice( + crate::sys::fs::SpliceMode::Splice, + local_source.as_raw_fd(), + write_end.as_raw_fd(), + u64::MAX, + ); + } + }); + + const BYTES: usize = 128 * 1024; + b.bytes = BYTES as u64; + b.iter(|| { + assert_eq!( + BYTES as u64, + io::copy(&mut (&mut read_end).take(BYTES as u64), &mut &*local_end).unwrap() + ); + }); +} diff --git a/library/std/src/io/tests.rs b/library/std/src/io/tests.rs index 9a0672e88b80f..f176c2f088cb3 100644 --- a/library/std/src/io/tests.rs +++ b/library/std/src/io/tests.rs @@ 
-1,15 +1,8 @@ use super::{repeat, Cursor, SeekFrom}; use crate::cmp::{self, min}; -use crate::env::temp_dir; -#[cfg(unix)] -use crate::fs::OpenOptions; -#[cfg(unix)] -use crate::io::Result; use crate::io::{self, IoSlice, IoSliceMut}; use crate::io::{BufRead, Read, Seek, Write}; use crate::ops::Deref; -#[cfg(unix)] -use crate::os::unix::io::AsRawFd; #[test] #[cfg_attr(target_os = "emscripten", ignore)] @@ -499,177 +492,3 @@ fn test_write_all_vectored() { } } } - -#[test] -#[cfg(unix)] -fn copy_specialization() -> Result<()> { - use crate::io::{BufReader, BufWriter}; - - let path = crate::env::temp_dir(); - let source_path = path.join("copy-spec.source"); - let sink_path = path.join("copy-spec.sink"); - - let result: Result<()> = try { - let mut source = crate::fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&source_path)?; - source.write_all(b"abcdefghiklmnopqr")?; - source.seek(SeekFrom::Start(8))?; - let mut source = BufReader::with_capacity(8, source.take(5)); - source.fill_buf()?; - assert_eq!(source.buffer(), b"iklmn"); - source.get_mut().set_limit(6); - source.get_mut().get_mut().seek(SeekFrom::Start(1))?; // "bcdefg" - let mut source = source.take(10); // "iklmnbcdef" - - let mut sink = crate::fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&sink_path)?; - sink.write_all(b"000000")?; - let mut sink = BufWriter::with_capacity(5, sink); - sink.write_all(b"wxyz")?; - assert_eq!(sink.buffer(), b"wxyz"); - - let copied = crate::io::copy(&mut source, &mut sink)?; - assert_eq!(copied, 10); - assert_eq!(sink.buffer().len(), 0); - - let mut sink = sink.into_inner()?; - sink.seek(SeekFrom::Start(0))?; - let mut copied = Vec::new(); - sink.read_to_end(&mut copied)?; - assert_eq!(&copied, b"000000wxyziklmnbcdef"); - }; - - let rm1 = crate::fs::remove_file(source_path); - let rm2 = crate::fs::remove_file(sink_path); - - result.and(rm1).and(rm2) -} - -#[bench] -fn 
bench_file_to_file_copy(b: &mut test::Bencher) { - const BYTES: usize = 128 * 1024; - let src_path = temp_dir().join("file-copy-bench-src"); - let mut src = crate::fs::OpenOptions::new() - .create(true) - .truncate(true) - .read(true) - .write(true) - .open(src_path) - .unwrap(); - src.write(&vec![0u8; BYTES]).unwrap(); - - let sink_path = temp_dir().join("file-copy-bench-sink"); - let mut sink = crate::fs::OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(sink_path) - .unwrap(); - - b.bytes = BYTES as u64; - b.iter(|| { - src.seek(SeekFrom::Start(0)).unwrap(); - sink.seek(SeekFrom::Start(0)).unwrap(); - assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); - }); -} - -#[cfg(unix)] -#[bench] -fn bench_file_to_socket_copy(b: &mut test::Bencher) { - const BYTES: usize = 128 * 1024; - let src_path = temp_dir().join("pipe-copy-bench-src"); - let mut src = OpenOptions::new() - .create(true) - .truncate(true) - .read(true) - .write(true) - .open(src_path) - .unwrap(); - src.write(&vec![0u8; BYTES]).unwrap(); - - let sink_drainer = crate::net::TcpListener::bind("localhost:0").unwrap(); - let mut sink = crate::net::TcpStream::connect(sink_drainer.local_addr().unwrap()).unwrap(); - let mut sink_drainer = sink_drainer.accept().unwrap().0; - - crate::thread::spawn(move || { - let mut sink_buf = vec![0u8; 1024 * 1024]; - loop { - sink_drainer.read(&mut sink_buf[..]).unwrap(); - } - }); - - b.bytes = BYTES as u64; - b.iter(|| { - src.seek(SeekFrom::Start(0)).unwrap(); - assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); - }); -} - -#[cfg(any(target_os = "linux", target_os = "android"))] -#[bench] -fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { - use crate::io::ErrorKind; - use crate::process::{ChildStdin, ChildStdout}; - use crate::sys_common::FromInner; - - let (read_end, write_end) = crate::sys::pipe::anon_pipe().unwrap(); - - let mut read_end = ChildStdout::from_inner(read_end); - let write_end = 
ChildStdin::from_inner(write_end); - - let acceptor = crate::net::TcpListener::bind("localhost:0").unwrap(); - let mut remote_end = crate::net::TcpStream::connect(acceptor.local_addr().unwrap()).unwrap(); - - let local_end = crate::sync::Arc::new(acceptor.accept().unwrap().0); - - crate::thread::spawn(move || { - let mut sink_buf = vec![0u8; 1024 * 1024]; - remote_end.set_nonblocking(true).unwrap(); - loop { - match remote_end.write(&mut sink_buf[..]) { - Err(err) if err.kind() == ErrorKind::WouldBlock => {} - Ok(_) => {} - err => { - err.expect("write failed"); - } - }; - match remote_end.read(&mut sink_buf[..]) { - Err(err) if err.kind() == ErrorKind::WouldBlock => {} - Ok(_) => {} - err => { - err.expect("read failed"); - } - }; - } - }); - - let local_source = local_end.clone(); - crate::thread::spawn(move || { - loop { - crate::sys::fs::sendfile_splice( - crate::sys::fs::SpliceMode::Splice, - local_source.as_raw_fd(), - write_end.as_raw_fd(), - u64::MAX, - ); - } - }); - - const BYTES: usize = 128 * 1024; - b.bytes = BYTES as u64; - b.iter(|| { - assert_eq!( - BYTES as u64, - io::copy(&mut (&mut read_end).take(BYTES as u64), &mut &*local_end).unwrap() - ); - }); -} From 888b1031bc267132c66dbba49b43a55a83452ca4 Mon Sep 17 00:00:00 2001 From: The8472 Date: Thu, 15 Oct 2020 01:51:47 +0200 Subject: [PATCH 10/13] limit visibility of copy offload helpers to sys::unix module --- library/std/src/io/copy.rs | 3 --- library/std/src/sys/unix/fs.rs | 8 ++++---- library/std/src/sys/unix/kernel_copy.rs | 3 +++ .../std/src/{io/copy => sys/unix/kernel_copy}/tests.rs | 0 4 files changed, 7 insertions(+), 7 deletions(-) rename library/std/src/{io/copy => sys/unix/kernel_copy}/tests.rs (100%) diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index db47ba97fe4b6..b88bca2f2b4ff 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -1,9 +1,6 @@ use crate::io::{self, ErrorKind, Read, Write}; use crate::mem::MaybeUninit; -#[cfg(all(test, 
unix))] -mod tests; - /// Copies the entire contents of a reader into a writer. /// /// This function will continuously read data from `reader` and then diff --git a/library/std/src/sys/unix/fs.rs b/library/std/src/sys/unix/fs.rs index 81cc0a59eb61f..0b162f9b30074 100644 --- a/library/std/src/sys/unix/fs.rs +++ b/library/std/src/sys/unix/fs.rs @@ -1214,7 +1214,7 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { /// if one of the files' cursor +`max_len` would exceed u64::MAX (`EOVERFLOW`). /// If the initial file offset was 0 then `Fallback` will only contain `0`. #[cfg(any(target_os = "linux", target_os = "android"))] -pub(crate) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult { +pub(super) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult { use crate::cmp; use crate::sync::atomic::{AtomicBool, Ordering}; @@ -1302,12 +1302,12 @@ pub(crate) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> } #[derive(PartialEq)] -pub(crate) enum SpliceMode { +pub(super) enum SpliceMode { Sendfile, Splice, } -pub(crate) enum CopyResult { +pub(super) enum CopyResult { Ended(io::Result), Fallback(u64), } @@ -1315,7 +1315,7 @@ pub(crate) enum CopyResult { /// performs splice or sendfile between file descriptors /// Does _not_ fall back to a generic copy loop. 
#[cfg(any(target_os = "linux", target_os = "android"))] -pub(crate) fn sendfile_splice( +pub(super) fn sendfile_splice( mode: SpliceMode, reader: RawFd, writer: RawFd, diff --git a/library/std/src/sys/unix/kernel_copy.rs b/library/std/src/sys/unix/kernel_copy.rs index 4b7a5f508d9e1..d0a9401b30f9f 100644 --- a/library/std/src/sys/unix/kernel_copy.rs +++ b/library/std/src/sys/unix/kernel_copy.rs @@ -58,6 +58,9 @@ use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use crate::process::{ChildStderr, ChildStdin, ChildStdout}; use crate::sys::fs::{copy_regular_files, sendfile_splice, CopyResult, SpliceMode}; +#[cfg(test)] +mod tests; + pub(crate) fn copy_spec( read: &mut R, write: &mut W, diff --git a/library/std/src/io/copy/tests.rs b/library/std/src/sys/unix/kernel_copy/tests.rs similarity index 100% rename from library/std/src/io/copy/tests.rs rename to library/std/src/sys/unix/kernel_copy/tests.rs From 3dfc377aa12293ace29f9a055f0aeb634d107ed9 Mon Sep 17 00:00:00 2001 From: The8472 Date: Thu, 15 Oct 2020 03:07:57 +0200 Subject: [PATCH 11/13] move sendfile/splice/copy_file_range into kernel_copy module --- library/std/src/sys/unix/fs.rs | 151 +----------------- library/std/src/sys/unix/kernel_copy.rs | 148 ++++++++++++++++- library/std/src/sys/unix/kernel_copy/tests.rs | 4 +- 3 files changed, 150 insertions(+), 153 deletions(-) diff --git a/library/std/src/sys/unix/fs.rs b/library/std/src/sys/unix/fs.rs index 0b162f9b30074..8da2c7f13df14 100644 --- a/library/std/src/sys/unix/fs.rs +++ b/library/std/src/sys/unix/fs.rs @@ -1195,6 +1195,8 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { let max_len = u64::MAX; let (mut writer, _) = open_to_and_set_permissions(to, reader_metadata)?; + use super::kernel_copy::{copy_regular_files, CopyResult}; + match copy_regular_files(reader.as_raw_fd(), writer.as_raw_fd(), max_len) { CopyResult::Ended(result) => result, CopyResult::Fallback(written) => { @@ -1206,155 +1208,6 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { } 
} -/// linux-specific implementation that will attempt to use copy_file_range for copy offloading -/// as the name says, it only works on regular files -/// -/// Callers must handle fallback to a generic copy loop. -/// `Fallback` may indicate non-zero number of bytes already written -/// if one of the files' cursor +`max_len` would exceed u64::MAX (`EOVERFLOW`). -/// If the initial file offset was 0 then `Fallback` will only contain `0`. -#[cfg(any(target_os = "linux", target_os = "android"))] -pub(super) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult { - use crate::cmp; - use crate::sync::atomic::{AtomicBool, Ordering}; - - // Kernel prior to 4.5 don't have copy_file_range - // We store the availability in a global to avoid unnecessary syscalls - static HAS_COPY_FILE_RANGE: AtomicBool = AtomicBool::new(true); - - unsafe fn copy_file_range( - fd_in: libc::c_int, - off_in: *mut libc::loff_t, - fd_out: libc::c_int, - off_out: *mut libc::loff_t, - len: libc::size_t, - flags: libc::c_uint, - ) -> libc::c_long { - libc::syscall(libc::SYS_copy_file_range, fd_in, off_in, fd_out, off_out, len, flags) - } - - let has_copy_file_range = HAS_COPY_FILE_RANGE.load(Ordering::Relaxed); - let mut written = 0u64; - while written < max_len { - let copy_result = if has_copy_file_range { - let bytes_to_copy = cmp::min(max_len - written, usize::MAX as u64); - // cap to 2GB chunks in case u64::MAX is passed in as file size and the file has a non-zero offset - // this allows us to copy large chunks without hitting the limit, - // unless someone sets a file offset close to u64::MAX - 2GB, in which case the fallback would kick in - let bytes_to_copy = cmp::min(bytes_to_copy as usize, 0x8000_0000usize); - let copy_result = unsafe { - // We actually don't have to adjust the offsets, - // because copy_file_range adjusts the file offset automatically - cvt(copy_file_range( - reader, - ptr::null_mut(), - writer, - ptr::null_mut(), - bytes_to_copy, - 0, - )) - }; 
- if let Err(ref copy_err) = copy_result { - match copy_err.raw_os_error() { - Some(libc::ENOSYS | libc::EPERM | libc::EOPNOTSUPP) => { - HAS_COPY_FILE_RANGE.store(false, Ordering::Relaxed); - } - _ => {} - } - } - copy_result - } else { - Err(io::Error::from_raw_os_error(libc::ENOSYS)) - }; - match copy_result { - Ok(0) if written == 0 => { - // fallback to work around several kernel bugs where copy_file_range will fail to - // copy any bytes and return 0 instead of an error if - // - reading virtual files from the proc filesystem which appear to have 0 size - // but are not empty. noted in coreutils to affect kernels at least up to 5.6.19. - // - copying from an overlay filesystem in docker. reported to occur on fedora 32. - return CopyResult::Fallback(0); - } - Ok(0) => return CopyResult::Ended(Ok(written)), // reached EOF - Ok(ret) => written += ret as u64, - Err(err) => { - match err.raw_os_error() { - // when file offset + max_length > u64::MAX - Some(libc::EOVERFLOW) => return CopyResult::Fallback(written), - Some( - libc::ENOSYS | libc::EXDEV | libc::EINVAL | libc::EPERM | libc::EOPNOTSUPP, - ) => { - // Try fallback io::copy if either: - // - Kernel version is < 4.5 (ENOSYS) - // - Files are mounted on different fs (EXDEV) - // - copy_file_range is broken in various ways on RHEL/CentOS 7 (EOPNOTSUPP) - // - copy_file_range is disallowed, for example by seccomp (EPERM) - // - copy_file_range cannot be used with pipes or device nodes (EINVAL) - assert_eq!(written, 0); - return CopyResult::Fallback(0); - } - _ => return CopyResult::Ended(Err(err)), - } - } - } - } - CopyResult::Ended(Ok(written)) -} - -#[derive(PartialEq)] -pub(super) enum SpliceMode { - Sendfile, - Splice, -} - -pub(super) enum CopyResult { - Ended(io::Result), - Fallback(u64), -} - -/// performs splice or sendfile between file descriptors -/// Does _not_ fall back to a generic copy loop. 
-#[cfg(any(target_os = "linux", target_os = "android"))] -pub(super) fn sendfile_splice( - mode: SpliceMode, - reader: RawFd, - writer: RawFd, - len: u64, -) -> CopyResult { - let mut written = 0u64; - while written < len { - let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize; - - let result = match mode { - SpliceMode::Sendfile => { - cvt(unsafe { libc::sendfile(writer, reader, ptr::null_mut(), chunk_size) }) - } - SpliceMode::Splice => cvt(unsafe { - libc::splice(reader, ptr::null_mut(), writer, ptr::null_mut(), chunk_size, 0) - }), - }; - - match result { - Ok(0) => break, // EOF - Ok(ret) => written += ret as u64, - Err(err) => { - match err.raw_os_error() { - Some(os_err) if os_err == libc::EINVAL => { - // splice/sendfile do not support this particular file descritor (EINVAL) - assert_eq!(written, 0); - return CopyResult::Fallback(0); - } - Some(os_err) if mode == SpliceMode::Sendfile && os_err == libc::EOVERFLOW => { - return CopyResult::Fallback(written); - } - _ => return CopyResult::Ended(Err(err)), - } - } - } - } - CopyResult::Ended(Ok(written)) -} - #[cfg(any(target_os = "macos", target_os = "ios"))] pub fn copy(from: &Path, to: &Path) -> io::Result { use crate::sync::atomic::{AtomicBool, Ordering}; diff --git a/library/std/src/sys/unix/kernel_copy.rs b/library/std/src/sys/unix/kernel_copy.rs index d0a9401b30f9f..6d4dcc30b45f5 100644 --- a/library/std/src/sys/unix/kernel_copy.rs +++ b/library/std/src/sys/unix/kernel_copy.rs @@ -49,14 +49,16 @@ use crate::convert::TryInto; use crate::fs::{File, Metadata}; use crate::io::copy::generic_copy; use crate::io::{ - BufRead, BufReader, BufWriter, Read, Result, StderrLock, StdinLock, StdoutLock, Take, Write, + BufRead, BufReader, BufWriter, Error, Read, Result, StderrLock, StdinLock, StdoutLock, Take, + Write, }; use crate::mem::ManuallyDrop; use crate::net::TcpStream; use crate::os::unix::fs::FileTypeExt; use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use 
crate::process::{ChildStderr, ChildStdin, ChildStdout}; -use crate::sys::fs::{copy_regular_files, sendfile_splice, CopyResult, SpliceMode}; +use crate::ptr; +use crate::sys::cvt; #[cfg(test)] mod tests; @@ -423,3 +425,145 @@ fn fd_to_meta(fd: &T) -> FdMeta { Err(_) => FdMeta::NoneObtained, } } + +pub(super) enum CopyResult { + Ended(Result), + Fallback(u64), +} + +/// linux-specific implementation that will attempt to use copy_file_range for copy offloading +/// as the name says, it only works on regular files +/// +/// Callers must handle fallback to a generic copy loop. +/// `Fallback` may indicate non-zero number of bytes already written +/// if one of the files' cursor +`max_len` would exceed u64::MAX (`EOVERFLOW`). +/// If the initial file offset was 0 then `Fallback` will only contain `0`. +pub(super) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult { + use crate::cmp; + use crate::sync::atomic::{AtomicBool, Ordering}; + + // Kernel prior to 4.5 don't have copy_file_range + // We store the availability in a global to avoid unnecessary syscalls + static HAS_COPY_FILE_RANGE: AtomicBool = AtomicBool::new(true); + + unsafe fn copy_file_range( + fd_in: libc::c_int, + off_in: *mut libc::loff_t, + fd_out: libc::c_int, + off_out: *mut libc::loff_t, + len: libc::size_t, + flags: libc::c_uint, + ) -> libc::c_long { + libc::syscall(libc::SYS_copy_file_range, fd_in, off_in, fd_out, off_out, len, flags) + } + + let has_copy_file_range = HAS_COPY_FILE_RANGE.load(Ordering::Relaxed); + let mut written = 0u64; + while written < max_len { + let copy_result = if has_copy_file_range { + let bytes_to_copy = cmp::min(max_len - written, usize::MAX as u64); + // cap to 2GB chunks in case u64::MAX is passed in as file size and the file has a non-zero offset + // this allows us to copy large chunks without hitting the limit, + // unless someone sets a file offset close to u64::MAX - 2GB, in which case the fallback would kick in + let bytes_to_copy = 
cmp::min(bytes_to_copy as usize, 0x8000_0000usize); + let copy_result = unsafe { + // We actually don't have to adjust the offsets, + // because copy_file_range adjusts the file offset automatically + cvt(copy_file_range( + reader, + ptr::null_mut(), + writer, + ptr::null_mut(), + bytes_to_copy, + 0, + )) + }; + if let Err(ref copy_err) = copy_result { + match copy_err.raw_os_error() { + Some(libc::ENOSYS | libc::EPERM | libc::EOPNOTSUPP) => { + HAS_COPY_FILE_RANGE.store(false, Ordering::Relaxed); + } + _ => {} + } + } + copy_result + } else { + Err(Error::from_raw_os_error(libc::ENOSYS)) + }; + match copy_result { + Ok(0) if written == 0 => { + // fallback to work around several kernel bugs where copy_file_range will fail to + // copy any bytes and return 0 instead of an error if + // - reading virtual files from the proc filesystem which appear to have 0 size + // but are not empty. noted in coreutils to affect kernels at least up to 5.6.19. + // - copying from an overlay filesystem in docker. reported to occur on fedora 32. 
+ return CopyResult::Fallback(0); + } + Ok(0) => return CopyResult::Ended(Ok(written)), // reached EOF + Ok(ret) => written += ret as u64, + Err(err) => { + return match err.raw_os_error() { + // when file offset + max_length > u64::MAX + Some(libc::EOVERFLOW) => CopyResult::Fallback(written), + Some( + libc::ENOSYS | libc::EXDEV | libc::EINVAL | libc::EPERM | libc::EOPNOTSUPP, + ) => { + // Try fallback io::copy if either: + // - Kernel version is < 4.5 (ENOSYS) + // - Files are mounted on different fs (EXDEV) + // - copy_file_range is broken in various ways on RHEL/CentOS 7 (EOPNOTSUPP) + // - copy_file_range is disallowed, for example by seccomp (EPERM) + // - copy_file_range cannot be used with pipes or device nodes (EINVAL) + assert_eq!(written, 0); + CopyResult::Fallback(0) + } + _ => CopyResult::Ended(Err(err)), + }; + } + } + } + CopyResult::Ended(Ok(written)) +} + +#[derive(PartialEq)] +enum SpliceMode { + Sendfile, + Splice, +} + +/// performs splice or sendfile between file descriptors +/// Does _not_ fall back to a generic copy loop. 
+fn sendfile_splice(mode: SpliceMode, reader: RawFd, writer: RawFd, len: u64) -> CopyResult { + let mut written = 0u64; + while written < len { + let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize; + + let result = match mode { + SpliceMode::Sendfile => { + cvt(unsafe { libc::sendfile(writer, reader, ptr::null_mut(), chunk_size) }) + } + SpliceMode::Splice => cvt(unsafe { + libc::splice(reader, ptr::null_mut(), writer, ptr::null_mut(), chunk_size, 0) + }), + }; + + match result { + Ok(0) => break, // EOF + Ok(ret) => written += ret as u64, + Err(err) => { + return match err.raw_os_error() { + Some(os_err) if os_err == libc::EINVAL => { + // splice/sendfile do not support this particular file descritor (EINVAL) + assert_eq!(written, 0); + CopyResult::Fallback(0) + } + Some(os_err) if mode == SpliceMode::Sendfile && os_err == libc::EOVERFLOW => { + CopyResult::Fallback(written) + } + _ => CopyResult::Ended(Err(err)), + }; + } + } + } + CopyResult::Ended(Ok(written)) +} diff --git a/library/std/src/sys/unix/kernel_copy/tests.rs b/library/std/src/sys/unix/kernel_copy/tests.rs index fcff5fa934d1c..889216924534c 100644 --- a/library/std/src/sys/unix/kernel_copy/tests.rs +++ b/library/std/src/sys/unix/kernel_copy/tests.rs @@ -159,8 +159,8 @@ fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { let local_source = local_end.clone(); crate::thread::spawn(move || { loop { - crate::sys::fs::sendfile_splice( - crate::sys::fs::SpliceMode::Splice, + super::sendfile_splice( + super::SpliceMode::Splice, local_source.as_raw_fd(), write_end.as_raw_fd(), u64::MAX, From 4854d418a5245b07eca7dbec92a29d18af13a821 Mon Sep 17 00:00:00 2001 From: The8472 Date: Fri, 6 Nov 2020 23:36:23 +0100 Subject: [PATCH 12/13] do direct splice syscall and probe availability to get android builds to work Android builds use feature level 14, the libc wrapper for splice is gated on feature level 21+ so we have to invoke the syscall directly. 
Additionally the emulator doesn't seem to support it so we also have to add ENOSYS checks. --- library/std/src/sys/unix/kernel_copy.rs | 42 +++++++++++++++++-- library/std/src/sys/unix/kernel_copy/tests.rs | 34 +++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/library/std/src/sys/unix/kernel_copy.rs b/library/std/src/sys/unix/kernel_copy.rs index 6d4dcc30b45f5..99533dd3c07d6 100644 --- a/library/std/src/sys/unix/kernel_copy.rs +++ b/library/std/src/sys/unix/kernel_copy.rs @@ -58,6 +58,7 @@ use crate::os::unix::fs::FileTypeExt; use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use crate::process::{ChildStderr, ChildStdin, ChildStdout}; use crate::ptr; +use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sys::cvt; #[cfg(test)] @@ -440,7 +441,6 @@ pub(super) enum CopyResult { /// If the initial file offset was 0 then `Fallback` will only contain `0`. pub(super) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult { use crate::cmp; - use crate::sync::atomic::{AtomicBool, Ordering}; // Kernel prior to 4.5 don't have copy_file_range // We store the availability in a global to avoid unnecessary syscalls @@ -534,6 +534,30 @@ enum SpliceMode { /// performs splice or sendfile between file descriptors /// Does _not_ fall back to a generic copy loop. fn sendfile_splice(mode: SpliceMode, reader: RawFd, writer: RawFd, len: u64) -> CopyResult { + static HAS_SENDFILE: AtomicBool = AtomicBool::new(true); + static HAS_SPLICE: AtomicBool = AtomicBool::new(true); + + syscall! 
{ + fn splice( + srcfd: libc::c_int, + src_offset: *const i64, + dstfd: libc::c_int, + dst_offset: *const i64, + len: libc::size_t, + flags: libc::c_int + ) -> libc::ssize_t + } + + match mode { + SpliceMode::Sendfile if !HAS_SENDFILE.load(Ordering::Relaxed) => { + return CopyResult::Fallback(0); + } + SpliceMode::Splice if !HAS_SPLICE.load(Ordering::Relaxed) => { + return CopyResult::Fallback(0); + } + _ => (), + } + let mut written = 0u64; while written < len { let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize; @@ -543,7 +567,7 @@ fn sendfile_splice(mode: SpliceMode, reader: RawFd, writer: RawFd, len: u64) -> cvt(unsafe { libc::sendfile(writer, reader, ptr::null_mut(), chunk_size) }) } SpliceMode::Splice => cvt(unsafe { - libc::splice(reader, ptr::null_mut(), writer, ptr::null_mut(), chunk_size, 0) + splice(reader, ptr::null_mut(), writer, ptr::null_mut(), chunk_size, 0) }), }; @@ -552,8 +576,18 @@ fn sendfile_splice(mode: SpliceMode, reader: RawFd, writer: RawFd, len: u64) -> Ok(ret) => written += ret as u64, Err(err) => { return match err.raw_os_error() { - Some(os_err) if os_err == libc::EINVAL => { - // splice/sendfile do not support this particular file descritor (EINVAL) + Some(libc::ENOSYS | libc::EPERM) => { + // syscall not supported (ENOSYS) + // syscall is disallowed, e.g. 
by seccomp (EPERM) + match mode { + SpliceMode::Sendfile => HAS_SENDFILE.store(false, Ordering::Relaxed), + SpliceMode::Splice => HAS_SPLICE.store(false, Ordering::Relaxed), + } + assert_eq!(written, 0); + CopyResult::Fallback(0) + } + Some(libc::EINVAL) => { + // splice/sendfile do not support this particular file descriptor (EINVAL) assert_eq!(written, 0); CopyResult::Fallback(0) } diff --git a/library/std/src/sys/unix/kernel_copy/tests.rs b/library/std/src/sys/unix/kernel_copy/tests.rs index 889216924534c..21b121c26fffc 100644 --- a/library/std/src/sys/unix/kernel_copy/tests.rs +++ b/library/std/src/sys/unix/kernel_copy/tests.rs @@ -121,6 +121,7 @@ fn bench_file_to_socket_copy(b: &mut test::Bencher) { #[cfg(any(target_os = "linux", target_os = "android"))] #[bench] fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { + use super::CopyResult; use crate::io::ErrorKind; use crate::process::{ChildStdin, ChildStdout}; use crate::sys_common::FromInner; @@ -135,6 +136,21 @@ fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { let local_end = crate::sync::Arc::new(acceptor.accept().unwrap().0); + // the data flow in this benchmark: + // + // socket(tx) local_source + // remote_end (write) +--------> (splice to) + // write_end + // + + // | + // | pipe + // v + // read_end + // remote_end (read) <---------+ (splice to) * + // socket(rx) local_end + // + // * benchmark loop using io::copy + crate::thread::spawn(move || { let mut sink_buf = vec![0u8; 1024 * 1024]; remote_end.set_nonblocking(true).unwrap(); @@ -156,6 +172,24 @@ fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { } }); + // check that splice works, otherwise the benchmark would hang + let probe = super::sendfile_splice( + super::SpliceMode::Splice, + local_end.as_raw_fd(), + write_end.as_raw_fd(), + 1, + ); + + match probe { + CopyResult::Ended(Ok(1)) => { + // splice works + } + _ => { + eprintln!("splice failed, skipping benchmark"); + return; + } + } + let local_source = local_end.clone(); 
crate::thread::spawn(move || { loop { From bbfa92c82debed28417350b15fc6a2f46135346d Mon Sep 17 00:00:00 2001 From: The8472 Date: Thu, 12 Nov 2020 23:39:49 +0100 Subject: [PATCH 13/13] Always handle EOVERFLOW by falling back to the generic copy loop Previously EOVERFLOW handling was only applied for io::copy specialization but not for fs::copy sharing the same code. Additionally we lower the chunk size to 1GB since we have a user report that older kernels may return EINVAL when passing 0x8000_0000 but smaller values succeed. --- library/std/src/sys/unix/fs.rs | 10 ++++------ library/std/src/sys/unix/kernel_copy.rs | 10 +++++----- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/library/std/src/sys/unix/fs.rs b/library/std/src/sys/unix/fs.rs index 8da2c7f13df14..15054fdad8412 100644 --- a/library/std/src/sys/unix/fs.rs +++ b/library/std/src/sys/unix/fs.rs @@ -1199,12 +1199,10 @@ pub fn copy(from: &Path, to: &Path) -> io::Result { match copy_regular_files(reader.as_raw_fd(), writer.as_raw_fd(), max_len) { CopyResult::Ended(result) => result, - CopyResult::Fallback(written) => { - // fallback is only > 0 on EOVERFLOW, which shouldn't happen - // because the copy loop starts at a file offset 0 and countns down from `len` - assert_eq!(0, written); - io::copy::generic_copy(&mut reader, &mut writer) - } + CopyResult::Fallback(written) => match io::copy::generic_copy(&mut reader, &mut writer) { + Ok(bytes) => Ok(bytes + written), + Err(e) => Err(e), + }, } } diff --git a/library/std/src/sys/unix/kernel_copy.rs b/library/std/src/sys/unix/kernel_copy.rs index 99533dd3c07d6..ac2fcfcb53f72 100644 --- a/library/std/src/sys/unix/kernel_copy.rs +++ b/library/std/src/sys/unix/kernel_copy.rs @@ -438,7 +438,6 @@ pub(super) enum CopyResult { /// Callers must handle fallback to a generic copy loop. /// `Fallback` may indicate non-zero number of bytes already written /// if one of the files' cursor +`max_len` would exceed u64::MAX (`EOVERFLOW`). 
-/// If the initial file offset was 0 then `Fallback` will only contain `0`. pub(super) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult { use crate::cmp; @@ -462,10 +461,10 @@ pub(super) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> while written < max_len { let copy_result = if has_copy_file_range { let bytes_to_copy = cmp::min(max_len - written, usize::MAX as u64); - // cap to 2GB chunks in case u64::MAX is passed in as file size and the file has a non-zero offset - // this allows us to copy large chunks without hitting the limit, - // unless someone sets a file offset close to u64::MAX - 2GB, in which case the fallback would kick in - let bytes_to_copy = cmp::min(bytes_to_copy as usize, 0x8000_0000usize); + // cap to 1GB chunks in case u64::MAX is passed as max_len and the file has a non-zero seek position + // this allows us to copy large chunks without hitting EOVERFLOW, + // unless someone sets a file offset close to u64::MAX - 1GB, in which case a fallback would be required + let bytes_to_copy = cmp::min(bytes_to_copy as usize, 0x4000_0000usize); let copy_result = unsafe { // We actually don't have to adjust the offsets, // because copy_file_range adjusts the file offset automatically @@ -560,6 +559,7 @@ fn sendfile_splice(mode: SpliceMode, reader: RawFd, writer: RawFd, len: u64) -> let mut written = 0u64; while written < len { + // according to its manpage that's the maximum size sendfile() will copy per invocation let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize; let result = match mode {