Skip to content

Commit

Permalink
Merge 47321e7 into 643c44a
Browse files Browse the repository at this point in the history
  • Loading branch information
NaridaL committed May 15, 2020
2 parents 643c44a + 47321e7 commit 7cc657c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 28 deletions.
57 changes: 32 additions & 25 deletions src/observable-stream.ts
@@ -1,4 +1,4 @@
import { computed, observable, IObservableValue, action, runInAction } from "mobx"
import { computed, observable, action, runInAction } from "mobx"

declare var Symbol: any

Expand All @@ -11,18 +11,18 @@ function self() {
}

export interface IStreamObserver<T> {
next(value: T): void
error(error: any): void
complete(): void
next?(value: T): void
error?(error: any): void
complete?(): void
}

export interface ISubscription {
unsubscribe(): void
}

export interface IObservableStream<T> {
subscribe(observer: IStreamObserver<T>): ISubscription
subscribe(observer: (value: T) => void): ISubscription
subscribe(observer?: IStreamObserver<T> | null): ISubscription
subscribe(observer?: ((value: T) => void) | null): ISubscription
// [Symbol.observable](): IObservable;
}

Expand Down Expand Up @@ -55,23 +55,34 @@ export function toStream<T>(
): IObservableStream<T> {
const computedValue = computed(expression)
return {
subscribe(observer: any): ISubscription {
subscribe(observer?: IStreamObserver<T> | ((value: T) => void) | null): ISubscription {
if ("function" === typeof observer) {
return {
unsubscribe: computedValue.observe(
({ newValue }: { newValue: T }) => observer(newValue),
fireImmediately
),
}
}
if (observer && "object" === typeof observer && observer.next) {
return {
unsubscribe: computedValue.observe(
({ newValue }: { newValue: T }) => observer.next!(newValue),
fireImmediately
),
}
}
return {
unsubscribe: computedValue.observe(
typeof observer === "function"
? ({ newValue }: { newValue: T }) => observer(newValue)
: ({ newValue }: { newValue: T }) => observer.next(newValue),
fireImmediately
),
unsubscribe: () => {},
}
},
[observableSymbol()]: self,
}
}

class StreamListener<T> implements IStreamObserver<T> {
@observable.ref current: T = undefined
subscription: ISubscription
@observable.ref current!: T
subscription!: ISubscription

constructor(observable: IObservableStream<T>, initialValue: T) {
runInAction(() => {
Expand Down Expand Up @@ -104,7 +115,6 @@ class StreamListener<T> implements IStreamObserver<T> {
}

/**
*
* Converts a subscribable, observable stream (TC 39 observable / RxJS stream)
* into an object which stores the current value (as `current`). The subscription can be cancelled through the `dispose` method.
* Takes an initial value as second optional argument
Expand All @@ -119,18 +129,15 @@ class StreamListener<T> implements IStreamObserver<T> {
* autorun(() => {
* console.log("distance moved", debouncedClickDelta.current)
* })
*
* @export
* @template T
* @param {IObservableStream<T>} observable
* @returns {{
* current: T;
* dispose(): void;
* }}
*/
export function fromStream<T>(observable: IObservableStream<T>): IStreamListener<T | undefined>
export function fromStream<T, I>(
observable: IObservableStream<T>,
initialValue: I
): IStreamListener<T | I>
export function fromStream<T>(
observable: IObservableStream<T>,
initialValue: T = undefined
initialValue: T = undefined as any
): IStreamListener<T> {
return new StreamListener(observable, initialValue)
}
Expand Down
22 changes: 19 additions & 3 deletions test/observable-stream.ts
Expand Up @@ -11,7 +11,7 @@ test("to observable - should push the initial value by default", () => {

mobx.configure({ enforceActions: "never" })

let values = []
let values: string[] = []

const sub = from(utils.toStream(() => user.firstName + user.lastName, true))
.pipe(map((x) => x.toUpperCase()))
Expand Down Expand Up @@ -39,7 +39,7 @@ test("to observable - should not push the initial value", () => {

mobx.configure({ enforceActions: "never" })

let values = []
let values: string[] = []

const sub = from(utils.toStream(() => user.firstName + user.lastName))
.pipe(map((x) => x.toUpperCase()))
Expand All @@ -62,7 +62,7 @@ test("to observable - should not push the initial value", () => {
test("from observable", (done) => {
mobx.configure({ enforceActions: "observed" })
const fromStream = utils.fromStream(interval(10), -1)
const values = []
const values: number[] = []
const d = mobx.autorun(() => {
values.push(fromStream.current)
})
Expand All @@ -85,3 +85,19 @@ test("from observable", (done) => {
done()
}, 35)
})

test("from observable with initialValue of a different type", async () => {
mobx.configure({ enforceActions: "observed" })
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

const fromStream = utils.fromStream(interval(10), "start")
const values: (number | string)[] = []
const stopAutorun = mobx.autorun(() => values.push(fromStream.current))

await sleep(35)
expect(fromStream.current).toBe(2)
expect(values).toEqual(["start", 0, 1, 2])
fromStream.dispose()
stopAutorun()
mobx.configure({ enforceActions: "never" })
})

0 comments on commit 7cc657c

Please sign in to comment.