diff --git a/package-lock.json b/package-lock.json index 95061a0a9..6658760de 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "packages/interceptors-opentelemetry", "packages/nexus", "packages/nyc-test-coverage", + "packages/plugin", "packages/proto", "packages/test", "packages/testing", @@ -2797,6 +2798,10 @@ "resolved": "packages/nyc-test-coverage", "link": true }, + "node_modules/@temporalio/plugin": { + "resolved": "packages/plugin", + "link": true + }, "node_modules/@temporalio/proto": { "resolved": "packages/proto", "link": true @@ -18485,6 +18490,14 @@ "webpack": "^5.94.0" } }, + "packages/plugin": { + "name": "@temporalio/plugin", + "version": "1.13.0", + "license": "MIT", + "engines": { + "node": ">= 18.0.0" + } + }, "packages/proto": { "name": "@temporalio/proto", "version": "1.13.1", @@ -20637,6 +20650,9 @@ "webpack": "^5.94.0" } }, + "@temporalio/plugin": { + "version": "file:packages/plugin" + }, "@temporalio/proto": { "version": "file:packages/proto", "requires": { diff --git a/package.json b/package.json index 93d406579..6a8a4ba87 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,7 @@ "@temporalio/interceptors-opentelemetry": "file:packages/interceptors-opentelemetry", "@temporalio/nexus": "file:packages/nexus", "@temporalio/nyc-test-coverage": "file:packages/nyc-test-coverage", + "@temporalio/plugin": "file:packages/plugin", "@temporalio/proto": "file:packages/proto", "@temporalio/test": "file:packages/test", "@temporalio/testing": "file:packages/testing", @@ -90,6 +91,7 @@ "packages/interceptors-opentelemetry", "packages/nexus", "packages/nyc-test-coverage", + "packages/plugin", "packages/proto", "packages/test", "packages/testing", diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index 8de7daa68..7f97500e4 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -6,6 +6,7 @@ import { ScheduleClient } from './schedule-client'; import { QueryRejectCondition, WorkflowService } from './types'; import { WorkflowClient } from './workflow-client'; import { TaskQueueClient } from './task-queue-client'; +import { ClientPlugin } from './plugin'; export interface ClientOptions extends BaseClientOptions { /** @@ -15,6 +16,14 @@ export interface ClientOptions extends BaseClientOptions { */ interceptors?: ClientInterceptors; + /** + * List of plugins to register with the client. + * + * Plugins allow you to extend and customize the behavior of Temporal clients through a chain of + * responsibility pattern. They can intercept and modify client creation. + */ + plugins?: ClientPlugin[]; + workflow?: { /** * Should a query be rejected by closed and failed workflows @@ -32,6 +41,7 @@ export type LoadedClientOptions = LoadedWithDefaults; */ export class Client extends BaseClient { public readonly options: LoadedClientOptions; + /** * Workflow sub-client - use to start and interact with Workflows */ @@ -52,9 +62,21 @@ export class Client extends BaseClient { public readonly taskQueue: TaskQueueClient; constructor(options?: ClientOptions) { + options = options ?? {}; + + // Add client plugins from the connection + options.plugins = (options.plugins ?? []).concat(options.connection?.plugins ?? []); + + // Process plugins first to allow them to modify connect configuration + for (const plugin of options.plugins) { + if (plugin.configureClient !== undefined) { + options = plugin.configureClient(options); + } + } + super(options); - const { interceptors, workflow, ...commonOptions } = options ?? {}; + const { interceptors, workflow, plugins, ...commonOptions } = options; this.workflow = new WorkflowClient({ ...commonOptions, @@ -95,6 +117,7 @@ export class Client extends BaseClient { workflow: { queryRejectCondition: this.workflow.options.queryRejectCondition, }, + plugins: plugins ?? [], }; } diff --git a/packages/client/src/connection.ts b/packages/client/src/connection.ts index 4e18e430b..34b021ffc 100644 --- a/packages/client/src/connection.ts +++ b/packages/client/src/connection.ts @@ -130,6 +130,15 @@ export interface ConnectionOptions { * @default 10 seconds */ connectTimeout?: Duration; + + /** + * List of plugins to register with the connection. + * + * Plugins allow you to configure the connection options. + * + * Any plugins provided will also be passed to any client built from this connection. + */ + plugins?: ConnectionPlugin[]; } export type ConnectionOptionsWithDefaults = Required< @@ -172,6 +181,7 @@ function addDefaults(options: ConnectionOptions): ConnectionOptionsWithDefaults interceptors: interceptors ?? [makeGrpcRetryInterceptor(defaultGrpcRetryOptions())], metadata: {}, connectTimeoutMs: msOptionalToNumber(connectTimeout) ?? 10_000, + plugins: [], ...filterNullAndUndefined(rest), }; } @@ -182,8 +192,8 @@ function addDefaults(options: ConnectionOptions): ConnectionOptionsWithDefaults * - Add default port to address if port not specified * - Set `Authorization` header based on {@link ConnectionOptions.apiKey} */ -function normalizeGRPCConfig(options?: ConnectionOptions): ConnectionOptions { - const { tls: tlsFromConfig, credentials, callCredentials, ...rest } = options || {}; +function normalizeGRPCConfig(options: ConnectionOptions): ConnectionOptions { + const { tls: tlsFromConfig, credentials, callCredentials, ...rest } = options; if (rest.apiKey) { if (rest.metadata?.['Authorization']) { throw new TypeError( @@ -322,10 +332,12 @@ export class Connection { */ public readonly healthService: HealthService; + public readonly plugins: ConnectionPlugin[]; + readonly callContextStorage: AsyncLocalStorage; private readonly apiKeyFnRef: { fn?: () => string }; - protected static createCtorOptions(options?: ConnectionOptions): ConnectionCtorOptions { + protected static createCtorOptions(options: ConnectionOptions): ConnectionCtorOptions { const normalizedOptions = normalizeGRPCConfig(options); const apiKeyFnRef: { fn?: () => string } = {}; if (normalizedOptions.apiKey) { @@ -441,6 +453,12 @@ export class Connection { * This method does not verify connectivity with the server. We recommend using {@link connect} instead. */ static lazy(options?: ConnectionOptions): Connection { + options = options ?? {}; + for (const plugin of options.plugins ?? []) { + if (plugin.configureConnection !== undefined) { + options = plugin.configureConnection(options); + } + } return new this(this.createCtorOptions(options)); } @@ -474,6 +492,7 @@ export class Connection { this.healthService = healthService; this.callContextStorage = callContextStorage; this.apiKeyFnRef = apiKeyFnRef; + this.plugins = options.plugins ?? []; } protected static generateRPCImplementation({ @@ -529,7 +548,7 @@ export class Connection { * this will locally result in the request call throwing a {@link grpc.ServiceError|ServiceError} * with code {@link grpc.status.DEADLINE_EXCEEDED|DEADLINE_EXCEEDED}; see {@link isGrpcDeadlineError}. * - * It is stronly recommended to explicitly set deadlines. If no deadline is set, then it is + * It is strongly recommended to explicitly set deadlines. If no deadline is set, then it is * possible for the client to end up waiting forever for a response. * * @param deadline a point in time after which the request will be considered as failed; either a @@ -685,3 +704,18 @@ export class Connection { return wrapper as WorkflowService; } } + +/** + * Plugin to control the configuration of a connection. + */ +export interface ConnectionPlugin { + /** + * Gets the name of this plugin. + */ + get name(): string; + + /** + * Hook called when creating a connection to allow modification of configuration. + */ + configureConnection?(options: ConnectionOptions): ConnectionOptions; +} diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 7911b1f44..34b594d3c 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -30,12 +30,19 @@ export * from '@temporalio/common/lib/interfaces'; export * from '@temporalio/common/lib/workflow-handle'; export * from './async-completion-client'; export * from './client'; -export { Connection, ConnectionOptions, ConnectionOptionsWithDefaults, LOCAL_TARGET } from './connection'; +export { + Connection, + ConnectionOptions, + ConnectionOptionsWithDefaults, + ConnectionPlugin, + LOCAL_TARGET, +} from './connection'; export * from './errors'; export * from './grpc-retry'; export * from './interceptors'; export * from './types'; export * from './workflow-client'; +export { ClientPlugin } from './plugin'; export * from './workflow-options'; export * from './schedule-types'; export * from './schedule-client'; diff --git a/packages/client/src/plugin.ts b/packages/client/src/plugin.ts new file mode 100644 index 000000000..de44495d0 --- /dev/null +++ b/packages/client/src/plugin.ts @@ -0,0 +1,19 @@ +import type { ClientOptions } from './client'; + +/** + * Plugin to control the configuration of a native connection. + */ +export interface ClientPlugin { + /** + * Gets the name of this plugin. + */ + get name(): string; + + /** + * Hook called when creating a client to allow modification of configuration. + * + * This method is called during client creation and allows plugins to modify + * the client configuration before the client is fully initialized. + */ + configureClient?(options: ClientOptions): ClientOptions; +} diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index c478222fd..4ce6f9032 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -3,6 +3,7 @@ import type { TypedSearchAttributes, SearchAttributes, SearchAttributeValue, Pri import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import * as proto from '@temporalio/proto'; import { Replace } from '@temporalio/common/lib/type-helpers'; +import type { ConnectionPlugin } from './connection'; export interface WorkflowExecution { workflowId: string; @@ -122,6 +123,7 @@ export interface CallContext { */ export interface ConnectionLike { workflowService: WorkflowService; + plugins: ConnectionPlugin[]; close(): Promise; ensureConnected(): Promise; diff --git a/packages/plugin/package.json b/packages/plugin/package.json new file mode 100644 index 000000000..6200d2be6 --- /dev/null +++ b/packages/plugin/package.json @@ -0,0 +1,35 @@ +{ + "name": "@temporalio/plugin", + "version": "1.13.0", + "description": "Library for plugin creation", + "main": "lib/index.js", + "types": "./lib/index.d.ts", + "keywords": [ + "temporal", + "workflow", + "worker", + "plugin" + ], + "author": "Temporal Technologies Inc. ", + "license": "MIT", + "dependencies": {}, + "bugs": { + "url": "https://github.com/temporalio/sdk-typescript/issues" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/temporalio/sdk-typescript.git", + "directory": "packages/plugin" + }, + "homepage": "https://github.com/temporalio/sdk-typescript/tree/main/packages/plugin", + "publishConfig": { + "access": "public" + }, + "engines": { + "node": ">= 18.0.0" + }, + "files": [ + "src", + "lib" + ] +} diff --git a/packages/plugin/src/index.ts b/packages/plugin/src/index.ts new file mode 100644 index 000000000..9146de0c5 --- /dev/null +++ b/packages/plugin/src/index.ts @@ -0,0 +1 @@ +export { SimplePlugin, SimplePluginOptions } from './plugin'; diff --git a/packages/plugin/src/plugin.ts b/packages/plugin/src/plugin.ts new file mode 100644 index 000000000..4567e6881 --- /dev/null +++ b/packages/plugin/src/plugin.ts @@ -0,0 +1,198 @@ +import type * as nexus from 'nexus-rpc'; +import type { DataConverter } from '@temporalio/common'; +import { + ClientInterceptors, + ClientOptions, + ClientPlugin, + ConnectionPlugin, + ConnectionOptions, + WorkflowClientInterceptors, + WorkflowClientInterceptor, +} from '@temporalio/client'; +import type { + BundlerPlugin, + NativeConnectionPlugin, + WorkerInterceptors, + WorkerPlugin, + WorkflowBundleOption, + WorkerOptions, + ReplayWorkerOptions, + Worker, + BundleOptions, + NativeConnectionOptions, + TLSConfig, +} from '@temporalio/worker'; + +type PluginParameter = T | ((p: T | undefined) => T); + +export interface SimplePluginOptions { + readonly name: string; + readonly tls?: PluginParameter; + readonly apiKey?: PluginParameter; + readonly dataConverter?: PluginParameter; + readonly clientInterceptors?: PluginParameter; + readonly activities?: PluginParameter; + readonly nexusServices?: PluginParameter[]>; + readonly workflowsPath?: PluginParameter; + readonly workflowBundle?: PluginParameter; + readonly workerInterceptors?: PluginParameter; + readonly runContext?: (next: () => Promise) => Promise; +} + +export class SimplePlugin + implements WorkerPlugin, ClientPlugin, BundlerPlugin, ConnectionPlugin, NativeConnectionPlugin +{ + readonly name: string; + + constructor(protected readonly options: SimplePluginOptions) { + this.name = options.name; + } + + configureClient(options: ClientOptions): ClientOptions { + return { + ...options, + dataConverter: resolveParameter(options.dataConverter, this.options.dataConverter), + interceptors: resolveClientInterceptors(options.interceptors, this.options.clientInterceptors), + }; + } + + configureWorker(options: WorkerOptions): WorkerOptions { + return { + ...options, + dataConverter: resolveParameter(options.dataConverter, this.options.dataConverter), + activities: resolveAppendObjectParameter(options.activities, this.options.activities), + nexusServices: resolveAppendParameter(options.nexusServices, this.options.nexusServices), + workflowsPath: resolveParameter(options.workflowsPath, this.options.workflowsPath), + workflowBundle: resolveParameter(options.workflowBundle, this.options.workflowBundle), + interceptors: resolveWorkerInterceptors(options.interceptors, this.options.workerInterceptors), + }; + } + + configureReplayWorker(options: ReplayWorkerOptions): ReplayWorkerOptions { + return { + ...options, + dataConverter: resolveParameter(options.dataConverter, this.options.dataConverter), + workflowsPath: resolveParameter(options.workflowsPath, this.options.workflowsPath), + workflowBundle: resolveParameter(options.workflowBundle, this.options.workflowBundle), + interceptors: resolveWorkerInterceptors(options.interceptors, this.options.workerInterceptors), + }; + } + + async runWorker(worker: Worker, next: (w: Worker) => Promise): Promise { + if (this.options.runContext !== undefined) { + return this.options.runContext(() => next(worker)); + } + return next(worker); + } + + configureBundler(options: BundleOptions): BundleOptions { + return { + ...options, + workflowsPath: resolveRequiredParameter(options.workflowsPath, this.options.workflowsPath), + }; + } + + configureConnection(options: ConnectionOptions): ConnectionOptions { + const apiKey = typeof options.apiKey === 'function' ? options.apiKey : undefined; + return { + ...options, + tls: resolveParameter(options.tls, this.options.tls), + apiKey: apiKey ?? resolveParameter(options.apiKey as string | undefined, this.options.apiKey), + }; + } + + configureNativeConnection(options: NativeConnectionOptions): NativeConnectionOptions { + return { + ...options, + tls: resolveParameter(options.tls, this.options.tls), + apiKey: resolveParameter(options.apiKey, this.options.apiKey), + }; + } +} + +function resolveParameterWithResolution( + existing: T | undefined, + parameter: PluginParameter | undefined, + resolve: (existing: T, param: T) => T +): T | undefined { + if (parameter === undefined) { + return existing; + } + if (typeof parameter === 'function') { + // @ts-expect-error Can't infer that parameter is a function + return parameter(existing); + } + if (existing === undefined) { + return parameter; + } + return resolve(existing, parameter); +} + +function resolveRequiredParameter(existing: T, parameter?: PluginParameter): T { + return resolveParameterWithResolution(existing, parameter, (_existing, param) => param)!; +} + +function resolveParameter(existing?: T, parameter?: PluginParameter): T | undefined { + return resolveParameterWithResolution(existing as T, parameter, (_existing, param) => param); +} + +function resolveAppendParameter(existing?: T[], parameter?: PluginParameter): T[] | undefined { + if (parameter === undefined) { + return existing; + } + return resolveParameterWithResolution(existing ?? ([] as T[]), parameter, (existing, param) => + existing.concat(param) + ); +} + +function resolveAppendObjectParameter(existing?: object, parameter?: PluginParameter): object | undefined { + if (parameter === undefined) { + return existing; + } + return resolveParameterWithResolution(existing ?? {}, parameter, (existing, param) => ({ ...existing, ...param })); +} + +function resolveClientInterceptors( + existing?: ClientInterceptors, + parameter?: PluginParameter +): ClientInterceptors | undefined { + return resolveParameterWithResolution(existing, parameter, (existing, parameter) => ({ + workflow: tryConcat( + modernWorkflowInterceptors(existing?.workflow), + modernWorkflowInterceptors(parameter?.workflow) + ), + schedule: tryConcat(existing?.schedule, parameter?.schedule), + })); +} + +function resolveWorkerInterceptors( + existing?: WorkerInterceptors, + parameter?: PluginParameter +): WorkerInterceptors | undefined { + return resolveParameterWithResolution(existing, parameter, (existing, parameter) => ({ + client: resolveClientInterceptors(existing.client, parameter.client), + activity: resolveAppendParameter(existing.activity, parameter.activity), + nexus: resolveAppendParameter(existing.nexus, parameter.nexus), + workflowModules: resolveAppendParameter(existing.workflowModules, parameter.workflowModules), + })); +} + +// eslint-disable-next-line deprecation/deprecation +function modernWorkflowInterceptors( + interceptors: WorkflowClientInterceptors | WorkflowClientInterceptor[] | undefined +): WorkflowClientInterceptor[] | undefined { + if (interceptors === undefined || Array.isArray(interceptors)) { + return interceptors; + } + throw new Error("Simple plugin doesn't support old style workflow client interceptors"); +} + +function tryConcat(left: T[] | undefined, right: T[] | undefined): T[] | undefined { + if (right === undefined) { + return left; + } + if (left === undefined) { + return right; + } + return left.concat(right); +} diff --git a/packages/plugin/tsconfig.json b/packages/plugin/tsconfig.json new file mode 100644 index 000000000..1e09513a4 --- /dev/null +++ b/packages/plugin/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "./lib", + "rootDir": "./src" + }, + "include": ["./src/**/*.ts"] +} diff --git a/packages/test/package.json b/packages/test/package.json index 128d39ddb..83d5bbe10 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -40,6 +40,7 @@ "@temporalio/interceptors-opentelemetry": "file:../interceptors-opentelemetry", "@temporalio/nexus": "file:../nexus", "@temporalio/nyc-test-coverage": "file:../nyc-test-coverage", + "@temporalio/plugin": "file:../plugin", "@temporalio/proto": "file:../proto", "@temporalio/testing": "file:../testing", "@temporalio/worker": "file:../worker", diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index e10f92550..ae29f7a6c 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -176,7 +176,7 @@ export class Worker extends RealWorker { taskQueue: opts.taskQueue, }); const nativeWorker = new MockNativeWorker(); - super(runtime, nativeWorker, workflowCreator, opts, logger, runtime.metricMeter); + super(runtime, nativeWorker, workflowCreator, opts, logger, runtime.metricMeter, opts.plugins ?? []); } public runWorkflows(...args: Parameters): Promise { diff --git a/packages/test/src/test-plugins.ts b/packages/test/src/test-plugins.ts new file mode 100644 index 000000000..1bf33e852 --- /dev/null +++ b/packages/test/src/test-plugins.ts @@ -0,0 +1,247 @@ +import { randomUUID } from 'crypto'; +import anyTest, { TestFn } from 'ava'; +import { Client, ClientOptions, ConnectionPlugin, ClientPlugin as ClientPlugin } from '@temporalio/client'; +import { + WorkerOptions, + WorkerPlugin as WorkerPlugin, + Worker, + BundlerPlugin, + BundleOptions, + bundleWorkflowCode, + NativeConnectionPlugin, +} from '@temporalio/worker'; +import { SimplePlugin } from '@temporalio/plugin'; +import { activityWorkflow, helloWorkflow } from './workflows/plugins'; +import { TestWorkflowEnvironment } from './helpers'; + +import * as activities from './activities'; + +interface Context { + testEnv: TestWorkflowEnvironment; +} +const test = anyTest as TestFn; + +test.before(async (t) => { + t.context = { + testEnv: await TestWorkflowEnvironment.createLocal(), + }; +}); + +test.after.always(async (t) => { + await t.context.testEnv?.teardown(); +}); + +export class ExamplePlugin + implements WorkerPlugin, ClientPlugin, BundlerPlugin, ConnectionPlugin, NativeConnectionPlugin +{ + readonly name: string = 'example-plugin'; + + constructor() {} + + configureClient(config: ClientOptions): ClientOptions { + console.log('ExamplePlugin: Configuring client'); + config.identity = 'Plugin Identity'; + return config; + } + + configureWorker(config: WorkerOptions): WorkerOptions { + console.log('ExamplePlugin: Configuring worker'); + config.taskQueue = 'plugin-task-queue'; + return config; + } + + configureBundler(config: BundleOptions): BundleOptions { + console.log('Configure bundler'); + config.workflowsPath = require.resolve('./workflows/plugins'); + return config; + } +} + +test('Basic plugin', async (t) => { + const { connection } = t.context.testEnv; + const client = new Client({ connection }); + + const plugin = new ExamplePlugin(); + const bundle = await bundleWorkflowCode({ + workflowsPath: 'replaced', + plugins: [plugin], + }); + + const worker = await Worker.create({ + workflowBundle: bundle, + connection: t.context.testEnv.nativeConnection, + taskQueue: 'will be overridden', + plugins: [plugin], + }); + + await worker.runUntil(async () => { + t.is(worker.options.taskQueue, 'plugin-task-queue'); + const result = await client.workflow.execute(helloWorkflow, { + taskQueue: 'plugin-task-queue', + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); +}); + +test('Bundler plugins are passed from worker', async (t) => { + const { connection } = t.context.testEnv; + const client = new Client({ connection }); + + const worker = await Worker.create({ + workflowsPath: 'replaced', + connection: t.context.testEnv.nativeConnection, + taskQueue: 'will be overridden', + plugins: [new ExamplePlugin()], + }); + await worker.runUntil(async () => { + t.is(worker.options.taskQueue, 'plugin-task-queue'); + const result = await client.workflow.execute(helloWorkflow, { + taskQueue: 'plugin-task-queue', + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); +}); + +test('Worker plugins are passed from native connection', async (t) => { + const env = await TestWorkflowEnvironment.createLocal({ plugins: [new ExamplePlugin()] }); + try { + const client = new Client({ connection: env.connection }); + + const worker = await Worker.create({ + workflowsPath: 'replaced', + connection: env.nativeConnection, + taskQueue: 'will be overridden', + }); + + t.is(worker.options.taskQueue, 'plugin-task-queue'); + + await worker.runUntil(async () => { + t.is(worker.options.taskQueue, 'plugin-task-queue'); + const result = await client.workflow.execute(helloWorkflow, { + taskQueue: 'plugin-task-queue', + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); + } finally { + await env.teardown(); + } +}); + +test('Client plugins are passed from connections', async (t) => { + const env = await TestWorkflowEnvironment.createLocal({ plugins: [new ExamplePlugin()] }); + try { + const client = new Client({ connection: env.connection }); + t.is(client.options.identity, 'Plugin Identity'); + + const clientNative = new Client({ connection: env.nativeConnection }); + t.is(clientNative.options.identity, 'Plugin Identity'); + } finally { + await env.teardown(); + } +}); + +test('Bundler plugins are passed from connections', async (t) => { + const plugin = new (class implements BundlerPlugin { + name: string = 'plugin'; + configureBundler(options: BundleOptions): BundleOptions { + return { ...options, workflowsPath: require.resolve('./workflows/plugins') }; + } + })(); + const env = await TestWorkflowEnvironment.createLocal({ plugins: [plugin] }); + try { + const client = new Client({ connection: env.connection }); + const worker = await Worker.create({ + workflowsPath: 'replaced', + connection: env.nativeConnection, + taskQueue: 'plugin-task-queue', + }); + + await worker.runUntil(async () => { + t.is(worker.options.taskQueue, 'plugin-task-queue'); + const result = await client.workflow.execute(helloWorkflow, { + taskQueue: 'plugin-task-queue', + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); + } finally { + await env.teardown(); + } +}); + +// SimplePlugin tests +test('SimplePlugin connection configurations', async (t) => { + const plugin = new SimplePlugin({ + name: 'test-simple-plugin', + tls: true, + apiKey: 'testApiKey', + }); + + const options = plugin.configureNativeConnection({}); + t.is(options.tls, true); + t.is(options.apiKey, 'testApiKey'); +}); + +test('SimplePlugin worker configurations', async (t) => { + const plugin = new SimplePlugin({ + name: 'test-simple-plugin', + activities, + workflowsPath: require.resolve('./workflows/plugins'), + }); + + const { connection } = t.context.testEnv; + const client = new Client({ connection }); + + const worker = await Worker.create({ + workflowsPath: 'replaced', + connection: t.context.testEnv.nativeConnection, + taskQueue: 'simple-plugin-queue', + plugins: [plugin], + }); + + await worker.runUntil(async () => { + const result = await client.workflow.execute(activityWorkflow, { + taskQueue: 'simple-plugin-queue', + workflowExecutionTimeout: '30 seconds', + workflowId: randomUUID(), + }); + + t.is(result, 'Hello'); + }); +}); + +test('SimplePlugin with activities merges them correctly', async (t) => { + const activity1 = async () => 'activity1'; + const activity2 = async () => 'activity2'; + + const plugin = new SimplePlugin({ + name: 'simple-test-plugin', + activities: { + pluginActivity: activity2, + }, + }); + + const worker = await Worker.create({ + connection: t.context.testEnv.nativeConnection, + taskQueue: 'simple-plugin-queue', + activities: { + existingActivity: activity1, + }, + plugins: [plugin], + }); + + t.truthy(worker.options.activities); + t.truthy(worker.options.activities.has('existingActivity')); + t.truthy(worker.options.activities.has('pluginActivity')); +}); diff --git a/packages/test/src/workflows/plugins.ts b/packages/test/src/workflows/plugins.ts new file mode 100644 index 000000000..f9194375d --- /dev/null +++ b/packages/test/src/workflows/plugins.ts @@ -0,0 +1,15 @@ +import { proxyActivities } from '@temporalio/workflow'; +import type * as activities from '../activities'; + +const { echo } = proxyActivities({ + startToCloseTimeout: '20s', + retry: { initialInterval: 5, maximumAttempts: 1, nonRetryableErrorTypes: ['NonRetryableError'] }, +}); + +export async function helloWorkflow(): Promise { + return 'Hello'; +} + +export async function activityWorkflow(): Promise { + return echo('Hello'); +} diff --git a/packages/test/tsconfig.json b/packages/test/tsconfig.json index 3e55850d4..cc60e626c 100644 --- a/packages/test/tsconfig.json +++ b/packages/test/tsconfig.json @@ -14,6 +14,7 @@ { "path": "../common" }, { "path": "../interceptors-opentelemetry" }, { "path": "../nexus" }, + { "path": "../plugin" }, { "path": "../testing" }, { "path": "../worker" }, { "path": "../workflow" }, diff --git a/packages/testing/src/testing-workflow-environment.ts b/packages/testing/src/testing-workflow-environment.ts index 5187f996f..054e86472 100644 --- a/packages/testing/src/testing-workflow-environment.ts +++ b/packages/testing/src/testing-workflow-environment.ts @@ -1,5 +1,12 @@ import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import -import { AsyncCompletionClient, Client, Connection, WorkflowClient } from '@temporalio/client'; +import { + AsyncCompletionClient, + Client, + ClientPlugin, + Connection, + ConnectionPlugin, + WorkflowClient, +} from '@temporalio/client'; import { ConnectionOptions, InternalConnectionOptions, @@ -7,7 +14,7 @@ import { } from '@temporalio/client/lib/connection'; import { Duration, TypedSearchAttributes } from '@temporalio/common'; import { msToNumber, msToTs, tsToMs } from '@temporalio/common/lib/time'; -import { NativeConnection, NativeConnectionOptions, Runtime } from '@temporalio/worker'; +import { NativeConnection, NativeConnectionPlugin, NativeConnectionOptions, Runtime } from '@temporalio/worker'; import { native } from '@temporalio/core-bridge'; import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { toNativeEphemeralServerConfig, DevServerConfig, TimeSkippingServerConfig } from './ephemeral-server'; @@ -19,6 +26,7 @@ import { ClientOptionsForTestEnv, TimeSkippingClient } from './client'; export type LocalTestWorkflowEnvironmentOptions = { server?: Omit; client?: ClientOptionsForTestEnv; + plugins?: (ClientPlugin | ConnectionPlugin | NativeConnectionPlugin)[]; }; /** @@ -27,6 +35,7 @@ export type LocalTestWorkflowEnvironmentOptions = { export type TimeSkippingTestWorkflowEnvironmentOptions = { server?: Omit; client?: ClientOptionsForTestEnv; + plugins?: (ClientPlugin | ConnectionPlugin | NativeConnectionPlugin)[]; }; /** @@ -38,6 +47,7 @@ export type ExistingServerTestWorkflowEnvironmentOptions = { /** If not set, defaults to default */ namespace?: string; client?: ClientOptionsForTestEnv; + plugins?: (ClientPlugin | ConnectionPlugin | NativeConnectionPlugin)[]; }; /** @@ -99,11 +109,13 @@ export class TestWorkflowEnvironment { ? new TimeSkippingClient({ connection, namespace: this.namespace, + plugins: options.plugins, ...options.client, }) : new Client({ connection, namespace: this.namespace, + plugins: options.plugins, ...options.client, }); this.asyncCompletionClient = this.client.activity; // eslint-disable-line deprecation/deprecation @@ -144,6 +156,7 @@ export class TestWorkflowEnvironment { return await this.create({ server: { type: 'time-skipping', ...opts?.server }, client: opts?.client, + plugins: opts?.plugins, supportsTimeSkipping: true, }); } @@ -173,6 +186,7 @@ export class TestWorkflowEnvironment { return await this.create({ server: { type: 'dev-server', ...opts?.server }, client: opts?.client, + plugins: opts?.plugins, namespace: opts?.server?.namespace, supportsTimeSkipping: false, }); @@ -188,6 +202,7 @@ export class TestWorkflowEnvironment { return await this.create({ server: { type: 'existing' }, client: opts?.client, + plugins: opts?.plugins, namespace: opts?.namespace ?? 'default', supportsTimeSkipping: false, address: opts?.address, @@ -231,10 +246,12 @@ export class TestWorkflowEnvironment { const nativeConnection = await NativeConnection.connect({ address, + plugins: opts.plugins, [InternalConnectionOptionsSymbol]: { supportsTestService: supportsTimeSkipping }, }); const connection = await Connection.connect({ address, + plugins: opts.plugins, [InternalConnectionOptionsSymbol]: { supportsTestService: supportsTimeSkipping }, }); @@ -335,6 +352,7 @@ export class TestWorkflowEnvironment { type TestWorkflowEnvironmentOptions = { server: DevServerConfig | TimeSkippingServerConfig | ExistingServerConfig; client?: ClientOptionsForTestEnv; + plugins?: (ClientPlugin | ConnectionPlugin | NativeConnectionPlugin)[]; }; type ExistingServerConfig = { type: 'existing' }; @@ -348,5 +366,6 @@ function addDefaults(opts: TestWorkflowEnvironmentOptions): TestWorkflowEnvironm server: { ...opts.server, }, + plugins: [], }; } diff --git a/packages/worker/src/connection-options.ts b/packages/worker/src/connection-options.ts index 41bdb80b4..02590774a 100644 --- a/packages/worker/src/connection-options.ts +++ b/packages/worker/src/connection-options.ts @@ -9,6 +9,7 @@ import { } from '@temporalio/common/lib/internal-non-workflow'; import type { Metadata } from '@temporalio/client'; import pkg from './pkg'; +import type { NativeConnectionPlugin } from './connection'; export { TLSConfig, ProxyConfig }; @@ -60,6 +61,15 @@ export interface NativeConnectionOptions { * @default false */ disableErrorCodeMetricTags?: boolean; + + /** + * List of plugins to register with the native connection. + * + * Plugins allow you to configure the native connection options. + * + * Any plugins provided will also be passed to any Worker, Client, or Bundler built from this connection. + */ + plugins?: NativeConnectionPlugin[]; } // Compile to Native /////////////////////////////////////////////////////////////////////////////// diff --git a/packages/worker/src/connection.ts b/packages/worker/src/connection.ts index 028706c34..a0c63788a 100644 --- a/packages/worker/src/connection.ts +++ b/packages/worker/src/connection.ts @@ -70,7 +70,8 @@ export class NativeConnection implements ConnectionLike { protected constructor( private readonly runtime: Runtime, private readonly nativeClient: native.Client, - private readonly enableTestService: boolean + private readonly enableTestService: boolean, + readonly plugins: NativeConnectionPlugin[] ) { this.workflowService = WorkflowService.create( this.sendRequest.bind(this, native.clientSendWorkflowServiceRequest.bind(undefined, this.nativeClient)), @@ -230,13 +231,20 @@ export class NativeConnection implements ConnectionLike { * Eagerly connect to the Temporal server and return a NativeConnection instance */ static async connect(options?: NativeConnectionOptions): Promise { + options = options ?? {}; + for (const plugin of options.plugins ?? []) { + if (plugin.configureNativeConnection !== undefined) { + options = plugin.configureNativeConnection(options); + } + } const internalOptions = (options as InternalConnectionOptions)?.[InternalConnectionOptionsSymbol] ?? {}; const enableTestService = internalOptions.supportsTestService ?? false; try { const runtime = Runtime.instance(); + const client = await runtime.createNativeClient(options); - return new this(runtime, client, enableTestService); + return new this(runtime, client, enableTestService, options.plugins ?? []); } catch (err) { if (err instanceof TransportError) { throw new TransportError(err.message); @@ -349,3 +357,18 @@ function tagMetadata(metadata: Metadata): Record { ]) ); } + +/** + * Plugin to control the configuration of a native connection. + */ +export interface NativeConnectionPlugin { + /** + * Gets the name of this plugin. + */ + get name(): string; + + /** + * Hook called when creating a native connection to allow modification of configuration. + */ + configureNativeConnection?(options: NativeConnectionOptions): NativeConnectionOptions; +} diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index 3730aa233..b7cbcb3c1 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -8,7 +8,7 @@ * @module */ -export { NativeConnection } from './connection'; +export { NativeConnection, NativeConnectionPlugin } from './connection'; export { NativeConnectionOptions, TLSConfig } from './connection-options'; export { startDebugReplayer } from './debug-replayer'; export { IllegalStateError } from '@temporalio/common'; @@ -35,6 +35,7 @@ export { } from './runtime-options'; export * from './sinks'; export { DataConverter, defaultPayloadConverter, State, Worker, WorkerStatus } from './worker'; +export { WorkerPlugin } from './plugin'; export { CompiledWorkerOptions, ReplayWorkerOptions, @@ -44,7 +45,7 @@ export { WorkflowBundlePath, } from './worker-options'; export { ReplayError, ReplayHistoriesIterable, ReplayResult } from './replay'; -export { BundleOptions, bundleWorkflowCode, WorkflowBundleWithSourceMap } from './workflow/bundler'; +export { BundleOptions, bundleWorkflowCode, WorkflowBundleWithSourceMap, BundlerPlugin } from './workflow/bundler'; export { WorkerTuner, TunerHolder, diff --git a/packages/worker/src/plugin.ts b/packages/worker/src/plugin.ts new file mode 100644 index 000000000..f30a91223 --- /dev/null +++ b/packages/worker/src/plugin.ts @@ -0,0 +1,41 @@ +import type { ReplayWorkerOptions, WorkerOptions } from './worker-options'; +import type { Worker } from './worker'; + +/** + * Plugin interface for worker functionality. + * + * Plugins provide a way to extend and customize the behavior of Temporal workers. + * They allow you to intercept and modify worker configuration and worker execution. + */ +export interface WorkerPlugin { + /** + * Gets the name of this plugin. + */ + get name(): string; + + /** + * Hook called when creating a worker to allow modification of configuration. + * + * This method is called during worker creation and allows plugins to modify + * the worker configuration before the worker is fully initialized. Plugins + * can add activities, workflows, interceptors, or change other settings. + */ + configureWorker?(options: WorkerOptions): WorkerOptions; + + /** + * Hook called when creating a replay worker to allow modification of configuration. + * + * This method is called during worker creation and allows plugins to modify + * the worker configuration before the worker is fully initialized. Plugins + * can add activities, workflows, interceptors, or change other settings. + */ + configureReplayWorker?(options: ReplayWorkerOptions): ReplayWorkerOptions; + + /** + * Hook called when running a worker. + * + * This method is not called when running a replay worker, as activities will not be + * executed, and global state can't affect the workflow. + */ + runWorker?(worker: Worker, next: (w: Worker) => Promise): Promise; +} diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index dc9dc6fb8..261b29392 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -27,6 +27,7 @@ import { InjectedSinks } from './sinks'; import { MiB } from './utils'; import { WorkflowBundleWithSourceMap } from './workflow/bundler'; import { asNativeTuner, WorkerTuner } from './worker-tuner'; +import { WorkerPlugin } from './plugin'; /** * Options to configure the {@link Worker} @@ -481,6 +482,19 @@ export interface WorkerOptions { */ interceptors?: WorkerInterceptors; + /** + * List of plugins to register with the worker. + * + * Plugins allow you to extend and customize the behavior of Temporal workers. + * They can intercept and modify worker creation, configuration, and execution. + * + * Worker plugins can be used to add custom activities, workflows, interceptors, or modify other + * worker settings before the worker is fully initialized. + * + * Any plugins provided will also be passed to the bundler if used. + */ + plugins?: WorkerPlugin[]; + /** * Registration of a {@link SinkFunction}, including per-sink-function options. * diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 47a392d94..716106f85 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -109,6 +109,7 @@ import { } from './errors'; import { constructNexusOperationContext, NexusHandler } from './nexus'; import { handlerErrorToProto } from './nexus/conversions'; +import { WorkerPlugin } from './plugin'; export { DataConverter, defaultPayloadConverter }; @@ -500,6 +501,12 @@ export class Worker { * This method initiates a connection to the server and will throw (asynchronously) on connection failure. */ public static async create(options: WorkerOptions): Promise { + options.plugins = (options.plugins ?? []).concat(options.connection?.plugins ?? []); + for (const plugin of options.plugins) { + if (plugin.configureWorker !== undefined) { + options = plugin.configureWorker(options); + } + } if (!options.taskQueue) { throw new TypeError('Task queue name is required'); } @@ -555,6 +562,7 @@ export class Worker { compiledOptionsWithBuildId, logger, metricMeter, + options.plugins || [], connection ); } @@ -697,6 +705,12 @@ export class Worker { } private static async constructReplayWorker(options: ReplayWorkerOptions): Promise<[Worker, native.HistoryPusher]> { + const plugins = options.plugins ?? []; + for (const plugin of plugins) { + if (plugin.configureReplayWorker !== undefined) { + options = plugin.configureReplayWorker(options); + } + } const nativeWorkerCtor: NativeWorkerConstructor = this.nativeWorkerCtor; const fixedUpOptions: WorkerOptions = { taskQueue: (options.replayName ?? 'fake_replay_queue') + '-' + this.replayWorkerCount, @@ -724,7 +738,17 @@ export class Worker { addBuildIdIfMissing(compiledOptions, bundle.code) ); return [ - new this(runtime, replayHandle.worker, workflowCreator, compiledOptions, logger, metricMeter, undefined, true), + new this( + runtime, + replayHandle.worker, + workflowCreator, + compiledOptions, + logger, + metricMeter, + plugins, + undefined, + true + ), replayHandle.historyPusher, ]; } @@ -770,6 +794,7 @@ export class Worker { payloadConverterPath: compiledOptions.dataConverter?.payloadConverterPath, ignoreModules: compiledOptions.bundlerOptions?.ignoreModules, webpackConfigHook: compiledOptions.bundlerOptions?.webpackConfigHook, + plugins: compiledOptions.plugins, }); const bundle = await bundler.createBundle(); return parseWorkflowCode(bundle.code); @@ -792,6 +817,7 @@ export class Worker { /** Logger bound to 'sdkComponent: worker' */ protected readonly logger: Logger, protected readonly metricMeter: MetricMeter, + protected readonly plugins: WorkerPlugin[], protected readonly connection?: NativeConnection, protected readonly isReplayWorker: boolean = false ) { @@ -1956,6 +1982,19 @@ export class Worker { * To stop polling, call {@link shutdown} or send one of {@link Runtime.options.shutdownSignals}. */ async run(): Promise { + if (this.isReplayWorker) { + return this.runInternal(); + } + let runWorker = (w: Worker) => w.runInternal(); + for (let i = this.plugins.length - 1; i >= 0; --i) { + const rw = runWorker; + const plugin = this.plugins[i]; + runWorker = (w: Worker) => plugin.runWorker?.(w, rw) ?? rw(w); + } + return runWorker(this); + } + + private async runInternal(): Promise { if (this.state !== 'INITIALIZED') { throw new IllegalStateError('Poller was already started'); } diff --git a/packages/worker/src/workflow/bundler.ts b/packages/worker/src/workflow/bundler.ts index 0361c1a7c..6a67e132b 100644 --- a/packages/worker/src/workflow/bundler.ts +++ b/packages/worker/src/workflow/bundler.ts @@ -53,16 +53,24 @@ export class WorkflowCodeBundler { protected readonly failureConverterPath?: string; protected readonly ignoreModules: string[]; protected readonly webpackConfigHook: (config: Configuration) => Configuration; + protected readonly plugins: BundlerPlugin[]; - constructor({ - logger, - workflowsPath, - payloadConverterPath, - failureConverterPath, - workflowInterceptorModules, - ignoreModules, - webpackConfigHook, - }: BundleOptions) { + constructor(options: BundleOptions) { + this.plugins = options.plugins ?? []; + for (const plugin of this.plugins) { + if (plugin.configureBundler !== undefined) { + options = plugin.configureBundler(options); + } + } + const { + logger, + workflowsPath, + payloadConverterPath, + failureConverterPath, + workflowInterceptorModules, + ignoreModules, + webpackConfigHook, + } = options; this.logger = logger ?? new DefaultLogger('INFO'); this.workflowsPath = workflowsPath; this.payloadConverterPath = payloadConverterPath; @@ -307,6 +315,21 @@ exports.importInterceptors = function importInterceptors() { } } +export interface BundlerPlugin { + /** + * Gets the name of this plugin. + * + * Returns: + * The name of the plugin. + */ + get name(): string; + + /** + * Hook called when creating a bundler to allow modification of configuration. + */ + configureBundler?(options: BundleOptions): BundleOptions; +} + /** * Options for bundling Workflow code using Webpack */ @@ -350,6 +373,11 @@ export interface BundleOptions { * {@link https://webpack.js.org/configuration/ | configuration} object so you can modify it. */ webpackConfigHook?: (config: Configuration) => Configuration; + + /** + * List of plugins to register with the bundler. + */ + plugins?: BundlerPlugin[]; } /**