Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: add AsyncReadExt::read_buf #3003

Merged
merged 6 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ full = [
]

fs = []
io-util = ["memchr"]
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
macros = ["tokio-macros"]
Expand All @@ -58,6 +58,7 @@ net = [
"mio/uds",
]
process = [
"bytes",
"lazy_static",
"libc",
"mio/os-poll",
Expand Down Expand Up @@ -88,10 +89,10 @@ time = []
[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }

bytes = "0.5.0"
pin-project-lite = "0.1.1"

# Everything else is optional...
bytes = { version = "0.6.0", optional = true }
fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
Expand Down
68 changes: 68 additions & 0 deletions tokio/src/io/util/async_read_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::io::util::chain::{chain, Chain};
use crate::io::util::read::{read, Read};
use crate::io::util::read_buf::{read_buf, ReadBuf};
use crate::io::util::read_exact::{read_exact, ReadExact};
use crate::io::util::read_int::{
ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8,
Expand All @@ -12,6 +13,8 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString};
use crate::io::util::take::{take, Take};
use crate::io::AsyncRead;

use bytes::BufMut;

cfg_io_util! {
/// Defines numeric reader
macro_rules! read_impl {
Expand Down Expand Up @@ -163,6 +166,71 @@ cfg_io_util! {
read(self, buf)
}

/// Pulls some bytes from this source into the specified buffer,
/// advancing the buffer's internal cursor.
///
/// Equivalent to:
///
/// ```ignore
/// async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> io::Result<usize>;
/// ```
///
/// Usually, only a single `read` syscall is issued, even if there is
/// more space in the supplied buffer.
///
/// This function does not provide any guarantees about whether it
/// completes immediately or asynchronously
///
/// # Return
///
/// On a successful read, the number of read bytes is returned. If the
/// supplied buffer is not empty and the function returns `Ok(0)` then
/// the source as reached an "end-of-file" event.
///
/// # Errors
///
/// If this function encounters any form of I/O or other error, an error
/// variant will be returned. If an error is returned then it must be
/// guaranteed that no bytes were read.
///
/// # Examples
///
/// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]:
///
/// [`File`]: crate::fs::File
/// [`BytesMut`]: bytes::BytesMut
/// [`BufMut`]: bytes::BufMut
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::{self, AsyncReadExt};
///
/// use bytes::BytesMut;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut f = File::open("foo.txt").await?;
/// let mut buffer = BytesMut::with_capacity(10);
///
/// assert!(buffer.is_empty());
///
/// // read up to 10 bytes, note that the return value is not needed
/// // to access the data that was read as `buffer`'s internal
/// // cursor is updated.
/// f.read_buf(&mut buffer).await?;
///
/// println!("The bytes: {:?}", &buffer[..]);
/// Ok(())
/// }
/// ```
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where
Self: Sized + Unpin,
B: BufMut,
{
read_buf(self, buf)
}

/// Reads the exact number of bytes required to fill `buf`.
///
/// Equivalent to:
Expand Down
77 changes: 77 additions & 0 deletions tokio/src/io/util/async_write_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::io::util::flush::{flush, Flush};
use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
use crate::io::util::write_buf::{write_buf, WriteBuf};
use crate::io::util::write_int::{
WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le,
WriteI8,
Expand All @@ -12,6 +13,8 @@ use crate::io::util::write_int::{
};
use crate::io::AsyncWrite;

use bytes::Buf;

cfg_io_util! {
/// Defines numeric writer
macro_rules! write_impl {
Expand Down Expand Up @@ -116,6 +119,80 @@ cfg_io_util! {
write(self, src)
}


/// Writes a buffer into this writer, advancing the buffer's internal
/// cursor.
///
/// Equivalent to:
///
/// ```ignore
/// async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> io::Result<usize>;
/// ```
///
/// This function will attempt to write the entire contents of `buf`, but
/// the entire write may not succeed, or the write may also generate an
/// error. After the operation completes, the buffer's
/// internal cursor is advanced by the number of bytes written. A
/// subsequent call to `write_buf` using the **same** `buf` value will
/// resume from the point that the first call to `write_buf` completed.
/// A call to `write` represents *at most one* attempt to write to any
carllerche marked this conversation as resolved.
Show resolved Hide resolved
/// wrapped object.
///
/// # Return
///
/// If the return value is `Ok(n)` then it must be guaranteed that `n <=
/// buf.len()`. A return value of `0` typically means that the
/// underlying object is no longer able to accept bytes and will likely
/// not be able to in the future as well, or that the buffer provided is
/// empty.
///
/// # Errors
///
/// Each call to `write` may generate an I/O error indicating that the
/// operation could not be completed. If an error is returned then no bytes
/// in the buffer were written to this writer.
///
/// It is **not** considered an error if the entire buffer could not be
/// written to this writer.
///
/// # Examples
///
/// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]:
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
///
/// [`File`]: crate::fs::File
/// [`Buf`]: bytes::Buf
///
/// ```no_run
/// use tokio::io::{self, AsyncWriteExt};
/// use tokio::fs::File;
///
/// use bytes::Buf;
/// use std::io::Cursor;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// let mut buffer = Cursor::new(b"data to write");
///
/// // Loop until the entire contents of the buffer are written to
/// // the file.
/// while buffer.has_remaining() {
/// // Writes some prefix of the byte string, not necessarily
/// // all of it.
/// file.write_buf(&mut buffer).await?;
/// }
///
/// Ok(())
/// }
/// ```
fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B>
where
Self: Sized + Unpin,
B: Buf,
{
write_buf(self, src)
}

/// Attempts to write an entire buffer into this writer.
///
/// Equivalent to:
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ cfg_io_util! {
pub use mem::{duplex, DuplexStream};

mod read;
mod read_buf;
mod read_exact;
mod read_int;
mod read_line;
Expand Down Expand Up @@ -70,6 +71,7 @@ cfg_io_util! {

mod write;
mod write_all;
mod write_buf;
mod write_int;


Expand Down
72 changes: 72 additions & 0 deletions tokio/src/io/util/read_buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::io::AsyncRead;

use bytes::BufMut;
use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};

pub(crate) fn read_buf<'a, R, B>(reader: &'a mut R, buf: &'a mut B) -> ReadBuf<'a, R, B>
where
R: AsyncRead + Unpin,
B: BufMut,
{
ReadBuf {
reader,
buf,
_pin: PhantomPinned,
}
}

pin_project! {
/// Future returned by [`read_buf`](crate::io::AsyncReadExt::read_buf).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadBuf<'a, R, B> {
reader: &'a mut R,
buf: &'a mut B,
#[pin]
_pin: PhantomPinned,
carllerche marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl<R, B> Future for ReadBuf<'_, R, B>
where
R: AsyncRead + Unpin,
B: BufMut,
{
type Output = io::Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
use crate::io::ReadBuf;
use std::mem::MaybeUninit;

let me = self.project();

if !me.buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let n = {
let dst = me.buf.bytes_mut();
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
ready!(Pin::new(me.reader).poll_read(cx, &mut buf)?);

// Ensure the pointer does not change from under us
assert_eq!(ptr, buf.filled().as_ptr());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This the right way to ensure safety?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We used std::ptr::eq elsewhere, but it should be the same.

buf.filled().len()
};

// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
me.buf.advance_mut(n);
}

Poll::Ready(Ok(n))
}
}
3 changes: 2 additions & 1 deletion tokio/src/io/util/read_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ fn reserve(buf: &mut Vec<u8>, bytes: usize) {

/// Returns the unused capacity of the provided vector.
fn get_unused_capacity(buf: &mut Vec<u8>) -> &mut [MaybeUninit<u8>] {
bytes::BufMut::bytes_mut(buf)
let uninit = bytes::BufMut::bytes_mut(buf);
unsafe { &mut *(uninit as *mut _ as *mut [MaybeUninit<u8>]) }
}

impl<A> Future for ReadToEnd<'_, A>
Expand Down
55 changes: 55 additions & 0 deletions tokio/src/io/util/write_buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::io::AsyncWrite;

use bytes::Buf;
use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
/// A future to write some of the buffer to an `AsyncWrite`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteBuf<'a, W, B> {
writer: &'a mut W,
buf: &'a mut B,
#[pin]
_pin: PhantomPinned,
carllerche marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Tries to write some bytes from the given `buf` to the writer in an
/// asynchronous manner, returning a future.
pub(crate) fn write_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteBuf<'a, W, B>
where
W: AsyncWrite + Unpin,
B: Buf,
{
WriteBuf {
writer,
buf,
_pin: PhantomPinned,
}
}

impl<W, B> Future for WriteBuf<'_, W, B>
where
W: AsyncWrite + Unpin,
B: Buf,
{
type Output = io::Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let me = self.project();

if !me.buf.has_remaining() {
return Poll::Ready(Ok(0));
}

let n = ready!(Pin::new(me.writer).poll_write(cx, me.buf.bytes()))?;
me.buf.advance(n);
Poll::Ready(Ok(n))
}
}
21 changes: 21 additions & 0 deletions tokio/tests/io_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,24 @@ async fn read() {
assert_eq!(n, 11);
assert_eq!(buf[..], b"hello world"[..]);
}

struct BadAsyncRead;

impl AsyncRead for BadAsyncRead {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
*buf = ReadBuf::new(Box::leak(vec![0; buf.capacity()].into_boxed_slice()));
buf.advance(buf.capacity());
Poll::Ready(Ok(()))
}
}

#[tokio::test]
#[should_panic]
async fn read_buf_bad_async_read() {
let mut buf = Vec::with_capacity(10);
BadAsyncRead.read_buf(&mut buf).await.unwrap();
}
Loading