Skip to content

Commit

Permalink
Typing initial value correctly doesn't require a second type parameter.
Browse files Browse the repository at this point in the history
I enabled strict mode to resolve all the null errors in observable-stream.ts and test/observable-stream.ts
  • Loading branch information
NaridaL committed May 15, 2020
1 parent a12a84f commit c5d3f89
Showing 1 changed file with 35 additions and 21 deletions.
56 changes: 35 additions & 21 deletions src/observable-stream.ts
Expand Up @@ -11,18 +11,18 @@ function self() {
}

export interface IStreamObserver<T> {
next(value: T): void
error(error: any): void
complete(): void
next?(value: T): void
error?(error: any): void
complete?(): void
}

export interface ISubscription {
unsubscribe(): void
}

export interface IObservableStream<T> {
subscribe(observer: IStreamObserver<T>): ISubscription
subscribe(observer: (value: T) => void): ISubscription
subscribe(observer?: IStreamObserver<T> | null): ISubscription
subscribe(observer?: ((value: T) => void) | null): ISubscription
// [Symbol.observable](): IObservable;
}

Expand Down Expand Up @@ -55,26 +55,38 @@ export function toStream<T>(
): IObservableStream<T> {
const computedValue = computed(expression)
return {
subscribe(observer: any): ISubscription {
subscribe(observer?: IStreamObserver<T> | ((value: T) => void) | null): ISubscription {
if ("function" === typeof observer) {
return {
unsubscribe: computedValue.observe(
({ newValue }: { newValue: T }) => observer(newValue),
fireImmediately
),
}
}
if (observer && "object" === typeof observer && observer.next) {
return {
unsubscribe: computedValue.observe(
({ newValue }: { newValue: T }) => observer.next!(newValue),
fireImmediately
),
}
}
return {
unsubscribe: computedValue.observe(
typeof observer === "function"
? ({ newValue }: { newValue: T }) => observer(newValue)
: ({ newValue }: { newValue: T }) => observer.next(newValue),
fireImmediately
),
unsubscribe: () => {},
}
},
[observableSymbol()]: self,
}
}

class StreamListener<T, I> implements IStreamObserver<T | I> {
@observable.ref current: T | I = this.initialValue
subscription: ISubscription
class StreamListener<T> implements IStreamObserver<T> {
@observable.ref current!: T
subscription!: ISubscription

constructor(observable: IObservableStream<T>, private readonly initialValue: I) {
constructor(observable: IObservableStream<T>, initialValue: T) {
runInAction(() => {
this.current = initialValue
this.subscription = observable.subscribe(this)
})
}
Expand Down Expand Up @@ -118,14 +130,16 @@ class StreamListener<T, I> implements IStreamObserver<T | I> {
* console.log("distance moved", debouncedClickDelta.current)
* })
*/
export function fromStream<T, I = undefined>(
export function fromStream<T>(observable: IObservableStream<T>): IStreamListener<T | undefined>
export function fromStream<T>(observable: IObservableStream<T>, initialValue: T): IStreamListener<T>
export function fromStream<T>(
observable: IObservableStream<T>,
initialValue: T | I = undefined
): IStreamListener<T, I> {
initialValue: T = undefined as any
): IStreamListener<T> {
return new StreamListener(observable, initialValue)
}

export interface IStreamListener<T, I = undefined> {
current: T | I
export interface IStreamListener<T> {
current: T
dispose(): void
}

0 comments on commit c5d3f89

Please sign in to comment.