Skip to content

Commit

Permalink
improve reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
erossignon committed Apr 6, 2024
1 parent 2590d1a commit fec66f9
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 256 deletions.
175 changes: 90 additions & 85 deletions packages/node-opcua-client/source/private/client_base_impl.ts

Large diffs are not rendered by default.

16 changes: 0 additions & 16 deletions packages/node-opcua-client/source/private/client_session_impl.ts
Expand Up @@ -1492,28 +1492,12 @@ export class ClientSessionImpl extends EventEmitter implements ClientSession {
return this._client !== null && this._client._secureChannel !== null && this._client._secureChannel.isOpened();
}

private requestReconnection() {
if (this._client) {
this._client.requestReconnection();
assert(this._client.isReconnecting === true, "expecting client to be reconnecting now");
}
}

public performMessageTransaction(request: Request, callback: (err: Error | null, response?: Response) => void): void {
if (!this._client) {
// session may have been closed by user ... but is still in used !!
return callback(new Error("Session has been closed and should not be used to perform a transaction anymore"));
}

// if (!this.isChannelValid()) {
// // the secure channel is broken, may be the server has crashed or the network cable has been disconnected
// // for a long time
// // we may need to queue this transaction, as a secure token may be being reprocessed
// errorLog(chalk.bgWhite.red("!!! Performing transaction on invalid channel !!! ", request.schema.name));
// // this.requestReconnection();
// return callback(new Error("!!! Performing transaction on invalid channel with " + request.schema.name + ": starting reconnection process"));
// }

this._reconnecting.pendingTransactions = this._reconnecting.pendingTransactions || [];
this._reconnecting.pendingTransactionsCount = this._reconnecting.pendingTransactionsCount || 0;

Expand Down
2 changes: 0 additions & 2 deletions packages/node-opcua-client/source/private/i_private_client.ts
Expand Up @@ -27,7 +27,5 @@ export interface IClientBase {

getTransportSettings(): IBasicTransportSettings;

requestReconnection(): void;

isUnusable(): boolean;
}
@@ -0,0 +1,17 @@
import { warn } from "console";
import { ClientBaseImpl } from "../client_base_impl";

export function waitUntilReconnectionIsCanceled(client: ClientBaseImpl, callback: () => void) {
const interval = 100;
const maxIntervalCount = 100;
let intervalCount = 0;
const timer = setInterval(() => {
if (!client.isReconnecting || ++intervalCount > maxIntervalCount) {
clearInterval(timer);
if (intervalCount > maxIntervalCount) {
warn("waitUntilReconnectionIsCanceled: timeout");
}
callback();
}
}, interval);
}
Expand Up @@ -481,12 +481,17 @@ export function repair_client_session(client: IClientBase, session: ClientSessio
doDebug && debugLog("Aborting reactivation of old session because user requested session to be close");
return callback();
}

doDebug && debugLog(chalk.yellow("Starting client session repair"));

const privateSession = session as any as Reconnectable;
privateSession._reconnecting = privateSession._reconnecting || { reconnecting: false, pendingCallbacks: [] };

if (session.hasBeenClosed()) {

privateSession._reconnecting.reconnecting = false;
doDebug && debugLog("Aborting reactivation of old session because session has been closed");
return callback();
}
if (privateSession._reconnecting.reconnecting) {
doDebug && debugLog(chalk.bgCyan("Reconnecting already happening for session"), session.sessionId.toString());
privateSession._reconnecting.pendingCallbacks.push(callback);
Expand Down
2 changes: 1 addition & 1 deletion packages/node-opcua-end2end-test/test/discovery/_helper.ts
Expand Up @@ -79,7 +79,7 @@ export async function createServerThatRegistersItselfToTheDiscoveryServer(
registerServerMethod: RegisterServerMethod.LDS,

certificateFile,
serverCertificateManager
serverCertificateManager,
});
server.discoveryServerEndpointUrl.should.eql(discoveryServerEndpointUrl);

Expand Down
Expand Up @@ -469,7 +469,9 @@ export function t(test: any) {
wait_a_few_seconds,

shutdownServer,
wait_a_few_seconds
wait_a_few_seconds,
wait_a_few_seconds,

],
done
);
Expand Down
Expand Up @@ -33,13 +33,17 @@ describe("Testing client.isReconnecting flag behavior", function (this: Mocha.Te
await server.shutdown();
});

it("client.isReconnection should be true when client emits reconnection event", async () => {
it("client.isReconnecting should be true when client emits reconnection event", async () => {
client = OPCUAClient.create({});

let isReconnectingValueWhenConnectionLostEventIsEmitted = false;
client.on("connection_lost", () => {
isReconnectingValueWhenConnectionLostEventIsEmitted = client.isReconnecting;
});
let isReconnectingValueWhenStartReconnectionIsEmitted = false;
client.on("start_reconnection", () => {
isReconnectingValueWhenStartReconnectionIsEmitted = client.isReconnecting;
});

let isReconnectingValueWhenReconnectionEventIsEmitted = false;
client.on("connection_reestablished", () => {
Expand All @@ -51,7 +55,7 @@ describe("Testing client.isReconnecting flag behavior", function (this: Mocha.Te

await new Promise((resolve) => {
server.shutdown();
client.once("connection_lost", resolve);
client.once("start_reconnection", resolve);
});
client.isReconnecting.should.eql(true);

Expand All @@ -67,6 +71,7 @@ describe("Testing client.isReconnecting flag behavior", function (this: Mocha.Te
await client.disconnect();

isReconnectingValueWhenConnectionLostEventIsEmitted.should.be.eql(true);
isReconnectingValueWhenStartReconnectionIsEmitted.should.be.eql(true);
isReconnectingValueWhenReconnectionEventIsEmitted.should.be.eql(false);
});
});

0 comments on commit fec66f9

Please sign in to comment.