Skip to content

Commit

Permalink
add monitoredItem triggering - part1
Browse files Browse the repository at this point in the history
  • Loading branch information
erossignon committed Mar 2, 2021
1 parent 4d3b894 commit 34a09bd
Show file tree
Hide file tree
Showing 10 changed files with 622 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ describe("Extension Object binding and sub components On MachineVision", () =>
throw new Error("Cannot find ResultType");
}
});
it("MachineVision-BindExtensionObject should instantitate a ResultType", () => {
it("MachineVision-BindExtensionObject should instantiate a ResultType", () => {
const result = resultType.instantiate({
browseName: `Result`,
organizedBy: addressSpace.rootFolder.objects
Expand All @@ -570,7 +570,7 @@ describe("Extension Object binding and sub components On MachineVision", () =>
console.log(extObj?.toString());
}
});
it("MachineVision-BindExtensionObject should instantitate a ResultType", () => {
it("MachineVision-BindExtensionObject should instantiate a ResultType", () => {
const partIdDataType = addressSpace.findDataType("PartIdDataType", nsMV)!;
const partId = addressSpace.constructExtensionObject(partIdDataType, {
description: "World",
Expand Down
3 changes: 2 additions & 1 deletion packages/node-opcua-client/test/test_verify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ describe("Should verify certificate", function (this: any) {
.map((x) => x.args[0])
.join(" ");

msg.should.match(/.*\[NODE-OPCUA-W14\] The certificate subjectAltName uniformResourceIdentifier is missing.*/);
msg.
should.match(/.*\[NODE-OPCUA-W14\] The certificate subjectAltName uniformResourceIdentifier is missing.*/);
});
});
78 changes: 73 additions & 5 deletions packages/node-opcua-server/source/monitored_item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ type TimerKey = NodeJS.Timer;
export interface ISubscription {
$session?: any;
subscriptionDiagnostics: SubscriptionDiagnosticsDataType;
getMonitoredItem(monitoredItemId: number): MonitoredItem | null;
}

function isSourceNewerThan(a: DataValue, b?: DataValue): boolean {
Expand Down Expand Up @@ -373,6 +374,8 @@ export class MonitoredItem extends EventEmitter {
private _value_changed_callback: any;
private _semantic_changed_callback: any;
private _on_node_disposed_listener: any;
private _linkedItems?: number[];
private _triggeredNotifications?: QueueItem[];

constructor(options: MonitoredItemOptions) {
super();
Expand Down Expand Up @@ -438,8 +441,7 @@ export class MonitoredItem extends EventEmitter {

// OPCUA 1.03 part 4 : $5.12.4
// setting the mode to DISABLED causes all queued Notifications to be deleted
this.queue = [];
this.overflow = false;
this._empty_queue();
} else {
assert(this.monitoringMode === MonitoringMode.Sampling || this.monitoringMode === MonitoringMode.Reporting);

Expand Down Expand Up @@ -608,16 +610,82 @@ export class MonitoredItem extends EventEmitter {
return;
}
}

// processTriggerItems
this.triggerLinkedItems();
// store last value
this._enqueue_value(dataValue);
}

public hasLinkItem(linkedMonitoredItemId: number): boolean {
if (!this._linkedItems) {
return false;
}
return this._linkedItems.findIndex((x) => x === linkedMonitoredItemId) > 0;
}
public addLinkItem(linkedMonitoredItemId: number) {
this._linkedItems = this._linkedItems || [];
if (this.hasLinkItem(linkedMonitoredItemId)) {
return; // nothing to do
}
this._linkedItems.push(linkedMonitoredItemId);
}
public removeLinkItem(linkedMonitoredItemId: number): void {
if (!this._linkedItems) {
return;
}
const index = this._linkedItems.findIndex((x) => x === linkedMonitoredItemId);
if (index === -1) {
return;
}
this._linkedItems.splice(index, 1);
}
/**
* @internals
*/
private triggerLinkedItems() {
if (!this.$subscription || !this._linkedItems) {
return;
}
// see https://reference.opcfoundation.org/v104/Core/docs/Part4/5.12.1/#5.12.1.6
for (const linkItem of this._linkedItems) {
const linkedMonitoredItem = this.$subscription.getMonitoredItem(linkItem);
if (!linkedMonitoredItem) {
// monitoredItem may have been deleted
continue;
}
if (linkedMonitoredItem.monitoringMode === MonitoringMode.Disabled) {
continue;
}
if (linkedMonitoredItem.monitoringMode === MonitoringMode.Reporting) {
continue;
}
assert(linkedMonitoredItem.monitoringMode === MonitoringMode.Sampling);

linkedMonitoredItem.trigger();
}
}

get hasMonitoredItemNotifications(): boolean {
return this.queue.length > 0;
return this.queue.length > 0 || (this._triggeredNotifications !== undefined && this._triggeredNotifications.length > 0);
}

/**
* @internals
*/
private trigger() {
this._triggeredNotifications = this._triggeredNotifications || [];
const notifications = this.extractMonitoredItemNotifications(true);
this._triggeredNotifications = ([] as QueueItem[]).concat(this._triggeredNotifications!, notifications);
}

public extractMonitoredItemNotifications() {
if (this.monitoringMode !== MonitoringMode.Reporting) {
public extractMonitoredItemNotifications(bForce: boolean = false): QueueItem[] {
if (!bForce && this.monitoringMode === MonitoringMode.Sampling && this._triggeredNotifications) {
const notifications1 = this._triggeredNotifications;
this._triggeredNotifications = undefined;
return notifications1;
}
if (!bForce && this.monitoringMode !== MonitoringMode.Reporting) {
return [];
}
const notifications = this.queue;
Expand Down
12 changes: 6 additions & 6 deletions packages/node-opcua-server/source/server_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ export class ServerEngine extends EventEmitter {
/**
* @method closeSession
* @param authenticationToken
* @param deleteSubscriptions {Boolean} : true if sessions's subscription shall be deleted
* @param deleteSubscriptions {Boolean} : true if session's subscription shall be deleted
* @param {String} [reason = "CloseSession"] the reason for closing the session (
* shall be "Timeout", "Terminated" or "CloseSession")
*
Expand Down Expand Up @@ -1793,7 +1793,7 @@ export class ServerEngine extends EventEmitter {
const referenceTime = new Date(Date.now() - maxAge);

assert(callback instanceof Function);
const objs: any = {};
const objectMap: any = {};
for (const nodeToRefresh of nodesToRefresh) {
// only consider node for which the caller wants to read the Value attribute
// assuming that Value is requested if attributeId is missing,
Expand All @@ -1810,19 +1810,19 @@ export class ServerEngine extends EventEmitter {
continue;
}
const key = obj.nodeId.toString();
if (objs[key]) {
if (objectMap[key]) {
continue;
}

objs[key] = obj;
objectMap[key] = obj;
}
if (Object.keys(objs).length === 0) {
if (Object.keys(objectMap).length === 0) {
// nothing to do
return callback(null, []);
}
// perform all asyncRefresh in parallel
async.map(
objs,
objectMap,
(obj: BaseNode, inner_callback: DataValueCallback) => {
if (obj.nodeClass !== NodeClass.Variable) {
inner_callback(
Expand Down
20 changes: 11 additions & 9 deletions packages/node-opcua-server/source/server_publish_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ export class ServerSidePublishEngine extends EventEmitter implements IServerSide
assert(!destPublishEngine.getSubscriptionById(subscription.id));
assert(srcPublishEngine.getSubscriptionById(subscription.id));

// remove pending StatusChangeNotificiation on the same session that may exist already
// remove pending StatusChangeNotification on the same session that may exist already
destPublishEngine._purge_dangling_subscription(subscription.id);

debugLog(chalk.cyan("ServerSidePublishEngine.transferSubscription live subscriptionId ="), subscription.subscriptionId);

//xx const internalNotification = subscription._flushSentNotifications();
// xx const internalNotification = subscription._flushSentNotifications();
debugLog(chalk.cyan("ServerSidePublishEngine.transferSubscription with = "), subscription.getAvailableSequenceNumbers());

// If the Server transfers the Subscription to the new Session, the Server shall issue a
Expand Down Expand Up @@ -213,12 +213,12 @@ export class ServerSidePublishEngine extends EventEmitter implements IServerSide
const subscription = this.getSubscriptionById(subscriptionAcknowledgement.subscriptionId);
if (!subscription) {
// // try to find the session
// const transferedSubscription = this._transfered_subscriptions.find(
// const transferredSubscription = this._transferred_subscriptions.find(
// (s) => s.subscriptionId === subscriptionAcknowledgement.subscriptionId
// );
// if (transferedSubscription) {
// debugLog("Subscription acknowledgeNotification done in tansfererd subscription ");
// return transferedSubscription.acknowledgeNotification(subscriptionAcknowledgement.sequenceNumber);
// if (transferredSubscription) {
// debugLog("Subscription acknowledgeNotification done in transferred subscription ");
// return transferredSubscription.acknowledgeNotification(subscriptionAcknowledgement.sequenceNumber);
// }
return StatusCodes.BadSubscriptionIdInvalid;
}
Expand Down Expand Up @@ -246,7 +246,7 @@ export class ServerSidePublishEngine extends EventEmitter implements IServerSide

debugLog("ServerSidePublishEngine#add_subscription - adding subscription with Id:", subscription.id);
this._subscriptions[subscription.id] = subscription;
// xxsubscription._flushSentNotifications();
// xx subscription._flushSentNotifications();
return subscription;
}

Expand Down Expand Up @@ -387,6 +387,7 @@ export class ServerSidePublishEngine extends EventEmitter implements IServerSide
* @param request
* @param callback
* @private
* @internal
*/
public _on_PublishRequest(request: PublishRequest, callback?: any) {
callback = callback || dummy_function;
Expand Down Expand Up @@ -477,6 +478,7 @@ export class ServerSidePublishEngine extends EventEmitter implements IServerSide
}

if (this._closed_subscriptions) {
/** */
}
const starving_subscription = /* this.findSubscriptionWaitingForFirstPublish() || */ findLateSubscriptionSortedByPriority();
return starving_subscription;
Expand Down Expand Up @@ -575,7 +577,7 @@ export class ServerSidePublishEngine extends EventEmitter implements IServerSide
traceLog("send_keep_alive_response => invalid subscriptionId = ", subscriptionId);
return false;
}
// let check if we have avalabile PublishRequest to send the keep alive
// let check if we have available PublishRequest to send the keep alive
if (this.pendingPublishRequestCount === 0 || subscription.hasPendingNotifications) {
// we cannot send the keep alive PublishResponse
return false;
Expand Down Expand Up @@ -621,7 +623,7 @@ export class ServerSidePublishEngine extends EventEmitter implements IServerSide
this._publish_request_queue = parts[1]; // still valid

const invalid_published_request = parts[0];
for (let publishData of invalid_published_request) {
for (const publishData of invalid_published_request) {
console.log(chalk.cyan(" CANCELING TIMEOUT PUBLISH REQUEST "));
this._send_error_for_request(publishData, StatusCodes.BadTimeout);
}
Expand Down
45 changes: 43 additions & 2 deletions packages/node-opcua-server/source/server_subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,47 @@ export class Subscription extends EventEmitter {
}
}

public setTriggering(triggeringItemId: number, linksToAdd: number[], linksToRemove: number[]) {
/** Bad_NothingToDo, Bad_TooManyOperations,Bad_SubscriptionIdInvalid, Bad_MonitoredItemIdInvalid */

if (linksToAdd.length === 0 && linksToRemove.length === 0) {
return { statusCode: StatusCodes.BadNothingToDo, addResults: [], removeResults: [] };
}
const triggeringItem = this.getMonitoredItem(triggeringItemId);

const monitoredItemsToAdd = linksToAdd.map((id) => this.getMonitoredItem(id));
const monitoredItemsToRemove = linksToRemove.map((id) => this.getMonitoredItem(id));

const addResults: StatusCodes[] = monitoredItemsToAdd.map((m) =>
m ? StatusCodes.Good : StatusCodes.BadMonitoredItemIdInvalid
);
const removeResults: StatusCodes[] = monitoredItemsToRemove.map((m) =>
m ? StatusCodes.Good : StatusCodes.BadMonitoredItemIdInvalid
);

if (!triggeringItem) {
return {
statusCode: StatusCodes.BadMonitoredItemIdInvalid,

addResults,
removeResults
};
}
//
monitoredItemsToAdd.forEach((m) => !m || triggeringItem.addLinkItem(m.monitoredItemId));
monitoredItemsToRemove.forEach((m) => !m || triggeringItem.removeLinkItem(m.monitoredItemId));

const statusCode: StatusCode = StatusCodes.Good;

// do binding

return {
statusCode,

addResults,
removeResults
};
}
public dispose() {
if (doDebug) {
debugLog("Subscription#dispose", this.id, this.monitoredItemCount);
Expand Down Expand Up @@ -1103,7 +1144,7 @@ export class Subscription extends EventEmitter {
];

if (this.publishEngine!.pendingPublishRequestCount) {
// the GoodSubscriptionTransferred can be prcessed immediatly
// the GoodSubscriptionTransferred can be processed immediately
this._addNotificationMessage(notificationData);
debugLog(chalk.red("pendingPublishRequestCount"), this.publishEngine?.pendingPublishRequestCount);
this._publish_pending_notifications();
Expand Down Expand Up @@ -1332,7 +1373,7 @@ export class Subscription extends EventEmitter {
debugLog("Subscription#_tick aborted=", this.aborted, "state=", this.state.toString());

if (this.aborted) {
// xx console.log(" Log aborteds")
// xx console.log(" Log aborted")
// xx // underlying channel has been aborted ...
// xx self.publishEngine.cancelPendingPublishRequestBeforeChannelChange();
// xx // let's still increase lifetime counter to detect timeout
Expand Down

0 comments on commit 34a09bd

Please sign in to comment.