Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ name = "temporal_sdk_node"
crate-type = ["dylib"]

[dependencies]
futures = { version = "0.3", features = ["executor"] }
neon = { version = "0.7", default-features = false, features = ["napi-4", "event-queue-api"] }
prost = "0.7"
prost-types = "0.7"
Expand Down
98 changes: 47 additions & 51 deletions native/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
mod mock_core;

use neon::prelude::*;
use neon::register_module;
use neon::{prelude::*, register_module};
use prost::Message;
use prost_types::Timestamp;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use temporal_sdk_core::protos::coresdk::{
self, task, wf_activation_job, CompleteTaskReq, StartWorkflowTaskAttributes,
TimerFiredTaskAttributes, WfActivation,
use std::{
convert::TryInto,
sync::{Arc, Condvar, Mutex},
time::Duration,
};
use temporal_sdk_core::{
init,
protos::coresdk::{self, CompleteTaskReq},
Core, CoreInitOptions, ServerGatewayOptions,
};

use ::temporal_sdk_core::Core;

type BoxedWorker = JsBox<Arc<RwLock<Worker>>>;
type BoxedWorker = JsBox<Arc<Worker>>;

pub struct Worker {
_queue_name: String,
core: mock_core::MockCore,
queue_name: String,
core: Box<dyn Core + Send + Sync>,
condition: Condvar,
suspended: Mutex<bool>,
}
Expand All @@ -25,46 +26,40 @@ impl Finalize for Worker {}

impl Worker {
pub fn new(queue_name: String) -> Self {
let mut tasks = ::std::collections::VecDeque::<task::Variant>::new();
tasks.push_back(task::Variant::Workflow(WfActivation {
run_id: "test".to_string(),
timestamp: Some(Timestamp::from(::std::time::SystemTime::now())),
jobs: vec![
wf_activation_job::Attributes::StartWorkflow(StartWorkflowTaskAttributes {
arguments: None,
workflow_type: "set-timeout".to_string(),
workflow_id: "test".to_string(),
})
.into(),
],
}));
tasks.push_back(task::Variant::Workflow(WfActivation {
run_id: "test".to_string(),
timestamp: Some(Timestamp::from(::std::time::SystemTime::now())),
jobs: vec![
wf_activation_job::Attributes::TimerFired(TimerFiredTaskAttributes {
timer_id: "0".to_string(),
})
.into(),
],
}));
let core = mock_core::MockCore { tasks };
let core = init(CoreInitOptions {
gateway_opts: ServerGatewayOptions {
target_url: "http://localhost:7233".try_into().unwrap(),
namespace: "default".to_string(),
identity: "node_sdk_test".to_string(),
worker_binary_id: "".to_string(),
long_poll_timeout: Duration::from_secs(30),
},
})
.unwrap();

// TODO: Needs to be moved to it's own function, and async handled better somehow
futures::executor::block_on(core.server_gateway().unwrap().start_workflow(
"default",
&queue_name,
"test-node-wf-id",
"set-timeout",
))
.unwrap();

Worker {
_queue_name: queue_name,
core,
queue_name,
core: Box::new(core),
condition: Condvar::new(),
suspended: Mutex::new(false),
}
}

pub fn poll(&mut self) -> ::temporal_sdk_core::Result<coresdk::Task> {
pub fn poll(&self) -> ::temporal_sdk_core::Result<coresdk::Task> {
let _guard = self
.condition
.wait_while(self.suspended.lock().unwrap(), |suspended| *suspended)
.unwrap();
let res = self.core.poll_task();
self.core.tasks.pop_front();
let res = self.core.poll_task(&self.queue_name);
res
}

Expand All @@ -74,16 +69,18 @@ impl Worker {

pub fn suspend_polling(&self) {
*self.suspended.lock().unwrap() = true;
self.condition.notify_one();
}

pub fn resume_polling(&self) {
*self.suspended.lock().unwrap() = false;
self.condition.notify_one();
}
}

fn worker_new(mut cx: FunctionContext) -> JsResult<BoxedWorker> {
let queue_name = cx.argument::<JsString>(0)?.value(&mut cx);
let worker = Arc::new(RwLock::new(Worker::new(queue_name)));
let worker = Arc::new(Worker::new(queue_name));

Ok(cx.boxed(worker))
}
Expand All @@ -94,11 +91,14 @@ fn worker_poll(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let arc_worker = Arc::clone(&**worker); // deref Handle and JsBox
let arc_callback = Arc::new(callback);
let queue = cx.queue();

std::thread::spawn(move || loop {
let arc_callback = arc_callback.clone();
let arc_worker = arc_worker.clone();
let worker = &mut arc_worker.write().unwrap();
let worker = arc_worker;
let result = worker.poll();
// We don't want to poll until re-awoken
worker.suspend_polling();
match result {
Ok(task) => {
queue.send(move |mut cx| {
Expand Down Expand Up @@ -147,9 +147,8 @@ fn worker_complete_task(mut cx: FunctionContext) -> JsResult<JsUndefined> {
});
match result {
Ok(completion) => {
let w = &mut worker.read().unwrap();
// TODO: submit from background thread (using neon::Task)?
if let Err(err) = w.core.complete_task(completion) {
if let Err(err) = worker.core.complete_task(completion) {
let error = JsError::error(&mut cx, format!("{}", err))?;
cx.throw(error)
} else {
Expand All @@ -169,22 +168,19 @@ fn worker_shutdown(mut cx: FunctionContext) -> JsResult<JsUndefined> {

fn worker_suspend_polling(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let worker = cx.argument::<BoxedWorker>(0)?;
let w = &mut worker.write().unwrap();
w.suspend_polling();
worker.suspend_polling();
Ok(cx.undefined())
}

fn worker_resume_polling(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let worker = cx.argument::<BoxedWorker>(0)?;
let w = &mut worker.write().unwrap();
w.resume_polling();
worker.resume_polling();
Ok(cx.undefined())
}

fn worker_is_suspended(mut cx: FunctionContext) -> JsResult<JsBoolean> {
let worker = cx.argument::<BoxedWorker>(0)?;
let w = &mut worker.read().unwrap();
Ok(cx.boolean(w.is_suspended()))
Ok(cx.boolean(worker.is_suspended()))
}

register_module!(mut cx, {
Expand Down
60 changes: 54 additions & 6 deletions native/src/mock_core.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
use ::temporal_sdk_core::protos::coresdk::{
complete_task_req, task, wf_activation_completion, CompleteTaskReq, Task,
use std::{
collections::VecDeque,
sync::{Arc, RwLock},
};
use temporal_sdk_core::{
protos::coresdk::{complete_task_req, task, wf_activation_completion, CompleteTaskReq, Task},
Core,
CoreError::NoWork,
Result, ServerGatewayApis,
};
use ::temporal_sdk_core::{Core, CoreError::NoWork, Result};

#[derive(Clone)]
pub struct MockCore {
pub tasks: ::std::collections::VecDeque<task::Variant>,
tasks: Arc<RwLock<VecDeque<task::Variant>>>,
}

impl MockCore {
pub fn new(tasks: VecDeque<task::Variant>) -> Self {
Self {
tasks: Arc::new(RwLock::new(tasks)),
}
}
}

impl Core for MockCore {
fn poll_task(&self) -> Result<Task> {
match self.tasks.get(0) {
fn poll_task(&self, _task_q: &str) -> Result<Task> {
match self
.tasks
.write()
.expect("Mock queue must be writeable")
.pop_front()
{
Some(task) => Result::Ok(Task {
task_token: b"abc".to_vec(),
variant: Some(task.clone()),
Expand All @@ -33,4 +52,33 @@ impl Core for MockCore {
};
Result::Ok(())
}

fn server_gateway(&self) -> Result<Arc<dyn ServerGatewayApis>> {
unimplemented!()
}
}
// TODO: Left here to be stuffed in a fake poll builder or something
// let mut tasks = VecDeque::<task::Variant>::new();
// tasks.push_back(task::Variant::Workflow(WfActivation {
// run_id: "test".to_string(),
// timestamp: Some(Timestamp::from(SystemTime::now())),
// jobs: vec![
// wf_activation_job::Attributes::StartWorkflow(StartWorkflowTaskAttributes {
// arguments: None,
// workflow_type: "set-timeout".to_string(),
// workflow_id: "test".to_string(),
// })
// .into(),
// ],
// }));
// tasks.push_back(task::Variant::Workflow(WfActivation {
// run_id: "test".to_string(),
// timestamp: Some(Timestamp::from(SystemTime::now())),
// jobs: vec![
// wf_activation_job::Attributes::TimerFired(TimerFiredTaskAttributes {
// timer_id: "0".to_string(),
// })
// .into(),
// ],
// }));
// let core = mock_core::MockCore::new(tasks);
9 changes: 6 additions & 3 deletions src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export function getDefaultOptions(dirname: string): WorkerOptionsWithDefaults {
activitiesPath: resolve(dirname, '../activities'),
workflowsPath: resolve(dirname, '../workflows'),
};
};
}

export class Worker {
public readonly options: WorkerOptionsWithDefaults;
Expand Down Expand Up @@ -146,12 +146,12 @@ export class Worker {
mergeScan(async (workflow: Workflow | undefined, task) => {
if (workflow === undefined) {
// Find a workflow start job in the activation jobs list
// TODO: should this alway be the first job in the list?
// TODO: should this always be the first job in the list?
const maybeStartWorkflow = task.workflow.jobs.find(j => j.startWorkflow);
if (maybeStartWorkflow !== undefined) {
const attrs = maybeStartWorkflow.startWorkflow;
if (!(attrs && attrs.workflowId && attrs.workflowType)) {
throw new Error('Expected StartWorkflow with workflowId and name');
throw new Error(`Expected StartWorkflow with workflowId and workflowType, got ${JSON.stringify(maybeStartWorkflow)}`);
}
workflow = await Workflow.create(attrs.workflowId);
// TODO: this probably shouldn't be here, consider alternative implementation
Expand All @@ -162,8 +162,11 @@ export class Worker {
throw new Error('Received workflow activation for an untracked workflow with no start workflow job');
}
}
console.log(`!!!!! Trying to complete task: ${task.taskToken}`)
const arr = await workflow.activate(task.taskToken!, task.workflow);
workerCompleteTask(native, arr.buffer.slice(arr.byteOffset));
// Allow polling to continue
this.resumePolling();
return workflow;
}, undefined, 1 /* concurrency */))
})
Expand Down
Loading