make it possible to locally configure cluster #3367

Open
wants to merge 8 commits into from

7 participants

@tomyan

The goal of this change is to make it possible to configure and use the cluster module without modifying global config. The change leaves the current interface entirely intact, but additionally allows configuration of a cluster with the following:

var cluster = require('cluster');

var master = new cluster.Master();
master.setupMaster({ exec: "some/file.js" });

Using the existing interface still works:

cluster.setupMaster({ exec: "some/other/file.js" });

...except that the global master instance has no effect on the other master, and vice versa. This is kinda similar to how you can configure a local http.Agent without affecting the global one.

The reason I'm proposing this change is that I have a project that uses cluster to do some long-running work. The project exposes a library interface, and I would like users to be able to use that interface without side effects beyond the ones they initiate through the library (i.e. it does not modify global cluster config).
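
To make the side effect concrete, here is a minimal sketch (file names are hypothetical) of how two independent consumers of the global config can collide today:

var cluster = require('cluster');

// library A configures the shared, global master
cluster.setupMaster({ exec: 'lib-a/worker.js' });

// library B, unaware of library A, tries to do the same; setupMaster
// currently runs only once, so these settings are silently ignored
cluster.setupMaster({ exec: 'lib-b/worker.js' });

// library B's worker ends up running lib-a/worker.js
cluster.fork();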

@AndreasMadsen

May I ask what the long-running process does?

@tomyan

The project is an http server that uses cluster for two different purposes depending on how it's invoked. The first is for hot (re)loading code, where a new cluster worker is created for each request (and also serves subsequent requests within a short period). I'm aware that alternative approaches exist for this (e.g. watching for filesystem changes), but this works pretty well and is the way I prefer to do it. The second use is more typical: using cluster to make use of multiple CPUs.
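
A rough sketch of the first use (this is not my exact code; app.js is a hypothetical worker script, and a timer stands in for the per-request replacement logic). The master keeps one worker alive and swaps it for a fresh fork, so the next request is served by freshly loaded code while the shared server handle keeps the port bound:

var cluster = require('cluster');

if (cluster.isMaster) {
  cluster.setupMaster({ exec: 'app.js' });
  var current = cluster.fork();

  // periodically replace the worker; a fresh fork re-reads the code
  setInterval(function() {
    var old = current;
    current = cluster.fork();
    current.once('listening', function() {
      old.disconnect(); // drain and drop the stale worker
    });
  }, 2000);
}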

The primary mechanism for running this server is as a standalone program (i.e. it would not need a cluster module whose configuration has global side effects), but it also provides a library interface. Part of the motivation for this is to be able to drive the functionality from unit tests (via a framework that runs each test within the same process), but I think it might be useful beyond that. Either way, I find that avoiding global side effects to keep things testable is generally a good pattern.

@isaacs
Owner

No for 0.8. Maybe for 0.9.

Let's review this more closely in a few weeks.

@tomyan

Cool. Let me know if there's stuff I can do to improve the proposed change when the time comes.

Thanks

Tom

@hasanyasin

I hope I understood the problem correctly: you want to create Worker processes with different settings (a different executable, or different command-line arguments) than the ones provided to setupMaster.

If I understood correctly, an alternative solution would be to allow fork() calls to override the cluster setup:

var cluster = require('cluster');

// Setting up the cluster:
cluster.setupMaster({ exec: "some/file.js", args: "blah blah" }); // Global config.

// A worker with global settings:
var worker = new cluster.Worker();

// Executable overridden, arguments kept:
var custom_worker = new cluster.Worker({ exec: "some/other/file.js" });

// Both executable and arguments overridden:
var custom_worker_2 = new cluster.Worker({ exec: "some/another/file.js", args: "even more blah blah" });
@tomyan

Yeah, that would be fine if it resulted in a worker being forked. Currently I don't think you can create a forked worker like that; you need to use cluster.fork(), which only takes the env parameter.
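
For reference, the entire per-fork surface today is the optional env object; exec and args are fixed globally (worker.js is a hypothetical path):

var cluster = require('cluster');

cluster.setupMaster({ exec: 'worker.js' });            // fixed for every fork
var worker = cluster.fork({ WORKER_ROLE: 'reload' });  // env vars only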

Tom

@hasanyasin

I am developing a web server on Node; it has a master process and multiple child processes, one per core. I personally really did not like using the same module for both kinds of work and separating code blocks with is_master / is_worker checks. So I designed a master process which runs from one js file and child processes that run from another.

To achieve this in v0.6, I had to patch it. Half of what I did was in fact already documented, but not implemented in code. For the other half, I did what I explained in my previous comment. I was going to fork and submit a pull request, but after seeing 1900+ forks of the project and seeing that the cluster module had already changed a lot in 0.7, I did not.

0.8 provides half of what I wanted. I originally designed the system so that child processes are created with custom command-line arguments. Since 0.8 now provides the main need (different sources for master and child), I did not want to patch it again; instead, I now do what I used to do with command-line arguments via an initial message sent to each child process after it is created. I then liked this even more than my original approach, since I can send much more complex objects instead of basic strings as command-line arguments.
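
In sketch form (file names and the startServer function are hypothetical):

// master.js
var cluster = require('cluster');
cluster.setupMaster({ exec: 'worker.js' });
var worker = cluster.fork();
worker.send({ cmd: 'init', config: { port: 8000, verbose: true } });

// worker.js
process.on('message', function(msg) {
  if (msg.cmd === 'init') {
    startServer(msg.config); // a full object arrives, not argv strings
  }
});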

I hope this can be a solution for your problem too.

@AndreasMadsen

I'm still not convinced this is a good idea.

@hasanyasin your API proposal raises the most concern. I often get mails from people asking how they can have different workers, separated into different files, so they can route incoming requests depending on their IP or some signature. The problem is that this is not possible with any kind of cluster that uses the OS to load balance; it distributes the TCP requests (almost) equally between the listeners attached to the main handle (fd). Having different workers is simply a bad design decision.

However, if that is what you need, you can still do it. The procedure is to create your own load balancer in node by creating workers with child_process.fork(), then listening for incoming TCP connections in the master and distributing them using child.send(socket) (yes, that is a new feature in node 0.8).
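
A minimal sketch of that procedure, along the lines of the node 0.8 docs example (worker.js and the IP rule are placeholders):

var net = require('net');
var fork = require('child_process').fork;

var special = fork('worker.js', ['special']);
var normal = fork('worker.js', ['normal']);

net.createServer(function(socket) {
  // route by IP (or any signature) instead of letting the OS decide
  if (socket.remoteAddress === '74.125.127.100') {
    special.send('socket', socket);
    return;
  }
  normal.send('socket', socket);
}).listen(1337);

// in worker.js: process.on('message', function(m, socket) { ... });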

Furthermore, you are speaking about two halves; could you describe them more precisely, so we can understand the use case? A personal request, if I may: please describe both what you want and how you did it. Often I only get what someone did, and it almost always turns out that there could be some improvement, or that it isn't the purpose of cluster.

@tomyan I'm not totally against your API proposal; however, it seems very complicated to support both the current API and the new one. Could it not be simplified down to this: you always have to call cluster.setupMaster, and that will simply return a new Master instance? @isaacs will have to be the judge of that, though.

Also note that cluster.Worker is not documented API; use cluster.fork(). And yes, there is a difference, and it may become bigger.

I'm not sure why code reloading should be done with cluster; could you explain that? It seems to me that this could just as well, without any more complexity, be implemented with child_process.spawn().

In any case, it sounds to me like your different configurations will never run at the same time: one for development and one for production. If that is the case, I don't see the need for pre-creating two different cluster configurations, when the relevant configuration can simply be invoked at runtime.

That in itself, of course, doesn't make the API bad; what really concerns me is the following:

Extensibility: the main purpose of the cluster rewrite was to make it more extensible by exposing the necessary information directly from cluster, without making an opinionated plugin API. For instance, an "auto restart" module could be implemented with:

var cluster = require('cluster');
module.exports = function autorestart() {
  cluster.on('exit', function () {
    cluster.fork();
  });
};

Of course this could just as well be implemented as autorestart(cluster), which would fit your API. However, allowing one to create whole new instances of cluster objects could enforce a much worse extensibility environment:

var cluster = require('cluster');

module.exports = function autorestart(settings) {
  var master = cluster.setupMaster(settings);
  master.on('exit', function () {
    master.fork();
  });
  return master;
};

Such a master cannot be extended further by other modules that use the same structure, since each would just create a new master of its own. The first design can be extended this way, because it takes only a cluster object.

Master principle: I also often get mails about how to restart the master using the new cluster, and the answer is: you cannot. That must be done at a lower level. It is my (perhaps too ideal) principle never to create a master so complex that there is a high risk of bugs. Allowing one to use the same process for two different masters makes the process code a lot more complex (if they actually create two masters), and it may introduce new bugs, since any external module is cached and can therefore reuse some cached object that was only meant to work with one master. For instance, it could assume that worker.id is unique; if one creates two masters, that is no longer true.

Of course, this is all about protecting userland, and node.js doesn't do that. But having watched #node.js for a long time, I get the impression that cluster is most certainly not well understood (it seemed to begin turning around two months ago). Keeping that in mind, I feel that this is not the time to increase the complexity. However, I'm not opposed to the API.

@hasanyasin

Hey Andreas, thanks very much for all the details and explanation.

I was just misunderstood. :)

What I was trying to say was that I am really happy with the state of things in 0.8. What I wanted to separate was master and worker processes, not workers from each other. This had already been documented in 0.6. Below is from cluster.js in 0.6.19:

// options.workerFilename
// Specifies the script to execute for the child processes. Default is
// process.argv[1]

However, the function implementation did not honor this, so I just added the argument and made it possible to have different sources for master and worker processes.

With 0.8, we can now give master and worker processes different sources. I do not mean having master.js, worker1.js, worker2.js, etc. What I mean is having just one master.js and one worker.js, for cleaner code, instead of keeping both in one file and using if/else blocks to keep things separate.

In addition to this, I had also made it possible to start worker processes with different arguments. This was not to create completely different child processes; it was just to pass some variables. Now I send a message to each worker instead of passing different arguments. That makes me even happier than using arguments, since I already use messages between master and worker processes for many things, and this makes everything much more consistent.

In summary, I am very happy with the current state of child process creation.

UPDATE: I now realize that the cause of the misunderstanding was my first comment, not the previous one. I had tried to give an alternative implementation for tomyan's request. In my own use, I do not have different worker processes, just one kind.

@AndreasMadsen

@hasanyasin okay, I think I got it.

You are speaking about the confusing comment at https://github.com/joyent/node/blob/v0.6/lib/cluster.js#L58. You should know that it is a leftover from the original cluster, which worked by calling node cluster worker.js; the cluster argument would then be swapped out by src/node.js, so process.argv[1] was worker.js.

In addition to this, I had also made it possible to start worker processes with different arguments.

Are you speaking about the API proposal or an actual implementation?

@hasanyasin

I had thought that it was documented as designed, but not yet implemented.

FYI, below is what I had done. I am not posting this as a suggestion or anything; I had just tried to solve my problem quickly this way, and since you asked, I am writing it up.

60:
--- // options.workerFilename
+++ // options.path

71:
--- function startMaster() {
+++ function startMaster(options) {

77-78:
---    workerFilename = process.argv[1];
---    workerArgs = process.argv.slice(2);
+++  if (options) {
+++    workerFilename = options.path || process.argv[1];
+++    workerArgs = options.args || process.argv.slice(2);
+++  }
+++  else {
+++    workerFilename = process.argv[1];
+++    workerArgs = process.argv.slice(2);
+++  }

145:
--- cluster.fork = function() {
+++ cluster.fork = function(args) {

161:
---  var worker = fork(workerFilename, workerArgs, { env: envCopy });
+++  var worker = fork( workerFilename, (args || workerArgs), { env: envCopy });
@tomyan

Hi @AndreasMadsen

Thanks for taking the time to go through it. Comments inline...

@tomyan I'm not totally against your API proposal; however, it seems very complicated to support both the current API and the new one. Could it not be simplified down to this: you always have to call cluster.setupMaster, and that will simply return a new Master instance? @isaacs will have to be the judge of that, though.

I like the idea of having setupMaster return a new master. I guess this would mean that if you use cluster you get the defaults, and if you call setupMaster to get a differently configured master you don't have any side effects on other code that uses cluster. I'd be happy to work on a patch that achieves that.

Also note that cluster.Worker is not documented API; use cluster.fork(). And yes, there is a difference, and it may become bigger.

I'm not sure why code reloading should be done with cluster; could you explain that? It seems to me that this could just as well, without any more complexity, be implemented with child_process.spawn().

I was having trouble doing it with a single listening socket across reloads. I wasn't able to bind to a socket without accepting connections in the parent process, which I think I would need (see #2289). I could accept the connections in the parent process and then pass the handles to the child process, I guess, but cluster does almost exactly what I need. I'm also not too keen on per-request handle passing between processes: I've tried this before in another server (and language) and it didn't perform as well as accepting in each child. This isn't too much of an issue for the code-reloading use case, since that would only be used in development, but I'd like to avoid the complexity if possible.

In any case, it sounds to me like your different configurations will never run at the same time: one for development and one for production. If that is the case, I don't see the need for pre-creating two different cluster configurations, when the relevant configuration can simply be invoked at runtime.

They run in the same process in my tests, or at least I would like them to. I'd also like it to work as a library without having to caveat its usage.

That in itself, of course, doesn't make the API bad; what really concerns me is the following:

Extensibility: the main purpose of the cluster rewrite was to make it more extensible by exposing the necessary information directly from cluster, without making an opinionated plugin API. For instance, an "auto restart" module could be implemented with:

var cluster = require('cluster');
module.exports = function autorestart() {
  cluster.on('exit', function () {
    cluster.fork();
  });
};
Of course this could just as well be implemented as autorestart(cluster), which would fit your API. However, allowing one to create whole new instances of cluster objects could enforce a much worse extensibility environment:

var cluster = require('cluster');

module.exports = function autorestart(settings) {
  var master = cluster.setupMaster(settings);
  master.on('exit', function () {
    master.fork();
  });
  return master;
};
Such a master cannot be extended further by other modules that use the same structure, since each would just create a new master of its own. The first design can be extended this way, because it takes only a cluster object.

I think the auto-restart feature is one I'd love to see in cluster, but that might be ignoring your point :-). On your direct point (about extensibility), I think that extensions are better where they don't have a global (side) effect. This kind of pattern can lead to action at a distance, and hence to non-obvious behaviour and bugs that are difficult to track down. I'm therefore much more in favour of your second API (in most cases there will only be one master anyway).

Master principle: I also often get mails about how to restart the master using the new cluster, and the answer is: you cannot. That must be done at a lower level. It is my (perhaps too ideal) principle never to create a master so complex that there is a high risk of bugs. Allowing one to use the same process for two different masters makes the process code a lot more complex (if they actually create two masters), and it may introduce new bugs, since any external module is cached and can therefore reuse some cached object that was only meant to work with one master. For instance, it could assume that worker.id is unique; if one creates two masters, that is no longer true.

That's only an issue if the modules you use rely on global data. The modules I tend to use (and write) avoid global mutable data, as this makes the code more reusable within different parts of an application without those parts treading on each other's toes (I guess this is the principle I'm trying to push here).

Of course, this is all about protecting userland, and node.js doesn't do that. But having watched #node.js for a long time, I get the impression that cluster is most certainly not well understood (it seemed to begin turning around two months ago). Keeping that in mind, I feel that this is not the time to increase the complexity. However, I'm not opposed to the API.

What did you think of the structural change in my pull request? I found it much easier to implement the changes by splitting the code shared between parent and child from the code for the child and the code for the parent. If you (and @isaacs) agree to the API proposal, it would be good to know that the approach to adding the feature is okay too.

Anyway, thanks for the feedback and for the module; it's really useful.

Tom

@AndreasMadsen

I like the idea of having setupMaster return a new master. I guess this would mean that if you use cluster you get the defaults, and if you call setupMaster to get a differently configured master you don't have any side effects on other code that uses cluster. I'd be happy to work on a patch that achieves that.

It means require('cluster') returns a setupMaster function, and calling that will return a new Master instance. However, the best decision is probably to let require('cluster') support the old API as well, if possible, but deprecate it.

To make it totally clear:

  • calling require('cluster') as a function returns a new master instance
  • calling require('cluster').whatever uses the old master instance. Note that if the old require('cluster').setupMaster is not supported, then calling require('cluster').whatever should not be supported at all.
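
As a sketch only (Master being the constructor from this patch), the module export itself would become callable, with the old surface forwarding to one shared instance:

function createMaster(settings) {
  var master = new Master();
  master.setupMaster(settings);
  return master;
}

// deprecated legacy surface, forwarded to a single shared instance
var globalMaster = new Master();
createMaster.setupMaster = function(opts) { globalMaster.setupMaster(opts); };
createMaster.fork = function(env) { return globalMaster.fork(env); };

module.exports = createMaster;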

I was having trouble doing it with a single listening socket between reloads. I wasn't able to bind to a socket without accepting connections in the parent process, which I think I would need - see #2289. I could do it by accepting the connections in the parent process and then passing the handles to the child process I guess, but it seems like cluster does almost exactly what I need. I'm also not too keen on doing per request handle passing between processes as I've tried this before in another server (and language) and it didn't perform as well as accepting in each child - this isn't too much of an issue for the code reloading use-case as that would be only used in development, but I'd like to avoid the complexity if possible.

Ahh, I think I understand. The problem is that the port isn't released fast enough; but using cluster works perfectly, since the server instance is shared, so the server is never closed, just reused.

They run in the same process in my tests, or at least I would like them to. I'd also like it to work as a library without having to caveat its usage.

Well, abstracting test cases between programs is quite easy; I don't see that as a problem. Also, the only case where running both development and production tests together matters is in your cluster abstraction module itself. If you need to do it in a module that uses your cluster abstraction module as a dependency, then your cluster abstraction module has failed (or so I believe).

As for your second comment, I don't understand it.

I think the auto-restart feature is one I'd love to see in cluster, but that might be ignoring your point :-). On your direct point (about extensibility), I think that extensions are better where they don't have a global (side) effect. This kind of pattern can lead to action at a distance, and hence to non-obvious behaviour and bugs that are difficult to track down. I'm therefore much more in favour of your second API (in most cases there will only be one master anyway).

Well, if global is the only thing that can exist, there won't be any confusion. If both global and local can exist, that will be a disaster; that is also why I'm in favour of not supporting the current API if the patch lands.

What did you think of the structural change in my pull request? I found it much easier to implement the changes by splitting the code shared between parent and child from the code for the child and the code for the parent. If you (and @isaacs) agree to the API proposal, it would be good to know that the approach to adding the feature is okay too.

I must say, when I discovered that the first line of real change would introduce a major bug, I did not read the rest. Also, I think we need to decide how/if this should be backward compatible before spending too much time on the code. But don't worry, I will read it carefully when that is decided :)

@tomyan
@AndreasMadsen

Why not make this work as it does today?

I'm cool with using cluster.createMaster, as long as cluster.setupMaster doesn't have two purposes.
If that is done, I would expect something like this:

exports = module.exports = new Master();
exports.setupMaster();

exports.createMaster = function (settings) {
  var o = new Master();
  o.setupMaster(settings);
  return o;
};
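
Usage under that sketch would then be (other.js is a hypothetical worker script):

var cluster = require('cluster');                        // the global master, as today
var local = cluster.createMaster({ exec: 'other.js' });  // independent master
local.fork();                                            // no effect on global settings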

I don't have a separate set of development and production tests, I just have a single test suite that exercises all of the functionality of my module within a single invocation of node, or at least that's what I want. The module exposes functionality that is useful in development (hot-reloading of code) or in production, but I'd like both to be covered by the tests.

A module of mine (thintalk) is an RPC abstraction that provides the same API over two different communication protocols. Since the API is the same, the expected result should also be the same.

As you can see in the test cases, I have separate files for IPC and TCP, but they both use the same test case abstraction. In my case it would probably make sense to test both the IPC and TCP transport layers in the same node process, to ensure they work together. But in your case with cluster, I still can't see why testing both development and production in the same node process makes any sense.

As for your second comment, I don't understand it.

I think that was my comment.

lib/cluster.js
@@ -33,85 +35,17 @@ var debug;
if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
debug = function(x) {
var prefix = process.pid + ',' +
- (process.env.NODE_UNIQUE_ID ? 'Worker' : 'Master');
+ (process.env.NODE_CLUSTER_ID ? 'Worker' : 'Master');

This will introduce a bug: child_process.fork() must be called with NODE_UNIQUE_ID in its env, since it is used in src/node.js.

@tomyan
@AndreasMadsen

I was suggesting that we remove (i.e. deprecate) setupMaster altogether, so that the only time you can alter settings is when creating a new master. How does that sound?

You will have to ask @isaacs, but if cluster.setupMaster is removed then I don't see any point in maintaining backward compatibility.

@Nodejs-Jenkins

Can one of the admins verify this patch?

@Rush

What is the status of this change? How can we help? I would like to use this in a module which sets up its own workers without polluting cluster's global state for any other users. I am considering temporarily ripping cluster.js out of the node sources and into my module so that the state is local, but preferably I would fix node.js itself.

@rmg

Possibly relevant to the original motivation: as of #7671, cluster.setupMaster() can be called multiple times to reconfigure some aspects of the cluster's behaviour over time.
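
A minimal sketch of what that allows (a.js and b.js are hypothetical worker scripts):

var cluster = require('cluster');

cluster.setupMaster({ exec: 'a.js' });
cluster.fork(); // runs a.js

cluster.setupMaster({ exec: 'b.js' });
cluster.fork(); // runs b.js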

Being able to create a cluster.Master() instance looks like a useful step toward a less automagical and more explicit cluster API. Sounds like A Good Thing :tm:

Given the code freeze/slush, does this still belong in the v0.12 milestone?

@tomyan

rmg: I'd be happy to see this cleaned up and applied. I could look at it at some point, but probably not for a week or two.

Thanks

Tom

@7nights 7nights referenced this pull request in doxout/recluster
Open

Multiple calls support #28

Commits on Feb 18, 2012
  1. @tomyan

    minor tweaks

    tomyan authored
Commits on Apr 1, 2012
  1. @tomyan

    add encapsulated master

    tomyan authored
  2. @tomyan

    add encapsulated master

    tomyan authored
Commits on Apr 2, 2012
  1. @tomyan

    Merged upstream changes.

    tomyan authored
Commits on Jun 3, 2012
  1. @tomyan

    merged from upstream

    tomyan authored
  2. @tomyan

    Fixed method calls in merge.

    tomyan authored
Commits on Jul 9, 2012
  1. @tomyan

    fix missed merge item

    tomyan authored
Commits on Jul 12, 2012
  1. @tomyan

    merged upstream changes

    tomyan authored
11 doc/api/cluster.markdown
@@ -123,7 +123,7 @@ If the `process.env.NODE_UNIQUE_ID` is set to a value, then
* `worker` {Worker object}
When a new worker is forked the cluster module will emit a 'fork' event.
-This can be used to log worker activity, and create you own timeout.
+This can be used to log worker activity, or to create your own timeout.
var timeouts = [];
function errorMsg() {
@@ -161,7 +161,7 @@ being executed.
* `address` {Object}
When calling `listen()` from a worker, a 'listening' event is automatically assigned
-to the server instance. When the server is listening a message is send to the master
+to the server instance. When the server is listening a message is sent to the master
where the 'listening' event is emitted.
The event handler is executed with two arguments, the `worker` contains the worker
@@ -268,7 +268,7 @@ The method takes an optional callback argument which will be called when finishe
* {Object}
-In the cluster all living worker objects are stored in this object by there
+In the cluster, all living worker objects are stored in this object by there
`id` as the key. This makes it easy to loop through all living workers.
// Go through all workers
@@ -357,7 +357,6 @@ and accidental exit.
// destroy worker
worker.destroy();
-
### worker.disconnect()
When calling this function the worker will no longer accept new connections, but
@@ -459,7 +458,7 @@ in the master process using the message system:
### Event: 'online'
-Same as the `cluster.on('online')` event, but emits only when the state change
+Same as the `cluster.on('online')` event, but emits only when the state changes
on the specified worker.
cluster.fork().on('online', function() {
@@ -470,7 +469,7 @@ on the specified worker.
* `address` {Object}
-Same as the `cluster.on('listening')` event, but emits only when the state change
+Same as the `cluster.on('listening')` event, but emits only when the state changes
on the specified worker.
cluster.fork().on('listening', function(address) {
747 lib/cluster.js
@@ -25,6 +25,8 @@ var net = require('net');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
+// utility
+
function isObject(o) {
return (typeof o === 'object' && o !== null);
}
@@ -40,81 +42,11 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
debug = function() { };
}
-// cluster object:
-function cluster() {
- EventEmitter.call(this);
-}
-util.inherits(cluster, EventEmitter);
-var cluster = module.exports = new cluster();
-
-// Used in the master:
-var masterStarted = false;
-var ids = 0;
-var serverHandlers = {};
-
-// Used in the worker:
-var serverListeners = {};
-var queryIds = 0;
-var queryCallbacks = {};
-
-// Define isWorker and isMaster
-cluster.isWorker = 'NODE_UNIQUE_ID' in process.env;
-cluster.isMaster = ! cluster.isWorker;
-
-// The worker object is only used in a worker
-cluster.worker = cluster.isWorker ? {} : null;
-// The workers array is only used in the master
-cluster.workers = cluster.isMaster ? {} : null;
-
-// Settings object
-var settings = cluster.settings = {};
-
-// Simple function to call a function on each worker
-function eachWorker(cb) {
- // Go through all workers
- for (var id in cluster.workers) {
- if (cluster.workers.hasOwnProperty(id)) {
- cb(cluster.workers[id]);
- }
- }
+function toDecInt(value) {
+ value = parseInt(value, 10);
+ return isNaN(value) ? null : value;
}
-// Extremely simple progress tracker
-function ProgressTracker(missing, callback) {
- this.missing = missing;
- this.callback = callback;
-}
-ProgressTracker.prototype.done = function() {
- this.missing -= 1;
- this.check();
-};
-ProgressTracker.prototype.check = function() {
- if (this.missing === 0) this.callback();
-};
-
-cluster.setupMaster = function(options) {
- // This can only be called from the master.
- assert(cluster.isMaster);
-
- // Don't allow this function to run more than once
- if (masterStarted) return;
- masterStarted = true;
-
- // Get filename and arguments
- options = options || {};
-
- // Set settings object
- settings = cluster.settings = {
- exec: options.exec || process.argv[1],
- execArgv: options.execArgv || process.execArgv,
- args: options.args || process.argv.slice(2),
- silent: options.silent || false
- };
-
- // emit setup event
- cluster.emit('setup');
-};
-
// Check if a message is internal only
var INTERNAL_PREFIX = 'NODE_CLUSTER_';
function isInternalMessage(message) {
@@ -133,6 +65,21 @@ function internalMessage(inMessage) {
return outMessage;
}
+// Extremely simple progress tracker
+function ProgressTracker(missing, callback) {
+ this.missing = missing;
+ this.callback = callback;
+}
+ProgressTracker.prototype.done = function() {
+ this.missing -= 1;
+ this.check();
+};
+ProgressTracker.prototype.check = function() {
+ if (this.missing === 0) this.callback();
+};
+
+// common to both master and workers
+
// Handle callback messages
function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) {
@@ -179,100 +126,207 @@ function handleMessage(worker, inMessage, inHandle) {
}
}
-// Messages to the master will be handled using these methods
-if (cluster.isMaster) {
-
- // Handle online messages from workers
- messageHandler.online = function(message, worker) {
- worker.state = 'online';
- debug('Worker ' + worker.process.pid + ' online');
- worker.emit('online');
- cluster.emit('online', worker);
- };
+// Send internal message
+function sendInternalMessage(worker, message/*, handler, callback*/) {
- // Handle queryServer messages from workers
- messageHandler.queryServer = function(message, worker, send) {
+ // Exist callback
+ var callback = arguments[arguments.length - 1];
+ if (typeof callback !== 'function') {
+ callback = undefined;
+ }
- // This sequence of information is unique to the connection
- // but not to the worker
- var args = [message.address,
- message.port,
- message.addressType,
- message.fd];
- var key = args.join(':');
- var handler;
-
- if (serverHandlers.hasOwnProperty(key)) {
- handler = serverHandlers[key];
- } else {
- handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
- }
+ // exist handler
+ var handler = arguments[2] !== callback ? arguments[2] : undefined;
- // echo callback with the fd handler associated with it
- send({}, handler);
- };
+ if (!isInternalMessage(message)) {
+ message = internalMessage(message);
+ }
- // Handle listening messages from workers
- messageHandler.listening = function(message, worker) {
+ // Store callback for later
+ if (callback) {
+ message._requestEcho = worker.id + ':' + (++queryIds);
+ queryCallbacks[message._requestEcho] = callback;
+ }
- worker.state = 'listening';
- // Emit listening, now that we know the worker is listening
- worker.emit('listening', {
- address: message.address,
- port: message.port,
- addressType: message.addressType,
- fd: message.fd
- });
- cluster.emit('listening', worker, {
- address: message.address,
- port: message.port,
- addressType: message.addressType,
- fd: message.fd
- });
- };
+ worker.send(message, handler);
+}
- // Handle suicide messages from workers
- messageHandler.suicide = function(message, worker) {
- worker.suicide = true;
- };
+var cluster;
+var isWorker = 'NODE_UNIQUE_ID' in process.env;
-}
+if (isWorker) {
-// Messages to a worker will be handled using these methods
-else if (cluster.isWorker) {
+ var serverListeners = {};
+ var queryIds = 0;
+ var queryCallbacks = {};
- // Handle worker.disconnect from master
- messageHandler.disconnect = function(message, worker) {
- worker.disconnect();
+ // limit what's exposed to the worker to what can be used in the worker
+ cluster = module.exports = {
+ isMaster: false,
+ isWorker: true
};
-}
-function toDecInt(value) {
- value = parseInt(value, 10);
- return isNaN(value) ? null : value;
-}
+ // Internal function. Called from src/node.js when worker process starts.
+ cluster._setupWorker = function() {
+
+ // the worker representation inside a worker
+ function Worker() {
+ EventEmitter.call(this);
+
+ this.id = toDecInt(process.env.NODE_UNIQUE_ID);
+
+ // XXX: Legacy. Remove in 0.9
+ this.workerID = this.uniqueID = this.id;
+
+ // handle internalMessage and exit event
+ process.on('internalMessage', handleMessage.bind(null, this));
+
+ // relay messages from the master
+ process.on('message', this.emit.bind(this, 'message'));
+ };
+ util.inherits(Worker, EventEmitter);
+
+ Worker.prototype.destroy = function() {
+ this.suicide = true;
+
+ // Channel is open
+ if (process.connected) {
+
+ // Inform master to suicide and then kill
+ sendInternalMessage(this, {cmd: 'suicide'}, function() {
+ process.exit(0);
+ });
+
+ var self = this;
+
+ // When channel is closed, terminate the process
+ process.once('disconnect', function() {
+ process.exit(0);
+ });
+
+ } else {
+ process.exit(0);
+ }
+ };
+
-// Create a worker object, that works both for master and worker
-function Worker(customEnv) {
- if (!(this instanceof Worker)) return new Worker();
- EventEmitter.call(this);
+ Worker.prototype.disconnect = function() {
+ this.suicide = true;
- var self = this;
- var env = process.env;
+ // keep track of open servers
+ var servers = Object.keys(serverListeners).length;
+ var progress = new ProgressTracker(servers, function() {
+ // There are no more servers open so we will close the IPC channel.
+ // Closeing the IPC channel will emit emit a disconnect event
+ // in both master and worker on the process object.
+ // This event will be handled by prepearDeath.
+ process.disconnect();
+ });
+
+ // depending on where this function was called from (master or worker)
+ // The suicide state has already been set,
+ // but it doesn't really matter if we set it again.
+ sendInternalMessage(this, {cmd: 'suicide'}, function() {
+ // in case there are no servers
+ progress.check();
+
+ // closeing all servers graceful
+ var server;
+ for (var key in serverListeners) {
+ server = serverListeners[key];
+
+ // in case the server is closed we won't close it again
+ if (server._handle === null) {
+ progress.done();
+ continue;
+ }
+
+ server.on('close', progress.done.bind(progress));
+ server.close();
+ }
+ });
- // Assign a unique id, default null
- this.id = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID);
+ };
+
+ // Send message to master
+ Worker.prototype.send = process.send.bind(process);
+
+ // Get worker class
+ var worker = cluster.worker = new Worker();
+
+ // when the worker is disconnected from the parent accidentally
+ // we will terminate the worker
+ process.once('disconnect', function() {
+ if (worker.suicide !== true) {
+ process.exit(0);
+ }
+ });
- // XXX: Legacy. Remove in 0.9
- this.workerID = this.uniqueID = this.id;
+ // Tell master that the worker is online
+ sendInternalMessage(worker, { cmd: 'online' });
+ };
+
+ // Internal function. Called by lib/net.js when attempting to bind a server.
+ cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
+ // This can only be called from a worker.
+ assert(cluster.isWorker);
+
+ // Store tcp instance for later use
+ var key = [address, port, addressType, fd].join(':');
+ serverListeners[key] = tcpSelf;
+
+ // Send a listening message to the master
+ tcpSelf.once('listening', function() {
+ cluster.worker.state = 'listening';
+ sendInternalMessage(cluster.worker, {
+ cmd: 'listening',
+ address: address,
+ port: port,
+ addressType: addressType,
+ fd: fd
+ });
+ });
+
+ // Request the fd handler from the master process
+ var message = {
+ cmd: 'queryServer',
+ address: address,
+ port: port,
+ addressType: addressType,
+ fd: fd
+ };
+
+ // The callback will be stored until the master has responded
+ sendInternalMessage(cluster.worker, message, function(msg, handle) {
+ cb(handle);
+ });
+
+ };
- // Assign state
- this.state = 'none';
+ // Handle worker.disconnect from master
+ messageHandler.disconnect = function(message, worker) {
+ worker.disconnect();
+ };
- // Create or get process
- if (cluster.isMaster) {
+} else { // isMaster
+ // Create a worker object, that works both for master and worker
+ function Worker(master, customEnv, id) {
+ EventEmitter.call(this);
+
+ var self = this;
+ var env = process.env;
+
+ this.master = master;
+ this.id = id;
+
+ // XXX: Legacy. Remove in 0.9
+ this.workerID = this.uniqueID = this.id;
+
+ // Assign state
+ this.state = 'none';
+
// Create env object
// first: copy and add id property
var envCopy = util._extend({}, env);
@@ -282,275 +336,236 @@ function Worker(customEnv) {
envCopy = util._extend(envCopy, customEnv);
}
+ var settings = master.settings;
+
// fork worker
this.process = fork(settings.exec, settings.args, {
'env': envCopy,
'silent': settings.silent,
'execArgv': settings.execArgv
});
- } else {
- this.process = process;
- }
- if (cluster.isMaster) {
- // Save worker in the cluster.workers array
- cluster.workers[this.id] = this;
-
- // Emit a fork event, on next tick
- // There is no worker.fork event since this has no real purpose
- process.nextTick(function() {
- cluster.emit('fork', self);
+ // handle internalMessage, exit and disconnect event
+ this.process.on('internalMessage', handleMessage.bind(null, this));
+ this.process.once('exit', function(exitCode, signalCode) {
+ self._prepareExit('dead');
+ self.emit('exit', exitCode, signalCode);
+ cluster.emit('exit', self, exitCode, signalCode);
+ });
+ this.process.once('disconnect', function() {
+ self._prepareExit('disconnected');
+ self.emit('disconnect');
+ cluster.emit('disconnect', self);
});
- }
-
- // handle internalMessage, exit and disconnect event
- this.process.on('internalMessage', handleMessage.bind(null, this));
- this.process.once('exit', function(exitCode, signalCode) {
- prepareExit(self, 'dead');
- self.emit('exit', exitCode, signalCode);
- cluster.emit('exit', self, exitCode, signalCode);
- });
- this.process.once('disconnect', function() {
- prepareExit(self, 'disconnected');
- self.emit('disconnect');
- cluster.emit('disconnect', self);
- });
-
- // relay message and error
- this.process.on('message', this.emit.bind(this, 'message'));
- this.process.on('error', this.emit.bind(this, 'error'));
-
-}
-util.inherits(Worker, EventEmitter);
-cluster.Worker = Worker;
-
-function prepareExit(worker, state) {
-
- // set state to disconnect
- worker.state = state;
-
- // Make suicide a boolean
- worker.suicide = !!worker.suicide;
-
- // Remove from workers in the master
- if (cluster.isMaster) {
- delete cluster.workers[worker.id];
- }
-}
-
-// Send internal message
-function sendInternalMessage(worker, message/*, handler, callback*/) {
-
- // Exist callback
- var callback = arguments[arguments.length - 1];
- if (typeof callback !== 'function') {
- callback = undefined;
- }
-
- // exist handler
- var handler = arguments[2] !== callback ? arguments[2] : undefined;
- if (!isInternalMessage(message)) {
- message = internalMessage(message);
+ // relay message and error
+ this.process.on('message', this.emit.bind(this, 'message'));
+ this.process.on('error', this.emit.bind(this, 'error'));
+
}
+ util.inherits(Worker, EventEmitter);
+
+ Worker.prototype._prepareExit = function (state) {
+
+ // set state to disconnect
+ this.state = state;
+
+ // Make suicide a boolean
+ this.suicide = !!this.suicide;
+
+ // Remove from workers in the master
+ delete this.master.workers[this.id];
+ };
- // Store callback for later
- if (callback) {
- message._requestEcho = worker.id + ':' + (++queryIds);
- queryCallbacks[message._requestEcho] = callback;
- }
-
-
- worker.send(message, handler);
-}
-
-// Send message to worker or master
-Worker.prototype.send = function() {
-
- // You could also just use process.send in a worker
- this.process.send.apply(this.process, arguments);
-};
-
-// Kill the worker without restarting
-Worker.prototype.destroy = function() {
- var self = this;
-
- this.suicide = true;
-
- if (cluster.isMaster) {
- // Disconnect IPC channel
- // this way the worker won't need to propagate suicide state to master
+ Worker.prototype.destroy = function() {
+ var self = this;
if (self.process.connected) {
self.process.once('disconnect', function() {
self.process.kill();
});
self.process.disconnect();
} else {
- self.process.kill();
+ self.process.kill();
}
+ };
- } else {
- // Channel is open
- if (this.process.connected) {
-
- // Inform master to suicide and then kill
- sendInternalMessage(this, {cmd: 'suicide'}, function() {
- process.exit(0);
- });
+ // Send message to worker
+ Worker.prototype.send = function() {
+ this.process.send.apply(this.process, arguments);
+ };
- // When channel is closed, terminate the process
- this.process.once('disconnect', function() {
- process.exit(0);
- });
- } else {
- process.exit(0);
- }
- }
-};
-// The .disconnect function will close all servers
-// and then disconnect the IPC channel.
-if (cluster.isMaster) {
- // Used in master
+ // The .disconnect function will close all servers
+ // and then disconnect the IPC channel.
Worker.prototype.disconnect = function() {
this.suicide = true;
sendInternalMessage(this, {cmd: 'disconnect'});
};
-} else {
- // Used in workers
- Worker.prototype.disconnect = function() {
- var self = this;
-
- this.suicide = true;
- // keep track of open servers
- var servers = Object.keys(serverListeners).length;
- var progress = new ProgressTracker(servers, function() {
- // There are no more servers open so we will close the IPC channel.
- // Closing the IPC channel will emit a disconnect event
- // in both master and worker on the process object.
- // This event will be handled by prepareExit.
- self.process.disconnect();
- });
+ var allWorkers = {};
- // depending on where this function was called from (master or worker)
- // The suicide state has already been set,
- // but it doesn't really matter if we set it again.
- sendInternalMessage(this, {cmd: 'suicide'}, function() {
- // in case there are no servers
- progress.check();
-
- // closing all servers gracefully
- var server;
- for (var key in serverListeners) {
- server = serverListeners[key];
-
- // in case the server is closed we won't close it again
- if (server._handle === null) {
- progress.done();
- continue;
- }
+ // Kill workers when a uncaught exception is received
+ process.on('uncaughtException', function(err) {
+ // Did the user install a listener? If so, it overrides this one.
+ if (process.listeners('uncaughtException').length > 1) return;
- server.on('close', progress.done.bind(progress));
- server.close();
+ // Output the error stack, and create on if non exist
+ if (!(err instanceof Error)) {
+ err = new Error(err);
+ }
+ console.error(err.stack);
+
+ // quick destroy cluster
+ // Sync way to quickly kill all cluster workers
+ // However the workers may not die instantly
+ for (var i in allWorkers) {
+ if (allWorkers.hasOwnProperty(i)) {
+ allWorkers[i].process.disconnect();
+ allWorkers[i].process.kill();
}
+ }
+
+ // when done exit process with error code: 1
+ process.exit(1);
+ });
+
+ function Master() {
+ EventEmitter.call(this);
+ this._started = false;
+ this._serverHandles = {};
+ // Settings object
+ this.settings = {};
+ this.workers = {};
+ };
+
+ util.inherits(Master, EventEmitter);
+
+ Master.prototype.isMaster = true;
+ Master.prototype.isWorker = false;
+
+ Master.prototype.setupMaster = function(options) {
+ // Don't allow this function to run more than once
+ if (this._started) return;
+ this._started = true;
+
+ // Get filename and arguments
+ options = options || {};
+
+ // Set settings object
+ this.settings = {
+ exec: options.exec || process.argv[1],
+ execArgv: options.execArgv || process.execArgv,
+ args: options.args || process.argv.slice(2),
+ silent: options.silent || false
+ };
+
+
+ // emit setup event
+ cluster.emit('setup');
+ };
+
+ var ids = 0;
+
+ // Fork a new worker
+ Master.prototype.fork = function(env) {
+
+ // Make sure that the master has been initialized
+ this.setupMaster();
+
+ var id = ids++,
+ worker = this.workers[id] = allWorkers[id] = new Worker(this, env, id),
+ master = this;
+
+ // Emit a fork event, on next tick
+ // There is no worker.fork event since this has no real purpose
+ process.nextTick(function() {
+ master.emit('fork', worker);
});
+ return worker;
};
-}
-// Fork a new worker
-cluster.fork = function(env) {
- // This can only be called from the master.
- assert(cluster.isMaster);
+ // execute .disconnect on all workers and close handlers when done
+ Master.prototype.disconnect = function(callback) {
- // Make sure that the master has been initialized
- cluster.setupMaster();
+ var self = this;
- return (new cluster.Worker(env));
-};
+ // Close all TCP handlers when all workers are disconnected
+ var workers = Object.keys(this.workers).length;
+ var progress = new ProgressTracker(workers, function() {
+ for (var key in self._serverHandles) {
+ self._serverHandles[key].close();
+ delete self._serverHandles[key];
+ }
+
+ // call callback when done
+ if (callback) callback();
+ });
-// execute .disconnect on all workers and close handlers when done
-cluster.disconnect = function(callback) {
- // This can only be called from the master.
- assert(cluster.isMaster);
-
- // Close all TCP handlers when all workers are disconnected
- var workers = Object.keys(cluster.workers).length;
- var progress = new ProgressTracker(workers, function() {
- for (var key in serverHandlers) {
- serverHandlers[key].close();
- delete serverHandlers[key];
+ // begin disconnecting all workers
+ for (var i in this.workers) {
+ this.workers[i].once('disconnect', progress.done.bind(progress));
+ this.workers[i].disconnect();
}
- // call callback when done
- if (callback) callback();
- });
+ // in case there weren't any workers
+ progress.check();
+ };
- // begin disconnecting all workers
- eachWorker(function(worker) {
- worker.once('disconnect', progress.done.bind(progress));
- worker.disconnect();
- });
+ Master.prototype._getServerHandle = function (address, port, addressType, fd) {
+ var key = address + ':' + port + ':' + addressType + ':' + fd;
- // in case there weren't any workers
- progress.check();
-};
+ if (this._serverHandles.hasOwnProperty(key)) {
+ return this._serverHandles[key];
+ } else {
+ return this._serverHandles[key] = net._createServerHandle(address, port, addressType, fd);
+ }
+ };
-// Internal function. Called from src/node.js when worker process starts.
-cluster._setupWorker = function() {
+ cluster = module.exports = new Master();
- // Get worker class
- var worker = cluster.worker = new Worker();
+ cluster.Master = Master;
+ cluster.Worker = Worker;
- // we will terminate the worker
- // when the worker is disconnected from the parent accidentally
- process.once('disconnect', function() {
- if (worker.suicide !== true) {
- process.exit(0);
- }
- });
+ // Handle online messages from workers
+ messageHandler.online = function(message, worker) {
+ worker.state = 'online';
+ debug('Worker ' + worker.process.pid + ' online');
+ worker.emit('online');
+ worker.master.emit('online', worker);
+ };
- // Tell master that the worker is online
- worker.state = 'online';
- sendInternalMessage(worker, { cmd: 'online' });
-};
+ // Handle queryServer messages from workers
+ messageHandler.queryServer = function(message, worker, send) {
+ send({}, worker.master._getServerHandle(message.address, message.port, message.addressType));
+ };
-// Internal function. Called by lib/net.js when attempting to bind a server.
-cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
- // This can only be called from a worker.
- assert(cluster.isWorker);
+ // Handle listening messages from workers
+ messageHandler.listening = function(message, worker) {
- // Store tcp instance for later use
- var key = [address, port, addressType, fd].join(':');
- serverListeners[key] = tcpSelf;
+ worker.state = 'listening';
- // Send a listening message to the master
- tcpSelf.once('listening', function() {
- cluster.worker.state = 'listening';
- sendInternalMessage(cluster.worker, {
- cmd: 'listening',
- address: address,
- port: port,
- addressType: addressType,
- fd: fd
+ // Emit listining, now that we know the worker is listening
+ worker.emit('listening', {
+ address: message.address,
+ port: message.port,
+ addressType: message.addressType,
+ fd: message.fd
});
- });
+ worker.master.emit('listening', worker, {
+ address: message.address,
+ port: message.port,
+ addressType: message.addressType
+ fd: message.fd
+ });
+ };
- // Request the fd handler from the master process
- var message = {
- cmd: 'queryServer',
- address: address,
- port: port,
- addressType: addressType,
- fd: fd
+ // Handle suicide messages from workers
+ messageHandler.suicide = function(message, worker) {
+ worker.suicide = true;
};
- // The callback will be stored until the master has responded
- sendInternalMessage(cluster.worker, message, function(msg, handle) {
- cb(handle);
- });
+}
-};
1  test/simple/test-cluster-basic.js
@@ -150,6 +150,7 @@ else if (cluster.isMaster) {
//Check all values
process.once('exit', function() {
+
//Check cluster events
forEach(checks.cluster.events, function(check, name) {
assert.ok(check, 'The cluster event "' + name + '" on the cluster ' +
82 test/simple/test-cluster-encapsulated-master.js
@@ -0,0 +1,82 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+
+if (cluster.isWorker) {
+
+ // Just keep the worker alive
+ process.send(process.argv[2]);
+
+} else if (cluster.isMaster) {
+
+ var checks = {
+ globalMasterArgs : false,
+ localMasterArgs : false,
+ globalMasterWorkers : 0,
+ localMasterWorkers : 0
+ };
+
+ cluster.setupMaster({
+ 'args' : ['custom argument']
+ });
+
+ var localMaster = new cluster.Master();
+
+ localMaster.setupMaster({
+ 'args' : ['local custom argument']
+ });
+
+ cluster.on('online', function lisenter(worker) {
+ checks.globalMasterWorkers++;
+
+ worker.once('message', function(data) {
+ checks.globalMasterArgs = (data === 'custom argument');
+ worker.destroy();
+ });
+ });
+
+ localMaster.on('online', function lisenter(worker) {
+ checks.localMasterWorkers++;
+
+ worker.once('message', function(data) {
+ checks.localMasterArgs = (data === 'local custom argument');
+ worker.destroy();
+ });
+ });
+
+ // Start workers
+ cluster.fork();
+ localMaster.fork();
+
+ // Check all values
+ process.once('exit', function() {
+ assert.ok(checks.globalMasterWorkers === 1, 'Wrong number of workers for global master');
+ assert.ok(checks.localMasterWorkers === 1, 'Wrong number of workers for local master');
+ assert.ok(checks.globalMasterArgs, 'Worker for global master did not receive custom args');
+ assert.ok(checks.localMasterArgs, 'Worker for local master did not receive custom args');
+ });
+
+}
+
2  test/simple/test-cluster-setup-master.js
@@ -81,7 +81,7 @@ if (cluster.isWorker) {
// Check all values
process.once('exit', function() {
- assert.ok(checks.args, 'The arguments was noy send to the worker');
+ assert.ok(checks.args, 'The arguments were not sent to the worker');
assert.ok(checks.setupEvent, 'The setup event was never emitted');
var m = 'The settingsObject do not have correct properties';
assert.ok(checks.settingsObject, m);