From 4ab390c2015e014f8d05d7fd132056146675be0d Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Thu, 4 Mar 2021 14:37:45 +0200 Subject: [PATCH 1/4] Implement graceful worker shutdown --- packages/test/src/helpers.ts | 8 + packages/test/src/run-a-worker.ts | 15 ++ packages/test/src/test-integration.ts | 12 +- packages/test/src/test-worker-shutdown.ts | 34 +++++ packages/worker/native/index.d.ts | 4 +- packages/worker/native/src/lib.rs | 17 +-- packages/worker/src/index.ts | 2 +- packages/worker/src/worker.ts | 177 ++++++++++++++++------ 8 files changed, 201 insertions(+), 68 deletions(-) create mode 100644 packages/test/src/run-a-worker.ts create mode 100644 packages/test/src/test-worker-shutdown.ts diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index adec6997e..8d0cf4acf 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -4,3 +4,11 @@ export function u8(s: string): Uint8Array { // @ts-ignore return new TextEncoder().encode(s); } + +export function isSet(env: string | undefined): boolean { + if (env === undefined) return false; + env = env.toLocaleLowerCase(); + return env === '1' || env === 't' || env === 'true'; +} + +export const RUN_INTEGRATION_TESTS = isSet(process.env.RUN_INTEGRATION_TESTS); diff --git a/packages/test/src/run-a-worker.ts b/packages/test/src/run-a-worker.ts new file mode 100644 index 000000000..7c7425f59 --- /dev/null +++ b/packages/test/src/run-a-worker.ts @@ -0,0 +1,15 @@ +import { Worker } from '@temporalio/worker'; + +async function main() { + const worker = new Worker(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib` }); + await worker.run('test'); + console.log('Worker gracefully shutdown'); +} + +main().then( + () => void process.exit(0), + (err) => { + console.error(err); + process.exit(1); + } +); diff --git a/packages/test/src/test-integration.ts b/packages/test/src/test-integration.ts index 94f4b1326..a42a2ae92 100644 --- a/packages/test/src/test-integration.ts +++ b/packages/test/src/test-integration.ts @@ -2,11 +2,11 @@ import test from 'ava'; import { v4 as uuid4 } from 'uuid'; import { Connection, compileWorkflowOptions, addDefaults } from '@temporalio/client'; -import { Worker } from '@temporalio/worker/lib/worker'; +import { Worker } from '@temporalio/worker'; import { ArgsAndReturn } from '../../test-interfaces/lib'; import * as iface from '@temporalio/proto'; import { defaultDataConverter } from '@temporalio/workflow/commonjs/converter/data-converter'; -import { u8 } from './helpers'; +import { u8, RUN_INTEGRATION_TESTS } from './helpers'; import { tsToMs } from '@temporalio/workflow/commonjs/time'; const worker = new Worker(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib` }); @@ -19,13 +19,7 @@ const { const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]); -function isSet(env: string | undefined) { - if (env === undefined) return false; - env = env.toLocaleLowerCase(); - return env === '1' || env === 't' || env === 'true'; -} - -if (isSet(process.env.RUN_INTEGRATION_TESTS)) { +if (RUN_INTEGRATION_TESTS) { test.before((t) => { worker.run('test').catch((err) => { t.fail(`Failed to run worker: ${err}`); diff --git a/packages/test/src/test-worker-shutdown.ts b/packages/test/src/test-worker-shutdown.ts new file mode 100644 index 000000000..64612ae27 --- /dev/null +++ b/packages/test/src/test-worker-shutdown.ts @@ -0,0 +1,34 @@ +import test from 'ava'; +import { Worker } from '@temporalio/worker'; +import { RUN_INTEGRATION_TESTS } from './helpers'; + +if (RUN_INTEGRATION_TESTS) { + test.serial('run shuts down gracefully', async (t) => { + const worker = new Worker(__dirname, { shutdownGraceTime: '500ms' }); + t.is(worker.getState(), 'INITIALIZED'); + const p = worker.run('shutdown-test'); + t.is(worker.getState(), 'RUNNING'); + process.emit('SIGINT', 'SIGINT'); + t.is(worker.getState(), 'STOPPING'); + await p; + t.is(worker.getState(), 'STOPPED'); + await t.throwsAsync(worker.run('shutdown-test'), { message: 'Poller was aleady started' }); + }); + + test.serial('run throws if not shut down gracefully', async (t) => { + const worker = new Worker(__dirname, { shutdownGraceTime: '5ms' }); + t.is(worker.getState(), 'INITIALIZED'); + const p = worker.run('shutdown-test'); + t.is(worker.getState(), 'RUNNING'); + (worker as any).shutdown = function () { + // Pretend we're shutting down + this.state = 'STOPPING'; + }; + process.emit('SIGINT', 'SIGINT'); + await t.throwsAsync(p, { + message: 'Timed out waiting while waiting for worker to shutdown gracefully', + }); + t.is(worker.getState(), 'FAILED'); + await t.throwsAsync(worker.run('shutdown-test'), { message: 'Poller was aleady started' }); + }); +} diff --git a/packages/worker/native/index.d.ts b/packages/worker/native/index.d.ts index 76190396c..29c9e064c 100644 --- a/packages/worker/native/index.d.ts +++ b/packages/worker/native/index.d.ts @@ -1,9 +1,9 @@ export interface Worker {} export declare type PollCallback = (err?: Error, result?: ArrayBuffer) => void; -export declare function newWorker(queueName: string): Worker; +export declare function newWorker(): Worker; export declare function workerShutdown(worker: Worker): void; -export declare function workerPoll(worker: Worker, callback: PollCallback): void; +export declare function workerPoll(worker: Worker, queueName: string, callback: PollCallback): void; export declare function workerCompleteTask(worker: Worker, result: ArrayBuffer): boolean; export declare function workerSuspendPolling(worker: Worker): void; export declare function workerResumePolling(worker: Worker): void; diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index 402100722..691be17a6 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -14,7 +14,6 @@ use temporal_sdk_core::{ type BoxedWorker = JsBox>; pub struct Worker { - queue_name: String, core: Box, condition: Condvar, suspended: Mutex, @@ -23,7 +22,7 @@ pub struct Worker { impl Finalize for Worker {} impl Worker { - pub fn new(queue_name: String) -> Self { + pub fn new() -> Self { let core = init(CoreInitOptions { gateway_opts: ServerGatewayOptions { target_url: "http://localhost:7233".try_into().unwrap(), @@ -36,19 +35,18 @@ impl Worker { .unwrap(); Worker { - queue_name, core: Box::new(core), condition: Condvar::new(), suspended: Mutex::new(false), } } - pub fn poll(&self) -> ::temporal_sdk_core::Result { + pub fn poll(&self, queue_name: String) -> ::temporal_sdk_core::Result { let _guard = self .condition .wait_while(self.suspended.lock().unwrap(), |suspended| *suspended) .unwrap(); - self.core.poll_task(&self.queue_name) + self.core.poll_task(&queue_name) } pub fn is_suspended(&self) -> bool { @@ -67,15 +65,15 @@ impl Worker { } fn worker_new(mut cx: FunctionContext) -> JsResult { - let queue_name = cx.argument::(0)?.value(&mut cx); - let worker = Arc::new(Worker::new(queue_name)); + let worker = Arc::new(Worker::new()); Ok(cx.boxed(worker)) } fn worker_poll(mut cx: FunctionContext) -> JsResult { let worker = cx.argument::(0)?; - let callback = cx.argument::(1)?.root(&mut cx); + let queue_name = cx.argument::(1)?.value(&mut cx); + let callback = cx.argument::(2)?.root(&mut cx); let arc_worker = Arc::clone(&**worker); // deref Handle and JsBox let arc_callback = Arc::new(callback); let queue = cx.queue(); @@ -83,8 +81,9 @@ fn worker_poll(mut cx: FunctionContext) -> JsResult { std::thread::spawn(move || loop { let arc_callback = arc_callback.clone(); let arc_worker = arc_worker.clone(); + let queue_name = queue_name.clone(); let worker = arc_worker; - let result = worker.poll(); + let result = worker.poll(queue_name); match result { Ok(task) => { queue.send(move |mut cx| { diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index 50e708d26..d7470132a 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -1 +1 @@ -export { Worker } from './worker'; +export { Worker, WorkerOptions } from './worker'; diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 31911920c..4c8d505ef 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -1,6 +1,7 @@ import { resolve } from 'path'; import { Observable, partition } from 'rxjs'; -import { groupBy, mapTo, mergeMap } from 'rxjs/operators'; +import { groupBy, mapTo, mergeMap, share, tap } from 'rxjs/operators'; +import ms from 'ms'; import { coresdk } from '@temporalio/proto'; import { newWorker, @@ -10,8 +11,8 @@ import { workerResumePolling, workerSuspendPolling, workerCompleteTask, - Worker as NativeWorker, } from '../native'; +import { sleep } from './utils'; import { mergeMapWithState } from './rxutils'; import { Workflow } from './workflow'; import { resolveFilename, LoaderError } from './loader'; @@ -32,6 +33,19 @@ export interface WorkerOptions { * pass `null` to manually register workflows */ workflowsPath?: string | null; + /** + * Time to wait for pending tasks to drain after receiving a shutdown signal. + * @see {@link shutdownSignals} + * + * @format ms formatted string + */ + shutdownGraceTime?: string; + + /** + * Automatically shut down worker on any of these signals. + * @default ['SIGINT', 'SIGTERM', 'SIGQUIT'] + */ + shutdownSignals?: NodeJS.Signals[]; // TODO: implement all of these maxConcurrentActivityExecutions?: number; // defaults to 200 @@ -42,7 +56,12 @@ export interface WorkerOptions { isLocalActivityWorkerOnly?: boolean; // defaults to false } -export type WorkerOptionsWithDefaults = Required>; +export type WorkerOptionsWithDefaults = WorkerOptions & + Required>; + +export interface CompiledWorkerOptionsWithDefaults extends WorkerOptionsWithDefaults { + shutdownGraceTimeMs: number; +} export const resolver = (baseDir: string | null, overrides: Map) => async ( lookupName: string @@ -60,47 +79,50 @@ export function getDefaultOptions(dirname: string): WorkerOptionsWithDefaults { return { activitiesPath: resolve(dirname, '../activities'), workflowsPath: resolve(dirname, '../workflows'), + shutdownGraceTime: '5s', + shutdownSignals: ['SIGINT', 'SIGTERM', 'SIGQUIT'], }; } +export function compileWorkerOptions(opts: WorkerOptionsWithDefaults): CompiledWorkerOptionsWithDefaults { + return { ...opts, shutdownGraceTimeMs: ms(opts.shutdownGraceTime) }; +} + +export type State = 'INITIALIZED' | 'RUNNING' | 'STOPPED' | 'STOPPING' | 'FAILED'; + +type TaskForWorkflow = Required<{ taskToken: Uint8Array; workflow: coresdk.WFActivation }>; +type TaskForActivity = Required<{ taskToken: Uint8Array; workflow: coresdk.ActivityTask }>; + export class Worker { - public readonly options: WorkerOptionsWithDefaults; + public readonly options: CompiledWorkerOptionsWithDefaults; protected readonly workflowOverrides: Map = new Map(); - nativeWorker?: NativeWorker; + nativeWorker = newWorker(); + _state: State = 'INITIALIZED'; /** * Create a new `Worker`, `pwd` is used to resolve relative paths for locating and importing activities and workflows. */ constructor(public readonly pwd: string, options?: WorkerOptions) { // TODO: merge activityDefaults - this.options = { ...getDefaultOptions(pwd), ...options }; + this.options = compileWorkerOptions({ ...getDefaultOptions(pwd), ...options }); } /** - * Do not make new poll requests. + * Get the poll state of this worker */ - public suspendPolling(): void { - if (this.nativeWorker === undefined) { - throw new Error('Not running'); - } - workerSuspendPolling(this.nativeWorker); + public getState(): State { + // Setters and getters require the same visibility, add this public getter function + return this._state; } - /** - * Allow new poll requests. - */ - public resumePolling(): void { - if (this.nativeWorker === undefined) { - throw new Error('Not running'); - } - workerResumePolling(this.nativeWorker); + get state(): State { + return this._state; } - public isSuspended(): boolean { - if (this.nativeWorker === undefined) { - throw new Error('Not running'); - } - return workerIsSuspended(this.nativeWorker); + set state(state: State) { + // TODO: use logger + console.log('Worker state changed', { state }); + this._state = state; } /** @@ -121,38 +143,99 @@ export class Worker { // Not implemented yet } + /** + * Do not make new poll requests. + */ + public suspendPolling(): void { + if (this.state !== 'RUNNING') { + throw new Error('Not running'); + } + workerSuspendPolling(this.nativeWorker); + } + + /** + * Allow new poll requests. + */ + public resumePolling(): void { + if (this.state !== 'RUNNING') { + throw new Error('Not running'); + } + workerResumePolling(this.nativeWorker); + } + + public isSuspended(): boolean { + if (this.state !== 'RUNNING') { + throw new Error('Not running'); + } + return workerIsSuspended(this.nativeWorker); + } + shutdown(): void { - if (this.nativeWorker === undefined) { + if (this.state !== 'RUNNING') { throw new Error('Not running'); } + this.state = 'STOPPING'; workerShutdown(this.nativeWorker); } - async run(queueName: string): Promise { - const native = newWorker(queueName); - this.nativeWorker = native; - const poller$ = new Observable((subscriber) => { - workerPoll(native, (err, buffer) => { - // TODO: this shouldn't happen in the non-mocked version - if (err && err.message === 'No tasks to perform for now') { - subscriber.complete(); - return; + protected poller$(queueName: string): Observable { + if (this.state !== 'INITIALIZED') { + throw new Error('Poller was aleady started'); + } + return new Observable((subscriber) => { + const startShutdownSequence = async (): Promise => { + deregisterSignalHandlers(); + this.shutdown(); + await sleep(this.options.shutdownGraceTimeMs); + if (!subscriber.closed) { + subscriber.error(new Error('Timed out waiting while waiting for worker to shutdown gracefully')); + } + }; + const deregisterSignalHandlers = () => { + for (const signal of this.options.shutdownSignals) { + process.off(signal, startShutdownSequence); } - if (buffer === undefined) { + }; + for (const signal of this.options.shutdownSignals) { + process.once(signal, startShutdownSequence); + } + + this.state = 'RUNNING'; + + workerPoll(this.nativeWorker, queueName, (err, buffer) => { + if (err && err.message.includes('[Core::shutdown]')) { + subscriber.complete(); + } else if (buffer === undefined) { subscriber.error(err); - return; + } else { + const task = coresdk.Task.decode(new Uint8Array(buffer)); + subscriber.next(task); } - const task = coresdk.Task.decode(new Uint8Array(buffer)); - subscriber.next(task); - return () => undefined; // TODO: shutdown worker if no subscribers }); + + return function unsubscribe() { + // NOTE: We don't expose this observable directly so we don't have to shutdown here + deregisterSignalHandlers(); + }; }); - type TaskForWorkflow = Required<{ taskToken: Uint8Array; workflow: coresdk.WFActivation }>; - type TaskForActivity = Required<{ taskToken: Uint8Array; workflow: coresdk.ActivityTask }>; - const [workflow$] = (partition(poller$, (task) => task.variant === 'workflow') as any) as [ - Observable, - Observable - ]; + } + + async run(queueName: string): Promise { + const [workflow$] = (partition( + this.poller$(queueName).pipe( + tap( + () => undefined, + () => { + this.state = 'FAILED'; + }, + () => { + this.state = 'STOPPED'; + } + ), + share() + ), + (task) => task.variant === 'workflow' + ) as any) as [Observable, Observable]; return await workflow$ .pipe( @@ -187,7 +270,7 @@ export class Worker { } const arr = await workflow.activate(task.taskToken, task.workflow); - workerCompleteTask(native, arr.buffer.slice(arr.byteOffset)); + workerCompleteTask(this.nativeWorker, arr.buffer.slice(arr.byteOffset)); return { state: workflow, output: arr }; }, undefined) ); From 2a3a441cd5e95fc1f3a081eccddf00f58072d98c Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Thu, 4 Mar 2021 14:53:32 +0200 Subject: [PATCH 2/4] Organize worker code --- packages/worker/native/index.d.ts | 2 +- packages/worker/src/worker.ts | 95 +++++++++++++++++-------------- 2 files changed, 52 insertions(+), 45 deletions(-) diff --git a/packages/worker/native/index.d.ts b/packages/worker/native/index.d.ts index 29c9e064c..3c4135aaa 100644 --- a/packages/worker/native/index.d.ts +++ b/packages/worker/native/index.d.ts @@ -4,7 +4,7 @@ export declare type PollCallback = (err?: Error, result?: ArrayBuffer) => void; export declare function newWorker(): Worker; export declare function workerShutdown(worker: Worker): void; export declare function workerPoll(worker: Worker, queueName: string, callback: PollCallback): void; -export declare function workerCompleteTask(worker: Worker, result: ArrayBuffer): boolean; +export declare function workerCompleteTask(worker: Worker, result: ArrayBuffer): void; export declare function workerSuspendPolling(worker: Worker): void; export declare function workerResumePolling(worker: Worker): void; export declare function workerIsSuspended(worker: Worker): boolean; diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 4c8d505ef..2bfcdbd22 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -1,6 +1,6 @@ import { resolve } from 'path'; -import { Observable, partition } from 'rxjs'; -import { groupBy, mapTo, mergeMap, share, tap } from 'rxjs/operators'; +import { merge, Observable, OperatorFunction, partition, pipe } from 'rxjs'; +import { groupBy, map, mapTo, mergeMap, share, tap } from 'rxjs/operators'; import ms from 'ms'; import { coresdk } from '@temporalio/proto'; import { @@ -220,8 +220,51 @@ export class Worker { }); } + activityOperator(): OperatorFunction { + // TODO: implement this + return mapTo(new Uint8Array()); + } + + workflowOperator(): OperatorFunction { + return pipe( + groupBy((task) => task.workflow.runId), + mergeMap((group$) => { + return group$.pipe( + mergeMapWithState(async (workflow: Workflow | undefined, task) => { + if (workflow === undefined) { + // Find a workflow start job in the activation jobs list + // TODO: should this always be the first job in the list? + const maybeStartWorkflow = task.workflow.jobs.find((j) => j.startWorkflow); + if (maybeStartWorkflow !== undefined) { + const attrs = maybeStartWorkflow.startWorkflow; + if (!(attrs && attrs.workflowId && attrs.workflowType)) { + throw new Error( + `Expected StartWorkflow with workflowId and workflowType, got ${JSON.stringify(maybeStartWorkflow)}` + ); + } + workflow = await Workflow.create(attrs.workflowId); + // TODO: this probably shouldn't be here, consider alternative implementation + await workflow.inject('console.log', console.log); + const scriptName = await resolver( + this.options.workflowsPath, + this.workflowOverrides + )(attrs.workflowType); + await workflow.registerImplementation(scriptName); + } else { + throw new Error('Received workflow activation for an untracked workflow with no start workflow job'); + } + } + + const arr = await workflow.activate(task.taskToken, task.workflow); + return { state: workflow, output: arr }; + }, undefined) + ); + }) + ); + } + async run(queueName: string): Promise { - const [workflow$] = (partition( + const partitioned$ = partition( this.poller$(queueName).pipe( tap( () => undefined, @@ -235,48 +278,12 @@ export class Worker { share() ), (task) => task.variant === 'workflow' - ) as any) as [Observable, Observable]; - - return await workflow$ - .pipe( - groupBy((task) => task.workflow.runId), - mergeMap((group$) => { - return group$.pipe( - mergeMapWithState(async (workflow: Workflow | undefined, task) => { - if (workflow === undefined) { - // Find a workflow start job in the activation jobs list - // TODO: should this always be the first job in the list? - const maybeStartWorkflow = task.workflow.jobs.find((j) => j.startWorkflow); - if (maybeStartWorkflow !== undefined) { - const attrs = maybeStartWorkflow.startWorkflow; - if (!(attrs && attrs.workflowId && attrs.workflowType)) { - throw new Error( - `Expected StartWorkflow with workflowId and workflowType, got ${JSON.stringify( - maybeStartWorkflow - )}` - ); - } - workflow = await Workflow.create(attrs.workflowId); - // TODO: this probably shouldn't be here, consider alternative implementation - await workflow.inject('console.log', console.log); - const scriptName = await resolver( - this.options.workflowsPath, - this.workflowOverrides - )(attrs.workflowType); - await workflow.registerImplementation(scriptName); - } else { - throw new Error('Received workflow activation for an untracked workflow with no start workflow job'); - } - } + ); + // Need to cast to any in order to assign the correct types a partition returns an Observable + const [workflow$, activity$] = (partitioned$ as any) as [Observable, Observable]; - const arr = await workflow.activate(task.taskToken, task.workflow); - workerCompleteTask(this.nativeWorker, arr.buffer.slice(arr.byteOffset)); - return { state: workflow, output: arr }; - }, undefined) - ); - }), - mapTo(undefined) - ) + return await merge(workflow$.pipe(this.workflowOperator()), activity$.pipe(this.activityOperator())) + .pipe(map((arr) => workerCompleteTask(this.nativeWorker, arr.buffer.slice(arr.byteOffset)))) .toPromise(); } } From 35176a7d82e104c2cc0b1c9fac9c99dac29b6aad Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 9 Mar 2021 09:46:48 +0200 Subject: [PATCH 3/4] Update core --- packages/worker/native/Cargo.lock | 121 ++++++++++++++--------------- packages/worker/native/sdk-core | 2 +- packages/workflow/src/internals.ts | 4 + 3 files changed, 62 insertions(+), 65 deletions(-) diff --git a/packages/worker/native/Cargo.lock b/packages/worker/native/Cargo.lock index 29150e872..b7703f744 100644 --- a/packages/worker/native/Cargo.lock +++ b/packages/worker/native/Cargo.lock @@ -1,14 +1,5 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -[[package]] -name = "aho-corasick" -version = "0.7.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5" -dependencies = [ - "memchr", -] - [[package]] name = "ansi_term" version = "0.12.1" @@ -56,17 +47,6 @@ dependencies = [ "syn", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.0.1" @@ -243,19 +223,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" -[[package]] -name = "env_logger" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26ecb66b4bdca6c1409b40fb255eefc2bd4f6d135dab3c3124f80ffa2a9661e" -dependencies = [ - "atty", - "humantime", - "log", - "regex", - "termcolor", -] - [[package]] name = "fixedbitset" version = "0.2.0" @@ -474,12 +441,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "0.14.2" @@ -525,6 +486,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "instant" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if", +] + [[package]] name = "integer-encoding" version = "1.1.6" @@ -540,6 +510,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37d572918e350e82412fe766d24b15e6682fb2ed2bbe018280caa810397cb319" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.7" @@ -577,6 +556,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "lock_api" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.14" @@ -779,6 +767,31 @@ dependencies = [ "num-traits", ] +[[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -892,7 +905,7 @@ checksum = "32d3ebd75ac2679c2af3a92246639f9fcc8a442ee420719cc4fe195b98dd5fa3" dependencies = [ "bytes", "heck", - "itertools", + "itertools 0.9.0", "log", "multimap", "petgraph", @@ -909,7 +922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4" dependencies = [ "anyhow", - "itertools", + "itertools 0.9.0", "proc-macro2", "quote", "syn", @@ -989,10 +1002,7 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9251239e129e16308e70d853559389de218ac275b515068abc96829d05b948a" dependencies = [ - "aho-corasick", - "memchr", "regex-syntax", - "thread_local", ] [[package]] @@ -1171,16 +1181,17 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "base64", "crossbeam", "dashmap", "derive_more", "displaydoc", - "env_logger", "futures", - "log", + "itertools 0.10.0", "once_cell", "opentelemetry", "opentelemetry-jaeger", + "parking_lot", "prost", "prost-types", "rand", @@ -1209,15 +1220,6 @@ dependencies = [ "temporal-sdk-core", ] -[[package]] -name = "termcolor" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" -dependencies = [ - "winapi-util", -] - [[package]] name = "thiserror" version = "1.0.23" @@ -1675,15 +1677,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/packages/worker/native/sdk-core b/packages/worker/native/sdk-core index ec360a5c9..887bc594e 160000 --- a/packages/worker/native/sdk-core +++ b/packages/worker/native/sdk-core @@ -1 +1 @@ -Subproject commit ec360a5c9f8ef2b274c545c04e4f9b66a6d1914f +Subproject commit 887bc594e19d7863302576726b5a36f968b3f389 diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 299a962e8..2082c2e8e 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -208,6 +208,10 @@ export class Activator implements WorkflowTaskHandler { } } + public signalWorkflow(): void { + throw new Error('Not implemented'); + } + public updateRandomSeed(_activation: iface.coresdk.IUpdateRandomSeed): void { throw new Error('Not implemented'); } From 9904ac276695d89537b8cba24f189f82ccb307d5 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 9 Mar 2021 10:10:19 +0200 Subject: [PATCH 4/4] Fix macos integration tests --- packages/test/src/test-integration.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/test/src/test-integration.ts b/packages/test/src/test-integration.ts index a42a2ae92..249c66666 100644 --- a/packages/test/src/test-integration.ts +++ b/packages/test/src/test-integration.ts @@ -9,8 +9,6 @@ import { defaultDataConverter } from '@temporalio/workflow/commonjs/converter/da import { u8, RUN_INTEGRATION_TESTS } from './helpers'; import { tsToMs } from '@temporalio/workflow/commonjs/time'; -const worker = new Worker(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib` }); - const { EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, @@ -20,6 +18,8 @@ const { const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]); if (RUN_INTEGRATION_TESTS) { + const worker = new Worker(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib` }); + test.before((t) => { worker.run('test').catch((err) => { t.fail(`Failed to run worker: ${err}`);