diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..e69de29b diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index dcb0d41d..cab6522a 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1,2 +1,4 @@ +- Add: allow access entities using NGSIv2 API for non_signal rules (new setting nonSignalByAPI / PERSEO_CHECK_NON_SIGNAL_BY_API) (#549) - Remove support for ngsv1 notifications (#714) - Remove ngsiv1 support for updateAction (#714) + diff --git a/bin/perseo b/bin/perseo index 4956ad37..0d6f79bb 100755 --- a/bin/perseo +++ b/bin/perseo @@ -107,7 +107,8 @@ function loadConfiguration() { 'PERSEO_AUTHENTICATION_USER', 'PERSEO_AUTHENTICATION_PASSWORD', 'PERSEO_AUTHENTICATION_SERVICE', - 'PERSEO_MAX_RULES_BY_CORR' + 'PERSEO_MAX_RULES_BY_CORR', + 'PERSEO_CHECK_NON_SIGNAL_BY_API' ]; const protectedVariables = [ @@ -234,6 +235,9 @@ function loadConfiguration() { if (process.env.PERSEO_MAX_RULES_BY_CORR) { config.maxRulesByCorr = process.env.PERSEO_MAX_RULES_BY_CORR; } + if (process.env.PERSEO_CHECK_NON_SIGNAL_BY_API) { + config.nonSignalByAPI = process.env.PERSEO_CHECK_NON_SIGNAL_BY_API; + } } loadConfiguration(); diff --git a/config.js b/config.js index ea8e1c39..c9ae4220 100644 --- a/config.js +++ b/config.js @@ -232,4 +232,10 @@ config.castTypes = false; */ config.maxRulesByCorr = 20; +/** + * Check nonSignal rules using API ngsiv2 or just access to mongo + * @type {Boolean} + */ +config.nonSignalByAPI = false; + module.exports = config; diff --git a/docs/admin/configuration.md b/docs/admin/configuration.md index 81d6560f..02ce517c 100644 --- a/docs/admin/configuration.md +++ b/docs/admin/configuration.md @@ -49,8 +49,13 @@ The following table shows the environment variables available for Perseo configu | PERSEO_AUTHENTICATION_USER | User to perform authentication | | PERSEO_AUTHENTICATION_PASSWORD | Password for the user to perform authentication | | PERSEO_AUTHENTICATION_SERVICE | Keystone User Service to perform authentication | -| PERSEO_CAST_TYPE | If true, enable attribute value casting based in NGSI-v2 attribute types if true. If false (default), the JSON native type for the attribute value is used. | -| PERSEO_MAX_RULES_BY_CORR | Maximum number of rule executions triggered by the same notification (with the same correlator) until refuse execute a rule. Default is 20. See [loop detection tips](https://github.com/telefonicaid/perseo-fe/blob/master/docs/architecture/architecture.md#loop-detection) +| PERSEO_CAST_TYPE | Maximum number of rule executions triggered by the same notification (with the same correlator) until refuse execute a rule. Default is 20. See [loop detection tips](https://github.com/telefonicaid/perseo-fe/blob/master/docs/architecture/architecture.md#loop-detection) | +| PERSEO_MAX_RULES_BY_CORR | If true, enable attribute value casting based in NGSI-v2 attribute types if true. If false (default), the JSON native type for the attribute value is used. | +| PERSEO_CHECK_NON_SIGNAL_BY_API | If false, the entities will be directly find using MongoDB, if true they will be fing using NGSIv2 API(*) + +(*)Although the idea is that now Perseo works always using NGSIv2 API to check times for non signal rules, this setting has been introduced to ease a potential rollback if some problem is found with this new approach. Thus, I has to be considered a kind of temporal debugging settings, to be removed when we get confident with the new functionality, thus you shouldn't use it with care. + + ### Basic Configuration @@ -89,6 +94,7 @@ In order to have perseo running, there are several basic pieces of information t - `config.checkDB.bufferMaxEntries`: Number of operations buffered up before giving up on getting a working connection (see [database aspects](admin.md#database-aspects) documentation for mode detail). - `config.maxRulesByCorr`: Maximum number of rule executions triggered by the same notification (with the same correlator) until refuse execute a rule. Defualt is 20 +- `config.nonSignalByAPI`: Flag value, determine the use of the NGSIv2 API or MongoDB. By default is false. Options for HA: diff --git a/lib/models/entitiesStore.js b/lib/models/entitiesStore.js index b279936d..6d0f775c 100644 --- a/lib/models/entitiesStore.js +++ b/lib/models/entitiesStore.js @@ -28,15 +28,108 @@ var async = require('async'), entitiesCollectionName = require('../../config').orionDb.collection, myutils = require('../myutils'), constants = require('../constants'), - logger = require('logops'); + logger = require('logops'), + ngsi = require('ngsijs'), + context = { op: 'entitiesStore', comp: constants.COMPONENT_NAME }; function orionServiceDb(service) { return appContext.OrionDb(config.orionDb.prefix + '-' + service); } -function findSilentEntities(service, subservice, ruleData, func, callback) { +function createFilter(ruleData, service, subservice, limit, offset) { + var filter = { + service: service, + servicepath: subservice, + type: ruleData.type, + mq: ruleData.attribute + '.dateModified<' + (Date.now() / 1000 - ruleData.reportInterval).toString(), + limit: limit, + offset: offset + }; + if (ruleData.id) { + filter.id = ruleData.id; + } else if (ruleData.idRegexp) { + filter.idPattern = ruleData.idRegexp; + } + return filter; +} + +function createConnection(service, subservice) { + var options = { + service: service, + servicepath: subservice + }; + options.headers = {}; + // if (token !== null) { + // options.headers[constants.AUTH_HEADER] = token; + // } + // Add correlator + var domain = process.domain; + if (domain && domain.context) { + options.headers[constants.CORRELATOR_HEADER] = domain.context.corr; + // Add other headers + if (domain.context.srv && options.headers[constants.SERVICE_HEADER] === undefined) { + options.headers[constants.SERVICE_HEADER] = domain.context.srv; + } + if (domain.context.subsrv && options.headers[constants.SUBSERVICE_HEADER] === undefined) { + options.headers[constants.SUBSERVICE_HEADER] = domain.context.subsrv; + } + if (domain.context.from && options.headers[constants.REALIP_HEADER] === undefined) { + options.headers[constants.REALIP_HEADER] = domain.context.from; + } + } + return new ngsi.Connection(config.orion.URL, options); +} + +function findSilentEntitiesByAPIWithPagination(connection, filter, alterFunc, callback) { + // https://ficodes.github.io/ngsijs/stable/NGSI.Connection.html#.%22v2.listEntities%22__anchor + connection.v2.listEntities(filter).then( + (response) => { + // Entities retrieved successfully + // response.correlator transaction id associated with the server response + // response.limit contains the used page size + // response.results is an array with the retrieved entities + // response.offset contains the offset used in the request + response.results.forEach((entity) => { + logger.debug(context, 'silent entity %j', entity); + alterFunc(entity); + }); + logger.debug(context, 'findSilentEntities %s', myutils.firstChars(response.results)); + // Check if there are more entities to retrieve + if (response.count > filter.limit + filter.offset) { + // Call the function again with updated offset + filter.offset += filter.limit; + findSilentEntitiesByAPIWithPagination(connection, filter, alterFunc, callback); + } else { + // All entities have been retrieved and processed, call the callback + callback(null, response.results); + } + }, + (error) => { + logger.warn('error v2.listEntities: %j trying list entities using filter %j', error, filter); + callback(error, null); + } + ); +} + +function findSilentEntitiesByAPI(service, subservice, ruleData, alterFunc, callback) { + var limit = 20; + var offset = 0; + var connection = createConnection(service, subservice); + var filter = createFilter(ruleData, service, subservice, limit, offset); + + logger.info( + 'find silent entities by API ngsi using options %j and filter %j and rule %j', + connection, + filter, + ruleData + ); + + // Call the pagination function + findSilentEntitiesByAPIWithPagination(connection, filter, alterFunc, callback); +} + +function findSilentEntitiesByMongo(service, subservice, ruleData, alterFunc, callback) { var db, - context = { op: 'checkNoSignal', comp: constants.COMPONENT_NAME }, criterion = {}; db = orionServiceDb(service); @@ -54,17 +147,20 @@ function findSilentEntities(service, subservice, ruleData, func, callback) { } } else { //Added default else clause - logger.debug('findSilentEntities() - Default else clause'); + logger.debug(context, 'findSilentEntities() - Default else clause'); } if (ruleData.type) { criterion['_id.type'] = ruleData.type; } logger.debug(context, 'findSilentEntities criterion %j', criterion); + + // Variable to store the count of entities + var entityCount = 0; + async.waterfall( [ db.collection.bind(db, entitiesCollectionName, { strict: true }), function(col, cb) { - var count = 0; col.find(criterion) .batchSize(config.orionDb.batchSize) .each(function(err, one) { @@ -72,12 +168,13 @@ function findSilentEntities(service, subservice, ruleData, func, callback) { return cb(err, null); } if (one === null) { - //cursor exhausted - return cb(err, 'silent ones count ' + count); + // Cursor exhausted + return cb(err, 'silent ones count ' + entityCount); } logger.debug(context, 'silent entity %j', one._id); - func(one); - count++; + alterFunc(one); + // Increment the count of entities + entityCount++; }); } ], @@ -88,6 +185,32 @@ function findSilentEntities(service, subservice, ruleData, func, callback) { ); } -module.exports = { - FindSilentEntities: findSilentEntities -}; +function findSilentEntities(service, subservice, ruleData, alterFunc, callback) { + // Start the timer + var hrstart = process.hrtime(); + var method = !config.nonSignalByAPI ? 'findSilentEntitiesByMongo' : 'findSilentEntitiesByAPI'; + + // Function to be called when silent entities search is completed + var timedCallback = function(err, result) { + // Stop the timer + var hrend = process.hrtime(hrstart); + + // Log the execution time in seconds and milliseconds + logger.debug('%s has found %d entities in (hr): %d ms', method, result.length, hrend[1] / 1000000); + + // Call the original callback + callback(err, result); + }; + if (!config.nonSignalByAPI) { + return findSilentEntitiesByMongo(service, subservice, ruleData, alterFunc, timedCallback); + } else { + return findSilentEntitiesByAPI(service, subservice, ruleData, alterFunc, timedCallback); + } +} + +module.exports.FindSilentEntities = findSilentEntities; +module.exports.findSilentEntitiesByAPI = findSilentEntitiesByAPI; +module.exports.findSilentEntitiesByMongo = findSilentEntitiesByMongo; +module.exports.findSilentEntitiesByAPIWithPagination = findSilentEntitiesByAPIWithPagination; +module.exports.createConnection = createConnection; +module.exports.createFilter = createFilter; diff --git a/lib/models/noSignal.js b/lib/models/noSignal.js index 1de62fdc..a0e3cacd 100644 --- a/lib/models/noSignal.js +++ b/lib/models/noSignal.js @@ -63,36 +63,63 @@ function alertFunc(nsLineRule, entity) { d.exit(); }); d.run(function() { + logger.debug(context, 'alertfunc nsLineRule %j entity %j ', nsLineRule, entity); // We duplicate info in event and event.ev for VR and non-VR action parameters var event = { service: nsLineRule[SERVICE], subservice: nsLineRule[SUBSERVICE], ruleName: nsLineRule[NAME], reportInterval: nsLineRule[REPORT_INTERVAL], - id: entity._id.id, - type: entity._id.type, internalCurrentTime: new Date().toISOString() }; - - // Search for modDate of the entity's attribute - // and copy every attribute (if not in event yet) - // for use in action template - Object.keys(entity.attrs).forEach(function(attrName) { - if (attrName === nsLineRule[ATTRIBUTE]) { + if (!config.nonSignalByAPI) { + // entity is really a entity doc obtained from mongo + event.id = entity._id.id; + event.type = entity._id.type; + logger.debug(context, 'alertfunc event %j ', event); + // Search for modDate of the entity's attribute + // and copy every attribute (if not in event yet) + // for use in action template + Object.keys(entity.attrs).forEach(function(attrName) { + if (attrName === nsLineRule[ATTRIBUTE]) { + try { + lastTime = new Date(entity.attrs[attrName].modDate * 1000).toISOString(); + } catch (ex) { + myutils.logErrorIf(ex, 'run ', d.context); + } + } + if (event[attrName] === undefined) { + if (entity.attrs[attrName].type === 'DateTime') { + event[attrName] = new Date(entity.attrs[attrName].value * 1000).toISOString(); + } else { + event[attrName] = entity.attrs[attrName].value; + } + } + }); + } else { + // entity is and NGSI object + event.id = entity.id; + event.type = entity.type; + logger.debug(context, 'alertfunc event %j ', event); + // Search for modDate of the entity's attribute + // and copy every attribute (if not in event yet) + // for use in action template + const attrName = nsLineRule[ATTRIBUTE]; + if (entity[attrName]) { try { - lastTime = new Date(entity.attrs[attrName].modDate * 1000).toISOString(); + lastTime = entity[attrName].metadata.TimeInstant.value; } catch (ex) { myutils.logErrorIf(ex, 'run ', d.context); } - } - if (event[attrName] === undefined) { - if (entity.attrs[attrName].type === 'DateTime') { - event[attrName] = new Date(entity.attrs[attrName].value * 1000).toISOString(); - } else { - event[attrName] = entity.attrs[attrName].value; + if (event[attrName] === undefined) { + if (entity[attrName].type === 'DateTime') { + event[attrName] = entity[attrName].metadata.TimeInstant.value; + } else { + event[attrName] = entity[attrName].value; + } } } - }); + } logger.debug(context, 'lastTime could be ', lastTime); if (lastTime !== undefined && lastTime !== null) { @@ -119,6 +146,7 @@ function checkNoSignal(period) { currentContext.srv = 'n/a'; currentContext.subsrv = 'n/a'; logger.debug(currentContext, 'Executing no-signal handler for period of %d (%d rules)', period, list.length); + list.forEach(function(nsrule) { currentContext.srv = nsrule[SERVICE]; currentContext.subsrv = nsrule[SUBSERVICE]; @@ -192,6 +220,7 @@ function addNSRule(service, subservice, name, nsr) { ); intervalAsNum = MIN_INTERVAL_MS; } + arrayRule = nsr2arr(service, subservice, name, nsr); nsRulesByInterval[nsr.checkInterval] = nsRulesByInterval[nsr.checkInterval] || []; nsRulesByInterval[nsr.checkInterval].forEach(function(element, index, array) { @@ -206,6 +235,7 @@ function addNSRule(service, subservice, name, nsr) { logger.debug(context, util.format('Adding no-signal rule (%s, %s, %s)', service, subservice, name)); } if (!checkers.hasOwnProperty(nsr.checkInterval)) { + logger.info(context, util.format('no-signal rule (%s, %s, %s)', service, subservice, name)); checkers[nsr.checkInterval] = setInterval(checkNoSignal, intervalAsNum, nsr.checkInterval); checkers[nsr.checkInterval].unref(); } diff --git a/test/unit/entitiesStore_utest.js b/test/unit/entitiesStore_utest.js new file mode 100644 index 00000000..b886ceb0 --- /dev/null +++ b/test/unit/entitiesStore_utest.js @@ -0,0 +1,147 @@ +/* + * Copyright 2015 Telefonica Investigación y Desarrollo, S.A.U + * + * This file is part of perseo-fe + * + * perseo-fe is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * perseo-fe is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public + * License along with perseo-fe. + * If not, see http://www.gnu.org/licenses/. + * + * For those usages not covered by the GNU Affero General Public License + * please contact with iot_support at tid dot es + * + * Created by: Carlos Blanco - Future Internet Consulting and Development Solutions (FICODES) + */ + +'use strict'; + +var should = require('should'); +var rewire = require('rewire'); +var entitiesStore = rewire('../../lib/models/entitiesStore.js'); +var chai = require('chai'); +var sinon = require('sinon'); +var sinonChai = require('sinon-chai'); +var config = require('../../config.js'); +var assert = require('chai').assert; +chai.Should(); +chai.use(sinonChai); + +describe('entitiesStore', function() { + var ruleData = { + name: 'NSR2', + action: { + type: 'update', + parameters: { + id: 'alarma:${id}', + type: 'Alarm', + attributes: [ + { + name: 'msg', + value: 'El status de ${id} es ${status}' + } + ] + } + }, + subservice: '/', + service: 'unknownt', + nosignal: { + checkInterval: '1', + attribute: 'temperature', + reportInterval: '5', + id: 'thing:disp1', + idRegexp: null, + type: 'thing' + } + }, + alterFunc = 'sinon.stub()', + callback = function(e, request) { + should.exist(request); + should.not.exist(e); + should.equal(request.httpCode, 200); + }; + + describe('FindSilentEntities', function() { + it('By default should call findSilentEntitiesByMongo', function() { + var findSilentEntitiesByMongoSpy = sinon.spy(); + entitiesStore.__set__('findSilentEntitiesByMongo', findSilentEntitiesByMongoSpy); + entitiesStore.FindSilentEntities(ruleData.service, ruleData.subservice, ruleData, alterFunc, callback); + sinon.assert.calledOnce(findSilentEntitiesByMongoSpy); + }); + + it('If default settings are changed FindSilentEntitiesByAPI should be called', function() { + config.nonSignalByAPI = true; + var findSilentEntitiesByAPISpy = sinon.spy(); + entitiesStore.__set__('findSilentEntitiesByAPI', findSilentEntitiesByAPISpy); + entitiesStore.FindSilentEntities(); + sinon.assert.calledOnce(findSilentEntitiesByAPISpy); + }); + }); + + describe('findSilentEntitiesByAPI', function() { + it('should call findSilentEntitiesByAPIWithPagination', function() { + var findSilentEntitiesByAPIWithPaginationSpy = sinon.spy(); + var createConnectionStub = sinon.stub().returns({}); + var createFilterStub = sinon.stub().returns({}); + var alterFunc2 = sinon.stub(); + var callback2 = sinon.stub(); + + entitiesStore.__set__('findSilentEntitiesByAPIWithPagination', findSilentEntitiesByAPIWithPaginationSpy); + entitiesStore.__set__('createConnection', createConnectionStub); + entitiesStore.__set__('createFilter', createFilterStub); + + entitiesStore.findSilentEntitiesByAPI( + ruleData.service, + ruleData.subservice, + ruleData, + alterFunc2, + callback2 + ); + + sinon.assert.calledOnce(findSilentEntitiesByAPIWithPaginationSpy); + sinon.assert.calledOnce(createConnectionStub); + sinon.assert.calledOnce(createFilterStub); + }); + }); + + describe('createFilter', function() { + it('should correctly create filter', function() { + // Define input arguments + var service = 'testService'; + var subservice = 'testSubservice'; + var ruleData = { + type: 'testType', + attribute: 'testAttribute', + eportInterval: 3000, + id: 'testId' + }; + var limit = 20; + var offset = 0; + + var expectedFilter = { + service: service, + servicepath: subservice, + type: ruleData.type, + mq: ruleData.attribute + '.dateModified<' + (Date.now() / 1000 - ruleData.reportInterval).toString(), + limit: limit, + offset: offset, + id: ruleData.id + }; + + // Call the function + var resultFilter = entitiesStore.createFilter(ruleData, service, subservice, limit, offset); + + // Verify the result using assert.deepEqual + assert.deepStrictEqual(resultFilter, expectedFilter); + }); + }); +}); diff --git a/test/unit/nsr_utest.js b/test/unit/nsr_utest.js index a7a86a7a..4fae2376 100644 --- a/test/unit/nsr_utest.js +++ b/test/unit/nsr_utest.js @@ -47,6 +47,7 @@ describe('noSignal', function() { type: null } }; + describe('#addNSRule()', function() { it('should reject to add a rule with invalid check interval', function() { rule.nosignal.checkInterval = 'aserejé';