Skip to content

Commit

Permalink
feat(infra): livedata effect
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Mar 23, 2024
1 parent 116edfb commit 8c0693a
Show file tree
Hide file tree
Showing 6 changed files with 728 additions and 490 deletions.
118 changes: 118 additions & 0 deletions packages/common/infra/src/livedata/effect/__tests__/effect.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import type { Subscriber } from 'rxjs';
import { from, Observable, switchMap } from 'rxjs';
import { beforeEach, describe, expect, test, vi } from 'vitest';

import {
catchErrorInto,
effect,
endOn,
LiveData,
mapInto,
startOn,
} from '../../';

describe('example', () => {
type User = {
id: number;
name: string;
};

const fetchUser = vi.fn<[number], Promise<User>>();

const user$ = new LiveData<User | null>(null);
const isLoading$ = new LiveData<boolean>(false);
const error$ = new LiveData<Error | null>(null);

const loadUser = effect(
switchMap((id: number) =>
from(fetchUser(id)).pipe(
mapInto(user$),
catchErrorInto(error$),
startOn(() => isLoading$.next(true)),
endOn(() => isLoading$.next(false))
)
)
);

beforeEach(() => {
fetchUser.mockClear();

user$.next(null);
isLoading$.next(false);
error$.next(null);
});

test('basic', async () => {
fetchUser.mockImplementation(async id => ({ id, name: 'John' }));
loadUser(1);
await vi.waitFor(() =>
expect(user$.value).toStrictEqual({ id: 1, name: 'John' })
);
expect(fetchUser).toHaveBeenCalledOnce();
expect(fetchUser).toHaveBeenCalledWith(1);
});

test('error', async () => {
fetchUser.mockRejectedValue(new Error('some error'));
loadUser(1);
await vi.waitFor(() => expect(error$.value).toBeInstanceOf(Error));
});

test('isLoading', async () => {
let resolveFn: (value: User) => void = null!;
fetchUser.mockReturnValue(
new Promise(resolve => {
resolveFn = resolve;
})
);
loadUser(1);
await vi.waitFor(() => expect(isLoading$.value).toBe(true));
expect(fetchUser).toHaveBeenCalledOnce();
resolveFn({ id: 1, name: 'John' });
await vi.waitFor(() => expect(isLoading$.value).toBe(false));
});

test('switchMap', async () => {
let fetch1: Subscriber<User> = null!;
let fetch1Canceled = false;
fetchUser.mockReturnValue(
new Observable<User>(subscriber => {
fetch1 = subscriber;
return () => {
fetch1Canceled = true;
};
}) as any
);

loadUser(1);
await vi.waitFor(() => expect(fetch1).toBeTruthy());
expect(isLoading$.value).toBe(true);

// start fetch2, should cancel fetch1
let fetch2: Subscriber<User> = null!;
fetchUser.mockReturnValue(
new Observable<User>(subscriber => {
fetch2 = subscriber;
}) as any
);
loadUser(2);
await vi.waitFor(() => expect(fetch1Canceled).toBe(true));
expect(isLoading$.value).toBe(true);

// fetch1 fail, should not affect fetch2
fetch1.error(new Error('some error'));
expect(error$.value).toBe(null);

// make fetch2 complete
fetch2.next({ id: 2, name: 'John' });
fetch2.complete();

// should update user$ to fetch2 result
await vi.waitFor(() =>
expect(user$.value).toStrictEqual({ id: 2, name: 'John' })
);
// should not have error
expect(error$.value).toBe(null);
expect(isLoading$.value).toBe(false);
});
});
64 changes: 64 additions & 0 deletions packages/common/infra/src/livedata/effect/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { DebugLogger } from '@affine/debug';
import { Unreachable } from '@affine/env/constant';
import { type OperatorFunction, Subject } from 'rxjs';

const logger = new DebugLogger('effect');

export interface Effect<T> {
(value: T): void;
}

export function effect<T, A>(op1: OperatorFunction<T, A>): Effect<T>;
export function effect<T, A, B>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>
): Effect<T>;
export function effect<T, A, B, C>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>
): Effect<T>;
export function effect<T, A, B, C, D>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>
): Effect<T>;
export function effect<T, A, B, C, D, E>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>
): Effect<T>;
export function effect<T, A, B, C, D, E, F>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>
): Effect<T>;
export function effect(...args: any[]) {
const subject$ = new Subject<any>();

// eslint-disable-next-line prefer-spread
subject$.pipe.apply(subject$, args as any).subscribe({
next(value) {
logger.error('effect should not emit value', value);
throw new Unreachable('effect should not emit value');
},
complete() {
logger.error('effect unexpected complete');
throw new Unreachable('effect unexpected complete');
},
error(error) {
logger.error('effect uncatched error', error);
throw new Unreachable('effect uncatched error');
},
});

return ((value: unknown) => {
subject$.next(value);
}) as never;
}
Loading

0 comments on commit 8c0693a

Please sign in to comment.