New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: t31135461 / Refine jest-worker API #6676
Changes from 7 commits
418c6f9
100c49f
f786cb6
2a1ac0e
59ebd73
9b1f19b
bea945f
3e9ce02
02dedef
5e8347b
e60bc1f
b7470d6
092d700
3947fcb
c715f00
4614e54
e0bfb83
9958587
29e04d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/** | ||
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved. | ||
* | ||
* This source code is licensed under the MIT license found in the | ||
* LICENSE file in the root directory of this source tree. | ||
* | ||
* @flow | ||
*/ | ||
|
||
'use strict'; | ||
|
||
import BaseWorkerPool from './base/BaseWorkerPool'; | ||
|
||
import type { | ||
ChildMessage, | ||
OnStart, | ||
OnEnd, | ||
WorkerInterface, | ||
WorkerPoolInterface, | ||
} from './types'; | ||
|
||
class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface { | ||
send( | ||
worker: WorkerInterface, | ||
request: ChildMessage, | ||
onStart: OnStart, | ||
onEnd: OnEnd, | ||
): void { | ||
worker.send(request, onStart, onEnd); | ||
} | ||
} | ||
|
||
export default WorkerPool; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
/** | ||
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved. | ||
* | ||
* This source code is licensed under the MIT license found in the | ||
* LICENSE file in the root directory of this source tree. | ||
* | ||
* @flow | ||
*/ | ||
|
||
'use strict'; | ||
|
||
import type { | ||
QueueChildMessage, | ||
WorkerPoolInterface, | ||
WorkerInterface, | ||
} from './types'; | ||
|
||
export default class WorkerQueueManager { | ||
_workerPool: WorkerPoolInterface; | ||
_queue: Array<?QueueChildMessage>; | ||
_last: Array<QueueChildMessage>; | ||
_locks: Array<boolean>; | ||
_offset: number; | ||
|
||
constructor(workerPool: WorkerPoolInterface) { | ||
this._workerPool = workerPool; | ||
this._queue = []; | ||
this._last = []; | ||
this._locks = []; | ||
this._offset = 0; | ||
|
||
// If we exceeded the amount of retries, we will emulate an error reply | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part will be rewritten and removed from here. This a part of missing functionality I've mentioned in the first comment to the PR |
||
// coming from the child. This avoids code duplication related with cleaning | ||
// the queue, and scheduling the next call. | ||
|
||
// if (this._retries > this._options.maxRetries) { | ||
// const error = new Error('Call retries were exceeded'); | ||
|
||
// this.onMessage([ | ||
// PARENT_MESSAGE_ERROR, | ||
// error.name, | ||
// error.message, | ||
// error.stack, | ||
// {type: 'WorkerError'}, | ||
// ]); | ||
// } | ||
} | ||
|
||
enqueue(task: QueueChildMessage, workerId?: number): WorkerQueueManager { | ||
if (workerId != null) { | ||
if (this._queue[workerId]) { | ||
this._last[workerId].next = task; | ||
this._last[workerId] = task; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line is common. |
||
} else { | ||
this._queue[workerId] = task; | ||
this._last[workerId] = task; | ||
} | ||
|
||
this.run(workerId); | ||
} else { | ||
const numOfWorkers = this._workerPool.getWorkers().length; | ||
for (let i = 0; i < numOfWorkers; i++) { | ||
const workerIdx = (this._offset + i) % numOfWorkers; | ||
this.enqueue(task, workerIdx); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Separate the initial part of the function into another one, to avoid recursivity. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What I mean is to move the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the design perspective I'd say Farm shouldn't be concerned how WorkerPool spread tasks among workers. It is fine that Farm can pin job to the worker if it has a reference to it, however it isn't cool if Farm contains queue distribution logic in it 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍🏻 Let's separate it into a different method as discussed offline. |
||
} | ||
this._offset++; | ||
} | ||
|
||
return this; | ||
} | ||
|
||
run(workerId: number): WorkerQueueManager { | ||
if (this.isLocked(workerId)) { | ||
return this; | ||
} | ||
|
||
const job = this._queue[workerId]; | ||
const worker = this._workerPool.getWorkers()[workerId]; | ||
|
||
if (!job) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Convert |
||
return this; | ||
} | ||
|
||
if (job.owner) { | ||
this._queue[workerId] = job.next ? job.next : null; | ||
return this.run(workerId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove recursivity here and use a while loop to find the first job available. Don't lock by worker, lock by job. |
||
} | ||
|
||
if (!worker) { | ||
throw Error(`Worker with ID "${workerId}" is not found`); | ||
} | ||
|
||
const onEnd = (error: ?Error, result: mixed, worker: WorkerInterface) => { | ||
this.unlock(workerId); | ||
job.onEnd(error, result, worker); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we swap this with the previous line? Conceptually sounds better to keep the worker locked while finishing, just in case |
||
this.run(workerId); | ||
}; | ||
|
||
this.lock(workerId); | ||
|
||
this._workerPool.send(worker, job.request, job.onStart, onEnd); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's an example of API unification: let's use |
||
|
||
job.owner = worker; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Convert |
||
|
||
return this; | ||
} | ||
|
||
lock(workerId: number): void { | ||
this._locks[workerId] = true; | ||
} | ||
|
||
unlock(workerId: number): void { | ||
this._locks[workerId] = false; | ||
} | ||
|
||
isLocked(workerId: number): boolean { | ||
return this._locks[workerId]; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
/** | ||
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved. | ||
* | ||
* This source code is licensed under the MIT license found in the | ||
* LICENSE file in the root directory of this source tree. | ||
* | ||
* @flow | ||
*/ | ||
|
||
'use strict'; | ||
|
||
import mergeStream from 'merge-stream'; | ||
import os from 'os'; | ||
import path from 'path'; | ||
|
||
import ChildProcessWorker from '../workers/ChildProcessWorker'; | ||
import NodeThreadsWorker from '../workers/NodeThreadsWorker'; | ||
import {CHILD_MESSAGE_END} from '../types'; | ||
|
||
import type {Readable} from 'stream'; | ||
import type {FarmOptions, WorkerOptions, WorkerInterface} from '../types'; | ||
|
||
/* istanbul ignore next */ | ||
const emptyMethod = () => {}; | ||
|
||
export default class BaseWorkerPool { | ||
_stderr: Readable; | ||
_stdout: Readable; | ||
_options: FarmOptions; | ||
_workers: Array<WorkerInterface>; | ||
|
||
constructor(workerPath: string, options: FarmOptions) { | ||
this._options = options; | ||
|
||
const numWorkers = options.numWorkers || os.cpus().length - 1; | ||
this._workers = new Array(numWorkers); | ||
|
||
if (!path.isAbsolute(workerPath)) { | ||
workerPath = require.resolve(workerPath); | ||
} | ||
|
||
const stdout = mergeStream(); | ||
const stderr = mergeStream(); | ||
|
||
for (let i = 0; i < numWorkers; i++) { | ||
const workerOptions: WorkerOptions = { | ||
forkOptions: options.forkOptions || {}, | ||
maxRetries: options.maxRetries || 3, | ||
useNodeWorkersIfPossible: options.useNodeWorkersIfPossible, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove this and make it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It makes sense as an option, doesn't it? Seems like an opt-in thing (beyond using a version of node where it's unflagged) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no real reason why you'd like to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. API stability, there might be behavioural differences. Upgrading my app from node 10 to 11 (or whenever they remove the flag) shouldn't change the underlying implementation IMO. The docs for the module says it uses multiple processes, and just silently not doing that, depending on node version, seems risky to me FWIW I could get behind setting it to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think that underlying implementation is not something that should concern an average library user. Public interface stays completely the same (which is crucial for all library users), while "engine" is getting faster. Same happens with the language and other libraries like React (Fiber rewrite) etc.
I'm completely on the same page, but we're going to cover all that by specs so there should be no issues with unexpected regression bugs. I think @mjesun can give you more insights. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's re-write the docs as "we'll choose the most-performant implementation that is possible". People can enforce one or another by explicitly passing the corresponding worker pool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good to me! |
||
workerId: i, | ||
workerPath, | ||
}; | ||
|
||
const worker = this.createWorker(workerOptions); | ||
const workerStdout = worker.getStdout(); | ||
const workerStderr = worker.getStderr(); | ||
|
||
if (workerStdout) { | ||
stdout.add(workerStdout); | ||
} | ||
|
||
if (workerStderr) { | ||
stderr.add(workerStderr); | ||
} | ||
|
||
this._workers[i] = worker; | ||
|
||
// $FlowFixMe | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add explanation to all You can get rid of this ones by redefining at the top the result, as a variable (class methods are covariant in Flow). For instance, putting at the top: Anyway, I'm not sure why do we need to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We actually don't need it anymore. Will remove them accordingly |
||
this.getStderr = this.getStderr.bind(this); | ||
// $FlowFixMe | ||
this.getStdout = this.getStdout.bind(this); | ||
// $FlowFixMe | ||
this.getWorkers = this.getWorkers.bind(this); | ||
// $FlowFixMe | ||
this.end = this.end.bind(this); | ||
} | ||
|
||
this._stdout = stdout; | ||
this._stderr = stderr; | ||
} | ||
|
||
getStderr(): Readable { | ||
return this._stderr; | ||
} | ||
|
||
getStdout(): Readable { | ||
return this._stdout; | ||
} | ||
|
||
getWorkers(): Array<WorkerInterface> { | ||
return this._workers; | ||
} | ||
|
||
createWorker(workerOptions: WorkerOptions): WorkerInterface { | ||
return workerOptions.useNodeWorkersIfPossible | ||
? new NodeThreadsWorker(workerOptions) | ||
: new ChildProcessWorker(workerOptions); | ||
} | ||
|
||
end(): void { | ||
// We do not cache the request object here. If so, it would only be only | ||
// processed by one of the workers, and we want them all to close. | ||
for (let i = 0; i < this._workers.length; i++) { | ||
this._workers[i].send( | ||
[CHILD_MESSAGE_END, false], | ||
emptyMethod, | ||
emptyMethod, | ||
); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
18? I have to admit I don't know what these should say