From 24b35f3a9c66bef755c52fbdb2c53575b53d34d7 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sat, 10 Jun 2017 13:01:44 +0530 Subject: [PATCH] fixup! feat(take-until): add take-until operator --- benchmarks/bm.takeUntil.ts | 4 +-- src/operators/TakeUntil.ts | 57 ++++++++++++++++++++++---------------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/benchmarks/bm.takeUntil.ts b/benchmarks/bm.takeUntil.ts index 3c26a3b..33c7df2 100644 --- a/benchmarks/bm.takeUntil.ts +++ b/benchmarks/bm.takeUntil.ts @@ -9,7 +9,7 @@ import {takeUntil} from '../src/operators/TakeUntil' import {fromArray} from '../src/sources/FromArray' import {array, IDeferred, run} from './lib' -const a = array(10) +const a = array(1e4) export function bm_takeUntil (suite: Suite) { return suite.add( @@ -18,7 +18,7 @@ export function bm_takeUntil (suite: Suite) { const src = multicast(fromArray(a)) return run(takeUntil( src, - slice(5, Infinity, src) + slice(5000, Infinity, src) ), d) }, {defer: true} diff --git a/src/operators/TakeUntil.ts b/src/operators/TakeUntil.ts index ec937de..624f243 100644 --- a/src/operators/TakeUntil.ts +++ b/src/operators/TakeUntil.ts @@ -8,7 +8,38 @@ import {IScheduler} from '../lib/Scheduler' import {CompositeSubscription, ISubscription} from '../lib/Subscription' import {curry} from '../lib/Utils' +class SourceObserver implements IObserver { + constructor (private sink: IObserver, private sub: ISubscription) {} + next (val: T): void { + this.sink.next(val) + } + + error (err: Error): void { + this.sink.error(err) + } + + complete (): void { + this.sink.complete() + this.sub.unsubscribe() + } +} +class SignalObserver implements IObserver { + constructor (private sink: IObserver, + private sub: ISubscription) {} + + next (val: any): void { + this.sub.unsubscribe() + this.sink.complete() + } + + error (err: Error): void { + this.sink.error(err) + } + + complete (): void { + } +} class TakeUntil implements IObservable { constructor (private source: IObservable, private signal: IObservable) { @@ -16,30 +47,8 @@ class TakeUntil implements IObservable { subscribe (observer: IObserver, scheduler: IScheduler): ISubscription { const cSub = new CompositeSubscription() - cSub.add(this.source.subscribe({ - next (value: T) { - observer.next(value) - }, - complete () { - observer.complete() - cSub.unsubscribe() - }, - error (err: Error) { - observer.error(err) - } - }, scheduler)) - - cSub.add(this.signal.subscribe({ - next () { - cSub.unsubscribe() - observer.complete() - }, - complete () {}, - error (err: Error) { - observer.error(err) - } - }, scheduler)) - + cSub.add(this.source.subscribe(new SourceObserver(observer, cSub), scheduler)) + cSub.add(this.signal.subscribe(new SignalObserver(observer, cSub), scheduler)) return cSub } }