Skip to content

Commit

Permalink
Refactor to use domains to hold all request information
Browse files Browse the repository at this point in the history
  • Loading branch information
pratid committed Nov 9, 2015
1 parent 832626f commit 7e2449b
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 322 deletions.
2 changes: 1 addition & 1 deletion ngsi_adapter/src/.gjslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
--nomultiprocess
--debug_indentation
--time
--disable=110,212,217
--disable=1,110,212,217
--custom_jsdoc_tags=module,function,callback,abstract,constant,augments,memberof,returns
128 changes: 66 additions & 62 deletions ngsi_adapter/src/lib/adapter.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013 Telefónica I+D
* Copyright 2013-2015 Telefónica I+D
* All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand All @@ -19,7 +19,7 @@
/**
* Module that implements a HTTP asynchronous server processing requests for
* adaptation from raw monitoring data into NGSI format, then using the results
* to invoke Context Broker.
* to invoke ContextBroker.
*
* @module adapter
*/
Expand All @@ -40,46 +40,46 @@ var http = require('http'),


/**
* Asynchronously process POST requests and then invoke updateContext() on ContextBroker.
* Asynchronously process incoming requests and then invoke updateContext() on ContextBroker.
*
* @param {http.IncomingMessage} request The request to this server.
* @param {RequestCallback} callback The callback that handles the response.
* @param {Domain} reqdomain Domain handling request (includes context, timestamp, id, type, body & parser).
* @param {RequestCallback} callback The callback for responses from ContextBroker.
*/
function doPost(request, callback) {
function updateContext(reqdomain, callback) {
try {
domain.active.context.op = 'Parse';
logger.debug('Probe data "%s"', request.body);
var remoteUrl = url.parse(opts.brokerUrl);
var dataParser = request.parser;
var updateReqType = dataParser.getContentType();
var updateReqBody = dataParser.updateContextRequest(request);
var updateReqOpts = {
hostname: remoteUrl.hostname,
port: remoteUrl.port,
path: '/NGSI10/updateContext',
method: 'POST',
headers: {
'Accept': updateReqType,
'Content-Type': updateReqType,
'Content-Length': updateReqBody.length
}
};
updateReqOpts.headers[common.txIdHttpHeader] = domain.active.context.trans;
reqdomain.context.op = 'Parse';
logger.debug('Probe data "%s"', reqdomain.body);
var parser = reqdomain.parser,
remoteUrl = url.parse(opts.brokerUrl),
updateReqType = parser.getContentType(),
updateReqBody = parser.updateContextRequest(reqdomain),
updateReqOpts = {
hostname: remoteUrl.hostname,
port: remoteUrl.port,
path: '/NGSI10/updateContext',
method: 'POST',
headers: {
'Accept': updateReqType,
'Content-Type': updateReqType,
'Content-Length': updateReqBody.length
}
};
updateReqOpts.headers[common.txIdHttpHeader] = reqdomain.context.trans;
/* jshint unused: false */
var operation = retry.operation({ retries: opts.retries });
operation.attempt(function(currentAttempt) {
domain.active.context.op = 'UpdateContext';
reqdomain.context.op = 'UpdateContext';
logger.info('Request to ContextBroker at %s...', opts.brokerUrl);
logger.debug('%s', { toString: function() {
return updateReqBody.split('\n').map(function(line) {return line.trim();}).join('');
logger.debug('%s', { toString: function () {
return updateReqBody.split('\n').map(function (line) {return line.trim();}).join('');
}});
var updateReq = http.request(updateReqOpts, function(response) {
var updateReq = http.request(updateReqOpts, function (response) {
var responseBody = '';
response.setEncoding('utf8');
response.on('data', function(chunk) {
responseBody += chunk;
});
response.on('end', function() {
response.on('end', function () {
callback(null, response.statusCode, responseBody);
});
});
Expand All @@ -102,70 +102,74 @@ function doPost(request, callback) {
* Callback for requests to updateContext().
*
* @callback RequestCallback
* @param {Error} err The error ocurred in request, or null.
* @param {Number} [responseStatus] The response status code.
* @param {String} [responseBody] The response body contents.
* @param {Error} err The error occurred in request, or null.
* @param {Number} [responseStatus] The response status code.
* @param {String} [responseBody] The response body contents.
*/
function callback(err, responseStatus, responseBody) {
function updateContextCallback(err, responseStatus, responseBody) {
if (err) {
logger.error(err.message);
} else {
logger.info('Response status %d %s', responseStatus, http.STATUS_CODES[responseStatus]);
logger.debug('%s', { toString: function() {
return responseBody.split('\n').map(function(line) {return line.trim();}).join('');
}});
logger.debug('%s', {
toString: function () {
return responseBody.split('\n').map(function (line) {
return line.trim();
}).join('');
}
});
}
}


/**
* Server requests listener.
* HTTP requests listener.
*
* Request URL looks like `http://host:port/path?query`:
*
* - Request query string MUST include arguments `id` and `type`
* - Request path will denote the name of the originating probe
* - Request headers may include a transaction identifier ({@link common#txIdHttpHeader})
*
* @param {http.IncomingMessage} request The request to this server.
* @param {http.ServerResponse} response The response from this server.
* @param {http.IncomingMessage} request The HTTP request to this server.
* @param {http.ServerResponse} response The HTTP response from this server.
*/
function asyncRequestListener(request, response) {
var reqd = domain.create();
reqd.add(request);
reqd.add(response);
reqd.context = {
var reqdomain = domain.create();
reqdomain.add(request);
reqdomain.add(response);
reqdomain.context = {
trans: request.headers[common.txIdHttpHeader.toLowerCase()] || cuid(),
op: request.method
};
reqd.on('error', function(err) {
reqdomain.on('error', function (err) {
logger.error(err.message);
response.writeHead(500); // server error
response.end();
});
reqd.run(function() {
reqdomain.run(function () {
logger.info('Request on resource %s', request.url.split('?').join(' with params '));
var status = 405; // not allowed
if (request.method === 'POST') {
var query = url.parse(request.url, true).query;
var entityId = query.id;
var entityType = query.type;
reqdomain.entityId = query.id;
reqdomain.entityType = query.type;
try {
status = 400; // bad request
if (!entityId || !entityType) {
if (!reqdomain.entityId || !reqdomain.entityType) {
throw new Error('Missing entityId and/or entityType');
}
status = 404; // not found
request.parser = parser.getParser(request);
reqdomain.parser = parser.getParser(request);
status = 200; // ok
request.timestamp = Date.now();
request.body = '';
request.on('data', function(chunk) {
request.body += chunk;
reqdomain.timestamp = Date.now();
reqdomain.body = '';
request.on('data', function (chunk) {
reqdomain.body += chunk;
});
request.on('end', function() {
process.nextTick(function() {
doPost(request, exports.requestCallback);
request.on('end', function () {
process.nextTick(function () {
updateContext(reqdomain, exports.updateContextCallback);
});
});
} catch (err) {
Expand All @@ -183,20 +187,20 @@ function asyncRequestListener(request, response) {
* Server main.
*/
function main() {
process.on('uncaughtException', function(err) {
process.on('uncaughtException', function (err) {
logger.error({op: 'Exit'}, err.message);
process.exit(1);
});
process.on('exit', function() {
process.on('exit', function () {
logger.info({op: 'Exit'}, 'Server stopped');
});
process.on('SIGINT', function() {
process.on('SIGINT', function () {
process.exit();
});
process.on('SIGTERM', function() {
process.on('SIGTERM', function () {
process.exit();
});
http.createServer(asyncRequestListener).listen(opts.listenPort, opts.listenHost, function() {
http.createServer(asyncRequestListener).listen(opts.listenPort, opts.listenHost, function () {
logger.info({op: 'Init'}, 'Server listening at http://%s:%d/', this.address().address, this.address().port);
});
}
Expand All @@ -206,7 +210,7 @@ function main() {
exports.main = main;

/** @export */
exports.requestCallback = callback;
exports.updateContextCallback = updateContextCallback;


if (require.main === module) {
Expand Down
50 changes: 26 additions & 24 deletions ngsi_adapter/src/lib/parsers/common/base.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013 Telefónica I+D
* Copyright 2013-2015 Telefónica I+D
* All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand Down Expand Up @@ -52,31 +52,33 @@ baseParser.timestampAttrName = '_timestamp';
* @memberof baseParser
* @returns {String} The content type (the format) for Context Broker requests.
*/
baseParser.getContentType = function() {
baseParser.getContentType = function () {
return 'application/xml';
};


/**
* Gets the updateContext() request body.
* Returns the updateContext() request body.
*
* @function updateContextRequest
* @memberof baseParser
* @this baseParser
* @param {http.IncomingMessage} request The HTTP request to this server.
* @param {Domain} reqdomain Domain handling current request (includes context, timestamp, id, type, body & parser).
* @returns {String} The request body, either in XML or JSON format.
*/
baseParser.updateContextRequest = function(request) {
var query = url.parse(request.url, true).query;
var entityId = query.id;
var entityType = query.type;
var entityData = this.parseRequest(request);
var entityAttrs = this.getContextAttrs(entityData);
baseParser.updateContextRequest = function (reqdomain) {
var entityId = reqdomain.entityId,
entityType = reqdomain.entityType,
entityData = this.parseRequest(reqdomain.body),
entityAttrs = this.getContextAttrs(entityData);

if (Object.keys(entityAttrs).length === 0) {
throw new Error('Missing entity context attributes');
}

// feature #4: automatically add request timestamp to entity attributes
entityAttrs[this.timestampAttrName] = request.timestamp;
entityAttrs[this.timestampAttrName] = reqdomain.timestamp;

return (this.getContentType() === 'application/xml') ?
this.getUpdateContextXML(entityId, entityType, entityAttrs) :
this.getUpdateContextJSON(entityId, entityType, entityAttrs);
Expand All @@ -89,10 +91,10 @@ baseParser.updateContextRequest = function(request) {
* @abstract
* @function parseRequest
* @memberof baseParser
* @param {http.IncomingMessage} request The HTTP request to this server.
* @returns {EntityData} An object holding entity data taken from request body.
* @param {String} requestMsg The data message included in request being processed.
* @returns {EntityData} An object holding entity data taken from request message.
*/
baseParser.parseRequest = function(request) {
baseParser.parseRequest = function (requestMsg) {
throw new Error('Must implement');
};

Expand All @@ -103,10 +105,10 @@ baseParser.parseRequest = function(request) {
* @abstract
* @function getContextAttrs
* @memberof baseParser
* @param {EntityData} data Object holding raw entity data.
* @param {EntityData} data Object holding raw entity data.
* @returns {Object} Context attributes.
*/
baseParser.getContextAttrs = function(data) {
baseParser.getContextAttrs = function (data) {
throw new Error('Must implement');
};

Expand All @@ -116,12 +118,12 @@ baseParser.getContextAttrs = function(data) {
*
* @function getUpdateContextJSON
* @memberof baseParser
* @param {String} id The entity identifier.
* @param {String} type The entity type.
* @param {Object} attrs The entity context attributes.
* @param {String} id The entity identifier.
* @param {String} type The entity type.
* @param {Object} attrs The entity context attributes.
* @returns {String} The request body in JSON format.
*/
baseParser.getUpdateContextJSON = function(id, type, attrs) {
baseParser.getUpdateContextJSON = function (id, type, attrs) {
throw new Error('TO-DO');
};

Expand All @@ -131,12 +133,12 @@ baseParser.getUpdateContextJSON = function(id, type, attrs) {
*
* @function getUpdateContextXML
* @memberof baseParser
* @param {String} id The entity identifier.
* @param {String} type The entity type.
* @param {Object} attrs The entity context attributes.
* @param {String} id The entity identifier.
* @param {String} type The entity type.
* @param {Object} attrs The entity context attributes.
* @returns {String} The request body in XML format.
*/
baseParser.getUpdateContextXML = function(id, type, attrs) {
baseParser.getUpdateContextXML = function (id, type, attrs) {
var result = '';
result += '<?xml version="1.0" encoding="UTF-8"?>\n';
result += '<updateContextRequest>\n';
Expand Down
32 changes: 23 additions & 9 deletions ngsi_adapter/src/lib/parsers/common/factory.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013 Telefónica I+D
* Copyright 2013-2015 Telefónica I+D
* All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand Down Expand Up @@ -31,23 +31,37 @@ var url = require('url'),
path = require('path');


/**
* Parser factory: returns a dynamically loaded parser object at `lib/parsers/{name}`.
*
* @param {String} name The name of the parser.
* @returns {Object} The parser been loaded according to given name.
*/
function getParserByName(name) {
var moduleName = util.format('../%s', name);
try {
return require(moduleName).parser;
} catch (err) {
var modulePath = path.normalize(__dirname + path.sep + moduleName + '.js');
throw new Error(util.format('Parser from module "%s" could not be loaded', modulePath));
}
}


/**
* Parser factory: given a request URL `http://host:port/path?query`, takes `path` as
* the name of the probe whose data (request body) will be parsed. Tries to dynamically
* load a parser object from module at `lib/parsers/` directory named after the probe.
* load a parser object from module named after the probe.
*
* @param {http.IncomingMessage} request The request to this server.
* @param {http.IncomingMessage} request The request to this server.
* @returns {Object} The parser been loaded according to probe mentioned in request.
*/
function getParser(request) {
var probeName = url.parse(request.url).pathname.slice(1),
moduleName = util.format('../%s', probeName);
var name = url.parse(request.url).pathname.slice(1);
try {
return require(moduleName).parser;
return getParserByName(name);
} catch (err) {
var modulePath = path.normalize(__dirname + path.sep + moduleName + '.js');
throw new Error((!probeName) ? 'Missing resource in request' :
util.format('Unknown probe "%s" (no parser module "%s" loaded)', probeName, modulePath));
throw new Error((!name) ? 'Missing resource in request' : util.format('Unknown probe "%s" (%s)', name, err));
}
}

Expand Down
Loading

0 comments on commit 7e2449b

Please sign in to comment.