/
SkipRepeats.ts
58 lines (47 loc) · 1.58 KB
/
SkipRepeats.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Created by niranjan on 12/10/16.
*/
import {IObservable} from '../types/core/IObservable'
import {IObserver} from '../types/core/IObserver'
import {ISubscription} from '../types/core/ISubscription'
import {IScheduler} from '../types/IScheduler'
import {Curry} from '../lib/Curry'
export type THasher<T, R> = (value: T) => R
export type TSource<T> = IObservable<T>
export type TResult<T> = IObservable<T>
class SkipRepeatsObserver <T, H> implements IObserver<T> {
private hash: H | void = undefined
private init = true
constructor (private hasher: {(a: T): H}, private sink: IObserver<T>) {
}
next (val: T) {
const hash = this.hasher(val)
if (this.init) {
this.init = false
this.sink.next(val)
this.hash = hash
}
else if (this.hash !== hash) {
this.sink.next(val)
this.hash = hash
}
}
error (err: Error) {
this.sink.error(err)
}
complete (): void {
this.sink.complete()
}
}
export class SkipRepeatsObservable <T, H> implements TResult <T> {
constructor (private hashFunction: THasher<T, H>, private source: TSource<T>) {
}
subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
return this.source.subscribe(new SkipRepeatsObserver(this.hashFunction, observer), scheduler)
}
}
export const skipRepeats = Curry(function (hashFunction: {(t: any): any}, source: IObservable<any>) {
return new SkipRepeatsObservable(hashFunction, source)
}) as Function &
{<T, R> (mapper: THasher<T, R>, source: TSource<T>): TResult<T>} &
{<T, R> (mapper: THasher<T, R>): {(source: TSource<T>): TResult<T>}}