-
Notifications
You must be signed in to change notification settings - Fork 5
/
use-observable-query-fn.ts
131 lines (117 loc) · 4.18 KB
/
use-observable-query-fn.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import { useRef } from 'react';
import { useQueryClient, hashQueryKey } from 'react-query';
import type {
QueryFunction,
QueryKey,
QueryFunctionContext,
} from 'react-query';
import { Observable, of, firstValueFrom } from 'rxjs';
import { catchError, finalize, share, tap, skip } from 'rxjs/operators';
import { storeSubscription, cleanupSubscription } from './subscription-storage';
export interface UseObservableQueryFnResult<
TSubscriptionFnData = unknown,
TSubscriptionKey extends QueryKey = QueryKey
> {
queryFn: QueryFunction<TSubscriptionFnData, TSubscriptionKey>;
clearErrors: () => void;
}
export function useObservableQueryFn<
TSubscriptionFnData = unknown,
TCacheData = TSubscriptionFnData,
TSubscriptionKey extends QueryKey = QueryKey
>(
subscriptionFn: (
context: QueryFunctionContext<TSubscriptionKey>
) => Observable<TSubscriptionFnData>,
dataUpdater: (
data: TSubscriptionFnData,
previousData: unknown,
pageParam: unknown | undefined
) => TCacheData
): UseObservableQueryFnResult<TSubscriptionFnData, TSubscriptionKey> {
const queryClient = useQueryClient();
// We cannot assume that this fn runs for this component.
// It might be a different observer associated to the same query key.
// https://github.com/tannerlinsley/react-query/blob/16b7d290c70639b627d9ada32951d211eac3adc3/src/core/query.ts#L376
// @todo: move from the component scope to queryCache
const failRefetchWith = useRef<false | Error>(false);
const queryFn: QueryFunction<TSubscriptionFnData, TSubscriptionKey> = (
context
) => {
const { queryKey: subscriptionKey, pageParam, signal } = context;
const hashedSubscriptionKey = hashQueryKey(subscriptionKey);
if (failRefetchWith.current) {
throw failRefetchWith.current;
}
type Result = Promise<TSubscriptionFnData> & { cancel?: () => void };
const stream$ = subscriptionFn(context).pipe(share());
const result: Result = firstValueFrom(stream$);
// Fixes scenario when component unmounts before first emit.
// If we do not invalidate the query, the hook will never re-subscribe,
// as data are otherwise marked as fresh.
function cancel() {
queryClient.invalidateQueries(subscriptionKey, undefined, {
cancelRefetch: false,
});
}
// `signal` is available on context from ReactQuery 3.30.0
// If `AbortController` is not available in the current runtime environment
// ReactQuery sets `signal` to `undefined`. In that case we fallback to
// old API, attaching `cancel` fn on promise.
// @see https://tanstack.com/query/v4/docs/guides/query-cancellation
if (signal) {
signal.addEventListener('abort', cancel);
} else {
/* istanbul ignore next */
result.cancel = cancel;
}
// @todo: Skip subscription for SSR
cleanupSubscription(
queryClient,
hashedSubscriptionKey,
pageParam ?? undefined
);
const subscription = stream$
.pipe(
skip(1),
tap((data) => {
queryClient.setQueryData(subscriptionKey, (previousData) =>
dataUpdater(data, previousData, pageParam)
);
}),
catchError((error) => {
failRefetchWith.current = error;
queryClient.setQueryData(subscriptionKey, (data) => data, {
// To make the retryOnMount work
// @see: https://github.com/tannerlinsley/react-query/blob/9e414e8b4f3118b571cf83121881804c0b58a814/src/core/queryObserver.ts#L727
updatedAt: 0,
});
return of(undefined);
}),
finalize(() => {
queryClient.invalidateQueries(subscriptionKey, undefined, {
cancelRefetch: false,
});
})
)
.subscribe();
// remember the current subscription
// see `cleanup` fn for more info
storeSubscription(
queryClient,
hashedSubscriptionKey,
subscription,
pageParam ?? undefined
);
return result;
};
return {
queryFn,
// @todo incorporate into `queryFn`?
clearErrors: () => {
// Once the error has been thrown, and a query result created (with error)
// cleanup the `failRefetchWith`.
failRefetchWith.current = false;
},
};
}