diff --git a/src/constants.ts b/src/constants.ts index c7eb6fa..401bd97 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -17,7 +17,7 @@ export const NO_CONCURRENCY = 1; export const NOT_SET = "not-set"; export const DEFAULT_RETRIES = 3; -export const VERSION = "v0.2.18"; +export const VERSION = "v0.2.20"; export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`; export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const; diff --git a/src/context/context.test.ts b/src/context/context.test.ts index 2ba3538..2c69eeb 100644 --- a/src/context/context.test.ts +++ b/src/context/context.test.ts @@ -337,6 +337,9 @@ describe("context tests", () => { method: "POST", url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, token, + headers: { + "content-type": "application/json", + }, body: [ { body: '"request-body"', @@ -439,6 +442,94 @@ describe("context tests", () => { }, }); }); + test("should send context.call without parsing body if stringifyBody is false", async () => { + const context = new WorkflowContext({ + qstashClient, + initialPayload: "my-payload", + steps: [], + url: WORKFLOW_ENDPOINT, + headers: new Headers() as Headers, + workflowRunId: "wfr-id", + }); + await mockQStashServer({ + execute: () => { + const throws = () => + context.call("my-step", { + url, + body, + headers: { "my-header": "my-value" }, + method: "PATCH", + stringifyBody: false, + }); + expect(throws).toThrowError("Aborting workflow after executing step 'my-step'."); + }, + responseFields: { + status: 200, + body: "msgId", + }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, + token, + body: [ + { + body: "request-body", + destination: url, + headers: { + "upstash-workflow-sdk-version": "1", + "content-type": "application/json", + "upstash-callback": WORKFLOW_ENDPOINT, + "upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger", + "upstash-callback-forward-upstash-workflow-callback": "true", + "upstash-callback-forward-upstash-workflow-concurrent": "1", + "upstash-callback-forward-upstash-workflow-contenttype": "application/json", + "upstash-callback-forward-upstash-workflow-stepid": "1", + "upstash-callback-forward-upstash-workflow-stepname": "my-step", + "upstash-callback-forward-upstash-workflow-steptype": "Call", + "upstash-callback-workflow-calltype": "fromCallback", + "upstash-callback-workflow-init": "false", + "upstash-callback-workflow-runid": "wfr-id", + "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, + "upstash-feature-set": "WF_NoDelete,InitialBody", + "upstash-forward-my-header": "my-value", + "upstash-method": "PATCH", + "upstash-retries": "0", + "upstash-workflow-calltype": "toCallback", + "upstash-workflow-init": "false", + "upstash-workflow-runid": "wfr-id", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + }, + }, + ], + }, + }); + }); + + test("should throw error if stringifyBody is false and body is object", async () => { + const context = new WorkflowContext({ + qstashClient, + initialPayload: "my-payload", + steps: [], + url: WORKFLOW_ENDPOINT, + headers: new Headers() as Headers, + workflowRunId: "wfr-id", + }); + + const throws = () => + context.call("my-step", { + url, + body: { foo: "bar" }, + headers: { "my-header": "my-value" }, + method: "PATCH", + // @ts-expect-error testing error case + stringifyBody: false, + }); + expect(throws).toThrowError( + new WorkflowError( + `When stringifyBody is false, body must be a string. Please check the body type of your call step.` + ) + ); + }); }); test("cancel should throw abort with cleanup: true", async () => { diff --git a/src/context/context.ts b/src/context/context.ts index 4d5fca8..c4a02ce 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -385,11 +385,11 @@ export class WorkflowContext { | CallSettings | (LazyInvokeStepParams & Pick) ): Promise> { - let callStep: LazyCallStep; + let callStep: LazyCallStep; if ("workflow" in settings) { const url = getNewUrlFromWorkflowId(this.url, settings.workflow.workflowId); - callStep = new LazyCallStep<{ workflowRunId: string }>( + callStep = new LazyCallStep<{ workflowRunId: string }, typeof settings.body>( stepName, url, "POST", @@ -398,7 +398,8 @@ export class WorkflowContext { settings.retries || 0, settings.retryDelay, settings.timeout, - settings.flowControl ?? settings.workflow.options.flowControl + settings.flowControl ?? settings.workflow.options.flowControl, + settings.stringifyBody ?? true ); } else { const { @@ -410,9 +411,10 @@ export class WorkflowContext { retryDelay, timeout, flowControl, + stringifyBody = true, } = settings; - callStep = new LazyCallStep( + callStep = new LazyCallStep( stepName, url, method, @@ -421,7 +423,8 @@ export class WorkflowContext { retries, retryDelay, timeout, - flowControl + flowControl, + stringifyBody ); } @@ -509,7 +512,7 @@ export class WorkflowContext { stepName: string, settings: LazyInvokeStepParams ) { - return await this.addStep(new LazyInvokeStep(stepName, settings)); + return await this.addStep(new LazyInvokeStep(stepName, settings)); } /** diff --git a/src/context/steps.test.ts b/src/context/steps.test.ts index 37e6d0c..195245c 100644 --- a/src/context/steps.test.ts +++ b/src/context/steps.test.ts @@ -159,7 +159,8 @@ describe("test steps", () => { 14, "1000", 30, - flowControl + flowControl, + true ); test("should set correct fields", () => { diff --git a/src/context/steps.ts b/src/context/steps.ts index a4154ba..535a248 100644 --- a/src/context/steps.ts +++ b/src/context/steps.ts @@ -333,6 +333,8 @@ export class LazyCallStep extends BaseLazySt public readonly retryDelay?: string; public readonly timeout?: number | Duration; public readonly flowControl?: FlowControl; + private readonly stringifyBody: boolean; + stepType: StepType = "Call"; allowUndefinedOut = false; @@ -345,7 +347,8 @@ export class LazyCallStep extends BaseLazySt retries: number, retryDelay: string | undefined, timeout: number | Duration | undefined, - flowControl: FlowControl | undefined + flowControl: FlowControl | undefined, + stringifyBody: boolean ) { super(stepName); this.url = url; @@ -356,6 +359,7 @@ export class LazyCallStep extends BaseLazySt this.retryDelay = retryDelay; this.timeout = timeout; this.flowControl = flowControl; + this.stringifyBody = stringifyBody; } public getPlanStep(concurrent: number, targetStep: number): Step { @@ -495,10 +499,23 @@ export class LazyCallStep extends BaseLazySt } async submitStep({ context, headers }: SubmitStepParams) { + let callBody: string; + if (this.stringifyBody) { + callBody = JSON.stringify(this.body); + } else { + if (typeof this.body === "string") { + callBody = this.body; + } else { + throw new WorkflowError( + "When stringifyBody is false, body must be a string. Please check the body type of your call step." + ); + } + } + return (await context.qstashClient.batch([ { headers, - body: JSON.stringify(this.body), + body: callBody, method: this.method, url: this.url, retries: DEFAULT_RETRIES === this.retries ? undefined : this.retries, @@ -650,7 +667,7 @@ export class LazyInvokeStep extends BaseLazy stepType: StepType = "Invoke"; params: RequiredExceptFields< LazyInvokeStepParams, - "retries" | "flowControl" | "retryDelay" + "retries" | "flowControl" | "retryDelay" | "body" >; protected allowUndefinedOut = false; /** @@ -668,6 +685,7 @@ export class LazyInvokeStep extends BaseLazy retries, retryDelay, flowControl, + stringifyBody = true, }: LazyInvokeStepParams ) { super(stepName); @@ -679,6 +697,7 @@ export class LazyInvokeStep extends BaseLazy retries, retryDelay, flowControl, + stringifyBody, }; const { workflowId } = workflow; @@ -740,8 +759,21 @@ export class LazyInvokeStep extends BaseLazy }); invokerHeaders["Upstash-Workflow-Runid"] = context.workflowRunId; + let invokeBody: string; + if (this.params.stringifyBody) { + invokeBody = JSON.stringify(this.params.body); + } else { + if (typeof this.params.body === "string") { + invokeBody = this.params.body; + } else { + throw new WorkflowError( + "When stringifyBody is false, body must be a string. Please check the body type of your invoke step." + ); + } + } + const request: InvokeWorkflowRequest = { - body: JSON.stringify(this.params.body), + body: invokeBody, headers: Object.fromEntries( Object.entries(invokerHeaders).map((pairs) => [pairs[0], [pairs[1]]]) ), diff --git a/src/serve/serve-many.test.ts b/src/serve/serve-many.test.ts index 317933f..860cdb3 100644 --- a/src/serve/serve-many.test.ts +++ b/src/serve/serve-many.test.ts @@ -125,11 +125,39 @@ describe("serveMany", () => { } ); + const workflowFour = createWorkflow(async (context) => { + await context.invoke("invoke workflow two", { + workflow: workflowTwo, + body: "hello world", + stringifyBody: false, + }); + }); + + const workflowFive = createWorkflow(async (context) => { + await context.invoke("invoke workflow two", { + workflow: workflowTwo, + body: "hello world", + stringifyBody: true, + }); + }); + + const workflowSix = createWorkflow(async (context) => { + await context.invoke("invoke workflow two", { + workflow: workflowTwo, + stringifyBody: false, + // @ts-expect-error object body isn't accepted because stringifyBody is false + body: { hello: "world" }, + }); + }); + const { POST: handler } = serveMany( { "workflow-one": workflowOne, "workflow-two": workflowTwo, "workflow-three": workflowThree, + "workflow-four": workflowFour, + "workflow-five": workflowFive, + "workflow-six": workflowSix, }, { qstashClient, @@ -305,6 +333,119 @@ describe("serveMany", () => { }, }); }); + + test("should not stringify body if stringifyBody: false", async () => { + const request = getRequest( + `${WORKFLOW_ENDPOINT}/workflow-four`, + "wfr_id", + "initial-payload", + [] + ); + + await mockQStashServer({ + execute: async () => { + const response = await handler(request); + expect(response.status).toBe(200); + }, + responseFields: { body: "msgId", status: 200 }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/${WORKFLOW_ENDPOINT}/workflow-two`, + token, + body: { + body: "hello world", + headers: { + "Upstash-Feature-Set": ["LazyFetch,InitialBody,WF_DetectTrigger"], + "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], + "Upstash-Telemetry-Framework": ["nextjs"], + "Upstash-Telemetry-Runtime": [expect.any(String)], + "Upstash-Telemetry-Sdk": [expect.stringMatching(/^@upstash\/workflow@v0\.2\./)], + "Upstash-Workflow-Init": ["false"], + "Upstash-Workflow-RunId": ["wfr_id"], + "Upstash-Workflow-Runid": ["wfr_id"], + "Upstash-Workflow-Sdk-Version": ["1"], + "Upstash-Workflow-Url": ["https://requestcatcher.com/api/workflow-four"], + "content-type": ["application/json"], + }, + workflowRunId: expect.any(String), + workflowUrl: "https://requestcatcher.com/api/workflow-four", + step: { + stepId: 1, + concurrent: 1, + stepName: "invoke workflow two", + stepType: "Invoke", + }, + }, + }, + }); + }); + + test("should stringify body if stringifyBody: true", async () => { + const request = getRequest( + `${WORKFLOW_ENDPOINT}/workflow-five`, + "wfr_id", + "initial-payload", + [] + ); + + await mockQStashServer({ + execute: async () => { + const response = await handler(request); + expect(response.status).toBe(200); + }, + responseFields: { body: "msgId", status: 200 }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/${WORKFLOW_ENDPOINT}/workflow-two`, + token, + body: { + body: '"hello world"', + headers: { + "Upstash-Feature-Set": ["LazyFetch,InitialBody,WF_DetectTrigger"], + "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], + "Upstash-Telemetry-Framework": ["nextjs"], + "Upstash-Telemetry-Runtime": [expect.any(String)], + "Upstash-Telemetry-Sdk": [expect.stringMatching(/^@upstash\/workflow@v0\.2\./)], + "Upstash-Workflow-Init": ["false"], + "Upstash-Workflow-RunId": ["wfr_id"], + "Upstash-Workflow-Runid": ["wfr_id"], + "Upstash-Workflow-Sdk-Version": ["1"], + "Upstash-Workflow-Url": ["https://requestcatcher.com/api/workflow-five"], + "content-type": ["application/json"], + }, + workflowRunId: expect.any(String), + workflowUrl: "https://requestcatcher.com/api/workflow-five", + step: { + stepId: 1, + concurrent: 1, + stepName: "invoke workflow two", + stepType: "Invoke", + }, + }, + }, + }); + }); + + test("should get an error if body isn't a string but stringifyBody is false", async () => { + const request = getRequest( + `${WORKFLOW_ENDPOINT}/workflow-six`, + "wfr_id", + "initial-payload", + [] + ); + + await mockQStashServer({ + execute: async () => { + const response = await handler(request); + expect(response.status).toBe(500); + expect((await response.json()).message).toBe( + "When stringifyBody is false, body must be a string. Please check the body type of your invoke step." + ); + }, + responseFields: { body: "msgId", status: 200 }, + receivesRequest: false, + }); + }); }); describe("getNewUrlFromWorkflowId", () => { diff --git a/src/types.ts b/src/types.ts index 8e93951..4a2a797 100644 --- a/src/types.ts +++ b/src/types.ts @@ -443,15 +443,30 @@ export interface WaitEventOptions { timeout?: number | Duration; } +export type StringifyBody = TBody extends string ? boolean : true; + export type CallSettings = { url: string; method?: HTTPMethods; + /** + * Request body. + * + * By default, the body is stringified with `JSON.stringify`. If you want + * to send a string body without stringifying it, you need to set + * `stringifyBody` to false. + */ body?: TBody; headers?: Record; retries?: number; retryDelay?: string; timeout?: Duration | number; flowControl?: FlowControl; + /** + * Whether the body field should be stringified when making the request. + * + * @default true + */ + stringifyBody?: StringifyBody; }; export type HeaderParams = { @@ -571,9 +586,12 @@ export type LazyInvokeStepParams = { InvokableWorkflow, "routeFunction" | "workflowId" | "options" >; - body: TInitiaPayload; // TODO make optional + body: TInitiaPayload; // tried to make this optional but didn't work so nicely workflowRunId?: string; -} & Pick; +} & Pick< + CallSettings, + "retries" | "headers" | "flowControl" | "retryDelay" | "stringifyBody" +>; export type InvokeStepResponse = { body: TBody; diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 5d99858..bc745c7 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -622,7 +622,8 @@ describe("Workflow Requests", () => { rate: 5, parallelism: 6, period: 30, - } + }, + true ); const { headers } = lazyStep.getHeaders({ context: new WorkflowContext({