From 749f863d6f22a1467e3bd5b369b790caf335caf1 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 1 Mar 2021 15:11:50 +0200 Subject: [PATCH] Test timer integration, add worker shutdown --- ...l-timer.ts => cancel-timer-immediately.ts} | 0 .../src/cancel-timer-with-delay.ts | 15 +++ packages/test/src/test-integration.ts | 73 ++++++++++- packages/test/src/test-workflows.ts | 2 +- packages/worker/native/Cargo.lock | 116 ++++++------------ packages/worker/native/index.d.ts | 4 +- packages/worker/native/src/lib.rs | 12 +- packages/worker/src/worker.ts | 8 ++ packages/workflow/src/internals.ts | 40 ++++-- sdk-core | 2 +- 10 files changed, 170 insertions(+), 102 deletions(-) rename packages/test-workflows/src/{cancel-timer.ts => cancel-timer-immediately.ts} (100%) create mode 100644 packages/test-workflows/src/cancel-timer-with-delay.ts diff --git a/packages/test-workflows/src/cancel-timer.ts b/packages/test-workflows/src/cancel-timer-immediately.ts similarity index 100% rename from packages/test-workflows/src/cancel-timer.ts rename to packages/test-workflows/src/cancel-timer-immediately.ts diff --git a/packages/test-workflows/src/cancel-timer-with-delay.ts b/packages/test-workflows/src/cancel-timer-with-delay.ts new file mode 100644 index 000000000..c3f49ea2a --- /dev/null +++ b/packages/test-workflows/src/cancel-timer-with-delay.ts @@ -0,0 +1,15 @@ +import { CancellationError, cancel, sleep } from '@temporalio/workflow'; + +export async function main(): Promise { + const timer = sleep(10000); + await sleep(1).then(() => cancel(timer)); + try { + await timer; + } catch (e) { + if (e instanceof CancellationError) { + console.log('Timer cancelled 👍'); + } else { + throw e; + } + } +} diff --git a/packages/test/src/test-integration.ts b/packages/test/src/test-integration.ts index 25cf96e4c..aa3c65cde 100644 --- a/packages/test/src/test-integration.ts +++ b/packages/test/src/test-integration.ts @@ -7,12 +7,25 @@ import { ArgsAndReturn } from '../../test-interfaces/lib'; import * as iface from '@temporalio/proto'; import { defaultDataConverter } from '@temporalio/workflow/commonjs/converter/data-converter'; import { u8 } from './helpers'; +import { tsToMs } from '@temporalio/workflow/commonjs/time'; + +const worker = new Worker(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib` }); + +const { + EVENT_TYPE_TIMER_STARTED, + EVENT_TYPE_TIMER_FIRED, + EVENT_TYPE_TIMER_CANCELED, +} = iface.temporal.api.enums.v1.EventType; +const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]); if (process.env.RUN_INTEGRATION_TESTS === '1') { - test.before(() => { - const worker = new Worker(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib` }); - // TODO: use worker shutdown, fix this dangling promise - worker.run('test'); + test.before((t) => { + worker.run('test').catch((err) => { + t.fail(`Failed to run worker: ${err}`); + }); + }); + test.after.always(() => { + worker.shutdown(); }); test('args-and-return', async (t) => { @@ -22,6 +35,58 @@ if (process.env.RUN_INTEGRATION_TESTS === '1') { t.is(res, 'Hello, world!'); }); + test('set-timeout', async (t) => { + const client = new Connection(); + const opts = compileWorkflowOptions(addDefaults({ taskQueue: 'test' })); + const runId = await client.startWorkflowExecution(opts, 'set-timeout'); + const res = await client.untilComplete(opts.workflowId, runId); + t.is(res, undefined); + const execution = await client.service.getWorkflowExecutionHistory({ + namespace: client.options.namespace, + execution: { workflowId: opts.workflowId, runId }, + }); + const timerEvents = execution.history!.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); + t.is(timerEvents.length, 2); + t.is(timerEvents[0].timerStartedEventAttributes!.timerId, '0'); + t.is(tsToMs(timerEvents[0].timerStartedEventAttributes!.startToFireTimeout), 100); + t.is(timerEvents[1].timerFiredEventAttributes!.timerId, '0'); + }); + + test('cancel-timer-immediately', async (t) => { + const client = new Connection(); + const opts = compileWorkflowOptions(addDefaults({ taskQueue: 'test' })); + const runId = await client.startWorkflowExecution(opts, 'cancel-timer'); + const res = await client.untilComplete(opts.workflowId, runId); + t.is(res, undefined); + const execution = await client.service.getWorkflowExecutionHistory({ + namespace: client.options.namespace, + execution: { workflowId: opts.workflowId, runId }, + }); + const timerEvents = execution.history!.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); + // Timer is cancelled before it is scheduled + t.is(timerEvents.length, 0); + }); + + test('cancel-timer-with-delay', async (t) => { + const client = new Connection(); + const opts = compileWorkflowOptions(addDefaults({ taskQueue: 'test' })); + const runId = await client.startWorkflowExecution(opts, 'cancel-timer-with-delay'); + const res = await client.untilComplete(opts.workflowId, runId); + t.is(res, undefined); + const execution = await client.service.getWorkflowExecutionHistory({ + namespace: client.options.namespace, + execution: { workflowId: opts.workflowId, runId }, + }); + const timerEvents = execution.history!.events!.filter(({ eventType }) => timerEventTypes.has(eventType!)); + t.is(timerEvents.length, 4); + t.is(timerEvents[0].timerStartedEventAttributes!.timerId, '0'); + t.is(tsToMs(timerEvents[0].timerStartedEventAttributes!.startToFireTimeout), 10000); + t.is(timerEvents[1].timerStartedEventAttributes!.timerId, '1'); + t.is(tsToMs(timerEvents[1].timerStartedEventAttributes!.startToFireTimeout), 1); + t.is(timerEvents[2].timerFiredEventAttributes!.timerId, '1'); + t.is(timerEvents[3].timerCanceledEventAttributes!.timerId, '0'); + }); + test('WorkflowOptions are passed correctly with defaults', async (t) => { const client = new Connection(); const opts = compileWorkflowOptions(addDefaults({ taskQueue: 'test' })); diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 27081f6b6..55220d9ee 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -471,7 +471,7 @@ test('cancel-workflow-from-workflow', async (t) => { t.deepEqual(logs, [['Timer cancelled 👍']]); }); -test('cancel-timer', async (t) => { +test('cancel-timer-immediately', async (t) => { const { script, logs } = t.context; const req = await activate(t, makeStartWorkflow(script)); compareCompletion( diff --git a/packages/worker/native/Cargo.lock b/packages/worker/native/Cargo.lock index a4671640a..c4bb6ac5a 100644 --- a/packages/worker/native/Cargo.lock +++ b/packages/worker/native/Cargo.lock @@ -386,17 +386,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - [[package]] name = "getrandom" version = "0.2.2" @@ -405,7 +394,7 @@ checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8" dependencies = [ "cfg-if", "libc", - "wasi 0.10.2+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -754,26 +743,25 @@ checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0" [[package]] name = "opentelemetry" -version = "0.11.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3434e2a9d2aec539d91f4251bf9047cd53b4d3f386f9d336f4c8076c72a5256" +checksum = "514d24875c140ed269eecc2d1b56d7b71b573716922a763c317fb1b1b4b58f15" dependencies = [ "async-trait", "futures", "js-sys", "lazy_static", "percent-encoding", - "pin-project 0.4.27", - "rand 0.7.3", - "regex", + "pin-project 1.0.5", + "rand", "thiserror", ] [[package]] name = "opentelemetry-jaeger" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c604a73595f605a852c431ef9c6bbacc7b911f094900905fd2f684b6fc44b4" +checksum = "a5677b3a361784aff6e2b1b30dbdb5f85f4ec57ff2ced41d9a481ad70a9d0b57" dependencies = [ "async-trait", "lazy_static", @@ -946,19 +934,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc 0.2.0", -] - [[package]] name = "rand" version = "0.8.3" @@ -966,19 +941,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" dependencies = [ "libc", - "rand_chacha 0.3.0", - "rand_core 0.6.1", - "rand_hc 0.3.0", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", + "rand_chacha", + "rand_core", + "rand_hc", ] [[package]] @@ -988,16 +953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core 0.6.1", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", + "rand_core", ] [[package]] @@ -1006,16 +962,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" dependencies = [ - "getrandom 0.2.2", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", + "getrandom", ] [[package]] @@ -1024,7 +971,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" dependencies = [ - "rand_core 0.6.1", + "rand_core", ] [[package]] @@ -1077,10 +1024,8 @@ dependencies = [ name = "rustfsm" version = "0.1.0" dependencies = [ - "derive_more", "state_machine_procmacro", "state_machine_trait", - "thiserror", ] [[package]] @@ -1154,6 +1099,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +[[package]] +name = "slotmap" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3003725ae562cf995f3dc82bb99e70926e09000396816765bb6d7adbe740b1" +dependencies = [ + "version_check", +] + [[package]] name = "smallvec" version = "1.6.1" @@ -1205,7 +1159,7 @@ checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ "cfg-if", "libc", - "rand 0.8.3", + "rand", "redox_syscall", "remove_dir_all", "winapi", @@ -1224,12 +1178,14 @@ dependencies = [ "env_logger", "futures", "log", + "once_cell", "opentelemetry", "opentelemetry-jaeger", "prost", "prost-types", - "rand 0.8.3", + "rand", "rustfsm", + "slotmap", "thiserror", "tokio", "tonic", @@ -1440,7 +1396,7 @@ dependencies = [ "futures-util", "indexmap", "pin-project 1.0.5", - "rand 0.8.3", + "rand", "slab", "tokio", "tokio-stream", @@ -1517,9 +1473,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1706e1f42970e09aa0635deb4f4607e8704a4390427d5f0062bf59240338bcc" +checksum = "cccdf13c28f1654fe806838f28c5b9cb23ca4c0eae71450daa489f50e523ceb1" dependencies = [ "opentelemetry", "tracing", @@ -1614,9 +1570,15 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ - "getrandom 0.2.2", + "getrandom", ] +[[package]] +name = "version_check" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" + [[package]] name = "want" version = "0.3.0" @@ -1627,12 +1589,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" diff --git a/packages/worker/native/index.d.ts b/packages/worker/native/index.d.ts index 05d65d7ae..76190396c 100644 --- a/packages/worker/native/index.d.ts +++ b/packages/worker/native/index.d.ts @@ -1,8 +1,8 @@ -export interface Worker { -} +export interface Worker {} export declare type PollCallback = (err?: Error, result?: ArrayBuffer) => void; export declare function newWorker(queueName: string): Worker; +export declare function workerShutdown(worker: Worker): void; export declare function workerPoll(worker: Worker, callback: PollCallback): void; export declare function workerCompleteTask(worker: Worker, result: ArrayBuffer): boolean; export declare function workerSuspendPolling(worker: Worker): void; diff --git a/packages/worker/native/src/lib.rs b/packages/worker/native/src/lib.rs index 02f5b0225..402100722 100644 --- a/packages/worker/native/src/lib.rs +++ b/packages/worker/native/src/lib.rs @@ -146,10 +146,14 @@ fn worker_complete_task(mut cx: FunctionContext) -> JsResult { } fn worker_shutdown(mut cx: FunctionContext) -> JsResult { - // TODO: - // let worker = cx.argument::(0)?; - // let w = &mut worker.read().unwrap(); - Ok(cx.undefined()) + let worker = cx.argument::(0)?; + match worker.core.shutdown() { + Ok(_) => Ok(cx.undefined()), + Err(err) => { + let error = JsError::error(&mut cx, format!("{}", err))?; + cx.throw(error) + } + } } fn worker_suspend_polling(mut cx: FunctionContext) -> JsResult { diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 719db44ef..31911920c 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -4,6 +4,7 @@ import { groupBy, mapTo, mergeMap } from 'rxjs/operators'; import { coresdk } from '@temporalio/proto'; import { newWorker, + workerShutdown, workerPoll, workerIsSuspended, workerResumePolling, @@ -120,6 +121,13 @@ export class Worker { // Not implemented yet } + shutdown(): void { + if (this.nativeWorker === undefined) { + throw new Error('Not running'); + } + workerShutdown(this.nativeWorker); + } + async run(queueName: string): Promise { const native = newWorker(queueName); this.nativeWorker = native; diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index a21b45090..3717c448a 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -129,6 +129,22 @@ function failQuery(error: any) { }); } +function consumeCompletion(taskSeq: number) { + const completion = state.completions.get(taskSeq); + if (completion === undefined) { + throw new Error(`No completion for taskSeq ${taskSeq}`); + } + state.completions.delete(taskSeq); + return completion; +} + +function timerIdToSeq(timerId: string | undefined | null) { + if (!timerId) { + throw new Error('Got activation with no timerId'); + } + return parseInt(timerId); +} + export class Activator implements WorkflowTaskHandler { public startWorkflow(activation: iface.coresdk.IStartWorkflowTaskAttributes): void { if (state.workflow === undefined) { @@ -153,19 +169,19 @@ export class Activator implements WorkflowTaskHandler { } public timerFired(activation: iface.coresdk.ITimerFiredTaskAttributes): void { - if (!activation.timerId) { - throw new Error('Got a TimerFired activation with no timerId'); - } - const taskSeq = parseInt(activation.timerId); - const completion = state.completions.get(taskSeq); - if (completion === undefined) { - throw new Error(`No callback for taskSeq ${taskSeq}`); - } - state.completions.delete(taskSeq); - const { resolve } = completion; + const { resolve } = consumeCompletion(timerIdToSeq(activation.timerId)); resolve(undefined); } + public timerCanceled(activation: iface.coresdk.ITimerCanceledTaskAttributes): void { + const { scope } = consumeCompletion(timerIdToSeq(activation.timerId)); + try { + scope.cancel(new CancellationError('Timer cancelled')); + } catch (e) { + if (!(e instanceof CancellationError)) throw e; + } + } + public queryWorkflow(job: iface.coresdk.IQueryWorkflowJob): void { if (state.workflow === undefined) { throw new Error('state.workflow is not defined'); @@ -191,6 +207,10 @@ export class Activator implements WorkflowTaskHandler { failQuery(err); } } + + public randomSeedUpdated(_activation: iface.coresdk.IRandomSeedUpdatedAttributes): void { + throw new Error('Not implemented'); + } } /** diff --git a/sdk-core b/sdk-core index cfff382d5..ac936a9c1 160000 --- a/sdk-core +++ b/sdk-core @@ -1 +1 @@ -Subproject commit cfff382d58aaa62a4c267e5afd2a12a6c1d41223 +Subproject commit ac936a9c17acf15102c24a823a63629271917f1a