Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: refactored cluster schedule_handlers #32485

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 1 addition & 6 deletions lib/internal/cluster/master.js
Expand Up @@ -297,12 +297,7 @@ function queryServer(worker, message) {
constructor = SharedHandle;
}

handle = new constructor(key,
address,
message.port,
message.addressType,
message.fd,
message.flags);
handle = new constructor(key, address, message);
handles.set(key, handle);
}

Expand Down
191 changes: 90 additions & 101 deletions lib/internal/cluster/round_robin_handle.js
Expand Up @@ -10,115 +10,104 @@ const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const { constants } = internalBinding('tcp_wrap');

module.exports = RoundRobinHandle;

function RoundRobinHandle(key, address, port, addressType, fd, flags) {
this.key = key;
this.all = new Map();
this.free = [];
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);

if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0) {
this.server.listen({
port,
host: address,
// Currently, net module only supports `ipv6Only` option in `flags`.
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
class RoundRobinHandle {
constructor(key, address, { port, fd, flags }) {
this.key = key;
this.all = new Map();
this.free = new Map();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's best to keep this as an array since an array is most likely designed better for the purposes of a queue than a Map is. The only real benefit of the Map is faster lookups, but we shouldn't really optimize that over faster/easier shift()/push() because workers are not likely to be added/removed as often as workers being selected for requests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but shifting logic is at the time of distributing the load and that is a single operation that can be implemented using keys iterator on map is still an O(1) operation and getting the element and deleting that from the Hash is same on average. Reimplemented to delete the worker from free pool.

this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
this.attachListener(fd, port, address, flags); // UNIX socket path.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this method is only used here, I think it's best to just keep the original code here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't now it is giving it much more meaning to what the block is actually doing, and also if there are more case logic in some block, better to abstract it in some other place. This is done because in the future also there might be some more conditionals around that code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if the logic gets that complicated then we can revisit it at that point. Separating out 10 lines of code does not seem worthwhile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think that decreasing the vertical complexity gives the code a nice look and meaningfulness. I am still not clear about why moving it to previous implementation gives it verbosity and meaningfulness.

this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
} else
this.server.listen(address); // UNIX socket path.
}

this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
}
attachListener(fd, port, address, flags) {
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0) {
this.server.listen({
port,
host: address,
// Currently, net module only supports `ipv6Only` option in `flags`.
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else
this.server.listen(address);
}

RoundRobinHandle.prototype.add = function(worker, send) {
assert(this.all.has(worker.id) === false);
this.all.set(worker.id, worker);
add(worker, send) {
assert(!this.all.has(worker.id));
this.all.set(worker.id, worker);
const done = () => {
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
// TODO(bnoordhuis) Check err.
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker); // In case there are connections pending.
};
if (this.server === null)
return done();
// Still busy binding.
this.server.once('listening', done);
this.server.once('error', (err) => {
send(err.errno, null);
});
}

const done = () => {
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
// TODO(bnoordhuis) Check err.
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
remove(worker) {
const existed = this.all.delete(worker.id);
if (!existed)
return false;
this.free.delete(worker.id);
if (this.all.size !== 0)
return false;
for (const handle of this.handles) {
handle.close();
}

this.handoff(worker); // In case there are connections pending.
};

if (this.server === null)
return done();

// Still busy binding.
this.server.once('listening', done);
this.server.once('error', (err) => {
send(err.errno, null);
});
};

RoundRobinHandle.prototype.remove = function(worker) {
const existed = this.all.delete(worker.id);

if (!existed)
return false;

const index = this.free.indexOf(worker);

if (index !== -1)
this.free.splice(index, 1);

if (this.all.size !== 0)
return false;

for (const handle of this.handles) {
handle.close();
this.handles = [];
this.handle.close();
this.handle = null;
return true;
}
this.handles = [];

this.handle.close();
this.handle = null;
return true;
};

RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
const worker = this.free.shift();

if (worker)
this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function(worker) {
if (this.all.has(worker.id) === false) {
return; // Worker is closing (or has closed) the server.
distribute(err, handle) {
this.handles.push(handle);
const workerId = this.free.keys().next().value;
if (workerId) {
const worker = this.free.get(workerId);
this.free.delete(workerId);
this.handoff(worker);
}
}

const handle = this.handles.shift();

if (handle === undefined) {
this.free.push(worker); // Add to ready queue again.
return;
handoff(worker) {
if (this.all.has(worker.id) === false) {
return; // Worker is closing (or has closed) the server.
}
const handle = this.handles.shift();
if (handle === undefined) {
this.free.set(worker.id, worker); // Add to ready queue again.
return;
}
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.
this.handoff(worker);
});
}
}

const message = { act: 'newconn', key: this.key };

sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.

this.handoff(worker);
});
};
module.exports = RoundRobinHandle;
81 changes: 42 additions & 39 deletions lib/internal/cluster/shared_handle.js
@@ -1,46 +1,49 @@
'use strict';
const { Map } = primordials;
const assert = require('internal/assert');
const dgram = require('internal/dgram');
const net = require('net');

module.exports = SharedHandle;

function SharedHandle(key, address, port, addressType, fd, flags) {
this.key = key;
this.workers = [];
this.handle = null;
this.errno = 0;

let rval;
if (addressType === 'udp4' || addressType === 'udp6')
rval = dgram._createSocketHandle(address, port, addressType, fd, flags);
else
rval = net._createServerHandle(address, port, addressType, fd, flags);

if (typeof rval === 'number')
this.errno = rval;
else
this.handle = rval;
class SharedHandle {
constructor(key, address, { port, addressType, fd, flags }) {
this.key = key;
this.workers = new Map();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't a WeakSet (or SafeWeakSet) be a better fit than Map here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since in map we are relying on a key, that is in our case is id. On if deep-clone an object itself and pass it then also the lookup will not be evaluated to true. Ref:

> a = {b: 'some key'};
{ b: 'some key' }
> st = new WeakSet()
WeakSet { <items unknown> }
> st.add(a)
WeakSet { <items unknown> }
> st
WeakSet { <items unknown> }
> m = Object.assign({}, a)
{ b: 'some key' }
> st.has(m)
false
> st.has(a)
true
>

this.handle = null;
this.errno = 0;
const rval = this.determineRval(addressType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is only used here, I don't think it's worth moving to a separate function. I think keeping the original conditionals here is best.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous implement was using let and I think if we can restructure the logic to const that solidifies the meaning of that variable. Let's say if one goes from top to bottom reading this file, then reaching this rvalue variable one can infer that it will be a fixed rval determined by the function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the use of let being a problem, especially given the size of the function and that the assignments happen immediately after the declaration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let send a wrong signal on the meaning of variable as it can be changed, but const signifies that the value will not be changed and thus can be believed to be the same throughout the scope of the class.

address,
port,
fd,
flags);
if (typeof rval === 'number')
this.errno = rval;
else
this.handle = rval;
}

determineRval(addressType, address, port, fd, flags) {
if (addressType === 'udp4' || addressType === 'udp6')
return dgram._createSocketHandle(address, port, addressType, fd, flags);
else
return net._createServerHandle(address, port, addressType, fd, flags);
}

add(worker, send) {
assert(!this.workers.has(worker.id));
this.workers.set(worker.id, worker);
send(this.errno, null, this.handle);
}

remove(worker) {
if (!this.workers.has(worker.id))
return false; // The worker wasn't sharing this handle.
this.workers.delete(worker.id);
if (this.workers.size !== 0)
return false;
this.handle.close();
this.handle = null;
return true;
}
}

SharedHandle.prototype.add = function(worker, send) {
assert(!this.workers.includes(worker));
this.workers.push(worker);
send(this.errno, null, this.handle);
};

SharedHandle.prototype.remove = function(worker) {
const index = this.workers.indexOf(worker);

if (index === -1)
return false; // The worker wasn't sharing this handle.

this.workers.splice(index, 1);

if (this.workers.length !== 0)
return false;

this.handle.close();
this.handle = null;
return true;
};
module.exports = SharedHandle;