Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: t31135461 / Refine jest-worker API #6676

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Empty file added jest-worker
Empty file.
10 changes: 6 additions & 4 deletions packages/jest-resolve/src/is_builtin_module.js
Expand Up @@ -15,11 +15,13 @@ declare var process: {
binding(type: string): {},
};

const EXPERIMENTAL_MODULES = ['worker_threads'];

const BUILTIN_MODULES =
builtinModules ||
Object.keys(process.binding('natives')).filter(
(module: string) => !/^internal\//.test(module),
);
builtinModules.concat(EXPERIMENTAL_MODULES) ||
Object.keys(process.binding('natives'))
.filter((module: string) => !/^internal\//.test(module))
.concat([EXPERIMENTAL_MODULES]);

export default function isBuiltinModule(module: string): boolean {
return BUILTIN_MODULES.indexOf(module) !== -1;
Expand Down
27 changes: 27 additions & 0 deletions packages/jest-worker/src/WorkerPool.js
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
Copy link
Member

@SimenB SimenB Jul 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

18? I have to admit I don't know what these should say

*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

'use strict';

import BaseWorkerPool from './base/BaseWorkerPool';

import type {ChildMessage, OnStart, OnEnd, WorkerPoolInterface} from './types';

class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface {
send(
workerId: number,
request: ChildMessage,
onStart: OnStart,
onEnd: OnEnd,
): void {
this.getWorkerById(workerId).send(request, onStart, onEnd);
}
}

export default WorkerPool;
131 changes: 131 additions & 0 deletions packages/jest-worker/src/WorkerQueueManager.js
@@ -0,0 +1,131 @@
/**
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

'use strict';

import type {
QueueChildMessage,
WorkerPoolInterface,
WorkerInterface,
} from './types';

export default class WorkerQueueManager {
_workerPool: WorkerPoolInterface;
_queue: Array<?QueueChildMessage>;
_last: Array<QueueChildMessage>;
_locks: Array<boolean>;
_offset: number;

constructor(workerPool: WorkerPoolInterface) {
this._workerPool = workerPool;
this._queue = [];
this._last = [];
this._locks = [];
this._offset = 0;

// If we exceeded the amount of retries, we will emulate an error reply
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part will be rewritten and removed from here. This a part of missing functionality I've mentioned in the first comment to the PR

// coming from the child. This avoids code duplication related with cleaning
// the queue, and scheduling the next call.

// if (this._retries > this._options.maxRetries) {
// const error = new Error('Call retries were exceeded');

// this.onMessage([
// PARENT_MESSAGE_ERROR,
// error.name,
// error.message,
// error.stack,
// {type: 'WorkerError'},
// ]);
// }
}

_process(workerId: number): WorkerQueueManager {
if (this.isLocked(workerId)) {
return this;
}

const job = this.getNextJob(workerId);

if (!job) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return this;
}

const onEnd = (error: ?Error, result: mixed, worker: WorkerInterface) => {
this.unlock(workerId);
job.onEnd(error, result, worker);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we swap this with the previous line? Conceptually sounds better to keep the worker locked while finishing, just in case onEnd decides to go wild.

this._process(workerId);
};

this.lock(workerId);

this._workerPool.send(workerId, job.request, job.onStart, onEnd);

job.request[1] = true;

return this;
}

getNextJob(workerId: number): ?QueueChildMessage {
if (!this._queue[workerId]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache the entire queue in const queue = this._queue[workerId].

return null;
}

let job;

while (this._queue[workerId]) {
if (!this._queue[workerId].request[1]) {
job = this._queue[workerId];
break;
}
this._queue[workerId] = this._queue[workerId].next;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


return job;
}

enqueue(task: QueueChildMessage, workerId?: number): WorkerQueueManager {
if (workerId != null) {
if (task.request[1]) {
return this;
}

if (this._queue[workerId]) {
this._last[workerId].next = task;
} else {
this._queue[workerId] = task;
}

this._last[workerId] = task;

this._process(workerId);
} else {
const numOfWorkers = this._workerPool.getWorkers().length;
for (let i = 0; i < numOfWorkers; i++) {
const workerIdx = (this._offset + i) % numOfWorkers;
this.enqueue(task, workerIdx);
}
this._offset++;
}

return this;
}

lock(workerId: number): void {
this._locks[workerId] = true;
}

unlock(workerId: number): void {
this._locks[workerId] = false;
}

isLocked(workerId: number): boolean {
return this._locks[workerId];
}
}
Expand Up @@ -38,7 +38,7 @@ beforeEach(() => {
return forkInterface;
});

Worker = require('../worker').default;
Worker = require('../workers/ChildProcessWorker').default;
});

afterEach(() => {
Expand All @@ -47,7 +47,7 @@ afterEach(() => {
});

it('passes fork options down to child_process.fork, adding the defaults', () => {
const child = require.resolve('../child');
const child = require.resolve('../workers/processChild');

process.execArgv = ['--inspect', '-p'];

Expand Down
8 changes: 4 additions & 4 deletions packages/jest-worker/src/__tests__/child.test.js
Expand Up @@ -27,7 +27,7 @@ beforeEach(() => {
mockCount = 0;

jest.mock(
'../my-fancy-worker',
'../workers/my-fancy-worker',
() => {
mockCount++;

Expand Down Expand Up @@ -74,15 +74,15 @@ beforeEach(() => {
);

jest.mock(
'../my-fancy-standalone-worker',
'../workers/my-fancy-standalone-worker',
() => jest.fn().mockImplementation(() => 12345),
{virtual: true},
);

// This mock emulates a transpiled Babel module that carries a default export
// that corresponds to a method.
jest.mock(
'../my-fancy-babel-worker',
'../workers/my-fancy-babel-worker',
() => ({
__esModule: true,
default: jest.fn().mockImplementation(() => 67890),
Expand All @@ -94,7 +94,7 @@ beforeEach(() => {
process.send = jest.fn();

// Require the child!
require('../child');
require('../workers/processChild');
});

afterEach(() => {
Expand Down
6 changes: 3 additions & 3 deletions packages/jest-worker/src/__tests__/index.test.js
Expand Up @@ -30,7 +30,7 @@ beforeEach(() => {
// The worker mock returns a worker with custom methods, plus it stores them
// in a global list, so that they can be accessed later. This list is reset in
// every test.
jest.mock('../worker', () => {
jest.mock('../workers/ChildProcessWorker', () => {
const fakeClass = jest.fn(() => {
const fakeWorker = {
getStderr: () => ({once() {}, pipe() {}}),
Expand Down Expand Up @@ -63,8 +63,8 @@ beforeEach(() => {
virtual: true,
});

Worker = require('../worker').default;
Farm = require('../index').default;
Worker = require('../workers/ChildProcessWorker').default;
Farm = require('..').default;
});

afterEach(() => {
Expand Down
115 changes: 115 additions & 0 deletions packages/jest-worker/src/base/BaseWorkerPool.js
@@ -0,0 +1,115 @@
/**
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

'use strict';

import mergeStream from 'merge-stream';
import os from 'os';
import path from 'path';

import ChildProcessWorker from '../workers/ChildProcessWorker';
import NodeThreadsWorker from '../workers/NodeThreadsWorker';
import {CHILD_MESSAGE_END} from '../types';

import type {Readable} from 'stream';
import type {FarmOptions, WorkerOptions, WorkerInterface} from '../types';

/* istanbul ignore next */
const emptyMethod = () => {};

export default class BaseWorkerPool {
_stderr: Readable;
_stdout: Readable;
_options: FarmOptions;
_workers: Array<WorkerInterface>;

constructor(workerPath: string, options: FarmOptions) {
this._options = options;

const numWorkers = options.numWorkers || os.cpus().length - 1;
this._workers = new Array(numWorkers);

if (!path.isAbsolute(workerPath)) {
workerPath = require.resolve(workerPath);
}

const stdout = mergeStream();
const stderr = mergeStream();

for (let i = 0; i < numWorkers; i++) {
const workerOptions: WorkerOptions = {
forkOptions: options.forkOptions || {},
maxRetries: options.maxRetries || 3,
useNodeWorkersIfPossible: options.useNodeWorkersIfPossible,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this and make it true always.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense as an option, doesn't it? Seems like an opt-in thing (beyond using a version of node where it's unflagged)

Copy link
Author

@Kureev Kureev Jul 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no real reason why you'd like to use child_process instead of worker_threads when the second one is available. According to our benchmarks, implementation using workers works ~ twice as fast

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API stability, there might be behavioural differences. Upgrading my app from node 10 to 11 (or whenever they remove the flag) shouldn't change the underlying implementation IMO. The docs for the module says it uses multiple processes, and just silently not doing that, depending on node version, seems risky to me

FWIW I could get behind setting it to true by default so it's possible to opt-out.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upgrading my app from node 10 to 11 (or whenever they remove the flag) shouldn't change the underlying implementation IMO

I think that underlying implementation is not something that should concern an average library user. Public interface stays completely the same (which is crucial for all library users), while "engine" is getting faster. Same happens with the language and other libraries like React (Fiber rewrite) etc.

The docs for the module says it uses multiple processes, and just silently not doing that, depending on node version, seems risky to me

I'm completely on the same page, but we're going to cover all that by specs so there should be no issues with unexpected regression bugs.

I think @mjesun can give you more insights.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's re-write the docs as "we'll choose the most-performant implementation that is possible". People can enforce one or another by explicitly passing the corresponding worker pool.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me!

workerId: i,
workerPath,
};

const worker = this.createWorker(workerOptions);
const workerStdout = worker.getStdout();
const workerStderr = worker.getStderr();

if (workerStdout) {
stdout.add(workerStdout);
}

if (workerStderr) {
stderr.add(workerStderr);
}

this._workers[i] = worker;

// $FlowFixMe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add explanation to all FlowFixMes.

You can get rid of this ones by redefining at the top the result, as a variable (class methods are covariant in Flow). For instance, putting at the top: getStderr: () => BaseWorkerPool; would do the trick.

Anyway, I'm not sure why do we need to bind them all.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually don't need it anymore. Will remove them accordingly

this.getStderr = this.getStderr.bind(this);
// $FlowFixMe
this.getStdout = this.getStdout.bind(this);
// $FlowFixMe
this.getWorkers = this.getWorkers.bind(this);
// $FlowFixMe
this.end = this.end.bind(this);
}

this._stdout = stdout;
this._stderr = stderr;
}

getStderr(): Readable {
return this._stderr;
}

getStdout(): Readable {
return this._stdout;
}

getWorkers(): Array<WorkerInterface> {
return this._workers;
}

getWorkerById(workerId: number): WorkerInterface {
return this._workers[workerId];
}

createWorker(workerOptions: WorkerOptions): WorkerInterface {
return workerOptions.useNodeWorkersIfPossible
? new NodeThreadsWorker(workerOptions)
: new ChildProcessWorker(workerOptions);
}

end(): void {
// We do not cache the request object here. If so, it would only be only
// processed by one of the workers, and we want them all to close.
for (let i = 0; i < this._workers.length; i++) {
this._workers[i].send(
[CHILD_MESSAGE_END, false],
emptyMethod,
emptyMethod,
);
}
}
}