Skip to content

Commit

Permalink
Stick calls to workers before processing them (#6073)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafeca authored and mjesun committed Apr 27, 2018
1 parent e58d3db commit 558416f
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 43 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -76,6 +76,8 @@

### Fixes

* `[jest-worker]` Stick calls to workers before processing them
([#6073](https://github.com/facebook/jest/pull/6073))
* `[babel-plugin-jest-hoist]` Allow using `console` global variable
([#6074](https://github.com/facebook/jest/pull/6074))
* `[jest-jasmine2]` Always remove node core message from assert stack traces
Expand Down
151 changes: 151 additions & 0 deletions packages/jest-worker/src/__tests__/index-integration.test.js
@@ -0,0 +1,151 @@
/**
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

'use strict';

import EventEmitter from 'events';

import {CHILD_MESSAGE_CALL, PARENT_MESSAGE_OK} from '../types';

let Farm;
let mockForkedProcesses;

function mockBuildForkedProcess() {
const mockChild = new EventEmitter();

mockChild.send = jest.fn();

return mockChild;
}

function replySuccess(i, result) {
mockForkedProcesses[i].emit('message', [PARENT_MESSAGE_OK, result]);
}

function assertCallsToChild(childNum, ...calls) {
expect(mockForkedProcesses[childNum].send).toHaveBeenCalledTimes(
calls.length + 1,
);

calls.forEach(([methodName, ...args], numCall) => {
expect(
mockForkedProcesses[childNum].send.mock.calls[numCall + 1][0],
).toEqual([CHILD_MESSAGE_CALL, true, methodName, args]);
});
}

beforeEach(() => {
mockForkedProcesses = [];

jest.mock('child_process', () => ({
fork() {
const forkedProcess = mockBuildForkedProcess();

mockForkedProcesses.push(forkedProcess);

return forkedProcess;
},
}));

Farm = require('../index').default;
});

afterEach(() => {
jest.resetModules();
});

it('calls a single method from the worker', async () => {
const farm = new Farm('/tmp/baz.js', {
exposedMethods: ['foo', 'bar'],
numWorkers: 4,
});

const promise = farm.foo();

replySuccess(0, 42);

expect(await promise).toBe(42);
});

it('distributes sequential calls across child processes', async () => {
const farm = new Farm('/tmp/baz.js', {
exposedMethods: ['foo', 'bar'],
numWorkers: 4,
});

// The first call will go to the first child process.
const promise0 = farm.foo('param-0');

assertCallsToChild(0, ['foo', 'param-0']);
replySuccess(0, 'worker-0');
expect(await promise0).toBe('worker-0');

// The second call will go to the second child process.
const promise1 = farm.foo(1);

assertCallsToChild(1, ['foo', 1]);
replySuccess(1, 'worker-1');
expect(await promise1).toBe('worker-1');
});

it('distributes concurrent calls across child processes', async () => {
const farm = new Farm('/tmp/baz.js', {
exposedMethods: ['foo', 'bar'],
numWorkers: 4,
});

// Do 3 calls to the farm in parallel.
const promise0 = farm.foo('param-0');
const promise1 = farm.foo('param-1');
const promise2 = farm.foo('param-2');

// Check that the method calls are sent to each separate child process.
assertCallsToChild(0, ['foo', 'param-0']);
assertCallsToChild(1, ['foo', 'param-1']);
assertCallsToChild(2, ['foo', 'param-2']);

// Send different responses from each child.
replySuccess(0, 'worker-0');
replySuccess(1, 'worker-1');
replySuccess(2, 'worker-2');

// Check
expect(await promise0).toBe('worker-0');
expect(await promise1).toBe('worker-1');
expect(await promise2).toBe('worker-2');
});

it('sticks parallel calls to children', async () => {
const farm = new Farm('/tmp/baz.js', {
computeWorkerKey: () => '1234567890abcdef',
exposedMethods: ['foo', 'bar'],
numWorkers: 4,
});

// Do 3 calls to the farm in parallel.
const promise0 = farm.foo('param-0');
const promise1 = farm.foo('param-1');
const promise2 = farm.foo('param-2');

// Send different responses for each call (from the same child).
replySuccess(0, 'worker-0');
replySuccess(0, 'worker-1');
replySuccess(0, 'worker-2');

// Check that all the calls have been received by the same child).
assertCallsToChild(
0,
['foo', 'param-0'],
['foo', 'param-1'],
['foo', 'param-2'],
);

// Check that responses are correct.
expect(await promise0).toBe('worker-0');
expect(await promise1).toBe('worker-1');
expect(await promise2).toBe('worker-2');
});
42 changes: 35 additions & 7 deletions packages/jest-worker/src/__tests__/index.test.js
Expand Up @@ -11,12 +11,17 @@ let Farm;
let Worker;
let mockWorkers;

function workerReplyStart(i) {
mockWorkers[i].send.mock.calls[0][1](mockWorkers[i]);
}

function workerReplyEnd(i, error, result) {
mockWorkers[i].send.mock.calls[0][2](error, result);
}

function workerReply(i, error, result) {
return mockWorkers[i].send.mock.calls[0][1].call(
mockWorkers[i],
error,
result,
);
workerReplyStart(i);
workerReplyEnd(i, error, result);
}

beforeEach(() => {
Expand Down Expand Up @@ -322,9 +327,8 @@ it('checks that once a sticked task finishes, next time is sent to that worker',
});

// Worker 1 successfully replies with "17" as a result.
const promise = farm.foo('car', 'plane');
farm.foo('car', 'plane');
workerReply(1, null, 17);
await promise;

// Note that the stickiness is not created by the method name or the arguments
// it is solely controlled by the provided "computeWorkerKey" method, which in
Expand All @@ -341,6 +345,30 @@ it('checks that once a sticked task finishes, next time is sent to that worker',
expect(mockWorkers[2].send).toHaveBeenCalledTimes(1); // Only "foo".
});

it('checks that even before a sticked task finishes, next time is sent to that worker', async () => {
const farm = new Farm('/tmp/baz.js', {
computeWorkerKey: () => '1234567890abcdef',
exposedMethods: ['foo', 'bar'],
numWorkers: 3,
});

// Call "foo". Not that the worker is sending a start response synchronously.
farm.foo('car', 'plane');
workerReplyStart(1);

// Call "bar". Not that the worker is sending a start response synchronously.
farm.bar();
workerReplyStart(1);

// The first time, a call with a "1234567890abcdef" hash had never been done
// earlier ("foo" call), so it got queued to all workers. Later, since the one
// that resolved the call was the one in position 1, all subsequent calls are
// only redirected to that worker.
expect(mockWorkers[0].send).toHaveBeenCalledTimes(1); // Only "foo".
expect(mockWorkers[1].send).toHaveBeenCalledTimes(2); // "foo" + "bar".
expect(mockWorkers[2].send).toHaveBeenCalledTimes(1); // Only "foo".
});

it('checks that once a non-sticked task finishes, next time is sent to all workers', async () => {
// Note there is no "computeWorkerKey".
const farm = new Farm('/tmp/baz.js', {
Expand Down

0 comments on commit 558416f

Please sign in to comment.