Skip to content

Commit

Permalink
[CHANGE] mrequest signature changes possible errors at core NATS leve…
Browse files Browse the repository at this point in the history
…l can only be sub/no responder type errors which should fail the request
  • Loading branch information
aricart committed Dec 12, 2022
1 parent b54cdd1 commit 98ee6cc
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 55 deletions.
24 changes: 11 additions & 13 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export class NatsConnectionImpl implements NatsConnection {
subject: string,
data: Uint8Array = Empty,
opts: Partial<RequestManyOptions> = { maxWait: 1000, maxMessages: -1 },
): Promise<QueuedIterator<Msg | Error>> {
): Promise<QueuedIterator<Msg>> {
try {
this._check(subject, true, true);
} catch (err) {
Expand All @@ -175,23 +175,19 @@ export class NatsConnectionImpl implements NatsConnection {
}

// the iterator for user results
const qi = new QueuedIteratorImpl<Msg | Error>();
function stop() {
const qi = new QueuedIteratorImpl<Msg>();
function stop(err?: Error) {
//@ts-ignore: stop function
qi.push(() => {
qi.stop();
qi.stop(err);
});
}

// callback for the subscription or the mux handler
// simply pushes errors and messages into the iterator
function callback(err: Error | null, msg: Msg | null) {
if (err || msg === null) {
// FIXME: the stop function should not require commenting
if (err !== null) {
qi.push(err);
}
stop();
stop(err === null ? undefined : err);
} else {
qi.push(msg);
}
Expand All @@ -208,7 +204,7 @@ export class NatsConnectionImpl implements NatsConnection {
callback: (err, msg) => {
// we only expect runtime errors or a no responders
if (
msg.data.length === 0 &&
msg?.data?.length === 0 &&
msg?.headers?.status === ErrorCode.NoResponders
) {
err = NatsError.errorForCode(ErrorCode.NoResponders);
Expand Down Expand Up @@ -248,13 +244,15 @@ export class NatsConnectionImpl implements NatsConnection {
stop();
})
.catch((err: Error) => {
qi.push(err);
stop();
qi.stop(err);
});

const cancel = (err?: Error) => {
if (err) {
qi.push(err);
//@ts-ignore: error
qi.push(() => {
throw err;
});
}
clearTimers();
sub.drain()
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export interface NatsConnection {
subject: string,
data: Uint8Array,
opts: Partial<RequestManyOptions>,
): Promise<QueuedIterator<Msg | Error>>;
): Promise<QueuedIterator<Msg>>;

/**
* Returns a Promise that resolves when the client receives a reply from
Expand Down
30 changes: 1 addition & 29 deletions tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,7 @@ import {
NatsError,
StringCodec,
} from "../src/mod.ts";
import {
assertErrorCode,
Connection,
disabled,
Lock,
NatsServer,
TestServer,
} from "./helpers/mod.ts";
import { assertErrorCode, disabled, Lock, NatsServer } from "./helpers/mod.ts";
import {
deferred,
delay,
Expand Down Expand Up @@ -1119,27 +1112,6 @@ Deno.test("basics - request many waits for timer late response", async () => {
await cleanup(ns, nc);
});

Deno.test("basics - request many stops on error", async () => {
const { ns, nc } = await setup({});
const nci = nc as NatsConnectionImpl;

const subj = createInbox();

const iter = await nci.requestMany(subj, Empty, {
strategy: RequestStrategy.Timer,
maxWait: 2000,
});
const d = deferred<Error>();
for await (const mer of iter) {
if (mer instanceof Error) {
d.resolve(mer);
}
}
const err = await d;
assertErrorCode(err, ErrorCode.NoResponders);
await cleanup(ns, nc);
});

Deno.test("basics - server version", async () => {
const { ns, nc } = await setup({});
const nci = nc as NatsConnectionImpl;
Expand Down
171 changes: 159 additions & 12 deletions tests/mrequest_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@
import { cleanup, setup } from "./jstest_util.ts";
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
import { createInbox } from "../nats-base-client/protocol.ts";
import { Empty, RequestStrategy } from "../nats-base-client/types.ts";
import { Empty, Events, RequestStrategy } from "../nats-base-client/types.ts";

import {
assert,
assertEquals,
fail,
} from "https://deno.land/std@0.138.0/testing/asserts.ts";
import { StringCodec } from "../nats-base-client/codec.ts";
import { assertErrorCode } from "./helpers/mod.ts";
import { deferred, ErrorCode } from "../nats-base-client/mod.ts";
import { delay } from "../nats-base-client/util.ts";
import { deferred, delay } from "../nats-base-client/util.ts";
import { assertRejects } from "https://deno.land/std@0.125.0/testing/asserts.ts";

async function requestManyCount(noMux = false): Promise<void> {
const { ns, nc } = await setup({});
Expand Down Expand Up @@ -249,14 +248,15 @@ async function requestManyStopsOnError(noMux = false): Promise<void> {
maxWait: 2000,
noMux,
});
const d = deferred<Error>();
for await (const mer of iter) {
if (mer instanceof Error) {
d.resolve(mer);
}
}
const err = await d;
assertErrorCode(err, ErrorCode.NoResponders);
await assertRejects(
async () => {
for await (const _mer of iter) {
// do nothing
}
},
Error,
"503",
);
await cleanup(ns, nc);
}

Expand All @@ -267,3 +267,150 @@ Deno.test("mreq - request many stops on error", async () => {
Deno.test("mreq - request many stops on error noMux", async () => {
await requestManyStopsOnError(true);
});

Deno.test("mreq - pub permission error", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "a",
password: "a",
permissions: { publish: { deny: "q" } },
}],
},
}, { user: "a", pass: "a" });

const d = deferred();
(async () => {
for await (const s of nc.status()) {
if (s.type === Events.Error, s.permissionContext?.subject === "q") {
d.resolve();
}
}
})().then();

const iter = await nc.requestMany("q", Empty, {
strategy: RequestStrategy.Count,
maxMessages: 3,
maxWait: 2000,
});

await assertRejects(
async () => {
for await (const _m of iter) {
// nothing
}
},
Error,
"Permissions Violation for Publish",
);
await d;
await cleanup(ns, nc);
});

Deno.test("mreq - sub permission error", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "a",
password: "a",
permissions: { subscribe: { deny: "_INBOX.>" } },
}],
},
}, { user: "a", pass: "a" });

nc.subscribe("q", {
callback: (_err, msg) => {
msg?.respond();
},
});

const d = deferred();
(async () => {
for await (const s of nc.status()) {
if (
s.type === Events.Error,
s.permissionContext?.operation === "subscription"
) {
d.resolve();
}
}
})().then();

await assertRejects(
async () => {
const iter = await nc.requestMany("q", Empty, {
strategy: RequestStrategy.Count,
maxMessages: 3,
maxWait: 2000,
noMux: true,
});
for await (const _m of iter) {
// nothing;
}
},
Error,
"Permissions Violation for Subscription",
);
await d;
await cleanup(ns, nc);
});

Deno.test("mreq - lost sub permission", async () => {
const { ns, nc } = await setup({
authorization: {
users: [{
user: "a",
password: "a",
}],
},
}, { user: "a", pass: "a" });

let reloaded = false;
nc.subscribe("q", {
callback: (_err, msg) => {
msg?.respond();
if (!reloaded) {
reloaded = true;
ns.reload({
authorization: {
users: [{
user: "a",
password: "a",
permissions: { subscribe: { deny: "_INBOX.>" } },
}],
},
});
}
},
});

const d = deferred();
(async () => {
for await (const s of nc.status()) {
if (
s.type === Events.Error,
s.permissionContext?.operation === "subscription"
) {
d.resolve();
}
}
})().then();

await assertRejects(
async () => {
const iter = await nc.requestMany("q", Empty, {
strategy: RequestStrategy.Count,
maxMessages: 3,
maxWait: 5000,
noMux: true,
});
for await (const _m of iter) {
// nothing
}
},
Error,
"Permissions Violation for Subscription",
);
await d;
await cleanup(ns, nc);
});

0 comments on commit 98ee6cc

Please sign in to comment.