Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] [EXPERIMENTAL] exposed a mechanism to force a reconnect. #632

Merged
merged 2 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions README.md

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions nats-base-client/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export enum DebugEvents {
Reconnecting = "reconnecting",
PingTimer = "pingTimer",
StaleConnection = "staleConnection",
ClientInitiatedReconnect = "client initiated reconnect",
}

export enum ErrorCode {
Expand Down Expand Up @@ -657,7 +658,31 @@ export interface NatsConnection {
*/
rtt(): Promise<number>;

/**
* Returns a {@link ServicesAPI} which allows you to build services
* using NATS.
*/
services: ServicesAPI;

/**
* Use of this API is experimental, and it is subject to be removed.
*
* reconnect() enables a client to force a reconnect. A reconnect will disconnect
* the client, and possibly initiate a reconnect to the cluster. Note that all
* reconnect caveats apply:
*
* - If the reconnection policy given to the client doesn't allow reconnects, the
* connection will close.
*
* - Messages that are inbound or outbound could be lost.
*
* - All requests that are in flight will be rejected.
*
* Note that the returned promise will reject if the client is already closed, or if
* it is in the process of draining. If the client is currently disconnected,
* this API has no effect, as the client is already attempting to reconnect.
*/
reconnect(): Promise<void>;
}

/**
Expand Down
14 changes: 14 additions & 0 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,20 @@ export class NatsConnectionImpl implements NatsConnection {
}
return this._services;
}

reconnect(): Promise<void> {
if (this.isClosed()) {
return Promise.reject(
NatsError.errorForCode(ErrorCode.ConnectionClosed),
);
}
if (this.isDraining()) {
return Promise.reject(
NatsError.errorForCode(ErrorCode.ConnectionDraining),
);
}
return this.protocol.reconnect();
}
}

export class ServicesFactory implements ServicesAPI {
Expand Down
11 changes: 11 additions & 0 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,17 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
this.transport.disconnect();
}

public reconnect(): Promise<void> {
if (this.connected) {
this.dispatchStatus({
type: DebugEvents.ClientInitiatedReconnect,
data: "",
});
this.transport.disconnect();
}
return Promise.resolve();
}

async disconnected(err?: Error): Promise<void> {
this.dispatchStatus(
{
Expand Down
7 changes: 4 additions & 3 deletions tests/authenticator_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from "https://raw.githubusercontent.com/nats-io/jwt.js/main/src/jwt.ts";
import { assertBetween } from "./helpers/mod.ts";
import { deadline, delay } from "../nats-base-client/util.ts";
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";

function disconnectReconnect(nc: NatsConnection): Promise<void> {
const done = deferred<void>();
Expand Down Expand Up @@ -67,17 +68,17 @@ async function testAuthenticatorFn(
return fn(nonce);
};
conf = Object.assign({}, conf, { debug });
let { ns, nc } = await setup(conf, {
const { ns, nc } = await setup(conf, {
authenticator,
});

const cycle = disconnectReconnect(nc);

await delay(2000);
called = 0;
await ns.stop();
const nci = nc as NatsConnectionImpl;
nci.reconnect();
await delay(1000);
ns = await deadline(ns.restart(), 4000);
await deadline(cycle, 4000);
assertBetween(called, 1, 10);
await nc.flush();
Expand Down
4 changes: 4 additions & 0 deletions tests/reconnect_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ Deno.test("reconnect - stale connections don't close", async () => {
pingInterval: 2000,
reconnectTimeWait: 500,
ignoreAuthErrorAbort: true,
// need a timeout, otherwise the default is 20s and we leak resources
timeout: 2000,
});

let stales = 0;
Expand All @@ -427,6 +429,8 @@ Deno.test("reconnect - stale connections don't close", async () => {
return c.close();
});
assert(stales >= 3, `stales ${stales}`);
// there could be a redial timer waiting here for 2s
await delay(2000);
});

Deno.test("reconnect - protocol errors don't close client", async () => {
Expand Down
Loading