Skip to content

Commit

Permalink
Merge pull request #19 from aepsil0n/looping
Browse files Browse the repository at this point in the history
Allow cyclic event graphs
  • Loading branch information
milibopp committed Jan 21, 2015
2 parents 57dd67c + d610183 commit 5072f91
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 122 deletions.
109 changes: 78 additions & 31 deletions src/lib.rs
Expand Up @@ -124,13 +124,15 @@

#[cfg(test)]
extern crate test;
#[macro_use]
extern crate log;

use std::sync::{Arc, RwLock, mpsc};
use std::sync::{Arc, Mutex, mpsc};
use std::thread::Thread;
use subject::{
Subject, Source, Mapper, WrapArc, Snapper, Merger, Filter, Holder, Lift2,
WeakSnapperWrapper, SamplingSubject, CellSwitcher, WeakLift2Wrapper,
ChannelBuffer,
ChannelBuffer, LoopCell, LoopCellEntry,
};
use transaction::commit;

Expand Down Expand Up @@ -190,31 +192,31 @@ mod subject;
/// the iterator. However, any event sent into the sink after a call to it, may
/// come at any point between the iterator events.
pub struct Sink<A> {
source: Arc<RwLock<Source<A>>>,
source: Arc<Mutex<Source<A>>>,
}

impl<A> Clone for Sink<A> {
fn clone(&self) -> Sink<A> {
Sink { source: self.source.clone() }
}
}
}

impl<A: Send + Sync + Clone> Sink<A> {
/// Create a new sink.
pub fn new() -> Sink<A> {
Sink { source: Arc::new(RwLock::new(Source::new())) }
Sink { source: Arc::new(Mutex::new(Source::new())) }
}

/// Send a value into the sink.
///
/// When a value is sent into the sink, an event is fired in all dependent
/// streams.
pub fn send(&self, a: A) {
commit((), move |_|
self.source.write()
commit((), move |_| {
self.source.lock()
.ok().expect("Sink::send")
.send(a)
)
})
}

/// Asynchronous send.
Expand Down Expand Up @@ -256,7 +258,7 @@ impl<A: Send + Sync + Clone> Sink<A> {
///
/// This occasionally fires an event of type `A` down its dependency graph.
pub struct Stream<A> {
source: Arc<RwLock<Box<Subject<A> + 'static>>>,
source: Arc<Mutex<Box<Subject<A> + 'static>>>,
}

impl<A> Clone for Stream<A> {
Expand All @@ -282,9 +284,9 @@ impl<A: Send + Sync + Clone> Stream<A> {
where B: Send + Sync + Clone,
F: Fn(A) -> B + Send + Sync,
{
let source = Arc::new(RwLock::new(Mapper::new(f, self.source.clone())));
let source = Arc::new(Mutex::new(Mapper::new(f, self.source.clone())));
commit((), |_| {
self.source.write().ok().expect("Stream::map")
self.source.lock().ok().expect("Stream::map")
.listen(source.wrap_as_listener());
Stream { source: source.wrap_as_subject() }
})
Expand Down Expand Up @@ -327,14 +329,14 @@ impl<A: Send + Sync + Clone> Stream<A> {
/// assert_eq!(iter.next(), Some(4));
/// ```
pub fn merge(&self, other: &Stream<A>) -> Stream<A> {
let source = Arc::new(RwLock::new(Merger::new([
let source = Arc::new(Mutex::new(Merger::new([
self.source.clone(),
other.source.clone(),
])));
commit((), |_| {
self.source.write().ok().expect("Stream::merge (self)")
self.source.lock().ok().expect("Stream::merge (self)")
.listen(source.wrap_as_listener());
other.source.write().ok().expect("Stream::merge (other)")
other.source.lock().ok().expect("Stream::merge (other)")
.listen(source.wrap_as_listener());
Stream { source: source.wrap_as_subject() }
})
Expand All @@ -354,18 +356,29 @@ impl<A: Send + Sync + Clone> Stream<A> {
/// assert_eq!(cell.sample(), 2);
/// ```
pub fn hold(&self, initial: A) -> Cell<A> {
let source = Arc::new(RwLock::new(
let source = Arc::new(Mutex::new(
Holder::new(initial, self.source.clone())
));
commit((), |_| {
self.source.write().ok().expect("Stream::hold")
self.source.lock().ok().expect("Stream::hold")
.listen(source.wrap_as_listener());
Cell { source: source.wrap_as_sampling_subject() }
})
}

/// A blocking iterator over the stream.
pub fn iter(&self) -> StreamIter<A> { StreamIter::new(self) }

/// Accumulate events in a cell.
///
/// Starting at some initial value, each new event changes the internal
/// state of the resulting cell as prescribed by the supplied accumulator.
pub fn accumulate<B, F>(&self, initial: B, acc: F) -> Cell<B>
where B: Send + Sync + Clone,
F: Fn((B, A)) -> B + Send + Sync,
{
Cell::cyclic(initial, move |state| state.snapshot(self).map(acc))
}
}

impl<A: Send + Sync + Clone> Stream<Option<A>> {
Expand All @@ -385,9 +398,9 @@ impl<A: Send + Sync + Clone> Stream<Option<A>> {
/// assert_eq!(iter.next(), Some(5));
/// ```
pub fn filter(&self) -> Stream<A> {
let source = Arc::new(RwLock::new(Filter::new(self.source.clone())));
let source = Arc::new(Mutex::new(Filter::new(self.source.clone())));
commit((), |_| {
self.source.write().ok().expect("Stream::filter")
self.source.lock().ok().expect("Stream::filter")
.listen(source.wrap_as_listener());
Stream { source: source.wrap_as_subject() }
})
Expand All @@ -397,7 +410,7 @@ impl<A: Send + Sync + Clone> Stream<Option<A>> {

/// A container of a value that changes over time.
pub struct Cell<A> {
source: Arc<RwLock<Box<SamplingSubject<A> + 'static>>>
source: Arc<Mutex<Box<SamplingSubject<A> + 'static>>>
}

impl<A> Clone for Cell<A> {
Expand All @@ -416,7 +429,7 @@ impl<A: Send + Sync + Clone> Cell<A> {

/// Sample without committing a transaction.
fn sample_nocommit(&self) -> A {
self.source.write().ok().expect("Cell::sample").sample()
self.source.lock().ok().expect("Cell::sample").sample()
}

/// Combine the cell with a stream in a snapshot.
Expand All @@ -441,16 +454,50 @@ impl<A: Send + Sync + Clone> Cell<A> {
/// ```
pub fn snapshot<B: Send + Sync + Clone>(&self, event: &Stream<B>) -> Stream<(A, B)> {
commit((), |_| {
let source = Arc::new(RwLock::new(Snapper::new(
let source = Arc::new(Mutex::new(Snapper::new(
self.sample_nocommit(), (self.source.clone(), event.source.clone())
)));
self.source.write().ok().expect("Cell::snapshot (self)")
self.source.lock().ok().expect("Cell::snapshot (self)")
.listen(WeakSnapperWrapper::boxed(&source));
event.source.write().ok().expect("Cell::snapshot (event)")
event.source.lock().ok().expect("Cell::snapshot (event)")
.listen(source.wrap_as_listener());
Stream { source: source.wrap_into_subject() }
})
}

/// Create a cell with a cyclic reference.
///
/// Given a function `cycle` that maps a `cell` to a stream `cycle(cell)`
/// and some `initial` value, this constructor essentially creates a cyclic
/// pattern, where the resulting cell behaves as if it were defined as
/// `cell = cycle(cell).hold(initial)`.
///
/// This pattern is useful to implement accumulators, counters and other
/// loops that depend on the state of a cell before a transaction.
#[experimental="may change once internals are cleaned up"]
pub fn cyclic<F: FnOnce(&Cell<A>) -> Stream<A>>(initial: A, cycle: F) -> Cell<A> {
let dummy_source = Arc::new(Mutex::new(LoopCellEntry::new(initial.clone())));
let result = cycle(&Cell {
source: dummy_source.wrap_as_sampling_subject()
})
.hold(initial);
commit((), move |_| {
result.source.lock().ok()
.expect("Cell::cyclic (result listen #1)")
.listen(dummy_source.wrap_as_listener());
let source = Arc::new(Mutex::new(LoopCell::new(
result.sample_nocommit(),
(
dummy_source.wrap_as_sampling_subject(),
result.source.clone(),
)
)));
result.source.lock().ok()
.expect("Cell::cyclic (result listen #2)")
.listen(source.wrap_as_listener());
Cell { source: source.wrap_into_sampling_subject() }
})
}
}

impl<A: Send + Sync + Clone> Cell<Cell<A>> {
Expand Down Expand Up @@ -525,8 +572,8 @@ impl<A: Send + Sync + Clone> Cell<Cell<A>> {
pub fn switch(&self) -> Cell<A> {
commit((), |_| {
// Create the cell switcher
let mut self_source = self.source.write().ok().expect("Cell::switch");
let source = Arc::new(RwLock::new(
let mut self_source = self.source.lock().ok().expect("Cell::switch");
let source = Arc::new(Mutex::new(
CellSwitcher::new(
self_source.sample(),
self.source.clone(),
Expand Down Expand Up @@ -575,12 +622,12 @@ pub fn lift2<A, B, C, F>(f: F, ba: &Cell<A>, bb: &Cell<B>) -> Cell<C>
F: Fn(A, B) -> C + Send + Sync,
{
commit((f, ba, bb), |(f, ba, bb)| {
let source = Arc::new(RwLock::new(Lift2::new(
let source = Arc::new(Mutex::new(Lift2::new(
(ba.sample_nocommit(), bb.sample_nocommit()), f, (ba.source.clone(), bb.source.clone())
)));
ba.source.write().ok().expect("lift2 (ba)")
ba.source.lock().ok().expect("lift2 (ba)")
.listen(source.wrap_as_listener());
bb.source.write().ok().expect("lift2 (bb)")
bb.source.lock().ok().expect("lift2 (bb)")
.listen(WeakLift2Wrapper::boxed(&source));
Cell { source: source.wrap_into_sampling_subject() }
})
Expand All @@ -591,16 +638,16 @@ pub fn lift2<A, B, C, F>(f: F, ba: &Cell<A>, bb: &Cell<B>) -> Cell<C>
pub struct StreamIter<A> {
receiver: mpsc::Receiver<A>,
#[allow(dead_code)]
buffer: Arc<RwLock<ChannelBuffer<A>>>,
buffer: Arc<Mutex<ChannelBuffer<A>>>,
}

impl<A: Send + Sync + Clone> StreamIter<A> {
fn new(event: &Stream<A>) -> StreamIter<A> {
let (tx, rx) = mpsc::channel();
let chanbuf = Arc::new(RwLock::new(ChannelBuffer::new(
let chanbuf = Arc::new(Mutex::new(ChannelBuffer::new(
tx, event.source.clone()
)));
event.source.write().ok().expect("StreamIter::new")
event.source.lock().ok().expect("StreamIter::new")
.listen(chanbuf.wrap_as_listener());
StreamIter { receiver: rx, buffer: chanbuf }
}
Expand Down

0 comments on commit 5072f91

Please sign in to comment.