Skip to content

Commit

Permalink
[FIX] protocol errors (any unknown error from the server) was handled…
Browse files Browse the repository at this point in the history
… by the client with a `close()`. Now while the connection could reset, reconnects are honored.

FIX #421
  • Loading branch information
aricart committed Dec 1, 2022
1 parent 7fa8384 commit 79a74a9
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 28 deletions.
30 changes: 17 additions & 13 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
.then(async (_err?) => {
this.connected = false;
if (!this.isClosed()) {
await this.disconnected(this.transport.closeError);
// if the transport gave an error use that, otherwise
// we may have received a protocol error
await this.disconnected(this.transport.closeError || this.lastError);
return;
}
});
Expand Down Expand Up @@ -444,18 +446,20 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
async processError(m: Uint8Array) {
const s = decode(m);
const err = ProtocolHandler.toError(s);
let isMuxPermissionError = false;
const status: Status = { type: Events.Error, data: err.code };
if (err.permissionContext) {
status.permissionContext = err.permissionContext;
const mux = this.subscriptions.getMux();
isMuxPermissionError = mux?.subject === err.permissionContext.subject;
}
this.subscriptions.handleError(err);
this.muxSubscriptions.handleError(isMuxPermissionError, err);
if (isMuxPermissionError) {
// remove the permission - enable it to be recreated
this.subscriptions.setMux(null);
if (err.isPermissionError()) {
let isMuxPermissionError = false;
if (err.permissionContext) {
status.permissionContext = err.permissionContext;
const mux = this.subscriptions.getMux();
isMuxPermissionError = mux?.subject === err.permissionContext.subject;
}
this.subscriptions.handleError(err);
this.muxSubscriptions.handleError(isMuxPermissionError, err);
if (isMuxPermissionError) {
// remove the permission - enable it to be recreated
this.subscriptions.setMux(null);
}
}
this.dispatchStatus(status);
await this.handleError(err);
Expand All @@ -466,7 +470,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
this.handleAuthError(err);
}
if (err.isProtocolError()) {
await this._close(err);
this.lastError = err;
}
if (!err.isPermissionError()) {
this.lastError = err;
Expand Down
21 changes: 7 additions & 14 deletions tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -512,23 +512,16 @@ Deno.test("basics - request requires a subject", async () => {
});

Deno.test("basics - closed returns error", async () => {
const lock = Lock(1);
const cs = new TestServer(false, (ca: Connection) => {
setTimeout(async () => {
await ca.write(new TextEncoder().encode("-ERR 'here'\r\n"));
}, 500);
});

const nc = await connect(
{ servers: `127.0.0.1:${cs.getPort()}` },
);
const { ns, nc } = await setup({}, { reconnect: false });
setTimeout(() => {
(nc as NatsConnectionImpl).protocol.sendCommand("Y\r\n");
}, 1000);
await nc.closed()
.then((v) => {
assertEquals((v as Error).message, "'here'");
lock.unlock();
assertEquals((v as NatsError).code, ErrorCode.ProtocolError);
});
assertEquals(nc.isClosed(), true);
await cs.stop();

await cleanup(ns, nc);
});

Deno.test("basics - subscription with timeout", async () => {
Expand Down
129 changes: 128 additions & 1 deletion tests/reconnect_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import {
} from "../src/mod.ts";
import { assertErrorCode, Lock, NatsServer } from "./helpers/mod.ts";
import {
DataBuffer,
DebugEvents,
deferred,
delay,
NatsConnectionImpl,
} from "../nats-base-client/internal_mod.ts";
import { setup } from "./jstest_util.ts";
import { cleanup, setup } from "./jstest_util.ts";
import { deadline } from "https://deno.land/std@0.114.0/async/deadline.ts";
import Conn = Deno.Conn;

Deno.test("reconnect - should receive when some servers are invalid", async () => {
const lock = Lock(1);
Expand Down Expand Up @@ -332,3 +334,128 @@ Deno.test("reconnect - close stops reconnects", async () => {
fail(err);
});
});

Deno.test("reconnect - stale connections don't close", async () => {
const listener = Deno.listen({ port: 0, transport: "tcp" });
const { port } = listener.addr as Deno.NetAddr;
const connections: Conn[] = [];

const TE = new TextEncoder();

const INFO = TE.encode(
"INFO " + JSON.stringify({
server_id: "TEST",
version: "0.0.0",
host: "127.0.0.1",
port: port,
}) + "\r\n",
);

const PING = { re: /^PING\r\n/im, out: TE.encode("PONG\r\n") };
const CONNECT = { re: /^CONNECT\s+([^\r\n]+)\r\n/im, out: TE.encode("") };
const CMDS = [PING, CONNECT];

const startReading = (conn: Conn) => {
const buf = new Uint8Array(1024 * 8);
let inbound = new DataBuffer();
(async () => {
while (true) {
const count = await conn.read(buf);
if (count === null) {
break;
}
if (count) {
inbound.fill(DataBuffer.concat(buf.subarray(0, count)));
const lines = DataBuffer.toAscii(inbound.peek());
for (let i = 0; i < CMDS.length; i++) {
const m = CMDS[i].re.exec(lines);
if (m) {
const len = m[0].length;
if (len) {
inbound.drain(len);
await conn.write(CMDS[i].out);
}
if (i === 0) {
// sent the PONG we are done.
return;
}
}
}
}
}
})();
};

(async () => {
for await (const conn of listener) {
try {
connections.push(conn);
await conn.write(INFO);
startReading(conn);
} catch (_err) {
console.log(_err);
return;
}
}
})().then();

const nc = await connect({
port,
maxReconnectAttempts: -1,
pingInterval: 2000,
reconnectTimeWait: 500,
ignoreAuthErrorAbort: true,
});

let stales = 0;
(async () => {
for await (const s of nc.status()) {
console.log(s);
if (s.type === DebugEvents.StaleConnection) {
stales++;
if (stales === 3) {
await nc.close();
}
}
}
})().then();

await nc.closed();
connections.forEach((c) => {
return c.close();
});
listener.close();
assert(stales >= 3, `stales ${stales}`);
});

Deno.test("reconnect - protocol errors don't close client", async () => {
const { ns, nc } = await setup({}, {
maxReconnectAttempts: -1,
reconnectTimeWait: 500,
});
const nci = nc as NatsConnectionImpl;

let reconnects = 0;
(async () => {
for await (const s of nc.status()) {
if (s.type === Events.Reconnect) {
reconnects++;
if (reconnects < 3) {
setTimeout(() => {
nci.protocol.sendCommand(`X\r\n`);
});
}
if (reconnects === 3) {
await nc.close();
}
}
}
})().then();

nci.protocol.sendCommand(`X\r\n`);

const err = await nc.closed();
assertEquals(err, undefined);

await cleanup(ns, nc);
});

0 comments on commit 79a74a9

Please sign in to comment.