Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: cluster: make scheduler configurable #11546

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions doc/api/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -759,12 +759,16 @@ changes:
- version: v6.4.0
pr-url: https://github.com/nodejs/node/pull/7838
description: The `stdio` option is supported now.
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11546
description: The `scheduler` option is now supported.
-->

* `settings` {Object}
* `exec` {string} file path to worker file. (Default=`process.argv[1]`)
* `args` {Array} string arguments passed to worker.
(Default=`process.argv.slice(2)`)
* `scheduler` {Function} a synchronous function used to schedule connections.
* `silent` {boolean} whether or not to send output to parent's stdio.
(Default=`false`)
* `stdio` {Array} Configures the stdio of forked processes. When this option
Expand Down Expand Up @@ -859,6 +863,57 @@ socket.on('data', (id) => {
});
```

## Custom Scheduling
<!-- YAML
added: REPLACEME
-->

The cluster master's scheduling algorithm can be customized by passing a
function as the `scheduler` option to `cluster.setupMaster()`. The `scheduler`
is a synchronous function, whose signature is `scheduler(workers[, socket])`.

* `workers` {Array} an array of cluster workers that the request can be
distributed to.
* `socket` {net.Socket} a socket that can be used for connection based
scheduling. There is some overhead associated with creating `socket`, so it is
not provided by default. If the scheduling policy requires access to `socket`,
set `exposeSocket` to `true` on the `scheduler` function.

The scheduling function should return the worker that will handle the
connection. However, the `workers` array should under no condition be
mutated. If the scheduler does not return a cluster worker, the connection
remains queued and is not processed at that time. An example that implements
round-robin scheduling is shown below.

```javascript
'use strict';
const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
/**
 * Round-robin scheduler: selects the worker at the head of the queue and
 * moves it to the tail so the next connection goes to the following worker.
 *
 * @param {Array} workers Workers the connection can be distributed to.
 * @param {net.Socket} [socket] Connection socket; unused by this policy.
 * @returns {Worker|undefined} The selected worker, or `undefined` when
 *   `workers` is empty.
 */
function scheduler(workers, socket) {
  const worker = workers.shift();

  if (worker === undefined)
    return;

  // Re-queue with `push()`. `shift()` takes no arguments and would remove a
  // second worker from the head instead of appending this one.
  workers.push(worker); // Add to the end of the queue.

  return worker;
}

scheduler.exposeSocket = true; // Expose the socket even though it is unused.
cluster.setupMaster({ scheduler });

cluster.fork();
cluster.fork();
} else {
http.createServer((req, res) => {
res.end(`hello from ${cluster.worker.id}\n`);
}).listen(8080);
}
```

[`child_process.fork()`]: child_process.html#child_process_child_process_fork_modulepath_args_options
[`ChildProcess.send()`]: child_process.html#child_process_child_send_message_sendhandle_options_callback
[`disconnect`]: child_process.html#child_process_child_disconnect
Expand Down
20 changes: 16 additions & 4 deletions lib/internal/cluster/master.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ cluster.SCHED_RR = SCHED_RR; // Master distributes connections.
var ids = 0;
var debugPortOffset = 1;
var initialized = false;
var scheduler = null;

// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
var schedulingPolicy = {
Expand Down Expand Up @@ -67,8 +68,18 @@ cluster.setupMaster = function(options) {

initialized = true;
schedulingPolicy = cluster.schedulingPolicy; // Freeze policy.
assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
`Bad cluster.schedulingPolicy: ${schedulingPolicy}`);

if (settings.scheduler !== undefined) {
if (typeof settings.scheduler !== 'function') {
throw new TypeError('scheduler must be a function');
}

scheduler = settings.scheduler;
} else if (schedulingPolicy === SCHED_RR) {
scheduler = RoundRobinHandle.scheduler;
} else if (schedulingPolicy !== SCHED_NONE) {
assert(false, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
}

const hasDebugArg = process.execArgv.some((argv) => {
return /^(--debug|--debug-brk)(=\d+)?$/.test(argv);
Expand Down Expand Up @@ -283,7 +294,7 @@ function queryServer(worker, message) {
// UDP is exempt from round-robin connection balancing for what should
// be obvious reasons: it's connectionless. There is nothing to send to
// the workers except raw datagrams and that's pointless.
if (schedulingPolicy !== SCHED_RR ||
if (schedulingPolicy === SCHED_NONE ||
message.addressType === 'udp4' ||
message.addressType === 'udp6') {
constructor = SharedHandle;
Expand All @@ -294,7 +305,8 @@ function queryServer(worker, message) {
message.port,
message.addressType,
message.fd,
message.flags);
message.flags,
scheduler);
}

if (!handle.data)
Expand Down
68 changes: 48 additions & 20 deletions lib/internal/cluster/round_robin_handle.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
const assert = require('assert');
const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const create = Object.create;
const getOwnPropertyNames = Object.getOwnPropertyNames;
const uv = process.binding('uv');

module.exports = RoundRobinHandle;

function RoundRobinHandle(key, address, port, addressType, fd) {
function RoundRobinHandle(key, address, port, addrType, fd, flags, scheduler) {
this.key = key;
this.all = {};
this.free = [];
this.all = create(null);
this.workers = [];
this.handles = [];
this.handle = null;
this.scheduler = scheduler;
this.server = net.createServer(assert.fail);

if (fd >= 0)
Expand All @@ -30,6 +32,17 @@ function RoundRobinHandle(key, address, port, addressType, fd) {
});
}

// Default scheduling policy: hand the connection to the worker at the front
// of the ready queue, then rotate that worker to the back. Returns
// `undefined` when no worker is available.
RoundRobinHandle.scheduler = function(workers) {
  const next = workers.shift();

  if (next !== undefined)
    workers.push(next); // Rotate to the back of the ready queue.

  return next;
};


RoundRobinHandle.prototype.add = function(worker, send) {
assert(worker.id in this.all === false);
this.all[worker.id] = worker;
Expand All @@ -44,7 +57,8 @@ RoundRobinHandle.prototype.add = function(worker, send) {
send(null, null, null); // UNIX socket.
}

this.handoff(worker); // In case there are connections pending.
this.workers.push(worker);
this.handoff(); // In case there are connections pending.
};

if (this.server === null)
Expand All @@ -66,10 +80,10 @@ RoundRobinHandle.prototype.remove = function(worker) {
return false;

delete this.all[worker.id];
const index = this.free.indexOf(worker);
const index = this.workers.indexOf(worker);

if (index !== -1)
this.free.splice(index, 1);
this.workers.splice(index, 1);

if (getOwnPropertyNames(this.all).length !== 0)
return false;
Expand All @@ -84,32 +98,46 @@ RoundRobinHandle.prototype.remove = function(worker) {

RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
const worker = this.free.shift();

if (worker)
this.handoff(worker);
this.handoff();
};

RoundRobinHandle.prototype.handoff = function(worker) {
if (worker.id in this.all === false) {
return; // Worker is closing (or has closed) the server.
RoundRobinHandle.prototype.handoff = function() {
const handle = this.handles[0];
var socket;

// There are currently no requests to schedule.
if (handle === undefined)
return;

if (this.scheduler.exposeSocket === true) {
socket = new net.Socket({
handle,
readable: false,
writable: false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readable = writable = false. Is that intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured that the socket should only be used to determine where to pass the handle, and not actually read or written.

pauseOnCreate: true
});
}

const handle = this.handles.shift();
const worker = this.scheduler(this.workers, socket);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this .slice() or should there be a note in the documentation that the callee is not allowed to modify the workers array?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs currently say:

The scheduling function should return the worker that will handle the connection. However, the worker should not be removed from workers.

I think slicing would reduce the chance of shooting yourself in the foot, but it would make it more work to do something like round robin because we modify the array order in the scheduler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but that could be interpreted as meaning that workers.sort() is okay. I'd add a stronger prohibition.


if (handle === undefined) {
this.free.push(worker); // Add to ready queue again.
// An invalid worker was returned, or the worker is closing the server.
if (worker === null || typeof worker !== 'object' ||
worker.id in this.all === false) {
return;
}

this.handles.shift(); // Successfully scheduled, so dequeue.

const message = { act: 'newconn', key: this.key };

sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
if (reply.accepted) {
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.
} else {
// Worker is shutting down. Send the handle to another worker.
this.distribute(0, handle);
}

this.handoff(worker);
this.handoff();
});
};
82 changes: 82 additions & 0 deletions test/parallel/test-cluster-custom-scheduler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const cluster = require('cluster');
const http = require('http');
const net = require('net');

if (cluster.isMaster) {
const numWorkers = 2;
const pattern = [2, 1, 2, 2, 1, 2, 1, 1, 2];
let index = 0;
let readyCount = 0;

// Custom scheduler under test. It walks through `pattern`: each connection
// is routed to the worker whose id equals the current pattern entry, and
// `index` only advances once such a worker was actually found.
function scheduler(workers, socket) {
  const wantedId = pattern[index];
  const socketExpected = wantedId === 2;

  // Keep the last worker in the list whose id matches the pattern entry.
  let chosen;
  workers.forEach((candidate) => {
    if (candidate.id === wantedId)
      chosen = candidate;
  });

  // The socket must be supplied exactly when exposeSocket reports true,
  // which the getter ties to the pattern entry being 2.
  assert.strictEqual(scheduler.exposeSocket, socketExpected);

  if (socketExpected)
    assert(socket instanceof net.Socket);
  else
    assert.strictEqual(socket, undefined);

  if (chosen !== undefined)
    index += 1;

  return chosen;
}

// exposeSocket is a getter rather than a fixed flag: it reports true only
// while the current pattern entry is 2, so the socket is exposed for those
// connections and hidden for all others.
Object.defineProperty(scheduler, 'exposeSocket', {
  get: () => pattern[index] === 2
});

// Handles the 'ready' message a worker sends once its server is listening.
// When every worker has reported in, the first HTTP request is issued
// against the port carried by the most recent message.
function onMessage(msg) {
  assert.strictEqual(msg.cmd, 'ready');
  readyCount += 1;

  const allWorkersReady = readyCount === numWorkers;

  if (!allWorkersReady)
    return;

  makeRequest(0, msg.port);
}

// Issues the HTTP requests one after another, one per entry in `pattern`.
// Each response body must equal the worker id the pattern prescribes for
// that request. After the last request every worker is disconnected.
function makeRequest(reqCount, port) {
  let served = reqCount;

  if (served >= pattern.length) {
    // All requests done: shut the workers down.
    for (const id in cluster.workers) {
      const worker = cluster.workers[id];
      worker.disconnect();
    }

    return;
  }

  const onResponse = (res) => {
    res.on('data', (data) => {
      const respondingId = Number(data.toString());

      assert.strictEqual(respondingId, pattern[served]);
      served++;
      makeRequest(served, port);
    });
  };

  http.get({ port }, onResponse);
}

// Install the custom scheduler, then fork the workers. Each worker's
// 'message' listener is wrapped in common.mustCall().
cluster.setupMaster({ scheduler: scheduler });

for (let forked = 0; forked < numWorkers; forked++) {
  const worker = cluster.fork();
  worker.on('message', common.mustCall(onMessage));
}

} else {
// Worker side: answer every request with this worker's id, and tell the
// master which port was bound once the server is listening.
const onRequest = (req, res) => {
  res.end(String(cluster.worker.id));
};

const server = http.createServer(onRequest).listen(0, () => {
  const { port } = server.address();
  process.send({ cmd: 'ready', port });
});
}