From fec66f9bc0d3ab3f4c727b4933b1fa0a6830b57d Mon Sep 17 00:00:00 2001 From: Etienne Rossignon Date: Sat, 6 Apr 2024 15:30:54 +0200 Subject: [PATCH] improve reconnection --- .../source/private/client_base_impl.ts | 175 +++++++------- .../source/private/client_session_impl.ts | 16 -- .../source/private/i_private_client.ts | 2 - .../reconnection/client_reconnection.ts | 17 ++ .../private/reconnection/reconnection.ts | 7 +- .../test/discovery/_helper.ts | 2 +- .../u_test_frequent_server_restart.ts | 4 +- .../test/end_to_end/test_e2e_1331.ts | 9 +- .../test_e2e_client_on_close_events.js | 222 +++++++----------- ...e2e_large_subscription_and_reconnection.ts | 7 +- ...2e_secure_all_possibe_secure_connection.ts | 7 +- .../test/test_issue_1162.ts | 4 +- .../client/client_secure_channel_layer.ts | 7 +- .../source/register_server_manager.ts | 20 +- packages/playground/polling_client.ts | 6 + 15 files changed, 249 insertions(+), 256 deletions(-) create mode 100644 packages/node-opcua-client/source/private/reconnection/client_reconnection.ts diff --git a/packages/node-opcua-client/source/private/client_base_impl.ts b/packages/node-opcua-client/source/private/client_base_impl.ts index 84f57d2e56..e6a63190e5 100644 --- a/packages/node-opcua-client/source/private/client_base_impl.ts +++ b/packages/node-opcua-client/source/private/client_base_impl.ts @@ -66,6 +66,7 @@ import { UserIdentityInfo } from "../user_identity_info"; import { performCertificateSanityCheck } from "../verify"; import { ClientSessionImpl } from "./client_session_impl"; import { IClientBase } from "./i_private_client"; +import { waitUntilReconnectionIsCanceled } from "./reconnection/client_reconnection"; const debugLog = make_debugLog(__filename); const doDebug = checkDebugFlag(__filename); @@ -80,13 +81,13 @@ const defaultConnectionStrategy: ConnectionStrategyOptions = { randomisationFactor: 0.1 }; -interface MasterClient extends OPCUAClientBase { - _tmpClient?: OPCUAClientBase; -} -function __findEndpoint(this: OPCUAClientBase, endpointUrl: string, params: FindEndpointOptions, _callback: FindEndpointCallback) { - const masterClient = this as MasterClient; - debugLog("findEndpoint : endpointUrl = ", endpointUrl); - debugLog(" params ", params); +function __findEndpoint(this: ClientBaseImpl, endpointUrl: string, params: FindEndpointOptions, _callback: FindEndpointCallback) { + if (this.isUnusable()) { + return _callback(new Error("Client is not usable")); + } + const masterClient = this as ClientBaseImpl; + doDebug && debugLog("findEndpoint : endpointUrl = ", endpointUrl); + doDebug && debugLog(" params ", params); assert(!masterClient._tmpClient); const callback = (err: Error | null, result?: FindEndpointResult) => { @@ -105,7 +106,6 @@ function __findEndpoint(this: OPCUAClientBase, endpointUrl: string, params: Find clientName: "EndpointFetcher", - // use same connectionStrategy as parent connectionStrategy: params.connectionStrategy, // connectionStrategy: { @@ -326,7 +326,11 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase * true if the client is trying to reconnect to the server after a connection break. */ get isReconnecting(): boolean { - return !!(this._secureChannel && this._secureChannel.isConnecting) || this._internalState !== "connected"; + return ( + !!(this._secureChannel && this._secureChannel.isConnecting) || + this._internalState === "reconnecting_newchannel_connected" || + this._internalState === "reconnecting" + ); } /** @@ -373,7 +377,6 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase public _sessions: ClientSessionImpl[]; protected _serverEndpoints: EndpointDescription[]; public _secureChannel: ClientSecureChannelLayer | null; - protected disconnecting: boolean; // statistics... private _byteRead: number; @@ -384,7 +387,7 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase private _transactionsPerformed: number; private _reconnectionIsCanceled: boolean; private _clockAdjuster?: ClockAdjustment; - private _tmpClient?: OPCUAClientBase; + protected _tmpClient?: OPCUAClientBase; private _instanceNumber: number; private _transportSettings: TransportSettings; private _transportTimeout?: number; @@ -392,7 +395,12 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase public clientCertificateManager: OPCUACertificateManager; public isUnusable() { - return this._internalState === "disconnected" || this._internalState === "disconnecting"; + return ( + this._internalState === "disconnected" || + this._internalState === "disconnecting" || + this._internalState === "panic" || + this._internalState === "uninitialized" + ); } protected _setInternalState(internalState: InternalClientState): void { @@ -405,6 +413,12 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase chalk.yellow(internalState) ); } + if (this._internalState === "disconnecting" || this._internalState === "disconnected") { + if (internalState === "reconnecting") { + errorLog("Internal error, cannot switch to reconnecting when already disconnecting"); + } // when disconnecting, we cannot accept any other state + } + this._internalState = internalState; } public emit(eventName: string | symbol, ...others: any[]): boolean { @@ -425,8 +439,10 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase super(options as IOPCUASecureObjectOptions); + this._setInternalState("uninitialized"); + this._instanceNumber = g_ClientCounter++; - this._internalState = "uninitialized"; + this.applicationName = options.applicationName || "NodeOPCUA-Client"; assert(!this.applicationName.match(/^locale=/), "applicationName badly converted from LocalizedText"); assert(!this.applicationName.match(/urn:/), "applicationName should not be a URI"); @@ -434,8 +450,6 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase // we need to delay _applicationUri initialization this._applicationUri = options.applicationUri || this._getBuiltApplicationUri(); - this.disconnecting = false; - this.clientCertificateManager = options.clientCertificateManager; this._secureChannel = null; @@ -494,13 +508,13 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase warningLog("internal error: _cancel_reconnection should only be used when reconnecting is in progress"); } - debugLog("canceling reconnection"); + debugLog("canceling reconnection : ", this.clientName); this._reconnectionIsCanceled = true; // istanbul ignore next if (!this._secureChannel) { - debugLog("_cancel_reconnection: Nothing to do !"); + debugLog("_cancel_reconnection: Nothing to do for !", this.clientName, " because secure channel doesn't exist"); return callback(); // nothing to do } @@ -652,7 +666,7 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase assert(this._secureChannel === null); assert(typeof this.endpointUrl === "string"); - debugLog("_internal_create_secure_channel creating new ClientSecureChannelLayer _internalState =", this._internalState); + debugLog("_internal_create_secure_channel creating new ClientSecureChannelLayer _internalState =", this._internalState, this.clientName); const secureChannel = new ClientSecureChannelLayer({ connectionStrategy, defaultSecureTokenLifetime: this.defaultSecureTokenLifetime, @@ -686,7 +700,6 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase debugLog("_internal_create_secure_channel after secureChannel.create"); if (!this._secureChannel) { debugLog("_secureChannel has been closed during the transaction !"); - assert(this.disconnecting); return innerCallback(new Error("Secure Channel Closed")); } if (!err) { @@ -699,10 +712,11 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase (innerCallback: ErrorCallback) => { assert(this._secureChannel !== null); if (!this.knowsServerEndpoint) { + this._setInternalState("connecting"); + this.getEndpoints((err: Error | null /*, endpoints?: EndpointDescription[]*/) => { if (!this._secureChannel) { debugLog("_secureChannel has been closed during the transaction ! (while getEndpoints)"); - assert(this.disconnecting); return innerCallback(new Error("Secure Channel Closed")); } innerCallback(err ? err : undefined); @@ -715,7 +729,7 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase ], (err) => { if (err) { - doDebug && debugLog("Inner create secure channel has failed", err.message); + doDebug && debugLog(this.clientName, " : Inner create secure channel has failed", err.message); if (this._secureChannel) { this._secureChannel!.abortConnection(() => { this._destroy_secure_channel(); @@ -770,7 +784,6 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase return; } warningLog("Creating default certificate ... please wait"); - if (this.disconnecting) return; await ClientBaseImpl.createCertificate( this.clientCertificateManager, @@ -812,14 +825,14 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase if (!fs.existsSync(this.privateKeyFile)) { throw new Error(" cannot locate private key file " + this.privateKeyFile); } - if (this.disconnecting) return; + if (this.isUnusable()) return; await this.clientCertificateManager.withLock2(async () => { await performCertificateSanityCheck(this, "client", this.clientCertificateManager, this._getBuiltApplicationUri()); }); } - protected _internalState: InternalClientState; + protected _internalState: InternalClientState = "uninitialized"; protected _handleUnrecoverableConnectionFailure(err: Error, callback: ErrorCallback): void { debugLog(err.message); @@ -870,12 +883,11 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase } this._setInternalState("connecting"); - this.disconnecting = false; this.initializeCM() .then(() => { - debugLog("ClientBaseImpl#connect ", endpointUrl); - if (this.disconnecting) { + debugLog("ClientBaseImpl#connect ", endpointUrl, this.clientName); + if (this._internalState === "disconnecting" || this._internalState === "disconnected") { return this._handleDisconnectionWhileConnecting(new Error("premature disconnection 1"), callback); } @@ -888,7 +900,7 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase if (err) { return this._handleUnrecoverableConnectionFailure(err, callback); } - if (this.disconnecting) { + if (this.isUnusable()) { return this._handleDisconnectionWhileConnecting(new Error("premature disconnection 2"), callback); } if (forceEndpointDiscoveryOnConnect) { @@ -919,7 +931,8 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase this._clockAdjuster = this._clockAdjuster || new ClockAdjustment(); OPCUAClientBase.registry.register(this); - debugLog("__connectStep2"); + debugLog("__connectStep2 ", this._internalState); + this._internal_create_secure_channel(this.connectionStrategy, (err: Error | null) => { if (!err) { this._handleSuccessfulConnection(callback); @@ -961,16 +974,6 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase return this._secureChannel ? this._secureChannel.getClientNonce() : null; } - public requestReconnection() { - if (this._secureChannel) { - this._secureChannel.abortConnection(() => { - errorLog("new connection requested"); - /* - */ - }); - } - } - public performMessageTransaction(request: Request, callback: ResponseCallback): void { if (!this._secureChannel) { // this may happen if the Server has closed the connection abruptly for some unknown reason @@ -987,7 +990,7 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase ) { return callback( new Error( - "performMessageTransaction: Invalid client state " + + "performMessageTransaction: Invalid client state = " + this._internalState + " while performing a transaction " + request.schema.name @@ -1176,11 +1179,6 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase debugLog(chalk.bgWhite.green("_closeSession ") + this._secureChannel!.channelId); - if (this.isReconnecting) { - errorLog("OPCUAClientImpl#_closeSession called while reconnection in progress ! What shall we do"); - return callback(null); - } - const request = new CloseSessionRequest({ deleteSubscriptions }); @@ -1225,27 +1223,27 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase public disconnect(...args: any[]): any { const callback = args[0]; assert(typeof callback === "function", "expecting a callback function here"); - - if (this._internalState === "disconnected" || this._internalState === "disconnecting") { - if (this._internalState === "disconnecting") { - warningLog("[NODE-OPCUA-W26] OPCUAClient#disconnect called while already disconnecting"); - } - return callback(); - } - debugLog("disconnecting client! (will set reconnectionIsCanceled to true"); this._reconnectionIsCanceled = true; - this.disconnecting = true; - if (this._tmpClient) { + warningLog("disconnecting client while tmpClient exists", this._tmpClient.clientName); this._tmpClient.disconnect((err) => { this._tmpClient = undefined; - assert(!this._tmpClient); // retry disconnect on main client this.disconnect(callback); }); return; } + if (this._internalState === "disconnected" || this._internalState === "disconnecting") { + if (this._internalState === "disconnecting") { + warningLog("[NODE-OPCUA-W26] OPCUAClient#disconnect called while already disconnecting"); + } + return callback(); + } + debugLog("disconnecting client! (will set reconnectionIsCanceled to true"); + this._reconnectionIsCanceled = true; + debugLog("ClientBaseImpl#disconnect", this.endpointUrl); + if (this.isReconnecting && !this._reconnectionIsCanceled) { debugLog("ClientBaseImpl#disconnect called while reconnection is in progress"); // let's abort the reconnection process @@ -1599,21 +1597,21 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase debugLog(chalk.yellow.bold(" ClientBaseImpl emitting close"), err?.message); this._destroy_secure_channel(); if (!err || !this.reconnectOnFailure) { - // this is a normal close operation initiated by us - /** - * @event close - * @param error - */ if (err) { + /** + * @event connection_lost + */ this.emit("connection_lost", err?.message); // instead of "close" } + // this is a normal close operation initiated by us this.emit("close", err); // instead of "close" } else { - /** - * @event connection_lost - */ if (this.reconnectOnFailure && this._internalState !== "reconnecting") { debugLog(" ClientBaseImpl emitting connection_lost"); + this._setInternalState("reconnecting"); + /** + * @event connection_lost + */ this.emit("connection_lost", err?.message); // instead of "close" this._repairConnection(); } @@ -1630,24 +1628,27 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase }); } - private _inside_repairConnection = false; + #insideRepairConnection = false; - private shouldRepairAgain = false; + #shouldRepairAgain = false; /** * @internal * @private - * + * * timeout to wait before client attempt to reconnect in case of failure - * + * */ static retryDelay = 1000; private _repairConnection() { + doDebug && debugLog("_repairConnection = ", this._internalState); + if (this.isUnusable()) return; + const duration = ClientBaseImpl.retryDelay; if (duration) { this.emit("startingDelayBeforeReconnection", duration); setTimeout(() => { - this.emit("repairConnectionStarted"); + if (this.isUnusable()) return; this.__innerRepairConnection(); }, duration); } else { @@ -1655,12 +1656,16 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase } } private __innerRepairConnection() { - if (this._inside_repairConnection) { + if (this.isUnusable()) return; + + debugLog("Entering _repairConnection ", this._internalState); + if (this.#insideRepairConnection) { errorLog("_repairConnection already in progress internal state = ", this._internalState); - this.shouldRepairAgain = true; + this.#shouldRepairAgain = true; return; } - this._inside_repairConnection = true; + this.emit("repairConnectionStarted"); + this.#insideRepairConnection = true; debugLog("recreating new secure channel ", this._internalState); this._recreate_secure_channel((err1?: Error) => { debugLog("secureChannel#on(close) => _recreate_secure_channel returns ", err1 ? err1.message : "OK"); @@ -1668,14 +1673,16 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase if (err1) { debugLog("_recreate_secure_channel has failed: err = ", err1.message); this.emit("close", err1); - this._setInternalState("disconnected"); - this._inside_repairConnection = false; - if (this.shouldRepairAgain) { - this.shouldRepairAgain = false; + this.#insideRepairConnection = false; + if (this.#shouldRepairAgain) { + this.#shouldRepairAgain = false; this._repairConnection(); + } else { + this._setInternalState("disconnected"); } return; } else { + if (this.isUnusable()) return; this._finalReconnectionStep((err2?: Error | null) => { if (err2) { // istanbul ignore next @@ -1686,19 +1693,21 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase // we still need to retry connecting here !!! debugLog("Disconnected following reconnection failure", err2.message); debugLog(`I will retry OPCUA client reconnection in ${OPCUAClientBase.retryDelay / 1000} seconds`); - this._inside_repairConnection = false; - this.shouldRepairAgain = false; + this.#insideRepairConnection = false; + this.#shouldRepairAgain = false; this._destroy_secure_channel(); + // this._setInternalState("reconnecting_failed"); setTimeout(() => this._repairConnection(), OPCUAClientBase.retryDelay); + return; } else { /** * @event connection_reestablished * send when the connection is reestablished after a connection break */ - this._inside_repairConnection = false; - this.shouldRepairAgain = false; + this.#insideRepairConnection = false; + this.#shouldRepairAgain = false; this._setInternalState("connected"); this.emit("connection_reestablished"); } @@ -1756,13 +1765,9 @@ class TmpClient extends ClientBaseImpl { return; } - if (this.disconnecting) { - return this._handleUnrecoverableConnectionFailure(new Error("premature disconnection 3"), callback!); - } - this._setInternalState("connecting"); this._connectStep2(endpoint, (err?: Error) => { - if (this.disconnecting) { + if (this.isUnusable()) { return this._handleUnrecoverableConnectionFailure(new Error("premature disconnection 4"), callback!); } callback!(err); diff --git a/packages/node-opcua-client/source/private/client_session_impl.ts b/packages/node-opcua-client/source/private/client_session_impl.ts index af3d6e6677..7289656116 100644 --- a/packages/node-opcua-client/source/private/client_session_impl.ts +++ b/packages/node-opcua-client/source/private/client_session_impl.ts @@ -1492,28 +1492,12 @@ export class ClientSessionImpl extends EventEmitter implements ClientSession { return this._client !== null && this._client._secureChannel !== null && this._client._secureChannel.isOpened(); } - private requestReconnection() { - if (this._client) { - this._client.requestReconnection(); - assert(this._client.isReconnecting === true, "expecting client to be reconnecting now"); - } - } - public performMessageTransaction(request: Request, callback: (err: Error | null, response?: Response) => void): void { if (!this._client) { // session may have been closed by user ... but is still in used !! return callback(new Error("Session has been closed and should not be used to perform a transaction anymore")); } - // if (!this.isChannelValid()) { - // // the secure channel is broken, may be the server has crashed or the network cable has been disconnected - // // for a long time - // // we may need to queue this transaction, as a secure token may be being reprocessed - // errorLog(chalk.bgWhite.red("!!! Performing transaction on invalid channel !!! ", request.schema.name)); - // // this.requestReconnection(); - // return callback(new Error("!!! Performing transaction on invalid channel with " + request.schema.name + ": starting reconnection process")); - // } - this._reconnecting.pendingTransactions = this._reconnecting.pendingTransactions || []; this._reconnecting.pendingTransactionsCount = this._reconnecting.pendingTransactionsCount || 0; diff --git a/packages/node-opcua-client/source/private/i_private_client.ts b/packages/node-opcua-client/source/private/i_private_client.ts index 5d86629ef6..4983f4aadf 100644 --- a/packages/node-opcua-client/source/private/i_private_client.ts +++ b/packages/node-opcua-client/source/private/i_private_client.ts @@ -27,7 +27,5 @@ export interface IClientBase { getTransportSettings(): IBasicTransportSettings; - requestReconnection(): void; - isUnusable(): boolean; } diff --git a/packages/node-opcua-client/source/private/reconnection/client_reconnection.ts b/packages/node-opcua-client/source/private/reconnection/client_reconnection.ts new file mode 100644 index 0000000000..5e60bb7e3f --- /dev/null +++ b/packages/node-opcua-client/source/private/reconnection/client_reconnection.ts @@ -0,0 +1,17 @@ +import { warn } from "console"; +import { ClientBaseImpl } from "../client_base_impl"; + +export function waitUntilReconnectionIsCanceled(client: ClientBaseImpl, callback: () => void) { + const interval = 100; + const maxIntervalCount = 100; + let intervalCount = 0; + const timer = setInterval(() => { + if (!client.isReconnecting || ++intervalCount > maxIntervalCount) { + clearInterval(timer); + if (intervalCount > maxIntervalCount) { + warn("waitUntilReconnectionIsCanceled: timeout"); + } + callback(); + } + }, interval); +} diff --git a/packages/node-opcua-client/source/private/reconnection/reconnection.ts b/packages/node-opcua-client/source/private/reconnection/reconnection.ts index cb01c4e6f6..6a57aff9ec 100644 --- a/packages/node-opcua-client/source/private/reconnection/reconnection.ts +++ b/packages/node-opcua-client/source/private/reconnection/reconnection.ts @@ -481,12 +481,17 @@ export function repair_client_session(client: IClientBase, session: ClientSessio doDebug && debugLog("Aborting reactivation of old session because user requested session to be close"); return callback(); } - doDebug && debugLog(chalk.yellow("Starting client session repair")); const privateSession = session as any as Reconnectable; privateSession._reconnecting = privateSession._reconnecting || { reconnecting: false, pendingCallbacks: [] }; + if (session.hasBeenClosed()) { + + privateSession._reconnecting.reconnecting = false; + doDebug && debugLog("Aborting reactivation of old session because session has been closed"); + return callback(); + } if (privateSession._reconnecting.reconnecting) { doDebug && debugLog(chalk.bgCyan("Reconnecting already happening for session"), session.sessionId.toString()); privateSession._reconnecting.pendingCallbacks.push(callback); diff --git a/packages/node-opcua-end2end-test/test/discovery/_helper.ts b/packages/node-opcua-end2end-test/test/discovery/_helper.ts index fac9d41c21..e02dce1f71 100644 --- a/packages/node-opcua-end2end-test/test/discovery/_helper.ts +++ b/packages/node-opcua-end2end-test/test/discovery/_helper.ts @@ -79,7 +79,7 @@ export async function createServerThatRegistersItselfToTheDiscoveryServer( registerServerMethod: RegisterServerMethod.LDS, certificateFile, - serverCertificateManager + serverCertificateManager, }); server.discoveryServerEndpointUrl.should.eql(discoveryServerEndpointUrl); diff --git a/packages/node-opcua-end2end-test/test/discovery/u_test_frequent_server_restart.ts b/packages/node-opcua-end2end-test/test/discovery/u_test_frequent_server_restart.ts index c76c998a86..1f23a1f6a1 100644 --- a/packages/node-opcua-end2end-test/test/discovery/u_test_frequent_server_restart.ts +++ b/packages/node-opcua-end2end-test/test/discovery/u_test_frequent_server_restart.ts @@ -469,7 +469,9 @@ export function t(test: any) { wait_a_few_seconds, shutdownServer, - wait_a_few_seconds + wait_a_few_seconds, + wait_a_few_seconds, + ], done ); diff --git a/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_1331.ts b/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_1331.ts index dafe2f6d75..13d7e829f1 100644 --- a/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_1331.ts +++ b/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_1331.ts @@ -33,13 +33,17 @@ describe("Testing client.isReconnecting flag behavior", function (this: Mocha.Te await server.shutdown(); }); - it("client.isReconnection should be true when client emits reconnection event", async () => { + it("client.isReconnecting should be true when client emits reconnection event", async () => { client = OPCUAClient.create({}); let isReconnectingValueWhenConnectionLostEventIsEmitted = false; client.on("connection_lost", () => { isReconnectingValueWhenConnectionLostEventIsEmitted = client.isReconnecting; }); + let isReconnectingValueWhenStartReconnectionIsEmitted = false; + client.on("start_reconnection", () => { + isReconnectingValueWhenStartReconnectionIsEmitted = client.isReconnecting; + }); let isReconnectingValueWhenReconnectionEventIsEmitted = false; client.on("connection_reestablished", () => { @@ -51,7 +55,7 @@ describe("Testing client.isReconnecting flag behavior", function (this: Mocha.Te await new Promise((resolve) => { server.shutdown(); - client.once("connection_lost", resolve); + client.once("start_reconnection", resolve); }); client.isReconnecting.should.eql(true); @@ -67,6 +71,7 @@ describe("Testing client.isReconnecting flag behavior", function (this: Mocha.Te await client.disconnect(); isReconnectingValueWhenConnectionLostEventIsEmitted.should.be.eql(true); + isReconnectingValueWhenStartReconnectionIsEmitted.should.be.eql(true); isReconnectingValueWhenReconnectionEventIsEmitted.should.be.eql(false); }); }); diff --git a/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_client_on_close_events.js b/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_client_on_close_events.js index 9348616f82..c4700ce8a3 100644 --- a/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_client_on_close_events.js +++ b/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_client_on_close_events.js @@ -1,12 +1,9 @@ "use strict"; const should = require("should"); -const async = require("async"); const sinon = require("sinon"); -const { OPCUAClient, OPCUAServer, get_empty_nodeset_filename, nodesets } = require("node-opcua"); - -const empty_nodeset_filename = get_empty_nodeset_filename(); +const { OPCUAClient, OPCUAServer, nodesets } = require("node-opcua"); const { make_debugLog, checkDebugFlag } = require("node-opcua-debug"); const debugLog = make_debugLog("TEST"); @@ -20,31 +17,24 @@ describe("testing Client-Server - Event", function () { let server; let endpointUrl; - function start_server(done) { + async function start_server(done) { server = new OPCUAServer({ port, nodeset_filename: nodesets.ua, serverCapabilities: { maxSessions: 10 } }); - server.start(function () { - endpointUrl = server.getEndpointUrl(); - done(); - }); + await server.start(); + endpointUrl = server.getEndpointUrl(); } - - function end_server(done) { + async function end_server() { if (server) { - server.shutdown(function () { - server = null; - done(); - }); - } else { - done(); + await server.shutdown(); + server = null; } } - it("TSC-1 should raise a close event once on normal disconnection", function (done) { + it("TSC-1 should raise a close event once on normal disconnection", async () => { let close_counter = 0; const client = OPCUAClient.create(); @@ -57,34 +47,24 @@ describe("testing Client-Server - Event", function () { close_counter++; }); - async.series( - [ - function (callback) { - debugLog(" --> Starting server"); - start_server(callback); - }, - function (callback) { - debugLog(" --> Connecting Client"); - client.connect(endpointUrl, callback); - }, - function (callback) { - close_counter.should.eql(0); - debugLog(" --> Disconnecting Client"); - client.disconnect(callback); - }, - function (callback) { - close_counter.should.eql(1); - callback(null); - }, - function (callback) { - debugLog(" --> Stopping server"); - end_server(callback); - } - ], - done - ); + debugLog(" --> Starting server"); + await start_server(); + + try { + debugLog(" --> Connecting Client"); + await client.connect(endpointUrl); + close_counter.should.eql(0); + + debugLog(" --> Disconnecting Client"); + await client.disconnect(); + + close_counter.should.eql(1); + } finally { + debugLog(" --> stopping server"); + await end_server(); + } }); - it("TSC-2 client (not reconnecting) should raise a close event with an error when server initiates disconnection", function (done) { + it("TSC-2 client (not reconnecting) should raise a close event with an error when server initiates disconnection", async () => { // note : client is not trying to reconnect const options = { connectionStrategy: { @@ -99,50 +79,35 @@ describe("testing Client-Server - Event", function () { const _client_received_close_event = sinon.spy(); client.on("close", _client_received_close_event); - async.series( - [ - function (callback) { - debugLog(" --> Starting server"); - start_server(callback); - }, - function (callback) { - debugLog(" --> Connecting Client"); - client.connect(endpointUrl, callback); - }, - function (callback) { - _client_received_close_event.callCount.should.eql(0); - debugLog(" --> Stopping server"); - end_server(function () { - callback(); - }); - }, - - // wait a little bit , to relax client - function (callback) { - setTimeout(callback, 100); - }, - - function (callback) { - _client_received_close_event.callCount.should.eql(1); - _client_received_close_event.getCall(0).args.length.should.eql(1); - should(_client_received_close_event.getCall(0).args[0]).not.eql(null); - _client_received_close_event.getCall(0).args[0].message.should.match(/disconnected by third party/); - callback(); - }, - function (callback) { - client.disconnect(callback); - } - ], - done - ); + debugLog(" --> Starting server"); + await start_server(); + debugLog(" --> Connecting Client"); + await client.connect(endpointUrl); + + _client_received_close_event.callCount.should.eql(0); + debugLog(" --> Stopping server"); + await end_server(); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + + _client_received_close_event.callCount.should.eql(1); + _client_received_close_event.getCall(0).args.length.should.eql(1); + should(_client_received_close_event.getCall(0).args[0]).not.eql(null); + _client_received_close_event.getCall(0).args[0].message.should.match(/disconnected by third party/); + + await client.disconnect(); + _client_received_close_event.callCount.should.eql(1); + _client_received_close_event.getCall(0).args.length.should.eql(1); + should(_client_received_close_event.getCall(0).args[0]).not.eql(null); + _client_received_close_event.getCall(0).args[0].message.should.match(/disconnected by third party/); }); - it("TSC-3 client (reconnecting) should raise a close event with an error when server initiates disconnection (after reconnecting has failed)", function (done) { + it("TSC-3 client (reconnecting) should raise a close event with an error when server initiates disconnection (after reconnecting has failed)", async () => { // note : client will try to reconnect and eventually fail ... const options = { connectionStrategy: { - initialDelay: 10, - maxDelay: 20, + initialDelay: 100, + maxDelay: 200, maxRetry: 1, // <= RETRY randomisationFactor: 0 } @@ -154,55 +119,50 @@ describe("testing Client-Server - Event", function () { const _client_backoff_event = sinon.spy(); client.on("backoff", _client_backoff_event); + client.on("backoff", () => { - debugLog("client attempt to connect"); + debugLog("client.on('backoff'): client is attempting to connect", "isReconnecting=", client.isReconnecting); + }); + + client.on("close", function (err) { + debugLog(" 8 --> client has sent 'close' event", err ? err.message : null); + }); + + debugLog(" 1--> Starting server"); + await start_server(); + + debugLog(" 2--> Connecting Client"); + await client.connect(endpointUrl); + _client_received_close_event.callCount.should.eql(0); + + debugLog(" 3 -> Stopping server and wait for client to detect that server has shutdown abruptly"); + await new Promise((resolve, reject) => { + client.once("connection_lost", () => { + debugLog(" )> received connection_lost event"); + resolve(); + }); + debugLog(" 4 --> Stopping server"); + end_server().then(() => { + debugLog(" 5 --> Server stopped"); + }); }); - async.series( - [ - function (callback) { - debugLog(" 1--> Starting server"); - start_server(callback); - }, - function (callback) { - debugLog(" 2--> Connecting Client"); - client.connect(endpointUrl, callback); - }, - function (callback) { - _client_received_close_event.callCount.should.eql(0); - - client.once("connection_lost", function () { - debugLog(" 4 or 5--> client has detected that server has shutdown abruptly"); - debugLog(" and will try to reconnect"); - - setTimeout(() => { - debugLog(" 6--> disconnecting client (while reconnecting)"); - client.disconnect(() => { - debugLog(" 8 --> client has been disconnected"); - callback(); - }); - }, 5000); // let's give client some time to attempt a reconnection - }); - client.on("close", function (err) { - debugLog(" 8 --> client has sent 'close' event", err ? err.message : null); - //xx should.exist(err); - }); - - debugLog(" 3--> Stopping server"); - end_server(function () { - debugLog(" 4 or 5 --> Server stopped"); - }); - }, - - function (callback) { - _client_backoff_event.callCount.should.be.greaterThan(0); - _client_received_close_event.callCount.should.eql(1); - should.exist(_client_received_close_event.getCall(0).args[0]); - _client_received_close_event.getCall(0).args[0].message.should.match(/Reconnection has been canceled/); - callback(); - } - ], - done - ); + debugLog(" 6 --> client has detected that server has shutdown abruptly"); + debugLog(" and will try to reconnect"); + // wait for client to attempt to reconnect + debugLog(" 7 ---> now waiting for client to attempt to reconnect"); + await new Promise((resolve) => setTimeout(resolve, 2000)); + client.isReconnecting.should.eql(true); + debugLog(" 6 --> client now reconnecting"); + + // let's give client some time to attempt a reconnection + debugLog(" 6--> disconnecting client (while reconnecting, but server not present)"); + await client.disconnect(); + debugLog(" 8 --> client has been disconnected"); + + _client_backoff_event.callCount.should.be.greaterThan(0); + _client_received_close_event.callCount.should.eql(1); // TO CHECK + should.exist(_client_received_close_event.getCall(0).args[0], "expecting an error in the close event"); + _client_received_close_event.getCall(0).args[0].message.should.match(/Reconnection has been canceled/); }); }); diff --git a/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_large_subscription_and_reconnection.ts b/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_large_subscription_and_reconnection.ts index 0e9b2f19ac..49ae1e9c51 100644 --- a/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_large_subscription_and_reconnection.ts +++ b/packages/node-opcua-end2end-test/test/end_to_end/test_e2e_large_subscription_and_reconnection.ts @@ -51,7 +51,7 @@ describe("[CLIENT] recreating large subscription during reconnection", () => { response.constructor.name === "CreateSubscriptionResponse" || response.constructor.name === "CreateMonitoredItemsResponse" ) { - // console.log(response.toString()); + console.log(response.toString()); } if (response.constructor.name === "CreateMonitoredItemsResponse") { createMonitoredItemsResponses.push(response as CreateMonitoredItemsResponse); @@ -79,6 +79,10 @@ describe("[CLIENT] recreating large subscription during reconnection", () => { createMonitoredItemsResponses.length.should.eql(maxMonitoredItemsPerCall + 1); + + + // now clear the monitored items + createMonitoredItemsResponses = []; // When the server stops and restarts @@ -93,6 +97,7 @@ describe("[CLIENT] recreating large subscription during reconnection", () => { server = await startServer(maxMonitoredItemsPerCall); + // wait until reconnection is completed while (session.isReconnecting && !isSessionRestored) { await pause(100); diff --git a/packages/node-opcua-end2end-test/test/secure_connection/test_e2e_secure_all_possibe_secure_connection.ts b/packages/node-opcua-end2end-test/test/secure_connection/test_e2e_secure_all_possibe_secure_connection.ts index 788e27a543..6ebbbf7047 100644 --- a/packages/node-opcua-end2end-test/test/secure_connection/test_e2e_secure_all_possibe_secure_connection.ts +++ b/packages/node-opcua-end2end-test/test/secure_connection/test_e2e_secure_all_possibe_secure_connection.ts @@ -111,7 +111,7 @@ export interface InnerServer { temperatureVariableId: NodeId; server: OPCUAServer; } -async function start_inner_server_local(options: OPCUAServerOptions): Promise { +async function start_inner_server_local(options?: OPCUAServerOptions): Promise { options = options || {}; if (options.serverCertificateManager) { throw new Error("start_inner_server_local: serverCertificateManager should not be defined"); @@ -220,7 +220,7 @@ async function trustCertificateOnClient(): Promise { } } -async function start_server(options: OPCUAServerOptions): Promise { +async function start_server(options?: OPCUAServerOptions): Promise { // Given a server that have a signed end point const data = await start_inner_server_local(options); @@ -577,13 +577,14 @@ describe("ZZB- testing Secure Client-Server communication", function (this: any) it("QQQ1 a client shall be able to establish a SIGNED connection with a server", async () => { should.exist(serverCertificate); server.currentChannelCount.should.equal(0); + const clientCertificateManager = await getClientCertificateManager(); const options = { securityMode: MessageSecurityMode.Sign, securityPolicy: SecurityPolicy.Basic128Rsa15, serverCertificate: serverCertificate, connectionStrategy: no_reconnect_connectivity_strategy, - clientCertificateManager: await getClientCertificateManager() + clientCertificateManager }; const client = OPCUAClient.create(options); await trustClientCertificateOnServer(client); diff --git a/packages/node-opcua-end2end-test/test/test_issue_1162.ts b/packages/node-opcua-end2end-test/test/test_issue_1162.ts index 23992a70ad..1e1aef521d 100644 --- a/packages/node-opcua-end2end-test/test/test_issue_1162.ts +++ b/packages/node-opcua-end2end-test/test/test_issue_1162.ts @@ -171,8 +171,8 @@ describe("Testing automatic reconnection to a server when credential have change await wait(10 * 1000); console.log("client.isReconnecting = ", client.isReconnecting); await wait(10 * 1000); - console.log("client.isReconnecting = ", client.isReconnecting); - await wait(10 * 1000); + // console.log("client.isReconnecting = ", client.isReconnecting); + // await wait(10 * 1000); console.log("client.isReconnecting = ", client.isReconnecting); client.isReconnecting.should.eql(true, "client should be trying to reconnect constantly without success"); } finally { diff --git a/packages/node-opcua-secure-channel/source/client/client_secure_channel_layer.ts b/packages/node-opcua-secure-channel/source/client/client_secure_channel_layer.ts index 5ed6f67ab1..10a2efbf31 100644 --- a/packages/node-opcua-secure-channel/source/client/client_secure_channel_layer.ts +++ b/packages/node-opcua-secure-channel/source/client/client_secure_channel_layer.ts @@ -316,8 +316,8 @@ export class ClientSecureChannelLayer extends EventEmitter { private _bytesWritten = 0; private _timeDrift = 0; - public static minTransactionTimeout = 5 * 1000; // 10 sec - public static defaultTransactionTimeout = 15 * 1000; // 15 minute + public static minTransactionTimeout = 5 * 1000; // 5 sec + public static defaultTransactionTimeout = 15 * 1000; // 15 seconds public static defaultTransportTimeout = 60 * 1000; // 60 seconds /** @@ -701,8 +701,7 @@ export class ClientSecureChannelLayer extends EventEmitter { } this._isDisconnecting = true; doDebug && debugLog("abortConnection ", !!this.__call); - assert(typeof callback === "function"); - + async.series( [ (inner_callback: ErrorCallback) => { diff --git a/packages/node-opcua-server/source/register_server_manager.ts b/packages/node-opcua-server/source/register_server_manager.ts index 9c2a7c18b5..cb53b89879 100644 --- a/packages/node-opcua-server/source/register_server_manager.ts +++ b/packages/node-opcua-server/source/register_server_manager.ts @@ -258,6 +258,8 @@ export interface RegisterServerManagerOptions { server: IPartialServer; discoveryServerEndpointUrl: string; } + +let g_registeringClientCounter = 0; /** * RegisterServerManager is responsible to Register an opcua server on a LDS or LDS-ME server * This class takes in charge : @@ -325,7 +327,7 @@ export class RegisterServerManager extends EventEmitter implements IRegisterServ clearTimeout(this._registrationTimerId); this._registrationTimerId = null; } - + assert(this._registrationTimerId === null, "stop has not been called"); this.removeAllListeners(); } @@ -406,13 +408,16 @@ export class RegisterServerManager extends EventEmitter implements IRegisterServ // Retry Strategy must be set this.server.serverCertificateManager.referenceCounter++; + + const prefix = "Client-" + g_registeringClientCounter++ + " - "; + const registrationClient = OPCUAClientBase.create({ - clientName: this.server.serverInfo.applicationUri!, + clientName: prefix + " Registering Client for Server " + this.server.serverInfo.applicationUri!, applicationName, applicationUri: this.server.serverInfo.applicationUri!, - + connectionStrategy: infinite_connectivity_strategy, clientCertificateManager: this.server.serverCertificateManager, @@ -426,6 +431,7 @@ export class RegisterServerManager extends EventEmitter implements IRegisterServ registrationClient.on("backoff", (nbRetry: number, delay: number) => { debugLog("RegisterServerManager - received backoff"); warningLog( + registrationClient.clientName, chalk.bgWhite.cyan("contacting discovery server backoff "), this.discoveryServerEndpointUrl, " attempt #", @@ -637,15 +643,15 @@ export class RegisterServerManager extends EventEmitter implements IRegisterServ return outer_callback(); } if (!(this.state === RegisterServerManagerStatus.INITIALIZING || this.state === RegisterServerManagerStatus.WAITING)) { - warningLog("Warning : cannot register server - wrong state " , RegisterServerManagerStatus[this.state]); + warningLog("Warning : cannot register server - wrong state ", RegisterServerManagerStatus[this.state]); return outer_callback(); - }; + } this._setState(theStatus); if (this._registration_client) { warningLog( - `Warning there is already a registering/unregistering task taking place: ${ + `Warning there is already a registering/un-registering task taking place: ${ RegisterServerManagerStatus[this.state] } state` ); @@ -760,7 +766,7 @@ export class RegisterServerManager extends EventEmitter implements IRegisterServ this._cancel_pending_client_if_any(callback); }); } else { - debugLog("RegisterServerManager#_cancel_pending_client_if_any : done"); + debugLog("RegisterServerManager#_cancel_pending_client_if_any : done (nothing to do)"); callback(); } } diff --git a/packages/playground/polling_client.ts b/packages/playground/polling_client.ts index 6505e21507..b7acd09b64 100644 --- a/packages/playground/polling_client.ts +++ b/packages/playground/polling_client.ts @@ -66,6 +66,12 @@ async function main() { const session = await client.createSession(); + session.on("session_restored", ()=>{ + console.log(">> Session restored"); + }); + session.on("session_closed", ()=>{ + console.log(">> session closed"); + }) // Note: this example demonstrate how polling can be used in OPCUA , // Note that Pooling is **not** the recommended way to monitored // change of a UA Variable! Use Subscription instead ....