Skip to content

Commit

Permalink
Merge pull request #394 from joola/feature/#393
Browse files Browse the repository at this point in the history
#393 Added domain management on routes
  • Loading branch information
itayw committed Apr 16, 2014
2 parents 7dee83e + b02a5fc commit 80624a9
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 107 deletions.
46 changes: 39 additions & 7 deletions lib/dispatch/index.js
Expand Up @@ -13,6 +13,7 @@
var
joola = require('../joola.io'),

domain = require('domain'),
util = require('util'),
fs = require('fs'),
path = require('path'),
Expand Down Expand Up @@ -334,14 +335,28 @@ dispatch.processRequest = function (message, headers) {
return l._dispatch.message === headers.destination.replace('/queue/joola.dispatch.', '').replace('.', ':');
});
listeners.forEach(function (listener) {
listener.run.apply(message, _params);

var d = domain.create();
d.on('error', function (err) {
joola.logger.debug('Failed to process dispatch request: ' + err);
console.log('err', err);
_result = {
err: err,
message: null
};
dispatch.fulfill(message, _result, headers);
});
d.run(function () {
listener.run.apply(message, _params);
});
});
}
catch (ex) {
_result = {
err: ex,
message: null
};
dispatch.fulfill(message, _result, headers);
}
});
};
Expand Down Expand Up @@ -370,8 +385,21 @@ dispatch.processResponse = function (message, headers) {
args.push(message);

if (listener[message.id]) {
listener[message.id].apply(listener, args);
delete listener[message.id];
var d = domain.create();
d.on('error', function (err) {
joola.logger.debug('Failed to process dispatch response: ' + err);
//TODO: The assumption here is that the same function that was called and failed will handle the error properly. this is a risky assumption, but it allows for proper error messages to be handled.
try {
listener[message.id].apply(listener, [err]);
}
catch (ex) {
//there's nothing we can do at this point. the request will timeout and die.
}
});
d.run(function () {
listener[message.id].apply(listener, args);
delete listener[message.id];
});
}
}
});
Expand Down Expand Up @@ -415,7 +443,8 @@ dispatch.request = function (token, channel, params, callback) {

dispatch.fulfill = function (message, result, headers, callback) {
var self = dispatch;
callback = callback || function(){};
callback = callback || function () {
};

message.fulfilled = 1;
message['fulfilled-by'] = joola.UID;
Expand All @@ -430,7 +459,8 @@ dispatch.fulfill = function (message, result, headers, callback) {
};

dispatch.on = function (channel, callback, next) {
callback = callback || function(){};
callback = callback || function () {
};

var exists = _.find(dispatch.listeners, function (listener) {
return listener._dispatch.message == channel && (listener._dispatch.cb ? listener._dispatch.cb.toString() : null) == (callback ? callback.toString() : null);
Expand All @@ -454,7 +484,8 @@ dispatch.on = function (channel, callback, next) {
};

dispatch.once = function (channel, callback, next) {
callback = callback || function(){};
callback = callback || function () {
};

var exists = _.find(dispatch.listeners, function (listener) {
return listener._dispatch.message == channel && (listener._dispatch.cb ? listener._dispatch.cb.toString() : null) == (callback ? callback.toString() : null);
Expand Down Expand Up @@ -483,7 +514,8 @@ dispatch.once = function (channel, callback, next) {
};

dispatch.emit = function (channel, message, callback) {
callback = callback || function(){};
callback = callback || function () {
};
if (dispatch.publisher) {
dispatch.publisher.publish(channel, JSON.stringify(message), callback);
joola.logger.trace('[dispatch] emit on: ' + channel);
Expand Down
205 changes: 105 additions & 100 deletions lib/webserver/routes/index.js
Expand Up @@ -12,6 +12,7 @@
var
joola = require('../../joola.io'),

domain = require('domain'),
path = require('path'),
fs = require('fs'),
url = require('url'),
Expand Down Expand Up @@ -111,117 +112,121 @@ exports.index = function (req, res) {
};

exports.route = function (req, res) {
//check if we're stopped (emergency was called)
if (joola.stopped)
if (res.render)
return res.render('server-offline');

if (joola.state.get().status != 'online') {
if (res.render)
return res.render('server-offline');
else
return res.json('server-offline');
}
var d = domain.create();
d.on('error', function (err) {
joola.logger.warn('Failed to route request: ' + err);
return exports.responseError(new ErrorTemplate('Failed to route request: ' + err), req, res);
});
d.add(req);
d.add(res);
d.run(function () {
//check if we're stopped (emergency was called)
if (joola.stopped)
if (res.render)
return res.render('server-offline');

if (joola.state.get().status != 'online') {
if (res.render)
return res.render('server-offline');
else
return res.json('server-offline');
}

var modulename = req.params.resource;
var module;
var action = req.params.action;
var modulename = req.params.resource;
var module;
var action = req.params.action;

if (!req.token && modulename != 'test' && modulename != 'users')
return exports.responseError(new ErrorTemplate('Missing token.'), req, res);
else if (!req.token && (modulename == 'test' || modulename == 'users')) {
req.token = {_: joola.config.authentication.bypassToken};
joola.logger.trace('Applying bypass token for request [4].');
}
if (!req.token && modulename != 'test' && modulename != 'users')
return exports.responseError(new ErrorTemplate('Missing token.'), req, res);
else if (!req.token && (modulename == 'test' || modulename == 'users')) {
req.token = {_: joola.config.authentication.bypassToken};
joola.logger.trace('Applying bypass token for request [4].');
}

if (['png', 'ico', 'gif', 'css'].indexOf(req.url.substring(req.url.length - 3)) > -1)
return exports.responseError(new ErrorTemplate('Not implemented.'), req, res);
if (['png', 'ico', 'gif', 'css'].indexOf(req.url.substring(req.url.length - 3)) > -1)
return exports.responseError(new ErrorTemplate('Not implemented.'), req, res);

_.extend(req.params, req.query);
_.extend(req.params, req.query);

if (!modulename)
return exports.responseError(new ErrorTemplate('Module not specified.'), req, res);
if (!modulename)
return exports.responseError(new ErrorTemplate('Module not specified.'), req, res);

try {
module = require('../../dispatch/' + modulename);
}
catch (ex) {
console.log('err', ex);
console.log(ex.stack);
return exports.responseError(ex, req, res);
}
try {
module = require('../../dispatch/' + modulename);
}
catch (ex) {
console.log('err', ex);
console.log(ex.stack);
return exports.responseError(ex, req, res);
}

if (!action)
action = 'index';
if (!action)
action = 'index';

action = module[action];
if (req.params[0] instanceof Object) {
Object.keys(req.params[0]).forEach(function (key) {
try {
req.params[key] = req.params[0][key];
}
catch (ex) {
//ignore
}
});
}
joola.logger.silly('Routing [' + action.name + ']...');

//try {
action = module[action];
if (req.params[0] instanceof Object) {
Object.keys(req.params[0]).forEach(function (key) {
try {
req.params[key] = req.params[0][key];
}
catch (ex) {
//ignore
}
});
}
joola.logger.silly('Routing [' + action.name + ']...');

var dispatchRoute = function (req, res) {
var _params = {};
Object.keys(req.params).forEach(function (p) {
if (p != 'resource' && p != 'action')
_params[p] = req.params[p];
});
var aborted, timerID;

delete _params.resource;
delete _params.action;

if (action.inputs) {
var inputs;
if (action.inputs.required)
inputs = action.inputs.required.concat(action.inputs.optional);
else
inputs = action.inputs;
var dispatchRoute = function (req, res) {
var _params = {};
Object.keys(req.params).forEach(function (p) {
if (p != 'resource' && p != 'action')
_params[p] = req.params[p];
});
var aborted, timerID;

if (inputs.indexOf('APIToken') === -1)
delete _params.APIToken;
}
delete _params.resource;
delete _params.action;

if (action.inputs) {
var inputs;
if (action.inputs.required)
inputs = action.inputs.required.concat(action.inputs.optional);
else
inputs = action.inputs;

if (inputs.indexOf('APIToken') === -1)
delete _params.APIToken;
}

process.nextTick(function () {
if (!action._dispatch)
console.log(action);
joola.dispatch.request(req.token._, action._dispatch.message, _params, function (err, result) {
clearTimeout(timerID);

if (aborted)
return;
if (err)
return exports.responseError(err, req, res);
return exports.responseSuccess(result, req, res);
setImmediate(function () {
if (!action._dispatch)
console.log(action);
joola.dispatch.request(req.token._, action._dispatch.message, _params, function (err, result) {
clearTimeout(timerID);

if (aborted)
return;
if (err)
return exports.responseError(err, req, res);
return exports.responseSuccess(result, req, res);
});
timerID = setTimeout(function () {
return exports.responseError(new exports.ErrorTemplate('Timeout while waiting for [' + action.name + ']'), req, res);
}, joola.config.interfaces.webserver.timeout || 60000);
});
timerID = setTimeout(function () {
return exports.responseError(new exports.ErrorTemplate('Timeout while waiting for [' + action.name + ']'), req, res);
}, joola.config.interfaces.webserver.timeout || 60000);
});
};

if (req.params.options && req.params.options.local)
return action._route(req, res);
else if (req.params.options && req.params.options.local === false)
return dispatchRoute(req, res);
else if (action._route) {
return action._route(req, res);
}
else {
return dispatchRoute(req, res);
}
// }
// catch (ex) {
// return exports.responseError(new ErrorTemplate(ex), req, res);
// }
};

if (req.params.options && req.params.options.local)
return action._route(req, res);
else if (req.params.options && req.params.options.local === false)
return dispatchRoute(req, res);
else if (action._route) {
return action._route(req, res);
}
else {
return dispatchRoute(req, res);
}
});
};

exports.sdk = function (min, req, res) {
Expand Down

0 comments on commit 80624a9

Please sign in to comment.