Skip to content

Commit

Permalink
[BUMP] versions, dev NBC
Browse files Browse the repository at this point in the history
[TESTS] added tests for consumer functionality
  • Loading branch information
aricart committed May 19, 2023
1 parent f125ee1 commit f4b1f21
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 2 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nats.ws",
"version": "1.14.0",
"version": "1.15.0-1",
"description": "WebSocket NATS client",
"main": "./cjs/nats.js",
"module": "./esm/nats.js",
Expand Down
2 changes: 1 addition & 1 deletion src/ws_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
render,
} from "https://raw.githubusercontent.com/nats-io/nats.deno/dev/nats-base-client/internal_mod.ts";

const VERSION = "1.14.0";
const VERSION = "1.15.0-1";
const LANG = "nats.ws";

export type WsSocketFactory = (u: string, opts: ConnectionOptions) => Promise<{
Expand Down
174 changes: 174 additions & 0 deletions test/jetstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const { connect, Empty, consumerOpts, AckPolicy, headers, StringCodec } =
const { NatsServer, wsConfig } = require("./helpers/launcher");
const { jetstreamServerConf } = require("./helpers/jsutil");
const { DataBuffer } = require("../lib/nats-base-client/databuffer");
const { setTimeout } = require("timers");

test("jetstream - jsm", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
Expand Down Expand Up @@ -372,3 +373,176 @@ test("jetstream - os basics", async (t) => {
await nc.close();
await ns.stop();
});

test("jetstream - consumer basics", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const js = nc.jetstream();

await t.throwsAsync(async () => {
await js.consumers.get("stream", "a");
}, { message: "stream not found" });

await t.throwsAsync(async () => {
await js.streams.get("stream");
}, { message: "stream not found" });

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });

const s = await js.streams.get("stream");
t.truthy(s);
t.is(s.name, "stream");

await t.throwsAsync(async () => {
await js.consumers.get("stream", "a");
}, { message: "consumer not found" });

await t.throwsAsync(async () => {
await s.getConsumer("a");
}, { message: "consumer not found" });

await jsm.consumers.add("stream", {
name: "a",
ack_policy: AckPolicy.ackExplicit,
});

let c = await js.consumers.get("stream", "a");
t.truthy(c);

c = await s.getConsumer("a");
t.truthy(c);

let ci = await c.info(true);
t.is(ci.name, "a");
t.is(ci.num_pending, 0);

let m = await c.next({ expires: 1000 });
t.is(m, null);

await js.publish("hello.a");
await js.publish("hello.b");
await js.publish("hello.c");
await js.publish("hello.d");

ci = await c.info();
t.is(ci.num_pending, 4);

m = await c.next();
m.ack();
t.is(m.subject, "hello.a");

let iter = await c.fetch({ max_messages: 2 });
const buf = [];
for await (let m of iter) {
buf.push(m);
m.ack();
}
t.is(iter.getProcessed(), 2);
t.is(buf.length, 2);
t.is(buf[0].subject, "hello.b");
t.is(buf[1].subject, "hello.c");
buf.length = 0;

iter = await c.consume();
const done = (async () => {
for await (const m of iter) {
buf.push(m);
m.ack();
}
})();
setTimeout(() => {
iter.stop();
}, 2000);

await done;
t.is(iter.getProcessed(), 1);
t.is(buf.length, 1);
t.is(buf[0].subject, "hello.d");

await nc.close();
await ns.stop();
});

test("jetstream - ordered consumer basics", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const js = nc.jetstream();

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });

const s = await js.streams.get("stream");

try {
await js.consumers.get("stream");
} catch (err) {
if (err.message.indexOf("is only supported on") !== -1) {
t.log(err.message);
t.pass();
await nc.close();
await ns.stop();
return;
} else {
t.fail(err);
}
}

let c = await js.consumers.get("stream");
t.truthy(c);

c = await s.getConsumer();
t.truthy(c);

let ci = await c.info(true);
t.is(ci.num_pending, 0);

let m = await c.next({ expires: 1000 });
t.is(m, null);

await js.publish("hello.a");
await js.publish("hello.b");
await js.publish("hello.c");
await js.publish("hello.d");

ci = await c.info();
t.is(ci.num_pending, 4);

m = await c.next();
m.ack();
t.is(m.subject, "hello.a");

c = await js.consumers.get("stream");
let iter = await c.fetch({ max_messages: 4 });
const buf = [];
for await (let m of iter) {
buf.push(m);
m.ack();
}
t.is(iter.getProcessed(), 4);
t.is(buf.length, 4);
t.is(buf[0].subject, "hello.a");
t.is(buf[1].subject, "hello.b");
t.is(buf[1].subject, "hello.c");
t.is(buf[1].subject, "hello.d");
buf.length = 0;

c = await js.consumers.get("stream");
iter = await c.consume();
const done = (async () => {
for await (const m of iter) {
buf.push(m);
m.ack();
}
})();
setTimeout(() => {
iter.stop();
}, 2000);

await done;
t.is(iter.getProcessed(), 4);
t.is(buf.length, 4);

await nc.close();
await ns.stop();
});

0 comments on commit f4b1f21

Please sign in to comment.