Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
child_process: do not keep list of sent sockets
Browse files Browse the repository at this point in the history
Keeping list of all sockets that were sent to child process causes memory
leak and thus unacceptable (see #4587). However `server.close()` should
still work properly.

This commit introduces two options:

* child.send(socket, { track: true }) - will send socket and track its status.
  You should use it when you want to receive `close` event on sent sockets.
* child.send(socket) - will send socket without tracking it status. This
  performs much better, because of smaller number of RTT between master and
  child.

With both of these options `server.close()` will wait for all sent
sockets to get closed.
  • Loading branch information
indutny committed Jan 17, 2013
1 parent 44cd121 commit 4488a69
Show file tree
Hide file tree
Showing 7 changed files with 463 additions and 99 deletions.
8 changes: 7 additions & 1 deletion doc/api/child_process.markdown
Expand Up @@ -124,10 +124,11 @@ process may not actually kill it. `kill` really just sends a signal to a proces

See `kill(2)`

### child.send(message, [sendHandle])
### child.send(message, [sendHandle], [options])

* `message` {Object}
* `sendHandle` {Handle object}
* `options` {Object}

When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
Expand Down Expand Up @@ -166,6 +167,11 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its
second argument to the `message` event.

The `options` object may have the following properties:

* `track` - Notify master process when `sendHandle` will be closed in child
process. (`false` by default)

**send server object**

Here is an example of sending a server:
Expand Down
12 changes: 11 additions & 1 deletion doc/api/net.markdown
Expand Up @@ -229,12 +229,22 @@ with `child_process.fork()`.

### server.connections

This function is **deprecated**; please use [server.getConnections()][] instead.
The number of concurrent connections on the server.

This becomes `null` when sending a socket to a child with `child_process.fork()`.
This becomes `null` when sending a socket to a child with
`child_process.fork()`. To poll forks and get current number of active
connections use asynchronous `server.getConnections` instead.

`net.Server` is an [EventEmitter][] with the following events:

### server.getConnections(callback)

Asynchronously get the number of concurrent connections on the server. Works
when sockets were sent to forks.

Callback should take two arguments `err` and `count`.

### Event: 'listening'

Emitted when the server has been bound after calling `server.listen`.
Expand Down
191 changes: 137 additions & 54 deletions lib/child_process.js
Expand Up @@ -107,36 +107,43 @@ var handleConversion = {
},

'net.Socket': {
send: function(message, socket) {
// pause socket so no data is lost, will be resumed later

// if the socket wsa created by net.Server
send: function(message, socket, options) {
// if the socket was created by net.Server
if (socket.server) {
// the slave should keep track of the socket
message.key = socket.server._connectionKey;

var firstTime = !this._channel.sockets.send[message.key];

// add socket to connections list
var socketList = getSocketList('send', this, message.key);
socketList.add(socket);

// the server should no longer expose a .connection property
// and when asked to close it should query the socket status from slaves
if (firstTime) {
socket.server._setupSlave(socketList);
if (options && options.track) {
// Keep track of socket's status
message.id = socketList.add(socket);
} else {
// the server should no longer expose a .connection property
// and when asked to close it should query the socket status from
// the slaves
if (firstTime) socket.server._setupSlave(socketList);

// Act like socket is detached
socket.server._connections--;
}
}

// remove handle from socket object, it will be closed when the socket
// has been send
// will be sent
var handle = socket._handle;
handle.onread = function() {};
socket._handle = null;

return handle;
},

postSend: function(handle) {
// Close the Socket handle after sending it
handle.close();
},

got: function(message, handle, emit) {
var socket = new net.Socket({handle: handle});
socket.readable = socket.writable = true;
Expand All @@ -146,7 +153,10 @@ var handleConversion = {

// add socket to connections list
var socketList = getSocketList('got', this, message.key);
socketList.add(socket);
socketList.add({
id: message.id,
socket: socket
});
}

emit(socket);
Expand All @@ -161,39 +171,98 @@ function SocketListSend(slave, key) {
var self = this;

this.key = key;
this.list = [];
this.slave = slave;

// These two arrays are used to store the list of sockets and the freelist of
// indexes in this list. After insertion, item will have persistent index
// until it's removed. This way we can use this index as an identifier for
// sockets.
this.list = [];
this.freelist = [];

slave.once('disconnect', function() {
self.flush();
});

this.slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
self.flush();
self.remove(msg.id);
});
}
util.inherits(SocketListSend, EventEmitter);

SocketListSend.prototype.add = function(socket) {
this.list.push(socket);
var index;

// Pick one of free indexes, or insert in the end of the list
if (this.freelist.length > 0) {
index = this.freelist.pop();
this.list[index] = socket;
} else {
index = this.list.push(socket) - 1;
}

return index;
};

SocketListSend.prototype.remove = function(index) {
var socket = this.list[index];
if (!socket) return;

// Create a hole in the list and move index to the freelist
this.list[index] = null;
this.freelist.push(index);

socket.destroy();
};

SocketListSend.prototype.flush = function() {
var list = this.list;
this.list = [];
this.freelist = [];

list.forEach(function(socket) {
socket.destroy();
if (socket) socket.destroy();
});
};

SocketListSend.prototype.update = function() {
if (this.slave.connected === false) return;
SocketListSend.prototype._request = function(msg, cmd, callback) {
var self = this;

if (!this.slave.connected) return onclose();
this.slave.send(msg);

function onclose() {
self.slave.removeListener('internalMessage', onreply);
callback(new Error('Slave closed before reply'));
};

function onreply(msg) {
if (!(msg.cmd === cmd && msg.key === self.key)) return;
self.slave.removeListener('disconnect', onclose);
self.slave.removeListener('internalMessage', onreply);

callback(null, msg);
};

this.slave.once('disconnect', onclose);
this.slave.on('internalMessage', onreply);
};

SocketListSend.prototype.close = function close(callback) {
this._request({
cmd: 'NODE_SOCKET_NOTIFY_CLOSE',
key: this.key
}, 'NODE_SOCKET_ALL_CLOSED', callback);
};

this.slave.send({
cmd: 'NODE_SOCKET_FETCH',
SocketListSend.prototype.getConnections = function getConnections(callback) {
this._request({
cmd: 'NODE_SOCKET_GET_COUNT',
key: this.key
}, 'NODE_SOCKET_COUNT', function(err, msg) {
if (err) return callback(err);
callback(null, msg.count);
});
};

Expand All @@ -203,45 +272,59 @@ function SocketListReceive(slave, key) {

var self = this;

this.connections = 0;
this.key = key;
this.list = [];
this.slave = slave;

slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return;

if (self.list.length === 0) {
self.flush();
return;
}
function onempty() {
if (!self.slave.connected) return;

self.on('itemRemoved', function removeMe() {
if (self.list.length !== 0) return;
self.removeListener('itemRemoved', removeMe);
self.flush();
self.slave.send({
cmd: 'NODE_SOCKET_ALL_CLOSED',
key: self.key
});
}

this.slave.on('internalMessage', function(msg) {
if (msg.key !== self.key) return;

if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') {
// Already empty
if (self.connections === 0) return onempty();

// Wait for sockets to get closed
self.once('empty', onempty);
} else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') {
if (!self.slave.connected) return;
self.slave.send({
cmd: 'NODE_SOCKET_COUNT',
key: self.key,
count: self.connections
});
}
});
}
util.inherits(SocketListReceive, EventEmitter);

SocketListReceive.prototype.flush = function() {
this.list = [];
SocketListReceive.prototype.add = function(obj) {
var self = this;

if (this.slave.connected) {
this.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: this.key
});
}
};
this.connections++;

SocketListReceive.prototype.add = function(socket) {
var self = this;
this.list.push(socket);
// Notify previous owner of socket about its state change
obj.socket.once('close', function() {
self.connections--;

socket.on('close', function() {
self.list.splice(self.list.indexOf(socket), 1);
self.emit('itemRemoved');
if (obj.id !== undefined && self.slave.connected) {
// Master wants to keep eye on socket status
self.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: self.key,
id: obj.id
});
}

if (self.connections === 0) self.emit('empty');
});
};

Expand Down Expand Up @@ -366,17 +449,16 @@ function setupChannel(target, channel) {
var string = JSON.stringify(message) + '\n';
var writeReq = channel.writeUtf8String(string, handle);

// Close the Socket handle after sending it
if (message && message.type === 'net.Socket') {
handle.close();
}

if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
this.emit('error', er);
}

writeReq.oncomplete = nop;
if (obj && obj.postSend) {
writeReq.oncomplete = obj.postSend.bind(null, handle);
} else {
writeReq.oncomplete = nop;
}

/* If the master is > 2 read() calls behind, please stop sending. */
return channel.writeQueueSize < (65536 * 2);
Expand Down Expand Up @@ -656,6 +738,7 @@ function ChildProcess() {

this._closesNeeded = 1;
this._closesGot = 0;
this.connected = false;

this.signalCode = null;
this.exitCode = null;
Expand Down

2 comments on commit 4488a69

@xianbei
Copy link

@xianbei xianbei commented on 4488a69 Nov 8, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

child.send(message, [sendHandle], [options])
where is the [options] param?
i can't find it in lib/child_process.js
thx

@bnoordhuis
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xianbei This commit never really made it into a stable branch, it got superseded by something else.

Please sign in to comment.