Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
0614472
feat: implement work item filters for GetWorkItems RPC
YunchuWang Mar 12, 2026
5784c24
fix: address PR review comments
YunchuWang Mar 12, 2026
a92d643
test: add e2e tests and example for work item filters
YunchuWang Mar 12, 2026
6616759
refactor: remove redundant explicit filter e2e test
YunchuWang Mar 12, 2026
3ef9afe
test: add e2e tests for work item filters with versions
YunchuWang Mar 12, 2026
9d4cbbd
test: unskip filter enforcement tests and add version mismatch test
YunchuWang Mar 12, 2026
5bebc64
test: add e2e test for registered-but-excluded orchestration
YunchuWang Mar 12, 2026
60bdc63
refactor: remove redundant auto-generated filter e2e tests
YunchuWang Mar 12, 2026
b907a6c
test: skip filter enforcement e2e tests for CI (emulator limitation)
YunchuWang Mar 12, 2026
d550e55
fix: normalize entity names to lowercase in gRPC filter conversion
YunchuWang Mar 12, 2026
70b1c0a
refactor: remove redundant disabled-filters example scenario
YunchuWang Mar 12, 2026
7beefcf
chore: sync proto with upstream durabletask-protobuf
YunchuWang Mar 13, 2026
b75ea07
fix: disable auto-generated filters in versioning FailureStrategy tests
YunchuWang Mar 13, 2026
9f6cd8b
fix: remove flaky slowActivityCounter assertion from whenAll fail-fas…
YunchuWang Mar 13, 2026
4b30d19
fix: mark _buildGetWorkItemsRequest as private per review feedback
YunchuWang Mar 18, 2026
ed22a5d
fix: update test comment to explain why 2s wait is unnecessary
YunchuWang Mar 18, 2026
38d855b
fix: change work item filters from auto opt-in to explicit opt-in
YunchuWang Mar 18, 2026
1f07185
fix: remove leftover null workItemFilters test from old opt-out seman…
YunchuWang Mar 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions examples/work-item-filters/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* This example demonstrates Work Item Filters for Durable Task workers.
*
* Work Item Filters allow a worker to tell the sidecar which orchestrations,
* activities, and entities it is configured to handle. The sidecar then only
* dispatches matching work items to that worker, enabling efficient routing.
*
* Key concepts demonstrated:
* - Auto-generated filters from the worker's registry (default behavior)
* - Explicit filters via useWorkItemFilters()
*
* This example runs against:
* DTS Emulator:
* docker run --name dts-emulator -i -p 8080:8080 -d --rm mcr.microsoft.com/dts/dts-emulator:latest
* Then:
* npx ts-node --swc examples/work-item-filters/index.ts
*/

import {
ActivityContext,
OrchestrationContext,
TOrchestrator,
WorkItemFilters,
} from "@microsoft/durabletask-js";
import {
DurableTaskAzureManagedClientBuilder,
DurableTaskAzureManagedWorkerBuilder,
} from "@microsoft/durabletask-js-azuremanaged";

const endpoint = process.env.ENDPOINT || "localhost:8080";
const taskHub = process.env.TASKHUB || "default";

// ============================================================================
// Step 1: Define activities and orchestrators
// ============================================================================

const greet = async (_: ActivityContext, name: string): Promise<string> => {
return `Hello, ${name}!`;
};

const add = async (_: ActivityContext, input: { a: number; b: number }): Promise<number> => {
return input.a + input.b;
};

const greetingOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
name: string,
): Promise<any> {
const result = yield ctx.callActivity(greet, name);
return result;
};

const mathOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
input: { a: number; b: number },
): Promise<any> {
const result = yield ctx.callActivity(add, input);
return result;
};

// ============================================================================
// Step 2: Demonstrate different work item filter configurations
// ============================================================================

async function runWithAutoGeneratedFilters() {
console.log("\n=== Scenario 1: Auto-Generated Filters (Default) ===");
console.log("The worker auto-generates filters from its registered orchestrators and activities.");
console.log("Only matching work items will be dispatched to this worker.\n");

const client = new DurableTaskAzureManagedClientBuilder()
.endpoint(endpoint, taskHub, null)
.build();

// No explicit filters — they are auto-generated from addOrchestrator/addActivity
const worker = new DurableTaskAzureManagedWorkerBuilder()
.endpoint(endpoint, taskHub, null)
.addOrchestrator(greetingOrchestrator)
.addActivity(greet)
.build();

await worker.start();
console.log("Worker started with auto-generated filters for: greetingOrchestrator, greet");

const id = await client.scheduleNewOrchestration(greetingOrchestrator, "Auto-Filters");
console.log(`Scheduled orchestration: ${id}`);

const state = await client.waitForOrchestrationCompletion(id, undefined, 30);
console.log(`Result: ${state?.serializedOutput}`);

await worker.stop();
await client.stop();
}

async function runWithExplicitFilters() {
console.log("\n=== Scenario 2: Explicit Filters ===");
console.log("The worker uses explicitly provided filters instead of auto-generating them.");
console.log("This is useful when you want fine-grained control over which work items to accept.\n");

const client = new DurableTaskAzureManagedClientBuilder()
.endpoint(endpoint, taskHub, null)
.build();

// Provide explicit filters — these override auto-generation
const filters: WorkItemFilters = {
orchestrations: [{ name: "mathOrchestrator" }],
activities: [{ name: "add" }],
};

const worker = new DurableTaskAzureManagedWorkerBuilder()
.endpoint(endpoint, taskHub, null)
.addOrchestrator(mathOrchestrator)
.addActivity(add)
.useWorkItemFilters(filters)
.build();

await worker.start();
console.log("Worker started with explicit filters for: mathOrchestrator, add");

const id = await client.scheduleNewOrchestration(mathOrchestrator, { a: 17, b: 25 });
console.log(`Scheduled orchestration: ${id}`);

const state = await client.waitForOrchestrationCompletion(id, undefined, 30);
console.log(`Result: ${state?.serializedOutput}`);

await worker.stop();
await client.stop();
}

// ============================================================================
// Step 3: Run all scenarios
// ============================================================================

(async () => {
console.log(`Connecting to DTS emulator at ${endpoint}, taskHub: ${taskHub}`);

try {
await runWithAutoGeneratedFilters();
await runWithExplicitFilters();

console.log("\n=== All scenarios completed successfully! ===");
} catch (error) {
console.error("Error:", error);
process.exit(1);
}

process.exit(0);
})();
17 changes: 17 additions & 0 deletions examples/work-item-filters/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "work-item-filters-example",
"version": "1.0.0",
"description": "Example demonstrating work item filters for Durable Task workers",
"private": true,
"scripts": {
"start": "ts-node --swc index.ts",
"start:emulator": "ENDPOINT=localhost:8080 TASKHUB=default ts-node --swc index.ts"
},
"dependencies": {
"@microsoft/durabletask-js": "workspace:*",
"@microsoft/durabletask-js-azuremanaged": "workspace:*"
},
"engines": {
"node": ">=22.0.0"
}
}
2 changes: 1 addition & 1 deletion internal/protocol/SOURCE_COMMIT
Original file line number Diff line number Diff line change
@@ -1 +1 @@
026329c53fe6363985655857b9ca848ec7238bd2
1caadbd7ecfdf5f2309acbeac28a3e36d16aa156
21 changes: 21 additions & 0 deletions internal/protocol/protos/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ message GetWorkItemsRequest {
int32 maxConcurrentEntityWorkItems = 3;

repeated WorkerCapability capabilities = 10;
WorkItemFilters workItemFilters = 11;
}

enum WorkerCapability {
Expand All @@ -844,6 +845,26 @@ enum WorkerCapability {
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
}

message WorkItemFilters {
repeated OrchestrationFilter orchestrations = 1;
repeated ActivityFilter activities = 2;
repeated EntityFilter entities = 3;
}

message OrchestrationFilter {
string name = 1;
repeated string versions = 2;
}

message ActivityFilter {
string name = 1;
repeated string versions = 2;
}

message EntityFilter {
string name = 1;
}

message WorkItem {
oneof request {
OrchestratorRequest orchestratorRequest = 1;
Expand Down
17 changes: 0 additions & 17 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions packages/durabletask-js-azuremanaged/src/worker-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
Logger,
ConsoleLogger,
VersioningOptions,
WorkItemFilters,
} from "@microsoft/durabletask-js";

/**
Expand All @@ -27,6 +28,7 @@ export class DurableTaskAzureManagedWorkerBuilder {
private _logger: Logger = new ConsoleLogger();
private _shutdownTimeoutMs?: number;
private _versioning?: VersioningOptions;
private _workItemFilters?: WorkItemFilters | "auto";

/**
* Creates a new instance of DurableTaskAzureManagedWorkerBuilder.
Expand Down Expand Up @@ -220,6 +222,21 @@ export class DurableTaskAzureManagedWorkerBuilder {
return this;
}

/**
* Enables work item filters for the worker.
* When called without arguments, filters are auto-generated from the registered
* orchestrations, activities, and entities.
* When called with a WorkItemFilters object, those specific filters are used.
* By default (when not called), no filters are sent and the worker processes all work items.
*
* @param filters Optional explicit filters. Omit to auto-generate from registry.
* @returns This builder instance.
*/
useWorkItemFilters(filters?: WorkItemFilters): DurableTaskAzureManagedWorkerBuilder {
this._workItemFilters = filters ?? "auto";
return this;
}

/**
* Builds and returns a configured TaskHubGrpcWorker.
*
Expand Down Expand Up @@ -251,6 +268,7 @@ export class DurableTaskAzureManagedWorkerBuilder {
logger: this._logger,
shutdownTimeoutMs: this._shutdownTimeoutMs,
versioning: this._versioning,
workItemFilters: this._workItemFilters,
});

// Register all orchestrators
Expand Down
Loading
Loading