Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX 265 #267

Merged
merged 1 commit into from
Feb 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,18 +415,16 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
async processError(m: Uint8Array) {
const s = decode(m);
const err = ProtocolHandler.toError(s);
const handled = this.subscriptions.handleError(err);
if (!handled) {
this.dispatchStatus({ type: Events.Error, data: err.code });
}
this.subscriptions.handleError(err);
this.dispatchStatus({ type: Events.Error, data: err.code });
await this.handleError(err);
}

async handleError(err: NatsError) {
if (err.isAuthError()) {
this.handleAuthError(err);
}
if (err.isPermissionError() || err.isProtocolError()) {
if (err.isProtocolError()) {
await this._close(err);
}
this.lastError = err;
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
// cleanup - they used break or return from the iterator
// make sure we clean up, if they didn't call unsub
this.iterClosed.then(() => {
this.closed.resolve();
this.unsubscribe();
});
}
Expand Down
168 changes: 117 additions & 51 deletions tests/auth_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 The NATS Authors
* Copyright 2018-2022 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -15,22 +15,23 @@

import {
assertEquals,
assertRejects,
fail,
} from "https://deno.land/std@0.125.0/testing/asserts.ts";
import {
connect,
credsAuthenticator,
ErrorCode,
Events,
jwtAuthenticator,
nkeyAuthenticator,
Status,
StringCodec,
} from "../src/mod.ts";
import { assertErrorCode, Lock, NatsServer } from "./helpers/mod.ts";
import { assertErrorCode, NatsServer } from "./helpers/mod.ts";
import { deferred, nkeys } from "../nats-base-client/internal_mod.ts";
import { NKeyAuth } from "../nats-base-client/authenticator.ts";
import { assert } from "../nats-base-client/denobuffer.ts";
import { cleanup, setup } from "./jstest_util.ts";

const conf = {
authorization: {
Expand Down Expand Up @@ -100,77 +101,142 @@ Deno.test("auth - un/pw", async () => {
await ns.stop();
});

Deno.test("auth - sub permissions", async () => {
const ns = await NatsServer.start(conf);
const lock = Lock();
const nc = await connect(
{ port: ns.port, user: "derek", pass: "foobar" },
Deno.test("auth - sub no permissions keeps connection", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "a",
password: "a",
permissions: { subscribe: "foo" },
}],
},
}, { user: "a", pass: "a", reconnect: false });

const errStatus = deferred<Status>();
const _ = (async () => {
for await (const s of nc.status()) {
errStatus.resolve(s);
}
})();

const cbErr = deferred<Error | null>();
const sub = nc.subscribe("bar", {
callback: (err, msg) => {
cbErr.resolve(err);
},
});

const v = await Promise.all([errStatus, cbErr, sub.closed]);
assertEquals(v[0].data, ErrorCode.PermissionsViolation);
assertEquals(
v[1]?.message,
"'Permissions Violation for Subscription to \"bar\"'",
);
assertEquals(nc.isClosed(), false);

const status = nc.status();
await cleanup(ns, nc);
});

let notifications = 0;
(async () => {
for await (const _s of status) {
notifications++;
}
})().then();
Deno.test("auth - sub iterator no permissions keeps connection", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "a",
password: "a",
permissions: { subscribe: "foo" },
}],
},
}, { user: "a", pass: "a", reconnect: false });

const done = nc.closed();
const errStatus = deferred<Status>();
const _ = (async () => {
for await (const s of nc.status()) {
errStatus.resolve(s);
}
})();

const sub = nc.subscribe("foo");
const iterErr = deferred<Error | null>();
const sub = nc.subscribe("bar");
(async () => {
for await (const _m of sub) {
// ignoring
for await (const m of sub) {
}
})().catch((err) => {
lock.unlock();
assertErrorCode(err as Error, ErrorCode.PermissionsViolation);
notifications++;
iterErr.resolve(err);
});

nc.publish("foo");
await nc.flush();

await Promise.all([lock, done]);
assertEquals(notifications, 1);
await ns.stop();
const v = await Promise.all([errStatus, iterErr, sub.closed]);
assertEquals(v[0].data, ErrorCode.PermissionsViolation);
assertEquals(
v[1]?.message,
"'Permissions Violation for Subscription to \"bar\"'",
);
assertEquals(sub.isClosed(), true);
assertEquals(nc.isClosed(), false);

await cleanup(ns, nc);
});

Deno.test("auth - pub perm", async () => {
const ns = await NatsServer.start(conf);
const lock = Lock();
const nc = await connect(
{ port: ns.port, user: "derek", pass: "foobar" },
);
Deno.test("auth - pub permissions keep connection", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "a",
password: "a",
permissions: { publish: "foo" },
}],
},
}, { user: "a", pass: "a", reconnect: false });

const status = nc.status();
const errStatus = deferred<Status>();
(async () => {
for await (const s of status) {
const _ = (async () => {
for await (const s of nc.status()) {
errStatus.resolve(s);
}
})().then();
})();

nc.closed().then((err) => {
assertErrorCode(err as Error, ErrorCode.PermissionsViolation);
lock.unlock();
});
nc.publish("bar");

const sub = nc.subscribe("bar");
const iter = (async () => {
for await (const _m of sub) {
fail("should not have been called");
const v = await errStatus;
assertEquals(v.data, ErrorCode.PermissionsViolation);
assertEquals(nc.isClosed(), false);

await cleanup(ns, nc);
});

Deno.test("auth - req permissions keep connection", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "a",
password: "a",
permissions: { publish: "foo" },
}],
},
}, { user: "a", pass: "a", reconnect: false });

const errStatus = deferred<Status>();
const _ = (async () => {
for await (const s of nc.status()) {
console.log(s);
errStatus.resolve(s);
}
})();

nc.publish("bar");
await assertRejects(
async () => {
await nc.request("bar");
},
Error,
ErrorCode.Timeout,
);

await lock;
await iter;
const es = await errStatus;
assertEquals(es.type, Events.Error);
assertEquals(es.data, ErrorCode.PermissionsViolation);
await ns.stop();
const v = await errStatus;
assertEquals(v.data, ErrorCode.PermissionsViolation);
assertEquals(nc.isClosed(), false);

await cleanup(ns, nc);
});

Deno.test("auth - user and token is rejected", () => {
Expand Down
4 changes: 3 additions & 1 deletion tests/heartbeats_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 The NATS Authors
* Copyright 2020-2022 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -103,4 +103,6 @@ Deno.test("heartbeat - recovers from missed", async () => {
hb.cancel();
assertEquals(hb.timer, undefined);
assert(status.length >= 6, `${status.length} >= 6`);
// some resources in the test runner are not always cleaned unless we wait a bit
await delay(500);
});
7 changes: 2 additions & 5 deletions tests/iterators_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 The NATS Authors
* Copyright 2020-2022 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { connect, createInbox, ErrorCode, NatsError } from "../src/mod.ts";
import { connect, createInbox, ErrorCode } from "../src/mod.ts";
import { assertEquals } from "https://deno.land/std@0.125.0/testing/asserts.ts";
import { assertErrorCode, Lock, NatsServer } from "./helpers/mod.ts";
import { assert } from "../nats-base-client/denobuffer.ts";
Expand Down Expand Up @@ -85,9 +85,6 @@ Deno.test("iterators - permission error breaks and closes", async () => {
});

await lock;
await nc.closed().then((err) => {
assertErrorCode(err as NatsError, ErrorCode.PermissionsViolation);
});
await cleanup(ns, nc);
});

Expand Down