Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions packages/test-workflows/src/cancel-timer-with-delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { CancellationError, cancel, sleep } from '@temporalio/workflow';

export async function main(): Promise<void> {
const timer = sleep(10000);
await sleep(1).then(() => cancel(timer));
try {
await timer;
} catch (e) {
if (e instanceof CancellationError) {
console.log('Timer cancelled 👍');
} else {
throw e;
}
}
}
73 changes: 69 additions & 4 deletions packages/test/src/test-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,25 @@ import { ArgsAndReturn } from '../../test-interfaces/lib';
import * as iface from '@temporalio/proto';
import { defaultDataConverter } from '@temporalio/workflow/commonjs/converter/data-converter';
import { u8 } from './helpers';
import { tsToMs } from '@temporalio/workflow/commonjs/time';

const worker = new Worker(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib` });

const {
EVENT_TYPE_TIMER_STARTED,
EVENT_TYPE_TIMER_FIRED,
EVENT_TYPE_TIMER_CANCELED,
} = iface.temporal.api.enums.v1.EventType;
const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]);

if (process.env.RUN_INTEGRATION_TESTS === '1') {
test.before(() => {
const worker = new Worker(__dirname, { workflowsPath: `${__dirname}/../../test-workflows/lib` });
// TODO: use worker shutdown, fix this dangling promise
worker.run('test');
test.before((t) => {
worker.run('test').catch((err) => {
t.fail(`Failed to run worker: ${err}`);
});
});
test.after.always(() => {
worker.shutdown();
});

test('args-and-return', async (t) => {
Expand All @@ -22,6 +35,58 @@ if (process.env.RUN_INTEGRATION_TESTS === '1') {
t.is(res, 'Hello, world!');
});

test('set-timeout', async (t) => {
const client = new Connection();
const opts = compileWorkflowOptions(addDefaults({ taskQueue: 'test' }));
const runId = await client.startWorkflowExecution(opts, 'set-timeout');
const res = await client.untilComplete(opts.workflowId, runId);
t.is(res, undefined);
const execution = await client.service.getWorkflowExecutionHistory({
namespace: client.options.namespace,
execution: { workflowId: opts.workflowId, runId },
});
const timerEvents = execution.history!.events!.filter(({ eventType }) => timerEventTypes.has(eventType!));
t.is(timerEvents.length, 2);
t.is(timerEvents[0].timerStartedEventAttributes!.timerId, '0');
t.is(tsToMs(timerEvents[0].timerStartedEventAttributes!.startToFireTimeout), 100);
t.is(timerEvents[1].timerFiredEventAttributes!.timerId, '0');
});

test('cancel-timer-immediately', async (t) => {
const client = new Connection();
const opts = compileWorkflowOptions(addDefaults({ taskQueue: 'test' }));
const runId = await client.startWorkflowExecution(opts, 'cancel-timer');
const res = await client.untilComplete(opts.workflowId, runId);
t.is(res, undefined);
const execution = await client.service.getWorkflowExecutionHistory({
namespace: client.options.namespace,
execution: { workflowId: opts.workflowId, runId },
});
const timerEvents = execution.history!.events!.filter(({ eventType }) => timerEventTypes.has(eventType!));
// Timer is cancelled before it is scheduled
t.is(timerEvents.length, 0);
});

test('cancel-timer-with-delay', async (t) => {
const client = new Connection();
const opts = compileWorkflowOptions(addDefaults({ taskQueue: 'test' }));
const runId = await client.startWorkflowExecution(opts, 'cancel-timer-with-delay');
const res = await client.untilComplete(opts.workflowId, runId);
t.is(res, undefined);
const execution = await client.service.getWorkflowExecutionHistory({
namespace: client.options.namespace,
execution: { workflowId: opts.workflowId, runId },
});
const timerEvents = execution.history!.events!.filter(({ eventType }) => timerEventTypes.has(eventType!));
t.is(timerEvents.length, 4);
t.is(timerEvents[0].timerStartedEventAttributes!.timerId, '0');
t.is(tsToMs(timerEvents[0].timerStartedEventAttributes!.startToFireTimeout), 10000);
t.is(timerEvents[1].timerStartedEventAttributes!.timerId, '1');
t.is(tsToMs(timerEvents[1].timerStartedEventAttributes!.startToFireTimeout), 1);
t.is(timerEvents[2].timerFiredEventAttributes!.timerId, '1');
t.is(timerEvents[3].timerCanceledEventAttributes!.timerId, '0');
});

test('WorkflowOptions are passed correctly with defaults', async (t) => {
const client = new Connection();
const opts = compileWorkflowOptions(addDefaults({ taskQueue: 'test' }));
Expand Down
2 changes: 1 addition & 1 deletion packages/test/src/test-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ test('cancel-workflow-from-workflow', async (t) => {
t.deepEqual(logs, [['Timer cancelled 👍']]);
});

test('cancel-timer', async (t) => {
test('cancel-timer-immediately', async (t) => {
const { script, logs } = t.context;
const req = await activate(t, makeStartWorkflow(script));
compareCompletion(
Expand Down
116 changes: 36 additions & 80 deletions packages/worker/native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/worker/native/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
export interface Worker {
}
export interface Worker {}

export declare type PollCallback = (err?: Error, result?: ArrayBuffer) => void;
export declare function newWorker(queueName: string): Worker;
export declare function workerShutdown(worker: Worker): void;
export declare function workerPoll(worker: Worker, callback: PollCallback): void;
export declare function workerCompleteTask(worker: Worker, result: ArrayBuffer): boolean;
export declare function workerSuspendPolling(worker: Worker): void;
Expand Down
12 changes: 8 additions & 4 deletions packages/worker/native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,14 @@ fn worker_complete_task(mut cx: FunctionContext) -> JsResult<JsUndefined> {
}

fn worker_shutdown(mut cx: FunctionContext) -> JsResult<JsUndefined> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need to be this PR but you'll probably want to have some mechanism to drain any remaining activations from poll_task until it returns CoreError::ShuttingDown. Remaining activations may exist due to replaying a workflow to catch up to the end of history which was already fetched from the server

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really good point (see #19).

// TODO:
// let worker = cx.argument::<BoxedWorker>(0)?;
// let w = &mut worker.read().unwrap();
Ok(cx.undefined())
let worker = cx.argument::<BoxedWorker>(0)?;
match worker.core.shutdown() {
Ok(_) => Ok(cx.undefined()),
Err(err) => {
let error = JsError::error(&mut cx, format!("{}", err))?;
cx.throw(error)
}
}
}

fn worker_suspend_polling(mut cx: FunctionContext) -> JsResult<JsUndefined> {
Expand Down
Loading