Skip to content

Commit

Permalink
[TEST] Added tests for #580
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Aug 28, 2023
1 parent 5cab8ac commit 1d168e3
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
29 changes: 29 additions & 0 deletions jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { assertRejects } from "https://deno.land/std@0.190.0/testing/asserts.ts"
import {
assertEquals,
assertExists,
assertStringIncludes,
} from "https://deno.land/std@0.75.0/testing/asserts.ts";
import {
deferred,
Expand Down Expand Up @@ -1022,3 +1023,31 @@ Deno.test("consumers - next listener leaks", async () => {

await cleanup(ns, nc);
});

Deno.test("consumers - inboxPrefix is respected", async () => {
const { ns, nc } = await setup(jetstreamServerConf(), { inboxPrefix: "x" });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();

await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

const consumer = await js.consumers.get("messages", "c");
const iter = await consumer.consume() as PullConsumerMessagesImpl;
const done = (async () => {
for await (const m of iter) {
// nothing
}
})().catch();
assertStringIncludes(iter.inbox, "x.");
iter.stop();
await done;
await cleanup(ns, nc);
});
29 changes: 27 additions & 2 deletions jetstream/tests/consumersordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ import {
assertExists,
assertRejects,
} from "https://deno.land/std@0.190.0/testing/asserts.ts";
import { DeliverPolicy, JsMsg } from "../mod.ts";
import { OrderedPullConsumerImpl } from "../consumer.ts";
import { AckPolicy, DeliverPolicy, JsMsg, nanos } from "../mod.ts";
import {
OrderedConsumerMessages,
OrderedPullConsumerImpl,
PullConsumerMessagesImpl,
} from "../consumer.ts";
import { deferred } from "../../nats-base-client/mod.ts";
import {
cleanup,
Expand All @@ -29,6 +33,7 @@ import {
setup,
} from "../../tests/helpers/mod.ts";
import { deadline, delay } from "../../nats-base-client/util.ts";
import { assertStringIncludes } from "https://deno.land/std@0.75.0/testing/asserts.ts";

Deno.test("ordered - get", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
Expand Down Expand Up @@ -736,3 +741,23 @@ Deno.test("ordered - mem", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered - inboxPrefix is respected", async () => {
const { ns, nc } = await setup(jetstreamServerConf(), { inboxPrefix: "x" });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();

const consumer = await js.consumers.get("messages");
const iter = await consumer.consume() as OrderedConsumerMessages;
const done = (async () => {
for await (const m of iter) {
// nothing
}
})().catch();
assertStringIncludes(iter.src.inbox, "x.");
iter.stop();
await done;
await cleanup(ns, nc);
});

0 comments on commit 1d168e3

Please sign in to comment.