Permalink
Browse files

Continue support http server workers

  • Loading branch information...
1 parent a05dab5 commit e4f61076f0fb7e1f817d071d57e731c120f7589e @DukeyToo DukeyToo committed Mar 20, 2012
Showing with 298 additions and 36 deletions.
  1. +80 −0 examples/fib.js
  2. +25 −0 examples/fib.json
  3. +23 −0 examples/fibworker.js
  4. +51 −22 lib/cligen.js
  5. +14 −6 lib/server.js
  6. +13 −4 lib/worker.js
  7. +3 −2 test/cache.js
  8. +3 −2 test/proxy.js
  9. +6 −0 test/testapi.js
  10. +4 −0 test/testapi.json
  11. +76 −0 test/webworker.js
View
@@ -0,0 +1,80 @@
+#!/usr/bin/env node
+
+var perfectapi = require('../api.js');
+var path = require('path');
+
+var configPath = path.resolve(__dirname, 'fib.json');
+var parser = new perfectapi.Parser();
+
+var worker = require('child_process').fork(__dirname + '/fibworker.js');
+var callbacks = {};
+var requestNum = 0;
+worker.on('message', function(m) {
+ var callback = callbacks[m.id];
+ delete callbacks[m.id];
+
+ callback(null, m.result);
+})
+
+var numWorkers = require('os').cpus().length;
+var nextWorker = 0;
+var workers = [];
+for (var i=0;i<numWorkers;i++) {
+ var worker = require('child_process').fork(__dirname + '/fibworker.js');
+ workers.push(worker);
+
+ worker.on('message', function(m) {
+ var callback = callbacks[m.id];
+ delete callbacks[m.id];
+
+ callback(null, m.result);
+ })
+}
+
+//handle the commands
+parser.on('fib', function(config, callback) {
+ var n = config.number;
+ if (n<1 || n>40) {
+ callback('n must be between 1 and 40');
+ } else {
+ callback(null, fibonacci(n))
+ }
+})
+
+parser.on('randomfib', function(config, callback) {
+ var n = Math.ceil(Math.random()*30)
+ callback(null, fibonacci(n));
+})
+
+parser.on('randomfib2', function(config, callback) {
+ requestNum += 1;
+ callbacks[requestNum] = callback;
+
+ var n = Math.ceil(Math.random()*30)
+ worker.send({n: n, id: requestNum});
+})
+
+parser.on('randomfib3', function(config, callback) {
+ requestNum += 1;
+ callbacks[requestNum] = callback;
+
+ var n = Math.ceil(Math.random()*30);
+
+ //round robin workers
+ var worker = workers[nextWorker];
+ nextWorker += 1;
+ if (nextWorker == workers.length) nextWorker = 0;
+
+ worker.send({n: n, id: requestNum});
+})
+
+//expose the api
+module.exports = parser.parse(configPath);
+
+function fibonacci(n) {
+ if (n < 2)
+ return 1;
+ else
+ return fibonacci(n-2) + fibonacci(n-1);
+}
+
View
@@ -0,0 +1,25 @@
+{ "exports": "fib",
+ "signature": [
+ {
+ "name": "fib",
+ "synopsis": "runs a fibonacci sequence",
+ "verb": "POST",
+ "parameters": [
+ {"name": "number", "required":true, "description":"which fibonacci to calculate"}
+ ]
+ },
+ {
+ "name": "randomfib",
+ "synopsis": "runs a random fibonacci sequence, 1 to 30"
+ },
+ {
+ "name": "randomfib2",
+ "synopsis": "runs a random fibonacci sequence, 1 to 30, using a long-lived worker process"
+ },
+ {
+ "name": "randomfib3",
+ "synopsis": "runs a random fibonacci sequence, 1 to 30, using multiple long-lived worker processes"
+ }
+ ],
+ "path": "fib"
+}
View
@@ -0,0 +1,23 @@
+module.exports = fibonacci;
+
+process.on('message', function(m) {
+ var n = m.n;
+ var id = m.id;
+ var result;
+
+ if (n < 2) {
+ result = 1
+ } else {
+ result = fibonacci(n-2) + fibonacci(n-1)
+ }
+
+ process.send({id: id, result: result});
+})
+
+function fibonacci(n) {
+ if (n < 2)
+ return 1;
+ else
+ return fibonacci(n-2) + fibonacci(n-1);
+}
+
View
@@ -26,7 +26,6 @@ var program = require('./commander.js');
var util = require('util');
var events = require('events');
var cfg = require('./config.js');
-var webserver;
//I don't understand really, but following example from
// http://elegantcode.com/2011/02/21/taking-baby-steps-with-node-js-implementing-events/
@@ -55,12 +54,14 @@ Parser.prototype.parse = function(configPath) {
//handling for special reserved commands
self.on('server', function(serverConfig, callback) {
+ //HACK: always run in-process for now. Node process.send() blocks, making it unusable for high throughput IPC.
if ((serverConfig.options.webworker == 'false' || serverConfig.options.webworker == false)
|| (serverConfig.options.webworker == 'auto' && require('os').cpus().length == 1)) {
runWebServerInProcess(rawConfig, serverConfig, self);
+ callback();
} else {
- runWebServerAsWorker(rawConfig, serverConfig, self);
+ runWebServerAsWorker(rawConfig, serverConfig, self, callback);
}
})
self.on('config', function(config, callback) {
@@ -131,36 +132,64 @@ function runWebServerInProcess(rawConfig, serverConfig, emitter) {
});
}
+var webservers = []; //array or webserver workers
+var netServer;
/**
* Runs the web server in a forked process. This is not necessarily quicker, but it does allow the
* user's code to hog CPU without affecting the running of the web server
*/
-function runWebServerAsWorker(rawConfig, serverConfig, emitter) {
+function runWebServerAsWorker(rawConfig, serverConfig, emitter, callback) {
if (serverConfig.command == 'stop') {
- logger.info('stopping server');
- if (webserver) {
- webserver.send({message: 'stop'});
- webserver.kill();
- webserver = null;
+ logger.info('stopping server workers');
+ if (netServer) {
+ netServer.close();
+ netServer = null;
+ }
+
+ for (var i=0;i<webservers.length;i++) {
+ var webserver = webservers[i];
+ webserver.kill();
}
+ webservers = [];
return;
}
- webserver = fork(__dirname + '/worker.js', [], {env: process.env});
- webserver.send({message: 'start', rawConfig: rawConfig, serverConfig: serverConfig})
-
- webserver.on('message', function(m) {
- var id = m.id;
- if (m.err) {
- logger.error("Error: " + m.err);
- if (callback) callback(m.err);
- } else {
- emitter.emit(m.commandName, m.config, function(err, result) {
- webserver.send({message: 'result', err: err, result: result, id: id})
- });
+ netServer = require('net').createServer().listen(serverConfig.options.port, function() {
+ var numWorkers = require('os').cpus().length;
+ var readyCount = 0;
+ for (var i=0;i<numWorkers;i++) {
+ var webserver = fork(__dirname + '/worker.js', [], {env: process.env});
+ webservers.push(webserver);
+ webserver.send({message: 'start', rawConfig: rawConfig, serverConfig: serverConfig}, netServer._handle)
+
+ webserver.on('message', function(m) {
+ if (m == 'ready') {
+ //this is a reply from the worker indicating that the server is ready.
+ readyCount += 1;
+ if (readyCount == numWorkers) {
+ //all servers are ready
+ logger.info('started server workers');
+ if (callback) callback();
+ }
+ return;
+ }
+
+ var webserverWorker = this;
+ var id = m.id;
+ if (m.err) {
+ logger.error("Error: " + m.err);
+ if (callback) callback(m.err);
+ } else {
+ emitter.emit(m.commandName, m.config, function(err, result) {
+ webserverWorker.send({message: 'result', err: err, result: result, id: id})
+ });
+ }
+ })
}
- })
+
+
+ });
}
function runningFromCommandLine() {
@@ -271,7 +300,7 @@ function getCommandFunction(rawConfig, commandName, emitter) {
if (err && !util.isError(err)) {
err = new Error(err);
}
- callback(err, result);
+ if (callback) callback(err, result);
}
var finalConfig = cfg.getDefaultConfig(rawConfig, commandName);
View
@@ -16,7 +16,7 @@ exports.stop = function() {
}
}
-exports.listen = function listen(config, serverCommandConfig, callbackCligen) {
+exports.listen = function listen(config, serverCommandConfig, callbackCligen, serverHandle, done) {
app = express.createServer();
app.configure(function(){
app.use(express.bodyParser());
@@ -25,7 +25,7 @@ exports.listen = function listen(config, serverCommandConfig, callbackCligen) {
var commands = cfg.getCommands(config);
var csharpclient = (config.exports || 'api') + '.cs';
- logger.info('listening for c# client at /' + config.path + '/' + csharpclient);
+ logger.verbose('listening for c# client at /' + config.path + '/' + csharpclient);
app.get('/' + config.path + '/' + csharpclient, function(req, res, next) {
logger.verbose('Sending csharp client');
@@ -44,7 +44,7 @@ exports.listen = function listen(config, serverCommandConfig, callbackCligen) {
res.end();
});
- logger.info('listening for javascript client at /' + config.path + '/jquery.perfectapi.js');
+ logger.verbose('listening for javascript client at /' + config.path + '/jquery.perfectapi.js');
app.get('/' + config.path + '/jquery.perfectapi.js', function(req, res, next) {
logger.verbose('Sending javascript');
@@ -59,7 +59,7 @@ exports.listen = function listen(config, serverCommandConfig, callbackCligen) {
res.end();
});
- logger.info('listening for test app on /' + config.path + '/testapp/');
+ logger.verbose('listening for test app on /' + config.path + '/testapp/');
app.get('/' + config.path + '/testapp', function(req, res, next) {
//special handling for the index file
var testAppFolder = path.resolve(__dirname, '..', 'testapp');
@@ -84,8 +84,16 @@ exports.listen = function listen(config, serverCommandConfig, callbackCligen) {
}
}
- app.listen(serverCommandConfig.options.port);
- logger.info('Listening on port ' + serverCommandConfig.options.port);
+ if (serverHandle) {
+ app.listen(serverHandle);
+ } else {
+ app.listen(serverCommandConfig.options.port);
+ }
+
+ logger.verbose('Listening on port ' + serverCommandConfig.options.port);
+
+ if (done) done();
+
return app;
}
View
@@ -24,23 +24,27 @@ var server = require('./server.js');
var messageId = 0;
var messageStack = {};
-process.on('message', function(m) {
+process.on('message', function(m, serverHandle) {
//this occurs when the web server is running as a child process.
if (m.message == 'start') {
logger.verbose('received start message')
- //start the web server
+ //define a function for the webserver to call when it receives a command
var callbackCligen = function(err, commandName, config, resultFunction) {
- //max Number in javascript is 9,007,199,254,740,992. At 10,000 requests per second, that is 28,561 years until we get an overflow
+ //max Number in javascript is 9,007,199,254,740,992. At 10,000 requests per second, that is 28,561 years until we get an overflow on messageId
messageId += 1;
messageStack[messageId] = resultFunction;
//tell the parent process to run the command
process.send({id: messageId, err: err, commandName: commandName, config: config});
}
- server.listen(m.rawConfig, m.serverConfig, callbackCligen)
+ server.listen(m.rawConfig, m.serverConfig, callbackCligen, serverHandle, function() {
+ //server is ready now, so tell the master process.
+ process.send('ready');
+ })
+
} else if (m.message == 'stop') {
//stop the web server
logger.verbose('received stop message')
@@ -51,6 +55,11 @@ process.on('message', function(m) {
var resultFunction = messageStack[m.id];
delete messageStack[m.id];
+ if (!resultFunction) {
+ logger.error('Unable to correlate message ' + m.id + ' to a callback');
+ return;
+ }
+
resultFunction(m.err, m.result);
}
})
View
@@ -5,9 +5,10 @@ var request = require('request');
var config_etag = '';
describe('Basic caching', function() {
+ var webworker = false;
before(function() {
- var config = {options: {port: 3001, webworker: false} };
+ var config = {options: {port: 3001, webworker: webworker} };
testapi.server(config);
})
@@ -35,7 +36,7 @@ describe('Basic caching', function() {
after(function() {
- var config = {command: 'stop' };
+ var config = {command: 'stop', options: {webworker:webworker} };
testapi.server(config, function() {
//should stop the server
})
View
@@ -3,9 +3,10 @@ var testapi = require('./testapi.js');
var util = require('util');
describe('Node proxy', function() {
+ var webworker = false;
before(function() {
- var config = {options: {port: 3001, webworker: false} };
+ var config = {options: {port: 3001, webworker: webworker} };
testapi.server(config);
})
@@ -125,7 +126,7 @@ describe('Node proxy', function() {
})
after(function() {
- var config = {command: 'stop' };
+ var config = {command: 'stop', options: {webworker:webworker} };
testapi.server(config, function() {
//should stop the server
})
Oops, something went wrong.

0 comments on commit e4f6107

Please sign in to comment.