Skip to content

Commit

Permalink
add monitoredItem triggering - part 3
Browse files Browse the repository at this point in the history
  • Loading branch information
erossignon committed Mar 2, 2021
1 parent a01590b commit a5855dd
Show file tree
Hide file tree
Showing 5 changed files with 365 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ export class ClientSessionImpl extends EventEmitter implements ClientSession {
* @param value {Variant} - the value to write
* @return {Promise<StatusCode>} - the status code of the write
*
* @deprecated
*/
public writeSingleNode(nodeId: NodeIdLike, value: VariantLike, callback: ResponseCallback<StatusCode>): void;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
ClientSessionRawSubscriptionService,
ClientSidePublishEngine,
ClientSubscription,
coerceNodeId,
DataChangeFilter,
DataChangeNotification,
DataChangeTrigger,
Expand Down Expand Up @@ -398,5 +399,204 @@ export function t(test: any) {
monitoredItems[1].clientHandle.should.eql(l2.monitoringParameters.clientHandle);
}
});
it("SetTriggering-4: Deadband testing of Linked items.", async () => {
// note: based on 020.js in CTT ( set)
const { session, subscription, publishEngine } = s;

const raw_notification_spy = sinon.spy();
subscription.on("raw_notification", raw_notification_spy);

const namespaceArray = await session.readNamespaceArray();
const simulationNamespaceIndex = namespaceArray.indexOf("urn://node-opcua-simulator");
console.log("simulationNamespaceIndex = ", simulationNamespaceIndex);

const triggerNodeId = coerceNodeId(`ns=${simulationNamespaceIndex};s=Static_Scalar_UInt16`);
const linkedNodeId1 = coerceNodeId(`ns=${simulationNamespaceIndex};s=Static_Scalar_Byte`);
const linkedNodeId2 = coerceNodeId(`ns=${simulationNamespaceIndex};s=Static_Scalar_Float`);

// create monitored items
let m1: ClientMonitoredItem;
let l1: ClientMonitoredItem;
let l2: ClientMonitoredItem;

{
m1 = await subscription.monitor(
{
nodeId: triggerNodeId,
attributeId: 13
},
{
discardOldest: true,
queueSize: 1,
samplingInterval: 0,
filter: null
},
TimestampsToReturn.Both,
MonitoringMode.Reporting
);
l1 = await subscription.monitor(
{
nodeId: linkedNodeId1,
attributeId: 13
},
{
discardOldest: true,
queueSize: 1,
samplingInterval: 0,
filter: new DataChangeFilter({
deadbandType: DeadbandType.Absolute,
deadbandValue: 5,
trigger: DataChangeTrigger.StatusValue,
})
},
TimestampsToReturn.Both,
MonitoringMode.Sampling
);
l2 = await subscription.monitor(
{
nodeId: linkedNodeId2,
attributeId: 13
},
{
discardOldest: true,
queueSize: 1,
samplingInterval: 0,
filter: new DataChangeFilter({
deadbandType: DeadbandType.Absolute,
deadbandValue: 0.5,
trigger: DataChangeTrigger.StatusValue
})
},
TimestampsToReturn.Both,
MonitoringMode.Sampling
);
}

let m1Value = 100;
let l1Value = 100;
let l2Value = 100;
// write initial values
{
const statusCodes = await session.write([
{
nodeId: triggerNodeId,
attributeId: AttributeIds.Value,
value: { value: { dataType: DataType.UInt16, value: m1Value } }
},
{
nodeId: linkedNodeId1,
attributeId: AttributeIds.Value,
value: { value: { dataType: DataType.Byte, value: l1Value } }
},
{
nodeId: linkedNodeId2,
attributeId: AttributeIds.Value,
value: { value: { dataType: DataType.Float, value: l2Value } }
}
]);
statusCodes.should.eql([StatusCodes.Good, StatusCodes.Good, StatusCodes.Good]);
}

// setLinks
const result = await subscription.setTriggering(m1, [l1, l2], []);
result.addResults!.should.eql([StatusCodes.Good, StatusCodes.Good]);

await waitUntilKeepAlive(publishEngine, subscription);
raw_notification_spy.resetHistory();

m1Value += 1;
{
const statusCodes = await session.write([
{
nodeId: triggerNodeId,
attributeId: AttributeIds.Value,
value: { value: { dataType: DataType.UInt16, value: m1Value } }
}
]);
}

await waitUntilKeepAlive(publishEngine, subscription);
raw_notification_spy.callCount.should.eql(2, "must have received a changed notification and one empty notif");
{
const notification = raw_notification_spy.getCall(0).args[0] as NotificationMessage;
// tslint:disable-next-line: no-unused-expression
doDebug && console.log(notification.toString());

const monitoredItems = (notification.notificationData![0] as DataChangeNotification).monitoredItems!;

monitoredItems.length.should.eql(3);

monitoredItems[0].clientHandle.should.eql(m1.monitoringParameters.clientHandle);
monitoredItems[1].clientHandle.should.eql(l1.monitoringParameters.clientHandle);
monitoredItems[2].clientHandle.should.eql(l2.monitoringParameters.clientHandle);
}

const deadbandValuesInt = [0, 6, 7, 6, 20];
const deadbandValuesFloat = [0.0, 0.6, 0.5, 0.6, 1.5];
const successes = [true, true, false, false, true];

for (let i = 0; i < deadbandValuesInt.length; i++) {
// console.log("############################################### =>", i);
raw_notification_spy.resetHistory();

m1Value += 1;
l1Value = deadbandValuesInt[i];
l2Value = deadbandValuesFloat[i];
await session.write([
{
nodeId: triggerNodeId,
attributeId: AttributeIds.Value,
value: { value: { dataType: DataType.UInt16, value: m1Value } }
},
{
nodeId: linkedNodeId1,
attributeId: AttributeIds.Value,
value: { value: { dataType: DataType.Byte, value: l1Value } }
},
{
nodeId: linkedNodeId2,
attributeId: AttributeIds.Value,
value: { value: { dataType: DataType.Float, value: l2Value } }
}
]);
const dataValuesVerif = await session.read([
{
nodeId: triggerNodeId,
attributeId: AttributeIds.Value
},
{
nodeId: linkedNodeId1,
attributeId: AttributeIds.Value
},
{
nodeId: linkedNodeId2,
attributeId: AttributeIds.Value
}
]);
dataValuesVerif[0].value.value.should.eql(m1Value);
dataValuesVerif[1].value.value.should.eql(l1Value);
Math.abs(dataValuesVerif[2].value.value - l2Value).should.be.lessThan(1E-6);

await pause(10);
await waitUntilKeepAlive(publishEngine, subscription);

const notification = raw_notification_spy.getCall(0).args[0] as NotificationMessage;
// tslint:disable-next-line: no-unused-expression
doDebug && console.log(notification.toString());

const monitoredItems = (notification.notificationData![0] as DataChangeNotification).monitoredItems!;

if (successes[i]) {
monitoredItems.length.should.eql(3);

monitoredItems[0].clientHandle.should.eql(m1.monitoringParameters.clientHandle);
monitoredItems[1].clientHandle.should.eql(l1.monitoringParameters.clientHandle);
monitoredItems[2].clientHandle.should.eql(l2.monitoringParameters.clientHandle);
} else {
monitoredItems.length.should.eql(1);
monitoredItems[0].clientHandle.should.eql(m1.monitoringParameters.clientHandle);
}
}
});
});
}
73 changes: 56 additions & 17 deletions packages/node-opcua-server/source/monitored_item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,26 +192,54 @@ function apply_dataChange_filter(this: MonitoredItem, newDataValue: DataValue, o
}

const trigger = this.filter.trigger;

// istanbul ignore next
if (doDebug) {
try {
debugLog("filter pass ?", DataChangeTrigger[trigger] ,this.oldDataValue?.toString(), newDataValue.toString());
if (
trigger === DataChangeTrigger.Status ||
trigger === DataChangeTrigger.StatusValue ||
trigger === DataChangeTrigger.StatusValueTimestamp
) {
debugLog("statusCodeHasChanged ", statusCodeHasChanged(newDataValue, oldDataValue));
}
if (trigger === DataChangeTrigger.StatusValue || trigger === DataChangeTrigger.StatusValueTimestamp) {
debugLog(
"valueHasChanged ",
valueHasChanged.call(this, newDataValue, oldDataValue, this.filter!.deadbandType, this.filter!.deadbandValue)
);
}
if (trigger === DataChangeTrigger.StatusValueTimestamp) {
debugLog("timestampHasChanged ", timestampHasChanged(newDataValue.sourceTimestamp, oldDataValue.sourceTimestamp));
}
} catch(err) {
console.log(err);
}
}
switch (trigger) {
case DataChangeTrigger.Status: // Status
case DataChangeTrigger.Status: {
//
// Status
// Report a notification ONLY if the StatusCode associated with
// the value changes. See Table 166 for StatusCodes defined in
// this standard. Part 8 specifies additional StatusCodes that are
// valid in particular for device data.
return statusCodeHasChanged(newDataValue, oldDataValue);

case DataChangeTrigger.StatusValue: // StatusValue
// Report a notification if either the StatusCode or the value
// change. The Deadband filter can be used in addition for
}
case DataChangeTrigger.StatusValue:
{
// filtering value changes.
// change. The Deadband filter can be used in addition for
// Report a notification if either the StatusCode or the value
// StatusValue
// This is the default setting if no filter is set.
return (
statusCodeHasChanged(newDataValue, oldDataValue) ||
valueHasChanged.call(this, newDataValue, oldDataValue, this.filter.deadbandType, this.filter.deadbandValue)
);

}
default:
{
// StatusValueTimestamp
// Report a notification if either StatusCode, value or the
// SourceTimestamp change.
Expand All @@ -226,6 +254,7 @@ function apply_dataChange_filter(this: MonitoredItem, newDataValue: DataValue, o
statusCodeHasChanged(newDataValue, oldDataValue) ||
valueHasChanged.call(this, newDataValue, oldDataValue, this.filter.deadbandType, this.filter.deadbandValue)
);
}
}
return false;
}
Expand Down Expand Up @@ -591,7 +620,6 @@ export class MonitoredItem extends EventEmitter {
}

if (!apply_filter.call(this, dataValue)) {
debugLog("filter did not pass");
return;
}

Expand Down Expand Up @@ -623,22 +651,27 @@ export class MonitoredItem extends EventEmitter {
}
return this._linkedItems.findIndex((x) => x === linkedMonitoredItemId) > 0;
}
public addLinkItem(linkedMonitoredItemId: number) {
public addLinkItem(linkedMonitoredItemId: number): StatusCode {
if (linkedMonitoredItemId === this.monitoredItemId) {
return StatusCodes.BadMonitoredItemIdInvalid;
}
this._linkedItems = this._linkedItems || [];
if (this.hasLinkItem(linkedMonitoredItemId)) {
return; // nothing to do
return StatusCodes.BadMonitoredItemIdInvalid; // nothing to do
}
this._linkedItems.push(linkedMonitoredItemId);
return StatusCodes.Good;
}
public removeLinkItem(linkedMonitoredItemId: number): void {
if (!this._linkedItems) {
return;
public removeLinkItem(linkedMonitoredItemId: number): StatusCode {
if (!this._linkedItems || linkedMonitoredItemId === this.monitoredItemId) {
return StatusCodes.BadMonitoredItemIdInvalid;
}
const index = this._linkedItems.findIndex((x) => x === linkedMonitoredItemId);
if (index === -1) {
return;
return StatusCodes.BadMonitoredItemIdInvalid;
}
this._linkedItems.splice(index, 1);
return StatusCodes.Good;
}
/**
* @internals
Expand All @@ -662,6 +695,10 @@ export class MonitoredItem extends EventEmitter {
}
assert(linkedMonitoredItem.monitoringMode === MonitoringMode.Sampling);

// istanbul ignore next
if (doDebug) {
debugLog("triggerLinkedItems => ", this.node?.nodeId.toString(), linkedMonitoredItem.node?.nodeId.toString());
}
linkedMonitoredItem.trigger();
}
}
Expand All @@ -674,9 +711,11 @@ export class MonitoredItem extends EventEmitter {
* @internals
*/
private trigger() {
this._triggeredNotifications = this._triggeredNotifications || [];
const notifications = this.extractMonitoredItemNotifications(true);
this._triggeredNotifications = ([] as QueueItem[]).concat(this._triggeredNotifications!, notifications);
setImmediate(() => {
this._triggeredNotifications = this._triggeredNotifications || [];
const notifications = this.extractMonitoredItemNotifications(true);
this._triggeredNotifications = ([] as QueueItem[]).concat(this._triggeredNotifications!, notifications);
});
}

public extractMonitoredItemNotifications(bForce: boolean = false): QueueItem[] {
Expand Down
26 changes: 15 additions & 11 deletions packages/node-opcua-server/source/server_subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -692,24 +692,28 @@ export class Subscription extends EventEmitter {
const monitoredItemsToAdd = linksToAdd.map((id) => this.getMonitoredItem(id));
const monitoredItemsToRemove = linksToRemove.map((id) => this.getMonitoredItem(id));

const addResults: StatusCode[] = monitoredItemsToAdd.map((m) =>
m ? StatusCodes.Good : StatusCodes.BadMonitoredItemIdInvalid
);
const removeResults: StatusCode[] = monitoredItemsToRemove.map((m) =>
m ? StatusCodes.Good : StatusCodes.BadMonitoredItemIdInvalid
);

if (!triggeringItem) {
const removeResults1: StatusCode[] = monitoredItemsToRemove.map((m) =>
m ? StatusCodes.Good : StatusCodes.BadMonitoredItemIdInvalid
);
const addResults1: StatusCode[] = monitoredItemsToAdd.map((m) =>
m ? StatusCodes.Good : StatusCodes.BadMonitoredItemIdInvalid
);
return {
statusCode: StatusCodes.BadMonitoredItemIdInvalid,

addResults,
removeResults
addResults: addResults1,
removeResults: removeResults1
};
}
//
monitoredItemsToAdd.forEach((m) => !m || triggeringItem.addLinkItem(m.monitoredItemId));
monitoredItemsToRemove.forEach((m) => !m || triggeringItem.removeLinkItem(m.monitoredItemId));
// note: it seems that CTT imposed that we do remove before add
const removeResults = monitoredItemsToRemove.map((m) =>
!m ? StatusCodes.BadMonitoredItemIdInvalid : triggeringItem.removeLinkItem(m.monitoredItemId)
);
const addResults = monitoredItemsToAdd.map((m) =>
!m ? StatusCodes.BadMonitoredItemIdInvalid : triggeringItem.addLinkItem(m.monitoredItemId)
);

const statusCode: StatusCode = StatusCodes.Good;

Expand Down

0 comments on commit a5855dd

Please sign in to comment.