From c5d3f89c34b77f670c54d8a01217ef269afdd843 Mon Sep 17 00:00:00 2001 From: Adrian Leonhard Date: Fri, 15 May 2020 17:50:03 +0200 Subject: [PATCH] Typing initial value correctly doesn't require a second type parameter. I enabled strict mode to resolve all the null errors in observable-stream.ts and test/observable-stream.ts --- src/observable-stream.ts | 56 +++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/src/observable-stream.ts b/src/observable-stream.ts index 7b41053..c197320 100644 --- a/src/observable-stream.ts +++ b/src/observable-stream.ts @@ -11,9 +11,9 @@ function self() { } export interface IStreamObserver { - next(value: T): void - error(error: any): void - complete(): void + next?(value: T): void + error?(error: any): void + complete?(): void } export interface ISubscription { @@ -21,8 +21,8 @@ export interface ISubscription { } export interface IObservableStream { - subscribe(observer: IStreamObserver): ISubscription - subscribe(observer: (value: T) => void): ISubscription + subscribe(observer?: IStreamObserver | null): ISubscription + subscribe(observer?: ((value: T) => void) | null): ISubscription // [Symbol.observable](): IObservable; } @@ -55,26 +55,38 @@ export function toStream( ): IObservableStream { const computedValue = computed(expression) return { - subscribe(observer: any): ISubscription { + subscribe(observer?: IStreamObserver | ((value: T) => void) | null): ISubscription { + if ("function" === typeof observer) { + return { + unsubscribe: computedValue.observe( + ({ newValue }: { newValue: T }) => observer(newValue), + fireImmediately + ), + } + } + if (observer && "object" === typeof observer && observer.next) { + return { + unsubscribe: computedValue.observe( + ({ newValue }: { newValue: T }) => observer.next!(newValue), + fireImmediately + ), + } + } return { - unsubscribe: computedValue.observe( - typeof observer === "function" - ? ({ newValue }: { newValue: T }) => observer(newValue) - : ({ newValue }: { newValue: T }) => observer.next(newValue), - fireImmediately - ), + unsubscribe: () => {}, } }, [observableSymbol()]: self, } } -class StreamListener implements IStreamObserver { - @observable.ref current: T | I = this.initialValue - subscription: ISubscription +class StreamListener implements IStreamObserver { + @observable.ref current!: T + subscription!: ISubscription - constructor(observable: IObservableStream, private readonly initialValue: I) { + constructor(observable: IObservableStream, initialValue: T) { runInAction(() => { + this.current = initialValue this.subscription = observable.subscribe(this) }) } @@ -118,14 +130,16 @@ class StreamListener implements IStreamObserver { * console.log("distance moved", debouncedClickDelta.current) * }) */ -export function fromStream( +export function fromStream(observable: IObservableStream): IStreamListener +export function fromStream(observable: IObservableStream, initialValue: T): IStreamListener +export function fromStream( observable: IObservableStream, - initialValue: T | I = undefined -): IStreamListener { + initialValue: T = undefined as any +): IStreamListener { return new StreamListener(observable, initialValue) } -export interface IStreamListener { - current: T | I +export interface IStreamListener { + current: T dispose(): void }