Skip to content

Commit

Permalink
feat(server): emit after event when both request is flushed and last …
Browse files Browse the repository at this point in the history
…handler is finished
  • Loading branch information
Peter Marton committed Feb 22, 2018
1 parent 891e737 commit d4d4951
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 123 deletions.
5 changes: 5 additions & 0 deletions docs/guides/6to7guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ server.get('/hello/:name', restify.plugins.conditionalHandler([
// 'accept-version': '3.x', accept: 'application/json' => '3.x, json'
```
### After event fires when both request is flushed and last handler is finished
In 7.x `after` event fires after both request is flushed
and last handler is finished.
### Metrics plugin latency
In 7.x Metrics plugin's `latency` is calculated when the request is
Expand Down
2 changes: 1 addition & 1 deletion lib/plugins/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function createMetrics(opts, callback) {
// REST verb
method: req.method,
// overall request latency
latency: hrTimeDurationInMs(req._time, process.hrtime()),
latency: hrTimeDurationInMs(req._time, req._timeFlushed),
preLatency: hrTimeDurationInMs(req._timePreStart, req._timePreEnd),
useLatency: hrTimeDurationInMs(req._timeUseStart, req._timeUseEnd),
routeLatency: hrTimeDurationInMs(
Expand Down
73 changes: 60 additions & 13 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -872,11 +872,13 @@ Server.prototype._afterPre = function _afterPre(err, req, res) {
// Handle error
if (err) {
self._onHandlerError(err, req, res);
self._finishReqResCycle(req, res, err);
return;
}

// Stop
if (err === false) {
self._onHandlerStop(req, res);
return;
}

Expand Down Expand Up @@ -935,16 +937,17 @@ Server.prototype._runRoute = function _runRoute(req, res) {
Server.prototype._afterRoute = function _afterRoute(err, req, res) {
var self = this;

res._handlersFinished = true;

// Handle error
if (err) {
self._onHandlerError(err, req, res);
self._finishReqResCycle(req, res, err);
return;
}

// Stop
if (err === false) {
return;
}
// Trigger finish
self._finishReqResCycle(req, res, err);
};

/**
Expand Down Expand Up @@ -993,17 +996,35 @@ Server.prototype._afterUse = function _afterUse(err, req, res, next) {
// Handle error
if (err) {
self._onHandlerError(err, req, res);
self._finishReqResCycle(req, res, err);
return;
}

// Stop
if (err === false) {
self._onHandlerStop(req, res);
return;
}

next();
};

/**
* Runs after next(false) is called
*
* @private
* @memberof Server
* @instance
* @function _onHandlerStop
* @param {Request} req - request
* @param {Response} res - response
* @returns {undefined} no return value
*/
Server.prototype._onHandlerStop = function _onHandlerStop(req, res) {
res._handlersFinished = true;
this._finishReqResCycle(req, res);
};

/**
* After route handlers finished
* NOTE: only called when last handler calls next([err])
Expand Down Expand Up @@ -1041,13 +1062,13 @@ Server.prototype._onHandlerError = function _onHandlerError(err, req, res) {
return;
}

res._handlersFinished = true;

// Preserve handler err for finish event
res.err = res.err || err;

// Error happened in router handlers
if (err) {
self._routeErrorResponse(req, res, err);
}
self._routeErrorResponse(req, res, err);
};

/**
Expand Down Expand Up @@ -1079,6 +1100,8 @@ Server.prototype._setupRequest = function _setupRequest(req, res) {
res.formatters = self.formatters;
res.req = req;
res.serverName = self.name;
res._handlersFinished = false;
res._flushed = false;

// set header only if name isn't empty string
if (self.name !== '') {
Expand All @@ -1089,7 +1112,11 @@ Server.prototype._setupRequest = function _setupRequest(req, res) {
// attach a listener for 'close' events, this will let us set
// a flag so that we can stop processing the request if the client closes
// the connection (or we lose the connection).
// we consider a closed request as flushed from metrics point of view
function onClose() {
res._flushed = true;
req._timeFlushed = process.hrtime();

res.removeListener('finish', onFinish);
res.removeListener('error', onError);
req._connectionState = 'close';
Expand All @@ -1100,17 +1127,27 @@ Server.prototype._setupRequest = function _setupRequest(req, res) {

// Response lifecycle events
function onFinish() {
// Response may get flushed before handler callback is triggered
var processHrTime = process.hrtime();

res._flushed = true;
req._timeFlushed = processHrTime;

// Response may get flushed before handler callback is triggered
req._timeFlushed = processHrTime;
req._timePreEnd = req._timePreEnd || processHrTime;
req._timeUseEnd = req._timeUseEnd || processHrTime;
req._timeRouteEnd = req._timeRouteEnd || processHrTime;

req.removeListener('close', onClose);
res.removeListener('error', onError);
self._finishReqResCycle(req, res, res.err);

// Do not trigger false
self._finishReqResCycle(req, res);
}
function onError(err) {
res._flushed = true;
req._timeFlushed = process.hrtime();

req.removeListener('close', onClose);
res.removeListener('finish', onFinish);
self._finishReqResCycle(req, res, err);
Expand Down Expand Up @@ -1149,11 +1186,21 @@ Server.prototype._finishReqResCycle = function _finishReqResCycle(
var self = this;
var route = req.route; // can be undefined when 404 or error

// after event has signature of function(req, res, route, err) {...}
self.emit('after', req, res, route, err);
if (res._finished) {
return;
}

if (res._flushed && res._handlersFinished) {
// decrement number of requests
self._inflightRequests--;
res._finished = true;

// decrement number of requests
self._inflightRequests--;
// after event has signature of function(req, res, route, err) {...}
self.emit('after', req, res, route, err || res.err);
} else {
// preserve error for actual finish
res.err = err;
}
};

/**
Expand Down
40 changes: 25 additions & 15 deletions test/plugins/audit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,7 @@ describe('audit logger', function() {
// don't sporadically fail due to timing issues.
});

CLIENT.get('/audit', function(err, req, res) {
assert.ifError(err);

SERVER.on('after', function() {
var record = ringbuffer.records && ringbuffer.records[0];

// check timers
Expand Down Expand Up @@ -237,6 +235,10 @@ describe('audit logger', function() {
);
done();
});

CLIENT.get('/audit', function(err, req, res) {
assert.ifError(err);
});
});

it('should log anonymous handler timers', function(done) {
Expand Down Expand Up @@ -295,9 +297,7 @@ describe('audit logger', function() {
}
);

CLIENT.get('/audit', function(err, req, res) {
assert.ifError(err);

SERVER.on('after', function() {
// check timers
var record = ringbuffer.records && ringbuffer.records[0];
assert.ok(record, 'no log records');
Expand Down Expand Up @@ -362,6 +362,10 @@ describe('audit logger', function() {
);
done();
});

CLIENT.get('/audit', function(err, req, res) {
assert.ifError(err);
});
});

it('restify-GH-1435 should accumulate log handler timers', function(done) {
Expand Down Expand Up @@ -401,9 +405,7 @@ describe('audit logger', function() {
// don't sporadically fail due to timing issues.
});

CLIENT.get('/audit', function(err, req, res) {
assert.ifError(err);

SERVER.on('after', function() {
var record = ringbuffer.records && ringbuffer.records[0];

// check timers
Expand All @@ -427,6 +429,10 @@ describe('audit logger', function() {
);
done();
});

CLIENT.get('/audit', function(err, req, res) {
assert.ifError(err);
});
});

it('restify-GH-812 audit logger has query params string', function(done) {
Expand Down Expand Up @@ -455,9 +461,7 @@ describe('audit logger', function() {
next();
});

CLIENT.get('/audit?a=1&b=2', function(err, req, res) {
assert.ifError(err);

SERVER.on('after', function() {
// check timers
assert.ok(ringbuffer.records[0], 'no log records');
assert.equal(
Expand All @@ -468,6 +472,10 @@ describe('audit logger', function() {
assert.ok(ringbuffer.records[0].req.query, 'a=1&b=2');
done();
});

CLIENT.get('/audit?a=1&b=2', function(err, req, res) {
assert.ifError(err);
});
});

it('restify-GH-812 audit logger has query params obj', function(done) {
Expand Down Expand Up @@ -499,9 +507,7 @@ describe('audit logger', function() {
}
]);

CLIENT.get('/audit?a=1&b=2', function(err, req, res) {
assert.ifError(err);

SERVER.on('after', function() {
// check timers
assert.ok(ringbuffer.records[0], 'no log records');
assert.equal(
Expand All @@ -515,6 +521,10 @@ describe('audit logger', function() {
});
done();
});

CLIENT.get('/audit?a=1&b=2', function(err, req, res) {
assert.ifError(err);
});
});

it('should work with pre events', function(done) {
Expand Down
15 changes: 10 additions & 5 deletions test/plugins/bodyReader.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ describe('body reader', function() {
SERVER.use(restify.plugins.bodyParser());

SERVER.post('/compressed', function(req, res, next) {
res.send(200, { inflightRequests: SERVER.inflightRequests() });
res.send('ok');
next();
});

Expand All @@ -142,7 +142,7 @@ describe('body reader', function() {
var options = {
hostname: '127.0.0.1',
port: PORT,
path: '/compressed',
path: '/compressed?v=1',
method: 'POST',
headers: {
'Content-Type': 'application/json',
Expand All @@ -156,6 +156,13 @@ describe('body reader', function() {
assert.isNotOk(res);
});

SERVER.on('after', function(req2) {
if (req2.href() === '/compressed?v=2') {
assert.equal(SERVER.inflightRequests(), 0);
done();
}
});

// will get a req error after 100ms timeout
req.on('error', function(e) {
// make another request to verify in flight request is only 1
Expand All @@ -165,15 +172,13 @@ describe('body reader', function() {
});

CLIENT.post(
'/compressed',
'/compressed?v=2',
{
apple: 'red'
},
function(err, _, res, obj) {
assert.ifError(err);
assert.equal(res.statusCode, 200);
assert.equal(obj.inflightRequests, 1);
done();
}
);
});
Expand Down
18 changes: 13 additions & 5 deletions test/plugins/metrics.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ describe('request metrics plugin', function() {
// but setTimeout is happening on the server, tolerate 10ms
assert.isAtLeast(metrics.preLatency, 50);
assert.isAtLeast(metrics.useLatency, 50);
assert.equal(metrics.routeLatency, null);
assert.isAtLeast(metrics.routeLatency, 250);
assert.isAtLeast(metrics.latency, 200 - 10);
// latency should dbe lower as request timeouts
assert.isAbove(metrics.routeLatency, metrics.latency);
assert.equal(metrics.path, '/foo');
assert.equal(metrics.method, 'GET');
assert.equal(metrics.connectionState, 'close');
Expand All @@ -225,13 +227,19 @@ describe('request metrics plugin', function() {
}, 50);
});

SERVER.get('/foo', function(req, res, next) {
setTimeout(function() {
SERVER.get(
'/foo',
function(req, res, next) {
setTimeout(function() {
return next();
}, 250);
},
function(req, res, next) {
assert.fail('Client has already closed request');
res.send(202, 'hello world');
return next();
}, 250);
});
}
);

CLIENT.get('/foo?a=1', function(err, _, res) {
// request should timeout
Expand Down

0 comments on commit d4d4951

Please sign in to comment.