From 98ee6cc468b78100877b4d7dff7eef4c9af75caf Mon Sep 17 00:00:00 2001 From: aricart Date: Mon, 12 Dec 2022 14:08:46 -0400 Subject: [PATCH] [CHANGE] mrequest signature changes possible errors at core NATS level can only be sub/no responder type errors which should fail the request --- nats-base-client/nats.ts | 24 +++--- nats-base-client/types.ts | 2 +- tests/basics_test.ts | 30 +------ tests/mrequest_test.ts | 171 +++++++++++++++++++++++++++++++++++--- 4 files changed, 172 insertions(+), 55 deletions(-) diff --git a/nats-base-client/nats.ts b/nats-base-client/nats.ts index e684a070..a27c06bb 100644 --- a/nats-base-client/nats.ts +++ b/nats-base-client/nats.ts @@ -161,7 +161,7 @@ export class NatsConnectionImpl implements NatsConnection { subject: string, data: Uint8Array = Empty, opts: Partial = { maxWait: 1000, maxMessages: -1 }, - ): Promise> { + ): Promise> { try { this._check(subject, true, true); } catch (err) { @@ -175,11 +175,11 @@ export class NatsConnectionImpl implements NatsConnection { } // the iterator for user results - const qi = new QueuedIteratorImpl(); - function stop() { + const qi = new QueuedIteratorImpl(); + function stop(err?: Error) { //@ts-ignore: stop function qi.push(() => { - qi.stop(); + qi.stop(err); }); } @@ -187,11 +187,7 @@ export class NatsConnectionImpl implements NatsConnection { // simply pushes errors and messages into the iterator function callback(err: Error | null, msg: Msg | null) { if (err || msg === null) { - // FIXME: the stop function should not require commenting - if (err !== null) { - qi.push(err); - } - stop(); + stop(err === null ? undefined : err); } else { qi.push(msg); } @@ -208,7 +204,7 @@ export class NatsConnectionImpl implements NatsConnection { callback: (err, msg) => { // we only expect runtime errors or a no responders if ( - msg.data.length === 0 && + msg?.data?.length === 0 && msg?.headers?.status === ErrorCode.NoResponders ) { err = NatsError.errorForCode(ErrorCode.NoResponders); @@ -248,13 +244,15 @@ export class NatsConnectionImpl implements NatsConnection { stop(); }) .catch((err: Error) => { - qi.push(err); - stop(); + qi.stop(err); }); const cancel = (err?: Error) => { if (err) { - qi.push(err); + //@ts-ignore: error + qi.push(() => { + throw err; + }); } clearTimers(); sub.drain() diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index b4525047..5345db6d 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -142,7 +142,7 @@ export interface NatsConnection { subject: string, data: Uint8Array, opts: Partial, - ): Promise>; + ): Promise>; /** * Returns a Promise that resolves when the client receives a reply from diff --git a/tests/basics_test.ts b/tests/basics_test.ts index 312c808d..53bb80c0 100644 --- a/tests/basics_test.ts +++ b/tests/basics_test.ts @@ -33,14 +33,7 @@ import { NatsError, StringCodec, } from "../src/mod.ts"; -import { - assertErrorCode, - Connection, - disabled, - Lock, - NatsServer, - TestServer, -} from "./helpers/mod.ts"; +import { assertErrorCode, disabled, Lock, NatsServer } from "./helpers/mod.ts"; import { deferred, delay, @@ -1119,27 +1112,6 @@ Deno.test("basics - request many waits for timer late response", async () => { await cleanup(ns, nc); }); -Deno.test("basics - request many stops on error", async () => { - const { ns, nc } = await setup({}); - const nci = nc as NatsConnectionImpl; - - const subj = createInbox(); - - const iter = await nci.requestMany(subj, Empty, { - strategy: RequestStrategy.Timer, - maxWait: 2000, - }); - const d = deferred(); - for await (const mer of iter) { - if (mer instanceof Error) { - d.resolve(mer); - } - } - const err = await d; - assertErrorCode(err, ErrorCode.NoResponders); - await cleanup(ns, nc); -}); - Deno.test("basics - server version", async () => { const { ns, nc } = await setup({}); const nci = nc as NatsConnectionImpl; diff --git a/tests/mrequest_test.ts b/tests/mrequest_test.ts index f09236a5..bfd40124 100644 --- a/tests/mrequest_test.ts +++ b/tests/mrequest_test.ts @@ -15,7 +15,7 @@ import { cleanup, setup } from "./jstest_util.ts"; import { NatsConnectionImpl } from "../nats-base-client/nats.ts"; import { createInbox } from "../nats-base-client/protocol.ts"; -import { Empty, RequestStrategy } from "../nats-base-client/types.ts"; +import { Empty, Events, RequestStrategy } from "../nats-base-client/types.ts"; import { assert, @@ -23,9 +23,8 @@ import { fail, } from "https://deno.land/std@0.138.0/testing/asserts.ts"; import { StringCodec } from "../nats-base-client/codec.ts"; -import { assertErrorCode } from "./helpers/mod.ts"; -import { deferred, ErrorCode } from "../nats-base-client/mod.ts"; -import { delay } from "../nats-base-client/util.ts"; +import { deferred, delay } from "../nats-base-client/util.ts"; +import { assertRejects } from "https://deno.land/std@0.125.0/testing/asserts.ts"; async function requestManyCount(noMux = false): Promise { const { ns, nc } = await setup({}); @@ -249,14 +248,15 @@ async function requestManyStopsOnError(noMux = false): Promise { maxWait: 2000, noMux, }); - const d = deferred(); - for await (const mer of iter) { - if (mer instanceof Error) { - d.resolve(mer); - } - } - const err = await d; - assertErrorCode(err, ErrorCode.NoResponders); + await assertRejects( + async () => { + for await (const _mer of iter) { + // do nothing + } + }, + Error, + "503", + ); await cleanup(ns, nc); } @@ -267,3 +267,150 @@ Deno.test("mreq - request many stops on error", async () => { Deno.test("mreq - request many stops on error noMux", async () => { await requestManyStopsOnError(true); }); + +Deno.test("mreq - pub permission error", async () => { + const { ns, nc } = await setup({ + authorization: { + users: [{ + user: "a", + password: "a", + permissions: { publish: { deny: "q" } }, + }], + }, + }, { user: "a", pass: "a" }); + + const d = deferred(); + (async () => { + for await (const s of nc.status()) { + if (s.type === Events.Error, s.permissionContext?.subject === "q") { + d.resolve(); + } + } + })().then(); + + const iter = await nc.requestMany("q", Empty, { + strategy: RequestStrategy.Count, + maxMessages: 3, + maxWait: 2000, + }); + + await assertRejects( + async () => { + for await (const _m of iter) { + // nothing + } + }, + Error, + "Permissions Violation for Publish", + ); + await d; + await cleanup(ns, nc); +}); + +Deno.test("mreq - sub permission error", async () => { + const { ns, nc } = await setup({ + authorization: { + users: [{ + user: "a", + password: "a", + permissions: { subscribe: { deny: "_INBOX.>" } }, + }], + }, + }, { user: "a", pass: "a" }); + + nc.subscribe("q", { + callback: (_err, msg) => { + msg?.respond(); + }, + }); + + const d = deferred(); + (async () => { + for await (const s of nc.status()) { + if ( + s.type === Events.Error, + s.permissionContext?.operation === "subscription" + ) { + d.resolve(); + } + } + })().then(); + + await assertRejects( + async () => { + const iter = await nc.requestMany("q", Empty, { + strategy: RequestStrategy.Count, + maxMessages: 3, + maxWait: 2000, + noMux: true, + }); + for await (const _m of iter) { + // nothing; + } + }, + Error, + "Permissions Violation for Subscription", + ); + await d; + await cleanup(ns, nc); +}); + +Deno.test("mreq - lost sub permission", async () => { + const { ns, nc } = await setup({ + authorization: { + users: [{ + user: "a", + password: "a", + }], + }, + }, { user: "a", pass: "a" }); + + let reloaded = false; + nc.subscribe("q", { + callback: (_err, msg) => { + msg?.respond(); + if (!reloaded) { + reloaded = true; + ns.reload({ + authorization: { + users: [{ + user: "a", + password: "a", + permissions: { subscribe: { deny: "_INBOX.>" } }, + }], + }, + }); + } + }, + }); + + const d = deferred(); + (async () => { + for await (const s of nc.status()) { + if ( + s.type === Events.Error, + s.permissionContext?.operation === "subscription" + ) { + d.resolve(); + } + } + })().then(); + + await assertRejects( + async () => { + const iter = await nc.requestMany("q", Empty, { + strategy: RequestStrategy.Count, + maxMessages: 3, + maxWait: 5000, + noMux: true, + }); + for await (const _m of iter) { + // nothing + } + }, + Error, + "Permissions Violation for Subscription", + ); + await d; + await cleanup(ns, nc); +});