Skip to content

Commit

Permalink
Remove pin-related unsafe code
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored and seanmonstar committed Nov 13, 2020
1 parent ff507e1 commit 2dec3b7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 25 deletions.
20 changes: 11 additions & 9 deletions src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use futures_core::Stream;
use http_body::Body as HttpBody;
use pin_project_lite::pin_project;
use tokio::time::Delay;

/// An asynchronous request body.
Expand All @@ -30,7 +31,12 @@ enum Inner {
},
}

struct WrapStream<S>(S);
pin_project! {
struct WrapStream<S> {
#[pin]
inner: S,
}
}

struct WrapHyper(hyper::Body);

Expand Down Expand Up @@ -86,9 +92,9 @@ impl Body {
{
use futures_util::TryStreamExt;

let body = Box::pin(WrapStream(
stream.map_ok(Bytes::from).map_err(Into::into),
));
let body = Box::pin(WrapStream {
inner: stream.map_ok(Bytes::from).map_err(Into::into),
});
Body {
inner: Inner::Streaming {
body,
Expand Down Expand Up @@ -279,11 +285,7 @@ where
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// safe pin projection
let item =
futures_core::ready!(
unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().0) }.poll_next(cx)?
);
let item = futures_core::ready!(self.project().inner.poll_next(cx)?);

Poll::Ready(item.map(|val| Ok(val.into())))
}
Expand Down
40 changes: 24 additions & 16 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::Delay;
use pin_project_lite::pin_project;

use log::debug;

Expand Down Expand Up @@ -1265,44 +1266,51 @@ impl ClientRef {
}
}

pub(super) struct Pending {
inner: PendingInner,
pin_project! {
pub(super) struct Pending {
#[pin]
inner: PendingInner,
}
}

enum PendingInner {
Request(PendingRequest),
Error(Option<crate::Error>),
}

struct PendingRequest {
method: Method,
url: Url,
headers: HeaderMap,
body: Option<Option<Bytes>>,
pin_project! {
struct PendingRequest {
method: Method,
url: Url,
headers: HeaderMap,
body: Option<Option<Bytes>>,

urls: Vec<Url>,
urls: Vec<Url>,

client: Arc<ClientRef>,
client: Arc<ClientRef>,

in_flight: ResponseFuture,
timeout: Option<Delay>,
#[pin]
in_flight: ResponseFuture,
#[pin]
timeout: Option<Delay>,
}
}

impl PendingRequest {
fn in_flight(self: Pin<&mut Self>) -> Pin<&mut ResponseFuture> {
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.in_flight) }
self.project().in_flight
}

fn timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Delay>> {
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.timeout) }
self.project().timeout
}

fn urls(self: Pin<&mut Self>) -> &mut Vec<Url> {
unsafe { &mut Pin::get_unchecked_mut(self).urls }
self.project().urls
}

fn headers(self: Pin<&mut Self>) -> &mut HeaderMap {
unsafe { &mut Pin::get_unchecked_mut(self).headers }
self.project().headers
}
}

Expand All @@ -1314,7 +1322,7 @@ impl Pending {
}

fn inner(self: Pin<&mut Self>) -> Pin<&mut PendingInner> {
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.inner) }
self.project().inner
}
}

Expand Down

0 comments on commit 2dec3b7

Please sign in to comment.