Skip to content

Commit

Permalink
improve reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
erossignon committed Apr 7, 2024
1 parent 2590d1a commit 38caa35
Show file tree
Hide file tree
Showing 20 changed files with 332 additions and 371 deletions.
4 changes: 2 additions & 2 deletions packages/node-opcua-address-space/test/test_issue_1342.ts
Expand Up @@ -2,13 +2,13 @@ import should from "should";
import sinon from "sinon";
import { nodesets } from "node-opcua-nodesets";
import { DataType } from "node-opcua-basic-types";
import { Variant } from "node-opcua-variant";
import { AddressSpace, Namespace } from "..";
import { generateAddressSpace } from "../distNodeJS";
import { Variant } from "node-opcua-variant";

const describe = require("node-opcua-leak-detector").describeWithLeakDetector;
describe("testing github issue https://github.com/node-opcua/node-opcua/issues/1342", function (this: Mocha.Suite) {
this.timeout(10000);
this.timeout(Math.max(this.timeout(), 20000));
let addressSpace: AddressSpace;
let namespace: Namespace;

Expand Down
4 changes: 2 additions & 2 deletions packages/node-opcua-address-space/test/test_issue_314.ts
Expand Up @@ -11,7 +11,7 @@ import { generateAddressSpace } from "../nodeJS";
// tslint:disable-next-line:no-var-requires
const describe = require("node-opcua-leak-detector").describeWithLeakDetector;
describe("testing loading ExtensionObject value from NodeSet XML file", function (this: any) {
this.timeout(20000); // could be slow on appveyor !
this.timeout(Math.max(this.timeout(), 30000)); // could be slow on appveyor !

let addressSpace: AddressSpace;

Expand All @@ -37,7 +37,7 @@ describe("testing loading ExtensionObject value from NodeSet XML file", function

const dataValue = node.readValue();
dataValue.value.dataType.should.eql(DataType.ExtensionObject);

// console.log("xx ", dataValue.value.toString());

dataValue.value.value.constructor.name.should.eql("EUInformation");
Expand Down
175 changes: 90 additions & 85 deletions packages/node-opcua-client/source/private/client_base_impl.ts

Large diffs are not rendered by default.

22 changes: 2 additions & 20 deletions packages/node-opcua-client/source/private/client_session_impl.ts
Expand Up @@ -1492,28 +1492,12 @@ export class ClientSessionImpl extends EventEmitter implements ClientSession {
return this._client !== null && this._client._secureChannel !== null && this._client._secureChannel.isOpened();
}

private requestReconnection() {
if (this._client) {
this._client.requestReconnection();
assert(this._client.isReconnecting === true, "expecting client to be reconnecting now");
}
}

public performMessageTransaction(request: Request, callback: (err: Error | null, response?: Response) => void): void {
if (!this._client) {
// session may have been closed by user ... but is still in used !!
return callback(new Error("Session has been closed and should not be used to perform a transaction anymore"));
}

// if (!this.isChannelValid()) {
// // the secure channel is broken, may be the server has crashed or the network cable has been disconnected
// // for a long time
// // we may need to queue this transaction, as a secure token may be being reprocessed
// errorLog(chalk.bgWhite.red("!!! Performing transaction on invalid channel !!! ", request.schema.name));
// // this.requestReconnection();
// return callback(new Error("!!! Performing transaction on invalid channel with " + request.schema.name + ": starting reconnection process"));
// }

this._reconnecting.pendingTransactions = this._reconnecting.pendingTransactions || [];
this._reconnecting.pendingTransactionsCount = this._reconnecting.pendingTransactionsCount || 0;

Expand Down Expand Up @@ -2216,6 +2200,7 @@ export class ClientSessionImpl extends EventEmitter implements ClientSession {
request: Request,
callback: (err: Error | null, response?: Response) => void
) {
warningLog("attempt to recreate session to reperform a transation ", request.constructor.name);
if (this.recursive_repair_detector >= 1) {
// tslint:disable-next-line: no-console
warningLog("recreate_session_and_reperform_transaction => Already in Progress");
Expand All @@ -2241,10 +2226,7 @@ type promoteOpaqueStructure3WithCallbackFunc = (
callback: ErrorCallback
) => void;

async function promoteOpaqueStructure2(
session: IBasicSessionAsync2,
callMethodResult: CallMethodResult
): Promise<void> {
async function promoteOpaqueStructure2(session: IBasicSessionAsync2, callMethodResult: CallMethodResult): Promise<void> {
if (!callMethodResult || !callMethodResult.outputArguments || callMethodResult.outputArguments.length === 0) {
return;
}
Expand Down
2 changes: 0 additions & 2 deletions packages/node-opcua-client/source/private/i_private_client.ts
Expand Up @@ -27,7 +27,5 @@ export interface IClientBase {

getTransportSettings(): IBasicTransportSettings;

requestReconnection(): void;

isUnusable(): boolean;
}
@@ -0,0 +1,17 @@
import { warn } from "console";
import { ClientBaseImpl } from "../client_base_impl";

export function waitUntilReconnectionIsCanceled(client: ClientBaseImpl, callback: () => void) {
const interval = 100;
const maxIntervalCount = 100;
let intervalCount = 0;
const timer = setInterval(() => {
if (!client.isReconnecting || ++intervalCount > maxIntervalCount) {
clearInterval(timer);
if (intervalCount > maxIntervalCount) {
warn("waitUntilReconnectionIsCanceled: timeout");
}
callback();
}
}, interval);
}
Expand Up @@ -25,6 +25,13 @@ const doDebug = checkDebugFlag("RECONNECTION");
const errorLog = make_errorLog("RECONNECTION");
const warningLog = make_warningLog("RECONNECTION");

function _shouldNotContinue3(client: IClientBase) {
if (!client._secureChannel) {
return new Error("Failure during reconnection : client or session is not usable anymore");
}
return null;
}

export function _shouldNotContinue(session: ClientSessionImpl) {
if (!session._client || session.hasBeenClosed() || !session._client._secureChannel || session._client.isUnusable()) {
return new Error("Failure during reconnection : client or session is not usable anymore");
Expand Down Expand Up @@ -227,9 +234,10 @@ function repair_client_session_by_recreating_a_new_session(
});
},
(err1?: Error | null) => {
// prettier-ignore
{ const err = _shouldNotContinue(session); if (err) { return innerCallback(err); } }
if (!err1) {
// prettier-ignore
{ const err = _shouldNotContinue(session); if (err) { return innerCallback(err); } }
}
innerCallback(err1!);
}
Expand Down Expand Up @@ -328,25 +336,10 @@ function repair_client_session_by_recreating_a_new_session(
subscriptionsToTransfer,
(err: Error | null, transferSubscriptionsResponse?: TransferSubscriptionsResponse) => {
// may be the connection with server has been disconnected
// prettier-ignore
{ const err = _shouldNotContinue(session); if (err) { return innerCallback(err); } }

if (err || !transferSubscriptionsResponse) {
warningLog(chalk.bgCyan("Warning TransferSubscription has failed " + err?.message));
if (client._secureChannel === null) {
warningLog(
chalk.bgCyan(
"the connection has been lost while transferring subscription.\nWe need to re-establish the connection first and start over."
)
);
return innerCallback(new Error("No secure channel"));
}
if (session.hasBeenClosed()) {
warningLog(
chalk.bgCyan("Cannot complete subscription transferSubscription due to session termination")
);
return innerCallback(
new Error("Cannot complete subscription transferSubscription due to session termination")
);
}
warningLog(chalk.bgCyan("May be the server is not supporting this feature"));
// when transfer subscription has failed, we have no other choice but
// recreate the subscriptions on the server side
Expand Down Expand Up @@ -440,6 +433,8 @@ function _repair_client_session(client: IClientBase, session: ClientSessionImpl,
debugLog("Session repair completed with err: ", err2 ? err2.message : "<no error>", session.sessionId.toString());
if (!err2) {
session.emit("session_repaired");
} else {
session.emit("session_repaired_failed", err2);
}
callback(err2);
};
Expand All @@ -461,10 +456,6 @@ function _repair_client_session(client: IClientBase, session: ClientSessionImpl,
//
doDebug && debugLog(" ActivateSession : ", err ? chalk.red(err.message) : chalk.green(" SUCCESS !!! "));
if (err) {
if (!client._secureChannel || session.hasBeenClosed()) {
warningLog("reconnection: disconnection happened while we were trying to reactivate existing session");
return callback2(new Error("No secure channel"));
}
// activate old session has failed => let's recreate a new Channel and transfer the subscription
return repair_client_session_by_recreating_a_new_session(client, session, callback2);
} else {
Expand All @@ -481,14 +472,18 @@ export function repair_client_session(client: IClientBase, session: ClientSessio
doDebug && debugLog("Aborting reactivation of old session because user requested session to be close");
return callback();
}

doDebug && debugLog(chalk.yellow("Starting client session repair"));

const privateSession = session as any as Reconnectable;
privateSession._reconnecting = privateSession._reconnecting || { reconnecting: false, pendingCallbacks: [] };

if (session.hasBeenClosed()) {
privateSession._reconnecting.reconnecting = false;
doDebug && debugLog("Aborting reactivation of old session because session has been closed");
return callback();
}
if (privateSession._reconnecting.reconnecting) {
doDebug && debugLog(chalk.bgCyan("Reconnecting already happening for session"), session.sessionId.toString());
doDebug && debugLog(chalk.bgCyan("Reconnection is already happening for session"), session.sessionId.toString());
privateSession._reconnecting.pendingCallbacks.push(callback);
return;
}
Expand All @@ -503,17 +498,16 @@ export function repair_client_session(client: IClientBase, session: ClientSessio
{ const err = _shouldNotContinue(session); if (err) { return callback(err); }}

_repair_client_session(client, session, (err) => {
// prettier-ignore
{ const err = _shouldNotContinue(session); if (err) { return callback(err); } }

if (err) {
errorLog(
chalk.red("session restoration has failed! err ="),
err.message,
session.sessionId.toString(),
" => Let's retry"
);
if (!client._secureChannel) {
privateSession._reconnecting.reconnecting = false;
return callback(new Error("No secure channel"));
}
if (!session.hasBeenClosed()) {
const delay = 2000;
errorLog(chalk.red(`... will retry session repair... in ${delay} ms`));
Expand All @@ -523,34 +517,31 @@ export function repair_client_session(client: IClientBase, session: ClientSessio
}, delay);
return;
} else {
privateSession._reconnecting.reconnecting = false;
errorLog(chalk.red("session restoration should be interrupted because session has been closed forcefully"));
// session does not need to be repaired anymore
callback();
}
// session does not need to be repaired anymore
callback();
return;
}

privateSession._reconnecting.reconnecting = false;

// istanbul ignore next
doDebug && debugLog(chalk.yellow("session has been restored"), session.sessionId.toString());

session.emit("session_restored");
const otherCallbacks = privateSession._reconnecting.pendingCallbacks;
privateSession._reconnecting.pendingCallbacks = [];

// re-inject element in queue

// istanbul ignore next
doDebug && debugLog(chalk.yellow("re-injecting transaction queue"), transactionQueue.length);

transactionQueue.forEach((e: any) => privateSession.pendingTransactions.push(e));
otherCallbacks.forEach((c: EmptyCallback) => c(err));
callback(err);
});
};
repeatedAction(callback);
repeatedAction((err) => {
privateSession._reconnecting.reconnecting = false;

const otherCallbacks = privateSession._reconnecting.pendingCallbacks;
privateSession._reconnecting.pendingCallbacks = [];
// re-inject element in queue
// istanbul ignore next
doDebug && debugLog(chalk.yellow("re-injecting transaction queue"), transactionQueue.length);
transactionQueue.forEach((e: any) => privateSession.pendingTransactions.push(e));
otherCallbacks.forEach((c: EmptyCallback) => c(err));
callback(err);
});
}

export function repair_client_sessions(client: IClientBase, callback: (err?: Error) => void): void {
Expand All @@ -566,6 +557,9 @@ export function repair_client_sessions(client: IClientBase, callback: (err?: Err
},
(err, allErrors: (undefined | Error | null)[] | undefined) => {
err && errorLog("sessions reactivation completed with err: err ", err ? err.message : "null");
// prettier-ignore
{ const err = _shouldNotContinue3(client); if (err) { return callback(err); } }

return callback(err!);
}
);
Expand Down
2 changes: 1 addition & 1 deletion packages/node-opcua-end2end-test/test/discovery/_helper.ts
Expand Up @@ -79,7 +79,7 @@ export async function createServerThatRegistersItselfToTheDiscoveryServer(
registerServerMethod: RegisterServerMethod.LDS,

certificateFile,
serverCertificateManager
serverCertificateManager,
});
server.discoveryServerEndpointUrl.should.eql(discoveryServerEndpointUrl);

Expand Down
Expand Up @@ -469,7 +469,9 @@ export function t(test: any) {
wait_a_few_seconds,

shutdownServer,
wait_a_few_seconds
wait_a_few_seconds,
wait_a_few_seconds,

],
done
);
Expand Down

0 comments on commit 38caa35

Please sign in to comment.