Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow cyclic event graphs #19

Merged
merged 1 commit into from Jan 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
109 changes: 78 additions & 31 deletions src/lib.rs
Expand Up @@ -124,13 +124,15 @@

#[cfg(test)]
extern crate test;
#[macro_use]
extern crate log;

use std::sync::{Arc, RwLock, mpsc};
use std::sync::{Arc, Mutex, mpsc};
use std::thread::Thread;
use subject::{
Subject, Source, Mapper, WrapArc, Snapper, Merger, Filter, Holder, Lift2,
WeakSnapperWrapper, SamplingSubject, CellSwitcher, WeakLift2Wrapper,
ChannelBuffer,
ChannelBuffer, LoopCell, LoopCellEntry,
};
use transaction::commit;

Expand Down Expand Up @@ -190,31 +192,31 @@ mod subject;
/// the iterator. However, any event sent into the sink after a call to it, may
/// come at any point between the iterator events.
pub struct Sink<A> {
source: Arc<RwLock<Source<A>>>,
source: Arc<Mutex<Source<A>>>,
}

impl<A> Clone for Sink<A> {
fn clone(&self) -> Sink<A> {
Sink { source: self.source.clone() }
}
}
}

impl<A: Send + Sync + Clone> Sink<A> {
/// Create a new sink.
pub fn new() -> Sink<A> {
Sink { source: Arc::new(RwLock::new(Source::new())) }
Sink { source: Arc::new(Mutex::new(Source::new())) }
}

/// Send a value into the sink.
///
/// When a value is sent into the sink, an event is fired in all dependent
/// streams.
pub fn send(&self, a: A) {
commit((), move |_|
self.source.write()
commit((), move |_| {
self.source.lock()
.ok().expect("Sink::send")
.send(a)
)
})
}

/// Asynchronous send.
Expand Down Expand Up @@ -256,7 +258,7 @@ impl<A: Send + Sync + Clone> Sink<A> {
///
/// This occasionally fires an event of type `A` down its dependency graph.
pub struct Stream<A> {
source: Arc<RwLock<Box<Subject<A> + 'static>>>,
source: Arc<Mutex<Box<Subject<A> + 'static>>>,
}

impl<A> Clone for Stream<A> {
Expand All @@ -282,9 +284,9 @@ impl<A: Send + Sync + Clone> Stream<A> {
where B: Send + Sync + Clone,
F: Fn(A) -> B + Send + Sync,
{
let source = Arc::new(RwLock::new(Mapper::new(f, self.source.clone())));
let source = Arc::new(Mutex::new(Mapper::new(f, self.source.clone())));
commit((), |_| {
self.source.write().ok().expect("Stream::map")
self.source.lock().ok().expect("Stream::map")
.listen(source.wrap_as_listener());
Stream { source: source.wrap_as_subject() }
})
Expand Down Expand Up @@ -327,14 +329,14 @@ impl<A: Send + Sync + Clone> Stream<A> {
/// assert_eq!(iter.next(), Some(4));
/// ```
pub fn merge(&self, other: &Stream<A>) -> Stream<A> {
let source = Arc::new(RwLock::new(Merger::new([
let source = Arc::new(Mutex::new(Merger::new([
self.source.clone(),
other.source.clone(),
])));
commit((), |_| {
self.source.write().ok().expect("Stream::merge (self)")
self.source.lock().ok().expect("Stream::merge (self)")
.listen(source.wrap_as_listener());
other.source.write().ok().expect("Stream::merge (other)")
other.source.lock().ok().expect("Stream::merge (other)")
.listen(source.wrap_as_listener());
Stream { source: source.wrap_as_subject() }
})
Expand All @@ -354,18 +356,29 @@ impl<A: Send + Sync + Clone> Stream<A> {
/// assert_eq!(cell.sample(), 2);
/// ```
pub fn hold(&self, initial: A) -> Cell<A> {
let source = Arc::new(RwLock::new(
let source = Arc::new(Mutex::new(
Holder::new(initial, self.source.clone())
));
commit((), |_| {
self.source.write().ok().expect("Stream::hold")
self.source.lock().ok().expect("Stream::hold")
.listen(source.wrap_as_listener());
Cell { source: source.wrap_as_sampling_subject() }
})
}

/// A blocking iterator over the stream.
pub fn iter(&self) -> StreamIter<A> { StreamIter::new(self) }

/// Accumulate events in a cell.
///
/// Starting at some initial value, each new event changes the internal
/// state of the resulting cell as prescribed by the supplied accumulator.
pub fn accumulate<B, F>(&self, initial: B, acc: F) -> Cell<B>
where B: Send + Sync + Clone,
F: Fn((B, A)) -> B + Send + Sync,
{
Cell::cyclic(initial, move |state| state.snapshot(self).map(acc))
}
}

impl<A: Send + Sync + Clone> Stream<Option<A>> {
Expand All @@ -385,9 +398,9 @@ impl<A: Send + Sync + Clone> Stream<Option<A>> {
/// assert_eq!(iter.next(), Some(5));
/// ```
pub fn filter(&self) -> Stream<A> {
let source = Arc::new(RwLock::new(Filter::new(self.source.clone())));
let source = Arc::new(Mutex::new(Filter::new(self.source.clone())));
commit((), |_| {
self.source.write().ok().expect("Stream::filter")
self.source.lock().ok().expect("Stream::filter")
.listen(source.wrap_as_listener());
Stream { source: source.wrap_as_subject() }
})
Expand All @@ -397,7 +410,7 @@ impl<A: Send + Sync + Clone> Stream<Option<A>> {

/// A container of a value that changes over time.
pub struct Cell<A> {
source: Arc<RwLock<Box<SamplingSubject<A> + 'static>>>
source: Arc<Mutex<Box<SamplingSubject<A> + 'static>>>
}

impl<A> Clone for Cell<A> {
Expand All @@ -416,7 +429,7 @@ impl<A: Send + Sync + Clone> Cell<A> {

/// Sample without committing a transaction.
fn sample_nocommit(&self) -> A {
self.source.write().ok().expect("Cell::sample").sample()
self.source.lock().ok().expect("Cell::sample").sample()
}

/// Combine the cell with a stream in a snapshot.
Expand All @@ -441,16 +454,50 @@ impl<A: Send + Sync + Clone> Cell<A> {
/// ```
pub fn snapshot<B: Send + Sync + Clone>(&self, event: &Stream<B>) -> Stream<(A, B)> {
commit((), |_| {
let source = Arc::new(RwLock::new(Snapper::new(
let source = Arc::new(Mutex::new(Snapper::new(
self.sample_nocommit(), (self.source.clone(), event.source.clone())
)));
self.source.write().ok().expect("Cell::snapshot (self)")
self.source.lock().ok().expect("Cell::snapshot (self)")
.listen(WeakSnapperWrapper::boxed(&source));
event.source.write().ok().expect("Cell::snapshot (event)")
event.source.lock().ok().expect("Cell::snapshot (event)")
.listen(source.wrap_as_listener());
Stream { source: source.wrap_into_subject() }
})
}

/// Create a cell with a cyclic reference.
///
/// Given a function `cycle` that maps a `cell` to a stream `cycle(cell)`
/// and some `initial` value, this constructor essentially creates a cyclic
/// pattern, where the resulting cell behaves as if it were defined as
/// `cell = cycle(cell).hold(initial)`.
///
/// This pattern is useful to implement accumulators, counters and other
/// loops that depend on the state of a cell before a transaction.
#[experimental="may change once internals are cleaned up"]
pub fn cyclic<F: FnOnce(&Cell<A>) -> Stream<A>>(initial: A, cycle: F) -> Cell<A> {
let dummy_source = Arc::new(Mutex::new(LoopCellEntry::new(initial.clone())));
let result = cycle(&Cell {
source: dummy_source.wrap_as_sampling_subject()
})
.hold(initial);
commit((), move |_| {
result.source.lock().ok()
.expect("Cell::cyclic (result listen #1)")
.listen(dummy_source.wrap_as_listener());
let source = Arc::new(Mutex::new(LoopCell::new(
result.sample_nocommit(),
(
dummy_source.wrap_as_sampling_subject(),
result.source.clone(),
)
)));
result.source.lock().ok()
.expect("Cell::cyclic (result listen #2)")
.listen(source.wrap_as_listener());
Cell { source: source.wrap_into_sampling_subject() }
})
}
}

impl<A: Send + Sync + Clone> Cell<Cell<A>> {
Expand Down Expand Up @@ -525,8 +572,8 @@ impl<A: Send + Sync + Clone> Cell<Cell<A>> {
pub fn switch(&self) -> Cell<A> {
commit((), |_| {
// Create the cell switcher
let mut self_source = self.source.write().ok().expect("Cell::switch");
let source = Arc::new(RwLock::new(
let mut self_source = self.source.lock().ok().expect("Cell::switch");
let source = Arc::new(Mutex::new(
CellSwitcher::new(
self_source.sample(),
self.source.clone(),
Expand Down Expand Up @@ -575,12 +622,12 @@ pub fn lift2<A, B, C, F>(f: F, ba: &Cell<A>, bb: &Cell<B>) -> Cell<C>
F: Fn(A, B) -> C + Send + Sync,
{
commit((f, ba, bb), |(f, ba, bb)| {
let source = Arc::new(RwLock::new(Lift2::new(
let source = Arc::new(Mutex::new(Lift2::new(
(ba.sample_nocommit(), bb.sample_nocommit()), f, (ba.source.clone(), bb.source.clone())
)));
ba.source.write().ok().expect("lift2 (ba)")
ba.source.lock().ok().expect("lift2 (ba)")
.listen(source.wrap_as_listener());
bb.source.write().ok().expect("lift2 (bb)")
bb.source.lock().ok().expect("lift2 (bb)")
.listen(WeakLift2Wrapper::boxed(&source));
Cell { source: source.wrap_into_sampling_subject() }
})
Expand All @@ -591,16 +638,16 @@ pub fn lift2<A, B, C, F>(f: F, ba: &Cell<A>, bb: &Cell<B>) -> Cell<C>
pub struct StreamIter<A> {
receiver: mpsc::Receiver<A>,
#[allow(dead_code)]
buffer: Arc<RwLock<ChannelBuffer<A>>>,
buffer: Arc<Mutex<ChannelBuffer<A>>>,
}

impl<A: Send + Sync + Clone> StreamIter<A> {
fn new(event: &Stream<A>) -> StreamIter<A> {
let (tx, rx) = mpsc::channel();
let chanbuf = Arc::new(RwLock::new(ChannelBuffer::new(
let chanbuf = Arc::new(Mutex::new(ChannelBuffer::new(
tx, event.source.clone()
)));
event.source.write().ok().expect("StreamIter::new")
event.source.lock().ok().expect("StreamIter::new")
.listen(chanbuf.wrap_as_listener());
StreamIter { receiver: rx, buffer: chanbuf }
}
Expand Down