Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const NO_CONCURRENCY = 1;
export const NOT_SET = "not-set";
export const DEFAULT_RETRIES = 3;

export const VERSION = "v0.2.18";
export const VERSION = "v0.2.20";
export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`;

export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const;
Expand Down
91 changes: 91 additions & 0 deletions src/context/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ describe("context tests", () => {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`,
token,
headers: {
"content-type": "application/json",
},
body: [
{
body: '"request-body"',
Expand Down Expand Up @@ -439,6 +442,94 @@ describe("context tests", () => {
},
});
});
test("should send context.call without parsing body if stringifyBody is false", async () => {
const context = new WorkflowContext({
qstashClient,
initialPayload: "my-payload",
steps: [],
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
});
await mockQStashServer({
execute: () => {
const throws = () =>
context.call("my-step", {
url,
body,
headers: { "my-header": "my-value" },
method: "PATCH",
stringifyBody: false,
});
expect(throws).toThrowError("Aborting workflow after executing step 'my-step'.");
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`,
token,
body: [
{
body: "request-body",
destination: url,
headers: {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-callback": WORKFLOW_ENDPOINT,
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-callback-forward-upstash-workflow-callback": "true",
"upstash-callback-forward-upstash-workflow-concurrent": "1",
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
"upstash-callback-forward-upstash-workflow-stepid": "1",
"upstash-callback-forward-upstash-workflow-stepname": "my-step",
"upstash-callback-forward-upstash-workflow-steptype": "Call",
"upstash-callback-workflow-calltype": "fromCallback",
"upstash-callback-workflow-init": "false",
"upstash-callback-workflow-runid": "wfr-id",
"upstash-callback-workflow-url": WORKFLOW_ENDPOINT,
"upstash-feature-set": "WF_NoDelete,InitialBody",
"upstash-forward-my-header": "my-value",
"upstash-method": "PATCH",
"upstash-retries": "0",
"upstash-workflow-calltype": "toCallback",
"upstash-workflow-init": "false",
"upstash-workflow-runid": "wfr-id",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
},
},
],
},
});
});

test("should throw error if stringifyBody is false and body is object", async () => {
const context = new WorkflowContext({
qstashClient,
initialPayload: "my-payload",
steps: [],
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
});

const throws = () =>
context.call("my-step", {
url,
body: { foo: "bar" },
headers: { "my-header": "my-value" },
method: "PATCH",
// @ts-expect-error testing error case
stringifyBody: false,
});
expect(throws).toThrowError(
new WorkflowError(
`When stringifyBody is false, body must be a string. Please check the body type of your call step.`
)
);
});
});

test("cancel should throw abort with cleanup: true", async () => {
Expand Down
15 changes: 9 additions & 6 deletions src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,11 @@ export class WorkflowContext<TInitialPayload = unknown> {
| CallSettings<TBody>
| (LazyInvokeStepParams<TBody, unknown> & Pick<CallSettings, "timeout">)
): Promise<CallResponse<TResult | { workflowRunId: string }>> {
let callStep: LazyCallStep<TResult | { workflowRunId: string }>;
let callStep: LazyCallStep<TResult | { workflowRunId: string }, typeof settings.body>;
if ("workflow" in settings) {
const url = getNewUrlFromWorkflowId(this.url, settings.workflow.workflowId);

callStep = new LazyCallStep<{ workflowRunId: string }>(
callStep = new LazyCallStep<{ workflowRunId: string }, typeof settings.body>(
stepName,
url,
"POST",
Expand All @@ -398,7 +398,8 @@ export class WorkflowContext<TInitialPayload = unknown> {
settings.retries || 0,
settings.retryDelay,
settings.timeout,
settings.flowControl ?? settings.workflow.options.flowControl
settings.flowControl ?? settings.workflow.options.flowControl,
settings.stringifyBody ?? true
);
} else {
const {
Expand All @@ -410,9 +411,10 @@ export class WorkflowContext<TInitialPayload = unknown> {
retryDelay,
timeout,
flowControl,
stringifyBody = true,
} = settings;

callStep = new LazyCallStep<TResult>(
callStep = new LazyCallStep<TResult, typeof body>(
stepName,
url,
method,
Expand All @@ -421,7 +423,8 @@ export class WorkflowContext<TInitialPayload = unknown> {
retries,
retryDelay,
timeout,
flowControl
flowControl,
stringifyBody
);
}

Expand Down Expand Up @@ -509,7 +512,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
stepName: string,
settings: LazyInvokeStepParams<TInitialPayload, TResult>
) {
return await this.addStep(new LazyInvokeStep(stepName, settings));
return await this.addStep(new LazyInvokeStep<TResult, TInitialPayload>(stepName, settings));
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/context/steps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ describe("test steps", () => {
14,
"1000",
30,
flowControl
flowControl,
true
);

test("should set correct fields", () => {
Expand Down
40 changes: 36 additions & 4 deletions src/context/steps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ export class LazyCallStep<TResult = unknown, TBody = unknown> extends BaseLazySt
public readonly retryDelay?: string;
public readonly timeout?: number | Duration;
public readonly flowControl?: FlowControl;
private readonly stringifyBody: boolean;

stepType: StepType = "Call";
allowUndefinedOut = false;

Expand All @@ -345,7 +347,8 @@ export class LazyCallStep<TResult = unknown, TBody = unknown> extends BaseLazySt
retries: number,
retryDelay: string | undefined,
timeout: number | Duration | undefined,
flowControl: FlowControl | undefined
flowControl: FlowControl | undefined,
stringifyBody: boolean
) {
super(stepName);
this.url = url;
Expand All @@ -356,6 +359,7 @@ export class LazyCallStep<TResult = unknown, TBody = unknown> extends BaseLazySt
this.retryDelay = retryDelay;
this.timeout = timeout;
this.flowControl = flowControl;
this.stringifyBody = stringifyBody;
}

public getPlanStep(concurrent: number, targetStep: number): Step<undefined> {
Expand Down Expand Up @@ -495,10 +499,23 @@ export class LazyCallStep<TResult = unknown, TBody = unknown> extends BaseLazySt
}

async submitStep({ context, headers }: SubmitStepParams) {
let callBody: string;
if (this.stringifyBody) {
callBody = JSON.stringify(this.body);
} else {
if (typeof this.body === "string") {
callBody = this.body;
} else {
throw new WorkflowError(
"When stringifyBody is false, body must be a string. Please check the body type of your call step."
);
}
}

return (await context.qstashClient.batch([
{
headers,
body: JSON.stringify(this.body),
body: callBody,
method: this.method,
url: this.url,
retries: DEFAULT_RETRIES === this.retries ? undefined : this.retries,
Expand Down Expand Up @@ -650,7 +667,7 @@ export class LazyInvokeStep<TResult = unknown, TBody = unknown> extends BaseLazy
stepType: StepType = "Invoke";
params: RequiredExceptFields<
LazyInvokeStepParams<TBody, TResult>,
"retries" | "flowControl" | "retryDelay"
"retries" | "flowControl" | "retryDelay" | "body"
>;
protected allowUndefinedOut = false;
/**
Expand All @@ -668,6 +685,7 @@ export class LazyInvokeStep<TResult = unknown, TBody = unknown> extends BaseLazy
retries,
retryDelay,
flowControl,
stringifyBody = true,
}: LazyInvokeStepParams<TBody, TResult>
) {
super(stepName);
Expand All @@ -679,6 +697,7 @@ export class LazyInvokeStep<TResult = unknown, TBody = unknown> extends BaseLazy
retries,
retryDelay,
flowControl,
stringifyBody,
};

const { workflowId } = workflow;
Expand Down Expand Up @@ -740,8 +759,21 @@ export class LazyInvokeStep<TResult = unknown, TBody = unknown> extends BaseLazy
});
invokerHeaders["Upstash-Workflow-Runid"] = context.workflowRunId;

let invokeBody: string;
if (this.params.stringifyBody) {
invokeBody = JSON.stringify(this.params.body);
} else {
if (typeof this.params.body === "string") {
invokeBody = this.params.body;
} else {
throw new WorkflowError(
"When stringifyBody is false, body must be a string. Please check the body type of your invoke step."
);
}
}

const request: InvokeWorkflowRequest = {
body: JSON.stringify(this.params.body),
body: invokeBody,
headers: Object.fromEntries(
Object.entries(invokerHeaders).map((pairs) => [pairs[0], [pairs[1]]])
),
Expand Down
Loading