Skip to content

Commit

Permalink
Run onSendEvent middleware hooks for step.invoke() payloads (#503)
Browse files Browse the repository at this point in the history
## Summary
<!-- Succinctly describe your change, providing context, what you've
changed, and why. -->

The `onSendEvent.transformInput()` middleware hook is used to intercept
events being sent and change either the shape or number of events being
sent. When invoking a function via `step.invoke()`, an _internal_
`inngest/function.invoked` event is sent, though the
`onSendEvent.transformInput()` hook is **not** currently run.

For middleware like `@inngest/middleware-encryption`, this poses a
challenge, as it can no longer encrypt fields when using `step.invoke()`
instead of `step.sendEvent()`.

This PR ensures that the hook is also used for events being sent via
`step.invoke()`, such that middleware can now appropriately affect the
payloads.

Some notes on design decisions here:

- The `onSendEvent` hook will be run for each individual
`step.invoke()`. This is the same functionality as calling
`step.sendEvent()` multiple times. While the `transformInput()` hook
supports batched `payloads`, this makes it difficult to determine which
payloads have been affected if a user has omitted some from the returned
array.
- Unlike `step.sendEvent()` or `inngest.send()`, we do not allow a
developer to omit the event to be sent by `step.invoke()`

## Checklist
<!-- Tick these items off as you progress. -->
<!-- If an item isn't applicable, ideally please strikeout the item by
wrapping it in "~~"" and suffix it with "N/A My reason for skipping
this." -->
<!-- e.g. "- [ ] ~~Added tests~~ N/A Only touches docs" -->

- [x] Added a [docs PR](https://github.com/inngest/website) that
references this PR
- [x] Added unit/integration tests
- [x] Added changesets if applicable
  • Loading branch information
jpwilliams committed Feb 28, 2024
1 parent 097ecc8 commit f6088e0
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 71 deletions.
5 changes: 5 additions & 0 deletions .changeset/rude-onions-deliver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"inngest": patch
---

Fix `onSendEvent.transformInput()` middleware hooks not running for `step.invoke()` payloads
38 changes: 1 addition & 37 deletions packages/inngest/src/components/InngestFunction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {
type InngestExecutionOptions,
} from "@local/components/execution/InngestExecution";
import { _internals } from "@local/components/execution/v1";
import { ServerTiming } from "@local/helpers/ServerTiming";
import { internalEvents } from "@local/helpers/consts";
import {
ErrCode,
Expand All @@ -43,7 +42,7 @@ import {
import { fromPartial } from "@total-typescript/shoehorn";
import { type IsEqual } from "type-fest";
import { assertType } from "type-plus";
import { createClient } from "../test/helpers";
import { createClient, runFnWithStack } from "../test/helpers";

type TestEvents = {
foo: { data: { foo: string } };
Expand Down Expand Up @@ -95,8 +94,6 @@ const opts = (<T extends ClientOptions>(x: T): T => x)({

const inngest = createClient(opts);

const timer = new ServerTiming();

const matchError = (err: any) => {
const serializedErr = serializeError(err);
return expect.objectContaining({
Expand Down Expand Up @@ -223,39 +220,6 @@ describe("runFn", () => {
});

describe("step functions", () => {
const runFnWithStack = (
// eslint-disable-next-line @typescript-eslint/no-explicit-any
fn: InngestFunction.Any,
stepState: InngestExecutionOptions["stepState"],
opts?: {
executionVersion?: ExecutionVersion;
runStep?: string;
onFailure?: boolean;
event?: EventPayload;
stackOrder?: InngestExecutionOptions["stepCompletionOrder"];
disableImmediateExecution?: boolean;
}
) => {
const execution = fn["createExecution"]({
version: opts?.executionVersion ?? PREFERRED_EXECUTION_VERSION,
partialOptions: {
data: fromPartial({
event: opts?.event || { name: "foo", data: {} },
}),
runId: "run",
stepState,
stepCompletionOrder: opts?.stackOrder ?? Object.keys(stepState),
isFailureHandler: Boolean(opts?.onFailure),
requestedRunStep: opts?.runStep,
timer,
disableImmediateExecution: opts?.disableImmediateExecution,
reqArgs: [],
},
});

return execution.start();
};

const getHashDataSpy = () => jest.spyOn(_internals, "hashOp");
const getWarningSpy = () => jest.spyOn(console, "warn");
const getErrorSpy = () => jest.spyOn(console, "error");
Expand Down
250 changes: 250 additions & 0 deletions packages/inngest/src/components/InngestMiddleware.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { ExecutionVersion } from "@local/components/execution/InngestExecution";
import { Inngest } from "@local/components/Inngest";
import { referenceFunction } from "@local/components/InngestFunctionReference";
import { InngestMiddleware } from "@local/components/InngestMiddleware";
import { StepOpCode } from "@local/types";
import { type IsUnknown } from "type-fest";
import { assertType, type IsEqual } from "type-plus";
import { createClient, runFnWithStack, testClientId } from "../test/helpers";

describe("stacking and inference", () => {
describe("onFunctionRun", () => {
Expand Down Expand Up @@ -260,6 +265,251 @@ describe("stacking and inference", () => {
});

describe("onSendEvent", () => {
describe("transformInput", () => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const mockFetch = jest.fn(() =>
Promise.resolve({
status: 200,
json: () => Promise.resolve({ ids: [], status: 200 }),
text: () => Promise.resolve(""),
})
) as any;

beforeEach(() => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
mockFetch.mockClear();
});

describe("step.invoke()", () => {
test("returning a new payload overwrites the original", async () => {
const fn = createClient({
id: testClientId,
middleware: [
new InngestMiddleware({
name: "Test: onSendEvent.transformInput",
init() {
return {
onSendEvent() {
return {
transformInput() {
return {
payloads: [
{
name: "foo",
data: { dataFromMiddleware: true },
},
],
};
},
};
},
};
},
}),
],
}).createFunction(
{ id: "fn_id" },
{ event: "foo" },
async ({ step }) => {
await step.invoke("id", {
function: referenceFunction({
functionId: "some_fn_id",
data: { dataFromStep: true },
}),
});
}
);

const res = await runFnWithStack(
fn,
{},
{ executionVersion: ExecutionVersion.V1 }
);

expect(res).toMatchObject({
steps: [
expect.objectContaining({
op: StepOpCode.InvokeFunction,
opts: expect.objectContaining({
payload: {
data: { dataFromMiddleware: true },
},
}),
}),
],
});
});

test("returning no payload keeps the original", async () => {
const fn = createClient({
id: testClientId,
middleware: [
new InngestMiddleware({
name: "Test: onSendEvent.transformInput",
init() {
return {
onSendEvent() {
return {
transformInput() {
return {
payloads: [],
};
},
};
},
};
},
}),
],
}).createFunction(
{ id: "fn_id" },
{ event: "foo" },
async ({ step }) => {
await step.invoke("id", {
function: referenceFunction({
functionId: "some_fn_id",
}),
data: { dataFromStep: true },
});
}
);

const res = await runFnWithStack(
fn,
{},
{ executionVersion: ExecutionVersion.V1 }
);

expect(res).toMatchObject({
steps: [
expect.objectContaining({
op: StepOpCode.InvokeFunction,
opts: expect.objectContaining({
payload: {
data: { dataFromStep: true },
},
}),
}),
],
});
});

test("returning a partial payload merges with the original, preferring the new value", async () => {
const fn = createClient({
id: testClientId,
middleware: [
new InngestMiddleware({
name: "Test: onSendEvent.transformInput",
init() {
return {
onSendEvent() {
return {
transformInput() {
return {
payloads: [
{
name: "foo",
user: { userFromMiddleware: true },
},
],
};
},
};
},
};
},
}),
],
}).createFunction(
{ id: "fn_id" },
{ event: "foo" },
async ({ step }) => {
await step.invoke("id", {
function: referenceFunction({
functionId: "some_fn_id",
}),
data: { dataFromStep: true },
});
}
);

const res = await runFnWithStack(
fn,
{},
{ executionVersion: ExecutionVersion.V1 }
);

expect(res).toMatchObject({
steps: [
expect.objectContaining({
op: StepOpCode.InvokeFunction,
opts: expect.objectContaining({
payload: {
data: {
dataFromStep: true,
},
user: {
userFromMiddleware: true,
},
},
}),
}),
],
});
});

test("hook runs once per invocation", async () => {
const transformInputSpy = jest.fn(() => undefined);

const onSendEventSpy = jest.fn(() => ({
transformInput: transformInputSpy,
}));

const fn = createClient({
id: testClientId,
middleware: [
new InngestMiddleware({
name: "Test: onSendEvent.transformInput",
init() {
return {
onSendEvent: onSendEventSpy,
};
},
}),
],
}).createFunction(
{ id: "fn_id" },
{ event: "foo" },
async ({ step }) => {
await Promise.all([
step.invoke("id", {
function: referenceFunction({
functionId: "some_fn_id",
data: { dataFromStep: true },
}),
}),
step.invoke("id", {
function: referenceFunction({
functionId: "some_fn_id",
data: { dataFromStep: true },
}),
}),
]);
}
);

await runFnWithStack(
fn,
{},
{ executionVersion: ExecutionVersion.V1 }
);

expect(onSendEventSpy).toHaveBeenCalledTimes(2);
expect(transformInputSpy).toHaveBeenCalledTimes(2);
});
});
});

describe("transformOutput", () => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const mockFetch = jest.fn(() =>
Expand Down
Loading

0 comments on commit f6088e0

Please sign in to comment.