-
Notifications
You must be signed in to change notification settings - Fork 34
/
InngestStepTools.ts
589 lines (548 loc) · 17.8 KB
/
InngestStepTools.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
import canonicalize from "canonicalize";
import { sha1 } from "hash.js";
import { type Jsonify } from "type-fest";
import { ErrCode, functionStoppedRunningErr } from "../helpers/errors";
import { timeStr } from "../helpers/strings";
import {
type ObjectPaths,
type PartialK,
type SendEventPayload,
type SingleOrArray,
type ValueOf,
} from "../helpers/types";
import {
StepOpCode,
type EventPayload,
type HashedOp,
type Op,
} from "../types";
import { type Inngest } from "./Inngest";
import { NonRetriableError } from "./NonRetriableError";
export interface TickOp extends HashedOp {
fn?: (...args: unknown[]) => unknown;
fulfilled: boolean;
resolve: (value: unknown | PromiseLike<unknown>) => void;
reject: (reason?: unknown) => void;
}
/**
* Create a new set of step function tools ready to be used in a step function.
* This function should be run and a fresh set of tools provided every time a
* function is run.
*
* An op stack (function state) is passed in as well as some mutable properties
* that the tools can use to submit a new op.
*/
export const createStepTools = <
Events extends Record<string, EventPayload>,
TriggeringEvent extends keyof Events & string
>(
client: Inngest<Events>
) => {
const state: {
/**
* The tree of all found ops in the entire invocation.
*/
allFoundOps: Record<string, TickOp>;
/**
* All synchronous operations found in this particular tick. The array is
* reset every tick.
*/
tickOps: Record<string, TickOp>;
/**
* A hash of operations found within this tick, with keys being the hashed
* ops themselves (without a position) and the values being the number of
* times that op has been found.
*
* This is used to provide some mutation resilience to the op stack,
* allowing us to survive same-tick mutations of code by ensuring per-tick
* hashes are based on uniqueness rather than order.
*/
tickOpHashes: Record<string, number>;
/**
* Tracks the current operation being processed. This can be used to
* understand the contextual parent of any recorded operations.
*/
currentOp: TickOp | undefined;
/**
* If we've found a user function to run, we'll store it here so a component
* higher up can invoke and await it.
*/
userFnToRun?: (...args: unknown[]) => unknown;
/**
* A boolean to represent whether the user's function is using any step
* tools.
*
* If the function survives an entire tick of the event loop and hasn't
* touched any tools, we assume that it is a single-step async function and
* should be awaited as usual.
*/
hasUsedTools: boolean;
/**
* A function that should be used to reset the state of the tools after a
* tick has completed.
*/
reset: () => void;
/**
* If `true`, any use of step tools will, by default, throw an error. We do
* this when we detect that a function may be mixing step and non-step code.
*
* Created step tooling can decide how to manually handle this on a
* case-by-case basis.
*
* In the future, we can provide a way for a user to override this if they
* wish to and understand the danger of side-effects.
*
* Defaults to `false`.
*/
nonStepFnDetected: boolean;
} = {
allFoundOps: {},
tickOps: {},
tickOpHashes: {},
currentOp: undefined,
hasUsedTools: false,
reset: () => {
state.tickOpHashes = {};
state.allFoundOps = { ...state.allFoundOps, ...state.tickOps };
},
nonStepFnDetected: false,
};
// Start referencing everything
state.tickOps = state.allFoundOps;
/**
* Create a unique hash of an operation using only a subset of the operation's
* properties; will never use `data` and will guarantee the order of the object
* so we don't rely on individual tools for that.
*/
const hashOp = (
/**
* The op to generate a hash from. We only use a subset of the op's properties
* when creating the hash.
*/
op: Op
): HashedOp => {
const obj = {
parent: state.currentOp?.id ?? null,
op: op.op,
name: op.name,
opts: op.opts ?? null,
};
const collisionHash = _internals.hashData(obj);
const pos = (state.tickOpHashes[collisionHash] =
(state.tickOpHashes[collisionHash] ?? -1) + 1);
return {
...op,
id: _internals.hashData({ pos, ...obj }),
};
};
/**
* A local helper used to create tools that can be used to submit an op.
*
* When using this function, a generic type should be provided which is the
* function signature exposed to the user.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const createTool = <T extends (...args: any[]) => Promise<unknown>>(
/**
* A function that returns an ID for this op. This is used to ensure that
* the op stack is correctly filled, submitted, and retrieved with the same
* ID.
*
* It is passed the arguments passed by the user.
*
* Most simple tools will likely only need to define this.
*/
matchOp: (
/**
* Arguments passed by the user.
*/
...args: Parameters<T>
) => Omit<Op, "data" | "error">,
opts?: {
/**
* Optionally, we can also provide a function that will be called when
* Inngest tells us to run this operation.
*
* If this function is defined, the first time the tool is used it will
* report the desired operation (including options) to the Inngest. Inngest
* will then call back to the function to tell it to run the step and then
* retrieve data.
*
* We do this in order to allow functionality such as per-step retries; this
* gives the SDK the opportunity to tell Inngest what it wants to do before
* it does it.
*
* This function is passed the arguments passed by the user. It will be run
* when we receive an operation matching this one that does not contain a
* `data` property.
*/
fn?: (...args: Parameters<T>) => unknown;
/**
* If `true` and we have detected that this is a non-step function, the
* provided `fn` will be called and the result returned immediately
* instead of being executed later.
*
* If no `fn` is provided to the tool, this will throw the same error as
* if this setting was `false`.
*/
nonStepExecuteInline?: boolean;
}
): T => {
return ((...args: Parameters<T>): Promise<unknown> => {
if (state.nonStepFnDetected) {
if (opts?.nonStepExecuteInline && opts.fn) {
return Promise.resolve(opts.fn(...args));
}
throw new NonRetriableError(
functionStoppedRunningErr(ErrCode.STEP_USED_AFTER_ASYNC)
);
}
state.hasUsedTools = true;
const opId = hashOp(matchOp(...args));
return new Promise<unknown>((resolve, reject) => {
state.tickOps[opId.id] = {
...opId,
...(opts?.fn ? { fn: () => opts.fn?.(...args) } : {}),
resolve,
reject,
fulfilled: false,
};
});
}) as T;
};
/**
* Define the set of tools the user has access to for their step functions.
*
* Each key is the function name and is expected to run `createTool` and pass
* a generic type for that function as it will appear in the user's code.
*/
const tools = {
/**
* Send one or many events to Inngest. Should always be used in place of
* `inngest.send()` to ensure that the event send is successfully retried
* and not sent multiple times due to memoisation.
*
* @example
* ```ts
* await step.sendEvent("app/user.created", { data: { id: 123 } });
*
* await step.sendEvent({ name: "app/user.created", data: { id: 123 } });
*
* await step.sendEvent([
* {
* name: "app/user.created",
* data: { id: 123 },
* },
* {
* name: "app/user.feed.created",
* data: { id: 123 },
* },
* ]);
* ```
*
* Returns a promise that will resolve once the event has been sent.
*/
sendEvent: createTool<{
<Payload extends SendEventPayload<Events>>(payload: Payload): ReturnType<
Inngest["send"]
>;
<Event extends keyof Events & string>(
name: Event,
payload: SingleOrArray<
PartialK<Omit<Events[Event], "name" | "v">, "ts">
>
): ReturnType<Inngest["send"]>;
}>(
(nameOrPayload, maybePayload) => {
let payloads: ValueOf<Events>[];
if (typeof nameOrPayload === "string") {
/**
* Add our payloads and ensure they all have a name.
*/
payloads = (Array.isArray(maybePayload)
? maybePayload
: maybePayload
? [maybePayload]
: []
).map((payload) => ({
...payload,
name: nameOrPayload,
})) as unknown as typeof payloads;
} else {
/**
* Grab our payloads straight from the args.
*/
payloads = (
Array.isArray(nameOrPayload)
? nameOrPayload
: nameOrPayload
? [nameOrPayload]
: []
) as typeof payloads;
}
return {
op: StepOpCode.StepPlanned,
name: payloads[0]?.name || "sendEvent",
};
},
{
nonStepExecuteInline: true,
fn: (nameOrPayload, maybePayload) => {
return client.send(nameOrPayload, maybePayload);
},
}
),
/**
* Wait for a particular event to be received before continuing. When the
* event is received, it will be returned.
*
* You can also provide options to control the particular event that is
* received, for example to ensure that a user ID matches between two
* events, or to only wait a maximum amount of time before giving up and
* returning `null` instead of any event data.
*/
waitForEvent: createTool<
<IncomingEvent extends keyof Events | EventPayload>(
event: IncomingEvent extends keyof Events
? IncomingEvent
: IncomingEvent extends EventPayload
? IncomingEvent["name"]
: never,
opts:
| string
| ((IncomingEvent extends keyof Events
? WaitForEventOpts<Events[TriggeringEvent], Events[IncomingEvent]>
: IncomingEvent extends EventPayload
? WaitForEventOpts<Events[TriggeringEvent], IncomingEvent>
: never) & {
if?: never;
})
| ((IncomingEvent extends keyof Events
? WaitForEventOpts<Events[TriggeringEvent], Events[IncomingEvent]>
: IncomingEvent extends EventPayload
? WaitForEventOpts<Events[TriggeringEvent], IncomingEvent>
: never) & {
match?: never;
})
) => Promise<
IncomingEvent extends keyof Events
? Events[IncomingEvent] | null
: IncomingEvent | null
>
>(
(
/**
* The event name to wait for.
*/
event,
/**
* Options to control the event we're waiting for.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
opts: WaitForEventOpts<any, any> | string
) => {
const matchOpts: { timeout: string; if?: string } = {
timeout: timeStr(typeof opts === "string" ? opts : opts.timeout),
};
if (typeof opts !== "string") {
if (opts?.match) {
matchOpts.if = `event.${opts.match} == async.${opts.match}`;
} else if (opts?.if) {
matchOpts.if = opts.if;
}
}
return {
op: StepOpCode.WaitForEvent,
name: event as string,
opts: matchOpts,
};
}
),
/**
* Use this tool to run business logic. Each call to `run` will be retried
* individually, meaning you can compose complex workflows that safely
* retry dependent asynchronous actions.
*
* The function you pass to `run` will be called only when this "step" is to
* be executed and can be synchronous or asynchronous.
*
* In either case, the return value of the function will be the return value
* of the `run` tool, meaning you can return and reason about return data
* for next steps.
*/
run: createTool<
<T extends () => unknown>(
/**
* The name of this step as it will appear in the Inngest Cloud UI. This
* is also used as a unique identifier for the step and should not match
* any other steps within this step function.
*/
name: string,
/**
* The function to run when this step is executed. Can be synchronous or
* asynchronous.
*
* The return value of this function will be the return value of this
* call to `run`, meaning you can return and reason about return data
* for next steps.
*/
fn: T
) => Promise<
Jsonify<
T extends () => Promise<infer U>
? Awaited<U extends void ? null : U>
: ReturnType<T> extends void
? null
: ReturnType<T>
>
>
>(
(name) => {
return {
op: StepOpCode.StepPlanned,
name,
};
},
{ fn: (_, fn) => fn() }
),
/**
* Wait a specified amount of time before continuing.
*
* The time to wait can be specified using a `number` of milliseconds or an
* `ms`-compatible time string like `"1 hour"`, `"30 mins"`, or `"2.5d"`.
*
* {@link https://npm.im/ms}
*
* To wait until a particular date, use `sleepUntil` instead.
*/
sleep: createTool<
(
/**
* The amount of time to wait before continuing.
*/
time: number | string
) => Promise<void>
>((time) => {
/**
* The presence of this operation in the returned stack indicates that the
* sleep is over and we should continue execution.
*/
return {
op: StepOpCode.Sleep,
name: timeStr(time),
};
}),
/**
* Wait until a particular date before continuing by passing a `Date`.
*
* To wait for a particular amount of time from now, always use `sleep`
* instead.
*/
sleepUntil: createTool<
(
/**
* The date to wait until before continuing.
*/
time: Date | string
) => Promise<void>
>((time) => {
const date = typeof time === "string" ? new Date(time) : time;
/**
* The presence of this operation in the returned stack indicates that the
* sleep is over and we should continue execution.
*/
try {
return {
op: StepOpCode.Sleep,
name: date.toISOString(),
};
} catch (err) {
/**
* If we're here, it's because the date is invalid. We'll throw a custom
* error here to standardise this response.
*/
// TODO PrettyError
console.warn("Invalid date or date string passed to sleepUntil;", err);
// TODO PrettyError
throw new Error(
`Invalid date or date string passed to sleepUntil: ${time.toString()}`
);
}
}),
};
return [tools, state] as [typeof tools, typeof state];
};
/**
* A set of optional parameters given to a `waitForEvent` call to control how
* the event is handled.
*/
interface WaitForEventOpts<
TriggeringEvent extends EventPayload,
IncomingEvent extends EventPayload
> {
/**
* The step function will wait for the event for a maximum of this time, at
* which point the event will be returned as `null` instead of any event data.
*
* The time to wait can be specified using a `number` of milliseconds, an
* `ms`-compatible time string like `"1 hour"`, `"30 mins"`, or `"2.5d"`, or
* a `Date` object.
*
* {@link https://npm.im/ms}
*/
timeout: number | string | Date;
/**
* If provided, the step function will wait for the incoming event to match
* particular criteria. If the event does not match, it will be ignored and
* the step function will wait for another event.
*
* It must be a string of a dot-notation field name within both events to
* compare, e.g. `"data.id"` or `"user.email"`.
*
* ```
* // Wait for an event where the `user.email` field matches
* match: "user.email"
* ```
*
* All of these are helpers for the `if` option, which allows you to specify
* a custom condition to check. This can be useful if you need to compare
* multiple fields or use a more complex condition.
*
* See the Inngest expressions docs for more information.
*
* {@link https://www.inngest.com/docs/functions/expressions}
*/
match?: ObjectPaths<TriggeringEvent> & ObjectPaths<IncomingEvent>;
/**
* If provided, the step function will wait for the incoming event to match
* the given condition. If the event does not match, it will be ignored and
* the step function will wait for another event.
*
* The condition is a string of Google's Common Expression Language. For most
* simple cases, you might prefer to use `match` instead.
*
* See the Inngest expressions docs for more information.
*
* {@link https://www.inngest.com/docs/functions/expressions}
*/
if?: string;
}
/**
* An operation ready to hash to be used to memoise step function progress.
*
* @internal
*/
export type UnhashedOp = {
name: string;
op: StepOpCode;
opts: Record<string, unknown> | null;
parent: string | null;
pos?: number;
};
const hashData = (op: UnhashedOp): string => {
return sha1().update(canonicalize(op)).digest("hex");
};
/**
* Exported for testing.
*/
export const _internals = { hashData };