Skip to content

Commit

Permalink
fixup! feat(take-until): add take-until operator
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed Jun 10, 2017
1 parent 1a68e00 commit 24b35f3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 26 deletions.
4 changes: 2 additions & 2 deletions benchmarks/bm.takeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {takeUntil} from '../src/operators/TakeUntil'
import {fromArray} from '../src/sources/FromArray'
import {array, IDeferred, run} from './lib'

const a = array(10)
const a = array(1e4)

export function bm_takeUntil (suite: Suite) {
return suite.add(
Expand All @@ -18,7 +18,7 @@ export function bm_takeUntil (suite: Suite) {
const src = multicast(fromArray(a))
return run(takeUntil(
src,
slice(5, Infinity, src)
slice(5000, Infinity, src)
), d)
},
{defer: true}
Expand Down
57 changes: 33 additions & 24 deletions src/operators/TakeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,47 @@ import {IScheduler} from '../lib/Scheduler'
import {CompositeSubscription, ISubscription} from '../lib/Subscription'
import {curry} from '../lib/Utils'

class SourceObserver<T> implements IObserver<T> {
constructor (private sink: IObserver<T>, private sub: ISubscription) {}

next (val: T): void {
this.sink.next(val)
}

error (err: Error): void {
this.sink.error(err)
}

complete (): void {
this.sink.complete()
this.sub.unsubscribe()
}
}
class SignalObserver<T> implements IObserver<any> {
constructor (private sink: IObserver<T>,
private sub: ISubscription) {}

next (val: any): void {
this.sub.unsubscribe()
this.sink.complete()
}

error (err: Error): void {
this.sink.error(err)
}

complete (): void {
}
}
class TakeUntil<T> implements IObservable<T> {
constructor (private source: IObservable<T>,
private signal: IObservable<T>) {
}

subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
const cSub = new CompositeSubscription()
cSub.add(this.source.subscribe({
next (value: T) {
observer.next(value)
},
complete () {
observer.complete()
cSub.unsubscribe()
},
error (err: Error) {
observer.error(err)
}
}, scheduler))

cSub.add(this.signal.subscribe({
next () {
cSub.unsubscribe()
observer.complete()
},
complete () {},
error (err: Error) {
observer.error(err)
}
}, scheduler))

cSub.add(this.source.subscribe(new SourceObserver(observer, cSub), scheduler))
cSub.add(this.signal.subscribe(new SignalObserver(observer, cSub), scheduler))
return cSub
}
}
Expand Down

0 comments on commit 24b35f3

Please sign in to comment.