Skip to content

Commit

Permalink
net: Update TcpStream::poll_peek to use ReadBuf
Browse files Browse the repository at this point in the history
Closes #2987
  • Loading branch information
LucioFranco committed Dec 11, 2020
1 parent 69e62ef commit 89481f1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 10 deletions.
12 changes: 9 additions & 3 deletions tokio/src/net/tcp/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ReadHalf<'_> {
/// # Examples
///
/// ```no_run
/// use tokio::io;
/// use tokio::io::{self, ReadBuf};
/// use tokio::net::TcpStream;
///
/// use futures::future::poll_fn;
Expand All @@ -70,6 +70,7 @@ impl ReadHalf<'_> {
/// let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let (mut read_half, _) = stream.split();
/// let mut buf = [0; 10];
/// let mut buf = ReadBuf::new(&mut buf);
///
/// poll_fn(|cx| {
/// read_half.poll_peek(cx, &mut buf)
Expand All @@ -80,7 +81,11 @@ impl ReadHalf<'_> {
/// ```
///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
pub fn poll_peek(
&mut self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<usize>> {
self.0.poll_peek(cx, buf)
}

Expand Down Expand Up @@ -124,7 +129,8 @@ impl ReadHalf<'_> {
/// [`read`]: fn@crate::io::AsyncReadExt::read
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_peek(cx, buf)).await
let mut buf = ReadBuf::new(buf);
poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
}
}

Expand Down
12 changes: 9 additions & 3 deletions tokio/src/net/tcp/split_owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl OwnedReadHalf {
/// # Examples
///
/// ```no_run
/// use tokio::io;
/// use tokio::io::{self, ReadBuf};
/// use tokio::net::TcpStream;
///
/// use futures::future::poll_fn;
Expand All @@ -125,6 +125,7 @@ impl OwnedReadHalf {
/// let stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let (mut read_half, _) = stream.into_split();
/// let mut buf = [0; 10];
/// let mut buf = ReadBuf::new(&mut buf);
///
/// poll_fn(|cx| {
/// read_half.poll_peek(cx, &mut buf)
Expand All @@ -135,7 +136,11 @@ impl OwnedReadHalf {
/// ```
///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
pub fn poll_peek(
&mut self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<usize>> {
self.inner.poll_peek(cx, buf)
}

Expand Down Expand Up @@ -179,7 +184,8 @@ impl OwnedReadHalf {
/// [`read`]: fn@crate::io::AsyncReadExt::read
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_peek(cx, buf)).await
let mut buf = ReadBuf::new(buf);
poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
}
}

Expand Down
21 changes: 17 additions & 4 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl TcpStream {
/// # Examples
///
/// ```no_run
/// use tokio::io;
/// use tokio::io::{self, ReadBuf};
/// use tokio::net::TcpStream;
///
/// use futures::future::poll_fn;
Expand All @@ -300,6 +300,7 @@ impl TcpStream {
/// async fn main() -> io::Result<()> {
/// let stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let mut buf = [0; 10];
/// let mut buf = ReadBuf::new(&mut buf);
///
/// poll_fn(|cx| {
/// stream.poll_peek(cx, &mut buf)
Expand All @@ -308,12 +309,24 @@ impl TcpStream {
/// Ok(())
/// }
/// ```
pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
pub fn poll_peek(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<usize>> {
loop {
let ev = ready!(self.io.registration().poll_read_ready(cx))?;

match self.io.peek(buf) {
Ok(ret) => return Poll::Ready(Ok(ret)),
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};

match self.io.peek(b) {
Ok(ret) => {
unsafe { buf.assume_init(ret) };
buf.advance(ret);
return Poll::Ready(Ok(ret));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.registration().clear_readiness(ev);
}
Expand Down

0 comments on commit 89481f1

Please sign in to comment.