Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Worker, DefaultLogger } from '@temporalio/worker';
const logger = new DefaultLogger('WARNING', (severity, message, meta) => {
/* Implement this in order to generate custom log output */
});
const worker = new Worker(__dirname, { logger });
const worker = await Worker.create(__dirname, { logger });
```

#### BYOL - Bring your own logger
Expand All @@ -27,5 +27,5 @@ const logger = winston.createLogger({
format: winston.format.json(),
transports: [new winston.transports.Console()],
});
const worker = new Worker(__dirname, { logger });
const worker = await Worker.create(__dirname, { logger });
```
2 changes: 1 addition & 1 deletion packages/create-project/samples/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Worker } from '@temporalio/worker';

async function run() {
// Automatically locate and register activities and workflows
const worker = new Worker(__dirname);
const worker = await Worker.create(__dirname);
// Bind to the `tutorial` queue and start accepting tasks
await worker.run('tutorial');
}
Expand Down
2 changes: 1 addition & 1 deletion packages/meta/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ import { Worker } from '@temporalio/worker';

(async () => {
// Automatically locate and register activities and workflows
const worker = new Worker(__dirname);
const worker = await Worker.create(__dirname);
// Bind to the `tutorial` queue and start accepting tasks
await worker.run('tutorial');
})();
Expand Down
93 changes: 73 additions & 20 deletions packages/test/src/mock-native-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,100 @@
import { v4 as uuid4 } from 'uuid';
import { coresdk } from '@temporalio/proto';
import { defaultDataConverter } from '@temporalio/workflow/commonjs/converter/data-converter';
import { Worker as RealWorker, NativeWorkerLike } from '@temporalio/worker/lib/worker';
import { Worker as RealWorker, NativeWorkerLike, WorkerOptions } from '@temporalio/worker/lib/worker';
import { sleep } from '@temporalio/worker/lib/utils';

export type Task =
| { workflow: coresdk.workflow_activation.IWFActivation }
| { activity: coresdk.activity_task.IActivityTask };

export class MockNativeWorker implements NativeWorkerLike {
tasks: Array<Promise<ArrayBuffer>> = [];
reject?: (err: Error) => void;
completionCallback?: (arr: ArrayBuffer) => void;
activityTasks: Array<Promise<ArrayBuffer>> = [];
workflowActivations: Array<Promise<ArrayBuffer>> = [];
activityCompletionCallback?: (arr: ArrayBuffer) => void;
workflowCompletionCallback?: (arr: ArrayBuffer) => void;
activityHeartbeatCallback?: (activityId: string, details: any) => void;
reject?: (err: Error) => void;

public static async create(): Promise<NativeWorkerLike> {
return new this();
}

public async breakLoop(): Promise<void> {
// Nothing to break from
}

public shutdown(): void {
this.tasks.unshift(Promise.reject(new Error('[Core::shutdown]')));
this.activityTasks.unshift(Promise.reject(new Error('Core is shut down')));
this.workflowActivations.unshift(Promise.reject(new Error('Core is shut down')));
}

public async pollWorkflowActivation(_queueName: string): Promise<ArrayBuffer> {
for (;;) {
const task = this.workflowActivations.pop();
if (task !== undefined) {
return task;
}
await sleep(1);
}
}

public async poll(_queueName: string): Promise<ArrayBuffer> {
public async pollActivityTask(_queueName: string): Promise<ArrayBuffer> {
for (;;) {
const task = this.tasks.pop();
const task = this.activityTasks.pop();
if (task !== undefined) {
return task;
}
await sleep(1);
}
}

public completeTask(result: ArrayBuffer): void {
this.completionCallback!(result);
this.completionCallback = undefined;
public async completeWorkflowActivation(result: ArrayBuffer): Promise<void> {
this.workflowCompletionCallback!(result);
this.workflowCompletionCallback = undefined;
}

public async completeActivityTask(result: ArrayBuffer): Promise<void> {
this.activityCompletionCallback!(result);
this.activityCompletionCallback = undefined;
}

public emit(task: Task): void {
if ('workflow' in task) {
const arr = coresdk.workflow_activation.WFActivation.encode(task.workflow).finish();
const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength);
this.workflowActivations.unshift(Promise.resolve(buffer));
} else {
const arr = coresdk.activity_task.ActivityTask.encode(task.activity).finish();
const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength);
this.activityTasks.unshift(Promise.resolve(buffer));
}
}

public emit(task: coresdk.ITask): void {
const arr = coresdk.Task.encode(task).finish();
public async runWorkflowActivation(
activation: coresdk.workflow_activation.IWFActivation
): Promise<coresdk.workflow_completion.WFActivationCompletion> {
activation = { ...activation, taskToken: activation.taskToken ?? Buffer.from(uuid4()) };
const arr = coresdk.workflow_activation.WFActivation.encode(activation).finish();
const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength);
this.tasks.unshift(Promise.resolve(buffer));
const result = await new Promise<ArrayBuffer>((resolve) => {
this.workflowCompletionCallback = resolve;
this.workflowActivations.unshift(Promise.resolve(buffer));
});
return coresdk.workflow_completion.WFActivationCompletion.decodeDelimited(new Uint8Array(result));
}

public async runAndWaitCompletion(task: coresdk.ITask): Promise<coresdk.TaskCompletion> {
task = { ...task, taskToken: task.taskToken ?? Buffer.from(uuid4()) };
const arr = coresdk.Task.encode(task).finish();
public async runActivityTask(task: coresdk.activity_task.IActivityTask): Promise<coresdk.ActivityTaskCompletion> {
const arr = coresdk.activity_task.ActivityTask.encode(task).finish();
const buffer = arr.buffer.slice(arr.byteOffset, arr.byteOffset + arr.byteLength);
const result = await new Promise<ArrayBuffer>((resolve) => {
this.completionCallback = resolve;
this.tasks.unshift(Promise.resolve(buffer));
this.activityCompletionCallback = resolve;
this.activityTasks.unshift(Promise.resolve(buffer));
});
return coresdk.TaskCompletion.decodeDelimited(new Uint8Array(result));
return coresdk.ActivityTaskCompletion.decodeDelimited(new Uint8Array(result));
}

sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): void {
public async sendActivityHeartbeat(activityId: string, details?: ArrayBuffer): Promise<void> {
const payload = details && coresdk.common.Payload.decode(new Uint8Array(details));
const arg = payload ? defaultDataConverter.fromPayload(payload) : undefined;
this.activityHeartbeatCallback!(activityId, arg);
Expand All @@ -70,4 +118,9 @@ export class Worker extends RealWorker {
public get native(): MockNativeWorker {
return this.nativeWorker as MockNativeWorker;
}

public constructor(pwd: string, opts?: WorkerOptions) {
const nativeWorker = new MockNativeWorker();
super(nativeWorker, pwd, opts);
}
}
2 changes: 1 addition & 1 deletion packages/test/src/run-a-worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Worker } from '@temporalio/worker';

async function main() {
const worker = new Worker(__dirname, {
const worker = await Worker.create(__dirname, {
workflowsPath: `${__dirname}/../../test-workflows/lib`,
activitiesPath: `${__dirname}/../../test-activities/lib`,
});
Expand Down
30 changes: 18 additions & 12 deletions packages/test/src/test-integration.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint @typescript-eslint/no-non-null-assertion: 0 */
import test from 'ava';
import anyTest, { TestInterface } from 'ava';
import { v4 as uuid4 } from 'uuid';
import { Connection } from '@temporalio/client';
import { tsToMs } from '@temporalio/workflow/commonjs/time';
Expand All @@ -23,21 +23,28 @@ const {

const timerEventTypes = new Set([EVENT_TYPE_TIMER_STARTED, EVENT_TYPE_TIMER_FIRED, EVENT_TYPE_TIMER_CANCELED]);

export interface Context {
worker: Worker;
}

const test = anyTest as TestInterface<Context>;

if (RUN_INTEGRATION_TESTS) {
const worker = new Worker(__dirname, {
workflowsPath: `${__dirname}/../../test-workflows/lib`,
activitiesPath: `${__dirname}/../../test-activities/lib`,
logger: new DefaultLogger('DEBUG'),
});
test.before(async (t) => {
const worker = await Worker.create(__dirname, {
workflowsPath: `${__dirname}/../../test-workflows/lib`,
activitiesPath: `${__dirname}/../../test-activities/lib`,
logger: new DefaultLogger('DEBUG'),
});
t.context = { worker };

test.before((t) => {
worker.run('test').catch((err) => {
console.error(err);
t.fail(`Failed to run worker: ${err}`);
});
});
test.after.always(() => {
worker.shutdown();
test.after.always((t) => {
t.context.worker.shutdown();
});

test('Workflow not found results in failure', async (t) => {
Expand Down Expand Up @@ -75,12 +82,11 @@ if (RUN_INTEGRATION_TESTS) {
t.throwsAsync(promise, { message: /just because/, instanceOf: WorkflowExecutionFailedError });
});

// Activities not yet properly implemented
test.skip('http', async (t) => {
test('http', async (t) => {
const client = new Connection();
const workflow = client.workflow<HTTP>('http', { taskQueue: 'test' });
const res = await workflow.start();
t.is(res, [await httpGet('https://google.com'), await httpGet('http://example.com')]);
t.deepEqual(res, [await httpGet('https://google.com'), await httpGet('http://example.com')]);
});

test('set-timeout', async (t) => {
Expand Down
90 changes: 39 additions & 51 deletions packages/test/src/test-worker-activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,30 @@ test.beforeEach((t) => {

function compareCompletion(
t: ExecutionContext<Context>,
actual: coresdk.TaskCompletion,
expected: coresdk.ITaskCompletion
actual: coresdk.activity_result.IActivityResult | null | undefined,
expected: coresdk.activity_result.IActivityResult
) {
t.deepEqual(actual.toJSON(), coresdk.TaskCompletion.create(expected).toJSON());
t.deepEqual(
coresdk.activity_result.ActivityResult.create(actual || undefined).toJSON(),
coresdk.activity_result.ActivityResult.create(expected).toJSON()
);
}

test('Worker runs an activity and reports completion', async (t) => {
const { worker } = t.context;
await runWorker(t, async () => {
const taskToken = Buffer.from(uuid4());
const url = 'https://temporal.io';
const completion = await worker.native.runAndWaitCompletion({
const completion = await worker.native.runActivityTask({
taskToken,
activity: {
activityId: 'abc',
start: {
activityType: JSON.stringify(['@activities', 'httpGet']),
input: defaultDataConverter.toPayloads(url),
},
activityId: 'abc',
start: {
activityType: JSON.stringify(['@activities', 'httpGet']),
input: defaultDataConverter.toPayloads(url),
},
});
compareCompletion(t, completion, {
taskToken,
activity: { completed: { result: defaultDataConverter.toPayloads(await httpGet(url)) } },
compareCompletion(t, completion.result, {
completed: { result: defaultDataConverter.toPayload(await httpGet(url)) },
});
});
});
Expand All @@ -64,19 +64,16 @@ test('Worker runs an activity and reports failure', async (t) => {
await runWorker(t, async () => {
const taskToken = Buffer.from(uuid4());
const message = ':(';
const completion = await worker.native.runAndWaitCompletion({
const completion = await worker.native.runActivityTask({
taskToken,
activity: {
activityId: 'abc',
start: {
activityType: JSON.stringify(['@activities', 'throwAnError']),
input: defaultDataConverter.toPayloads(message),
},
activityId: 'abc',
start: {
activityType: JSON.stringify(['@activities', 'throwAnError']),
input: defaultDataConverter.toPayloads(message),
},
});
compareCompletion(t, completion, {
taskToken,
activity: { failed: { failure: { message } } },
compareCompletion(t, completion.result, {
failed: { failure: { message } },
});
});
});
Expand All @@ -85,8 +82,8 @@ test('Worker cancels activity and reports cancellation', async (t) => {
const { worker } = t.context;
await runWorker(t, async () => {
worker.native.emit({
taskToken: Buffer.from(uuid4()),
activity: {
taskToken: Buffer.from(uuid4()),
activityId: 'abc',
start: {
activityType: JSON.stringify(['@activities', 'waitForCancellation']),
Expand All @@ -95,16 +92,13 @@ test('Worker cancels activity and reports cancellation', async (t) => {
},
});
const taskToken = Buffer.from(uuid4());
const completion = await worker.native.runAndWaitCompletion({
const completion = await worker.native.runActivityTask({
taskToken,
activity: {
activityId: 'abc',
cancel: {},
},
activityId: 'abc',
cancel: {},
});
compareCompletion(t, completion, {
taskToken,
activity: { canceled: {} },
compareCompletion(t, completion.result, {
canceled: {},
});
});
});
Expand All @@ -113,8 +107,8 @@ test('Activity Context AbortSignal cancels a fetch request', async (t) => {
const { worker } = t.context;
await runWorker(t, async () => {
worker.native.emit({
taskToken: Buffer.from(uuid4()),
activity: {
taskToken: Buffer.from(uuid4()),
activityId: 'abc',
start: {
activityType: JSON.stringify(['@activities', 'cancellableFetch']),
Expand All @@ -123,16 +117,13 @@ test('Activity Context AbortSignal cancels a fetch request', async (t) => {
},
});
const taskToken = Buffer.from(uuid4());
const completion = await worker.native.runAndWaitCompletion({
const completion = await worker.native.runActivityTask({
taskToken,
activity: {
activityId: 'abc',
cancel: {},
},
activityId: 'abc',
cancel: {},
});
compareCompletion(t, completion, {
taskToken,
activity: { canceled: {} },
compareCompletion(t, completion.result, {
canceled: {},
});
});
});
Expand All @@ -141,22 +132,19 @@ test('Activity Context heartbeat is sent to core', async (t) => {
const { worker } = t.context;
await runWorker(t, async () => {
const taskToken = Buffer.from(uuid4());
const completionPromise = worker.native.runAndWaitCompletion({
const completionPromise = worker.native.runActivityTask({
taskToken,
activity: {
activityId: 'abc',
start: {
activityType: JSON.stringify(['@activities', 'progressiveSleep']),
input: defaultDataConverter.toPayloads(),
},
activityId: 'abc',
start: {
activityType: JSON.stringify(['@activities', 'progressiveSleep']),
input: defaultDataConverter.toPayloads(),
},
});
t.is(await worker.native.untilHeartbeat('abc'), 1);
t.is(await worker.native.untilHeartbeat('abc'), 2);
t.is(await worker.native.untilHeartbeat('abc'), 3);
compareCompletion(t, await completionPromise, {
taskToken,
activity: { completed: { result: defaultDataConverter.toPayloads(undefined) } },
compareCompletion(t, (await completionPromise).result, {
completed: { result: defaultDataConverter.toPayload(undefined) },
});
});
});
Loading