Skip to content

Commit

Permalink
feat(multicast): add multicast
Browse files Browse the repository at this point in the history
add multicast feature
  • Loading branch information
tusharmath committed Oct 24, 2016
1 parent c1c8d76 commit 446ec5d
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
77 changes: 77 additions & 0 deletions src/operators/Multicast.ts
@@ -0,0 +1,77 @@
/**
* Created by tushar.mathur on 24/10/16.
*/

import {IObservable} from '../types/core/IObservable'
import {IObserver} from '../types/core/IObserver'
import {IScheduler} from '../types/IScheduler'
import {ISubscription} from '../types/core/ISubscription'
import {LinkedList, LinkedListNode} from '../lib/LinkedList'

export class MulticastSubscription<T> implements ISubscription {
closed = false
private node = this.sharedObserver.add(this.observer, this.scheduler)

constructor (private observer: IObserver<T>,
private scheduler: IScheduler,
private sharedObserver: SharedObserver<T>) {
}

unsubscribe (): void {
this.closed = true
this.sharedObserver.remove(this.node)
}
}

export class SharedObserver<T> implements IObserver<T> {
private observers = new LinkedList<IObserver<T>>()
private subscription: ISubscription

constructor (private source: IObservable<T>) {
}

add (observer: IObserver<T>, scheduler: IScheduler) {
const node = this.observers.add(observer)
if (this.observers.length === 1) {
this.subscription = this.source.subscribe(this, scheduler)
}
return node
}

remove (node: LinkedListNode<IObserver<T>>) {
this.observers.remove(node)
if (this.observers.length === 0) {
this.subscription.unsubscribe()
}
}

next (val: T): void {
this.observers.forEach(ob => ob.value.next(val))
}

error (err: Error): void {
this.observers.forEach(ob => ob.value.error(err))
}

complete (): void {
this.observers.forEach(ob => ob.value.complete())
}

}

export class Multicast<T> implements IObservable<T> {
constructor (private source: IObservable<T>) {
}

private sharedObserver = new SharedObserver(this.source)


subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {

return new MulticastSubscription(observer, scheduler, this.sharedObserver)
}
}

export function multicast<T> (source: IObservable<T>) {
return new Multicast(source)
}
37 changes: 37 additions & 0 deletions test/test.Multicast.ts
@@ -0,0 +1,37 @@
/**
* Created by tushar.mathur on 24/10/16.
*/

import test from 'ava'
import {TestScheduler} from '../src/testing/TestScheduler'
import {ReactiveEvents} from '../src/testing/ReactiveEvents'
import {map} from '../src/operators/Map'
import {TestObserver} from '../src/testing/TestObserver'
import {multicast} from '../src/operators/Multicast'

test(t => {
let i = 0
const results0: Array<number> = []
const results1: Array<number> = []
const sh = TestScheduler.of()
const ob0 = new TestObserver(sh)
const ob1 = new TestObserver(sh)
const t$ = multicast(map((x: {(): number}) => x(), sh.Hot([
ReactiveEvents.next(10, () => ++i),
ReactiveEvents.next(20, () => ++i),
ReactiveEvents.next(30, () => ++i),
ReactiveEvents.next(40, () => ++i),
ReactiveEvents.next(50, () => ++i)
])))
t$.subscribe(ob0, sh)
t$.subscribe(ob1, sh)
sh.advanceBy(50)
t.deepEqual(ob0, ob1)
t.deepEqual(ob0.results, [
ReactiveEvents.next(10, 1),
ReactiveEvents.next(20, 2),
ReactiveEvents.next(30, 3),
ReactiveEvents.next(40, 4),
ReactiveEvents.next(50, 5)
])
})

0 comments on commit 446ec5d

Please sign in to comment.