Skip to content

Commit

Permalink
specialize io::copy to use the memory of the writer if it is a BufWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
the8472 committed Jan 31, 2021
1 parent 0e63af5 commit 4105506
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 8 deletions.
14 changes: 13 additions & 1 deletion library/std/src/io/buffered/bufwriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<W: Write> BufWriter<W> {
/// "successfully written" (by returning nonzero success values from
/// `write`), any 0-length writes from `inner` must be reported as i/o
/// errors from this method.
pub(super) fn flush_buf(&mut self) -> io::Result<()> {
pub(in crate::io) fn flush_buf(&mut self) -> io::Result<()> {
/// Helper struct to ensure the buffer is updated after all the writes
/// are complete. It tracks the number of written bytes and drains them
/// all from the front of the buffer when dropped.
Expand Down Expand Up @@ -243,6 +243,18 @@ impl<W: Write> BufWriter<W> {
&self.buf
}

/// Returns a mutable reference to the internal buffer.
///
/// This can be used to write data directly into the buffer without triggering writers
/// to the underlying writer.
///
/// That the buffer is a `Vec` is an implementation detail.
/// Callers should not modify the capacity as there currently is no public API to do so
/// and thus any capacity changes would be unexpected by the user.
pub(in crate::io) fn buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.buf
}

/// Returns the number of bytes the internal buffer can hold without flushing.
///
/// # Examples
Expand Down
80 changes: 74 additions & 6 deletions library/std/src/io/copy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::io::{self, ErrorKind, Read, Write};
use super::{BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use crate::mem::MaybeUninit;

/// Copies the entire contents of a reader into a writer.
Expand Down Expand Up @@ -40,7 +40,7 @@ use crate::mem::MaybeUninit;
/// }
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
pub fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> Result<u64>
where
R: Read,
W: Write,
Expand All @@ -54,14 +54,82 @@ where
}
}

/// The general read-write-loop implementation of
/// `io::copy` that is used when specializations are not available or not applicable.
pub(crate) fn generic_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
/// The userspace read-write-loop implementation of `io::copy` that is used when
/// OS-specific specializations for copy offloading are not available or not applicable.
pub(crate) fn generic_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> Result<u64>
where
R: Read,
W: Write,
{
let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit();
BufferedCopySpec::copy_to(reader, writer)
}

/// Specialization of the read-write loop that either uses a stack buffer
/// or reuses the internal buffer of a BufWriter
trait BufferedCopySpec: Write {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64>;
}

impl<W: Write + ?Sized> BufferedCopySpec for W {
default fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
stack_buffer_copy(reader, writer)
}
}

impl<I: Write> BufferedCopySpec for BufWriter<I> {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
if writer.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, writer);
}

// FIXME: #42788
//
// - This creates a (mut) reference to a slice of
// _uninitialized_ integers, which is **undefined behavior**
//
// - Only the standard library gets to soundly "ignore" this,
// based on its privileged knowledge of unstable rustc
// internals;
unsafe {
let spare_cap = writer.buffer_mut().spare_capacity_mut();
reader.initializer().initialize(MaybeUninit::slice_assume_init_mut(spare_cap));
}

let mut len = 0;

loop {
let buf = writer.buffer_mut();
let spare_cap = buf.spare_capacity_mut();

if spare_cap.len() >= DEFAULT_BUF_SIZE {
match reader.read(unsafe { MaybeUninit::slice_assume_init_mut(spare_cap) }) {
Ok(0) => return Ok(len), // EOF reached
Ok(bytes_read) => {
assert!(bytes_read <= spare_cap.len());
// Safety: The initializer contract guarantees that either it or `read`
// will have initialized these bytes. And we just checked that the number
// of bytes is within the buffer capacity.
unsafe { buf.set_len(buf.len() + bytes_read) };
len += bytes_read as u64;
// Read again if the buffer still has enough capacity, as BufWriter itself would do
// This will occur if the reader returns short reads
continue;
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}

writer.flush_buf()?;
}
}
}

fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
reader: &mut R,
writer: &mut W,
) -> Result<u64> {
let mut buf = MaybeUninit::<[u8; DEFAULT_BUF_SIZE]>::uninit();
// FIXME: #42788
//
// - This creates a (mut) reference to a slice of
Expand Down
50 changes: 49 additions & 1 deletion library/std/src/io/util/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::cmp::{max, min};
use crate::io::prelude::*;
use crate::io::{copy, empty, repeat, sink, Empty, Repeat, SeekFrom, Sink};
use crate::io::{
copy, empty, repeat, sink, BufWriter, Empty, Repeat, Result, SeekFrom, Sink, DEFAULT_BUF_SIZE,
};

#[test]
fn copy_copies() {
Expand All @@ -11,6 +14,51 @@ fn copy_copies() {
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}

struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}

impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}

struct WriteObserver {
observed_buffer: usize,
}

impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}

fn flush(&mut self) -> Result<()> {
Ok(())
}
}

#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}

#[test]
fn sink_sinks() {
let mut s = sink();
Expand Down
2 changes: 2 additions & 0 deletions library/std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@
#![feature(maybe_uninit_extra)]
#![feature(maybe_uninit_ref)]
#![feature(maybe_uninit_slice)]
#![feature(maybe_uninit_uninit_array)]
#![feature(min_specialization)]
#![feature(needs_panic_runtime)]
#![feature(negative_impls)]
Expand Down Expand Up @@ -326,6 +327,7 @@
#![feature(unsafe_cell_raw_get)]
#![feature(unwind_attributes)]
#![feature(vec_into_raw_parts)]
#![feature(vec_spare_capacity)]
#![feature(wake_trait)]
// NB: the above list is sorted to minimize merge conflicts.
#![default_lib_allocator]
Expand Down

0 comments on commit 4105506

Please sign in to comment.