Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/custom messages #10292

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/CODEOWNERS

This file was deleted.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
/node_modules

/packages/*/build/
/packages/*/build-es5/
/packages/*/coverage/
/packages/*/node_modules/
/packages/*/package-lock.json
Expand Down
86 changes: 59 additions & 27 deletions packages/jest-worker/src/Farm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import {
CHILD_MESSAGE_CALL,
ChildMessage,
FarmOptions,
OnCustomMessage,
OnEnd,
OnStart,
PromiseWithCustomMessage,
QueueChildMessage,
QueueItem,
WorkerInterface,
Expand Down Expand Up @@ -44,41 +46,65 @@ export default class Farm {
}
}

doWork(method: string, ...args: Array<any>): Promise<unknown> {
return new Promise((resolve, reject) => {
const computeWorkerKey = this._computeWorkerKey;
const request: ChildMessage = [CHILD_MESSAGE_CALL, false, method, args];
doWork(
method: string,
...args: Array<any>
): PromiseWithCustomMessage<unknown> {
const customMessageListeners: Set<OnCustomMessage> = new Set();

const addCustomMessageListener = (listener: OnCustomMessage) => {
customMessageListeners.add(listener);
return () => {
// Check if the following check is required
customMessageListeners.forEach(customListener => {
if (customListener === listener) {
customMessageListeners.delete(customListener);
}
});
};
};

let worker: WorkerInterface | null = null;
let hash: string | null = null;
const onCustomMessage: OnCustomMessage = message => {
customMessageListeners.forEach(listener => listener(message));
};

if (computeWorkerKey) {
hash = computeWorkerKey.call(this, method, ...args);
worker = hash == null ? null : this._cacheKeys[hash];
}
const promise: PromiseWithCustomMessage<unknown> = new Promise(
(resolve, reject) => {
const computeWorkerKey = this._computeWorkerKey;
const request: ChildMessage = [CHILD_MESSAGE_CALL, false, method, args];

const onStart: OnStart = (worker: WorkerInterface) => {
if (hash != null) {
this._cacheKeys[hash] = worker;
let worker: WorkerInterface | null = null;
let hash: string | null = null;

if (computeWorkerKey) {
hash = computeWorkerKey.call(this, method, ...args);
worker = hash == null ? null : this._cacheKeys[hash];
}
};

const onEnd: OnEnd = (error: Error | null, result: unknown) => {
if (error) {
reject(error);
const onStart: OnStart = (worker: WorkerInterface) => {
if (hash != null) {
this._cacheKeys[hash] = worker;
}
};

const onEnd: OnEnd = (error: Error | null, result: unknown) => {
customMessageListeners.clear();
return error ? reject(error) : resolve(result);
};

const task = {onCustomMessage, onEnd, onStart, request};

if (worker) {
this._enqueue(task, worker.getWorkerId());
} else {
resolve(result);
this._push(task);
}
};
},
);

const task = {onEnd, onStart, request};
promise.UNSTABLE_onCustomMessage = addCustomMessageListener;

if (worker) {
this._enqueue(task, worker.getWorkerId());
} else {
this._push(task);
}
});
return promise;
}

private _getNextTask(workerId: number): QueueChildMessage | null {
Expand Down Expand Up @@ -114,7 +140,13 @@ export default class Farm {
task.request[1] = true;

this._lock(workerId);
this._callback(workerId, task.request, task.onStart, onEnd);
this._callback(
workerId,
task.request,
task.onStart,
onEnd,
task.onCustomMessage,
);

return this;
}
Expand Down
4 changes: 3 additions & 1 deletion packages/jest-worker/src/WorkerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import BaseWorkerPool from './base/BaseWorkerPool';

import type {
ChildMessage,
OnCustomMessage,
OnEnd,
OnStart,
WorkerInterface,
Expand All @@ -31,8 +32,9 @@ class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface {
request: ChildMessage,
onStart: OnStart,
onEnd: OnEnd,
onCustomMessage: OnCustomMessage,
): void {
this.getWorkerById(workerId).send(request, onStart, onEnd);
this.getWorkerById(workerId).send(request, onStart, onEnd, onCustomMessage);
}

createWorker(workerOptions: WorkerOptions): WorkerInterface {
Expand Down
50 changes: 50 additions & 0 deletions packages/jest-worker/src/__tests__/Farm.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ function workerReply(i, error, result) {
workerReplyEnd(i, error, result);
}

function workerReplyCustomMessage(i, message) {
mockWorkerCalls[i].onCustomMessage(message);
}

describe('Farm', () => {
beforeEach(() => {
mockWorkerCalls = [];
callback = jest.fn((...args) => {
mockWorkerCalls.push({
onCustomMessage: args[4],
onEnd: args[3],
onStart: args[2],
passed: args[1],
Expand All @@ -49,6 +54,7 @@ describe('Farm', () => {
[1, true, 'foo', [42]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand All @@ -67,27 +73,32 @@ describe('Farm', () => {
[1, true, 'foo', [42]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
2,
1, // second worker
[1, true, 'foo1', [43]],
expect.any(Function),
expect.any(Function),

expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
3,
2, // third worker
[1, true, 'foo2', [44]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
4,
3, // fourth worker
[1, true, 'foo3', [45]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand All @@ -110,6 +121,7 @@ describe('Farm', () => {
[1, true, 'foo', [42]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand Down Expand Up @@ -146,20 +158,23 @@ describe('Farm', () => {
[1, true, 'foo', [42]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
2,
1, // second worker
[1, true, 'foo1', [43]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
3,
0, // first worker again
[1, true, 'foo2', [44]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand Down Expand Up @@ -220,13 +235,15 @@ describe('Farm', () => {
[1, true, 'car', ['plane']],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
2,
0, // first worker
[1, true, 'foo', ['bar']],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand Down Expand Up @@ -261,13 +278,15 @@ describe('Farm', () => {
[1, true, 'car', ['plane']],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
2,
0, // first worker
[1, true, 'foo', ['bar']],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand Down Expand Up @@ -333,4 +352,35 @@ describe('Farm', () => {
await expect(p6).resolves.toBe('response-6');
await expect(p7).resolves.toBe('response-7');
});

it('can receive custom messages from workers', async () => {
expect.assertions(2);
const farm = new Farm(2, callback);

const p0 = farm.doWork('work-0');
const p1 = farm.doWork('work-1');

const unsubscribe = p0.UNSTABLE_onCustomMessage(message => {
expect(message).toEqual({key: 0, message: 'foo'});
});

p1.UNSTABLE_onCustomMessage(message => {
expect(message).toEqual({key: 1, message: 'bar'});
});

workerReplyStart(0);
workerReplyStart(1);
workerReplyCustomMessage(0, {key: 0, message: 'foo'});
workerReplyCustomMessage(1, {key: 1, message: 'bar'});

unsubscribe();
// This message will not received because the listener already
// unsubscribed.
workerReplyCustomMessage(0, {key: 0, message: 'baz'});

workerReply(0, null, 17);
workerReply(1, null, 17);
await p0;
await p1;
});
});
3 changes: 3 additions & 0 deletions packages/jest-worker/src/__tests__/WorkerPool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ describe('WorkerPool', () => {
{foo: 'bar'},
onStart,
onEnd,
undefined,
);
});

Expand Down Expand Up @@ -100,6 +101,7 @@ describe('WorkerPool', () => {
{foo: 'bar'},
onStart,
onEnd,
undefined,
);
});

Expand Down Expand Up @@ -128,6 +130,7 @@ describe('WorkerPool', () => {
{foo: 'bar'},
onStart,
onEnd,
undefined,
);
});
});
7 changes: 6 additions & 1 deletion packages/jest-worker/src/base/BaseWorkerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ export default class BaseWorkerPool {
// We do not cache the request object here. If so, it would only be only
// processed by one of the workers, and we want them all to close.
const workerExitPromises = this._workers.map(async worker => {
worker.send([CHILD_MESSAGE_END, false], emptyMethod, emptyMethod);
worker.send(
[CHILD_MESSAGE_END, false],
emptyMethod,
emptyMethod,
emptyMethod,
);

// Schedule a force exit in case worker fails to exit gracefully so
// await worker.waitForExit() never takes longer than FORCE_EXIT_DELAY
Expand Down
5 changes: 5 additions & 0 deletions packages/jest-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import Farm from './Farm';
import type {
FarmOptions,
PoolExitResult,
PromiseWithCustomMessage,
WorkerPoolInterface,
WorkerPoolOptions,
} from './types';
import _messageParent from './workers/messageParent';

function getExposedMethods(
workerPath: string,
Expand Down Expand Up @@ -146,3 +148,6 @@ export default class JestWorker {
return this._workerPool.end();
}
}

export type {PromiseWithCustomMessage};
export const messageParent = _messageParent;
Loading