diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bc2c8d7bb91..1d58d366357f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features +- `[jest-worker]` Added support for workers to send custom messages to parent in jest-worker ([#10293](https://github.com/facebook/jest/pull/10293)) - `[pretty-format]` Added support for serializing custom elements (web components) ([#10217](https://github.com/facebook/jest/pull/10237)) ### Fixes diff --git a/packages/jest-worker/src/Farm.ts b/packages/jest-worker/src/Farm.ts index 3e9196b1339d..e06df7d7c2fd 100644 --- a/packages/jest-worker/src/Farm.ts +++ b/packages/jest-worker/src/Farm.ts @@ -9,8 +9,10 @@ import { CHILD_MESSAGE_CALL, ChildMessage, FarmOptions, + OnCustomMessage, OnEnd, OnStart, + PromiseWithCustomMessage, QueueChildMessage, QueueItem, WorkerInterface, @@ -44,41 +46,64 @@ export default class Farm { } } - doWork(method: string, ...args: Array): Promise { - return new Promise((resolve, reject) => { - const computeWorkerKey = this._computeWorkerKey; - const request: ChildMessage = [CHILD_MESSAGE_CALL, false, method, args]; + doWork( + method: string, + ...args: Array + ): PromiseWithCustomMessage { + const customMessageListeners = new Set(); - let worker: WorkerInterface | null = null; - let hash: string | null = null; + const addCustomMessageListener = (listener: OnCustomMessage) => { + customMessageListeners.add(listener); + return () => { + customMessageListeners.delete(listener); + }; + }; + + const onCustomMessage: OnCustomMessage = message => { + customMessageListeners.forEach(listener => listener(message)); + }; - if (computeWorkerKey) { - hash = computeWorkerKey.call(this, method, ...args); - worker = hash == null ? null : this._cacheKeys[hash]; - } + const promise: PromiseWithCustomMessage = new Promise( + (resolve, reject) => { + const computeWorkerKey = this._computeWorkerKey; + const request: ChildMessage = [CHILD_MESSAGE_CALL, false, method, args]; - const onStart: OnStart = (worker: WorkerInterface) => { - if (hash != null) { - this._cacheKeys[hash] = worker; + let worker: WorkerInterface | null = null; + let hash: string | null = null; + + if (computeWorkerKey) { + hash = computeWorkerKey.call(this, method, ...args); + worker = hash == null ? null : this._cacheKeys[hash]; } - }; - const onEnd: OnEnd = (error: Error | null, result: unknown) => { - if (error) { - reject(error); + const onStart: OnStart = (worker: WorkerInterface) => { + if (hash != null) { + this._cacheKeys[hash] = worker; + } + }; + + const onEnd: OnEnd = (error: Error | null, result: unknown) => { + customMessageListeners.clear(); + if (error) { + reject(error); + } else { + resolve(result); + } + }; + + const task = {onCustomMessage, onEnd, onStart, request}; + + if (worker) { + this._enqueue(task, worker.getWorkerId()); } else { - resolve(result); + this._push(task); } - }; + }, + ); - const task = {onEnd, onStart, request}; + promise.UNSTABLE_onCustomMessage = addCustomMessageListener; - if (worker) { - this._enqueue(task, worker.getWorkerId()); - } else { - this._push(task); - } - }); + return promise; } private _getNextTask(workerId: number): QueueChildMessage | null { @@ -114,7 +139,13 @@ export default class Farm { task.request[1] = true; this._lock(workerId); - this._callback(workerId, task.request, task.onStart, onEnd); + this._callback( + workerId, + task.request, + task.onStart, + onEnd, + task.onCustomMessage, + ); return this; } diff --git a/packages/jest-worker/src/WorkerPool.ts b/packages/jest-worker/src/WorkerPool.ts index e68420d84a60..dc3039652cfe 100644 --- a/packages/jest-worker/src/WorkerPool.ts +++ b/packages/jest-worker/src/WorkerPool.ts @@ -9,6 +9,7 @@ import BaseWorkerPool from './base/BaseWorkerPool'; import type { ChildMessage, + OnCustomMessage, OnEnd, OnStart, WorkerInterface, @@ -31,8 +32,9 @@ class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface { request: ChildMessage, onStart: OnStart, onEnd: OnEnd, + onCustomMessage: OnCustomMessage, ): void { - this.getWorkerById(workerId).send(request, onStart, onEnd); + this.getWorkerById(workerId).send(request, onStart, onEnd, onCustomMessage); } createWorker(workerOptions: WorkerOptions): WorkerInterface { diff --git a/packages/jest-worker/src/__tests__/Farm.test.js b/packages/jest-worker/src/__tests__/Farm.test.js index e502b1a987ca..fcdaef7c54cd 100644 --- a/packages/jest-worker/src/__tests__/Farm.test.js +++ b/packages/jest-worker/src/__tests__/Farm.test.js @@ -25,11 +25,16 @@ function workerReply(i, error, result) { workerReplyEnd(i, error, result); } +function workerReplyCustomMessage(i, message) { + mockWorkerCalls[i].onCustomMessage(message); +} + describe('Farm', () => { beforeEach(() => { mockWorkerCalls = []; callback = jest.fn((...args) => { mockWorkerCalls.push({ + onCustomMessage: args[4], onEnd: args[3], onStart: args[2], passed: args[1], @@ -49,6 +54,7 @@ describe('Farm', () => { [1, true, 'foo', [42]], expect.any(Function), expect.any(Function), + expect.any(Function), ); }); @@ -67,6 +73,7 @@ describe('Farm', () => { [1, true, 'foo', [42]], expect.any(Function), expect.any(Function), + expect.any(Function), ); expect(callback).toHaveBeenNthCalledWith( 2, @@ -74,6 +81,8 @@ describe('Farm', () => { [1, true, 'foo1', [43]], expect.any(Function), expect.any(Function), + + expect.any(Function), ); expect(callback).toHaveBeenNthCalledWith( 3, @@ -81,6 +90,7 @@ describe('Farm', () => { [1, true, 'foo2', [44]], expect.any(Function), expect.any(Function), + expect.any(Function), ); expect(callback).toHaveBeenNthCalledWith( 4, @@ -88,6 +98,7 @@ describe('Farm', () => { [1, true, 'foo3', [45]], expect.any(Function), expect.any(Function), + expect.any(Function), ); }); @@ -110,6 +121,7 @@ describe('Farm', () => { [1, true, 'foo', [42]], expect.any(Function), expect.any(Function), + expect.any(Function), ); }); @@ -146,6 +158,7 @@ describe('Farm', () => { [1, true, 'foo', [42]], expect.any(Function), expect.any(Function), + expect.any(Function), ); expect(callback).toHaveBeenNthCalledWith( 2, @@ -153,6 +166,7 @@ describe('Farm', () => { [1, true, 'foo1', [43]], expect.any(Function), expect.any(Function), + expect.any(Function), ); expect(callback).toHaveBeenNthCalledWith( 3, @@ -160,6 +174,7 @@ describe('Farm', () => { [1, true, 'foo2', [44]], expect.any(Function), expect.any(Function), + expect.any(Function), ); }); @@ -220,6 +235,7 @@ describe('Farm', () => { [1, true, 'car', ['plane']], expect.any(Function), expect.any(Function), + expect.any(Function), ); expect(callback).toHaveBeenNthCalledWith( 2, @@ -227,6 +243,7 @@ describe('Farm', () => { [1, true, 'foo', ['bar']], expect.any(Function), expect.any(Function), + expect.any(Function), ); }); @@ -261,6 +278,7 @@ describe('Farm', () => { [1, true, 'car', ['plane']], expect.any(Function), expect.any(Function), + expect.any(Function), ); expect(callback).toHaveBeenNthCalledWith( 2, @@ -268,6 +286,7 @@ describe('Farm', () => { [1, true, 'foo', ['bar']], expect.any(Function), expect.any(Function), + expect.any(Function), ); }); @@ -333,4 +352,35 @@ describe('Farm', () => { await expect(p6).resolves.toBe('response-6'); await expect(p7).resolves.toBe('response-7'); }); + + it('can receive custom messages from workers', async () => { + expect.assertions(2); + const farm = new Farm(2, callback); + + const p0 = farm.doWork('work-0'); + const p1 = farm.doWork('work-1'); + + const unsubscribe = p0.UNSTABLE_onCustomMessage(message => { + expect(message).toEqual({key: 0, message: 'foo'}); + }); + + p1.UNSTABLE_onCustomMessage(message => { + expect(message).toEqual({key: 1, message: 'bar'}); + }); + + workerReplyStart(0); + workerReplyStart(1); + workerReplyCustomMessage(0, {key: 0, message: 'foo'}); + workerReplyCustomMessage(1, {key: 1, message: 'bar'}); + + unsubscribe(); + // This message will not received because the listener already + // unsubscribed. + workerReplyCustomMessage(0, {key: 0, message: 'baz'}); + + workerReply(0, null, 17); + workerReply(1, null, 17); + await p0; + await p1; + }); }); diff --git a/packages/jest-worker/src/__tests__/WorkerPool.test.js b/packages/jest-worker/src/__tests__/WorkerPool.test.js index a5bafa91f0f0..7c2cbcedbfa1 100644 --- a/packages/jest-worker/src/__tests__/WorkerPool.test.js +++ b/packages/jest-worker/src/__tests__/WorkerPool.test.js @@ -71,6 +71,7 @@ describe('WorkerPool', () => { {foo: 'bar'}, onStart, onEnd, + undefined, ); }); @@ -100,6 +101,7 @@ describe('WorkerPool', () => { {foo: 'bar'}, onStart, onEnd, + undefined, ); }); @@ -128,6 +130,7 @@ describe('WorkerPool', () => { {foo: 'bar'}, onStart, onEnd, + undefined, ); }); }); diff --git a/packages/jest-worker/src/base/BaseWorkerPool.ts b/packages/jest-worker/src/base/BaseWorkerPool.ts index 5eec43c0214c..9538a603cf8b 100644 --- a/packages/jest-worker/src/base/BaseWorkerPool.ts +++ b/packages/jest-worker/src/base/BaseWorkerPool.ts @@ -94,7 +94,12 @@ export default class BaseWorkerPool { // We do not cache the request object here. If so, it would only be only // processed by one of the workers, and we want them all to close. const workerExitPromises = this._workers.map(async worker => { - worker.send([CHILD_MESSAGE_END, false], emptyMethod, emptyMethod); + worker.send( + [CHILD_MESSAGE_END, false], + emptyMethod, + emptyMethod, + emptyMethod, + ); // Schedule a force exit in case worker fails to exit gracefully so // await worker.waitForExit() never takes longer than FORCE_EXIT_DELAY diff --git a/packages/jest-worker/src/index.ts b/packages/jest-worker/src/index.ts index 5858f6c7f90e..7bc79e1c736a 100644 --- a/packages/jest-worker/src/index.ts +++ b/packages/jest-worker/src/index.ts @@ -11,9 +11,11 @@ import Farm from './Farm'; import type { FarmOptions, PoolExitResult, + PromiseWithCustomMessage, WorkerPoolInterface, WorkerPoolOptions, } from './types'; +export {default as messageParent} from './workers/messageParent'; function getExposedMethods( workerPath: string, @@ -146,3 +148,5 @@ export default class JestWorker { return this._workerPool.end(); } } + +export type {PromiseWithCustomMessage}; diff --git a/packages/jest-worker/src/types.ts b/packages/jest-worker/src/types.ts index 7f390d81d140..08e992f1172b 100644 --- a/packages/jest-worker/src/types.ts +++ b/packages/jest-worker/src/types.ts @@ -19,6 +19,7 @@ export const CHILD_MESSAGE_END: 2 = 2; export const PARENT_MESSAGE_OK: 0 = 0; export const PARENT_MESSAGE_CLIENT_ERROR: 1 = 1; export const PARENT_MESSAGE_SETUP_ERROR: 2 = 2; +export const PARENT_MESSAGE_CUSTOM: 3 = 3; export type PARENT_MESSAGE_ERROR = | typeof PARENT_MESSAGE_CLIENT_ERROR @@ -34,6 +35,7 @@ export interface WorkerPoolInterface { request: ChildMessage, onStart: OnStart, onEnd: OnEnd, + onCustomMessage: OnCustomMessage, ): void; end(): Promise; } @@ -43,6 +45,7 @@ export interface WorkerInterface { request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd, + onCustomMessage: OnCustomMessage, ): void; waitForExit(): Promise; forceExit(): void; @@ -56,6 +59,10 @@ export type PoolExitResult = { forceExited: boolean; }; +export interface PromiseWithCustomMessage extends Promise { + UNSTABLE_onCustomMessage?: (listener: OnCustomMessage) => () => void; +} + // Option objects. export type {ForkOptions}; @@ -128,6 +135,11 @@ export type ChildMessage = // Messages passed from the children to the parent. +export type ParentMessageCustom = [ + typeof PARENT_MESSAGE_CUSTOM, // type + unknown, // result +]; + export type ParentMessageOk = [ typeof PARENT_MESSAGE_OK, // type unknown, // result @@ -141,17 +153,22 @@ export type ParentMessageError = [ unknown, // extra ]; -export type ParentMessage = ParentMessageOk | ParentMessageError; +export type ParentMessage = + | ParentMessageOk + | ParentMessageError + | ParentMessageCustom; // Queue types. export type OnStart = (worker: WorkerInterface) => void; export type OnEnd = (err: Error | null, result: unknown) => void; +export type OnCustomMessage = (message: unknown) => void; export type QueueChildMessage = { request: ChildMessage; onStart: OnStart; onEnd: OnEnd; + onCustomMessage: OnCustomMessage; }; export type QueueItem = { diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index 375b2788b3e8..4d1822979cb3 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -13,9 +13,11 @@ import {stdout as stdoutSupportsColor} from 'supports-color'; import { CHILD_MESSAGE_INITIALIZE, ChildMessage, + OnCustomMessage, OnEnd, OnStart, PARENT_MESSAGE_CLIENT_ERROR, + PARENT_MESSAGE_CUSTOM, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, ParentMessage, @@ -55,6 +57,7 @@ export default class ChildProcessWorker implements WorkerInterface { private _request: ChildMessage | null; private _retries!: number; private _onProcessEnd!: OnEnd; + private _onCustomMessage!: OnCustomMessage; private _fakeStream: PassThrough | null; private _stdout: ReturnType | null; @@ -155,7 +158,8 @@ export default class ChildProcessWorker implements WorkerInterface { } private _onMessage(response: ParentMessage) { - let error; + // TODO: Add appropriate type check + let error: any; switch (response[0]) { case PARENT_MESSAGE_OK: @@ -176,7 +180,6 @@ export default class ChildProcessWorker implements WorkerInterface { error.stack = response[3]; for (const key in extra) { - // @ts-expect-error: adding custom properties to errors. error[key] = extra[key]; } } @@ -187,13 +190,14 @@ export default class ChildProcessWorker implements WorkerInterface { case PARENT_MESSAGE_SETUP_ERROR: error = new Error('Error when calling setup: ' + response[2]); - // @ts-expect-error: adding custom properties to errors. error.type = response[1]; error.stack = response[3]; this._onProcessEnd(error, null); break; - + case PARENT_MESSAGE_CUSTOM: + this._onCustomMessage(response[1]); + break; default: throw new TypeError('Unexpected response from worker: ' + response[0]); } @@ -219,6 +223,7 @@ export default class ChildProcessWorker implements WorkerInterface { request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd, + onCustomMessage: OnCustomMessage, ): void { onProcessStart(this); this._onProcessEnd = (...args) => { @@ -228,6 +233,8 @@ export default class ChildProcessWorker implements WorkerInterface { return onProcessEnd(...args); }; + this._onCustomMessage = (...arg) => onCustomMessage(...arg); + this._request = request; this._retries = 0; this._child.send(request); diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index 49f98be9d696..57a71ef1ec39 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -13,9 +13,11 @@ import mergeStream = require('merge-stream'); import { CHILD_MESSAGE_INITIALIZE, ChildMessage, + OnCustomMessage, OnEnd, OnStart, PARENT_MESSAGE_CLIENT_ERROR, + PARENT_MESSAGE_CUSTOM, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, ParentMessage, @@ -30,6 +32,7 @@ export default class ExperimentalWorker implements WorkerInterface { private _request: ChildMessage | null; private _retries!: number; private _onProcessEnd!: OnEnd; + private _onCustomMessage!: OnCustomMessage; private _fakeStream: PassThrough | null; private _stdout: ReturnType | null; @@ -170,6 +173,9 @@ export default class ExperimentalWorker implements WorkerInterface { this._onProcessEnd(error, null); break; + case PARENT_MESSAGE_CUSTOM: + this._onCustomMessage(response[1]); + break; default: throw new TypeError('Unexpected response from worker: ' + response[0]); } @@ -200,6 +206,7 @@ export default class ExperimentalWorker implements WorkerInterface { request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd, + onCustomMessage: OnCustomMessage, ): void { onProcessStart(this); this._onProcessEnd = (...args) => { @@ -209,6 +216,8 @@ export default class ExperimentalWorker implements WorkerInterface { return onProcessEnd(...args); }; + this._onCustomMessage = (...arg) => onCustomMessage(...arg); + this._request = request; this._retries = 0; diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index 98cd27a6796a..97a427309991 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -15,6 +15,7 @@ import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_INITIALIZE, PARENT_MESSAGE_CLIENT_ERROR, + PARENT_MESSAGE_CUSTOM, PARENT_MESSAGE_OK, } from '../../types'; @@ -225,6 +226,43 @@ it('calls the onProcessStart method synchronously if the queue is empty', () => expect(onProcessEnd).toHaveBeenCalledTimes(1); }); +it('can send multiple messages to parent', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + const onProcessStart = jest.fn(); + const onProcessEnd = jest.fn(); + const onCustomMessage = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, false, 'foo', []], + onProcessStart, + onProcessEnd, + onCustomMessage, + ); + + // Only onProcessStart has been called + expect(onProcessStart).toHaveBeenCalledTimes(1); + expect(onProcessEnd).not.toHaveBeenCalled(); + expect(onCustomMessage).not.toHaveBeenCalled(); + + // then first call replies... + forkInterface.emit('message', [ + PARENT_MESSAGE_CUSTOM, + {message: 'foo bar', otherKey: 1}, + ]); + + expect(onProcessEnd).not.toHaveBeenCalled(); + expect(onCustomMessage).toHaveBeenCalledTimes(1); + expect(onCustomMessage).toHaveBeenCalledWith({ + message: 'foo bar', + otherKey: 1, + }); +}); + it('creates error instances for known errors', () => { const worker = new Worker({ forkOptions: {}, diff --git a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js index 509f22df5b24..c6dab4e0779d 100644 --- a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js @@ -14,6 +14,7 @@ import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_INITIALIZE, PARENT_MESSAGE_CLIENT_ERROR, + PARENT_MESSAGE_CUSTOM, PARENT_MESSAGE_OK, } from '../../types'; @@ -233,6 +234,43 @@ it('calls the onProcessStart method synchronously if the queue is empty', () => expect(onProcessEnd).toHaveBeenCalledTimes(1); }); +it('can send multiple messages to parent', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + const onProcessStart = jest.fn(); + const onProcessEnd = jest.fn(); + const onCustomMessage = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, false, 'foo', []], + onProcessStart, + onProcessEnd, + onCustomMessage, + ); + + // Only onProcessStart has been called + expect(onProcessStart).toHaveBeenCalledTimes(1); + expect(onProcessEnd).not.toHaveBeenCalled(); + expect(onCustomMessage).not.toHaveBeenCalled(); + + // then first call replies... + worker._worker.emit('message', [ + PARENT_MESSAGE_CUSTOM, + {message: 'foo bar', otherKey: 1}, + ]); + + expect(onProcessEnd).not.toHaveBeenCalled(); + expect(onCustomMessage).toHaveBeenCalledTimes(1); + expect(onCustomMessage).toHaveBeenCalledWith({ + message: 'foo bar', + otherKey: 1, + }); +}); + it('creates error instances for known errors', () => { const worker = new Worker({ forkOptions: {}, diff --git a/packages/jest-worker/src/workers/messageParent.ts b/packages/jest-worker/src/workers/messageParent.ts new file mode 100644 index 000000000000..f9281bea4b0b --- /dev/null +++ b/packages/jest-worker/src/workers/messageParent.ts @@ -0,0 +1,37 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import {PARENT_MESSAGE_CUSTOM} from '../types'; + +const isWorkerThread = () => { + try { + // `Require` here to support Node v10 + const {isMainThread, parentPort} = require('worker_threads'); + return !isMainThread && parentPort; + } catch { + return false; + } +}; + +const messageParent = ( + message: unknown, + parentProcess: NodeJS.Process = process, +): void => { + try { + if (isWorkerThread()) { + // `Require` here to support Node v10 + const {parentPort} = require('worker_threads'); + parentPort.postMessage([PARENT_MESSAGE_CUSTOM, message]); + } else if (typeof parentProcess.send === 'function') { + parentProcess.send([PARENT_MESSAGE_CUSTOM, message]); + } + } catch (error) { + throw new Error('"messageParent" can only be used inside a worker'); + } +}; + +export default messageParent;