From 4b777d7bfb736c4fcaf5a58c21bcf33b01e1a426 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Feb 2021 15:17:23 -0800 Subject: [PATCH 1/4] Fix compile problems --- native/Cargo.lock | 28 ++++++++++++++++++++++------ native/src/lib.rs | 6 +++--- native/src/mock_core.rs | 2 +- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 641faadcf..234f31f69 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -659,6 +659,22 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514d24875c140ed269eecc2d1b56d7b71b573716922a763c317fb1b1b4b58f15" +dependencies = [ + "async-trait", + "futures", + "js-sys", + "lazy_static", + "percent-encoding", + "pin-project 1.0.5", + "rand 0.8.3", + "thiserror", +] + [[package]] name = "opentelemetry-jaeger" version = "0.10.0" @@ -667,7 +683,7 @@ checksum = "b4c604a73595f605a852c431ef9c6bbacc7b911f094900905fd2f684b6fc44b4" dependencies = [ "async-trait", "lazy_static", - "opentelemetry", + "opentelemetry 0.11.2", "thiserror", "thrift", ] @@ -1098,6 +1114,7 @@ dependencies = [ "opentelemetry-jaeger", "prost", "prost-types", + "rand 0.8.3", "rustfsm", "thiserror", "tokio", @@ -1260,8 +1277,7 @@ dependencies = [ [[package]] name = "tonic" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ba8f479158947373b6df40cf48f4779bb25c99ca3c661bd95e0ab1963ad8b0e" +source = "git+https://github.com/temporalio/tonic#a2658d6faddaf0aa5e5fc8a0ee0e4244ea53cbca" dependencies = [ "async-stream", "async-trait", @@ -1385,11 +1401,11 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1706e1f42970e09aa0635deb4f4607e8704a4390427d5f0062bf59240338bcc" +checksum = "cccdf13c28f1654fe806838f28c5b9cb23ca4c0eae71450daa489f50e523ceb1" dependencies = [ - "opentelemetry", + "opentelemetry 0.12.0", "tracing", "tracing-core", "tracing-log", diff --git a/native/src/lib.rs b/native/src/lib.rs index 35a971ae1..4ff6494f1 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -15,7 +15,7 @@ use ::temporal_sdk_core::Core; type BoxedWorker = JsBox>>; pub struct Worker { - _queue_name: String, + queue_name: String, core: mock_core::MockCore, condition: Condvar, suspended: Mutex, @@ -51,7 +51,7 @@ impl Worker { let core = mock_core::MockCore { tasks }; Worker { - _queue_name: queue_name, + queue_name, core, condition: Condvar::new(), suspended: Mutex::new(false), @@ -63,7 +63,7 @@ impl Worker { .condition .wait_while(self.suspended.lock().unwrap(), |suspended| *suspended) .unwrap(); - let res = self.core.poll_task(); + let res = self.core.poll_task(&self.queue_name); self.core.tasks.pop_front(); res } diff --git a/native/src/mock_core.rs b/native/src/mock_core.rs index 17217d0e1..9db02ac8c 100644 --- a/native/src/mock_core.rs +++ b/native/src/mock_core.rs @@ -9,7 +9,7 @@ pub struct MockCore { } impl Core for MockCore { - fn poll_task(&self) -> Result { + fn poll_task(&self, _task_q: &str) -> Result { match self.tasks.get(0) { Some(task) => Result::Ok(Task { task_token: b"abc".to_vec(), From 4b2b10a2fa2bf800ce20049821606ae76aa2cdf7 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Feb 2021 16:44:01 -0800 Subject: [PATCH 2/4] Prep to allow using real core --- native/src/lib.rs | 41 +++++++++++++++++++++++++---------------- native/src/mock_core.rs | 29 ++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/native/src/lib.rs b/native/src/lib.rs index 4ff6494f1..9bda3a531 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -1,22 +1,31 @@ mod mock_core; -use neon::prelude::*; -use neon::register_module; +use neon::{prelude::*, register_module}; use prost::Message; use prost_types::Timestamp; -use std::sync::{Arc, Condvar, Mutex, RwLock}; -use temporal_sdk_core::protos::coresdk::{ - self, task, wf_activation_job, CompleteTaskReq, StartWorkflowTaskAttributes, - TimerFiredTaskAttributes, WfActivation, +use std::sync::RwLock; +use std::{ + collections::VecDeque, + sync::{Arc, Condvar, Mutex}, + time::SystemTime, +}; +use temporal_sdk_core::{ + protos::coresdk::{ + self, task, wf_activation_job, CompleteTaskReq, StartWorkflowTaskAttributes, + TimerFiredTaskAttributes, WfActivation, + }, + Core, }; -use ::temporal_sdk_core::Core; - +// TODO: In principle this lock is totally unnecessary since worker never needs to mutate itself. +// -- in practice we are forced into it because the jsbox is passed into a new thread, and it +// imposes weird requirements where it can only be Send if also Sync. Can we avoid doing that, or +// otherwise avoid the lock? type BoxedWorker = JsBox>>; pub struct Worker { queue_name: String, - core: mock_core::MockCore, + core: Box, condition: Condvar, suspended: Mutex, } @@ -25,10 +34,10 @@ impl Finalize for Worker {} impl Worker { pub fn new(queue_name: String) -> Self { - let mut tasks = ::std::collections::VecDeque::::new(); + let mut tasks = VecDeque::::new(); tasks.push_back(task::Variant::Workflow(WfActivation { run_id: "test".to_string(), - timestamp: Some(Timestamp::from(::std::time::SystemTime::now())), + timestamp: Some(Timestamp::from(SystemTime::now())), jobs: vec![ wf_activation_job::Attributes::StartWorkflow(StartWorkflowTaskAttributes { arguments: None, @@ -40,7 +49,7 @@ impl Worker { })); tasks.push_back(task::Variant::Workflow(WfActivation { run_id: "test".to_string(), - timestamp: Some(Timestamp::from(::std::time::SystemTime::now())), + timestamp: Some(Timestamp::from(SystemTime::now())), jobs: vec![ wf_activation_job::Attributes::TimerFired(TimerFiredTaskAttributes { timer_id: "0".to_string(), @@ -48,23 +57,22 @@ impl Worker { .into(), ], })); - let core = mock_core::MockCore { tasks }; + let core = mock_core::MockCore::new(); Worker { queue_name, - core, + core: Box::new(core), condition: Condvar::new(), suspended: Mutex::new(false), } } - pub fn poll(&mut self) -> ::temporal_sdk_core::Result { + pub fn poll(&self) -> ::temporal_sdk_core::Result { let _guard = self .condition .wait_while(self.suspended.lock().unwrap(), |suspended| *suspended) .unwrap(); let res = self.core.poll_task(&self.queue_name); - self.core.tasks.pop_front(); res } @@ -94,6 +102,7 @@ fn worker_poll(mut cx: FunctionContext) -> JsResult { let arc_worker = Arc::clone(&**worker); // deref Handle and JsBox let arc_callback = Arc::new(callback); let queue = cx.queue(); + std::thread::spawn(move || loop { let arc_callback = arc_callback.clone(); let arc_worker = arc_worker.clone(); diff --git a/native/src/mock_core.rs b/native/src/mock_core.rs index 9db02ac8c..4fb345db7 100644 --- a/native/src/mock_core.rs +++ b/native/src/mock_core.rs @@ -1,16 +1,35 @@ -use ::temporal_sdk_core::protos::coresdk::{ - complete_task_req, task, wf_activation_completion, CompleteTaskReq, Task, +use std::{ + collections::VecDeque, + sync::{Arc, RwLock}, +}; +use temporal_sdk_core::{ + protos::coresdk::{complete_task_req, task, wf_activation_completion, CompleteTaskReq, Task}, + Core, + CoreError::NoWork, + Result, }; -use ::temporal_sdk_core::{Core, CoreError::NoWork, Result}; #[derive(Clone)] pub struct MockCore { - pub tasks: ::std::collections::VecDeque, + tasks: Arc>>, +} + +impl MockCore { + pub fn new() -> Self { + Self { + tasks: Arc::new(RwLock::new(Default::default())), + } + } } impl Core for MockCore { fn poll_task(&self, _task_q: &str) -> Result { - match self.tasks.get(0) { + match self + .tasks + .write() + .expect("Mock queue must be writeable") + .pop_front() + { Some(task) => Result::Ok(Task { task_token: b"abc".to_vec(), variant: Some(task.clone()), From d9c84ff34fc945e3f5d3ca8dbb3a32796bf8e622 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Sat, 6 Feb 2021 13:05:48 -0800 Subject: [PATCH 3/4] A few more updates to handle latest --- native/Cargo.lock | 10 ++++++++++ native/src/lib.rs | 2 +- native/src/mock_core.rs | 10 +++++++--- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 234f31f69..8e46be7dc 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1124,6 +1124,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "url", + "uuid", ] [[package]] @@ -1492,6 +1493,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom 0.2.2", +] + [[package]] name = "want" version = "0.3.0" diff --git a/native/src/lib.rs b/native/src/lib.rs index 9bda3a531..c41250111 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -57,7 +57,7 @@ impl Worker { .into(), ], })); - let core = mock_core::MockCore::new(); + let core = mock_core::MockCore::new(tasks); Worker { queue_name, diff --git a/native/src/mock_core.rs b/native/src/mock_core.rs index 4fb345db7..137c32850 100644 --- a/native/src/mock_core.rs +++ b/native/src/mock_core.rs @@ -6,7 +6,7 @@ use temporal_sdk_core::{ protos::coresdk::{complete_task_req, task, wf_activation_completion, CompleteTaskReq, Task}, Core, CoreError::NoWork, - Result, + Result, ServerGatewayApis, }; #[derive(Clone)] @@ -15,9 +15,9 @@ pub struct MockCore { } impl MockCore { - pub fn new() -> Self { + pub fn new(tasks: VecDeque) -> Self { Self { - tasks: Arc::new(RwLock::new(Default::default())), + tasks: Arc::new(RwLock::new(tasks)), } } } @@ -52,4 +52,8 @@ impl Core for MockCore { }; Result::Ok(()) } + + fn server_gateway(&self) -> Result> { + unimplemented!() + } } From e0cd2c22da2c9fbf55e8cd45667d7ce23592e8ca Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Sat, 6 Feb 2021 15:43:29 -0800 Subject: [PATCH 4/4] This works! Runs a complete workflow! :tada: --- native/Cargo.lock | 1 + native/Cargo.toml | 1 + native/src/lib.rs | 83 +++++++++++++++-------------------- native/src/mock_core.rs | 25 +++++++++++ src/worker.ts | 9 ++-- workflow-lib/src/internals.ts | 1 + 6 files changed, 69 insertions(+), 51 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 8e46be7dc..115a55887 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1131,6 +1131,7 @@ dependencies = [ name = "temporal-sdk-node" version = "0.1.0" dependencies = [ + "futures", "neon", "neon-build", "prost", diff --git a/native/Cargo.toml b/native/Cargo.toml index f49683685..93e31b122 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -12,6 +12,7 @@ name = "temporal_sdk_node" crate-type = ["dylib"] [dependencies] +futures = { version = "0.3", features = ["executor"] } neon = { version = "0.7", default-features = false, features = ["napi-4", "event-queue-api"] } prost = "0.7" prost-types = "0.7" diff --git a/native/src/lib.rs b/native/src/lib.rs index c41250111..b8fb831b1 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -2,26 +2,18 @@ mod mock_core; use neon::{prelude::*, register_module}; use prost::Message; -use prost_types::Timestamp; -use std::sync::RwLock; use std::{ - collections::VecDeque, + convert::TryInto, sync::{Arc, Condvar, Mutex}, - time::SystemTime, + time::Duration, }; use temporal_sdk_core::{ - protos::coresdk::{ - self, task, wf_activation_job, CompleteTaskReq, StartWorkflowTaskAttributes, - TimerFiredTaskAttributes, WfActivation, - }, - Core, + init, + protos::coresdk::{self, CompleteTaskReq}, + Core, CoreInitOptions, ServerGatewayOptions, }; -// TODO: In principle this lock is totally unnecessary since worker never needs to mutate itself. -// -- in practice we are forced into it because the jsbox is passed into a new thread, and it -// imposes weird requirements where it can only be Send if also Sync. Can we avoid doing that, or -// otherwise avoid the lock? -type BoxedWorker = JsBox>>; +type BoxedWorker = JsBox>; pub struct Worker { queue_name: String, @@ -34,30 +26,25 @@ impl Finalize for Worker {} impl Worker { pub fn new(queue_name: String) -> Self { - let mut tasks = VecDeque::::new(); - tasks.push_back(task::Variant::Workflow(WfActivation { - run_id: "test".to_string(), - timestamp: Some(Timestamp::from(SystemTime::now())), - jobs: vec![ - wf_activation_job::Attributes::StartWorkflow(StartWorkflowTaskAttributes { - arguments: None, - workflow_type: "set-timeout".to_string(), - workflow_id: "test".to_string(), - }) - .into(), - ], - })); - tasks.push_back(task::Variant::Workflow(WfActivation { - run_id: "test".to_string(), - timestamp: Some(Timestamp::from(SystemTime::now())), - jobs: vec![ - wf_activation_job::Attributes::TimerFired(TimerFiredTaskAttributes { - timer_id: "0".to_string(), - }) - .into(), - ], - })); - let core = mock_core::MockCore::new(tasks); + let core = init(CoreInitOptions { + gateway_opts: ServerGatewayOptions { + target_url: "http://localhost:7233".try_into().unwrap(), + namespace: "default".to_string(), + identity: "node_sdk_test".to_string(), + worker_binary_id: "".to_string(), + long_poll_timeout: Duration::from_secs(30), + }, + }) + .unwrap(); + + // TODO: Needs to be moved to it's own function, and async handled better somehow + futures::executor::block_on(core.server_gateway().unwrap().start_workflow( + "default", + &queue_name, + "test-node-wf-id", + "set-timeout", + )) + .unwrap(); Worker { queue_name, @@ -82,16 +69,18 @@ impl Worker { pub fn suspend_polling(&self) { *self.suspended.lock().unwrap() = true; + self.condition.notify_one(); } pub fn resume_polling(&self) { *self.suspended.lock().unwrap() = false; + self.condition.notify_one(); } } fn worker_new(mut cx: FunctionContext) -> JsResult { let queue_name = cx.argument::(0)?.value(&mut cx); - let worker = Arc::new(RwLock::new(Worker::new(queue_name))); + let worker = Arc::new(Worker::new(queue_name)); Ok(cx.boxed(worker)) } @@ -106,8 +95,10 @@ fn worker_poll(mut cx: FunctionContext) -> JsResult { std::thread::spawn(move || loop { let arc_callback = arc_callback.clone(); let arc_worker = arc_worker.clone(); - let worker = &mut arc_worker.write().unwrap(); + let worker = arc_worker; let result = worker.poll(); + // We don't want to poll until re-awoken + worker.suspend_polling(); match result { Ok(task) => { queue.send(move |mut cx| { @@ -156,9 +147,8 @@ fn worker_complete_task(mut cx: FunctionContext) -> JsResult { }); match result { Ok(completion) => { - let w = &mut worker.read().unwrap(); // TODO: submit from background thread (using neon::Task)? - if let Err(err) = w.core.complete_task(completion) { + if let Err(err) = worker.core.complete_task(completion) { let error = JsError::error(&mut cx, format!("{}", err))?; cx.throw(error) } else { @@ -178,22 +168,19 @@ fn worker_shutdown(mut cx: FunctionContext) -> JsResult { fn worker_suspend_polling(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; - let w = &mut worker.write().unwrap(); - w.suspend_polling(); + worker.suspend_polling(); Ok(cx.undefined()) } fn worker_resume_polling(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; - let w = &mut worker.write().unwrap(); - w.resume_polling(); + worker.resume_polling(); Ok(cx.undefined()) } fn worker_is_suspended(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; - let w = &mut worker.read().unwrap(); - Ok(cx.boolean(w.is_suspended())) + Ok(cx.boolean(worker.is_suspended())) } register_module!(mut cx, { diff --git a/native/src/mock_core.rs b/native/src/mock_core.rs index 137c32850..4e957c9a2 100644 --- a/native/src/mock_core.rs +++ b/native/src/mock_core.rs @@ -57,3 +57,28 @@ impl Core for MockCore { unimplemented!() } } +// TODO: Left here to be stuffed in a fake poll builder or something +// let mut tasks = VecDeque::::new(); +// tasks.push_back(task::Variant::Workflow(WfActivation { +// run_id: "test".to_string(), +// timestamp: Some(Timestamp::from(SystemTime::now())), +// jobs: vec![ +// wf_activation_job::Attributes::StartWorkflow(StartWorkflowTaskAttributes { +// arguments: None, +// workflow_type: "set-timeout".to_string(), +// workflow_id: "test".to_string(), +// }) +// .into(), +// ], +// })); +// tasks.push_back(task::Variant::Workflow(WfActivation { +// run_id: "test".to_string(), +// timestamp: Some(Timestamp::from(SystemTime::now())), +// jobs: vec![ +// wf_activation_job::Attributes::TimerFired(TimerFiredTaskAttributes { +// timer_id: "0".to_string(), +// }) +// .into(), +// ], +// })); +// let core = mock_core::MockCore::new(tasks); diff --git a/src/worker.ts b/src/worker.ts index fb0118c66..efc4926da 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -57,7 +57,7 @@ export function getDefaultOptions(dirname: string): WorkerOptionsWithDefaults { activitiesPath: resolve(dirname, '../activities'), workflowsPath: resolve(dirname, '../workflows'), }; -}; +} export class Worker { public readonly options: WorkerOptionsWithDefaults; @@ -146,12 +146,12 @@ export class Worker { mergeScan(async (workflow: Workflow | undefined, task) => { if (workflow === undefined) { // Find a workflow start job in the activation jobs list - // TODO: should this alway be the first job in the list? + // TODO: should this always be the first job in the list? const maybeStartWorkflow = task.workflow.jobs.find(j => j.startWorkflow); if (maybeStartWorkflow !== undefined) { const attrs = maybeStartWorkflow.startWorkflow; if (!(attrs && attrs.workflowId && attrs.workflowType)) { - throw new Error('Expected StartWorkflow with workflowId and name'); + throw new Error(`Expected StartWorkflow with workflowId and workflowType, got ${JSON.stringify(maybeStartWorkflow)}`); } workflow = await Workflow.create(attrs.workflowId); // TODO: this probably shouldn't be here, consider alternative implementation @@ -162,8 +162,11 @@ export class Worker { throw new Error('Received workflow activation for an untracked workflow with no start workflow job'); } } + console.log(`!!!!! Trying to complete task: ${task.taskToken}`) const arr = await workflow.activate(task.taskToken!, task.workflow); workerCompleteTask(native, arr.buffer.slice(arr.byteOffset)); + // Allow polling to continue + this.resumePolling(); return workflow; }, undefined, 1 /* concurrency */)) }) diff --git a/workflow-lib/src/internals.ts b/workflow-lib/src/internals.ts index b69eed0f0..bfc644691 100644 --- a/workflow-lib/src/internals.ts +++ b/workflow-lib/src/internals.ts @@ -45,6 +45,7 @@ export type WorkflowTaskHandler = { }; function completeWorkflow(result: any) { + console.log("!!!!! Trying to complete workflow"); state.commands.push({ api: { commandType: iface.temporal.api.enums.v1.CommandType.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,