Skip to content

Commit

Permalink
Bring back streams temporarily
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcrichton committed May 3, 2016
1 parent afd9ece commit bdf163f
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 83 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub use select::Select;
pub use then::Then;

// streams
// pub mod stream;
pub mod stream;

// TODO: Send + 'static is annoying, but required by cancel and_then, document
// TODO: not object safe
Expand Down
2 changes: 1 addition & 1 deletion src/promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<T: Send + 'static, E: Send + 'static> Future for Promise<T, E> {
return f(Err(util::reused()))
}
};
let token = inner.slot.on_full(move |slot| {
let token = inner.slot.on_full(|slot| {
match slot.try_consume() {
Ok(Some(Ok(e))) => f(Ok(e)),
Ok(Some(Err(e))) => f(Err(PollError::Other(e))),
Expand Down
5 changes: 3 additions & 2 deletions src/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ impl<T: 'static> Slot<T> {
state.0 = old;
}

// TODO: communicate that this is being called as part of a
// cancellation?
if state.flag(ON_FULL) {
let cb = self.on_full.borrow().expect("on_full interference3")
.take().expect("on_full not full??");
Expand All @@ -244,7 +246,7 @@ impl<T> TryProduceError<T> {
}
}

pub trait FnBox<T: 'static>: Send + 'static {
trait FnBox<T: 'static>: Send + 'static {
fn call_box(self: Box<Self>, other: &Slot<T>);
}

Expand Down Expand Up @@ -425,7 +427,6 @@ mod tests {

// cancel on_full
assert_eq!(hits.load(Ordering::SeqCst), 0);
let hits2 = hits.clone();
let token = slot.on_full(add());
assert_eq!(hits.load(Ordering::SeqCst), 0);
slot.cancel(token);
Expand Down
148 changes: 108 additions & 40 deletions src/stream/channel.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use Future;
use slot::Slot;
use stream::{Stream, PollError};
use {Future, PollResult, PollError, Callback};
use slot::{Slot, Token};
use stream::{Stream, StreamResult};
use util;

pub fn channel<T, E>() -> (Sender<T, E>, Receiver<T, E>)
where T: Send + 'static,
Expand All @@ -27,8 +29,17 @@ pub struct FutureSender<T, E>
where T: Send + 'static,
E: Send + 'static,
{
tx: Sender<T, E>,
msg: Result<T, E>,
state: _FutureSender<T, E>,
}

enum _FutureSender<T, E>
where T: Send + 'static,
E: Send + 'static,
{
Start(Sender<T, E>, Result<T, E>),
Canceled,
Used,
Scheduled(Arc<Inner<T, E>>, Token),
}

pub struct Receiver<T, E>
Expand All @@ -39,10 +50,16 @@ pub struct Receiver<T, E>
}

struct Inner<T, E> {
slot: Slot<Option<Result<T, E>>>,
slot: Slot<Message<Result<T, E>>>,
receiver_gone: AtomicBool,
}

enum Message<T> {
Data(T),
Done,
// Canceled,
}

pub struct SendError<T, E>(Result<T, E>);

impl<T, E> Stream for Receiver<T, E>
Expand All @@ -52,23 +69,36 @@ impl<T, E> Stream for Receiver<T, E>
type Item = T;
type Error = E;

fn poll(&mut self) -> Result<Self::Item, PollError<Self::Error>> {
fn poll(&mut self) -> Option<StreamResult<Self::Item, Self::Error>> {
match self.inner.slot.try_consume() {
Ok(Some(Ok(item))) => Ok(item),
Ok(Some(Err(item))) => Err(PollError::Other(item)),
Ok(None) => Err(PollError::Empty),
Err(..) => Err(PollError::NotReady),
Ok(Message::Data(Ok(e))) => Some(Ok(Some(e))),
Ok(Message::Data(Err(e))) => Some(Err(PollError::Other(e))),
Ok(Message::Done) => Some(Ok(None)),
// Ok(Message::Canceled) => Some(Err(PollError::Canceled)),
Err(..) => None,
}
}

fn schedule<G>(self, g: G)
where G: FnOnce(Option<Result<Self::Item, Self::Error>>, Self) +
Send + 'static
fn cancel(&mut self) {
}

fn schedule<G>(&mut self, g: G)
where G: FnOnce(StreamResult<Self::Item, Self::Error>) + Send + 'static
{
self.inner.clone().slot.on_full(move |slot| {
let val = slot.try_consume().ok().unwrap();
g(val, self);
})
self.inner.slot.on_full(|slot| {
g(match slot.try_consume() {
Ok(Message::Data(Ok(e))) => Ok(Some(e)),
Ok(Message::Data(Err(e))) => Err(PollError::Other(e)),
Ok(Message::Done) => Ok(None),
// Ok(Message::Canceled) => Err(PollError::Canceled),
Err(..) => Err(PollError::Canceled),
})
});
}

fn schedule_boxed(&mut self,
g: Box<Callback<Option<Self::Item>, Self::Error>>) {
self.schedule(|f| g.call(f))
}
}

Expand All @@ -90,8 +120,7 @@ impl<T, E> Sender<T, E>
{
pub fn send(self, t: Result<T, E>) -> FutureSender<T, E> {
FutureSender {
tx: self,
msg: t,
state: _FutureSender::Start(self, t),
}
}
}
Expand All @@ -102,7 +131,7 @@ impl<T, E> Drop for Sender<T, E>
{
fn drop(&mut self) {
self.inner.slot.on_empty(|slot| {
slot.try_produce(None).ok().unwrap();
slot.try_produce(Message::Done).ok().unwrap();
});
}
}
Expand All @@ -114,33 +143,72 @@ impl<T, E> Future for FutureSender<T, E>
type Item = Sender<T, E>;
type Error = SendError<T, E>;

fn poll(self) -> Result<Result<Self::Item, Self::Error>, Self> {
let FutureSender { tx, msg } = self;
if tx.inner.receiver_gone.load(Ordering::SeqCst) {
return Ok(Err(SendError(msg)))
fn poll(&mut self) -> Option<PollResult<Self::Item, Self::Error>> {
match mem::replace(&mut self.state, _FutureSender::Used) {
_FutureSender::Start(tx, msg) => {
if tx.inner.receiver_gone.load(Ordering::SeqCst) {
return Some(Err(PollError::Other(SendError(msg))))
}
match tx.inner.slot.try_produce(Message::Data(msg)) {
Ok(()) => Some(Ok(tx)),
Err(e) => {
let msg = match e.into_inner() {
Message::Data(d) => d,
_ => panic!(),
};
self.state = _FutureSender::Start(tx, msg);
None
}
}
}
_FutureSender::Canceled => Some(Err(PollError::Canceled)),
_FutureSender::Used => Some(Err(util::reused())),
_FutureSender::Scheduled(s, token) => {
self.state = _FutureSender::Scheduled(s, token);
Some(Err(util::reused()))
}
}
match tx.inner.slot.try_produce(Some(msg)) {
Ok(()) => return Ok(Ok(tx)),
Err(e) => {
Err(FutureSender {
tx: tx,
msg: e.into_inner().unwrap(),
})
}

fn cancel(&mut self) {
match mem::replace(&mut self.state, _FutureSender::Canceled) {
_FutureSender::Start(..) => {}
_FutureSender::Canceled => {}
_FutureSender::Scheduled(s, token) => {
s.slot.cancel(token);
}
_FutureSender::Used => self.state = _FutureSender::Used,
}
}

fn schedule<F>(self, f: F)
where F: FnOnce(Result<Self::Item, Self::Error>) + Send + 'static,
fn schedule<F>(&mut self, f: F)
where F: FnOnce(PollResult<Self::Item, Self::Error>) + Send + 'static,
{
let FutureSender { tx, msg } = self;
tx.inner.clone().slot.on_empty(move |slot| {
let (tx, msg) = match mem::replace(&mut self.state, _FutureSender::Used) {
_FutureSender::Start(tx, msg) => (tx, msg),
_FutureSender::Canceled => return f(Err(PollError::Canceled)),
_FutureSender::Used => return f(Err(util::reused())),
_FutureSender::Scheduled(s, token) => {
self.state = _FutureSender::Scheduled(s, token);
return f(Err(util::reused()))
}
};
let arc = tx.inner.clone();
let token = arc.slot.on_empty(|slot| {
if tx.inner.receiver_gone.load(Ordering::SeqCst) {
f(Err(SendError(msg)))
} else {
slot.try_produce(Some(msg)).ok().unwrap();
f(Ok(tx))
return f(Err(PollError::Other(SendError(msg))))
}
match slot.try_produce(Message::Data(msg)) {
Ok(()) => f(Ok(tx)),

// we were canceled so finish immediately
Err(..) => f(Err(PollError::Canceled)),
}
});
self.state = _FutureSender::Scheduled(arc, token);
}

fn schedule_boxed(&mut self, f: Box<Callback<Self::Item, Self::Error>>) {
self.schedule(|r| f.call(r))
}
}
45 changes: 6 additions & 39 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use {Future, IntoFuture, FutureResult, Callback};
use {Future, PollResult, Callback};

pub type StreamResult<T, E> = FutureResult<Option<T>, E>;
mod channel;
pub use self::channel::{channel, Sender, Receiver};

pub type StreamResult<T, E> = PollResult<Option<T>, E>;

// TODO: Send + 'static is unfortunate
pub trait Stream/*: Send + 'static*/ {
Expand All @@ -12,8 +15,7 @@ pub trait Stream/*: Send + 'static*/ {
fn cancel(&mut self);

fn schedule<G>(&mut self, g: G)
where G: FnOnce(StreamResult<Self::Item, Self::Error>, Self) +
Send + 'static,
where G: FnOnce(StreamResult<Self::Item, Self::Error>) + Send + 'static,
Self: Sized;

fn schedule_boxed(&mut self,
Expand Down Expand Up @@ -146,41 +148,6 @@ impl<S: ?Sized + Stream> Stream for Box<S> {
}
}

// impl<'a, F: ?Sized + Future> Future for &'a mut F {
// type Item = F::Item;
// type Error = F::Error;
//
// fn poll(&mut self) -> Option<FutureResult<Self::Item, Self::Error>> {
// (**self).poll()
// }
//
// // fn cancel(&mut self) {
// // (**self).cancel()
// // }
//
// fn schedule<G>(&mut self, g: G)
// where G: FnOnce(FutureResult<Self::Item, Self::Error>) + Send + 'static,
// {
// (**self).schedule_boxed(Box::new(g))
// }
//
// fn schedule_boxed(&mut self, f: Box<Callback<Self::Item, Self::Error>>) {
// (**self).schedule_boxed(f)
// }
// }

// impl<E> PollError<E> {
// fn map<U, F>(self, f: F) -> PollError<U>
// where F: FnOnce(E) -> U
// {
// match self {
// PollError::Empty => PollError::Empty,
// PollError::NotReady => PollError::NotReady,
// PollError::Other(e) => PollError::Other(f(e)),
// }
// }
// }

// pub struct FutureStream<F> {
// inner: Option<F>,
// }
Expand Down

0 comments on commit bdf163f

Please sign in to comment.