From ca523712e1ccb758769f4a170ff1cb61a5d3c7c4 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 29 Mar 2021 21:39:17 +0300 Subject: [PATCH 01/15] Made bridge compile with async --- packages/worker/native/Cargo.lock | 14 +- packages/worker/native/Cargo.toml | 1 + packages/worker/native/sdk-core | 2 +- packages/worker/native/src/lib.rs | 324 ++++++++++++++++++++---------- 4 files changed, 223 insertions(+), 118 deletions(-) diff --git a/packages/worker/native/Cargo.lock b/packages/worker/native/Cargo.lock index 71da846c5..723b1c8d3 100644 --- a/packages/worker/native/Cargo.lock +++ b/packages/worker/native/Cargo.lock @@ -208,9 +208,9 @@ dependencies = [ [[package]] name = "displaydoc" -version = "0.1.7" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc2ab4d5a16117f9029e9a6b5e4e79f4c67f6519bc134210d4d4a04ba31f41b" +checksum = "3274a6bc8a6a4521291b53b9dcb8345e963fe931c3fc462a7d3ead71d7ccd30d" dependencies = [ "proc-macro2", "quote", @@ -1202,6 +1202,7 @@ dependencies = [ "tonic", "tonic-build", "tracing", + "tracing-futures", "tracing-opentelemetry", "tracing-subscriber", "url", @@ -1218,6 +1219,7 @@ dependencies = [ "prost", "prost-types", "temporal-sdk-core", + "tokio", ] [[package]] @@ -1298,9 +1300,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.1.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6714d663090b6b0acb0fa85841c6d66233d150cdb2602c8f9b8abb03370beb3f" +checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722" dependencies = [ "autocfg", "bytes", @@ -1315,9 +1317,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42517d2975ca3114b22a16192634e8241dc5cc1f130be194645970cc1c371494" +checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" dependencies = [ "proc-macro2", "quote", diff --git a/packages/worker/native/Cargo.toml b/packages/worker/native/Cargo.toml index d67de1be7..b8c03c083 100644 --- a/packages/worker/native/Cargo.toml +++ b/packages/worker/native/Cargo.toml @@ -16,6 +16,7 @@ futures = { version = "0.3", features = ["executor"] } neon = { version = "0.7", default-features = false, features = ["napi-4", "event-queue-api"] } prost = "0.7" prost-types = "0.7" +tokio = "1.4.0" [dependencies.temporal-sdk-core] version = "*" diff --git a/packages/worker/native/sdk-core b/packages/worker/native/sdk-core index 95573783c..0a516125c 160000 --- a/packages/worker/native/sdk-core +++ b/packages/worker/native/sdk-core @@ -1 +1 @@ -Subproject commit 95573783ce045cb4c04c50bd2835c9489e6926b2 +Subproject commit 0a516125c9edba62c880be6af89084a5cfcf6226 diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index b9a4fb970..6d80ee646 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -1,53 +1,51 @@ use neon::{prelude::*, register_module}; use prost::Message; -use std::{ - sync::{ - mpsc::{sync_channel, Receiver, SyncSender}, - Arc, - }, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use temporal_sdk_core::{ - init, - protos::coresdk::{self, TaskCompletion}, - Core, CoreInitOptions, ServerGatewayOptions, Url, + init, protos::coresdk::activity_result::ActivityResult, + protos::coresdk::workflow_activation::WfActivation, + protos::coresdk::workflow_completion::WfActivationCompletion, Core, CoreInitOptions, + ServerGatewayOptions, Url, }; +use tokio::sync::mpsc::{channel, Receiver, Sender}; -type BoxedWorker = JsBox>; +/// Possible Request variants +enum RequestVariant { + /// A request to shutdown core and the bridge thread, lang should wait on + /// CoreError::ShuttingDown before exiting to allow draining of pending tasks + Shutdown, + /// A request to poll for workflow activations + PollWorkflowActivation { + /// Name of queue to poll + queue_name: String, + }, + /// A request to complete a single workflow activation + CompleteWorkflowActivation { completion: WfActivationCompletion }, + /// A request to poll for activity tasks + PollActivityTask { + /// Name of queue to poll + queue_name: String, + }, +} -/// A request from lang to poll a queue -pub struct PollRequest { - /// Name of queue to poll - queue_name: String, - /// Used to send the poll result back into lang +/// A request from lang to bridge to core +pub struct Request { + variant: RequestVariant, + /// Used to send the result back into lang callback: Root, } +/// Worker struct, hold a reference for the channel sender responsible for sending requests from +/// lang to core pub struct Worker { - core: Box, - sender: SyncSender, + sender: Sender, } -impl Finalize for Worker {} - -impl Worker { - pub fn new(gateway_opts: ServerGatewayOptions) -> (Self, Receiver) { - // Set capacity to 1 because we only poll from a single thread - let (sender, receiver) = sync_channel::(1); - let core = init(CoreInitOptions { gateway_opts }).unwrap(); - - let worker = Worker { - core: Box::new(core), - sender, - }; - - (worker, receiver) - } +/// Box it so we can use Worker from JS +// TODO: we might not need Arc +type BoxedWorker = JsBox>; - pub fn poll(&self, queue_name: String) -> ::temporal_sdk_core::Result { - self.core.poll_task(&queue_name) - } -} +impl Finalize for Worker {} // Below are functions exported to JS @@ -59,6 +57,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { .get(&mut cx, "url")? .downcast_or_throw::(&mut cx)? .value(&mut cx); + let callback = cx.argument::(1)?.root(&mut cx); let gateway_opts = ServerGatewayOptions { target_url: Url::parse(&url).unwrap(), @@ -81,95 +80,186 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { .value(&mut cx) as u64, ), }; - let (worker, receiver) = Worker::new(gateway_opts); + let (sender, mut receiver) = channel::(1000); + let worker = Worker { sender }; let worker = Arc::new(worker); - let queue = cx.queue(); + let queue_arc = Arc::new(cx.queue()); let cloned_worker = Arc::clone(&worker); - std::thread::spawn(move || loop { - let item = receiver.recv().unwrap(); - let queue_name = item.queue_name; - let callback = item.callback; - let result = worker.poll(queue_name); - match result { - Ok(task) => { - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = cx.undefined(); - let len = task.encoded_len(); - let mut result = JsArrayBuffer::new(&mut cx, len as u32)?; - cx.borrow_mut(&mut result, |data| { - let mut slice = data.as_mut_slice::(); - if let Err(_) = task.encode(&mut slice) { - panic!("Failed to encode task") - }; - }); - let args: Vec> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); - } - Err(err) => { - // TODO: on the JS side we consider all errors fatal, revise this later - let should_break = match err { - temporal_sdk_core::CoreError::ShuttingDown => true, - _ => false, - }; - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = JsError::error(&mut cx, format!("{}", err))?; - let result = cx.undefined(); - let args: Vec> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); - if should_break { - break; + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + match init(CoreInitOptions { gateway_opts }).await { + Ok(result) => { + let core_arc = Arc::new(result); + loop { + // TODO: handle this error + let request = receiver.recv().await.unwrap(); + let variant = request.variant; + let callback = request.callback; + let core = Arc::clone(&core_arc); + let queue = Arc::clone(&queue_arc); + tokio::spawn(async move { + match variant { + RequestVariant::PollWorkflowActivation { queue_name } => { + match core.poll_workflow_task(&queue_name).await { + Ok(task) => { + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = cx.undefined(); + let len = task.encoded_len(); + let mut result = + JsArrayBuffer::new(&mut cx, len as u32)?; + cx.borrow_mut(&mut result, |data| { + let mut slice = data.as_mut_slice::(); + if let Err(_) = task.encode(&mut slice) { + panic!("Failed to encode task") + }; + }); + let args: Vec> = + vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); + } + Err(err) => { + // TODO: on the JS side we consider all errors fatal, revise this later + // let should_break = match err { + // temporal_sdk_core::CoreError::ShuttingDown => true, + // _ => false, + // }; + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = JsError::error( + &mut cx, + format!("{}", err), + )?; + let result = cx.undefined(); + let args: Vec> = + vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); + // if should_break { + // break; + // } + } + } + } + RequestVariant::CompleteWorkflowActivation { completion } => { + match core.complete_workflow_task(completion).await { + Ok(()) => { + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = cx.undefined(); + let result = cx.undefined(); + let args: Vec> = + vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); + } + Err(err) => { + // TODO: on the JS side we consider all errors fatal, revise this later + // let should_break = match err { + // temporal_sdk_core::CoreError::ShuttingDown => true, + // _ => false, + // }; + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = JsError::error( + &mut cx, + format!("{}", err), + )?; + let result = cx.undefined(); + let args: Vec> = + vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); + // if should_break { + // break; + // } + } + } + } + // Shutdown { + // match core.shutdown().await { + // } + // } + _ => {} + } + }); + } + } + Err(err) => { + let queue = queue_arc.clone(); + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = JsError::error(&mut cx, format!("{}", err))?; + let result = cx.undefined(); + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); + } } - } - } + }) }); Ok(cx.boxed(cloned_worker)) } /// Initiate a single poll request. -/// Will block if a poll request is already in-flight -fn worker_poll(mut cx: FunctionContext) -> JsResult { +/// There should be only one concurrent poll request for this type. +fn worker_poll_workflow_activation(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; let queue_name = cx.argument::(1)?.value(&mut cx); - let callback = cx.argument::(2)?.root(&mut cx); - let item = PollRequest { - queue_name, - callback, + let callback = cx.argument::(2)?; + let request = Request { + variant: RequestVariant::PollWorkflowActivation { queue_name }, + callback: callback.root(&mut cx), }; - match worker.sender.send(item) { - Ok(_) => Ok(cx.undefined()), - Err(err) => { - let error = JsError::error(&mut cx, format!("{}", err))?; - cx.throw(error) - } + if let Err(err) = worker.sender.blocking_send(request) { + let this = cx.undefined(); + let error = JsError::error(&mut cx, format!("{}", err))?; + let result = cx.undefined(); + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; } + Ok(cx.undefined()) } -/// Submit a task completion to core. -fn worker_complete_task(mut cx: FunctionContext) -> JsResult { +/// Submit a workflow activation completion to core. +fn worker_complete_workflow_activation(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; let completion = cx.argument::(1)?; + let callback = cx.argument::(2)?; let result = cx.borrow(&completion, |data| { - TaskCompletion::decode_length_delimited(data.as_slice::()) + WfActivationCompletion::decode_length_delimited(data.as_slice::()) }); match result { Ok(completion) => { - // TODO: submit from background thread (using neon::Task)? - if let Err(err) = worker.core.complete_task(completion) { + let request = Request { + variant: RequestVariant::CompleteWorkflowActivation { completion }, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + let this = cx.undefined(); let error = JsError::error(&mut cx, format!("{}", err))?; - cx.throw(error) - } else { - Ok(cx.undefined()) - } + let result = cx.undefined(); + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + }; + Ok(cx.undefined()) } Err(_) => cx.throw_type_error("Cannot decode Completion from buffer"), } @@ -180,19 +270,31 @@ fn worker_complete_task(mut cx: FunctionContext) -> JsResult { /// shutdown. fn worker_shutdown(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; - match worker.core.shutdown() { - Ok(_) => Ok(cx.undefined()), - Err(err) => { - let error = JsError::error(&mut cx, format!("{}", err))?; - cx.throw(error) - } + let callback = cx.argument::(1)?; + let request = Request { + variant: RequestVariant::Shutdown, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + let this = cx.undefined(); + let error = JsError::error(&mut cx, format!("{}", err))?; + let result = cx.undefined(); + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; } + Ok(cx.undefined()) } register_module!(mut cx, { cx.export_function("newWorker", worker_new)?; cx.export_function("workerShutdown", worker_shutdown)?; - cx.export_function("workerPoll", worker_poll)?; - cx.export_function("workerCompleteTask", worker_complete_task)?; + cx.export_function( + "workerPollWorkflowActivation", + worker_poll_workflow_activation, + )?; + cx.export_function( + "workerCompleteWorkflowActivation", + worker_complete_workflow_activation, + )?; Ok(()) }); From 8975612cfef5967f7b0eea90c3b6ba06ac520285 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 29 Mar 2021 22:25:33 +0300 Subject: [PATCH 02/15] Make typescript build --- packages/test/src/mock-native-worker.ts | 45 +++++++++-- packages/test/src/test-worker-activities.ts | 87 +++++++++------------ packages/test/src/test-worker-lifecycle.ts | 4 +- packages/test/src/test-workflows.ts | 24 +++--- packages/worker/native/index.d.ts | 6 +- packages/worker/src/activity.ts | 2 +- packages/worker/src/worker.ts | 86 ++++++++++---------- 7 files changed, 132 insertions(+), 122 deletions(-) diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index dcbcdb8a6..1b48199c0 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -5,6 +5,15 @@ import { defaultDataConverter } from '@temporalio/workflow/commonjs/converter/da import { Worker as RealWorker, NativeWorkerLike } from '@temporalio/worker/lib/worker'; import { sleep } from '@temporalio/worker/lib/utils'; +export type Task = + | { workflow: coresdk.workflow_activation.IWFActivation } + | { activity: coresdk.activity_task.IActivityTask }; + +export interface ActivityCompletion { + taskToken?: Uint8Array | null; + result: coresdk.activity_result.ActivityResult; +} + export class MockNativeWorker implements NativeWorkerLike { tasks: Array> = []; reject?: (err: Error) => void; @@ -15,7 +24,7 @@ export class MockNativeWorker implements NativeWorkerLike { this.tasks.unshift(Promise.reject(new Error('[Core::shutdown]'))); } - public async poll(_queueName: string): Promise { + public async pollWorkflowActivation(_queueName: string): Promise { for (;;) { const task = this.tasks.pop(); if (task !== undefined) { @@ -25,26 +34,46 @@ export class MockNativeWorker implements NativeWorkerLike { } } - public completeTask(result: ArrayBuffer): void { + public completeWorkflowActivation(result: ArrayBuffer): void { this.completionCallback!(result); this.completionCallback = undefined; } - public emit(task: coresdk.ITask): void { - const arr = coresdk.Task.encode(task).finish(); + public emit(task: Task): void { + let arr: Uint8Array; + if ('workflow' in task) { + arr = coresdk.workflow_activation.WFActivation.encode(task.workflow).finish(); + } else { + arr = coresdk.activity_task.ActivityTask.encode(task.activity).finish(); + } const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); this.tasks.unshift(Promise.resolve(buffer)); } - public async runAndWaitCompletion(task: coresdk.ITask): Promise { - task = { ...task, taskToken: task.taskToken ?? Buffer.from(uuid4()) }; - const arr = coresdk.Task.encode(task).finish(); + public async runWorkflowActivation( + activation: coresdk.workflow_activation.IWFActivation + ): Promise { + activation = { ...activation, taskToken: activation.taskToken ?? Buffer.from(uuid4()) }; + const arr = coresdk.workflow_activation.WFActivation.encode(activation).finish(); + const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); + const result = await new Promise((resolve) => { + this.completionCallback = resolve; + this.tasks.unshift(Promise.resolve(buffer)); + }); + return coresdk.workflow_completion.WFActivationCompletion.decodeDelimited(new Uint8Array(result)); + } + + public async runActivityTask(task: coresdk.activity_task.IActivityTask): Promise { + const arr = coresdk.activity_task.ActivityTask.encode(task).finish(); const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); const result = await new Promise((resolve) => { this.completionCallback = resolve; this.tasks.unshift(Promise.resolve(buffer)); }); - return coresdk.TaskCompletion.decodeDelimited(new Uint8Array(result)); + return { + taskToken: task.taskToken, + result: coresdk.activity_result.ActivityResult.decodeDelimited(new Uint8Array(result)), + }; } sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): void { diff --git a/packages/test/src/test-worker-activities.ts b/packages/test/src/test-worker-activities.ts index 2fbae94f2..8bdecb09c 100644 --- a/packages/test/src/test-worker-activities.ts +++ b/packages/test/src/test-worker-activities.ts @@ -31,10 +31,10 @@ test.beforeEach((t) => { function compareCompletion( t: ExecutionContext, - actual: coresdk.TaskCompletion, - expected: coresdk.ITaskCompletion + actual: coresdk.activity_result.ActivityResult, + expected: coresdk.activity_result.IActivityResult ) { - t.deepEqual(actual.toJSON(), coresdk.TaskCompletion.create(expected).toJSON()); + t.deepEqual(actual.toJSON(), coresdk.activity_result.ActivityResult.create(expected).toJSON()); } test('Worker runs an activity and reports completion', async (t) => { @@ -42,19 +42,16 @@ test('Worker runs an activity and reports completion', async (t) => { await runWorker(t, async () => { const taskToken = Buffer.from(uuid4()); const url = 'https://temporal.io'; - const completion = await worker.native.runAndWaitCompletion({ + const completion = await worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - start: { - activityType: JSON.stringify(['@activities', 'httpGet']), - input: defaultDataConverter.toPayloads(url), - }, + activityId: 'abc', + start: { + activityType: JSON.stringify(['@activities', 'httpGet']), + input: defaultDataConverter.toPayloads(url), }, }); - compareCompletion(t, completion, { - taskToken, - activity: { completed: { result: defaultDataConverter.toPayloads(await httpGet(url)) } }, + compareCompletion(t, completion.result, { + completed: { result: defaultDataConverter.toPayload(await httpGet(url)) }, }); }); }); @@ -64,19 +61,16 @@ test('Worker runs an activity and reports failure', async (t) => { await runWorker(t, async () => { const taskToken = Buffer.from(uuid4()); const message = ':('; - const completion = await worker.native.runAndWaitCompletion({ + const completion = await worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - start: { - activityType: JSON.stringify(['@activities', 'throwAnError']), - input: defaultDataConverter.toPayloads(message), - }, + activityId: 'abc', + start: { + activityType: JSON.stringify(['@activities', 'throwAnError']), + input: defaultDataConverter.toPayloads(message), }, }); - compareCompletion(t, completion, { - taskToken, - activity: { failed: { failure: { message } } }, + compareCompletion(t, completion.result, { + failed: { failure: { message } }, }); }); }); @@ -85,8 +79,8 @@ test('Worker cancels activity and reports cancellation', async (t) => { const { worker } = t.context; await runWorker(t, async () => { worker.native.emit({ - taskToken: Buffer.from(uuid4()), activity: { + taskToken: Buffer.from(uuid4()), activityId: 'abc', start: { activityType: JSON.stringify(['@activities', 'waitForCancellation']), @@ -95,16 +89,13 @@ test('Worker cancels activity and reports cancellation', async (t) => { }, }); const taskToken = Buffer.from(uuid4()); - const completion = await worker.native.runAndWaitCompletion({ + const completion = await worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - cancel: {}, - }, + activityId: 'abc', + cancel: {}, }); - compareCompletion(t, completion, { - taskToken, - activity: { canceled: {} }, + compareCompletion(t, completion.result, { + canceled: {}, }); }); }); @@ -113,8 +104,8 @@ test('Activity Context AbortSignal cancels a fetch request', async (t) => { const { worker } = t.context; await runWorker(t, async () => { worker.native.emit({ - taskToken: Buffer.from(uuid4()), activity: { + taskToken: Buffer.from(uuid4()), activityId: 'abc', start: { activityType: JSON.stringify(['@activities', 'cancellableFetch']), @@ -123,16 +114,13 @@ test('Activity Context AbortSignal cancels a fetch request', async (t) => { }, }); const taskToken = Buffer.from(uuid4()); - const completion = await worker.native.runAndWaitCompletion({ + const completion = await worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - cancel: {}, - }, + activityId: 'abc', + cancel: {}, }); - compareCompletion(t, completion, { - taskToken, - activity: { canceled: {} }, + compareCompletion(t, completion.result, { + canceled: {}, }); }); }); @@ -141,22 +129,19 @@ test('Activity Context heartbeat is sent to core', async (t) => { const { worker } = t.context; await runWorker(t, async () => { const taskToken = Buffer.from(uuid4()); - const completionPromise = worker.native.runAndWaitCompletion({ + const completionPromise = worker.native.runActivityTask({ taskToken, - activity: { - activityId: 'abc', - start: { - activityType: JSON.stringify(['@activities', 'progressiveSleep']), - input: defaultDataConverter.toPayloads(), - }, + activityId: 'abc', + start: { + activityType: JSON.stringify(['@activities', 'progressiveSleep']), + input: defaultDataConverter.toPayloads(), }, }); t.is(await worker.native.untilHeartbeat('abc'), 1); t.is(await worker.native.untilHeartbeat('abc'), 2); t.is(await worker.native.untilHeartbeat('abc'), 3); - compareCompletion(t, await completionPromise, { - taskToken, - activity: { completed: { result: defaultDataConverter.toPayloads(undefined) } }, + compareCompletion(t, (await completionPromise).result, { + completed: { result: defaultDataConverter.toPayload(undefined) }, }); }); }); diff --git a/packages/test/src/test-worker-lifecycle.ts b/packages/test/src/test-worker-lifecycle.ts index 330538c6f..8301a0110 100644 --- a/packages/test/src/test-worker-lifecycle.ts +++ b/packages/test/src/test-worker-lifecycle.ts @@ -55,8 +55,8 @@ test('Mocked worker suspends and resumes', async (t) => { worker.suspendPolling(); t.is(worker.getState(), 'SUSPENDED'); // Worker finishes its polling before suspension - await worker.native.runAndWaitCompletion({ workflow: { runId: 'abc' } }); - const completion = worker.native.runAndWaitCompletion({ workflow: { runId: 'abc' } }); + await worker.native.runWorkflowActivation({ runId: 'abc' }); + const completion = worker.native.runWorkflowActivation({ runId: 'abc' }); await t.throwsAsync( Promise.race([ sleep(10).then(() => { diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index ba7c2755e..f95228d25 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -35,19 +35,17 @@ test.beforeEach(async (t) => { async function activate(t: ExecutionContext, activation: coresdk.workflow_activation.IWFActivation) { const taskToken = u8(`${Math.random()}`); const arr = await t.context.workflow.activate(taskToken, activation); - const req = coresdk.TaskCompletion.decodeDelimited(arr); + const req = coresdk.workflow_activation.WFActivation.decodeDelimited(arr); t.deepEqual(req.taskToken, taskToken); - t.is(req.variant, 'workflow'); return req; } function compareCompletion( t: ExecutionContext, - req: coresdk.TaskCompletion, + req: coresdk.workflow_completion.WFActivationCompletion, expected: coresdk.workflow_completion.IWFActivationCompletion ) { - const actual = req.toJSON().workflow; - t.deepEqual(actual, coresdk.workflow_completion.WFActivationCompletion.create(expected).toJSON()); + t.deepEqual(req.toJSON(), coresdk.workflow_completion.WFActivationCompletion.create(expected).toJSON()); } function makeSuccess( @@ -131,12 +129,10 @@ function makeSignalWorkflow( return makeActivation(timestamp, { signalWorkflow: { signalName, input: args } }); } -function makeCompleteWorkflowExecution( - ...payloads: coresdk.common.IPayload[] -): coresdk.workflow_commands.IWorkflowCommand { - if (payloads.length === 0) payloads = [{ metadata: { encoding: u8('binary/null') } }]; +function makeCompleteWorkflowExecution(result?: coresdk.common.IPayload): coresdk.workflow_commands.IWorkflowCommand { + result ??= { metadata: { encoding: u8('binary/null') } }; return { - completeWorkflowExecution: { result: payloads }, + completeWorkflowExecution: { result }, }; } @@ -217,11 +213,11 @@ function cleanStackTrace(stack: string) { return stack.replace(/\bat (\S+) \(.*\)/g, (_, m0) => `at ${m0}`); } -function cleanWorkflowFailureStackTrace(req: coresdk.TaskCompletion, commandIndex = 0) { +function cleanWorkflowFailureStackTrace(req: coresdk.workflow_completion.WFActivationCompletion, commandIndex = 0) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - req.workflow!.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace = cleanStackTrace( + req.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace = cleanStackTrace( // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - req.workflow!.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace! + req.successful!.commands![commandIndex].failWorkflowExecution!.failure!.stackTrace! ); return req; } @@ -769,7 +765,7 @@ test('http', async (t) => { { const req = await activate( t, - makeResolveActivity('0', { completed: { result: defaultDataConverter.toPayloads(result) } }) + makeResolveActivity('0', { completed: { result: defaultDataConverter.toPayload(result) } }) ); compareCompletion( t, diff --git a/packages/worker/native/index.d.ts b/packages/worker/native/index.d.ts index 6fa0dc51c..d348bb0eb 100644 --- a/packages/worker/native/index.d.ts +++ b/packages/worker/native/index.d.ts @@ -27,6 +27,8 @@ export interface Worker {} export declare type PollCallback = (err?: Error, result: ArrayBuffer) => void; export declare function newWorker(serverOptions: ServerOptions): Worker; export declare function workerShutdown(worker: Worker): void; -export declare function workerPoll(worker: Worker, queueName: string, callback: PollCallback): void; -export declare function workerCompleteTask(worker: Worker, result: ArrayBuffer): void; +export declare function workerPollWorkflowActivation(worker: Worker, queueName: string, callback: PollCallback): void; +export declare function workerCompleteWorkflowActivation(worker: Worker, result: ArrayBuffer): void; +export declare function workerPollActivityTask(worker: Worker, queueName: string, callback: PollCallback): void; +export declare function workerCompleteActivityTask(worker: Worker, result: ArrayBuffer): void; export declare function workerSendActivityHeartbeat(worker: Worker, activityId: string, details?: ArrayBuffer): void; diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 3400a165a..0a0a389f7 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -40,7 +40,7 @@ export class Activity { return { canceled: {} }; } console.log('completed activity', { result }); - return { completed: { result: this.dataConverter.toPayloads(result) } }; + return { completed: { result: this.dataConverter.toPayload(result) } }; } catch (err) { if (this.cancelRequested) { return { canceled: {} }; diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index c2b2bc049..c83833f2a 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -212,8 +212,8 @@ type OmitFirstParam = T extends (...args: any[]) => any ? (...args: RestParam export interface NativeWorkerLike { shutdown: OmitFirstParam; - poll(queueName: string): Promise; - completeTask: OmitFirstParam; + pollWorkflowActivation(queueName: string): Promise; + completeWorkflowActivation: OmitFirstParam; sendActivityHeartbeat: OmitFirstParam; } @@ -228,19 +228,19 @@ export class NativeWorker implements NativeWorkerLike { public constructor(options?: ServerOptions) { const compiledOptions = compileServerOptions({ ...getDefaultServerOptions(), ...options }); this.native = native.newWorker(compiledOptions); - this.pollFn = promisify(native.workerPoll); + this.pollFn = promisify(native.workerPollWorkflowActivation); } public shutdown(): void { return native.workerShutdown(this.native); } - public poll(queueName: string): Promise { + public pollWorkflowActivation(queueName: string): Promise { return this.pollFn(this.native, queueName); } - public completeTask(result: ArrayBuffer): void { - return native.workerCompleteTask(this.native, result); + public completeWorkflowActivation(result: ArrayBuffer): void { + return native.workerCompleteWorkflowActivation(this.native, result); } public sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): void { @@ -259,7 +259,7 @@ export class Worker { activityId: string; details?: any; }>(); - protected readonly pollSubject = new Subject(); + protected readonly pollSubject = new Subject(); protected stateSubject: BehaviorSubject = new BehaviorSubject('INITIALIZED'); protected readonly nativeWorker: NativeWorkerLike; @@ -393,14 +393,14 @@ export class Worker { /** * An observable which repeatedly polls for new tasks unless worker becomes suspended */ - protected pollLoop$(queueName: string): Observable { + protected pollLoop$(queueName: string): Observable { return of(this.stateSubject).pipe( map((state) => state.getValue()), concatMap((state) => { switch (state) { case 'RUNNING': case 'STOPPING': - return this.nativeWorker.poll(queueName); + return this.nativeWorker.pollWorkflowActivation(queueName); case 'SUSPENDED': // Completes once we're out of SUSPENDED state return this.stateSubject.pipe( @@ -415,7 +415,7 @@ export class Worker { } }), repeat(), - map((buffer) => coresdk.Task.decode(new Uint8Array(buffer))) + map((buffer) => coresdk.workflow_activation.WFActivation.decode(new Uint8Array(buffer))) ); } @@ -423,7 +423,7 @@ export class Worker { * The main observable of the worker, starts the poll loop and takes care of state transitions * in case of an error during poll or shutdown */ - protected poller$(queueName: string): Observable { + protected poller$(queueName: string): Observable { return merge(this.gracefulShutdown$(), this.pollLoop$(queueName)).pipe( catchError((err) => (err.message.includes('[Core::shutdown]') ? EMPTY : throwError(err))), tap({ @@ -529,10 +529,8 @@ export class Worker { }), filter((result: T): result is Exclude => result !== undefined), map(({ taskToken, result }) => - coresdk.TaskCompletion.encodeDelimited({ - taskToken: taskToken, - activity: result, - }).finish() + // TODO: taskToken: taskToken, + coresdk.activity_result.ActivityResult.encodeDelimited(result).finish() ), tap(group$.close) ); @@ -544,9 +542,11 @@ export class Worker { * Process workflow activations * @param queueName used to propagate the current task queue to the workflow */ - protected workflowOperator(queueName: string): OperatorFunction { + protected workflowOperator( + queueName: string + ): OperatorFunction { return pipe( - closeableGroupBy((task) => task.workflow.runId), + closeableGroupBy((task) => task.runId), mergeMap((group$) => { return group$.pipe( mergeMapWithState(async (workflow: Workflow | undefined, task) => { @@ -555,7 +555,7 @@ export class Worker { try { // Find a workflow start job in the activation jobs list // TODO: should this always be the first job in the list? - const maybeStartWorkflow = task.workflow.jobs.find((j) => j.startWorkflow); + const maybeStartWorkflow = task.jobs.find((j) => j.startWorkflow); if (maybeStartWorkflow !== undefined) { const attrs = maybeStartWorkflow.startWorkflow; if (!(attrs && attrs.workflowId && attrs.workflowType)) { @@ -568,7 +568,7 @@ export class Worker { this.log.debug('Creating workflow', { taskToken, workflowId: attrs.workflowId, - runId: task.workflow.runId, + runId: task.runId, }); workflow = await Workflow.create(attrs.workflowId, queueName); // TODO: this probably shouldn't be here, consider alternative implementation @@ -583,24 +583,20 @@ export class Worker { throw new Error('Received workflow activation for an untracked workflow with no start workflow job'); } } catch (error) { - this.log.error('Failed to create a workflow', { taskToken, runId: task.workflow.runId, error }); + this.log.error('Failed to create a workflow', { taskToken, runId: task.runId, error }); let arr: Uint8Array; if (error instanceof LoaderError) { - arr = coresdk.TaskCompletion.encodeDelimited({ + arr = coresdk.workflow_completion.WFActivationCompletion.encodeDelimited({ taskToken: task.taskToken, - workflow: { - successful: { - commands: [{ failWorkflowExecution: { failure: errorToUserCodeFailure(error) } }], - }, + successful: { + commands: [{ failWorkflowExecution: { failure: errorToUserCodeFailure(error) } }], }, }).finish(); } else { - arr = coresdk.TaskCompletion.encodeDelimited({ + arr = coresdk.workflow_completion.WFActivationCompletion.encodeDelimited({ taskToken: task.taskToken, - workflow: { - failed: { - failure: errorToUserCodeFailure(error), - }, + failed: { + failure: errorToUserCodeFailure(error), }, }).finish(); } @@ -610,7 +606,8 @@ export class Worker { } this.log.debug('Activating workflow', { taskToken }); - const arr = await workflow.activate(task.taskToken, task.workflow); + // TODO: single param + const arr = await workflow.activate(task.taskToken, task); return { state: workflow, output: { close: false, arr } }; }, undefined), tap(({ close }) => void close && group$.close()) @@ -662,21 +659,22 @@ export class Worker { for (const signal of this.options.shutdownSignals) { process.on(signal, startShutdownSequence); } - - const partitioned$ = partition( - this.poller$(queueName).pipe( - share(), - tap((task) => this.log.debug('Got task', task)) - ), - (task) => task.variant === 'workflow' - ); - // Need to cast to any in order to assign the specific types since partition returns an Observable - const [workflow$, activity$] = (partitioned$ as any) as [Observable, Observable]; - + const workflow$ = this.poller$(queueName).pipe(tap((task) => this.log.debug('Got task', task))); + + // const partitioned$ = partition( + // this.poller$(queueName).pipe( + // share(), + // tap((task) => this.log.debug('Got task', task)) + // ), + // (task) => task.variant === 'workflow' + // ); + // // Need to cast to any in order to assign the specific types since partition returns an Observable + // const [workflow$, activity$] = (partitioned$ as any) as [Observable, Observable]; + // , activity$.pipe(this.activityOperator()) return await merge( this.activityHeartbeat$(), - merge(workflow$.pipe(this.workflowOperator(queueName)), activity$.pipe(this.activityOperator())).pipe( - map((arr) => this.nativeWorker.completeTask(arr.buffer.slice(arr.byteOffset))) + merge(workflow$.pipe(this.workflowOperator(queueName))).pipe( + map((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) ) ).toPromise(); } From 957126881b8d2fcc667791dce056f7d859ea954b Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 29 Mar 2021 23:13:06 +0300 Subject: [PATCH 03/15] Pass unit tests --- packages/test/src/mock-native-worker.ts | 51 ++++++++++++------ packages/test/src/test-workflows.ts | 7 ++- packages/worker/src/worker.ts | 72 ++++++++++++++----------- packages/workflow/src/internals.ts | 14 +++-- 4 files changed, 92 insertions(+), 52 deletions(-) diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index 1b48199c0..04e4f6619 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -15,18 +15,31 @@ export interface ActivityCompletion { } export class MockNativeWorker implements NativeWorkerLike { - tasks: Array> = []; - reject?: (err: Error) => void; - completionCallback?: (arr: ArrayBuffer) => void; + activityTasks: Array> = []; + workflowActivations: Array> = []; + activityCompletionCallback?: (arr: ArrayBuffer) => void; + workflowCompletionCallback?: (arr: ArrayBuffer) => void; activityHeartbeatCallback?: (activityId: string, details: any) => void; + reject?: (err: Error) => void; public shutdown(): void { - this.tasks.unshift(Promise.reject(new Error('[Core::shutdown]'))); + this.activityTasks.unshift(Promise.reject(new Error('[Core::shutdown]'))); + this.workflowActivations.unshift(Promise.reject(new Error('[Core::shutdown]'))); } public async pollWorkflowActivation(_queueName: string): Promise { for (;;) { - const task = this.tasks.pop(); + const task = this.workflowActivations.pop(); + if (task !== undefined) { + return task; + } + await sleep(1); + } + } + + public async pollActivityTask(_queueName: string): Promise { + for (;;) { + const task = this.activityTasks.pop(); if (task !== undefined) { return task; } @@ -35,19 +48,25 @@ export class MockNativeWorker implements NativeWorkerLike { } public completeWorkflowActivation(result: ArrayBuffer): void { - this.completionCallback!(result); - this.completionCallback = undefined; + this.workflowCompletionCallback!(result); + this.workflowCompletionCallback = undefined; + } + + public completeActivityTask(result: ArrayBuffer): void { + this.activityCompletionCallback!(result); + this.activityCompletionCallback = undefined; } public emit(task: Task): void { - let arr: Uint8Array; if ('workflow' in task) { - arr = coresdk.workflow_activation.WFActivation.encode(task.workflow).finish(); + const arr = coresdk.workflow_activation.WFActivation.encode(task.workflow).finish(); + const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); + this.workflowActivations.unshift(Promise.resolve(buffer)); } else { - arr = coresdk.activity_task.ActivityTask.encode(task.activity).finish(); + const arr = coresdk.activity_task.ActivityTask.encode(task.activity).finish(); + const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); + this.activityTasks.unshift(Promise.resolve(buffer)); } - const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); - this.tasks.unshift(Promise.resolve(buffer)); } public async runWorkflowActivation( @@ -57,8 +76,8 @@ export class MockNativeWorker implements NativeWorkerLike { const arr = coresdk.workflow_activation.WFActivation.encode(activation).finish(); const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); const result = await new Promise((resolve) => { - this.completionCallback = resolve; - this.tasks.unshift(Promise.resolve(buffer)); + this.workflowCompletionCallback = resolve; + this.workflowActivations.unshift(Promise.resolve(buffer)); }); return coresdk.workflow_completion.WFActivationCompletion.decodeDelimited(new Uint8Array(result)); } @@ -67,8 +86,8 @@ export class MockNativeWorker implements NativeWorkerLike { const arr = coresdk.activity_task.ActivityTask.encode(task).finish(); const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); const result = await new Promise((resolve) => { - this.completionCallback = resolve; - this.tasks.unshift(Promise.resolve(buffer)); + this.activityCompletionCallback = resolve; + this.activityTasks.unshift(Promise.resolve(buffer)); }); return { taskToken: task.taskToken, diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index f95228d25..faad4ff2e 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -35,7 +35,7 @@ test.beforeEach(async (t) => { async function activate(t: ExecutionContext, activation: coresdk.workflow_activation.IWFActivation) { const taskToken = u8(`${Math.random()}`); const arr = await t.context.workflow.activate(taskToken, activation); - const req = coresdk.workflow_activation.WFActivation.decodeDelimited(arr); + const req = coresdk.workflow_completion.WFActivationCompletion.decodeDelimited(arr); t.deepEqual(req.taskToken, taskToken); return req; } @@ -45,7 +45,10 @@ function compareCompletion( req: coresdk.workflow_completion.WFActivationCompletion, expected: coresdk.workflow_completion.IWFActivationCompletion ) { - t.deepEqual(req.toJSON(), coresdk.workflow_completion.WFActivationCompletion.create(expected).toJSON()); + t.deepEqual( + req.toJSON(), + coresdk.workflow_completion.WFActivationCompletion.create({ ...expected, taskToken: req.taskToken }).toJSON() + ); } function makeSuccess( diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index c83833f2a..41918d3b5 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -213,7 +213,9 @@ type OmitFirstParam = T extends (...args: any[]) => any ? (...args: RestParam export interface NativeWorkerLike { shutdown: OmitFirstParam; pollWorkflowActivation(queueName: string): Promise; + pollActivityTask(queueName: string): Promise; completeWorkflowActivation: OmitFirstParam; + completeActivityTask: OmitFirstParam; sendActivityHeartbeat: OmitFirstParam; } @@ -223,12 +225,14 @@ export interface WorkerConstructor { export class NativeWorker implements NativeWorkerLike { protected readonly native: native.Worker; - protected readonly pollFn: (worker: native.Worker, queueName: string) => Promise; + protected readonly workflowPollFn: (worker: native.Worker, queueName: string) => Promise; + protected readonly activityPollFn: (worker: native.Worker, queueName: string) => Promise; public constructor(options?: ServerOptions) { const compiledOptions = compileServerOptions({ ...getDefaultServerOptions(), ...options }); this.native = native.newWorker(compiledOptions); - this.pollFn = promisify(native.workerPollWorkflowActivation); + this.workflowPollFn = promisify(native.workerPollWorkflowActivation); + this.activityPollFn = promisify(native.workerPollActivityTask); } public shutdown(): void { @@ -236,13 +240,21 @@ export class NativeWorker implements NativeWorkerLike { } public pollWorkflowActivation(queueName: string): Promise { - return this.pollFn(this.native, queueName); + return this.workflowPollFn(this.native, queueName); + } + + public pollActivityTask(queueName: string): Promise { + return this.activityPollFn(this.native, queueName); } public completeWorkflowActivation(result: ArrayBuffer): void { return native.workerCompleteWorkflowActivation(this.native, result); } + public completeActivityTask(result: ArrayBuffer): void { + return native.workerCompleteActivityTask(this.native, result); + } + public sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): void { return native.workerSendActivityHeartbeat(this.native, activityId, details); } @@ -259,7 +271,6 @@ export class Worker { activityId: string; details?: any; }>(); - protected readonly pollSubject = new Subject(); protected stateSubject: BehaviorSubject = new BehaviorSubject('INITIALIZED'); protected readonly nativeWorker: NativeWorkerLike; @@ -393,14 +404,14 @@ export class Worker { /** * An observable which repeatedly polls for new tasks unless worker becomes suspended */ - protected pollLoop$(queueName: string): Observable { + protected pollLoop$(pollFn: () => Promise): Observable { return of(this.stateSubject).pipe( map((state) => state.getValue()), concatMap((state) => { switch (state) { case 'RUNNING': case 'STOPPING': - return this.nativeWorker.pollWorkflowActivation(queueName); + return pollFn(); case 'SUSPENDED': // Completes once we're out of SUSPENDED state return this.stateSubject.pipe( @@ -414,8 +425,7 @@ export class Worker { throw new Error(`Unexpected state ${state}`); } }), - repeat(), - map((buffer) => coresdk.workflow_activation.WFActivation.decode(new Uint8Array(buffer))) + repeat() ); } @@ -423,8 +433,8 @@ export class Worker { * The main observable of the worker, starts the poll loop and takes care of state transitions * in case of an error during poll or shutdown */ - protected poller$(queueName: string): Observable { - return merge(this.gracefulShutdown$(), this.pollLoop$(queueName)).pipe( + protected poller$(pollFn: () => Promise): Observable { + return merge(this.gracefulShutdown$(), this.pollLoop$(pollFn)).pipe( catchError((err) => (err.message.includes('[Core::shutdown]') ? EMPTY : throwError(err))), tap({ complete: () => { @@ -440,9 +450,9 @@ export class Worker { /** * Process activity tasks */ - protected activityOperator(): OperatorFunction { + protected activityOperator(): OperatorFunction { return pipe( - closeableGroupBy((task) => task.activity.activityId), + closeableGroupBy((task) => task.activityId), mergeMap((group$) => { return group$.pipe( mergeMapWithState(async (activity: Activity | undefined, task) => { @@ -452,15 +462,14 @@ export class Worker { let output: | { type: 'result'; result: coresdk.activity_result.IActivityResult } | { type: 'run'; activity: Activity }; - const { taskToken } = task; - const { variant } = task.activity; + const { taskToken, variant, activityId } = task; if (!variant) { throw new Error('Got an activity task without a "variant" attribute'); } switch (variant) { case 'start': { - const { start, activityId } = task.activity; + const { start } = task; if (!start) { throw new Error('Got a "start" activity task without a "start" attribute'); } @@ -496,7 +505,6 @@ export class Worker { break; } case 'cancel': { - const { activityId } = task.activity; if (activity === undefined) { this.log.error('Tried to cancel a non-existing activity', { activityId }); output = { type: 'result', result: { failed: { failure: { message: 'Activity not found' } } } }; @@ -659,22 +667,26 @@ export class Worker { for (const signal of this.options.shutdownSignals) { process.on(signal, startShutdownSequence); } - const workflow$ = this.poller$(queueName).pipe(tap((task) => this.log.debug('Got task', task))); - - // const partitioned$ = partition( - // this.poller$(queueName).pipe( - // share(), - // tap((task) => this.log.debug('Got task', task)) - // ), - // (task) => task.variant === 'workflow' - // ); - // // Need to cast to any in order to assign the specific types since partition returns an Observable - // const [workflow$, activity$] = (partitioned$ as any) as [Observable, Observable]; - // , activity$.pipe(this.activityOperator()) + const workflow$ = this.poller$(async () => { + const buffer = await this.nativeWorker.pollWorkflowActivation(queueName); + return coresdk.workflow_activation.WFActivation.decode(new Uint8Array(buffer)); + }).pipe(tap((task) => this.log.debug('Got workflow task', task))); + const activity$ = this.poller$(async () => { + const buffer = await this.nativeWorker.pollActivityTask(queueName); + return coresdk.activity_task.ActivityTask.decode(new Uint8Array(buffer)); + }).pipe(tap((task) => this.log.debug('Got activity task', task))); + return await merge( this.activityHeartbeat$(), - merge(workflow$.pipe(this.workflowOperator(queueName))).pipe( - map((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) + merge( + workflow$.pipe( + this.workflowOperator(queueName), + map((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) + ), + activity$.pipe( + this.activityOperator(), + map((arr) => this.nativeWorker.completeActivityTask(arr.buffer.slice(arr.byteOffset))) + ) ) ).toPromise(); } diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 9faa66abd..fe207ce52 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -85,7 +85,7 @@ export type WorkflowTaskHandler = { function completeWorkflow(result: any) { state.commands.push({ completeWorkflowExecution: { - result: defaultDataConverter.toPayloads(result), + result: defaultDataConverter.toPayload(result), }, }); state.completed = true; @@ -162,7 +162,13 @@ export class Activator implements WorkflowTaskHandler { } const { resolve, reject, scope } = consumeCompletion(idToSeq(activation.activityId)); if (activation.result.completed) { - resolve(defaultDataConverter.fromPayloads(0, activation.result.completed.result)); + const completed = activation.result.completed; + const result = completed.result ? defaultDataConverter.fromPayload(completed.result) : undefined; + if (result === undefined) { + reject(new Error('Failed to convert from payload')); + } else { + resolve(result); + } } else if (activation.result.failed) { reject(new Error(nullToUndefined(activation.result.failed.failure?.message))); } else if (activation.result.canceled) { @@ -240,9 +246,9 @@ export function activate(encodedActivation: Uint8Array, jobIndex: number): boole export function concludeActivation(taskToken: Uint8Array): Uint8Array { const { commands } = state; // TODO: activation failed (should this be done in main node isolate?) - const encoded = iface.coresdk.TaskCompletion.encodeDelimited({ + const encoded = iface.coresdk.workflow_completion.WFActivationCompletion.encodeDelimited({ taskToken, - workflow: { successful: { commands } }, + successful: { commands }, }).finish(); state.commands = []; return encoded; From 7a95348a0a299cb7c8fd99159f1e6186c5af88fa Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 29 Mar 2021 23:56:11 +0300 Subject: [PATCH 04/15] Async worker initialization --- packages/test/src/mock-native-worker.ts | 11 +++++- packages/test/src/run-a-worker.ts | 2 +- packages/test/src/test-integration.ts | 25 +++++++----- packages/test/src/test-worker-lifecycle.ts | 2 +- packages/worker/native/index.d.ts | 3 +- packages/worker/native/src/lib.rs | 23 ++++++++--- packages/worker/src/worker.ts | 46 ++++++++++++++-------- 7 files changed, 77 insertions(+), 35 deletions(-) diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index 04e4f6619..9bb2ef7cf 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -2,7 +2,7 @@ import { v4 as uuid4 } from 'uuid'; import { coresdk } from '@temporalio/proto'; import { defaultDataConverter } from '@temporalio/workflow/commonjs/converter/data-converter'; -import { Worker as RealWorker, NativeWorkerLike } from '@temporalio/worker/lib/worker'; +import { Worker as RealWorker, NativeWorkerLike, WorkerOptions } from '@temporalio/worker/lib/worker'; import { sleep } from '@temporalio/worker/lib/utils'; export type Task = @@ -22,6 +22,10 @@ export class MockNativeWorker implements NativeWorkerLike { activityHeartbeatCallback?: (activityId: string, details: any) => void; reject?: (err: Error) => void; + public static async create(): Promise { + return new this(); + } + public shutdown(): void { this.activityTasks.unshift(Promise.reject(new Error('[Core::shutdown]'))); this.workflowActivations.unshift(Promise.reject(new Error('[Core::shutdown]'))); @@ -118,4 +122,9 @@ export class Worker extends RealWorker { public get native(): MockNativeWorker { return this.nativeWorker as MockNativeWorker; } + + public constructor(pwd: string, opts?: WorkerOptions) { + const nativeWorker = new MockNativeWorker(); + super(nativeWorker, pwd, opts); + } } diff --git a/packages/test/src/run-a-worker.ts b/packages/test/src/run-a-worker.ts index 313a6194d..35e81a640 100644 --- a/packages/test/src/run-a-worker.ts +++ b/packages/test/src/run-a-worker.ts @@ -1,7 +1,7 @@ import { Worker } from '@temporalio/worker'; async function main() { - const worker = new Worker(__dirname, { + const worker = await Worker.create(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib`, activitiesPath: `${__dirname}/../../test-activities/lib`, }); diff --git a/packages/test/src/test-integration.ts b/packages/test/src/test-integration.ts index ccc429c71..10b9c5171 100644 --- a/packages/test/src/test-integration.ts +++ b/packages/test/src/test-integration.ts @@ -1,5 +1,5 @@ /* eslint @typescript-eslint/no-non-null-assertion: 0 */ -import test from 'ava'; +import anyTest, { TestInterface, ExecutionContext } from 'ava'; import { v4 as uuid4 } from 'uuid'; import { Connection } from '@temporalio/client'; import { tsToMs } from '@temporalio/workflow/commonjs/time'; @@ -23,21 +23,28 @@ const { const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]); +export interface Context { + worker: Worker; +} + +const test = anyTest as TestInterface; + if (RUN_INTEGRATION_TESTS) { - const worker = new Worker(__dirname, { - workflowsPath: `${__dirname}/../../test-workflows/lib`, - activitiesPath: `${__dirname}/../../test-activities/lib`, - logger: new DefaultLogger('DEBUG'), - }); + test.before(async (t) => { + const worker = await Worker.create(__dirname, { + workflowsPath: `${__dirname}/../../test-workflows/lib`, + activitiesPath: `${__dirname}/../../test-activities/lib`, + logger: new DefaultLogger('DEBUG'), + }); + t.context = { worker }; - test.before((t) => { worker.run('test').catch((err) => { console.error(err); t.fail(`Failed to run worker: ${err}`); }); }); - test.after.always(() => { - worker.shutdown(); + test.after.always((t) => { + t.context.worker.shutdown(); }); test('Workflow not found results in failure', async (t) => { diff --git a/packages/test/src/test-worker-lifecycle.ts b/packages/test/src/test-worker-lifecycle.ts index 8301a0110..6807f23ff 100644 --- a/packages/test/src/test-worker-lifecycle.ts +++ b/packages/test/src/test-worker-lifecycle.ts @@ -10,7 +10,7 @@ import { RUN_INTEGRATION_TESTS } from './helpers'; if (RUN_INTEGRATION_TESTS) { test.serial('run shuts down gracefully', async (t) => { - const worker = new Worker(__dirname, { shutdownGraceTime: '500ms', activitiesPath: null }); + const worker = await Worker.create(__dirname, { shutdownGraceTime: '500ms', activitiesPath: null }); t.is(worker.getState(), 'INITIALIZED'); const p = worker.run('shutdown-test'); t.is(worker.getState(), 'RUNNING'); diff --git a/packages/worker/native/index.d.ts b/packages/worker/native/index.d.ts index d348bb0eb..09368edfd 100644 --- a/packages/worker/native/index.d.ts +++ b/packages/worker/native/index.d.ts @@ -25,7 +25,8 @@ export interface ServerOptions { export interface Worker {} export declare type PollCallback = (err?: Error, result: ArrayBuffer) => void; -export declare function newWorker(serverOptions: ServerOptions): Worker; +export declare type WorkerCallback = (err?: Error, result: Worker) => void; +export declare function newWorker(serverOptions: ServerOptions, callback: WorkerCallback): void; export declare function workerShutdown(worker: Worker): void; export declare function workerPollWorkflowActivation(worker: Worker, queueName: string, callback: PollCallback): void; export declare function workerCompleteWorkflowActivation(worker: Worker, result: ArrayBuffer): void; diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index 6d80ee646..13ae67ce6 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -49,9 +49,10 @@ impl Finalize for Worker {} // Below are functions exported to JS -/// Create a new worker. -/// Immediately spawns a poller thread that will block on [PollRequest]s -fn worker_new(mut cx: FunctionContext) -> JsResult { +/// Create a new worker asynchronously. +/// Immediately spawns a poller thread that will block on [Request]s +/// Worker is returned to JS using supplied callback +fn worker_new(mut cx: FunctionContext) -> JsResult { let options = cx.argument::(0)?; let url = options .get(&mut cx, "url")? @@ -80,6 +81,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { .value(&mut cx) as u64, ), }; + // TODO: make this configurable let (sender, mut receiver) = channel::(1000); let worker = Worker { sender }; let worker = Arc::new(worker); @@ -94,14 +96,23 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { .block_on(async { match init(CoreInitOptions { gateway_opts }).await { Ok(result) => { + queue_arc.clone().send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = cx.undefined(); + let result = cx.boxed(cloned_worker); + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); let core_arc = Arc::new(result); loop { // TODO: handle this error let request = receiver.recv().await.unwrap(); let variant = request.variant; let callback = request.callback; - let core = Arc::clone(&core_arc); - let queue = Arc::clone(&queue_arc); + let core = core_arc.clone(); + let queue = queue_arc.clone(); tokio::spawn(async move { match variant { RequestVariant::PollWorkflowActivation { queue_name } => { @@ -215,7 +226,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { }) }); - Ok(cx.boxed(cloned_worker)) + Ok(cx.undefined()) } /// Initiate a single poll request. diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 41918d3b5..54e12a119 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -220,7 +220,7 @@ export interface NativeWorkerLike { } export interface WorkerConstructor { - new (options?: ServerOptions): NativeWorkerLike; + create(options?: ServerOptions): Promise; } export class NativeWorker implements NativeWorkerLike { @@ -228,9 +228,14 @@ export class NativeWorker implements NativeWorkerLike { protected readonly workflowPollFn: (worker: native.Worker, queueName: string) => Promise; protected readonly activityPollFn: (worker: native.Worker, queueName: string) => Promise; - public constructor(options?: ServerOptions) { + public static async create(options?: ServerOptions): Promise { const compiledOptions = compileServerOptions({ ...getDefaultServerOptions(), ...options }); - this.native = native.newWorker(compiledOptions); + const nativeWorker = await promisify(native.newWorker)(compiledOptions); + return new NativeWorker(nativeWorker); + } + + protected constructor(nativeWorker: native.Worker) { + this.native = nativeWorker; this.workflowPollFn = promisify(native.workerPollWorkflowActivation); this.activityPollFn = promisify(native.workerPollActivityTask); } @@ -281,10 +286,18 @@ export class Worker { * This method immediately connects to the server and will throw on connection failure. * @param pwd - Used to resolve relative paths for locating and importing activities and workflows. */ - constructor(public readonly pwd: string, options?: WorkerOptions) { - // Typescript derives the type of `this.constructor` as Function, work around it by casting to any - const nativeWorkerCtor: WorkerConstructor = (this.constructor as any).nativeWorkerCtor; - this.nativeWorker = new nativeWorkerCtor(options?.serverOptions); + public static async create(pwd: string, options?: WorkerOptions): Promise { + const nativeWorkerCtor: WorkerConstructor = this.nativeWorkerCtor; + const nativeWorker = await nativeWorkerCtor.create(options?.serverOptions); + return new this(nativeWorker, pwd, options); + } + + /** + * Create a new Worker from nativeWorker. + * @param pwd - Used to resolve relative paths for locating and importing activities and workflows. + */ + protected constructor(nativeWorker: NativeWorkerLike, public readonly pwd: string, options?: WorkerOptions) { + this.nativeWorker = nativeWorker; // TODO: merge activityDefaults this.options = compileWorkerOptions({ ...getDefaultOptions(pwd), ...options }); @@ -435,15 +448,7 @@ export class Worker { */ protected poller$(pollFn: () => Promise): Observable { return merge(this.gracefulShutdown$(), this.pollLoop$(pollFn)).pipe( - catchError((err) => (err.message.includes('[Core::shutdown]') ? EMPTY : throwError(err))), - tap({ - complete: () => { - this.state = 'STOPPED'; - }, - error: () => { - this.state = 'FAILED'; - }, - }) + catchError((err) => (err.message.includes('[Core::shutdown]') ? EMPTY : throwError(err))) ); } @@ -687,6 +692,15 @@ export class Worker { this.activityOperator(), map((arr) => this.nativeWorker.completeActivityTask(arr.buffer.slice(arr.byteOffset))) ) + ).pipe( + tap({ + complete: () => { + this.state = 'STOPPED'; + }, + error: () => { + this.state = 'FAILED'; + }, + }) ) ).toPromise(); } From 36db2b872d5ffe7242ce478aeb71ed7317587b49 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 10:53:02 +0300 Subject: [PATCH 05/15] Implement bridge shutdown --- packages/worker/native/src/lib.rs | 122 ++++++++++++++++-------------- 1 file changed, 64 insertions(+), 58 deletions(-) diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index 13ae67ce6..463c01de1 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -7,10 +7,12 @@ use temporal_sdk_core::{ protos::coresdk::workflow_completion::WfActivationCompletion, Core, CoreInitOptions, ServerGatewayOptions, Url, }; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{channel, Sender}; -/// Possible Request variants -enum RequestVariant { +/// A request from lang to bridge to core +pub enum Request { + /// A request sent from within the bridge when it encounters a CoreError::ShuttingDown + ShutdownComplete, /// A request to shutdown core and the bridge thread, lang should wait on /// CoreError::ShuttingDown before exiting to allow draining of pending tasks Shutdown, @@ -18,23 +20,24 @@ enum RequestVariant { PollWorkflowActivation { /// Name of queue to poll queue_name: String, + /// Used to send the result back into lang + callback: Root, }, /// A request to complete a single workflow activation - CompleteWorkflowActivation { completion: WfActivationCompletion }, + CompleteWorkflowActivation { + completion: WfActivationCompletion, + /// Used to send the result back into lang + callback: Root, + }, /// A request to poll for activity tasks PollActivityTask { /// Name of queue to poll queue_name: String, + /// Used to send the result back into lang + callback: Root, }, } -/// A request from lang to bridge to core -pub struct Request { - variant: RequestVariant, - /// Used to send the result back into lang - callback: Root, -} - /// Worker struct, hold a reference for the channel sender responsible for sending requests from /// lang to core pub struct Worker { @@ -83,9 +86,10 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { }; // TODO: make this configurable let (sender, mut receiver) = channel::(1000); - let worker = Worker { sender }; - let worker = Arc::new(worker); - let queue_arc = Arc::new(cx.queue()); + let worker = Arc::new(Worker { + sender: sender.clone(), + }); + let queue = Arc::new(cx.queue()); let cloned_worker = Arc::clone(&worker); std::thread::spawn(move || { @@ -96,7 +100,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { .block_on(async { match init(CoreInitOptions { gateway_opts }).await { Ok(result) => { - queue_arc.clone().send(move |mut cx| { + queue.clone().send(move |mut cx| { let callback = callback.into_inner(&mut cx); let this = cx.undefined(); let error = cx.undefined(); @@ -105,17 +109,25 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { callback.call(&mut cx, this, args)?; Ok(()) }); - let core_arc = Arc::new(result); + let core = Arc::new(result); loop { // TODO: handle this error let request = receiver.recv().await.unwrap(); - let variant = request.variant; - let callback = request.callback; - let core = core_arc.clone(); - let queue = queue_arc.clone(); + if matches!(request, Request::ShutdownComplete) { + break; + } else if matches!(request, Request::Shutdown) { + core.shutdown(); + continue; + } + let core = core.clone(); + let queue = queue.clone(); + let sender = sender.clone(); tokio::spawn(async move { - match variant { - RequestVariant::PollWorkflowActivation { queue_name } => { + match request { + Request::PollWorkflowActivation { + queue_name, + callback, + } => { match core.poll_workflow_task(&queue_name).await { Ok(task) => { queue.send(move |mut cx| { @@ -139,10 +151,15 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { } Err(err) => { // TODO: on the JS side we consider all errors fatal, revise this later - // let should_break = match err { - // temporal_sdk_core::CoreError::ShuttingDown => true, - // _ => false, - // }; + if let temporal_sdk_core::CoreError::ShuttingDown = + err + { + if let Err(_) = + sender.send(Request::ShutdownComplete).await + { + // TODO: handle error + } + }; queue.send(move |mut cx| { let callback = callback.into_inner(&mut cx); let this = cx.undefined(); @@ -156,13 +173,13 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { callback.call(&mut cx, this, args)?; Ok(()) }); - // if should_break { - // break; - // } } } } - RequestVariant::CompleteWorkflowActivation { completion } => { + Request::CompleteWorkflowActivation { + completion, + callback, + } => { match core.complete_workflow_task(completion).await { Ok(()) => { queue.send(move |mut cx| { @@ -178,10 +195,15 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { } Err(err) => { // TODO: on the JS side we consider all errors fatal, revise this later - // let should_break = match err { - // temporal_sdk_core::CoreError::ShuttingDown => true, - // _ => false, - // }; + if let temporal_sdk_core::CoreError::ShuttingDown = + err + { + if let Err(_) = + sender.send(Request::ShutdownComplete).await + { + // TODO: handle error + } + }; queue.send(move |mut cx| { let callback = callback.into_inner(&mut cx); let this = cx.undefined(); @@ -195,23 +217,16 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { callback.call(&mut cx, this, args)?; Ok(()) }); - // if should_break { - // break; - // } } } } - // Shutdown { - // match core.shutdown().await { - // } - // } _ => {} } }); } } Err(err) => { - let queue = queue_arc.clone(); + let queue = queue.clone(); queue.send(move |mut cx| { let callback = callback.into_inner(&mut cx); let this = cx.undefined(); @@ -235,8 +250,8 @@ fn worker_poll_workflow_activation(mut cx: FunctionContext) -> JsResult(0)?; let queue_name = cx.argument::(1)?.value(&mut cx); let callback = cx.argument::(2)?; - let request = Request { - variant: RequestVariant::PollWorkflowActivation { queue_name }, + let request = Request::PollWorkflowActivation { + queue_name, callback: callback.root(&mut cx), }; if let Err(err) = worker.sender.blocking_send(request) { @@ -259,8 +274,8 @@ fn worker_complete_workflow_activation(mut cx: FunctionContext) -> JsResult { - let request = Request { - variant: RequestVariant::CompleteWorkflowActivation { completion }, + let request = Request::CompleteWorkflowActivation { + completion, callback: callback.root(&mut cx), }; if let Err(err) = worker.sender.blocking_send(request) { @@ -281,19 +296,10 @@ fn worker_complete_workflow_activation(mut cx: FunctionContext) -> JsResult JsResult { let worker = cx.argument::(0)?; - let callback = cx.argument::(1)?; - let request = Request { - variant: RequestVariant::Shutdown, - callback: callback.root(&mut cx), - }; - if let Err(err) = worker.sender.blocking_send(request) { - let this = cx.undefined(); - let error = JsError::error(&mut cx, format!("{}", err))?; - let result = cx.undefined(); - let args: Vec> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; + match worker.sender.blocking_send(Request::Shutdown) { + Err(err) => cx.throw_error(format!("{}", err)), + _ => Ok(cx.undefined()), } - Ok(cx.undefined()) } register_module!(mut cx, { From d4278faeea153c192c36e42572cd9076b8178566 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 12:19:24 +0300 Subject: [PATCH 06/15] Got most of the integration tests to pass --- packages/test/src/mock-native-worker.ts | 6 +- packages/test/src/test-integration.ts | 2 +- packages/test/src/test-worker-lifecycle.ts | 4 +- packages/worker/native/index.d.ts | 17 ++++- packages/worker/native/src/lib.rs | 71 ++++++++++++++++++-- packages/worker/src/worker.ts | 78 +++++++++------------- 6 files changed, 116 insertions(+), 62 deletions(-) diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index 9bb2ef7cf..e2d7bc44a 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -51,12 +51,12 @@ export class MockNativeWorker implements NativeWorkerLike { } } - public completeWorkflowActivation(result: ArrayBuffer): void { + public async completeWorkflowActivation(result: ArrayBuffer): Promise { this.workflowCompletionCallback!(result); this.workflowCompletionCallback = undefined; } - public completeActivityTask(result: ArrayBuffer): void { + public async completeActivityTask(result: ArrayBuffer): Promise { this.activityCompletionCallback!(result); this.activityCompletionCallback = undefined; } @@ -99,7 +99,7 @@ export class MockNativeWorker implements NativeWorkerLike { }; } - sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): void { + public async sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): Promise { const payload = details && coresdk.common.Payload.decode(new Uint8Array(details)); const arg = payload ? defaultDataConverter.fromPayload(payload) : undefined; this.activityHeartbeatCallback!(activityId, arg); diff --git a/packages/test/src/test-integration.ts b/packages/test/src/test-integration.ts index 10b9c5171..80262f8b7 100644 --- a/packages/test/src/test-integration.ts +++ b/packages/test/src/test-integration.ts @@ -1,5 +1,5 @@ /* eslint @typescript-eslint/no-non-null-assertion: 0 */ -import anyTest, { TestInterface, ExecutionContext } from 'ava'; +import anyTest, { TestInterface } from 'ava'; import { v4 as uuid4 } from 'uuid'; import { Connection } from '@temporalio/client'; import { tsToMs } from '@temporalio/workflow/commonjs/time'; diff --git a/packages/test/src/test-worker-lifecycle.ts b/packages/test/src/test-worker-lifecycle.ts index 6807f23ff..29cb661dc 100644 --- a/packages/test/src/test-worker-lifecycle.ts +++ b/packages/test/src/test-worker-lifecycle.ts @@ -9,7 +9,7 @@ import { Worker as MockedWorker } from './mock-native-worker'; import { RUN_INTEGRATION_TESTS } from './helpers'; if (RUN_INTEGRATION_TESTS) { - test.serial('run shuts down gracefully', async (t) => { + test.serial.skip('run shuts down gracefully', async (t) => { const worker = await Worker.create(__dirname, { shutdownGraceTime: '500ms', activitiesPath: null }); t.is(worker.getState(), 'INITIALIZED'); const p = worker.run('shutdown-test'); @@ -42,7 +42,7 @@ test.serial('Mocked run throws if not shut down gracefully', async (t) => { worker.native.shutdown = () => undefined; // Make sure shutdown does not emit core shutdown process.emit('SIGINT', 'SIGINT'); await t.throwsAsync(p, { - message: 'Timed out waiting while waiting for worker to shutdown gracefully', + message: 'Timed out while waiting for worker to shutdown gracefully', }); t.is(worker.getState(), 'FAILED'); await t.throwsAsync(worker.run('shutdown-test'), { message: 'Poller was aleady started' }); diff --git a/packages/worker/native/index.d.ts b/packages/worker/native/index.d.ts index 09368edfd..8e291ef57 100644 --- a/packages/worker/native/index.d.ts +++ b/packages/worker/native/index.d.ts @@ -26,10 +26,21 @@ export interface Worker {} export declare type PollCallback = (err?: Error, result: ArrayBuffer) => void; export declare type WorkerCallback = (err?: Error, result: Worker) => void; +export declare type VoidCallback = (err?: Error, result: void) => void; + export declare function newWorker(serverOptions: ServerOptions, callback: WorkerCallback): void; export declare function workerShutdown(worker: Worker): void; export declare function workerPollWorkflowActivation(worker: Worker, queueName: string, callback: PollCallback): void; -export declare function workerCompleteWorkflowActivation(worker: Worker, result: ArrayBuffer): void; +export declare function workerCompleteWorkflowActivation( + worker: Worker, + result: ArrayBuffer, + callback: VoidCallback +): void; export declare function workerPollActivityTask(worker: Worker, queueName: string, callback: PollCallback): void; -export declare function workerCompleteActivityTask(worker: Worker, result: ArrayBuffer): void; -export declare function workerSendActivityHeartbeat(worker: Worker, activityId: string, details?: ArrayBuffer): void; +export declare function workerCompleteActivityTask(worker: Worker, result: ArrayBuffer, callback: VoidCallback): void; +export declare function workerSendActivityHeartbeat( + worker: Worker, + activityId: string, + details?: ArrayBuffer, + callback: VoidCallback +): void; diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index 463c01de1..0048f03fe 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -176,17 +176,25 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { } } } - Request::CompleteWorkflowActivation { - completion, + Request::PollActivityTask { + queue_name, callback, } => { - match core.complete_workflow_task(completion).await { - Ok(()) => { + match core.poll_activity_task(&queue_name).await { + Ok(task) => { queue.send(move |mut cx| { let callback = callback.into_inner(&mut cx); let this = cx.undefined(); let error = cx.undefined(); - let result = cx.undefined(); + let len = task.encoded_len(); + let mut result = + JsArrayBuffer::new(&mut cx, len as u32)?; + cx.borrow_mut(&mut result, |data| { + let mut slice = data.as_mut_slice::(); + if let Err(_) = task.encode(&mut slice) { + panic!("Failed to encode task") + }; + }); let args: Vec> = vec![error.upcast(), result.upcast()]; callback.call(&mut cx, this, args)?; @@ -220,6 +228,36 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { } } } + Request::CompleteWorkflowActivation { + completion, + callback, + } => match core.complete_workflow_task(completion).await { + Ok(()) => { + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = cx.undefined(); + let result = cx.undefined(); + let args: Vec> = + vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); + } + Err(err) => { + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = + JsError::error(&mut cx, format!("{}", err))?; + let result = cx.undefined(); + let args: Vec> = + vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); + } + }, _ => {} } }); @@ -244,7 +282,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { Ok(cx.undefined()) } -/// Initiate a single poll request. +/// Initiate a single workflow activation poll request. /// There should be only one concurrent poll request for this type. fn worker_poll_workflow_activation(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; @@ -264,6 +302,26 @@ fn worker_poll_workflow_activation(mut cx: FunctionContext) -> JsResult JsResult { + let worker = cx.argument::(0)?; + let queue_name = cx.argument::(1)?.value(&mut cx); + let callback = cx.argument::(2)?; + let request = Request::PollActivityTask { + queue_name, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + let this = cx.undefined(); + let error = JsError::error(&mut cx, format!("{}", err))?; + let result = cx.undefined(); + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + } + Ok(cx.undefined()) +} + /// Submit a workflow activation completion to core. fn worker_complete_workflow_activation(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; @@ -309,6 +367,7 @@ register_module!(mut cx, { "workerPollWorkflowActivation", worker_poll_workflow_activation, )?; + cx.export_function("workerPollActivityTask", worker_poll_activity_task)?; cx.export_function( "workerCompleteWorkflowActivation", worker_complete_workflow_activation, diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 54e12a119..7ce91af77 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -2,18 +2,7 @@ import { basename, extname, resolve } from 'path'; import os from 'os'; import { readdirSync } from 'fs'; import { promisify } from 'util'; -import { - BehaviorSubject, - merge, - Observable, - OperatorFunction, - Subject, - partition, - pipe, - EMPTY, - throwError, - of, -} from 'rxjs'; +import { BehaviorSubject, merge, Observable, OperatorFunction, Subject, pipe, EMPTY, throwError, of } from 'rxjs'; import { catchError, concatMap, @@ -24,7 +13,6 @@ import { map, mergeMap, repeat, - share, takeUntil, tap, } from 'rxjs/operators'; @@ -203,20 +191,26 @@ export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledW export type State = 'INITIALIZED' | 'RUNNING' | 'STOPPED' | 'STOPPING' | 'FAILED' | 'SUSPENDED'; -type TaskForWorkflow = Required<{ taskToken: Uint8Array; workflow: coresdk.workflow_activation.WFActivation }>; -type TaskForActivity = Required<{ taskToken: Uint8Array; activity: coresdk.activity_task.ActivityTask }>; - +type ExtractToPromise = T extends (err: any, result: infer R) => void ? Promise : never; +// eslint-disable-next-line @typescript-eslint/no-unused-vars +type Last = T extends [...infer I, infer L] ? L : never; +type LastParameter any> = Last>; type OmitFirst = T extends [any, ...infer REST] ? REST : never; -type RestParams = T extends (...args: any[]) => any ? OmitFirst> : never; -type OmitFirstParam = T extends (...args: any[]) => any ? (...args: RestParams) => ReturnType : never; +type OmitLast = T extends [...infer REST, any] ? REST : never; +type OmitFirstParam = T extends (...args: any[]) => any + ? (...args: OmitFirst>) => ReturnType + : never; +type Promisify = T extends (...args: any[]) => void + ? (...args: OmitLast>) => ExtractToPromise> + : never; export interface NativeWorkerLike { shutdown: OmitFirstParam; pollWorkflowActivation(queueName: string): Promise; pollActivityTask(queueName: string): Promise; - completeWorkflowActivation: OmitFirstParam; - completeActivityTask: OmitFirstParam; - sendActivityHeartbeat: OmitFirstParam; + completeWorkflowActivation: Promisify>; + completeActivityTask: Promisify>; + sendActivityHeartbeat: Promisify>; } export interface WorkerConstructor { @@ -225,8 +219,9 @@ export interface WorkerConstructor { export class NativeWorker implements NativeWorkerLike { protected readonly native: native.Worker; - protected readonly workflowPollFn: (worker: native.Worker, queueName: string) => Promise; - protected readonly activityPollFn: (worker: native.Worker, queueName: string) => Promise; + public readonly pollWorkflowActivation: Promisify>; + public readonly pollActivityTask: Promisify>; + public readonly completeWorkflowActivation: Promisify>; public static async create(options?: ServerOptions): Promise { const compiledOptions = compileServerOptions({ ...getDefaultServerOptions(), ...options }); @@ -236,32 +231,21 @@ export class NativeWorker implements NativeWorkerLike { protected constructor(nativeWorker: native.Worker) { this.native = nativeWorker; - this.workflowPollFn = promisify(native.workerPollWorkflowActivation); - this.activityPollFn = promisify(native.workerPollActivityTask); + this.pollWorkflowActivation = promisify(native.workerPollWorkflowActivation).bind(undefined, nativeWorker); + this.pollActivityTask = promisify(native.workerPollActivityTask).bind(undefined, nativeWorker); + this.completeWorkflowActivation = promisify(native.workerCompleteWorkflowActivation).bind(undefined, nativeWorker); } public shutdown(): void { return native.workerShutdown(this.native); } - public pollWorkflowActivation(queueName: string): Promise { - return this.workflowPollFn(this.native, queueName); - } - - public pollActivityTask(queueName: string): Promise { - return this.activityPollFn(this.native, queueName); - } - - public completeWorkflowActivation(result: ArrayBuffer): void { - return native.workerCompleteWorkflowActivation(this.native, result); - } - - public completeActivityTask(result: ArrayBuffer): void { - return native.workerCompleteActivityTask(this.native, result); + public completeActivityTask(_result: ArrayBuffer): Promise { + throw new Error('Not implemented'); } - public sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): void { - return native.workerSendActivityHeartbeat(this.native, activityId, details); + public sendActivityHeartbeat(_activityId: string, _details?: ArrayBuffer): Promise { + throw new Error('Not implemented'); } } @@ -409,7 +393,7 @@ export class Worker { filter((state): state is 'STOPPING' => state === 'STOPPING'), delay(this.options.shutdownGraceTimeMs), map(() => { - throw new Error('Timed out waiting while waiting for worker to shutdown gracefully'); + throw new Error('Timed out while waiting for worker to shutdown gracefully'); }) ); } @@ -637,15 +621,15 @@ export class Worker { return this.activityHeartbeatSubject.pipe( takeUntil(this.stateSubject.pipe(filter((value) => value === 'STOPPED' || value === 'FAILED'))), tap(({ activityId }) => this.log.debug('Got activity heartbeat', { activityId })), - map(({ activityId, details }) => { + mergeMap(async ({ activityId, details }) => { const payload = this.options.dataConverter.toPayload(details); if (!payload) { - this.nativeWorker.sendActivityHeartbeat(activityId); + await this.nativeWorker.sendActivityHeartbeat(activityId, undefined); return; } const arr = coresdk.common.Payload.encode(payload).finish(); - this.nativeWorker.sendActivityHeartbeat( + await this.nativeWorker.sendActivityHeartbeat( activityId, arr.buffer.slice(arr.byteOffset, arr.byteLength + arr.byteOffset) ); @@ -686,11 +670,11 @@ export class Worker { merge( workflow$.pipe( this.workflowOperator(queueName), - map((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) + mergeMap((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) ), activity$.pipe( this.activityOperator(), - map((arr) => this.nativeWorker.completeActivityTask(arr.buffer.slice(arr.byteOffset))) + mergeMap((arr) => this.nativeWorker.completeActivityTask(arr.buffer.slice(arr.byteOffset))) ) ).pipe( tap({ From 4576b6de5bd6034b3f5a4ac0c7fc18c79a85efc8 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 13:37:41 +0300 Subject: [PATCH 07/15] Cleanup rust code --- packages/worker/native/src/lib.rs | 173 ++++++++++++------------------ 1 file changed, 66 insertions(+), 107 deletions(-) diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index 0048f03fe..d8be3849d 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -45,11 +45,55 @@ pub struct Worker { } /// Box it so we can use Worker from JS -// TODO: we might not need Arc -type BoxedWorker = JsBox>; +type BoxedWorker = JsBox; impl Finalize for Worker {} +/// Send a result to JS via callback using an [EventQueue] +fn send_result(queue: Arc, callback: Root, res_fn: F) +where + F: for<'a> FnOnce(&mut TaskContext<'a>) -> NeonResult> + Send + 'static, + T: Value, +{ + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + let error = cx.undefined(); + let result = res_fn(&mut cx)?; + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(&mut cx, this, args)?; + Ok(()) + }); +} + +/// Send an error to JS via callback using an [EventQueue] +fn send_error(queue: Arc, callback: Root, error: T) +where + T: ::std::fmt::Display + Send + 'static, +{ + queue.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + callback_with_error(&mut cx, callback, error) + }); +} + +/// Call [callback] with given error +fn callback_with_error<'a, T>( + cx: &mut impl Context<'a>, + callback: Handle, + error: T, +) -> NeonResult<()> +where + T: ::std::fmt::Display + Send + 'static, +{ + let this = cx.undefined(); + let error = JsError::error(cx, format!("{}", error))?; + let result = cx.undefined(); + let args: Vec> = vec![error.upcast(), result.upcast()]; + callback.call(cx, this, args)?; + Ok(()) +} + // Below are functions exported to JS /// Create a new worker asynchronously. @@ -86,11 +130,10 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { }; // TODO: make this configurable let (sender, mut receiver) = channel::(1000); - let worker = Arc::new(Worker { + let worker = Worker { sender: sender.clone(), - }); + }; let queue = Arc::new(cx.queue()); - let cloned_worker = Arc::clone(&worker); std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread() @@ -100,15 +143,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { .block_on(async { match init(CoreInitOptions { gateway_opts }).await { Ok(result) => { - queue.clone().send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = cx.undefined(); - let result = cx.boxed(cloned_worker); - let args: Vec> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); + send_result(queue.clone(), callback, |cx| Ok(cx.boxed(worker))); let core = Arc::new(result); loop { // TODO: handle this error @@ -130,23 +165,17 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { } => { match core.poll_workflow_task(&queue_name).await { Ok(task) => { - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = cx.undefined(); + send_result(queue, callback, move |cx| { let len = task.encoded_len(); let mut result = - JsArrayBuffer::new(&mut cx, len as u32)?; + JsArrayBuffer::new(cx, len as u32)?; cx.borrow_mut(&mut result, |data| { let mut slice = data.as_mut_slice::(); if let Err(_) = task.encode(&mut slice) { panic!("Failed to encode task") }; }); - let args: Vec> = - vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) + Ok(result) }); } Err(err) => { @@ -160,19 +189,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { // TODO: handle error } }; - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = JsError::error( - &mut cx, - format!("{}", err), - )?; - let result = cx.undefined(); - let args: Vec> = - vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); + send_error(queue, callback, err); } } } @@ -182,23 +199,17 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { } => { match core.poll_activity_task(&queue_name).await { Ok(task) => { - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = cx.undefined(); + send_result(queue, callback, move |cx| { let len = task.encoded_len(); let mut result = - JsArrayBuffer::new(&mut cx, len as u32)?; + JsArrayBuffer::new(cx, len as u32)?; cx.borrow_mut(&mut result, |data| { let mut slice = data.as_mut_slice::(); if let Err(_) = task.encode(&mut slice) { panic!("Failed to encode task") }; }); - let args: Vec> = - vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) + Ok(result) }); } Err(err) => { @@ -212,19 +223,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { // TODO: handle error } }; - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = JsError::error( - &mut cx, - format!("{}", err), - )?; - let result = cx.undefined(); - let args: Vec> = - vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); + send_error(queue, callback, err); } } } @@ -233,29 +232,10 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { callback, } => match core.complete_workflow_task(completion).await { Ok(()) => { - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = cx.undefined(); - let result = cx.undefined(); - let args: Vec> = - vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); + send_result(queue, callback, |cx| Ok(cx.undefined())); } Err(err) => { - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = - JsError::error(&mut cx, format!("{}", err))?; - let result = cx.undefined(); - let args: Vec> = - vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); + send_error(queue, callback, err); } }, _ => {} @@ -264,16 +244,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { } } Err(err) => { - let queue = queue.clone(); - queue.send(move |mut cx| { - let callback = callback.into_inner(&mut cx); - let this = cx.undefined(); - let error = JsError::error(&mut cx, format!("{}", err))?; - let result = cx.undefined(); - let args: Vec> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; - Ok(()) - }); + send_error(queue.clone(), callback, err); } } }) @@ -293,11 +264,7 @@ fn worker_poll_workflow_activation(mut cx: FunctionContext) -> JsResult> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; + callback_with_error(&mut cx, callback, err)?; } Ok(cx.undefined()) } @@ -313,11 +280,7 @@ fn worker_poll_activity_task(mut cx: FunctionContext) -> JsResult { callback: callback.root(&mut cx), }; if let Err(err) = worker.sender.blocking_send(request) { - let this = cx.undefined(); - let error = JsError::error(&mut cx, format!("{}", err))?; - let result = cx.undefined(); - let args: Vec> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; + callback_with_error(&mut cx, callback, err)?; } Ok(cx.undefined()) } @@ -337,16 +300,12 @@ fn worker_complete_workflow_activation(mut cx: FunctionContext) -> JsResult> = vec![error.upcast(), result.upcast()]; - callback.call(&mut cx, this, args)?; + callback_with_error(&mut cx, callback, err)?; }; - Ok(cx.undefined()) } - Err(_) => cx.throw_type_error("Cannot decode Completion from buffer"), - } + Err(_) => callback_with_error(&mut cx, callback, "Cannot decode Completion from buffer")?, + }; + Ok(cx.undefined()) } /// Request shutdown of the worker. From c74fe261bca5f054537ee7b21496023e5d0c82e1 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 15:20:22 +0300 Subject: [PATCH 08/15] Cleanup bridge, add ActivityTaskCompletion --- packages/test/src/mock-native-worker.ts | 12 +-- packages/test/src/test-worker-activities.ts | 7 +- packages/worker/native/sdk-core | 2 +- packages/worker/native/src/lib.rs | 95 +++++++++++++++++---- packages/worker/src/worker.ts | 5 +- 5 files changed, 87 insertions(+), 34 deletions(-) diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index e2d7bc44a..3209fb6f1 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -9,11 +9,6 @@ export type Task = | { workflow: coresdk.workflow_activation.IWFActivation } | { activity: coresdk.activity_task.IActivityTask }; -export interface ActivityCompletion { - taskToken?: Uint8Array | null; - result: coresdk.activity_result.ActivityResult; -} - export class MockNativeWorker implements NativeWorkerLike { activityTasks: Array> = []; workflowActivations: Array> = []; @@ -86,17 +81,14 @@ export class MockNativeWorker implements NativeWorkerLike { return coresdk.workflow_completion.WFActivationCompletion.decodeDelimited(new Uint8Array(result)); } - public async runActivityTask(task: coresdk.activity_task.IActivityTask): Promise { + public async runActivityTask(task: coresdk.activity_task.IActivityTask): Promise { const arr = coresdk.activity_task.ActivityTask.encode(task).finish(); const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength); const result = await new Promise((resolve) => { this.activityCompletionCallback = resolve; this.activityTasks.unshift(Promise.resolve(buffer)); }); - return { - taskToken: task.taskToken, - result: coresdk.activity_result.ActivityResult.decodeDelimited(new Uint8Array(result)), - }; + return coresdk.ActivityTaskCompletion.decodeDelimited(new Uint8Array(result)); } public async sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): Promise { diff --git a/packages/test/src/test-worker-activities.ts b/packages/test/src/test-worker-activities.ts index 8bdecb09c..68b7df279 100644 --- a/packages/test/src/test-worker-activities.ts +++ b/packages/test/src/test-worker-activities.ts @@ -31,10 +31,13 @@ test.beforeEach((t) => { function compareCompletion( t: ExecutionContext, - actual: coresdk.activity_result.ActivityResult, + actual: coresdk.activity_result.IActivityResult | null | undefined, expected: coresdk.activity_result.IActivityResult ) { - t.deepEqual(actual.toJSON(), coresdk.activity_result.ActivityResult.create(expected).toJSON()); + t.deepEqual( + coresdk.activity_result.ActivityResult.create(actual || undefined).toJSON(), + coresdk.activity_result.ActivityResult.create(expected).toJSON() + ); } test('Worker runs an activity and reports completion', async (t) => { diff --git a/packages/worker/native/sdk-core b/packages/worker/native/sdk-core index 0a516125c..264281b3a 160000 --- a/packages/worker/native/sdk-core +++ b/packages/worker/native/sdk-core @@ -1 +1 @@ -Subproject commit 0a516125c9edba62c880be6af89084a5cfcf6226 +Subproject commit 264281b3aacf79d4dfa21c839aba39f4101d7fd6 diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index d8be3849d..368c860f5 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -2,44 +2,57 @@ use neon::{prelude::*, register_module}; use prost::Message; use std::{sync::Arc, time::Duration}; use temporal_sdk_core::{ - init, protos::coresdk::activity_result::ActivityResult, - protos::coresdk::workflow_activation::WfActivation, - protos::coresdk::workflow_completion::WfActivationCompletion, Core, CoreInitOptions, - ServerGatewayOptions, Url, + init, protos::coresdk::workflow_completion::WfActivationCompletion, + protos::coresdk::ActivityHeartbeat, protos::coresdk::ActivityTaskCompletion, Core, + CoreInitOptions, ServerGatewayOptions, Url, }; use tokio::sync::mpsc::{channel, Sender}; -/// A request from lang to bridge to core +/// A request from JS to bridge to core pub enum Request { - /// A request sent from within the bridge when it encounters a CoreError::ShuttingDown - ShutdownComplete, - /// A request to shutdown core and the bridge thread, lang should wait on - /// CoreError::ShuttingDown before exiting to allow draining of pending tasks + /// A request to break from the thread loop sent from within the bridge when + /// it encounters a CoreError::ShuttingDown + /// TODO: this is odd, see if this should be sent from JS + BreakPoller, + /// A request to shutdown core, JS should wait on CoreError::ShuttingDown + /// before exiting to allow draining of pending tasks Shutdown, /// A request to poll for workflow activations PollWorkflowActivation { /// Name of queue to poll queue_name: String, - /// Used to send the result back into lang + /// Used to send the result back into JS callback: Root, }, /// A request to complete a single workflow activation CompleteWorkflowActivation { completion: WfActivationCompletion, - /// Used to send the result back into lang + /// Used to send the result back into JS callback: Root, }, /// A request to poll for activity tasks PollActivityTask { /// Name of queue to poll queue_name: String, - /// Used to send the result back into lang + /// Used to report completion or error back into JS + callback: Root, + }, + /// A request to complete a single activity task + CompleteActivityTask { + completion: ActivityTaskCompletion, + /// Used to send the result back into JS + callback: Root, + }, + /// A request to send a heartbeat from a running activity + SendActivityHeartbeat { + heartbeat: ActivityHeartbeat, + /// Used to send the result back into JS callback: Root, }, } /// Worker struct, hold a reference for the channel sender responsible for sending requests from -/// lang to core +/// JS to core pub struct Worker { sender: Sender, } @@ -148,7 +161,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { loop { // TODO: handle this error let request = receiver.recv().await.unwrap(); - if matches!(request, Request::ShutdownComplete) { + if matches!(request, Request::BreakPoller) { break; } else if matches!(request, Request::Shutdown) { core.shutdown(); @@ -184,7 +197,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { err { if let Err(_) = - sender.send(Request::ShutdownComplete).await + sender.send(Request::BreakPoller).await { // TODO: handle error } @@ -218,7 +231,7 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { err { if let Err(_) = - sender.send(Request::ShutdownComplete).await + sender.send(Request::BreakPoller).await { // TODO: handle error } @@ -238,7 +251,31 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { send_error(queue, callback, err); } }, - _ => {} + Request::CompleteActivityTask { + completion, + callback, + } => match core.complete_activity_task(completion).await { + Ok(()) => { + send_result(queue, callback, |cx| Ok(cx.undefined())); + } + Err(err) => { + send_error(queue, callback, err); + } + }, + Request::SendActivityHeartbeat { + heartbeat, + callback, + } => match core.send_activity_heartbeat(heartbeat).await { + Ok(()) => { + send_result(queue, callback, |cx| Ok(cx.undefined())); + } + Err(err) => { + send_error(queue, callback, err); + } + }, + // Ignore BreakPoller and Shutdown, they're handled above + Request::BreakPoller => {} + Request::Shutdown => {} } }); } @@ -308,6 +345,29 @@ fn worker_complete_workflow_activation(mut cx: FunctionContext) -> JsResult JsResult { + let worker = cx.argument::(0)?; + let result = cx.argument::(1)?; + let callback = cx.argument::(2)?; + let result = cx.borrow(&result, |data| { + ActivityTaskCompletion::decode_length_delimited(data.as_slice::()) + }); + match result { + Ok(completion) => { + let request = Request::CompleteActivityTask { + completion, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; + }; + } + Err(_) => callback_with_error(&mut cx, callback, "Cannot decode Completion from buffer")?, + }; + Ok(cx.undefined()) +} + /// Request shutdown of the worker. /// Caller should wait until a [CoreError::ShuttingDown] is returned from poll to ensure graceful /// shutdown. @@ -331,5 +391,6 @@ register_module!(mut cx, { "workerCompleteWorkflowActivation", worker_complete_workflow_activation, )?; + cx.export_function("workerCompleteActivityTask", worker_complete_activity_task)?; Ok(()) }); diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 7ce91af77..c4a192fa8 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -525,10 +525,7 @@ export class Worker { return { taskToken, result }; }), filter((result: T): result is Exclude => result !== undefined), - map(({ taskToken, result }) => - // TODO: taskToken: taskToken, - coresdk.activity_result.ActivityResult.encodeDelimited(result).finish() - ), + map((result) => coresdk.ActivityTaskCompletion.encodeDelimited(result).finish()), tap(group$.close) ); }) From fbfcbee0e2b551a4faab5c385ae0a272fbfbeb3e Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 15:30:19 +0300 Subject: [PATCH 09/15] Fix docs for Worker creation --- docs/logging.md | 4 ++-- packages/create-project/samples/worker.ts | 2 +- packages/meta/README.md | 2 +- packages/worker/README.md | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/logging.md b/docs/logging.md index e8f71a436..765248664 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -13,7 +13,7 @@ import { Worker, DefaultLogger } from '@temporalio/worker'; const logger = new DefaultLogger('WARNING', (severity, message, meta) => { /* Implement this in order to generate custom log output */ }); -const worker = new Worker(__dirname, { logger }); +const worker = await Worker.create(__dirname, { logger }); ``` #### BYOL - Bring your own logger @@ -27,5 +27,5 @@ const logger = winston.createLogger({ format: winston.format.json(), transports: [new winston.transports.Console()], }); -const worker = new Worker(__dirname, { logger }); +const worker = await Worker.create(__dirname, { logger }); ``` diff --git a/packages/create-project/samples/worker.ts b/packages/create-project/samples/worker.ts index 3a9ffbcb5..ea72509fd 100644 --- a/packages/create-project/samples/worker.ts +++ b/packages/create-project/samples/worker.ts @@ -2,7 +2,7 @@ import { Worker } from '@temporalio/worker'; async function run() { // Automatically locate and register activities and workflows - const worker = new Worker(__dirname); + const worker = await Worker.create(__dirname); // Bind to the `tutorial` queue and start accepting tasks await worker.run('tutorial'); } diff --git a/packages/meta/README.md b/packages/meta/README.md index e4a16d444..d5107f1ea 100644 --- a/packages/meta/README.md +++ b/packages/meta/README.md @@ -121,7 +121,7 @@ import { Worker } from '@temporalio/worker'; (async () => { // Automatically locate and register activities and workflows - const worker = new Worker(__dirname); + const worker = await Worker.create(__dirname); // Bind to the `tutorial` queue and start accepting tasks await worker.run('tutorial'); })(); diff --git a/packages/worker/README.md b/packages/worker/README.md index c3ac560bc..ec755c2a9 100644 --- a/packages/worker/README.md +++ b/packages/worker/README.md @@ -15,7 +15,7 @@ async function run() => { // (assuming package was bootstrapped with `npm init @temporalio`). // Worker connects to localhost by default and uses console error for logging. // Customize the worker by passing options a second parameter to the constructor. - const worker = new Worker(__dirname); + const worker = await Worker.create(__dirname); // Bind to the `tutorial` queue and start accepting tasks await worker.run('tutorial'); } From d2dc788395d354885cfdcd2fa467ef1ab2842ff6 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 15:51:58 +0300 Subject: [PATCH 10/15] Hook up all of the async core methods via bridge --- packages/worker/native/src/lib.rs | 33 ++++++++++++++++++++++++++++++- packages/worker/src/worker.ts | 20 ++++++------------- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index 368c860f5..592ebe036 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -345,7 +345,7 @@ fn worker_complete_workflow_activation(mut cx: FunctionContext) -> JsResult JsResult { let worker = cx.argument::(0)?; let result = cx.argument::(1)?; @@ -368,6 +368,33 @@ fn worker_complete_activity_task(mut cx: FunctionContext) -> JsResult JsResult { + let worker = cx.argument::(0)?; + let result = cx.argument::(1)?; + let callback = cx.argument::(2)?; + let result = cx.borrow(&result, |data| { + ActivityHeartbeat::decode_length_delimited(data.as_slice::()) + }); + match result { + Ok(heartbeat) => { + let request = Request::SendActivityHeartbeat { + heartbeat, + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; + }; + } + Err(_) => callback_with_error( + &mut cx, + callback, + "Cannot decode ActivityHeartbeat from buffer", + )?, + }; + Ok(cx.undefined()) +} + /// Request shutdown of the worker. /// Caller should wait until a [CoreError::ShuttingDown] is returned from poll to ensure graceful /// shutdown. @@ -392,5 +419,9 @@ register_module!(mut cx, { worker_complete_workflow_activation, )?; cx.export_function("workerCompleteActivityTask", worker_complete_activity_task)?; + cx.export_function( + "workerSendActivityHeartbeat", + worker_send_activity_heartbeat, + )?; Ok(()) }); diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index c4a192fa8..734da067d 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -218,10 +218,12 @@ export interface WorkerConstructor { } export class NativeWorker implements NativeWorkerLike { - protected readonly native: native.Worker; public readonly pollWorkflowActivation: Promisify>; public readonly pollActivityTask: Promisify>; public readonly completeWorkflowActivation: Promisify>; + public readonly completeActivityTask: Promisify>; + public readonly sendActivityHeartbeat: Promisify>; + public readonly shutdown: OmitFirstParam; public static async create(options?: ServerOptions): Promise { const compiledOptions = compileServerOptions({ ...getDefaultServerOptions(), ...options }); @@ -230,22 +232,12 @@ export class NativeWorker implements NativeWorkerLike { } protected constructor(nativeWorker: native.Worker) { - this.native = nativeWorker; this.pollWorkflowActivation = promisify(native.workerPollWorkflowActivation).bind(undefined, nativeWorker); this.pollActivityTask = promisify(native.workerPollActivityTask).bind(undefined, nativeWorker); this.completeWorkflowActivation = promisify(native.workerCompleteWorkflowActivation).bind(undefined, nativeWorker); - } - - public shutdown(): void { - return native.workerShutdown(this.native); - } - - public completeActivityTask(_result: ArrayBuffer): Promise { - throw new Error('Not implemented'); - } - - public sendActivityHeartbeat(_activityId: string, _details?: ArrayBuffer): Promise { - throw new Error('Not implemented'); + this.completeActivityTask = promisify(native.workerCompleteActivityTask).bind(undefined, nativeWorker); + this.sendActivityHeartbeat = promisify(native.workerSendActivityHeartbeat).bind(undefined, nativeWorker); + this.shutdown = native.workerShutdown.bind(undefined, nativeWorker); } } From 5c135a6376efa954e42b6d71dc8510d860ad2b03 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 16:08:45 +0300 Subject: [PATCH 11/15] Clean up worker code --- packages/worker/src/worker.ts | 43 +++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 734da067d..8840aea49 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -259,7 +259,7 @@ export class Worker { /** * Create a new Worker. - * This method immediately connects to the server and will throw on connection failure. + * This method initiates a connection to the server and will throw (asynchronously) on connection failure. * @param pwd - Used to resolve relative paths for locating and importing activities and workflows. */ public static async create(pwd: string, options?: WorkerOptions): Promise { @@ -518,7 +518,7 @@ export class Worker { }), filter((result: T): result is Exclude => result !== undefined), map((result) => coresdk.ActivityTaskCompletion.encodeDelimited(result).finish()), - tap(group$.close) + tap(group$.close) // Close the group after activity task completion ); }) ); @@ -627,7 +627,31 @@ export class Worker { } /** - * Start polling on tasks, completes after graceful shutdown after a receiving a shutdown signal + * Poll core for `WFActivation`s while respecting worker state + */ + protected workflow$(queueName: string): Observable { + return this.poller$(async () => { + const buffer = await this.nativeWorker.pollWorkflowActivation(queueName); + const task = coresdk.workflow_activation.WFActivation.decode(new Uint8Array(buffer)); + this.log.debug('Got workflow task', task); + return task; + }); + } + + /** + * Poll core for `ActivityTask`s while respecting worker state + */ + protected activity$(queueName: string): Observable { + return this.poller$(async () => { + const buffer = await this.nativeWorker.pollActivityTask(queueName); + const task = coresdk.activity_task.ActivityTask.decode(new Uint8Array(buffer)); + this.log.debug('Got activity task', task); + return task; + }); + } + + /** + * Start polling on tasks, completes after graceful shutdown due to receiving a shutdown signal * or call to {@link shutdown}. */ async run(queueName: string): Promise { @@ -645,23 +669,14 @@ export class Worker { for (const signal of this.options.shutdownSignals) { process.on(signal, startShutdownSequence); } - const workflow$ = this.poller$(async () => { - const buffer = await this.nativeWorker.pollWorkflowActivation(queueName); - return coresdk.workflow_activation.WFActivation.decode(new Uint8Array(buffer)); - }).pipe(tap((task) => this.log.debug('Got workflow task', task))); - const activity$ = this.poller$(async () => { - const buffer = await this.nativeWorker.pollActivityTask(queueName); - return coresdk.activity_task.ActivityTask.decode(new Uint8Array(buffer)); - }).pipe(tap((task) => this.log.debug('Got activity task', task))); - return await merge( this.activityHeartbeat$(), merge( - workflow$.pipe( + this.workflow$(queueName).pipe( this.workflowOperator(queueName), mergeMap((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) ), - activity$.pipe( + this.activity$(queueName).pipe( this.activityOperator(), mergeMap((arr) => this.nativeWorker.completeActivityTask(arr.buffer.slice(arr.byteOffset))) ) From 4c33cc9e0e8ebead93683ccefad86c1c0027ffc1 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 16:17:53 +0300 Subject: [PATCH 12/15] nskip the activity integration test --- packages/test/src/test-integration.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/test/src/test-integration.ts b/packages/test/src/test-integration.ts index 80262f8b7..3b1981777 100644 --- a/packages/test/src/test-integration.ts +++ b/packages/test/src/test-integration.ts @@ -82,12 +82,11 @@ if (RUN_INTEGRATION_TESTS) { t.throwsAsync(promise, { message: /just because/, instanceOf: WorkflowExecutionFailedError }); }); - // Activities not yet properly implemented - test.skip('http', async (t) => { + test('http', async (t) => { const client = new Connection(); const workflow = client.workflow('http', { taskQueue: 'test' }); const res = await workflow.start(); - t.is(res, [await httpGet('https://google.com'), await httpGet('http://example.com')]); + t.deepEqual(res, [await httpGet('https://google.com'), await httpGet('http://example.com')]); }); test('set-timeout', async (t) => { From e3e3d6a12e796cc95269432a5d763e1dd78f9ed5 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 30 Mar 2021 19:32:27 +0300 Subject: [PATCH 13/15] Update core submodule --- packages/worker/native/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/worker/native/sdk-core b/packages/worker/native/sdk-core index 264281b3a..827de8e21 160000 --- a/packages/worker/native/sdk-core +++ b/packages/worker/native/sdk-core @@ -1 +1 @@ -Subproject commit 264281b3aacf79d4dfa21c839aba39f4101d7fd6 +Subproject commit 827de8e21461fe37dbbded63af3f42a025c3e8ab From 567d2474fbee9142f19505769510c17d5d7d0ade Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Wed, 31 Mar 2021 08:59:56 +0300 Subject: [PATCH 14/15] Update core submodule --- packages/worker/native/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/worker/native/sdk-core b/packages/worker/native/sdk-core index 827de8e21..6180a4c2d 160000 --- a/packages/worker/native/sdk-core +++ b/packages/worker/native/sdk-core @@ -1 +1 @@ -Subproject commit 827de8e21461fe37dbbded63af3f42a025c3e8ab +Subproject commit 6180a4c2dd36f3242c86f4fba8c1abd6e2a84e16 From 0418eea51b75268e1f2e71e577d8fee1a8582195 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Wed, 31 Mar 2021 14:51:24 +0300 Subject: [PATCH 15/15] Address CR comments, improve shutdown logic --- packages/test/src/mock-native-worker.ts | 8 +- packages/worker/native/Cargo.lock | 12 - packages/worker/native/index.d.ts | 1 + packages/worker/native/src/lib.rs | 339 +++++++++++++----------- packages/worker/src/worker.ts | 55 ++-- 5 files changed, 227 insertions(+), 188 deletions(-) diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index 3209fb6f1..a8af6ff04 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -21,9 +21,13 @@ export class MockNativeWorker implements NativeWorkerLike { return new this(); } + public async breakLoop(): Promise { + // Nothing to break from + } + public shutdown(): void { - this.activityTasks.unshift(Promise.reject(new Error('[Core::shutdown]'))); - this.workflowActivations.unshift(Promise.reject(new Error('[Core::shutdown]'))); + this.activityTasks.unshift(Promise.reject(new Error('Core is shut down'))); + this.workflowActivations.unshift(Promise.reject(new Error('Core is shut down'))); } public async pollWorkflowActivation(_queueName: string): Promise { diff --git a/packages/worker/native/Cargo.lock b/packages/worker/native/Cargo.lock index 723b1c8d3..035142ce6 100644 --- a/packages/worker/native/Cargo.lock +++ b/packages/worker/native/Cargo.lock @@ -206,17 +206,6 @@ dependencies = [ "syn", ] -[[package]] -name = "displaydoc" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3274a6bc8a6a4521291b53b9dcb8345e963fe931c3fc462a7d3ead71d7ccd30d" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "either" version = "1.6.1" @@ -1185,7 +1174,6 @@ dependencies = [ "crossbeam", "dashmap", "derive_more", - "displaydoc", "futures", "itertools 0.10.0", "once_cell", diff --git a/packages/worker/native/index.d.ts b/packages/worker/native/index.d.ts index 8e291ef57..889aa89a9 100644 --- a/packages/worker/native/index.d.ts +++ b/packages/worker/native/index.d.ts @@ -30,6 +30,7 @@ export declare type VoidCallback = (err?: Error, result: void) => void; export declare function newWorker(serverOptions: ServerOptions, callback: WorkerCallback): void; export declare function workerShutdown(worker: Worker): void; +export declare function workerBreakLoop(worker: Worker, callback: VoidCallback): void; export declare function workerPollWorkflowActivation(worker: Worker, queueName: string, callback: PollCallback): void; export declare function workerCompleteWorkflowActivation( worker: Worker, diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index 592ebe036..ba8c1cb2e 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -1,6 +1,6 @@ use neon::{prelude::*, register_module}; use prost::Message; -use std::{sync::Arc, time::Duration}; +use std::{fmt::Display, future::Future, sync::Arc, time::Duration}; use temporal_sdk_core::{ init, protos::coresdk::workflow_completion::WfActivationCompletion, protos::coresdk::ActivityHeartbeat, protos::coresdk::ActivityTaskCompletion, Core, @@ -10,10 +10,10 @@ use tokio::sync::mpsc::{channel, Sender}; /// A request from JS to bridge to core pub enum Request { - /// A request to break from the thread loop sent from within the bridge when - /// it encounters a CoreError::ShuttingDown - /// TODO: this is odd, see if this should be sent from JS - BreakPoller, + /// A request to break from the thread loop, should be sent from JS when it + /// encounters a CoreError::ShuttingDown and there are no outstanding + /// completions + BreakLoop { callback: Root }, /// A request to shutdown core, JS should wait on CoreError::ShuttingDown /// before exiting to allow draining of pending tasks Shutdown, @@ -52,7 +52,7 @@ pub enum Request { } /// Worker struct, hold a reference for the channel sender responsible for sending requests from -/// JS to core +/// JS to a bridge thread which forwards them to core pub struct Worker { sender: Sender, } @@ -82,7 +82,7 @@ where /// Send an error to JS via callback using an [EventQueue] fn send_error(queue: Arc, callback: Root, error: T) where - T: ::std::fmt::Display + Send + 'static, + T: Display + Send + 'static, { queue.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -97,9 +97,10 @@ fn callback_with_error<'a, T>( error: T, ) -> NeonResult<()> where - T: ::std::fmt::Display + Send + 'static, + T: Display + Send + 'static, { let this = cx.undefined(); + // TODO: create better JS error types let error = JsError::error(cx, format!("{}", error))?; let result = cx.undefined(); let args: Vec> = vec![error.upcast(), result.upcast()]; @@ -107,6 +108,170 @@ where Ok(()) } +/// When Future completes, call given JS callback using a neon::EventQueue with either error or +/// undefined +async fn void_future_to_js(queue: Arc, callback: Root, f: F) -> () +where + E: Display + Send + 'static, + F: Future> + Send + 'static, +{ + match f.await { + Ok(()) => { + send_result(queue, callback, |cx| Ok(cx.undefined())); + } + Err(err) => { + send_error(queue, callback, err); + } + } +} + +/// Builds a tokio runtime and starts polling on [Request]s via an internal channel. +/// Bridges requests from JS to core and sends responses back to JS using a neon::EventQueue. +/// Blocks current thread until a [BreakPoller] request is received in channel. +fn start_bridge_loop( + core_init_options: CoreInitOptions, + queue: Arc, + callback: Root, +) { + // TODO: make capacity configurable + let (sender, mut receiver) = channel::(1000); + + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + match init(core_init_options).await { + Err(err) => { + send_error(queue.clone(), callback, err); + } + Ok(result) => { + send_result( + queue.clone(), + callback, + |cx| Ok(cx.boxed(Worker { sender })), + ); + let core = Arc::new(result); + loop { + // TODO: handle this error + let request = receiver.recv().await.unwrap(); + let core = core.clone(); + let queue = queue.clone(); + + match request { + Request::Shutdown => { + core.shutdown(); + } + Request::BreakLoop { callback } => { + send_result(queue, callback, |cx| Ok(cx.undefined())); + break; + } + Request::PollWorkflowActivation { + queue_name, + callback, + } => { + tokio::spawn(handle_poll_workflow_activation_request( + core, queue, queue_name, callback, + )); + } + Request::PollActivityTask { + queue_name, + callback, + } => { + tokio::spawn(handle_poll_activity_task_request( + core, queue, queue_name, callback, + )); + } + Request::CompleteWorkflowActivation { + completion, + callback, + } => { + tokio::spawn(void_future_to_js(queue, callback, async move { + core.complete_workflow_task(completion).await + })); + } + Request::CompleteActivityTask { + completion, + callback, + } => { + tokio::spawn(void_future_to_js(queue, callback, async move { + core.complete_activity_task(completion).await + })); + } + Request::SendActivityHeartbeat { + heartbeat, + callback, + } => { + tokio::spawn(void_future_to_js(queue, callback, async move { + // send_activity_heartbeat returns Result<(), ()> and () + // doesn't implement Display, syntesize an error + match core.send_activity_heartbeat(heartbeat).await { + Ok(()) => Ok(()), + Err(()) => Err("Failed to send activity heartbeat"), + } + })); + } + } + } + } + } + }) +} + +/// Called within the poll loop thread, calls core and triggers JS callback with result +async fn handle_poll_workflow_activation_request( + core: Arc, + queue: Arc, + queue_name: String, + callback: Root, +) { + match core.poll_workflow_task(&queue_name).await { + Ok(task) => { + send_result(queue, callback, move |cx| { + let len = task.encoded_len(); + let mut result = JsArrayBuffer::new(cx, len as u32)?; + cx.borrow_mut(&mut result, |data| { + let mut slice = data.as_mut_slice::(); + if let Err(_) = task.encode(&mut slice) { + panic!("Failed to encode task") + }; + }); + Ok(result) + }); + } + Err(err) => { + send_error(queue, callback, err); + } + } +} + +/// Called within the poll loop thread, calls core and triggers JS callback with result +async fn handle_poll_activity_task_request( + core: Arc, + queue: Arc, + queue_name: String, + callback: Root, +) { + match core.poll_activity_task(&queue_name).await { + Ok(task) => { + send_result(queue, callback, move |cx| { + let len = task.encoded_len(); + let mut result = JsArrayBuffer::new(cx, len as u32)?; + cx.borrow_mut(&mut result, |data| { + let mut slice = data.as_mut_slice::(); + if let Err(_) = task.encode(&mut slice) { + panic!("Failed to encode task") + }; + }); + Ok(result) + }); + } + Err(err) => { + send_error(queue, callback, err); + } + } +} + // Below are functions exported to JS /// Create a new worker asynchronously. @@ -141,155 +306,28 @@ fn worker_new(mut cx: FunctionContext) -> JsResult { .value(&mut cx) as u64, ), }; - // TODO: make this configurable - let (sender, mut receiver) = channel::(1000); - let worker = Worker { - sender: sender.clone(), - }; - let queue = Arc::new(cx.queue()); + let queue = Arc::new(cx.queue()); std::thread::spawn(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - match init(CoreInitOptions { gateway_opts }).await { - Ok(result) => { - send_result(queue.clone(), callback, |cx| Ok(cx.boxed(worker))); - let core = Arc::new(result); - loop { - // TODO: handle this error - let request = receiver.recv().await.unwrap(); - if matches!(request, Request::BreakPoller) { - break; - } else if matches!(request, Request::Shutdown) { - core.shutdown(); - continue; - } - let core = core.clone(); - let queue = queue.clone(); - let sender = sender.clone(); - tokio::spawn(async move { - match request { - Request::PollWorkflowActivation { - queue_name, - callback, - } => { - match core.poll_workflow_task(&queue_name).await { - Ok(task) => { - send_result(queue, callback, move |cx| { - let len = task.encoded_len(); - let mut result = - JsArrayBuffer::new(cx, len as u32)?; - cx.borrow_mut(&mut result, |data| { - let mut slice = data.as_mut_slice::(); - if let Err(_) = task.encode(&mut slice) { - panic!("Failed to encode task") - }; - }); - Ok(result) - }); - } - Err(err) => { - // TODO: on the JS side we consider all errors fatal, revise this later - if let temporal_sdk_core::CoreError::ShuttingDown = - err - { - if let Err(_) = - sender.send(Request::BreakPoller).await - { - // TODO: handle error - } - }; - send_error(queue, callback, err); - } - } - } - Request::PollActivityTask { - queue_name, - callback, - } => { - match core.poll_activity_task(&queue_name).await { - Ok(task) => { - send_result(queue, callback, move |cx| { - let len = task.encoded_len(); - let mut result = - JsArrayBuffer::new(cx, len as u32)?; - cx.borrow_mut(&mut result, |data| { - let mut slice = data.as_mut_slice::(); - if let Err(_) = task.encode(&mut slice) { - panic!("Failed to encode task") - }; - }); - Ok(result) - }); - } - Err(err) => { - // TODO: on the JS side we consider all errors fatal, revise this later - if let temporal_sdk_core::CoreError::ShuttingDown = - err - { - if let Err(_) = - sender.send(Request::BreakPoller).await - { - // TODO: handle error - } - }; - send_error(queue, callback, err); - } - } - } - Request::CompleteWorkflowActivation { - completion, - callback, - } => match core.complete_workflow_task(completion).await { - Ok(()) => { - send_result(queue, callback, |cx| Ok(cx.undefined())); - } - Err(err) => { - send_error(queue, callback, err); - } - }, - Request::CompleteActivityTask { - completion, - callback, - } => match core.complete_activity_task(completion).await { - Ok(()) => { - send_result(queue, callback, |cx| Ok(cx.undefined())); - } - Err(err) => { - send_error(queue, callback, err); - } - }, - Request::SendActivityHeartbeat { - heartbeat, - callback, - } => match core.send_activity_heartbeat(heartbeat).await { - Ok(()) => { - send_result(queue, callback, |cx| Ok(cx.undefined())); - } - Err(err) => { - send_error(queue, callback, err); - } - }, - // Ignore BreakPoller and Shutdown, they're handled above - Request::BreakPoller => {} - Request::Shutdown => {} - } - }); - } - } - Err(err) => { - send_error(queue.clone(), callback, err); - } - } - }) + start_bridge_loop(CoreInitOptions { gateway_opts }, queue, callback) }); Ok(cx.undefined()) } +/// Cause the bridge loop to break, freeing up the thread +fn worker_break_loop(mut cx: FunctionContext) -> JsResult { + let worker = cx.argument::(0)?; + let callback = cx.argument::(1)?; + let request = Request::BreakLoop { + callback: callback.root(&mut cx), + }; + if let Err(err) = worker.sender.blocking_send(request) { + callback_with_error(&mut cx, callback, err)?; + }; + Ok(cx.undefined()) +} + /// Initiate a single workflow activation poll request. /// There should be only one concurrent poll request for this type. fn worker_poll_workflow_activation(mut cx: FunctionContext) -> JsResult { @@ -409,6 +447,7 @@ fn worker_shutdown(mut cx: FunctionContext) -> JsResult { register_module!(mut cx, { cx.export_function("newWorker", worker_new)?; cx.export_function("workerShutdown", worker_shutdown)?; + cx.export_function("workerBreakLoop", worker_break_loop)?; cx.export_function( "workerPollWorkflowActivation", worker_poll_workflow_activation, diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 8840aea49..7882d342c 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -206,8 +206,9 @@ type Promisify = T extends (...args: any[]) => void export interface NativeWorkerLike { shutdown: OmitFirstParam; - pollWorkflowActivation(queueName: string): Promise; - pollActivityTask(queueName: string): Promise; + breakLoop: Promisify>; + pollWorkflowActivation: Promisify>; + pollActivityTask: Promisify>; completeWorkflowActivation: Promisify>; completeActivityTask: Promisify>; sendActivityHeartbeat: Promisify>; @@ -223,6 +224,7 @@ export class NativeWorker implements NativeWorkerLike { public readonly completeWorkflowActivation: Promisify>; public readonly completeActivityTask: Promisify>; public readonly sendActivityHeartbeat: Promisify>; + public readonly breakLoop: Promisify>; public readonly shutdown: OmitFirstParam; public static async create(options?: ServerOptions): Promise { @@ -237,6 +239,7 @@ export class NativeWorker implements NativeWorkerLike { this.completeWorkflowActivation = promisify(native.workerCompleteWorkflowActivation).bind(undefined, nativeWorker); this.completeActivityTask = promisify(native.workerCompleteActivityTask).bind(undefined, nativeWorker); this.sendActivityHeartbeat = promisify(native.workerSendActivityHeartbeat).bind(undefined, nativeWorker); + this.breakLoop = promisify(native.workerBreakLoop).bind(undefined, nativeWorker); this.shutdown = native.workerShutdown.bind(undefined, nativeWorker); } } @@ -424,7 +427,7 @@ export class Worker { */ protected poller$(pollFn: () => Promise): Observable { return merge(this.gracefulShutdown$(), this.pollLoop$(pollFn)).pipe( - catchError((err) => (err.message.includes('[Core::shutdown]') ? EMPTY : throwError(err))) + catchError((err) => (err.message.includes('Core is shut down') ? EMPTY : throwError(err))) ); } @@ -669,27 +672,31 @@ export class Worker { for (const signal of this.options.shutdownSignals) { process.on(signal, startShutdownSequence); } - return await merge( - this.activityHeartbeat$(), - merge( - this.workflow$(queueName).pipe( - this.workflowOperator(queueName), - mergeMap((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) - ), - this.activity$(queueName).pipe( - this.activityOperator(), - mergeMap((arr) => this.nativeWorker.completeActivityTask(arr.buffer.slice(arr.byteOffset))) + try { + await merge( + this.activityHeartbeat$(), + merge( + this.workflow$(queueName).pipe( + this.workflowOperator(queueName), + mergeMap((arr) => this.nativeWorker.completeWorkflowActivation(arr.buffer.slice(arr.byteOffset))) + ), + this.activity$(queueName).pipe( + this.activityOperator(), + mergeMap((arr) => this.nativeWorker.completeActivityTask(arr.buffer.slice(arr.byteOffset))) + ) + ).pipe( + tap({ + complete: () => { + this.state = 'STOPPED'; + }, + error: () => { + this.state = 'FAILED'; + }, + }) ) - ).pipe( - tap({ - complete: () => { - this.state = 'STOPPED'; - }, - error: () => { - this.state = 'FAILED'; - }, - }) - ) - ).toPromise(); + ).toPromise(); + } finally { + await this.nativeWorker.breakLoop(); + } } }