Skip to content

Commit

Permalink
Merge 860d818 into 4940ba7
Browse files Browse the repository at this point in the history
  • Loading branch information
AlvaroVega committed Jul 5, 2023
2 parents 4940ba7 + 860d818 commit 2e0ef01
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
- Add: allow access entities using NGSIv2 API for non_signal rules (new setting nonSignalByAPI / PERSEO_CHECK_NON_SIGNAL_BY_API) (#549)
- Remove support for ngsv1 notifications (#714)
- Remove ngsiv1 support for updateAction (#714)

6 changes: 5 additions & 1 deletion bin/perseo
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ function loadConfiguration() {
'PERSEO_AUTHENTICATION_USER',
'PERSEO_AUTHENTICATION_PASSWORD',
'PERSEO_AUTHENTICATION_SERVICE',
'PERSEO_MAX_RULES_BY_CORR'
'PERSEO_MAX_RULES_BY_CORR',
'PERSEO_CHECK_NON_SIGNAL_BY_API'
];

const protectedVariables = [
Expand Down Expand Up @@ -234,6 +235,9 @@ function loadConfiguration() {
if (process.env.PERSEO_MAX_RULES_BY_CORR) {
config.maxRulesByCorr = process.env.PERSEO_MAX_RULES_BY_CORR;
}
if (process.env.PERSEO_CHECK_NON_SIGNAL_BY_API) {
config.nonSignalByAPI = process.env.PERSEO_CHECK_NON_SIGNAL_BY_API;
}
}

loadConfiguration();
Expand Down
6 changes: 6 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,10 @@ config.castTypes = false;
*/
config.maxRulesByCorr = 20;

/**
* Check nonSignal rules using API ngsiv2 or just access to mongo
* @type {Boolean}
*/
config.nonSignalByAPI = false;

module.exports = config;
10 changes: 8 additions & 2 deletions docs/admin/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ The following table shows the environment variables available for Perseo configu
| PERSEO_AUTHENTICATION_USER | User to perform authentication |
| PERSEO_AUTHENTICATION_PASSWORD | Password for the user to perform authentication |
| PERSEO_AUTHENTICATION_SERVICE | Keystone User Service to perform authentication |
| PERSEO_CAST_TYPE | If true, enable attribute value casting based in NGSI-v2 attribute types if true. If false (default), the JSON native type for the attribute value is used. |
| PERSEO_MAX_RULES_BY_CORR | Maximum number of rule executions triggered by the same notification (with the same correlator) until refuse execute a rule. Default is 20. See [loop detection tips](https://github.com/telefonicaid/perseo-fe/blob/master/docs/architecture/architecture.md#loop-detection)
| PERSEO_CAST_TYPE | Maximum number of rule executions triggered by the same notification (with the same correlator) until refuse execute a rule. Default is 20. See [loop detection tips](https://github.com/telefonicaid/perseo-fe/blob/master/docs/architecture/architecture.md#loop-detection) |
| PERSEO_MAX_RULES_BY_CORR | If true, enable attribute value casting based in NGSI-v2 attribute types if true. If false (default), the JSON native type for the attribute value is used. |
| PERSEO_CHECK_NON_SIGNAL_BY_API | If false, the entities will be directly find using MongoDB, if true they will be fing using NGSIv2 API(*)

(*)Although the idea is that now Perseo works always using NGSIv2 API to check times for non signal rules, this setting has been introduced to ease a potential rollback if some problem is found with this new approach. Thus, I has to be considered a kind of temporal debugging settings, to be removed when we get confident with the new functionality, thus you should use it with care.



### Basic Configuration

Expand Down Expand Up @@ -89,6 +94,7 @@ In order to have perseo running, there are several basic pieces of information t
- `config.checkDB.bufferMaxEntries`: Number of operations buffered up before giving up on getting a working connection
(see [database aspects](admin.md#database-aspects) documentation for mode detail).
- `config.maxRulesByCorr`: Maximum number of rule executions triggered by the same notification (with the same correlator) until refuse execute a rule. Defualt is 20
- `config.nonSignalByAPI`: Flag value, determine the use of the NGSIv2 API or MongoDB. By default is false.

Options for HA:

Expand Down
146 changes: 134 additions & 12 deletions lib/models/entitiesStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,107 @@ var async = require('async'),
entitiesCollectionName = require('../../config').orionDb.collection,
myutils = require('../myutils'),
constants = require('../constants'),
logger = require('logops');
logger = require('logops'),
ngsi = require('ngsijs'),
context = { op: 'entitiesStore', comp: constants.COMPONENT_NAME };

function orionServiceDb(service) {
return appContext.OrionDb(config.orionDb.prefix + '-' + service);
}

function findSilentEntities(service, subservice, ruleData, func, callback) {
function createFilter(ruleData, service, subservice, limit, offset) {
var filter = {
service: service,
servicepath: subservice,
type: ruleData.type,
mq: ruleData.attribute + '.dateModified<' + (Date.now() / 1000 - ruleData.reportInterval).toString(),
limit: limit,
offset: offset
};
if (ruleData.id) {
filter.id = ruleData.id;
} else if (ruleData.idRegexp) {
filter.idPattern = ruleData.idRegexp;
}
return filter;
}

function createConnection(service, subservice) {
var options = {
service: service,
servicepath: subservice
};
options.headers = {};
// Add correlator
var domain = process.domain;
if (domain && domain.context) {
options.headers[constants.CORRELATOR_HEADER] = domain.context.corr;
// Add other headers
if (domain.context.srv && options.headers[constants.SERVICE_HEADER] === undefined) {
options.headers[constants.SERVICE_HEADER] = domain.context.srv;
}
if (domain.context.subsrv && options.headers[constants.SUBSERVICE_HEADER] === undefined) {
options.headers[constants.SUBSERVICE_HEADER] = domain.context.subsrv;
}
if (domain.context.from && options.headers[constants.REALIP_HEADER] === undefined) {
options.headers[constants.REALIP_HEADER] = domain.context.from;
}
}
return new ngsi.Connection(config.orion.URL, options);
}

function findSilentEntitiesByAPIWithPagination(connection, filter, alterFunc, callback) {
// https://ficodes.github.io/ngsijs/stable/NGSI.Connection.html#.%22v2.listEntities%22__anchor
connection.v2.listEntities(filter).then(
(response) => {
// Entities retrieved successfully
// response.correlator transaction id associated with the server response
// response.limit contains the used page size
// response.results is an array with the retrieved entities
// response.offset contains the offset used in the request
if (response.results) {
response.results.forEach((entity) => {
logger.debug(context, 'silent entity %j', entity);
alterFunc(entity);
});
}
logger.debug(context, 'findSilentEntities %s', myutils.firstChars(response.results));
// Check if there are more entities to retrieve
if (response.count > filter.limit + filter.offset) {
// Call the function again with updated offset
filter.offset += filter.limit;
findSilentEntitiesByAPIWithPagination(connection, filter, alterFunc, callback);
} else {
// All entities have been retrieved and processed, call the callback
callback(null, response.results);
}
},
(error) => {
logger.warn('error v2.listEntities: %j trying list entities using filter %j', error, filter);
callback(error, null);
}
);
}

function findSilentEntitiesByAPI(service, subservice, ruleData, alterFunc, callback) {
var limit = 20;
var offset = 0;
var connection = createConnection(service, subservice);
var filter = createFilter(ruleData, service, subservice, limit, offset);

logger.info(
'find silent entities by API ngsi using options %j and filter %j and rule %j',
connection,
filter,
ruleData
);

// Call the pagination function
findSilentEntitiesByAPIWithPagination(connection, filter, alterFunc, callback);
}

function findSilentEntitiesByMongo(service, subservice, ruleData, alterFunc, callback) {
var db,
context = { op: 'checkNoSignal', comp: constants.COMPONENT_NAME },
criterion = {};

db = orionServiceDb(service);
Expand All @@ -54,30 +146,34 @@ function findSilentEntities(service, subservice, ruleData, func, callback) {
}
} else {
//Added default else clause
logger.debug('findSilentEntities() - Default else clause');
logger.debug(context, 'findSilentEntities() - Default else clause');
}
if (ruleData.type) {
criterion['_id.type'] = ruleData.type;
}
logger.debug(context, 'findSilentEntities criterion %j', criterion);

// Variable to store the count of entities
var entityCount = 0;

async.waterfall(
[
db.collection.bind(db, entitiesCollectionName, { strict: true }),
function(col, cb) {
var count = 0;
col.find(criterion)
.batchSize(config.orionDb.batchSize)
.each(function(err, one) {
if (err) {
return cb(err, null);
}
if (one === null) {
//cursor exhausted
return cb(err, 'silent ones count ' + count);
// Cursor exhausted
return cb(err, 'silent ones count ' + entityCount);
}
logger.debug(context, 'silent entity %j', one._id);
func(one);
count++;
alterFunc(one);
// Increment the count of entities
entityCount++;
});
}
],
Expand All @@ -88,6 +184,32 @@ function findSilentEntities(service, subservice, ruleData, func, callback) {
);
}

module.exports = {
FindSilentEntities: findSilentEntities
};
function findSilentEntities(service, subservice, ruleData, alterFunc, callback) {
// Start the timer
var hrstart = process.hrtime();
var method = !config.nonSignalByAPI ? 'findSilentEntitiesByMongo' : 'findSilentEntitiesByAPI';

// Function to be called when silent entities search is completed
var timedCallback = function(err, result) {
// Stop the timer
var hrend = process.hrtime(hrstart);

// Log the execution time in seconds and milliseconds
logger.info('%s has found %d entities in (hr): %d ms', method, result.length, hrend[1] / 1000000);

// Call the original callback
callback(err, result);
};
if (!config.nonSignalByAPI) {
return findSilentEntitiesByMongo(service, subservice, ruleData, alterFunc, timedCallback);
} else {
return findSilentEntitiesByAPI(service, subservice, ruleData, alterFunc, timedCallback);
}
}

module.exports.FindSilentEntities = findSilentEntities;
module.exports.findSilentEntitiesByAPI = findSilentEntitiesByAPI;
module.exports.findSilentEntitiesByMongo = findSilentEntitiesByMongo;
module.exports.findSilentEntitiesByAPIWithPagination = findSilentEntitiesByAPIWithPagination;
module.exports.createConnection = createConnection;
module.exports.createFilter = createFilter;
62 changes: 46 additions & 16 deletions lib/models/noSignal.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,36 +63,63 @@ function alertFunc(nsLineRule, entity) {
d.exit();
});
d.run(function() {
logger.debug(context, 'alertfunc nsLineRule %j entity %j ', nsLineRule, entity);
// We duplicate info in event and event.ev for VR and non-VR action parameters
var event = {
service: nsLineRule[SERVICE],
subservice: nsLineRule[SUBSERVICE],
ruleName: nsLineRule[NAME],
reportInterval: nsLineRule[REPORT_INTERVAL],
id: entity._id.id,
type: entity._id.type,
internalCurrentTime: new Date().toISOString()
};

// Search for modDate of the entity's attribute
// and copy every attribute (if not in event yet)
// for use in action template
Object.keys(entity.attrs).forEach(function(attrName) {
if (attrName === nsLineRule[ATTRIBUTE]) {
if (!config.nonSignalByAPI) {
// entity is really a entity doc obtained from mongo
event.id = entity._id.id;
event.type = entity._id.type;
logger.debug(context, 'alertfunc event %j ', event);
// Search for modDate of the entity's attribute
// and copy every attribute (if not in event yet)
// for use in action template
Object.keys(entity.attrs).forEach(function(attrName) {
if (attrName === nsLineRule[ATTRIBUTE]) {
try {
lastTime = new Date(entity.attrs[attrName].modDate * 1000).toISOString();
} catch (ex) {
myutils.logErrorIf(ex, 'run ', d.context);
}
}
if (event[attrName] === undefined) {
if (entity.attrs[attrName].type === 'DateTime') {
event[attrName] = new Date(entity.attrs[attrName].value * 1000).toISOString();
} else {
event[attrName] = entity.attrs[attrName].value;
}
}
});
} else {
// entity is and NGSI object
event.id = entity.id;
event.type = entity.type;
logger.debug(context, 'alertfunc event %j ', event);
// Search for modDate of the entity's attribute
// and copy every attribute (if not in event yet)
// for use in action template
const attrName = nsLineRule[ATTRIBUTE];
if (entity[attrName]) {
try {
lastTime = new Date(entity.attrs[attrName].modDate * 1000).toISOString();
lastTime = entity[attrName].metadata.TimeInstant.value;
} catch (ex) {
myutils.logErrorIf(ex, 'run ', d.context);
}
}
if (event[attrName] === undefined) {
if (entity.attrs[attrName].type === 'DateTime') {
event[attrName] = new Date(entity.attrs[attrName].value * 1000).toISOString();
} else {
event[attrName] = entity.attrs[attrName].value;
if (event[attrName] === undefined) {
if (entity[attrName].type === 'DateTime') {
event[attrName] = entity[attrName].metadata.TimeInstant.value;
} else {
event[attrName] = entity[attrName].value;
}
}
}
});
}

logger.debug(context, 'lastTime could be ', lastTime);
if (lastTime !== undefined && lastTime !== null) {
Expand All @@ -119,6 +146,7 @@ function checkNoSignal(period) {
currentContext.srv = 'n/a';
currentContext.subsrv = 'n/a';
logger.debug(currentContext, 'Executing no-signal handler for period of %d (%d rules)', period, list.length);

list.forEach(function(nsrule) {
currentContext.srv = nsrule[SERVICE];
currentContext.subsrv = nsrule[SUBSERVICE];
Expand Down Expand Up @@ -192,6 +220,7 @@ function addNSRule(service, subservice, name, nsr) {
);
intervalAsNum = MIN_INTERVAL_MS;
}

arrayRule = nsr2arr(service, subservice, name, nsr);
nsRulesByInterval[nsr.checkInterval] = nsRulesByInterval[nsr.checkInterval] || [];
nsRulesByInterval[nsr.checkInterval].forEach(function(element, index, array) {
Expand All @@ -206,6 +235,7 @@ function addNSRule(service, subservice, name, nsr) {
logger.debug(context, util.format('Adding no-signal rule (%s, %s, %s)', service, subservice, name));
}
if (!checkers.hasOwnProperty(nsr.checkInterval)) {
logger.info(context, util.format('no-signal rule (%s, %s, %s)', service, subservice, name));
checkers[nsr.checkInterval] = setInterval(checkNoSignal, intervalAsNum, nsr.checkInterval);
checkers[nsr.checkInterval].unref();
}
Expand Down
2 changes: 1 addition & 1 deletion test/component/nsr_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('Entity', function() {
}
],
checkInterval = 1,
rule = utilsT.loadExample('./test/data/no_signal/generic_nonsignal.json');
rule = utilsT.loadExample('./test/data/no_signal/visualrule_nonsignal.json');

// This is not correct, it does not affect actions module
// utilsT.getConfig().sms.URL = util.format('http://localhost:%s', utilsT.fakeHttpServerPort);
Expand Down
31 changes: 31 additions & 0 deletions test/data/no_signal/update_nonsignal.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"_id": "6474761fa509a6104ad7e415",
"name": "update_nonsignal",
"description": "testUpdate2",
"misc": "",
"text": "",
"VR": "",
"action": {
"type": "update",
"parameters": {
"id": "alarma:${id}",
"type": "Alarm",
"attributes": [
{
"name": "msg",
"value": "El status de ${id} es ${status}"
}
]
}
},
"nosignal": {
"checkInterval": "1",
"attribute": "temperature",
"reportInterval": "5",
"id": "thing:disp1",
"idRegexp": null,
"type": "thing"
},
"subservice": "/test",
"service": "smartcity"
}
File renamed without changes.

0 comments on commit 2e0ef01

Please sign in to comment.