diff --git a/src/lib.rs b/src/lib.rs index 2a72875..d352126 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,13 +124,15 @@ #[cfg(test)] extern crate test; +#[macro_use] +extern crate log; -use std::sync::{Arc, RwLock, mpsc}; +use std::sync::{Arc, Mutex, mpsc}; use std::thread::Thread; use subject::{ Subject, Source, Mapper, WrapArc, Snapper, Merger, Filter, Holder, Lift2, WeakSnapperWrapper, SamplingSubject, CellSwitcher, WeakLift2Wrapper, - ChannelBuffer, + ChannelBuffer, LoopCell, LoopCellEntry, }; use transaction::commit; @@ -190,19 +192,19 @@ mod subject; /// the iterator. However, any event sent into the sink after a call to it, may /// come at any point between the iterator events. pub struct Sink { - source: Arc>>, + source: Arc>>, } impl Clone for Sink { fn clone(&self) -> Sink { Sink { source: self.source.clone() } - } + } } impl Sink { /// Create a new sink. pub fn new() -> Sink { - Sink { source: Arc::new(RwLock::new(Source::new())) } + Sink { source: Arc::new(Mutex::new(Source::new())) } } /// Send a value into the sink. @@ -210,11 +212,11 @@ impl Sink { /// When a value is sent into the sink, an event is fired in all dependent /// streams. pub fn send(&self, a: A) { - commit((), move |_| - self.source.write() + commit((), move |_| { + self.source.lock() .ok().expect("Sink::send") .send(a) - ) + }) } /// Asynchronous send. @@ -256,7 +258,7 @@ impl Sink { /// /// This occasionally fires an event of type `A` down its dependency graph. pub struct Stream { - source: Arc + 'static>>>, + source: Arc + 'static>>>, } impl Clone for Stream { @@ -282,9 +284,9 @@ impl Stream { where B: Send + Sync + Clone, F: Fn(A) -> B + Send + Sync, { - let source = Arc::new(RwLock::new(Mapper::new(f, self.source.clone()))); + let source = Arc::new(Mutex::new(Mapper::new(f, self.source.clone()))); commit((), |_| { - self.source.write().ok().expect("Stream::map") + self.source.lock().ok().expect("Stream::map") .listen(source.wrap_as_listener()); Stream { source: source.wrap_as_subject() } }) @@ -327,14 +329,14 @@ impl Stream { /// assert_eq!(iter.next(), Some(4)); /// ``` pub fn merge(&self, other: &Stream) -> Stream { - let source = Arc::new(RwLock::new(Merger::new([ + let source = Arc::new(Mutex::new(Merger::new([ self.source.clone(), other.source.clone(), ]))); commit((), |_| { - self.source.write().ok().expect("Stream::merge (self)") + self.source.lock().ok().expect("Stream::merge (self)") .listen(source.wrap_as_listener()); - other.source.write().ok().expect("Stream::merge (other)") + other.source.lock().ok().expect("Stream::merge (other)") .listen(source.wrap_as_listener()); Stream { source: source.wrap_as_subject() } }) @@ -354,11 +356,11 @@ impl Stream { /// assert_eq!(cell.sample(), 2); /// ``` pub fn hold(&self, initial: A) -> Cell { - let source = Arc::new(RwLock::new( + let source = Arc::new(Mutex::new( Holder::new(initial, self.source.clone()) )); commit((), |_| { - self.source.write().ok().expect("Stream::hold") + self.source.lock().ok().expect("Stream::hold") .listen(source.wrap_as_listener()); Cell { source: source.wrap_as_sampling_subject() } }) @@ -366,6 +368,17 @@ impl Stream { /// A blocking iterator over the stream. pub fn iter(&self) -> StreamIter { StreamIter::new(self) } + + /// Accumulate events in a cell. + /// + /// Starting at some initial value, each new event changes the internal + /// state of the resulting cell as prescribed by the supplied accumulator. + pub fn accumulate(&self, initial: B, acc: F) -> Cell + where B: Send + Sync + Clone, + F: Fn((B, A)) -> B + Send + Sync, + { + Cell::cyclic(initial, move |state| state.snapshot(self).map(acc)) + } } impl Stream> { @@ -385,9 +398,9 @@ impl Stream> { /// assert_eq!(iter.next(), Some(5)); /// ``` pub fn filter(&self) -> Stream { - let source = Arc::new(RwLock::new(Filter::new(self.source.clone()))); + let source = Arc::new(Mutex::new(Filter::new(self.source.clone()))); commit((), |_| { - self.source.write().ok().expect("Stream::filter") + self.source.lock().ok().expect("Stream::filter") .listen(source.wrap_as_listener()); Stream { source: source.wrap_as_subject() } }) @@ -397,7 +410,7 @@ impl Stream> { /// A container of a value that changes over time. pub struct Cell { - source: Arc + 'static>>> + source: Arc + 'static>>> } impl Clone for Cell { @@ -416,7 +429,7 @@ impl Cell { /// Sample without committing a transaction. fn sample_nocommit(&self) -> A { - self.source.write().ok().expect("Cell::sample").sample() + self.source.lock().ok().expect("Cell::sample").sample() } /// Combine the cell with a stream in a snapshot. @@ -441,16 +454,50 @@ impl Cell { /// ``` pub fn snapshot(&self, event: &Stream) -> Stream<(A, B)> { commit((), |_| { - let source = Arc::new(RwLock::new(Snapper::new( + let source = Arc::new(Mutex::new(Snapper::new( self.sample_nocommit(), (self.source.clone(), event.source.clone()) ))); - self.source.write().ok().expect("Cell::snapshot (self)") + self.source.lock().ok().expect("Cell::snapshot (self)") .listen(WeakSnapperWrapper::boxed(&source)); - event.source.write().ok().expect("Cell::snapshot (event)") + event.source.lock().ok().expect("Cell::snapshot (event)") .listen(source.wrap_as_listener()); Stream { source: source.wrap_into_subject() } }) } + + /// Create a cell with a cyclic reference. + /// + /// Given a function `cycle` that maps a `cell` to a stream `cycle(cell)` + /// and some `initial` value, this constructor essentially creates a cyclic + /// pattern, where the resulting cell behaves as if it were defined as + /// `cell = cycle(cell).hold(initial)`. + /// + /// This pattern is useful to implement accumulators, counters and other + /// loops that depend on the state of a cell before a transaction. + #[experimental="may change once internals are cleaned up"] + pub fn cyclic) -> Stream>(initial: A, cycle: F) -> Cell { + let dummy_source = Arc::new(Mutex::new(LoopCellEntry::new(initial.clone()))); + let result = cycle(&Cell { + source: dummy_source.wrap_as_sampling_subject() + }) + .hold(initial); + commit((), move |_| { + result.source.lock().ok() + .expect("Cell::cyclic (result listen #1)") + .listen(dummy_source.wrap_as_listener()); + let source = Arc::new(Mutex::new(LoopCell::new( + result.sample_nocommit(), + ( + dummy_source.wrap_as_sampling_subject(), + result.source.clone(), + ) + ))); + result.source.lock().ok() + .expect("Cell::cyclic (result listen #2)") + .listen(source.wrap_as_listener()); + Cell { source: source.wrap_into_sampling_subject() } + }) + } } impl Cell> { @@ -525,8 +572,8 @@ impl Cell> { pub fn switch(&self) -> Cell { commit((), |_| { // Create the cell switcher - let mut self_source = self.source.write().ok().expect("Cell::switch"); - let source = Arc::new(RwLock::new( + let mut self_source = self.source.lock().ok().expect("Cell::switch"); + let source = Arc::new(Mutex::new( CellSwitcher::new( self_source.sample(), self.source.clone(), @@ -575,12 +622,12 @@ pub fn lift2(f: F, ba: &Cell, bb: &Cell) -> Cell F: Fn(A, B) -> C + Send + Sync, { commit((f, ba, bb), |(f, ba, bb)| { - let source = Arc::new(RwLock::new(Lift2::new( + let source = Arc::new(Mutex::new(Lift2::new( (ba.sample_nocommit(), bb.sample_nocommit()), f, (ba.source.clone(), bb.source.clone()) ))); - ba.source.write().ok().expect("lift2 (ba)") + ba.source.lock().ok().expect("lift2 (ba)") .listen(source.wrap_as_listener()); - bb.source.write().ok().expect("lift2 (bb)") + bb.source.lock().ok().expect("lift2 (bb)") .listen(WeakLift2Wrapper::boxed(&source)); Cell { source: source.wrap_into_sampling_subject() } }) @@ -591,16 +638,16 @@ pub fn lift2(f: F, ba: &Cell, bb: &Cell) -> Cell pub struct StreamIter { receiver: mpsc::Receiver, #[allow(dead_code)] - buffer: Arc>>, + buffer: Arc>>, } impl StreamIter { fn new(event: &Stream) -> StreamIter { let (tx, rx) = mpsc::channel(); - let chanbuf = Arc::new(RwLock::new(ChannelBuffer::new( + let chanbuf = Arc::new(Mutex::new(ChannelBuffer::new( tx, event.source.clone() ))); - event.source.write().ok().expect("StreamIter::new") + event.source.lock().ok().expect("StreamIter::new") .listen(chanbuf.wrap_as_listener()); StreamIter { receiver: rx, buffer: chanbuf } } diff --git a/src/subject.rs b/src/subject.rs index aa37b5a..2b2a16f 100644 --- a/src/subject.rs +++ b/src/subject.rs @@ -2,13 +2,13 @@ #![allow(missing_docs)] -use std::sync::{Arc, RwLock, Weak, Mutex}; +use std::sync::{Arc, Mutex, Weak}; use std::sync::mpsc::Sender; use Cell; use transaction::register_callback; -#[derive(Show)] +#[derive(Copy, Show)] pub enum ListenerError { Disappeared, Poisoned, @@ -21,11 +21,11 @@ pub trait Listener: Send + Sync { } pub struct WeakListenerWrapper { - weak: Weak> + weak: Weak> } impl WeakListenerWrapper { - pub fn boxed(strong: &Arc>) -> Box + 'static> + pub fn boxed(strong: &Arc>) -> Box + 'static> where L: Listener, A: Send + Sync, { Box::new(WeakListenerWrapper { weak: strong.downgrade() }) @@ -37,7 +37,7 @@ impl Listener for WeakListenerWrapper { fn accept(&mut self, a: A) -> ListenerResult { match self.weak.upgrade() { - Some(listener) => match listener.write() { + Some(listener) => match listener.lock() { Ok(mut listener) => listener.accept(a), Err(_) => Err(ListenerError::Poisoned), }, @@ -47,20 +47,20 @@ impl Listener for WeakListenerWrapper } -type KeepAlive = Arc + 'static>>>; -type KeepAliveSample = Arc + 'static>>>; +type KeepAlive = Arc + 'static>>>; +type KeepAliveSample = Arc + 'static>>>; pub struct StrongSubjectWrapper { #[allow(dead_code)] - arc: Arc> + arc: Arc> } impl Subject for StrongSubjectWrapper where S: Subject + Send + Sync, A: Send + Sync { fn listen(&mut self, listener: Box + 'static>) { - self.arc.write().ok().expect("StrongSubjectWrapper::listen").listen(listener); + self.arc.lock().ok().expect("StrongSubjectWrapper::listen").listen(listener); } } @@ -68,7 +68,7 @@ impl Sample for StrongSubjectWrapper where S: Sample + Send + Sync, A: Send + Sync { fn sample(&self) -> A { - self.arc.write().ok().expect("StrongSubjectWrapper::sample").sample() + self.arc.lock().ok().expect("StrongSubjectWrapper::sample").sample() } } @@ -86,7 +86,7 @@ pub trait WrapArc { where L: SamplingSubject, A: Send + Sync + Clone; } -impl WrapArc for Arc> { +impl WrapArc for Arc> { fn wrap_as_listener(&self) -> Box + 'static> where L: Listener, A: Send + Sync { @@ -96,25 +96,25 @@ impl WrapArc for Arc> { fn wrap_as_subject(&self) -> KeepAlive where L: Subject, A: Send + Sync + Clone { - Arc::new(RwLock::new(Box::new(StrongSubjectWrapper { arc: self.clone() }))) + Arc::new(Mutex::new(Box::new(StrongSubjectWrapper { arc: self.clone() }))) } fn wrap_into_subject(self) -> KeepAlive where L: Subject, A: Send + Sync + Clone { - Arc::new(RwLock::new(Box::new(StrongSubjectWrapper { arc: self }))) + Arc::new(Mutex::new(Box::new(StrongSubjectWrapper { arc: self }))) } fn wrap_as_sampling_subject(&self) -> KeepAliveSample where L: SamplingSubject, A: Send + Sync + Clone { - Arc::new(RwLock::new(Box::new(StrongSubjectWrapper { arc: self.clone() }))) + Arc::new(Mutex::new(Box::new(StrongSubjectWrapper { arc: self.clone() }))) } fn wrap_into_sampling_subject(self) -> KeepAliveSample where L: SamplingSubject, A: Send + Sync + Clone { - Arc::new(RwLock::new(Box::new(StrongSubjectWrapper { arc: self }))) + Arc::new(Mutex::new(Box::new(StrongSubjectWrapper { arc: self }))) } } @@ -273,7 +273,6 @@ impl Listener for Holder { pub struct Snapper { current: A, - update: Option, source: Source<(A, B)>, #[allow(dead_code)] keep_alive: (KeepAliveSample, KeepAlive), @@ -283,7 +282,6 @@ impl Snapper { pub fn new(initial: A, keep_alive: (KeepAliveSample, KeepAlive)) -> Snapper { Snapper { current: initial, - update: None, source: Source::new(), keep_alive: keep_alive } @@ -304,41 +302,31 @@ impl Subject<(A, B)> for Snapper { pub struct WeakSnapperWrapper { - weak: Weak>>, + weak: Weak>>, } -impl WeakSnapperWrapper { - pub fn boxed(strong: &Arc>>) -> Box + 'static> { +impl WeakSnapperWrapper { + pub fn boxed(strong: &Arc>>) -> Box + 'static> { Box::new(WeakSnapperWrapper { weak: strong.downgrade() }) } } -impl Listener for WeakSnapperWrapper { +impl Listener for WeakSnapperWrapper { fn accept(&mut self, a: A) -> ListenerResult { - match self.weak.upgrade() { - Some(arc) => match arc.write() { - Ok(mut snapper) => { - snapper.update = Some(a); - let weak = self.weak.clone(); - register_callback(move || { - match weak.upgrade() { - Some(arc) => { - let mut snapper = arc.write().ok() - .expect("snapshot too poisonous for callback"); - match snapper.update.take() { - Some(up) => snapper.current = up, - None => (), - } - } - None => (), - } - }); - Ok(()) + let weak = self.weak.clone(); + register_callback(move || { + let _result = match weak.upgrade() { + Some(arc) => match arc.lock() { + Ok(mut snapper) => { + snapper.current = a.clone(); + Ok(()) + }, + Err(_) => Err(ListenerError::Poisoned), }, - Err(_) => Err(ListenerError::Poisoned), - }, - None => Err(ListenerError::Disappeared), - } + None => Err(ListenerError::Disappeared), + }; + }); + Ok(()) } } @@ -459,7 +447,7 @@ impl Sample for Lift2 pub struct WeakLift2Wrapper { - weak: Weak>>, + weak: Weak>>, } impl WeakLift2Wrapper @@ -468,7 +456,7 @@ impl WeakLift2Wrapper C: Send + Sync + Clone, F: Fn(A, B) -> C + Send + Sync, { - pub fn boxed(strong: &Arc>>) -> Box + 'static> { + pub fn boxed(strong: &Arc>>) -> Box + 'static> { Box::new(WeakLift2Wrapper { weak: strong.downgrade() }) } } @@ -481,7 +469,7 @@ impl Listener for WeakLift2Wrapper { fn accept(&mut self, b: B) -> ListenerResult { match self.weak.upgrade() { - Some(arc) => match arc.write() { + Some(arc) => match arc.lock() { Ok(mut snapper) => { snapper.current.1 = b; let (a, b) = snapper.current.clone(); @@ -496,6 +484,89 @@ impl Listener for WeakLift2Wrapper } +/// Helper object to create loops in the event graph. +/// +/// This allows one to create a Listener/Subject node in the graph first and +/// supply the object to be kept alive later. +pub struct LoopCell { + current: A, + source: Source, + #[allow(dead_code)] + keep_alive: (KeepAliveSample, KeepAliveSample), +} + +impl LoopCell { + /// Create a new looping + pub fn new(initial: A, keep_alive: (KeepAliveSample, KeepAliveSample)) + -> LoopCell + { + LoopCell { + current: initial, + source: Source::new(), + keep_alive: keep_alive, + } + } +} + +impl Subject for LoopCell { + fn listen(&mut self, listener: Box + 'static>) { + self.source.listen(listener); + } +} + +impl Listener for LoopCell { + fn accept(&mut self, a: A) -> ListenerResult { + self.current = a.clone(); + self.source.accept(a) + } +} + +impl Sample for LoopCell { + fn sample(&self) -> A { + self.current.clone() + } +} + + +/// Entry into a cell loop. +/// +/// This feeds the cell loop with data it has produced itself. +pub struct LoopCellEntry { + current: A, + source: Source, +} + +impl LoopCellEntry { + /// Create a new looping + pub fn new(initial: A) -> LoopCellEntry + { + LoopCellEntry { + current: initial, + source: Source::new(), + } + } +} + +impl Subject for LoopCellEntry { + fn listen(&mut self, listener: Box + 'static>) { + self.source.listen(listener); + } +} + +impl Listener for LoopCellEntry { + fn accept(&mut self, a: A) -> ListenerResult { + self.current = a.clone(); + self.source.accept(a) + } +} + +impl Sample for LoopCellEntry { + fn sample(&self) -> A { + self.current.clone() + } +} + + pub struct ChannelBuffer { sender: Mutex>, #[allow(dead_code)] @@ -523,87 +594,87 @@ impl Listener for ChannelBuffer { #[cfg(test)] mod test { - use std::sync::{Arc, RwLock, mpsc}; + use std::sync::{Arc, Mutex, mpsc}; use transaction::commit; use super::*; #[test] fn src_recv() { - let src = Arc::new(RwLock::new(Source::new())); + let src = Arc::new(Mutex::new(Source::new())); let (tx, rx) = mpsc::channel(); - let recv = Arc::new(RwLock::new(ChannelBuffer::new(tx, src.wrap_as_subject()))); - src.write().unwrap().listen(recv.wrap_as_listener()); - src.write().unwrap().send(3); + let recv = Arc::new(Mutex::new(ChannelBuffer::new(tx, src.wrap_as_subject()))); + src.lock().unwrap().listen(recv.wrap_as_listener()); + src.lock().unwrap().send(3); assert_eq!(rx.recv(), Ok(3)); } #[test] fn map() { - let src = Arc::new(RwLock::new(Source::new())); - let map = Arc::new(RwLock::new(Mapper::new(|x: i32| x + 3, src.wrap_as_subject()))); - src.write().unwrap().listen(map.wrap_as_listener()); + let src = Arc::new(Mutex::new(Source::new())); + let map = Arc::new(Mutex::new(Mapper::new(|x: i32| x + 3, src.wrap_as_subject()))); + src.lock().unwrap().listen(map.wrap_as_listener()); let (tx, rx) = mpsc::channel(); - let recv = Arc::new(RwLock::new(ChannelBuffer::new(tx, map.wrap_as_subject()))); - map.write().unwrap().listen(recv.wrap_as_listener()); - src.write().unwrap().send(3); + let recv = Arc::new(Mutex::new(ChannelBuffer::new(tx, map.wrap_as_subject()))); + map.lock().unwrap().listen(recv.wrap_as_listener()); + src.lock().unwrap().send(3); assert_eq!(rx.recv(), Ok(6)); } #[test] fn fork() { - let src = Arc::new(RwLock::new(Source::new())); - let map = Arc::new(RwLock::new(Mapper::new(|x: i32| x + 3, src.wrap_as_subject()))); - src.write().unwrap().listen(map.wrap_as_listener()); + let src = Arc::new(Mutex::new(Source::new())); + let map = Arc::new(Mutex::new(Mapper::new(|x: i32| x + 3, src.wrap_as_subject()))); + src.lock().unwrap().listen(map.wrap_as_listener()); let (tx1, rx1) = mpsc::channel(); - let recv1 = Arc::new(RwLock::new(ChannelBuffer::new(tx1, map.wrap_as_subject()))); - map.write().unwrap().listen(recv1.wrap_as_listener()); + let recv1 = Arc::new(Mutex::new(ChannelBuffer::new(tx1, map.wrap_as_subject()))); + map.lock().unwrap().listen(recv1.wrap_as_listener()); let (tx2, rx2) = mpsc::channel(); - let recv2 = Arc::new(RwLock::new(ChannelBuffer::new(tx2, src.wrap_as_subject()))); - src.write().unwrap().listen(recv2.wrap_as_listener()); - src.write().unwrap().send(4); + let recv2 = Arc::new(Mutex::new(ChannelBuffer::new(tx2, src.wrap_as_subject()))); + src.lock().unwrap().listen(recv2.wrap_as_listener()); + src.lock().unwrap().send(4); assert_eq!(rx1.recv(), Ok(7)); assert_eq!(rx2.recv(), Ok(4)); } #[test] fn filter() { - let src = Arc::new(RwLock::new(Source::new())); - let filter = Arc::new(RwLock::new(Filter::new(src.wrap_as_subject()))); - src.write().unwrap().listen(filter.wrap_as_listener()); + let src = Arc::new(Mutex::new(Source::new())); + let filter = Arc::new(Mutex::new(Filter::new(src.wrap_as_subject()))); + src.lock().unwrap().listen(filter.wrap_as_listener()); let (tx, rx) = mpsc::channel(); - let recv = Arc::new(RwLock::new(ChannelBuffer::new(tx, filter.wrap_as_subject()))); - filter.write().unwrap().listen(recv.wrap_as_listener()); - src.write().unwrap().send(None); - src.write().unwrap().send(Some(3)); + let recv = Arc::new(Mutex::new(ChannelBuffer::new(tx, filter.wrap_as_subject()))); + filter.lock().unwrap().listen(recv.wrap_as_listener()); + src.lock().unwrap().send(None); + src.lock().unwrap().send(Some(3)); assert_eq!(rx.recv(), Ok(3)); } #[test] fn holder() { - let src = Arc::new(RwLock::new(Source::new())); - let holder = Arc::new(RwLock::new(Holder::new(1, src.wrap_as_subject()))); - src.write().unwrap().listen(holder.wrap_as_listener()); - assert_eq!(holder.write().unwrap().sample(), 1); - src.write().unwrap().send(3); - assert_eq!(holder.write().unwrap().sample(), 3); + let src = Arc::new(Mutex::new(Source::new())); + let holder = Arc::new(Mutex::new(Holder::new(1, src.wrap_as_subject()))); + src.lock().unwrap().listen(holder.wrap_as_listener()); + assert_eq!(holder.lock().unwrap().sample(), 1); + src.lock().unwrap().send(3); + assert_eq!(holder.lock().unwrap().sample(), 3); } #[test] fn snapper() { - let src1 = Arc::new(RwLock::new(Source::::new())); - let src2 = Arc::new(RwLock::new(Source::::new())); - let holder = Arc::new(RwLock::new(Holder::new(1, src1.wrap_as_subject()))); - src1.write().unwrap().listen(holder.wrap_as_listener()); - let snapper = Arc::new(RwLock::new(Snapper::new(3, (holder.wrap_as_sampling_subject(), src2.wrap_as_subject())))); - src1.write().unwrap().listen(WeakSnapperWrapper::boxed(&snapper)); - src2.write().unwrap().listen(snapper.wrap_as_listener()); + let src1 = Arc::new(Mutex::new(Source::::new())); + let src2 = Arc::new(Mutex::new(Source::::new())); + let holder = Arc::new(Mutex::new(Holder::new(1, src1.wrap_as_subject()))); + src1.lock().unwrap().listen(holder.wrap_as_listener()); + let snapper = Arc::new(Mutex::new(Snapper::new(3, (holder.wrap_as_sampling_subject(), src2.wrap_as_subject())))); + src1.lock().unwrap().listen(WeakSnapperWrapper::boxed(&snapper)); + src2.lock().unwrap().listen(snapper.wrap_as_listener()); let (tx, rx) = mpsc::channel(); - let recv = Arc::new(RwLock::new(ChannelBuffer::new(tx, snapper.wrap_as_subject()))); - snapper.write().unwrap().listen(recv.wrap_as_listener()); - commit((), |_| src2.write().unwrap().send(6.0)); + let recv = Arc::new(Mutex::new(ChannelBuffer::new(tx, snapper.wrap_as_subject()))); + snapper.lock().unwrap().listen(recv.wrap_as_listener()); + commit((), |_| src2.lock().unwrap().send(6.0)); assert_eq!(rx.recv(), Ok((3, 6.0))); - commit((), |_| src1.write().unwrap().send(5)); - commit((), |_| src2.write().unwrap().send(-4.0)); + commit((), |_| src1.lock().unwrap().send(5)); + commit((), |_| src2.lock().unwrap().send(-4.0)); assert_eq!(rx.recv(), Ok((5, -4.0))); } } diff --git a/src/test_lib.rs b/src/test_lib.rs index cf31314..d0b346e 100644 --- a/src/test_lib.rs +++ b/src/test_lib.rs @@ -216,3 +216,20 @@ fn snapshot_order_alternative() { sink.send(1); assert_eq!(iter.next(), Some((0, 1))); } + +#[test] +fn cyclic_snapshot_accum() { + let sink = Sink::::new(); + let stream = sink.stream(); + let accum = Cell::::cyclic(0, |accum| + accum.snapshot(&stream) + .map(|(a, s)| a + s) + ); + assert_eq!(accum.sample(), 0); + sink.send(3); + assert_eq!(accum.sample(), 3); + sink.send(7); + assert_eq!(accum.sample(), 10); + sink.send(-21); + assert_eq!(accum.sample(), -11); +}