Skip to content

Commit

Permalink
Flat I
Browse files Browse the repository at this point in the history
  • Loading branch information
Eran Hammer committed Sep 17, 2013
1 parent ae49c7e commit 02e4f14
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 109 deletions.
56 changes: 56 additions & 0 deletions lib/async.js
@@ -0,0 +1,56 @@
// Load modules

var Async = require('async');
var Utils = require('./utils');


// Declare internals

var internals = {};


exports = module.exports = Async;


Async.forEachSeriesContext = function (set, context, each, callback) {

if (!set.length) {
return Utils.nextTick(callback)();
}

var cycle = new internals.Cycle(set, context, Utils.nextTick(each), callback);
cycle.run();
};


internals.Cycle = function (set, context, each, callback) {

var self = this;

this.set = set;
this.context = context;
this.each = each;
this.callback = callback;
this.pos = 0;

this.next = function (err) {

if (err) {
return self.callback(err, self.context);
}

++self.pos;

return self.run();
};
};


internals.Cycle.prototype.run = function () {

if (this.pos === this.set.length) {
return this.callback(null, this.context);
}

this.each(this.set[this.pos], this.context, this.next);
};
2 changes: 1 addition & 1 deletion lib/composer.js
@@ -1,6 +1,6 @@
// Load modules

var Async = require('async');
var Async = require('./async');
var Pack = require('./pack');
var Utils = require('./utils');

Expand Down
6 changes: 3 additions & 3 deletions lib/ext.js
Expand Up @@ -2,7 +2,7 @@

var Domain = require('domain');
var Boom = require('boom');
var Async = require('async');
var Async = require('./async');
var Utils = require('./utils');


Expand Down Expand Up @@ -82,7 +82,7 @@ internals.Ext.prototype.invoke = function (request, event, callback) {

var handlers = this._events[event];
if (!handlers) {
return Utils.nextTick(callback)();
return Utils.nextTick(callback)(null, request);
}

var log = request.log.bind(request);
Expand All @@ -103,7 +103,7 @@ internals.Ext.prototype.invoke = function (request, event, callback) {

delete request.context;

return callback(err);
return callback(err, request);
});
};

Expand Down
2 changes: 1 addition & 1 deletion lib/pack.js
Expand Up @@ -2,8 +2,8 @@

var Path = require('path');
var Events = require('events');
var Async = require('async');
var Catbox = require('catbox');
var Async = require('./async');
var Server = require('./server');
var Views = require('./views');
var Utils = require('./utils');
Expand Down
197 changes: 97 additions & 100 deletions lib/request.js
@@ -1,8 +1,8 @@
// Load modules

var Url = require('url');
var Async = require('async');
var Boom = require('boom');
var Async = require('./async');
var Utils = require('./utils');
var Response = require('./response');
var Cached = require('./response/cached');
Expand Down Expand Up @@ -242,155 +242,152 @@ internals.Request.prototype.getLog = function (tags) {

internals.Request.prototype._execute = function () {

var self = this;

// Execute onRequest extensions (can change request method and url)

this.server._ext.invoke(this, 'onRequest', function (err) {
this.server._ext.invoke(this, 'onRequest', internals.onRequest);
};

// Undecorate request

delete self.setUrl;
delete self.setMethod;
internals.onRequest = function (err, request) {

if (err) {
self._reply(err);
return;
}
// Undecorate request

if (!self.path || self.path[0] !== '/') {
self._reply(Boom.badRequest('Invalid path'));
return;
}
delete request.setUrl;
delete request.setMethod;

// Lookup route
if (err) {
internals.reply(err, request);
return;
}

self._route = self.server._router.route(self);
self.route = self._route.settings;
if (!request.path || request.path[0] !== '/') {
internals.reply(Boom.badRequest('Invalid path'), request);
return;
}

// Setup timer
// Lookup route

var serverTimeout = self.server.settings.timeout.server;
if (serverTimeout) {
serverTimeout -= (Date.now() - self._timestamp); // Calculate the timeout from when the request was constructed
var timeoutReply = function () {
request._route = request.server._router.route(request);
request.route = request._route.settings;

self._reply(Boom.serverTimeout());
};
// Setup timer

if (serverTimeout <= 0) {
return timeoutReply();
}
var serverTimeout = request.server.settings.timeout.server;
if (serverTimeout) {
serverTimeout -= (Date.now() - request._timestamp); // Calculate the timeout from when the request was constructed
var timeoutReply = function () {

self._serverTimeoutId = setTimeout(timeoutReply, serverTimeout);
internals.reply(Boom.serverTimeout(), request);
};

if (serverTimeout <= 0) {
return timeoutReply();
}

Async.forEachSeries(self._route.cycle, function (func, next) {
request._serverTimeoutId = setTimeout(timeoutReply, serverTimeout);
}

if (self._isReplied) {
self.log(['hapi', 'server', 'timeout']);
return next(true); // Argument is ignored but aborts the series
}
Async.forEachSeriesContext(request._route.cycle, request, internals.cycle, internals.reply);
};

if (typeof func === 'string') {
self.server._ext.invoke(self, func, next);
return;
}

func(self, next);
internals.cycle = function (func, request, next) {

},
function (err) {
if (request._isReplied) {
request.log(['hapi', 'server', 'timeout']);
return next(true); // Argument is ignored but aborts the series
}

self._reply(err);
});
});
};
if (typeof func === 'string') {
request.server._ext.invoke(request, func, next);
return;
}

func(request, next);
};

internals.Request.prototype._reply = function (exit) {

var self = this;
internals.reply = function (exit, request) {

if (this._isReplied) { // Prevent any future responses to this request
if (request._isReplied) { // Prevent any future responses to this request
return;
}

this._isReplied = true;
request._isReplied = true;

clearTimeout(this._serverTimeoutId);
clearTimeout(request._serverTimeoutId);

var process = function () {
if (request._response &&
request._response.variety === 'closed') {

if (self._response &&
self._response.variety === 'closed') {
request.raw.res.end(); // End the response in case it wasn't already closed
return Utils.nextTick(internals.reply.finalize)(request);
}

self.raw.res.end(); // End the response in case it wasn't already closed
return Utils.nextTick(finalize)();
}
if (exit) {
internals.reply.override(request, exit);
}

if (exit) {
override(exit);
}
request.server._ext.invoke(request, 'onPreResponse', internals.onPreResponse);
};

self.server._ext.invoke(self, 'onPreResponse', function (err) {

delete self.response;
delete self.setState;
delete self.clearState;
internals.onPreResponse = function (err, request) {

if (err) { // err can be valid response override
override(err);
}
delete request.response;
delete request.setState;
delete request.clearState;

Response._respond(self._response, self, finalize);
});
};
if (err) { // err can be valid response override
internals.reply.override(request, err);
}

var override = function (value) {
Response._respond(request._response, request, internals.reply.finalize);
};

if (self._response &&
!self._response.isBoom &&
!self._response.varieties.error) {

// Got error after valid result was already set
internals.reply.override = function (request, value) {

self._route.cache.drop(self.url.path, function (err) {
if (request._response &&
!request._response.isBoom &&
!request._response.varieties.error) {

self.log(['hapi', 'cache', 'drop'], err);
});
}
// Got error after valid result was already set

self._setResponse(Response._generate(value));
};
request._route.cache.drop(request.url.path, function (err) {

var finalize = function () {
request.log(['hapi', 'cache', 'drop'], err);
});
}

self.server._dtrace.report('request.finalize', self._response);
if (self._response &&
((self._response.isBoom && self._response.response.code === 500) ||
(self._response.varieties && self._response.varieties.error && self._response._code === 500))) {
request._setResponse(Response._generate(value));
};

var error = (self._response.isBoom ? self._response : self._response._err);
self.server.emit('internalError', self, error);
self.log(['hapi', 'internal'], error);
}

self.server.emit('response', self);
internals.reply.finalize = function (request) {

self._isWagging = true;
delete self.addTail;
delete self.tail;
request.server._dtrace.report('request.finalize', request._response);
if (request._response &&
((request._response.isBoom && request._response.response.code === 500) ||
(request._response.varieties && request._response.varieties.error && request._response._code === 500))) {

if (Object.keys(self._tails).length === 0) {
self.server.emit('tail', self);
}
var error = (request._response.isBoom ? request._response : request._response._err);
request.server.emit('internalError', request, error);
request.log(['hapi', 'internal'], error);
}

self.raw.req.removeAllListeners();
self.raw.res.removeAllListeners();
};
request.server.emit('response', request);

request._isWagging = true;
delete request.addTail;
delete request.tail;

if (Object.keys(request._tails).length === 0) {
request.server.emit('tail', request);
}

process();
request.raw.req.removeAllListeners();
request.raw.res.removeAllListeners();
};


Expand Down
2 changes: 1 addition & 1 deletion lib/response/directory.js
Expand Up @@ -2,7 +2,7 @@

var Fs = require('fs');
var Path = require('path');
var Async = require('async');
var Async = require('../async');
var Generic = require('./generic');
var Redirection = require('./redirection');
var Text = require('./text');
Expand Down
1 change: 0 additions & 1 deletion lib/response/generic.js
@@ -1,7 +1,6 @@
// Load modules

var Stream = require('stream');
var Async = require('async');
var Negotiator = require('negotiator');
var Zlib = require('zlib');
var Headers = require('./headers');
Expand Down
2 changes: 1 addition & 1 deletion lib/response/index.js
Expand Up @@ -226,7 +226,7 @@ exports._respond = function (item, request, callback) {

request._response = response; // Error occurs late and should update request.response object
request.log(['hapi', 'response', response.variety]);
return callback();
return callback(request);
});
};

Expand Down

0 comments on commit 02e4f14

Please sign in to comment.