-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
async_caller.ts
156 lines (142 loc) · 4.93 KB
/
async_caller.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import pRetry from "p-retry";
import PQueueMod from "p-queue";
/**
 * HTTP status codes for which a retry is pointless: the request itself is
 * at fault, so repeating it unchanged would fail the same way.
 */
const STATUS_NO_RETRY = [
  400, // Bad Request
  401, // Unauthorized
  402, // Payment Required
  403, // Forbidden
  404, // Not Found
  405, // Method Not Allowed
  406, // Not Acceptable
  407, // Proxy Authentication Required
  409, // Conflict
];

/**
 * Default failed-attempt policy. Returning normally lets the attempt be
 * retried; throwing marks the failure as non-retryable.
 *
 * Non-retryable failures are:
 * - cancellations / aborts (message starting with "Cancel" or "AbortError",
 *   or `name === "AbortError"`),
 * - errors whose `code` is "ECONNABORTED",
 * - errors carrying a status listed in `STATUS_NO_RETRY`, read from
 *   `error.response.status` or, failing that, `error.status`,
 * - errors whose `error.code` is "insufficient_quota"; these are rethrown as
 *   a fresh `Error` named "InsufficientQuotaError" with the same message.
 *
 * @param error The value thrown by the failed attempt. It is inspected
 *   defensively, so values without a string `message` are tolerated.
 * @throws The original `error` (or an "InsufficientQuotaError") when the
 *   failure must not be retried.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const defaultFailedAttemptHandler = (error: any) => {
  // A missing or non-string message must not crash the handler: a TypeError
  // raised here would hide the real failure from the caller.
  const message: string =
    typeof error?.message === "string" ? error.message : "";
  if (
    message.startsWith("Cancel") ||
    message.startsWith("AbortError") ||
    error?.name === "AbortError"
  ) {
    throw error;
  }
  if (error?.code === "ECONNABORTED") {
    throw error;
  }
  const status = error?.response?.status ?? error?.status;
  // `+status` coerces string statuses (e.g. "404") before the lookup.
  if (status && STATUS_NO_RETRY.includes(+status)) {
    throw error;
  }
  if (error?.error?.code === "insufficient_quota") {
    const err = new Error(error?.message);
    err.name = "InsufficientQuotaError";
    throw err;
  }
};
/**
* Callback invoked after a failed attempt. It receives the error that the
* attempt produced and should throw if that error must not be retried.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type FailedAttemptHandler = (error: any) => any;
/**
* Options accepted by the `AsyncCaller` constructor. Every field is optional.
*/
export interface AsyncCallerParams {
/**
* The maximum number of calls that may run concurrently.
* Defaults to `Infinity`, which means no limit.
*/
maxConcurrency?: number;
/**
* The maximum number of retries that can be made for a single call,
* with an exponential backoff between each attempt. Defaults to 6.
*/
maxRetries?: number;
/**
* Custom handler for failed attempts. It takes the originally thrown
* error object as input and should itself throw an error if the input
* error is not retryable. When omitted, the module's default handler is used.
*/
onFailedAttempt?: FailedAttemptHandler;
}
/**
* Per-call options accepted by `AsyncCaller.callWithOptions`.
*/
export interface AsyncCallerCallOptions {
/**
* When this signal emits `abort`, the promise returned by `callWithOptions`
* rejects with `Error("AbortError")`. The underlying call is not cancelled.
*/
signal?: AbortSignal;
}
/**
 * A class that can be used to make async calls with concurrency and retry logic.
 *
 * This is useful for making calls to any kind of "expensive" external resource,
 * be it because it's rate-limited, subject to network issues, etc.
 *
 * Concurrent calls are limited by the `maxConcurrency` parameter, which defaults
 * to `Infinity`. This means that by default, all calls will be made in parallel.
 *
 * Retries are limited by the `maxRetries` parameter, which defaults to 6. This
 * means that by default, each call will be retried up to 6 times, with an
 * exponential backoff between each attempt.
 *
 * Whether a failed attempt is retried at all is decided by `onFailedAttempt`:
 * if that handler throws, retrying stops and the call rejects.
 */
export class AsyncCaller {
  protected maxConcurrency: AsyncCallerParams["maxConcurrency"];

  protected maxRetries: AsyncCallerParams["maxRetries"];

  protected onFailedAttempt: AsyncCallerParams["onFailedAttempt"];

  private queue: typeof import("p-queue")["default"]["prototype"];

  /**
   * @param params Concurrency / retry configuration. Missing fields fall back
   *   to `Infinity` concurrency, 6 retries and the default failed-attempt
   *   handler.
   */
  constructor(params: AsyncCallerParams) {
    this.maxConcurrency = params.maxConcurrency ?? Infinity;
    this.maxRetries = params.maxRetries ?? 6;
    this.onFailedAttempt =
      params.onFailedAttempt ?? defaultFailedAttemptHandler;

    // Depending on the module system, p-queue is exposed either directly or
    // under a `default` property.
    const PQueue = "default" in PQueueMod ? PQueueMod.default : PQueueMod;
    this.queue = new PQueue({ concurrency: this.maxConcurrency });
  }

  /**
   * Schedules `callable(...args)` on the queue and retries it on failure.
   *
   * Rejections that are not `Error` instances are wrapped in an `Error` whose
   * message is the string form of the rejected value. If the rejected value
   * has a numeric `status` (for example the `Response` rejected by
   * {@link AsyncCaller.fetch}), the wrapper also carries `status` and
   * `response`, so the failed-attempt handler can recognise statuses that
   * must not be retried.
   *
   * @param callable The async function to run.
   * @param args Arguments forwarded to `callable` on every attempt.
   * @returns The value `callable` resolves with.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  call<A extends any[], T extends (...args: A) => Promise<any>>(
    callable: T,
    ...args: Parameters<T>
  ): Promise<Awaited<ReturnType<T>>> {
    return this.queue.add(
      () =>
        pRetry(
          () =>
            callable(...args).catch((error) => {
              // eslint-disable-next-line no-instanceof/no-instanceof
              if (error instanceof Error) {
                throw error;
              }
              const wrapped = new Error(error);
              // Without this the HTTP status of a rejected Response would be
              // lost and 4xx answers would be retried until exhaustion.
              if (typeof error?.status === "number") {
                Object.assign(wrapped, {
                  status: error.status,
                  response: error,
                });
              }
              throw wrapped;
            }),
          {
            onFailedAttempt: this.onFailedAttempt,
            retries: this.maxRetries,
            randomize: true,
            // If needed we can change some of the defaults here,
            // but they're quite sensible.
          }
        ),
      { throwOnTimeout: true }
    );
  }

  /**
   * Same as {@link AsyncCaller.call}, but the returned promise additionally
   * rejects with `Error("AbortError")` once `options.signal` is aborted.
   *
   * If the signal is already aborted, the promise rejects immediately and
   * `callable` is never scheduled.
   *
   * @param options Per-call options; only `signal` is used.
   * @param callable The async function to run.
   * @param args Arguments forwarded to `callable`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  callWithOptions<A extends any[], T extends (...args: A) => Promise<any>>(
    options: AsyncCallerCallOptions,
    callable: T,
    ...args: Parameters<T>
  ): Promise<Awaited<ReturnType<T>>> {
    const { signal } = options;
    if (!signal) {
      return this.call<A, T>(callable, ...args);
    }

    // An already-aborted signal never fires "abort" again, so it has to be
    // checked up front or the abort would be silently ignored.
    if (signal.aborted) {
      return Promise.reject(new Error("AbortError"));
    }

    // Note this doesn't cancel the underlying request,
    // when available prefer to use the signal option of the underlying call
    let onAbort = (): void => {};
    const aborted = new Promise<never>((_, reject) => {
      onAbort = () => reject(new Error("AbortError"));
      signal.addEventListener("abort", onAbort, { once: true });
    });

    return Promise.race([this.call<A, T>(callable, ...args), aborted]).finally(
      () => {
        // Detach the listener so long-lived signals do not accumulate one
        // closure per completed call.
        signal.removeEventListener("abort", onAbort);
      }
    );
  }

  /**
   * `fetch` with the caller's concurrency and retry behaviour applied.
   * Responses whose `ok` flag is false are turned into rejections so that
   * they go through the retry logic.
   *
   * @param args The same arguments as the global `fetch`.
   * @returns The first response whose `ok` flag is true.
   */
  fetch(...args: Parameters<typeof fetch>): ReturnType<typeof fetch> {
    return this.call(() =>
      fetch(...args).then((res) => (res.ok ? res : Promise.reject(res)))
    );
  }
}