Skip to content

Commit

Permalink
refactor(mergeMap): add mergeMap() operator
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed Sep 1, 2017
1 parent 8021e17 commit 0aafe65
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 0 deletions.
93 changes: 93 additions & 0 deletions src/operators/MergeMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import {LinkedListNode} from '../lib/LinkedList'
/**
* Created by tushar on 31/08/17.
*/
import {IObservable} from '../lib/Observable'
import {IObserver} from '../lib/Observer'
import {IScheduler} from '../lib/Scheduler'
import {CompositeSubscription, ISubscription} from '../lib/Subscription'

type Project<T, S> = (t: T) => IObservable<S>

class MergeMapInnerObserver<T, S> implements IObserver<S> {
node: LinkedListNode<ISubscription>
constructor(private parent: MergeMapOuterObserver<T, S>) {}

next(val: S): void {
this.parent.sink.next(val)
}

error(err: Error): void {
this.parent.sink.error(err)
}

setup(node: LinkedListNode<ISubscription>) {
this.node = node
}

complete(): void {
this.parent.cSub.remove(this.node)
this.parent.checkComplete()
}
}

class MergeMapOuterObserver<T, S> implements IObserver<T> {
private __completed: boolean = false

constructor(
readonly conc: number,
readonly proj: Project<T, S>,
readonly sink: IObserver<S>,
readonly cSub: CompositeSubscription,
readonly sh: IScheduler
) {}

next(val: T): void {
if (this.conc + 1 - this.cSub.length()) {
const innerObserver = new MergeMapInnerObserver(this)
const node = this.cSub.add(
this.proj(val).subscribe(innerObserver, this.sh)
)
innerObserver.setup(node)
}
}

error(err: Error): void {
this.sink.error(err)
}

complete(): void {
this.__completed = true
this.checkComplete()
}

checkComplete() {
if (this.cSub.length() === 1 && this.__completed) this.sink.complete()
}
}

class MergeMap<T, S> implements IObservable<S> {
constructor(
private conc: number,
private proj: Project<T, S>,
private src: IObservable<T>
) {}
subscribe(observer: IObserver<S>, scheduler: IScheduler): ISubscription {
const cSub = new CompositeSubscription()
const outerObserver = new MergeMapOuterObserver<T, S>(
this.conc,
this.proj,
observer,
cSub,
scheduler
)
cSub.add(this.src.subscribe(outerObserver, scheduler))
return cSub
}
}

export const mergeMap = <T, S>(
conc: number,
project: Project<T, S>,
source: IObservable<T>
): IObservable<S> => new MergeMap(conc, project, source)
51 changes: 51 additions & 0 deletions test/test.MergeMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Created by tushar on 31/08/17.
*/
import {test} from 'ava'
import {mergeMap} from '../src/operators/MergeMap'
import {EVENT} from '../src/testing/Events'
import {TestScheduler} from '../src/testing/TestScheduler'

const {next, complete} = EVENT
test('should work like flatMap() when concurrency is Infinity', t => {
const sh = TestScheduler.of()
const sa$$ = sh.Cold([
next(10, 'A0'),
next(20, 'A1'),
next(30, 'A2'),
complete(40)
])
const sb$$ = sh.Cold([
next(10, 'B0'),
next(20, 'B1'),
next(30, 'B2'),
complete(40)
])
const s$$ = sh.Cold([next(10, sa$$), next(20, sb$$), complete(100)])
const {results} = sh.start<number>(() => mergeMap(Infinity, i => i, s$$))

t.deepEqual(results, [
next(220, 'A0'),
next(230, 'A1'),
next(230, 'B0'),
next(240, 'A2'),
next(240, 'B1'),
next(250, 'B2'),
complete(300)
])
})

test('should work like concatMap() when concurrency is 1', t => {
const sh = TestScheduler.of()
const s$ = [
sh.Cold([next(10, 'A0'), complete(50)]),
sh.Cold([next(10, 'B0'), complete(50)]),
sh.Cold([next(10, 'C0'), complete(50)])
]

const s$$ = sh.Hot([next(210, 0), next(250, 1), next(300, 2), complete(800)])
const {results} = sh.start<number>(() =>
mergeMap(1, (i: number) => s$[i], s$$)
)
t.deepEqual(results, [next(220, 'A0'), next(310, 'C0'), complete(800)])
})

0 comments on commit 0aafe65

Please sign in to comment.