diff --git a/src/lib.rs b/src/lib.rs index 86370e6..4c40ea6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(type_alias_impl_trait)] /// A library which provide similar functionality of rxjs in rust. /// See usage in https://rxjs-dev.firebaseapp.com/guide/operators /// It supports most of the operators in rxjs. But due to language difference, diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 313752a..bbb7bc0 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -13,6 +13,7 @@ pub use combination::with_latest_from::WithLatestFrom; pub use transform::simple_count_buffer::SimpleCountBufferedStream; pub use transform::overlapped_count_buffer::OverlappedCountBufferedStream; pub use transform::simple_time_buffer::SimpleTimeBufferredStream; +pub use transform::overlapped_time_buffer::OverlappedTimeBufferedStream; use super::source; // static operators @@ -100,11 +101,20 @@ pub trait RxStreamEx: Stream { OverlappedCountBufferedStream::new(self, count, skip) } - fn buffer_time(self, time_span: Duration) -> SimpleTimeBufferredStream + fn buffer_time(self, time_span: u64) -> SimpleTimeBufferredStream where Self: Sized { SimpleTimeBufferredStream::new(self, time_span) } + fn buffer_time_with_creation_interval( + self, + time_span: Duration, + creation_interval: Duration, + ) -> OverlappedTimeBufferedStream + where Self: Sized, Self::Item: Clone + { + OverlappedTimeBufferedStream::new(self, time_span, creation_interval) + } } \ No newline at end of file diff --git a/src/operators/transform/mod.rs b/src/operators/transform/mod.rs index 631df7b..a3b4f79 100644 --- a/src/operators/transform/mod.rs +++ b/src/operators/transform/mod.rs @@ -3,4 +3,5 @@ pub mod simple_count_buffer; pub mod simple_time_buffer; pub mod overlapped_buffer; pub mod overlapped_count_buffer; +pub mod overlapped_time_buffer; mod buffered_stream; \ No newline at end of file diff --git a/src/operators/transform/overlapped_buffer.rs b/src/operators/transform/overlapped_buffer.rs index 89f87c9..c8217ac 100644 --- a/src/operators/transform/overlapped_buffer.rs +++ b/src/operators/transform/overlapped_buffer.rs @@ -5,19 +5,18 @@ pub trait BufferOpener { fn check_open(&mut self) -> bool; } -pub trait BufferCreator { - type B: Buffer; - fn new_buffer(&mut self) -> Self::B; +pub trait BufferCreator { + fn new_buffer(&mut self) -> B; } #[derive(Default)] -pub struct OverlappedBuffer> where B: Buffer, B::V: Clone { +pub struct OverlappedBuffer> where B: Buffer, B::V: Clone { pub buffers: VecDeque, pub opener: O, pub creator: C } -impl> OverlappedBuffer where B::V: Clone { +impl> OverlappedBuffer where B::V: Clone { pub fn new_internal(opener: O, creator: C) -> Self { OverlappedBuffer { buffers: VecDeque::new(), @@ -27,7 +26,7 @@ impl> OverlappedBuffer> Buffer for OverlappedBuffer where B::V: Clone { +impl> Buffer for OverlappedBuffer where B::V: Clone { type V = B::V; fn insert(&mut self, v:Self::V) -> () { diff --git a/src/operators/transform/overlapped_count_buffer.rs b/src/operators/transform/overlapped_count_buffer.rs index d665103..d885c4a 100644 --- a/src/operators/transform/overlapped_count_buffer.rs +++ b/src/operators/transform/overlapped_count_buffer.rs @@ -22,8 +22,7 @@ impl BufferOpener for CountBufferOpener { } } } -impl BufferCreator for CountBufferCreator { - type B = SimpleCountBuffer; +impl BufferCreator> for CountBufferCreator { fn new_buffer(&mut self) -> SimpleCountBuffer { SimpleCountBuffer::new(self.max_count) } diff --git a/src/operators/transform/overlapped_time_buffer.rs b/src/operators/transform/overlapped_time_buffer.rs new file mode 100644 index 0000000..3490327 --- /dev/null +++ b/src/operators/transform/overlapped_time_buffer.rs @@ -0,0 +1,60 @@ +use super::overlapped_buffer::{BufferCreator, BufferOpener, OverlappedBuffer}; +use super::buffered_stream::BufferedStream; +use super::simple_time_buffer::{new_simple_time_buffer, SimpleTimeBuffer}; +use std::time::{Duration, Instant}; +use futures::{Stream, Async}; + + +pub struct TimeBufferOpener { + period: u64, + last_check: Instant, +} +pub struct TimeBufferCreator { + time_span: u64, +} + + +pub struct StreamBasedOpener { + s: S +} + +impl BufferOpener for StreamBasedOpener { + fn check_open(&mut self) -> bool { + match self.s.poll() { + Ok(Async::Ready(Some(r))) => true, + _ => false, + } + } +} + +impl BufferCreator> for TimeBufferCreator { + fn new_buffer(&mut self) -> SimpleTimeBuffer { + new_simple_time_buffer(self.time_span) + } +} + +pub type OverlappedTimeBuffer = OverlappedBuffer, TimeBufferOpener, TimeBufferCreator>; + +impl OverlappedTimeBuffer { + fn new(time_span: Duration, creation_interval: Duration) -> Self { + let mut r = OverlappedTimeBuffer::new_internal(TimeBufferOpener { + period: creation_interval, + last_check: Instant::now(), + }, TimeBufferCreator { + time_span: time_span, + }); + let b = >::new_buffer(&mut r.creator); + r.buffers.push_back(b); + r + } +} + +pub type OverlappedTimeBufferedStream = BufferedStream::Item>>; +impl OverlappedTimeBufferedStream where S::Item: Clone { + pub fn new(s: S, time_span: Duration, creation_interval: Duration) -> Self { + OverlappedTimeBufferedStream { + s: s.fuse(), + buffer: OverlappedTimeBuffer::new(time_span, creation_interval), + } + } +} \ No newline at end of file diff --git a/src/operators/transform/simple_time_buffer.rs b/src/operators/transform/simple_time_buffer.rs index a0d015d..82de4f2 100644 --- a/src/operators/transform/simple_time_buffer.rs +++ b/src/operators/transform/simple_time_buffer.rs @@ -1,47 +1,58 @@ use super::buffered_stream::{Buffer, BufferedStream}; +use futures::{Stream, Async}; use std::mem; -use futures::Stream; -use std::time::{Instant, Duration}; +use super::super::source; #[derive(Default)] -pub struct SimpleTimeBuffer { +pub struct StreamControlledBuffer { vec: Vec, - start_time: Option, - time_span: Duration, + s: S, } -impl SimpleTimeBuffer { - fn new(time_span: Duration) -> Self { - SimpleTimeBuffer { - vec: Vec::new(), - time_span: time_span, - start_time: None, - } - } -} -impl Buffer for SimpleTimeBuffer { + +impl Buffer for StreamControlledBuffer { type V = V; fn insert(&mut self, v:V) -> () { self.vec.push(v); } fn poll_buffer(&mut self) -> Option> { - if self.start_time.is_none() { - self.start_time = Some(Instant::now()); - } - if Instant::now().duration_since(self.start_time.unwrap()) >= self.time_span { - self.start_time = Some(Instant::now()); - return Some(mem::replace(&mut self.vec, Vec::new())) + match self.s.poll() { + Ok(Async::Ready(_)) => Some(mem::replace(&mut self.vec, Vec::new())), + Ok(Async::NotReady) => None, + _ => None } - None } } +pub fn new_simple_time_buffer(time_span: u64) -> impl Buffer { + StreamControlledBuffer { + vec: Vec::::new(), + s: source::interval(time_span), + } +} + +pub fn new_simple_buferred_stream(s: S, time_span: u64) -> impl Stream, Error=S::Error> { + BufferedStream { + s: s.fuse(), + buffer: new_simple_time_buffer(time_span), + } +} + +/* Below code are not possible because the source stream is a result of source;:interval, which is a +* impl Trait, and cannot be explicitly express. existential type currently has bug. And +* impl Trait in alias is not there. +*/ + +pub type SimpleTimeBuffer = StreamControlledBuffer; pub type SimpleTimeBufferredStream = BufferedStream::Item>>; impl SimpleTimeBufferredStream { - pub fn new(s: S, time_span: Duration) -> Self { + pub fn new(s: S, time_span: u64) -> Self { SimpleTimeBufferredStream { s: s.fuse(), - buffer: SimpleTimeBuffer::new(time_span), + buffer: StreamControlledBuffer { + vec: vec![], + s: source::interval(time_span) + }, } } } \ No newline at end of file diff --git a/src/source.rs b/src/source.rs index 19fc1cc..2826b76 100644 --- a/src/source.rs +++ b/src/source.rs @@ -2,21 +2,24 @@ use futures::Stream; use tokio::timer::Interval; use std::time::{Duration, Instant}; -pub fn timer(initial: u64, period: u64) -> impl Stream { +pub type TimerStream = impl Stream; + +pub fn timer(initial: u64, period: u64) -> TimerStream { let iter = 0u64..; - Interval::new( + let b = Interval::new( Instant::now() + Duration::from_millis(initial), Duration::from_millis(period), - ).zip(futures::stream::iter_ok(iter)).map(|r| r.1) + ).zip(futures::stream::iter_ok(iter)).map(|r| r.1); + b } -pub fn interval(millis: u64) -> impl Stream { +pub fn interval(millis: u64) -> TimerStream { timer(millis, millis) } /// Interval which emit the first value immediately rather than wait for the first period pass -pub fn interval_immediate(millis: u64) -> impl Stream { +pub fn interval_immediate(millis: u64) -> TimerStream { timer(0, millis) } diff --git a/tests/test_transform_operators.rs b/tests/test_transform_operators.rs index 4c784c2..01fefc0 100644 --- a/tests/test_transform_operators.rs +++ b/tests/test_transform_operators.rs @@ -32,13 +32,21 @@ fn simple_count_buffer_emit_vecs_from_timer() { } #[test] -fn simple_time_buffer_emit_vecs_from() { +fn simple_time_buffer_emit_vecs_from_timer() { let mut runtime = Runtime::new().unwrap(); let f = source::interval(30).buffer_time(Duration::from_millis(50)).take(4).collect(); let r = runtime.block_on(f).unwrap(); assert_eq!(r, vec![vec![0], vec![1, 2], vec![3, 4], vec![5, 6]]) } +#[test] +fn simple_time_buffer_emit_vecs_with_less_duration() { + let mut runtime = Runtime::new().unwrap(); + let f = source::interval_immediate(30).buffer_time(Duration::from_millis(12)).take(4).collect(); + let r = runtime.block_on(f).unwrap(); + assert_eq!(r, vec![vec![0], vec![], vec![1], vec![]]) +} + #[test] fn overlapped_count_buffer_skip_large_than_count() { let f = source::of(0..).buffer_count_with_skip(2, 3).take(3).collect().wait().unwrap(); @@ -61,4 +69,28 @@ fn overlapped_count_buffer_skip_smaller_than_count() { fn overlapped_count_buffer_skip_smaller_than_count_with_leftover() { let f = source::of(0..).take(6).buffer_count_with_skip(3, 2).take(3).collect().wait().unwrap(); assert_eq!(f, vec![vec![0, 1, 2], vec![2, 3, 4], vec![4, 5]]) +} + +#[test] +fn ovlapped_time_buffer_creation_time_large_than_span() { + let mut runtime = Runtime::new().unwrap(); + let f = source::interval(10) + .buffer_time_with_creation_interval( + Duration::from_millis(32), + Duration::from_millis(50) + ).take(3).collect(); + let r = runtime.block_on(f).unwrap(); + assert_eq!(r, vec![[0, 1, 2], [5, 6, 7], [10, 11, 12]]) +} + +#[test] +fn ovlapped_time_buffer_creation_time_smaller_than_span() { + let mut runtime = Runtime::new().unwrap(); + let f = source::interval(10) + .buffer_time_with_creation_interval( + Duration::from_millis(32), + Duration::from_millis(22) + ).take(3).collect(); + let r = runtime.block_on(f).unwrap(); + assert_eq!(r, vec![[0, 1, 2], [2, 3, 4], [4, 5, 6]]) } \ No newline at end of file