diff --git a/docs/logging.md b/docs/logging.md index e8f71a436..765248664 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -13,7 +13,7 @@ import { Worker, DefaultLogger } from '@temporalio/worker'; const logger = new DefaultLogger('WARNING', (severity, message, meta) => { /* Implement this in order to generate custom log output */ }); -const worker = new Worker(__dirname, { logger }); +const worker = await Worker.create(__dirname, { logger }); ``` #### BYOL - Bring your own logger @@ -27,5 +27,5 @@ const logger = winston.createLogger({ format: winston.format.json(), transports: [new winston.transports.Console()], }); -const worker = new Worker(__dirname, { logger }); +const worker = await Worker.create(__dirname, { logger }); ``` diff --git a/packages/create-project/samples/worker.ts b/packages/create-project/samples/worker.ts index 3a9ffbcb5..ea72509fd 100644 --- a/packages/create-project/samples/worker.ts +++ b/packages/create-project/samples/worker.ts @@ -2,7 +2,7 @@ import { Worker } from '@temporalio/worker'; async function run() { // Automatically locate and register activities and workflows - const worker = new Worker(__dirname); + const worker = await Worker.create(__dirname); // Bind to the `tutorial` queue and start accepting tasks await worker.run('tutorial'); } diff --git a/packages/meta/README.md b/packages/meta/README.md index e4a16d444..d5107f1ea 100644 --- a/packages/meta/README.md +++ b/packages/meta/README.md @@ -121,7 +121,7 @@ import { Worker } from '@temporalio/worker'; (async () => { // Automatically locate and register activities and workflows - const worker = new Worker(__dirname); + const worker = await Worker.create(__dirname); // Bind to the `tutorial` queue and start accepting tasks await worker.run('tutorial'); })(); diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index dcbcdb8a6..a8af6ff04 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -2,22 +2,47 @@ import { v4 as uuid4 } from 'uuid'; import { coresdk } from '@temporalio/proto'; import { defaultDataConverter } from '@temporalio/workflow/commonjs/converter/data-converter'; -import { Worker as RealWorker, NativeWorkerLike } from '@temporalio/worker/lib/worker'; +import { Worker as RealWorker, NativeWorkerLike, WorkerOptions } from '@temporalio/worker/lib/worker'; import { sleep } from '@temporalio/worker/lib/utils'; +export type Task = + | { workflow: coresdk.workflow_activation.IWFActivation } + | { activity: coresdk.activity_task.IActivityTask }; + export class MockNativeWorker implements NativeWorkerLike { - tasks: Array> = []; - reject?: (err: Error) => void; - completionCallback?: (arr: ArrayBuffer) => void; + activityTasks: Array> = []; + workflowActivations: Array> = []; + activityCompletionCallback?: (arr: ArrayBuffer) => void; + workflowCompletionCallback?: (arr: ArrayBuffer) => void; activityHeartbeatCallback?: (activityId: string, details: any) => void; + reject?: (err: Error) => void; + + public static async create(): Promise { + return new this(); + } + + public async breakLoop(): Promise { + // Nothing to break from + } public shutdown(): void { - this.tasks.unshift(Promise.reject(new Error('[Core::shutdown]'))); + this.activityTasks.unshift(Promise.reject(new Error('Core is shut down'))); + this.workflowActivations.unshift(Promise.reject(new Error('Core is shut down'))); + } + + public async pollWorkflowActivation(_queueName: string): Promise { + for (;;) { + const task = this.workflowActivations.pop(); + if (task !== undefined) { + return task; + } + await sleep(1); + } } - public async poll(_queueName: string): Promise { + public async pollActivityTask(_queueName: string): Promise { for (;;) { - const task = this.tasks.pop(); + const task = this.activityTasks.pop(); if (task !== undefined) { return task; } @@ -25,29 +50,52 @@ export class MockNativeWorker implements NativeWorkerLike { } } - public completeTask(result: ArrayBuffer): void { - this.completionCallback!(result); - this.completionCallback = undefined; + public async completeWorkflowActivation(result: ArrayBuffer): Promise { + this.workflowCompletionCallback!(result); + this.workflowCompletionCallback = undefined; + } + + public async completeActivityTask(result: ArrayBuffer): Promise { + this.activityCompletionCallback!(result); + this.activityCompletionCallback = undefined; + } + + public emit(task: Task): void { + if ('workflow' in task) { + const arr = coresdk.workflow_activation.WFActivation.encode(task.workflow).finish(); + const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); + this.workflowActivations.unshift(Promise.resolve(buffer)); + } else { + const arr = coresdk.activity_task.ActivityTask.encode(task.activity).finish(); + const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); + this.activityTasks.unshift(Promise.resolve(buffer)); + } } - public emit(task: coresdk.ITask): void { - const arr = coresdk.Task.encode(task).finish(); + public async runWorkflowActivation( + activation: coresdk.workflow_activation.IWFActivation + ): Promise { + activation = { ...activation, taskToken: activation.taskToken ?? Buffer.from(uuid4()) }; + const arr = coresdk.workflow_activation.WFActivation.encode(activation).finish(); const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); - this.tasks.unshift(Promise.resolve(buffer)); + const result = await new Promise((resolve) => { + this.workflowCompletionCallback = resolve; + this.workflowActivations.unshift(Promise.resolve(buffer)); + }); + return coresdk.workflow_completion.WFActivationCompletion.decodeDelimited(new Uint8Array(result)); } - public async runAndWaitCompletion(task: coresdk.ITask): Promise { - task = { ...task, taskToken: task.taskToken ?? Buffer.from(uuid4()) }; - const arr = coresdk.Task.encode(task).finish(); + public async runActivityTask(task: coresdk.activity_task.IActivityTask): Promise { + const arr = coresdk.activity_task.ActivityTask.encode(task).finish(); const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); const result = await new Promise((resolve) => { - this.completionCallback = resolve; - this.tasks.unshift(Promise.resolve(buffer)); + this.activityCompletionCallback = resolve; + this.activityTasks.unshift(Promise.resolve(buffer)); }); - return coresdk.TaskCompletion.decodeDelimited(new Uint8Array(result)); + return coresdk.ActivityTaskCompletion.decodeDelimited(new Uint8Array(result)); } - sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): void { + public async sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): Promise { const payload = details && coresdk.common.Payload.decode(new Uint8Array(details)); const arg = payload ? defaultDataConverter.fromPayload(payload) : undefined; this.activityHeartbeatCallback!(activityId, arg); @@ -70,4 +118,9 @@ export class Worker extends RealWorker { public get native(): MockNativeWorker { return this.nativeWorker as MockNativeWorker; } + + public constructor(pwd: string, opts?: WorkerOptions) { + const nativeWorker = new MockNativeWorker(); + super(nativeWorker, pwd, opts); + } } diff --git a/packages/test/src/run-a-worker.ts b/packages/test/src/run-a-worker.ts index 313a6194d..35e81a640 100644 --- a/packages/test/src/run-a-worker.ts +++ b/packages/test/src/run-a-worker.ts @@ -1,7 +1,7 @@ import { Worker } from '@temporalio/worker'; async function main() { - const worker = new Worker(__dirname, { + const worker = await Worker.create(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib`, activitiesPath: `${__dirname}/../../test-activities/lib`, }); diff --git a/packages/test/src/test-integration.ts b/packages/test/src/test-integration.ts index ccc429c71..3b1981777 100644 --- a/packages/test/src/test-integration.ts +++ b/packages/test/src/test-integration.ts @@ -1,5 +1,5 @@ /* eslint @typescript-eslint/no-non-null-assertion: 0 */ -import test from 'ava'; +import anyTest, { TestInterface } from 'ava'; import { v4 as uuid4 } from 'uuid'; import { Connection } from '@temporalio/client'; import { tsToMs } from '@temporalio/workflow/commonjs/time'; @@ -23,21 +23,28 @@ const { const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]); +export interface Context { + worker: Worker; +} + +const test = anyTest as TestInterface; + if (RUN_INTEGRATION_TESTS) { - const worker = new Worker(__dirname, { - workflowsPath: `${__dirname}/../../test-workflows/lib`, - activitiesPath: `${__dirname}/../../test-activities/lib`, - logger: new DefaultLogger('DEBUG'), - }); + test.before(async (t) => { + const worker = await Worker.create(__dirname, { + workflowsPath: `${__dirname}/../../test-workflows/lib`, + activitiesPath: `${__dirname}/../../test-activities/lib`, + logger: new DefaultLogger('DEBUG'), + }); + t.context = { worker }; - test.before((t) => { worker.run('test').catch((err) => { console.error(err); t.fail(`Failed to run worker: ${err}`); }); }); - test.after.always(() => { - worker.shutdown(); + test.after.always((t) => { + t.context.worker.shutdown(); }); test('Workflow not found results in failure', async (t) => { @@ -75,12 +82,11 @@ if (RUN_INTEGRATION_TESTS) { t.throwsAsync(promise, { message: /just because/, instanceOf: WorkflowExecutionFailedError }); }); - // Activities not yet properly implemented - test.skip('http', async (t) => { + test('http', async (t) => { const client = new Connection(); const workflow = client.workflow('http', { taskQueue: 'test' }); const res = await workflow.start(); - t.is(res, [await httpGet('https://google.com'), await httpGet('http://example.com')]); + t.deepEqual(res, [await httpGet('https://google.com'), await httpGet('http://example.com')]); }); test('set-timeout', async (t) => { diff --git a/packages/test/src/test-worker-activities.ts b/packages/test/src/test-worker-activities.ts index 2fbae94f2..68b7df279 100644 --- a/packages/test/src/test-worker-activities.ts +++ b/packages/test/src/test-worker-activities.ts @@ -31,10 +31,13 @@ test.beforeEach((t) => { function compareCompletion( t: ExecutionContext, - actual: coresdk.TaskCompletion, - expected: coresdk.ITaskCompletion + actual: coresdk.activity_result.IActivityResult | null | undefined, + expected: coresdk.activity_result.IActivityResult ) { - t.deepEqual(actual.toJSON(), coresdk.TaskCompletion.create(expected).toJSON()); + t.deepEqual( + coresdk.activity_result.ActivityResult.create(actual || undefined).toJSON(), + coresdk.activity_result.ActivityResult.create(expected).toJSON() + ); } test('Worker runs an activity and reports completion', async (t) => { @@ -42,19 +45,16 @@ test('Worker runs an activity and reports completion', async (t) => { await runWorker(t, async () => { const taskToken = Buffer.from(uuid4()); const url = 'https://temporal.io'; - const completion = await worker.native.runAndWaitCompletion({ + const completion = await worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - start: { - activityType: JSON.stringify(['@activities', 'httpGet']), - input: defaultDataConverter.toPayloads(url), - }, + activityId: 'abc', + start: { + activityType: JSON.stringify(['@activities', 'httpGet']), + input: defaultDataConverter.toPayloads(url), }, }); - compareCompletion(t, completion, { - taskToken, - activity: { completed: { result: defaultDataConverter.toPayloads(await httpGet(url)) } }, + compareCompletion(t, completion.result, { + completed: { result: defaultDataConverter.toPayload(await httpGet(url)) }, }); }); }); @@ -64,19 +64,16 @@ test('Worker runs an activity and reports failure', async (t) => { await runWorker(t, async () => { const taskToken = Buffer.from(uuid4()); const message = ':('; - const completion = await worker.native.runAndWaitCompletion({ + const completion = await worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - start: { - activityType: JSON.stringify(['@activities', 'throwAnError']), - input: defaultDataConverter.toPayloads(message), - }, + activityId: 'abc', + start: { + activityType: JSON.stringify(['@activities', 'throwAnError']), + input: defaultDataConverter.toPayloads(message), }, }); - compareCompletion(t, completion, { - taskToken, - activity: { failed: { failure: { message } } }, + compareCompletion(t, completion.result, { + failed: { failure: { message } }, }); }); }); @@ -85,8 +82,8 @@ test('Worker cancels activity and reports cancellation', async (t) => { const { worker } = t.context; await runWorker(t, async () => { worker.native.emit({ - taskToken: Buffer.from(uuid4()), activity: { + taskToken: Buffer.from(uuid4()), activityId: 'abc', start: { activityType: JSON.stringify(['@activities', 'waitForCancellation']), @@ -95,16 +92,13 @@ test('Worker cancels activity and reports cancellation', async (t) => { }, }); const taskToken = Buffer.from(uuid4()); - const completion = await worker.native.runAndWaitCompletion({ + const completion = await worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - cancel: {}, - }, + activityId: 'abc', + cancel: {}, }); - compareCompletion(t, completion, { - taskToken, - activity: { canceled: {} }, + compareCompletion(t, completion.result, { + canceled: {}, }); }); }); @@ -113,8 +107,8 @@ test('Activity Context AbortSignal cancels a fetch request', async (t) => { const { worker } = t.context; await runWorker(t, async () => { worker.native.emit({ - taskToken: Buffer.from(uuid4()), activity: { + taskToken: Buffer.from(uuid4()), activityId: 'abc', start: { activityType: JSON.stringify(['@activities', 'cancellableFetch']), @@ -123,16 +117,13 @@ test('Activity Context AbortSignal cancels a fetch request', async (t) => { }, }); const taskToken = Buffer.from(uuid4()); - const completion = await worker.native.runAndWaitCompletion({ + const completion = await worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - cancel: {}, - }, + activityId: 'abc', + cancel: {}, }); - compareCompletion(t, completion, { - taskToken, - activity: { canceled: {} }, + compareCompletion(t, completion.result, { + canceled: {}, }); }); }); @@ -141,22 +132,19 @@ test('Activity Context heartbeat is sent to core', async (t) => { const { worker } = t.context; await runWorker(t, async () => { const taskToken = Buffer.from(uuid4()); - const completionPromise = worker.native.runAndWaitCompletion({ + const completionPromise = worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - start: { - activityType: JSON.stringify(['@activities', 'progressiveSleep']), - input: defaultDataConverter.toPayloads(), - }, + activityId: 'abc', + start: { + activityType: JSON.stringify(['@activities', 'progressiveSleep']), + input: defaultDataConverter.toPayloads(), }, }); t.is(await worker.native.untilHeartbeat('abc'), 1); t.is(await worker.native.untilHeartbeat('abc'), 2); t.is(await worker.native.untilHeartbeat('abc'), 3); - compareCompletion(t, await completionPromise, { - taskToken, - activity: { completed: { result: defaultDataConverter.toPayloads(undefined) } }, + compareCompletion(t, (await completionPromise).result, { + completed: { result: defaultDataConverter.toPayload(undefined) }, }); }); }); diff --git a/packages/test/src/test-worker-lifecycle.ts b/packages/test/src/test-worker-lifecycle.ts index 330538c6f..29cb661dc 100644 --- a/packages/test/src/test-worker-lifecycle.ts +++ b/packages/test/src/test-worker-lifecycle.ts @@ -9,8 +9,8 @@ import { Worker as MockedWorker } from './mock-native-worker'; import { RUN_INTEGRATION_TESTS } from './helpers'; if (RUN_INTEGRATION_TESTS) { - test.serial('run shuts down gracefully', async (t) => { - const worker = new Worker(__dirname, { shutdownGraceTime: '500ms', activitiesPath: null }); + test.serial.skip('run shuts down gracefully', async (t) => { + const worker = await Worker.create(__dirname, { shutdownGraceTime: '500ms', activitiesPath: null }); t.is(worker.getState(), 'INITIALIZED'); const p = worker.run('shutdown-test'); t.is(worker.getState(), 'RUNNING'); @@ -42,7 +42,7 @@ test.serial('Mocked run throws if not shut down gracefully', async (t) => { worker.native.shutdown = () => undefined; // Make sure shutdown does not emit core shutdown process.emit('SIGINT', 'SIGINT'); await t.throwsAsync(p, { - message: 'Timed out waiting while waiting for worker to shutdown gracefully', + message: 'Timed out while waiting for worker to shutdown gracefully', }); t.is(worker.getState(), 'FAILED'); await t.throwsAsync(worker.run('shutdown-test'), { message: 'Poller was aleady started' }); @@ -55,8 +55,8 @@ test('Mocked worker suspends and resumes', async (t) => { worker.suspendPolling(); t.is(worker.getState(), 'SUSPENDED'); // Worker finishes its polling before suspension - await worker.native.runAndWaitCompletion({ workflow: { runId: 'abc' } }); - const completion = worker.native.runAndWaitCompletion({ workflow: { runId: 'abc' } }); + await worker.native.runWorkflowActivation({ runId: 'abc' }); + const completion = worker.native.runWorkflowActivation({ runId: 'abc' }); await t.throwsAsync( Promise.race([ sleep(10).then(() => { diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index ba7c2755e..faad4ff2e 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -35,19 +35,20 @@ test.beforeEach(async (t) => { async function activate(t: ExecutionContext, activation: coresdk.workflow_activation.IWFActivation) { const taskToken = u8(`${Math.random()}`); const arr = await t.context.workflow.activate(taskToken, activation); - const req = coresdk.TaskCompletion.decodeDelimited(arr); + const req = coresdk.workflow_completion.WFActivationCompletion.decodeDelimited(arr); t.deepEqual(req.taskToken, taskToken); - t.is(req.variant, 'workflow'); return req; } function compareCompletion( t: ExecutionContext, - req: coresdk.TaskCompletion, + req: coresdk.workflow_completion.WFActivationCompletion, expected: coresdk.workflow_completion.IWFActivationCompletion ) { - const actual = req.toJSON().workflow; - t.deepEqual(actual, coresdk.workflow_completion.WFActivationCompletion.create(expected).toJSON()); + t.deepEqual( + req.toJSON(), + coresdk.workflow_completion.WFActivationCompletion.create({ ...expected, taskToken: req.taskToken }).toJSON() + ); } function makeSuccess( @@ -131,12 +132,10 @@ function makeSignalWorkflow( return makeActivation(timestamp, { signalWorkflow: { signalName, input: args } }); } -function makeCompleteWorkflowExecution( - ...payloads: coresdk.common.IPayload[] -): coresdk.workflow_commands.IWorkflowCommand { - if (payloads.length === 0) payloads = [{ metadata: { encoding: u8('binary/null') } }]; +function makeCompleteWorkflowExecution(result?: coresdk.common.IPayload): coresdk.workflow_commands.IWorkflowCommand { + result ??= { metadata: { encoding: u8('binary/null') } }; return { - completeWorkflowExecution: { result: payloads }, + completeWorkflowExecution: { result }, }; } @@ -217,11 +216,11 @@ function cleanStackTrace(stack: string) { return stack.replace(/\bat (\S+) \(.*\)/g, (_, m0) => `at ${m0}`); } -function cleanWorkflowFailureStackTrace(req: coresdk.TaskCompletion, commandIndex = 0) { +function cleanWorkflowFailureStackTrace(req: coresdk.workflow_completion.WFActivationCompletion, commandIndex = 0) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - req.workflow!.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace = cleanStackTrace( + req.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace = cleanStackTrace( // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - req.workflow!.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace! + req.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace! ); return req; } @@ -769,7 +768,7 @@ test('http', async (t) => { { const req = await activate( t, - makeResolveActivity('0', { completed: { result: defaultDataConverter.toPayloads(result) } }) + makeResolveActivity('0', { completed: { result: defaultDataConverter.toPayload(result) } }) ); compareCompletion( t, diff --git a/packages/worker/README.md b/packages/worker/README.md index c3ac560bc..ec755c2a9 100644 --- a/packages/worker/README.md +++ b/packages/worker/README.md @@ -15,7 +15,7 @@ async function run() => { // (assuming package was bootstrapped with `npm init @temporalio`). // Worker connects to localhost by default and uses console error for logging. // Customize the worker by passing options a second parameter to the constructor. - const worker = new Worker(__dirname); + const worker = await Worker.create(__dirname); // Bind to the `tutorial` queue and start accepting tasks await worker.run('tutorial'); } diff --git a/packages/worker/native/Cargo.lock b/packages/worker/native/Cargo.lock index 71da846c5..035142ce6 100644 --- a/packages/worker/native/Cargo.lock +++ b/packages/worker/native/Cargo.lock @@ -206,17 +206,6 @@ dependencies = [ "syn", ] -[[package]] -name = "displaydoc" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc2ab4d5a16117f9029e9a6b5e4e79f4c67f6519bc134210d4d4a04ba31f41b" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "either" version = "1.6.1" @@ -1185,7 +1174,6 @@ dependencies = [ "crossbeam", "dashmap", "derive_more", - "displaydoc", "futures", "itertools 0.10.0", "once_cell", @@ -1202,6 +1190,7 @@ dependencies = [ "tonic", "tonic-build", "tracing", + "tracing-futures", "tracing-opentelemetry", "tracing-subscriber", "url", @@ -1218,6 +1207,7 @@ dependencies = [ "prost", "prost-types", "temporal-sdk-core", + "tokio", ] [[package]] @@ -1298,9 +1288,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.1.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6714d663090b6b0acb0fa85841c6d66233d150cdb2602c8f9b8abb03370beb3f" +checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722" dependencies = [ "autocfg", "bytes", @@ -1315,9 +1305,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42517d2975ca3114b22a16192634e8241dc5cc1f130be194645970cc1c371494" +checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" dependencies = [ "proc-macro2", "quote", diff --git a/packages/worker/native/Cargo.toml b/packages/worker/native/Cargo.toml index d67de1be7..b8c03c083 100644 --- a/packages/worker/native/Cargo.toml +++ b/packages/worker/native/Cargo.toml @@ -16,6 +16,7 @@ futures = { version = "0.3", features = ["executor"] } neon = { version = "0.7", default-features = false, features = ["napi-4", "event-queue-api"] } prost = "0.7" prost-types = "0.7" +tokio = "1.4.0" [dependencies.temporal-sdk-core] version = "*" diff --git a/packages/worker/native/index.d.ts b/packages/worker/native/index.d.ts index 6fa0dc51c..889aa89a9 100644 --- a/packages/worker/native/index.d.ts +++ b/packages/worker/native/index.d.ts @@ -25,8 +25,23 @@ export interface ServerOptions { export interface Worker {} export declare type PollCallback = (err?: Error, result: ArrayBuffer) => void; -export declare function newWorker(serverOptions: ServerOptions): Worker; +export declare type WorkerCallback = (err?: Error, result: Worker) => void; +export declare type VoidCallback = (err?: Error, result: void) => void; + +export declare function newWorker(serverOptions: ServerOptions, callback: WorkerCallback): void; export declare function workerShutdown(worker: Worker): void; -export declare function workerPoll(worker: Worker, queueName: string, callback: PollCallback): void; -export declare function workerCompleteTask(worker: Worker, result: ArrayBuffer): void; -export declare function workerSendActivityHeartbeat(worker: Worker, activityId: string, details?: ArrayBuffer): void; +export declare function workerBreakLoop(worker: Worker, callback: VoidCallback): void; +export declare function workerPollWorkflowActivation(worker: Worker, queueName: string, callback: PollCallback): void; +export declare function workerCompleteWorkflowActivation( + worker: Worker, + result: ArrayBuffer, + callback: VoidCallback +): void; +export declare function workerPollActivityTask(worker: Worker, queueName: string, callback: PollCallback): void; +export declare function workerCompleteActivityTask(worker: Worker, result: ArrayBuffer, callback: VoidCallback): void; +export declare function workerSendActivityHeartbeat( + worker: Worker, + activityId: string, + details?: ArrayBuffer, + callback: VoidCallback +): void; diff --git a/packages/worker/native/sdk-core b/packages/worker/native/sdk-core index 95573783c..6180a4c2d 160000 --- a/packages/worker/native/sdk-core +++ b/packages/worker/native/sdk-core @@ -1 +1 @@ -Subproject commit 95573783ce045cb4c04c50bd2835c9489e6926b2 +Subproject commit 6180a4c2dd36f3242c86f4fba8c1abd6e2a84e16 diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index b9a4fb970..ba8c1cb2e 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -1,64 +1,289 @@ use neon::{prelude::*, register_module}; use prost::Message; -use std::{ - sync::{ - mpsc::{sync_channel, Receiver, SyncSender}, - Arc, - }, - time::Duration, -}; +use std::{fmt::Display, future::Future, sync::Arc, time::Duration}; use temporal_sdk_core::{ - init, - protos::coresdk::{self, TaskCompletion}, - Core, CoreInitOptions, ServerGatewayOptions, Url, + init, protos::coresdk::workflow_completion::WfActivationCompletion, + protos::coresdk::ActivityHeartbeat, protos::coresdk::ActivityTaskCompletion, Core, + CoreInitOptions, ServerGatewayOptions, Url, }; +use tokio::sync::mpsc::{channel, Sender}; -type BoxedWorker = JsBox>; - -/// A request from lang to poll a queue -pub struct PollRequest { - /// Name of queue to poll - queue_name: String, - /// Used to send the poll result back into lang - callback: Root, +/// A request from JS to bridge to core +pub enum Request { + /// A request to break from the thread loop, should be sent from JS when it + /// encounters a CoreError::ShuttingDown and there are no outstanding + /// completions + BreakLoop { callback: Root }, + /// A request to shutdown core, JS should wait on CoreError::ShuttingDown + /// before exiting to allow draining of pending tasks + Shutdown, + /// A request to poll for workflow activations + PollWorkflowActivation { + /// Name of queue to poll + queue_name: String, + /// Used to send the result back into JS + callback: Root, + }, + /// A request to complete a single workflow activation + CompleteWorkflowActivation { + completion: WfActivationCompletion, + /// Used to send the result back into JS + callback: Root, + }, + /// A request to poll for activity tasks + PollActivityTask { + /// Name of queue to poll + queue_name: String, + /// Used to report completion or error back into JS + callback: Root, + }, + /// A request to complete a single activity task + CompleteActivityTask { + completion: ActivityTaskCompletion, + /// Used to send the result back into JS + callback: Root, + }, + /// A request to send a heartbeat from a running activity + SendActivityHeartbeat { + heartbeat: ActivityHeartbeat, + /// Used to send the result back into JS + callback: Root, + }, } +/// Worker struct, hold a reference for the channel sender responsible for sending requests from +/// JS to a bridge thread which forwards them to core pub struct Worker { - core: Box, - sender: SyncSender, + sender: Sender, } +/// Box it so we can use Worker from JS +type BoxedWorker = JsBox; + impl Finalize for Worker {} -impl Worker { - pub fn new(gateway_opts: ServerGatewayOptions) -> (Self, Receiver) { - // Set capacity to 1 because we only poll from a single thread - let (sender, receiver) = sync_channel::(1); - let core = init(CoreInitOptions { gateway_opts }).unwrap(); +/// Send a result to JS via callback using an [EventQueue] +fn send_result(queue: Arc, callback: Root, res_fn: F) +where + F: for<'a> FnOnce(&mut TaskContext<'a>) -> NeonResult> + Send + 'static, + T: Value, +{ + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = cx.undefined(); + let result = res_fn(&mut cx)?; + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); +} - let worker = Worker { - core: Box::new(core), - sender, - }; +/// Send an error to JS via callback using an [EventQueue] +fn send_error(queue: Arc, callback: Root, error: T) +where + T: Display + Send + 'static, +{ + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + callback_with_error(&mut cx, callback, error) + }); +} - (worker, receiver) +/// Call [callback] with given error +fn callback_with_error<'a, T>( + cx: &mut impl Context<'a>, + callback: Handle, + error: T, +) -> NeonResult<()> +where + T: Display + Send + 'static, +{ + let this = cx.undefined(); + // TODO: create better JS error types + let error = JsError::error(cx, format!("{}", error))?; + let result = cx.undefined(); + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(cx, this, args)?; + Ok(()) +} + +/// When Future completes, call given JS callback using a neon::EventQueue with either error or +/// undefined +async fn void_future_to_js(queue: Arc, callback: Root, f: F) -> () +where + E: Display + Send + 'static, + F: Future> + Send + 'static, +{ + match f.await { + Ok(()) => { + send_result(queue, callback, |cx| Ok(cx.undefined())); + } + Err(err) => { + send_error(queue, callback, err); + } } +} - pub fn poll(&self, queue_name: String) -> ::temporal_sdk_core::Result { - self.core.poll_task(&queue_name) +/// Builds a tokio runtime and starts polling on [Request]s via an internal channel. +/// Bridges requests from JS to core and sends responses back to JS using a neon::EventQueue. +/// Blocks current thread until a [BreakPoller] request is received in channel. +fn start_bridge_loop( + core_init_options: CoreInitOptions, + queue: Arc, + callback: Root, +) { + // TODO: make capacity configurable + let (sender, mut receiver) = channel::(1000); + + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + match init(core_init_options).await { + Err(err) => { + send_error(queue.clone(), callback, err); + } + Ok(result) => { + send_result( + queue.clone(), + callback, + |cx| Ok(cx.boxed(Worker { sender })), + ); + let core = Arc::new(result); + loop { + // TODO: handle this error + let request = receiver.recv().await.unwrap(); + let core = core.clone(); + let queue = queue.clone(); + + match request { + Request::Shutdown => { + core.shutdown(); + } + Request::BreakLoop { callback } => { + send_result(queue, callback, |cx| Ok(cx.undefined())); + break; + } + Request::PollWorkflowActivation { + queue_name, + callback, + } => { + tokio::spawn(handle_poll_workflow_activation_request( + core, queue, queue_name, callback, + )); + } + Request::PollActivityTask { + queue_name, + callback, + } => { + tokio::spawn(handle_poll_activity_task_request( + core, queue, queue_name, callback, + )); + } + Request::CompleteWorkflowActivation { + completion, + callback, + } => { + tokio::spawn(void_future_to_js(queue, callback, async move { + core.complete_workflow_task(completion).await + })); + } + Request::CompleteActivityTask { + completion, + callback, + } => { + tokio::spawn(void_future_to_js(queue, callback, async move { + core.complete_activity_task(completion).await + })); + } + Request::SendActivityHeartbeat { + heartbeat, + callback, + } => { + tokio::spawn(void_future_to_js(queue, callback, async move { + // send_activity_heartbeat returns Result<(), ()> and () + // doesn't implement Display, syntesize an error + match core.send_activity_heartbeat(heartbeat).await { + Ok(()) => Ok(()), + Err(()) => Err("Failed to send activity heartbeat"), + } + })); + } + } + } + } + } + }) +} + +/// Called within the poll loop thread, calls core and triggers JS callback with result +async fn handle_poll_workflow_activation_request( + core: Arc, + queue: Arc, + queue_name: String, + callback: Root, +) { + match core.poll_workflow_task(&queue_name).await { + Ok(task) => { + send_result(queue, callback, move |cx| { + let len = task.encoded_len(); + let mut result = JsArrayBuffer::new(cx, len as u32)?; + cx.borrow_mut(&mut result, |data| { + let mut slice = data.as_mut_slice::(); + if let Err(_) = task.encode(&mut slice) { + panic!("Failed to encode task") + }; + }); + Ok(result) + }); + } + Err(err) => { + send_error(queue, callback, err); + } + } +} + +/// Called within the poll loop thread, calls core and triggers JS callback with result +async fn handle_poll_activity_task_request( + core: Arc, + queue: Arc, + queue_name: String, + callback: Root, +) { + match core.poll_activity_task(&queue_name).await { + Ok(task) => { + send_result(queue, callback, move |cx| { + let len = task.encoded_len(); + let mut result = JsArrayBuffer::new(cx, len as u32)?; + cx.borrow_mut(&mut result, |data| { + let mut slice = data.as_mut_slice::(); + if let Err(_) = task.encode(&mut slice) { + panic!("Failed to encode task") + }; + }); + Ok(result) + }); + } + Err(err) => { + send_error(queue, callback, err); + } } } // Below are functions exported to JS -/// Create a new worker. -/// Immediately spawns a poller thread that will block on [PollRequest]s -fn worker_new(mut cx: FunctionContext) -> JsResult { +/// Create a new worker asynchronously. +/// Immediately spawns a poller thread that will block on [Request]s +/// Worker is returned to JS using supplied callback +fn worker_new(mut cx: FunctionContext) -> JsResult { let options = cx.argument::(0)?; let url = options .get(&mut cx, "url")? .downcast_or_throw::(&mut cx)? .value(&mut cx); + let callback = cx.argument::(1)?.root(&mut cx); let gateway_opts = ServerGatewayOptions { target_url: Url::parse(&url).unwrap(), @@ -81,98 +306,131 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { .value(&mut cx) as u64, ), }; - let (worker, receiver) = Worker::new(gateway_opts); - let worker = Arc::new(worker); - let queue = cx.queue(); - let cloned_worker = Arc::clone(&worker); - - std::thread::spawn(move || loop { - let item = receiver.recv().unwrap(); - let queue_name = item.queue_name; - let callback = item.callback; - let result = worker.poll(queue_name); - match result { - Ok(task) => { - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = cx.undefined(); - let len = task.encoded_len(); - let mut result = JsArrayBuffer::new(&mut cx, len as u32)?; - cx.borrow_mut(&mut result, |data| { - let mut slice = data.as_mut_slice::(); - if let Err(_) = task.encode(&mut slice) { - panic!("Failed to encode task") - }; - }); - let args: Vec> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); - } - Err(err) => { - // TODO: on the JS side we consider all errors fatal, revise this later - let should_break = match err { - temporal_sdk_core::CoreError::ShuttingDown => true, - _ => false, - }; - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = JsError::error(&mut cx, format!("{}", err))?; - let result = cx.undefined(); - let args: Vec> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); - if should_break { - break; - } - } - } + + let queue = Arc::new(cx.queue()); + std::thread::spawn(move || { + start_bridge_loop(CoreInitOptions { gateway_opts }, queue, callback) }); - Ok(cx.boxed(cloned_worker)) + Ok(cx.undefined()) +} + +/// Cause the bridge loop to break, freeing up the thread +fn worker_break_loop(mut cx: FunctionContext) -> JsResult { + let worker = cx.argument::(0)?; + let callback = cx.argument::(1)?; + let request = Request::BreakLoop { + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; + }; + Ok(cx.undefined()) } -/// Initiate a single poll request. -/// Will block if a poll request is already in-flight -fn worker_poll(mut cx: FunctionContext) -> JsResult { +/// Initiate a single workflow activation poll request. +/// There should be only one concurrent poll request for this type. +fn worker_poll_workflow_activation(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; let queue_name = cx.argument::(1)?.value(&mut cx); - let callback = cx.argument::(2)?.root(&mut cx); - let item = PollRequest { + let callback = cx.argument::(2)?; + let request = Request::PollWorkflowActivation { queue_name, - callback, + callback: callback.root(&mut cx), }; - match worker.sender.send(item) { - Ok(_) => Ok(cx.undefined()), - Err(err) => { - let error = JsError::error(&mut cx, format!("{}", err))?; - cx.throw(error) - } + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; } + Ok(cx.undefined()) } -/// Submit a task completion to core. -fn worker_complete_task(mut cx: FunctionContext) -> JsResult { +/// Initiate a single activity task poll request. +/// There should be only one concurrent poll request for this type. +fn worker_poll_activity_task(mut cx: FunctionContext) -> JsResult { + let worker = cx.argument::(0)?; + let queue_name = cx.argument::(1)?.value(&mut cx); + let callback = cx.argument::(2)?; + let request = Request::PollActivityTask { + queue_name, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; + } + Ok(cx.undefined()) +} + +/// Submit a workflow activation completion to core. +fn worker_complete_workflow_activation(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; let completion = cx.argument::(1)?; + let callback = cx.argument::(2)?; let result = cx.borrow(&completion, |data| { - TaskCompletion::decode_length_delimited(data.as_slice::()) + WfActivationCompletion::decode_length_delimited(data.as_slice::()) }); match result { Ok(completion) => { - // TODO: submit from background thread (using neon::Task)? - if let Err(err) = worker.core.complete_task(completion) { - let error = JsError::error(&mut cx, format!("{}", err))?; - cx.throw(error) - } else { - Ok(cx.undefined()) - } + let request = Request::CompleteWorkflowActivation { + completion, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; + }; } - Err(_) => cx.throw_type_error("Cannot decode Completion from buffer"), - } + Err(_) => callback_with_error(&mut cx, callback, "Cannot decode Completion from buffer")?, + }; + Ok(cx.undefined()) +} + +/// Submit an activity task completion to core. +fn worker_complete_activity_task(mut cx: FunctionContext) -> JsResult { + let worker = cx.argument::(0)?; + let result = cx.argument::(1)?; + let callback = cx.argument::(2)?; + let result = cx.borrow(&result, |data| { + ActivityTaskCompletion::decode_length_delimited(data.as_slice::()) + }); + match result { + Ok(completion) => { + let request = Request::CompleteActivityTask { + completion, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; + }; + } + Err(_) => callback_with_error(&mut cx, callback, "Cannot decode Completion from buffer")?, + }; + Ok(cx.undefined()) +} + +/// Submit an activity heartbeat to core. +fn worker_send_activity_heartbeat(mut cx: FunctionContext) -> JsResult { + let worker = cx.argument::(0)?; + let result = cx.argument::(1)?; + let callback = cx.argument::(2)?; + let result = cx.borrow(&result, |data| { + ActivityHeartbeat::decode_length_delimited(data.as_slice::()) + }); + match result { + Ok(heartbeat) => { + let request = Request::SendActivityHeartbeat { + heartbeat, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; + }; + } + Err(_) => callback_with_error( + &mut cx, + callback, + "Cannot decode ActivityHeartbeat from buffer", + )?, + }; + Ok(cx.undefined()) } /// Request shutdown of the worker. @@ -180,19 +438,29 @@ fn worker_complete_task(mut cx: FunctionContext) -> JsResult { /// shutdown. fn worker_shutdown(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; - match worker.core.shutdown() { - Ok(_) => Ok(cx.undefined()), - Err(err) => { - let error = JsError::error(&mut cx, format!("{}", err))?; - cx.throw(error) - } + match worker.sender.blocking_send(Request::Shutdown) { + Err(err) => cx.throw_error(format!("{}", err)), + _ => Ok(cx.undefined()), } } register_module!(mut cx, { cx.export_function("newWorker", worker_new)?; cx.export_function("workerShutdown", worker_shutdown)?; - cx.export_function("workerPoll", worker_poll)?; - cx.export_function("workerCompleteTask", worker_complete_task)?; + cx.export_function("workerBreakLoop", worker_break_loop)?; + cx.export_function( + "workerPollWorkflowActivation", + worker_poll_workflow_activation, + )?; + cx.export_function("workerPollActivityTask", worker_poll_activity_task)?; + cx.export_function( + "workerCompleteWorkflowActivation", + worker_complete_workflow_activation, + )?; + cx.export_function("workerCompleteActivityTask", worker_complete_activity_task)?; + cx.export_function( + "workerSendActivityHeartbeat", + worker_send_activity_heartbeat, + )?; Ok(()) }); diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 3400a165a..0a0a389f7 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -40,7 +40,7 @@ export class Activity { return { canceled: {} }; } console.log('completed activity', { result }); - return { completed: { result: this.dataConverter.toPayloads(result) } }; + return { completed: { result: this.dataConverter.toPayload(result) } }; } catch (err) { if (this.cancelRequested) { return { canceled: {} }; diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index c2b2bc049..7882d342c 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -2,18 +2,7 @@ import { basename, extname, resolve } from 'path'; import os from 'os'; import { readdirSync } from 'fs'; import { promisify } from 'util'; -import { - BehaviorSubject, - merge, - Observable, - OperatorFunction, - Subject, - partition, - pipe, - EMPTY, - throwError, - of, -} from 'rxjs'; +import { BehaviorSubject, merge, Observable, OperatorFunction, Subject, pipe, EMPTY, throwError, of } from 'rxjs'; import { catchError, concatMap, @@ -24,7 +13,6 @@ import { map, mergeMap, repeat, - share, takeUntil, tap, } from 'rxjs/operators'; @@ -203,48 +191,56 @@ export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledW export type State = 'INITIALIZED' | 'RUNNING' | 'STOPPED' | 'STOPPING' | 'FAILED' | 'SUSPENDED'; -type TaskForWorkflow = Required<{ taskToken: Uint8Array; workflow: coresdk.workflow_activation.WFActivation }>; -type TaskForActivity = Required<{ taskToken: Uint8Array; activity: coresdk.activity_task.ActivityTask }>; - +type ExtractToPromise = T extends (err: any, result: infer R) => void ? Promise : never; +// eslint-disable-next-line @typescript-eslint/no-unused-vars +type Last = T extends [...infer I, infer L] ? L : never; +type LastParameter any> = Last>; type OmitFirst = T extends [any, ...infer REST] ? REST : never; -type RestParams = T extends (...args: any[]) => any ? OmitFirst> : never; -type OmitFirstParam = T extends (...args: any[]) => any ? (...args: RestParams) => ReturnType : never; +type OmitLast = T extends [...infer REST, any] ? REST : never; +type OmitFirstParam = T extends (...args: any[]) => any + ? (...args: OmitFirst>) => ReturnType + : never; +type Promisify = T extends (...args: any[]) => void + ? (...args: OmitLast>) => ExtractToPromise> + : never; export interface NativeWorkerLike { shutdown: OmitFirstParam; - poll(queueName: string): Promise; - completeTask: OmitFirstParam; - sendActivityHeartbeat: OmitFirstParam; + breakLoop: Promisify>; + pollWorkflowActivation: Promisify>; + pollActivityTask: Promisify>; + completeWorkflowActivation: Promisify>; + completeActivityTask: Promisify>; + sendActivityHeartbeat: Promisify>; } export interface WorkerConstructor { - new (options?: ServerOptions): NativeWorkerLike; + create(options?: ServerOptions): Promise; } export class NativeWorker implements NativeWorkerLike { - protected readonly native: native.Worker; - protected readonly pollFn: (worker: native.Worker, queueName: string) => Promise; - - public constructor(options?: ServerOptions) { + public readonly pollWorkflowActivation: Promisify>; + public readonly pollActivityTask: Promisify>; + public readonly completeWorkflowActivation: Promisify>; + public readonly completeActivityTask: Promisify>; + public readonly sendActivityHeartbeat: Promisify>; + public readonly breakLoop: Promisify>; + public readonly shutdown: OmitFirstParam; + + public static async create(options?: ServerOptions): Promise { const compiledOptions = compileServerOptions({ ...getDefaultServerOptions(), ...options }); - this.native = native.newWorker(compiledOptions); - this.pollFn = promisify(native.workerPoll); - } - - public shutdown(): void { - return native.workerShutdown(this.native); - } - - public poll(queueName: string): Promise { - return this.pollFn(this.native, queueName); + const nativeWorker = await promisify(native.newWorker)(compiledOptions); + return new NativeWorker(nativeWorker); } - public completeTask(result: ArrayBuffer): void { - return native.workerCompleteTask(this.native, result); - } - - public sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): void { - return native.workerSendActivityHeartbeat(this.native, activityId, details); + protected constructor(nativeWorker: native.Worker) { + this.pollWorkflowActivation = promisify(native.workerPollWorkflowActivation).bind(undefined, nativeWorker); + this.pollActivityTask = promisify(native.workerPollActivityTask).bind(undefined, nativeWorker); + this.completeWorkflowActivation = promisify(native.workerCompleteWorkflowActivation).bind(undefined, nativeWorker); + this.completeActivityTask = promisify(native.workerCompleteActivityTask).bind(undefined, nativeWorker); + this.sendActivityHeartbeat = promisify(native.workerSendActivityHeartbeat).bind(undefined, nativeWorker); + this.breakLoop = promisify(native.workerBreakLoop).bind(undefined, nativeWorker); + this.shutdown = native.workerShutdown.bind(undefined, nativeWorker); } } @@ -259,7 +255,6 @@ export class Worker { activityId: string; details?: any; }>(); - protected readonly pollSubject = new Subject(); protected stateSubject: BehaviorSubject = new BehaviorSubject('INITIALIZED'); protected readonly nativeWorker: NativeWorkerLike; @@ -267,13 +262,21 @@ export class Worker { /** * Create a new Worker. - * This method immediately connects to the server and will throw on connection failure. + * This method initiates a connection to the server and will throw (asynchronously) on connection failure. + * @param pwd - Used to resolve relative paths for locating and importing activities and workflows. + */ + public static async create(pwd: string, options?: WorkerOptions): Promise { + const nativeWorkerCtor: WorkerConstructor = this.nativeWorkerCtor; + const nativeWorker = await nativeWorkerCtor.create(options?.serverOptions); + return new this(nativeWorker, pwd, options); + } + + /** + * Create a new Worker from nativeWorker. * @param pwd - Used to resolve relative paths for locating and importing activities and workflows. */ - constructor(public readonly pwd: string, options?: WorkerOptions) { - // Typescript derives the type of `this.constructor` as Function, work around it by casting to any - const nativeWorkerCtor: WorkerConstructor = (this.constructor as any).nativeWorkerCtor; - this.nativeWorker = new nativeWorkerCtor(options?.serverOptions); + protected constructor(nativeWorker: NativeWorkerLike, public readonly pwd: string, options?: WorkerOptions) { + this.nativeWorker = nativeWorker; // TODO: merge activityDefaults this.options = compileWorkerOptions({ ...getDefaultOptions(pwd), ...options }); @@ -385,7 +388,7 @@ export class Worker { filter((state): state is 'STOPPING' => state === 'STOPPING'), delay(this.options.shutdownGraceTimeMs), map(() => { - throw new Error('Timed out waiting while waiting for worker to shutdown gracefully'); + throw new Error('Timed out while waiting for worker to shutdown gracefully'); }) ); } @@ -393,14 +396,14 @@ export class Worker { /** * An observable which repeatedly polls for new tasks unless worker becomes suspended */ - protected pollLoop$(queueName: string): Observable { + protected pollLoop$(pollFn: () => Promise): Observable { return of(this.stateSubject).pipe( map((state) => state.getValue()), concatMap((state) => { switch (state) { case 'RUNNING': case 'STOPPING': - return this.nativeWorker.poll(queueName); + return pollFn(); case 'SUSPENDED': // Completes once we're out of SUSPENDED state return this.stateSubject.pipe( @@ -414,8 +417,7 @@ export class Worker { throw new Error(`Unexpected state ${state}`); } }), - repeat(), - map((buffer) => coresdk.Task.decode(new Uint8Array(buffer))) + repeat() ); } @@ -423,26 +425,18 @@ export class Worker { * The main observable of the worker, starts the poll loop and takes care of state transitions * in case of an error during poll or shutdown */ - protected poller$(queueName: string): Observable { - return merge(this.gracefulShutdown$(), this.pollLoop$(queueName)).pipe( - catchError((err) => (err.message.includes('[Core::shutdown]') ? EMPTY : throwError(err))), - tap({ - complete: () => { - this.state = 'STOPPED'; - }, - error: () => { - this.state = 'FAILED'; - }, - }) + protected poller$(pollFn: () => Promise): Observable { + return merge(this.gracefulShutdown$(), this.pollLoop$(pollFn)).pipe( + catchError((err) => (err.message.includes('Core is shut down') ? EMPTY : throwError(err))) ); } /** * Process activity tasks */ - protected activityOperator(): OperatorFunction { + protected activityOperator(): OperatorFunction { return pipe( - closeableGroupBy((task) => task.activity.activityId), + closeableGroupBy((task) => task.activityId), mergeMap((group$) => { return group$.pipe( mergeMapWithState(async (activity: Activity | undefined, task) => { @@ -452,15 +446,14 @@ export class Worker { let output: | { type: 'result'; result: coresdk.activity_result.IActivityResult } | { type: 'run'; activity: Activity }; - const { taskToken } = task; - const { variant } = task.activity; + const { taskToken, variant, activityId } = task; if (!variant) { throw new Error('Got an activity task without a "variant" attribute'); } switch (variant) { case 'start': { - const { start, activityId } = task.activity; + const { start } = task; if (!start) { throw new Error('Got a "start" activity task without a "start" attribute'); } @@ -496,7 +489,6 @@ export class Worker { break; } case 'cancel': { - const { activityId } = task.activity; if (activity === undefined) { this.log.error('Tried to cancel a non-existing activity', { activityId }); output = { type: 'result', result: { failed: { failure: { message: 'Activity not found' } } } }; @@ -528,13 +520,8 @@ export class Worker { return { taskToken, result }; }), filter((result: T): result is Exclude => result !== undefined), - map(({ taskToken, result }) => - coresdk.TaskCompletion.encodeDelimited({ - taskToken: taskToken, - activity: result, - }).finish() - ), - tap(group$.close) + map((result) => coresdk.ActivityTaskCompletion.encodeDelimited(result).finish()), + tap(group$.close) // Close the group after activity task completion ); }) ); @@ -544,9 +531,11 @@ export class Worker { * Process workflow activations * @param queueName used to propagate the current task queue to the workflow */ - protected workflowOperator(queueName: string): OperatorFunction { + protected workflowOperator( + queueName: string + ): OperatorFunction { return pipe( - closeableGroupBy((task) => task.workflow.runId), + closeableGroupBy((task) => task.runId), mergeMap((group$) => { return group$.pipe( mergeMapWithState(async (workflow: Workflow | undefined, task) => { @@ -555,7 +544,7 @@ export class Worker { try { // Find a workflow start job in the activation jobs list // TODO: should this always be the first job in the list? - const maybeStartWorkflow = task.workflow.jobs.find((j) => j.startWorkflow); + const maybeStartWorkflow = task.jobs.find((j) => j.startWorkflow); if (maybeStartWorkflow !== undefined) { const attrs = maybeStartWorkflow.startWorkflow; if (!(attrs && attrs.workflowId && attrs.workflowType)) { @@ -568,7 +557,7 @@ export class Worker { this.log.debug('Creating workflow', { taskToken, workflowId: attrs.workflowId, - runId: task.workflow.runId, + runId: task.runId, }); workflow = await Workflow.create(attrs.workflowId, queueName); // TODO: this probably shouldn't be here, consider alternative implementation @@ -583,24 +572,20 @@ export class Worker { throw new Error('Received workflow activation for an untracked workflow with no start workflow job'); } } catch (error) { - this.log.error('Failed to create a workflow', { taskToken, runId: task.workflow.runId, error }); + this.log.error('Failed to create a workflow', { taskToken, runId: task.runId, error }); let arr: Uint8Array; if (error instanceof LoaderError) { - arr = coresdk.TaskCompletion.encodeDelimited({ + arr = coresdk.workflow_completion.WFActivationCompletion.encodeDelimited({ taskToken: task.taskToken, - workflow: { - successful: { - commands: [{ failWorkflowExecution: { failure: errorToUserCodeFailure(error) } }], - }, + successful: { + commands: [{ failWorkflowExecution: { failure: errorToUserCodeFailure(error) } }], }, }).finish(); } else { - arr = coresdk.TaskCompletion.encodeDelimited({ + arr = coresdk.workflow_completion.WFActivationCompletion.encodeDelimited({ taskToken: task.taskToken, - workflow: { - failed: { - failure: errorToUserCodeFailure(error), - }, + failed: { + failure: errorToUserCodeFailure(error), }, }).finish(); } @@ -610,7 +595,8 @@ export class Worker { } this.log.debug('Activating workflow', { taskToken }); - const arr = await workflow.activate(task.taskToken, task.workflow); + // TODO: single param + const arr = await workflow.activate(task.taskToken, task); return { state: workflow, output: { close: false, arr } }; }, undefined), tap(({ close }) => void close && group$.close()) @@ -627,15 +613,15 @@ export class Worker { return this.activityHeartbeatSubject.pipe( takeUntil(this.stateSubject.pipe(filter((value) => value === 'STOPPED' || value === 'FAILED'))), tap(({ activityId }) => this.log.debug('Got activity heartbeat', { activityId })), - map(({ activityId, details }) => { + mergeMap(async ({ activityId, details }) => { const payload = this.options.dataConverter.toPayload(details); if (!payload) { - this.nativeWorker.sendActivityHeartbeat(activityId); + await this.nativeWorker.sendActivityHeartbeat(activityId, undefined); return; } const arr = coresdk.common.Payload.encode(payload).finish(); - this.nativeWorker.sendActivityHeartbeat( + await this.nativeWorker.sendActivityHeartbeat( activityId, arr.buffer.slice(arr.byteOffset, arr.byteLength + arr.byteOffset) ); @@ -644,7 +630,31 @@ export class Worker { } /** - * Start polling on tasks, completes after graceful shutdown after a receiving a shutdown signal + * Poll core for `WFActivation`s while respecting worker state + */ + protected workflow$(queueName: string): Observable { + return this.poller$(async () => { + const buffer = await this.nativeWorker.pollWorkflowActivation(queueName); + const task = coresdk.workflow_activation.WFActivation.decode(new Uint8Array(buffer)); + this.log.debug('Got workflow task', task); + return task; + }); + } + + /** + * Poll core for `ActivityTask`s while respecting worker state + */ + protected activity$(queueName: string): Observable { + return this.poller$(async () => { + const buffer = await this.nativeWorker.pollActivityTask(queueName); + const task = coresdk.activity_task.ActivityTask.decode(new Uint8Array(buffer)); + this.log.debug('Got activity task', task); + return task; + }); + } + + /** + * Start polling on tasks, completes after graceful shutdown due to receiving a shutdown signal * or call to {@link shutdown}. */ async run(queueName: string): Promise { @@ -662,22 +672,31 @@ export class Worker { for (const signal of this.options.shutdownSignals) { process.on(signal, startShutdownSequence); } - - const partitioned$ = partition( - this.poller$(queueName).pipe( - share(), - tap((task) => this.log.debug('Got task', task)) - ), - (task) => task.variant === 'workflow' - ); - // Need to cast to any in order to assign the specific types since partition returns an Observable - const [workflow$, activity$] = (partitioned$ as any) as [Observable, Observable]; - - return await merge( - this.activityHeartbeat$(), - merge(workflow$.pipe(this.workflowOperator(queueName)), activity$.pipe(this.activityOperator())).pipe( - map((arr) => this.nativeWorker.completeTask(arr.buffer.slice(arr.byteOffset))) - ) - ).toPromise(); + try { + await merge( + this.activityHeartbeat$(), + merge( + this.workflow$(queueName).pipe( + this.workflowOperator(queueName), + mergeMap((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) + ), + this.activity$(queueName).pipe( + this.activityOperator(), + mergeMap((arr) => this.nativeWorker.completeActivityTask(arr.buffer.slice(arr.byteOffset))) + ) + ).pipe( + tap({ + complete: () => { + this.state = 'STOPPED'; + }, + error: () => { + this.state = 'FAILED'; + }, + }) + ) + ).toPromise(); + } finally { + await this.nativeWorker.breakLoop(); + } } } diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 9faa66abd..fe207ce52 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -85,7 +85,7 @@ export type WorkflowTaskHandler = { function completeWorkflow(result: any) { state.commands.push({ completeWorkflowExecution: { - result: defaultDataConverter.toPayloads(result), + result: defaultDataConverter.toPayload(result), }, }); state.completed = true; @@ -162,7 +162,13 @@ export class Activator implements WorkflowTaskHandler { } const { resolve, reject, scope } = consumeCompletion(idToSeq(activation.activityId)); if (activation.result.completed) { - resolve(defaultDataConverter.fromPayloads(0, activation.result.completed.result)); + const completed = activation.result.completed; + const result = completed.result ? defaultDataConverter.fromPayload(completed.result) : undefined; + if (result === undefined) { + reject(new Error('Failed to convert from payload')); + } else { + resolve(result); + } } else if (activation.result.failed) { reject(new Error(nullToUndefined(activation.result.failed.failure?.message))); } else if (activation.result.canceled) { @@ -240,9 +246,9 @@ export function activate(encodedActivation: Uint8Array, jobIndex: number): boole export function concludeActivation(taskToken: Uint8Array): Uint8Array { const { commands } = state; // TODO: activation failed (should this be done in main node isolate?) - const encoded = iface.coresdk.TaskCompletion.encodeDelimited({ + const encoded = iface.coresdk.workflow_completion.WFActivationCompletion.encodeDelimited({ taskToken, - workflow: { successful: { commands } }, + successful: { commands }, }).finish(); state.commands = []; return encoded;