Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/ci/app/ci/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ export const TEST_TIMEOUT_DURATION = (
export const CI_RANDOM_ID_HEADER = "Ci-Test-Id"
export const CI_ROUTE_HEADER = `Ci-Test-Route`

/**
* a label header set in the SDK itself to set context.label via client.trigger
*/
export const WORKFLOW_LABEL_HEADER = "upstash-label"

export const BASE_URL = process.env.VERCEL_URL
? `https://${process.env.VERCEL_URL}`
: process.env.UPSTASH_WORKFLOW_URL
Expand Down
14 changes: 11 additions & 3 deletions examples/ci/app/test-routes/failureFunction/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { serve } from "@upstash/workflow/nextjs";
import { BASE_URL } from "app/ci/constants";
import { BASE_URL, WORKFLOW_LABEL_HEADER } from "app/ci/constants";
import { testServe, expect } from "app/ci/utils";
import { saveResult } from "app/ci/upstash/redis"
import { WorkflowContext } from "@upstash/workflow";
Expand All @@ -10,7 +10,7 @@ const authHeaderValue = `Bearer super-secret-token`

const errorMessage = `my-error`
const payload = undefined

const label = "my-label"

export const { POST, GET } = testServe(
serve<string>(
Expand All @@ -20,6 +20,8 @@ export const { POST, GET } = testServe(
expect(typeof input, typeof payload);
expect(input, payload);
expect(context.headers.get(header)!, headerValue)
expect(context.headers.get(WORKFLOW_LABEL_HEADER)!, label)
expect(context.label, label)

await context.run("step1", () => {
throw new Error(errorMessage);
Expand All @@ -33,6 +35,8 @@ export const { POST, GET } = testServe(
expect(context.requestPayload, payload);
expect(typeof context.requestPayload, typeof payload);
expect(context.headers.get("authentication")!, authHeaderValue);
expect(context.headers.get(WORKFLOW_LABEL_HEADER)!, label)
expect(context.label, label)

await saveResult(
context as WorkflowContext,
Expand All @@ -46,7 +50,11 @@ export const { POST, GET } = testServe(
payload,
headers: {
[ header ]: headerValue,
"authentication": authHeaderValue
"authentication": authHeaderValue,
/**
* client trigger sets this header
*/
[`upstash-forward-${WORKFLOW_LABEL_HEADER}`]: label
}
}
)
11 changes: 9 additions & 2 deletions src/client/dlq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "../test-utils";
import { Client } from ".";
import { nanoid } from "../utils";

const WORKFLOW_LABEL = "some-label";
const MOCK_DLQ_MESSAGES = [
{
dlqId: `dlq-${nanoid()}`,
Expand All @@ -25,6 +26,7 @@ const MOCK_DLQ_MESSAGES = [
responseHeaders: { "content-type": ["application/json"] },
},
failureCallback: "https://example.com/failure-callback",
label: WORKFLOW_LABEL,
},
{
dlqId: `dlq-${nanoid()}`,
Expand Down Expand Up @@ -75,8 +77,13 @@ describe("DLQ", () => {

await mockQStashServer({
execute: async () => {
const result = await client.dlq.list({ cursor, count });
const result = await client.dlq.list({
cursor,
count,
filter: { label: WORKFLOW_LABEL },
});
expect(result.messages).toEqual([MOCK_DLQ_MESSAGES[0]]);
expect(result.messages[0].label).toBe(WORKFLOW_LABEL);
expect(result.cursor).toBe(nextCursor);
},
responseFields: {
Expand All @@ -85,7 +92,7 @@ describe("DLQ", () => {
},
receivesRequest: {
method: "GET",
url: `${MOCK_QSTASH_SERVER_URL}/v2/dlq?cursor=${cursor}&count=${count}&source=workflow`,
url: `${MOCK_QSTASH_SERVER_URL}/v2/dlq?cursor=${cursor}&count=${count}&label=${WORKFLOW_LABEL}&source=workflow`,
token,
},
});
Expand Down
6 changes: 6 additions & 0 deletions src/client/dlq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type DLQFilterOptions = Pick<
workflowRunId?: string;
workflowCreatedAt?: string;
failureFunctionState?: FailureCallbackInfo["state"];
label?: string;
};

type FailureCallbackInfo = {
Expand Down Expand Up @@ -50,6 +51,10 @@ type DLQMessage = {
* status of the failure callback
*/
failureCallbackInfo?: FailureCallbackInfo;
/**
* label passed when triggering workflow
*/
label?: string;
};

type PublicDLQMessage = Pick<
Expand All @@ -69,6 +74,7 @@ type PublicDLQMessage = Pick<
| "dlqId"
| "failureCallback"
| "failureCallbackInfo"
| "label"
>;

export class DLQ {
Expand Down
7 changes: 5 additions & 2 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ describe("workflow client", () => {
retries: 15,
retryDelay: "1000",
delay: 1,
label: "test-label",
});
},
responseFields: {
Expand All @@ -237,6 +238,8 @@ describe("workflow client", () => {
"upstash-delay": "1s",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-forward-upstash-label": "test-label",
"upstash-label": "test-label",
"upstash-telemetry-framework": "unknown",
"upstash-telemetry-runtime": expect.stringMatching(/bun@/),
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
Expand Down Expand Up @@ -451,7 +454,7 @@ describe("workflow client", () => {
});
});

test(
test.skip(
"should get logs - live",
async () => {
const qstashClient = new QStashClient({
Expand Down Expand Up @@ -529,7 +532,7 @@ describe("workflow client", () => {
],
});
},
{ timeout: 30_000, interval: 100 }
{ timeout: 1000, interval: 100 }
);

await liveClient.cancel({ ids: workflowRunId });
Expand Down
12 changes: 10 additions & 2 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { triggerFirstInvocation } from "../workflow-requests";
import { WorkflowContext } from "../context";
import { DLQ } from "./dlq";
import { TriggerOptions, WorkflowRunLog, WorkflowRunLogs } from "./types";
import { SDK_TELEMETRY } from "../constants";
import { SDK_TELEMETRY, WORKFLOW_LABEL_HEADER } from "../constants";

type ClientConfig = ConstructorParameters<typeof QStashClient>[0];

Expand Down Expand Up @@ -246,7 +246,10 @@ export class Client {
const context = new WorkflowContext({
qstashClient: this.client,
// @ts-expect-error header type mismatch because of bun
headers: new Headers((option.headers ?? {})),
headers: new Headers({
...(option.headers ?? {}),
...(option.label ? { [WORKFLOW_LABEL_HEADER]: option.label } : {}),
}),
initialPayload: option.body,
steps: [],
url: option.url,
Expand All @@ -256,6 +259,7 @@ export class Client {
telemetry: { sdk: SDK_TELEMETRY },
flowControl: option.flowControl,
failureUrl,
label: option.label,
});

return {
Expand Down Expand Up @@ -313,6 +317,7 @@ export class Client {
state?: WorkflowRunLog["workflowState"];
workflowUrl?: WorkflowRunLog["workflowUrl"];
workflowCreatedAt?: WorkflowRunLog["workflowRunCreatedAt"];
label?: WorkflowRunLog["label"];
}): Promise<WorkflowRunLogs> {
const { workflowRunId, cursor, count, state, workflowUrl, workflowCreatedAt } = params ?? {};

Expand All @@ -335,6 +340,9 @@ export class Client {
if (workflowCreatedAt) {
urlParams.append("workflowCreatedAt", workflowCreatedAt.toString());
}
if (params?.label) {
urlParams.append("label", params.label);
}

const result = await this.client.http.request<WorkflowRunLogs>({
path: ["v2", "workflows", `events?${urlParams.toString()}`],
Expand Down
10 changes: 10 additions & 0 deletions src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ export type WorkflowRunLog = {
* If the workflow run has failed, id of the run in DLQ
*/
dlqId?: string;
/**
* Label of the workflow run
*/
label?: string;
};

export type WorkflowRunLogs = {
Expand Down Expand Up @@ -399,6 +403,12 @@ export type TriggerOptions = {
* Delay to apply before triggering the workflow.
*/
delay?: PublishRequest["delay"];
/**
* Label to apply to the workflow run.
*
* Can be used to filter the workflow run logs.
*/
label?: string;
} & (
| {
/**
Expand Down
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export const WORKFLOW_URL_HEADER = "Upstash-Workflow-Url";
export const WORKFLOW_FAILURE_HEADER = "Upstash-Workflow-Is-Failure";
export const WORKFLOW_FEATURE_HEADER = "Upstash-Feature-Set";
export const WORKFLOW_INVOKE_COUNT_HEADER = "Upstash-Workflow-Invoke-Count";
export const WORKFLOW_LABEL_HEADER = "Upstash-Label";

export const WORKFLOW_PROTOCOL_VERSION = "1";
export const WORKFLOW_PROTOCOL_VERSION_HEADER = "Upstash-Workflow-Sdk-Version";
Expand Down
14 changes: 14 additions & 0 deletions src/context/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@ import {
import { upstash } from "@upstash/qstash";

describe("context tests", () => {
test("should set label in context and headers", () => {
const label = "my-label";
const context = new WorkflowContext({
qstashClient,
initialPayload: "my-payload",
steps: [],
url: WORKFLOW_ENDPOINT,
headers: new Headers({ "upstash-label": label }) as Headers,
workflowRunId: "wfr-id",
label,
});
expect(context.label).toBe(label);
expect(context.headers.get("upstash-label")).toBe(label);
});
const token = nanoid();
const qstashClient = new Client({
baseUrl: MOCK_QSTASH_SERVER_URL,
Expand Down
20 changes: 20 additions & 0 deletions src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,23 @@ export class WorkflowContext<TInitialPayload = unknown> {
*/
public readonly flowControl?: FlowControl;

/**
* Label to apply to the workflow run.
*
* Can be used to filter the workflow run logs.
*
* Can be set by passing a `label` parameter when triggering the workflow
* with `client.trigger`:
*
* ```ts
* await client.trigger({
* url: "https://workflow-endpoint.com",
* label: "my-label"
* });
* ```
*/
public readonly label?: string;

constructor({
qstashClient,
workflowRunId,
Expand All @@ -208,6 +225,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
telemetry,
invokeCount,
flowControl,
label,
}: {
qstashClient: WorkflowClient;
workflowRunId: string;
Expand All @@ -223,6 +241,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
telemetry?: Telemetry;
invokeCount?: number;
flowControl?: FlowControl;
label?: string;
}) {
this.qstashClient = qstashClient;
this.workflowRunId = workflowRunId;
Expand All @@ -235,6 +254,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
this.retries = retries ?? DEFAULT_RETRIES;
this.retryDelay = retryDelay;
this.flowControl = flowControl;
this.label = label;

this.executor = new AutoExecutor(this, this.steps, telemetry, invokeCount, debug);
}
Expand Down
2 changes: 1 addition & 1 deletion src/context/steps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ export class LazyCallStep<TResult = unknown, TBody = unknown> extends BaseLazySt
if (this.retryDelay) {
headers["Upstash-Retry-Delay"] = this.retryDelay;
}

// WF_DetectTrigger is not included because these requests are going to external endpoints
headers[WORKFLOW_FEATURE_HEADER] = "WF_NoDelete,InitialBody";

Expand Down
1 change: 1 addition & 0 deletions src/serve/authorization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export class DisabledWorkflowContext<
retries: context.retries,
retryDelay: context.retryDelay,
flowControl: context.flowControl,
label: context.label,
});

try {
Expand Down
3 changes: 3 additions & 0 deletions src/serve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { makeCancelRequest } from "../client/utils";
import {
SDK_TELEMETRY,
WORKFLOW_INVOKE_COUNT_HEADER,
WORKFLOW_LABEL_HEADER,
WORKFLOW_PROTOCOL_VERSION,
WORKFLOW_PROTOCOL_VERSION_HEADER,
} from "../constants";
Expand Down Expand Up @@ -152,6 +153,7 @@ export const serveBase = <
}

const invokeCount = Number(request.headers.get(WORKFLOW_INVOKE_COUNT_HEADER) ?? "0");
const label = request.headers.get(WORKFLOW_LABEL_HEADER) ?? undefined;

// create context
const workflowContext = new WorkflowContext<TInitialPayload>({
Expand All @@ -169,6 +171,7 @@ export const serveBase = <
telemetry,
invokeCount,
flowControl,
label,
});

// attempt running routeFunction until the first step
Expand Down
Loading
Loading