Skip to content

Commit

Permalink
fix subscription reconstruction after reconnection
Browse files Browse the repository at this point in the history
 - fix a bug in client that causes subscriptions to fail to reconstruct
   if the number of monitored items to rebuild was greater than
   the operational limit maxMonitoredItemsPerCall
  • Loading branch information
erossignon committed May 16, 2021
1 parent b775d57 commit 8817c9b
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export class ClientSidePublishEngine {
}

public terminate() {
debugLog("Terminated ClientPublishEngine ")
this.session = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import { Callback, ErrorCallback } from "node-opcua-status-code";
import * as utils from "node-opcua-utils";
import { promoteOpaqueStructure } from "node-opcua-client-dynamic-extension-object";
import { DataType, Variant } from "node-opcua-variant";
import { IBasicSession } from "node-opcua-pseudo-session";
import { createMonitoredItemsLimit, IBasicSession, readOperationLimits } from "node-opcua-pseudo-session";

import { ClientMonitoredItemBase } from "../client_monitored_item_base";
import { ClientMonitoredItemGroup } from "../client_monitored_item_group";
Expand All @@ -57,6 +57,7 @@ import { ClientSidePublishEngine } from "./client_publish_engine";
import { ClientSessionImpl } from "./client_session_impl";
import { ClientMonitoredItem } from "../client_monitored_item";
import { ClientMonitoredItemToolbox } from "../client_monitored_item_toolbox";
import { IBasicSessionWithSubscription } from "node-opcua-pseudo-session";

const debugLog = make_debugLog(__filename);
const doDebug = checkDebugFlag(__filename);
Expand Down Expand Up @@ -155,6 +156,18 @@ function displayKeepAliveWarning(sessionTimeout: number, maxKeepAliveCount: numb
return false;
}

function createMonitoredItemsAndRespectOperationalLimits(
session: IBasicSession & IBasicSessionWithSubscription,
createMonitorItemsRequest: CreateMonitoredItemsRequest,
callback: (err: Error | null, response?: CreateMonitoredItemsResponse) => void) {
readOperationLimits(session).then((operationalLimits) => {
createMonitoredItemsLimit(operationalLimits.maxMonitoredItemsPerCall || 0, session, createMonitorItemsRequest)
.then((createMonitoredItemResponse) => callback(null, createMonitoredItemResponse))
.catch(callback);
}
).catch(callback);
}

export class ClientSubscriptionImpl extends EventEmitter implements ClientSubscription {
/**
* the associated session
Expand Down Expand Up @@ -511,8 +524,8 @@ export class ClientSubscriptionImpl extends EventEmitter implements ClientSubscr
modifySubscriptionRequest.requestedMaxKeepAliveCount = modifySubscriptionRequest.requestedMaxKeepAliveCount === undefined ? this.maxKeepAliveCount : modifySubscriptionRequest.requestedMaxKeepAliveCount;
modifySubscriptionRequest.requestedPublishingInterval = modifySubscriptionRequest.requestedPublishingInterval === undefined ? this.publishingInterval : modifySubscriptionRequest.requestedPublishingInterval;
modifySubscriptionRequest.maxNotificationsPerPublish = modifySubscriptionRequest.maxNotificationsPerPublish === undefined ? this.maxNotificationsPerPublish : modifySubscriptionRequest.maxNotificationsPerPublish;


session.modifySubscription(modifySubscriptionRequest, (err: Error | null, response?: ModifySubscriptionResponse) => {
if (err || !response) {
return callback(err);
Expand Down Expand Up @@ -554,6 +567,7 @@ export class ClientSubscriptionImpl extends EventEmitter implements ClientSubscr
(innerCallback: ErrorCallback) => {
const test = this.publishEngine.getSubscription(this.subscriptionId);

debugLog("recreating ", Object.keys(oldMonitoredItems).length, " monitored Items");
// re-create monitored items
const itemsToCreate: MonitoredItemCreateRequestOptions[] = [];

Expand All @@ -573,14 +587,14 @@ export class ClientSubscriptionImpl extends EventEmitter implements ClientSubscr
});

const session = this.session;
// istanbul ignore next
if (!session) {
return innerCallback(new Error("no session"));
}

debugLog("Recreating ", itemsToCreate.length, " monitored items");

session.createMonitoredItems(
createMonitorItemsRequest,
createMonitoredItemsAndRespectOperationalLimits(session, createMonitorItemsRequest,
(err: Error | null, response?: CreateMonitoredItemsResponse) => {
if (err) {
debugLog("Recreating monitored item has failed with ", err.message);
Expand Down Expand Up @@ -616,6 +630,9 @@ export class ClientSubscriptionImpl extends EventEmitter implements ClientSubscr
}
],
(err) => {
if (err) {
warningLog(err.message);
}
callback(err!);
}
);
Expand Down Expand Up @@ -701,6 +718,7 @@ export class ClientSubscriptionImpl extends EventEmitter implements ClientSubscr
private __create_subscription(callback: ErrorCallback) {
assert(typeof callback === "function");

// istanbul ignore next
if (!this.hasSession) {
return callback(new Error("No Session"));
}
Expand All @@ -726,9 +744,12 @@ export class ClientSubscriptionImpl extends EventEmitter implements ClientSubscr
}
return;
}

/* istanbul ignore next */
if (!response) {
return callback(new Error("internal error"));
}

if (!this.hasSession) {
return callback(new Error("createSubscription has failed = > no session"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { AttributeIds, CreateMonitoredItemsResponse, OPCUAClient, OPCUAServer, resolveNodeId, StatusCodes, TimestampsToReturn } from "node-opcua";
import "should";

async function pause(ms: number) {
return await new Promise((resolve) => setTimeout(resolve, ms));
}
const describe = require("node-opcua-leak-detector").describeWithLeakDetector;
describe("[CLIENT] recreating large subscription during reconnection", () => {

let server: OPCUAServer;
async function startServer(maxMonitoredItemsPerCall: number) {
server = new OPCUAServer({
serverCapabilities: {
operationLimits: {
maxMonitoredItemsPerCall,
}
}
});

await server.start();
return server;
}
async function stopServer(server: OPCUAServer) {
await server.shutdown();
}

it("recreating large subscription during reconnection should not lead to BadTooManyOperation", async () => {

const maxMonitoredItemsPerCall = 2;
// Given a server with a small maxMonitoredItemsPerCall value
let server = await startServer(maxMonitoredItemsPerCall);
const endpointUrl = server.getEndpointUrl();

// Given a client with a large number of monitoredItem
const client = OPCUAClient.create({
});
client.on("backoff", () => console.log("backoff"));
client.on("connection_failed", () => console.log("connection has failed"));
client.on("after_reconnection", () => console.log("after reconnection"));
client.on("connection_lost", () => console.log("connection lost"));
client.on("after_reconnection", () => console.log("after_reconnection"));

client.on("send_request", (request) => {
});

let createMonitoredItemsResponses: CreateMonitoredItemsResponse[] = [];
client.on("receive_response", (response) => {

if (
response.constructor.name === "CreateSubscriptionResponse" ||
response.constructor.name === "CreateMonitoredItemsResponse"
) {
// console.log(response.toString());
}
if (response.constructor.name === "CreateMonitoredItemsResponse") {
createMonitoredItemsResponses.push(response as CreateMonitoredItemsResponse);
}
});

await client.connect(endpointUrl);

const session = await client.createSession();
const subscription = await session.createSubscription2({
requestedLifetimeCount: 10,
requestedPublishingInterval: 100,
requestedMaxKeepAliveCount: 5
});

const nodeId = resolveNodeId("Server_ServerStatus_CurrentTime");

// Given that the client has more monitored Items than maxMonitoredItemsPerCall
for (let i = 0; i < maxMonitoredItemsPerCall + 1; i++) {
const m = await subscription.monitor({
nodeId, attributeId: AttributeIds.Value
}, { samplingInterval: 10 }, TimestampsToReturn.Both)
}


createMonitoredItemsResponses.length.should.eql(maxMonitoredItemsPerCall + 1);
createMonitoredItemsResponses = [];

// When the server stops and restarts
await stopServer(server);
await pause(1000);

let isSessionRestored = false;
session.on('session_restored', () => {
isSessionRestored = true
console.log("Session Restored !");
})

server = await startServer(maxMonitoredItemsPerCall);

// wait until reconnection is completedf
while (session.isReconnecting && !isSessionRestored) {
await pause(100);
}

await pause(100);
console.log("------------------------------------------------------------")

await session.close();
await client.disconnect();
await stopServer(server);

// verify
if (createMonitoredItemsResponses.length === 0) {
throw new Error("createMonitoredItemsResponse missing");
}
let n = 0;
for (let i = 0; i < createMonitoredItemsResponses.length; i++) {
createMonitoredItemsResponses[0].responseHeader.serviceResult.should.eql(StatusCodes.Good);

n += createMonitoredItemsResponses[i].results!.length;
for (const r of createMonitoredItemsResponses[i].results!) {
r.statusCode!.should.eql(StatusCodes.Good);
}
}
n.should.eql(maxMonitoredItemsPerCall + 1);



});

});
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import {
CreateSubscriptionRequestOptions,
CreateSubscriptionResponse,
CreateMonitoredItemsRequestOptions,
CreateMonitoredItemsResponse,
} from "node-opcua-service-subscription";
import {
ResponseCallback
} from "./basic_session_interface";

/**
* @module node-opcua-pseudo-session
*/
export interface IBasicSessionWithSubscription {

createSubscription(options: CreateSubscriptionRequestOptions, callback: ResponseCallback<CreateSubscriptionResponse>): void;
createSubscription(options: CreateSubscriptionRequestOptions): Promise<CreateSubscriptionResponse>;

// setMonitoringMode(options: SetMonitoringModeRequestLike, callback: ResponseCallback<SetMonitoringModeResponse>): void;
// setMonitoringMode(options: SetMonitoringModeRequestLike): Promise<SetMonitoringModeResponse>;

createMonitoredItems(options: CreateMonitoredItemsRequestOptions, callback: ResponseCallback<CreateMonitoredItemsResponse>): void;
createMonitoredItems(options: CreateMonitoredItemsRequestOptions): Promise<CreateMonitoredItemsResponse>;
}
3 changes: 3 additions & 0 deletions packages/node-opcua-pseudo-session/source/browse_all.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* @module node-opcua-pseudo-session
*/
import { BrowseResult } from "node-opcua-service-browse";
import { IBasicSession, BrowseDescriptionLike } from "./basic_session_interface";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* @module node-opcua-pseudo-session
*/
import {
CreateMonitoredItemsRequest,
CreateMonitoredItemsResponse,
} from "node-opcua-service-subscription";
import {
IBasicSession
} from "./basic_session_interface";
import {
IBasicSessionWithSubscription
} from "./basic_session_with_subscription";

export async function createMonitoredItemsLimit(
maxMonitoredItemsPerCall: number,
session: IBasicSessionWithSubscription,
createMonitoredItemsRequest: CreateMonitoredItemsRequest
): Promise<CreateMonitoredItemsResponse>;


export async function createMonitoredItemsLimit(
maxMonitoredItemsPerCall: number,
session: IBasicSessionWithSubscription,
createMonitoredItemsRequest: CreateMonitoredItemsRequest
): Promise<CreateMonitoredItemsResponse> {
const _session2 = session as IBasicSessionWithSubscription;

if (
maxMonitoredItemsPerCall <= 0 ||
!createMonitoredItemsRequest.itemsToCreate ||
createMonitoredItemsRequest.itemsToCreate.length <= maxMonitoredItemsPerCall
) {
return _session2.createMonitoredItems(createMonitoredItemsRequest);
}
const n = [...(createMonitoredItemsRequest.itemsToCreate || [])];
const response = new CreateMonitoredItemsResponse({
diagnosticInfos: null,
results: []
});
do {
const c = n.splice(0, maxMonitoredItemsPerCall);
const cmi = new CreateMonitoredItemsRequest({
subscriptionId: createMonitoredItemsRequest.subscriptionId,
timestampsToReturn: createMonitoredItemsRequest.timestampsToReturn,
itemsToCreate: c
});
const r = await _session2.createMonitoredItems(cmi);
for (const i of r.results!) {
response.results!.push(i);
}
} while (n.length);
return response;
}
3 changes: 3 additions & 0 deletions packages/node-opcua-pseudo-session/source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
*/
export * from "./basic_session_interface";
export * from "./browse_all";
export * from "./read_operational_limits";
export * from "./create_monitored_items_limit";
export * from "./basic_session_with_subscription";

0 comments on commit 8817c9b

Please sign in to comment.