Skip to content

Commit

Permalink
cluster: send connection to other server when worker drop it
Browse files Browse the repository at this point in the history
PR-URL: #43747
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Paolo Insogna <paolo@cowtech.it>
  • Loading branch information
theanarkh authored and danielleadams committed Jul 26, 2022
1 parent 67b4edd commit 7b276b8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
9 changes: 8 additions & 1 deletion lib/internal/cluster/child.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,14 @@ function rr(message, { indexesKey, index }, cb) {
function onconnection(message, handle) {
const key = message.key;
const server = handles.get(key);
const accepted = server !== undefined;
let accepted = server !== undefined;

if (accepted && server[owner_symbol]) {
const self = server[owner_symbol];
if (self.maxConnections && self._connections >= self.maxConnections) {
accepted = false;
}
}

send({ ack: message.seq, accepted });

Expand Down
64 changes: 64 additions & 0 deletions test/parallel/test-cluster-net-server-drop-connection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');
const cluster = require('cluster');
const tmpdir = require('../common/tmpdir');

// The core has bug in handling pipe handle by ipc when platform is win32,
// it can be triggered on win32. I will fix it in another pr.
if (common.isWindows)
common.skip('no setSimultaneousAccepts on pipe handle');

let connectionCount = 0;
let listenCount = 0;
let worker1;
let worker2;

function request(path) {
for (let i = 0; i < 10; i++) {
net.connect(path);
}
}

function handleMessage(message) {
assert.match(message.action, /listen|connection/);
if (message.action === 'listen') {
if (++listenCount === 2) {
request(common.PIPE);
}
} else if (message.action === 'connection') {
if (++connectionCount === 10) {
worker1.send({ action: 'disconnect' });
worker2.send({ action: 'disconnect' });
}
}
}

if (cluster.isPrimary) {
cluster.schedulingPolicy = cluster.SCHED_RR;
tmpdir.refresh();
worker1 = cluster.fork({ maxConnections: 1, pipePath: common.PIPE });
worker2 = cluster.fork({ maxConnections: 9, pipePath: common.PIPE });
worker1.on('message', common.mustCall((message) => {
handleMessage(message);
}, 2));
worker2.on('message', common.mustCall((message) => {
handleMessage(message);
}, 10));
} else {
const server = net.createServer(common.mustCall((socket) => {
process.send({ action: 'connection' });
}, +process.env.maxConnections));

server.listen(process.env.pipePath, common.mustCall(() => {
process.send({ action: 'listen' });
}));

server.maxConnections = +process.env.maxConnections;

process.on('message', common.mustCall((message) => {
assert.strictEqual(message.action, 'disconnect');
process.disconnect();
}));
}

0 comments on commit 7b276b8

Please sign in to comment.