Skip to content

Commit

Permalink
Merge pull request #680 from nats-io/js-dir-cleanup
Browse files Browse the repository at this point in the history
[CI] cleanup of test data and configurations
  • Loading branch information
aricart committed Apr 5, 2024
2 parents cba483d + 62abea2 commit 1c061a9
Show file tree
Hide file tree
Showing 17 changed files with 395 additions and 370 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
TMPDIR: ${{ runner.temp }}
CI: true
NGS_CI_USER: ${{ secrets.NGS_CI_USER }}
run: deno test --allow-all --unstable --failfast --coverage=./cov
run: deno test --allow-all --unstable --parallel --fail-fast --coverage=./cov

- name: Build nats.js
run: deno bundle --unstable src/connect.ts nats.js
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import {
cleanup,
jetstreamServerConf,
NatsServer,
setup,
} from "../../tests/helpers/mod.ts";
import { setupStreamAndConsumer } from "../../examples/jetstream/util.ts";
Expand All @@ -25,13 +26,17 @@ import {
assertExists,
assertRejects,
} from "https://deno.land/std@0.221.0/assert/mod.ts";
import { consumerHbTest } from "./consumers_test.ts";
import { initStream } from "./jstest_util.ts";
import { AckPolicy, DeliverPolicy } from "../jsapi_types.ts";
import { deadline, deferred, delay } from "../../nats-base-client/util.ts";
import { nanos } from "../jsutil.ts";
import { ConsumerEvents, PullConsumerMessagesImpl } from "../consumer.ts";
import {
ConsumerEvents,
ConsumerStatus,
PullConsumerMessagesImpl,
} from "../consumer.ts";
import { syncIterator } from "../../nats-base-client/core.ts";
import { connect } from "../../src/connect.ts";

Deno.test("consumers - consume", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
Expand Down Expand Up @@ -88,11 +93,60 @@ Deno.test("consumers - consume callback rejects iter", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume heartbeats", async () => {
await consumerHbTest(false);
Deno.test("consume - heartbeats", async () => {
const servers = await NatsServer.setupDataConnCluster(4);
const nc = await connect({ port: servers[0].port });
const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: "a",
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
const c = await js.consumers.get(stream, "a");
const iter = await c.consume({
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
});

const buf: Promise<void>[] = [];
// stop the data serverss
setTimeout(() => {
buf.push(servers[1].stop());
buf.push(servers[2].stop());
buf.push(servers[3].stop());
}, 1000);

await Promise.all(buf);

const d = deferred<ConsumerStatus>();

await (async () => {
const status = await iter.status();
for await (const s of status) {
d.resolve(s);
iter.stop();
break;
}
})();

await (async () => {
for await (const _r of iter) {
// nothing
}
})();

const cs = await d;
assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed);
assertEquals(cs.data, 2);

await nc.close();
await NatsServer.stopAll(servers, true);
});

Deno.test("consumers - consume deleted consumer", async () => {
Deno.test("consume - deleted consumer", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -142,7 +196,7 @@ Deno.test("consumers - consume deleted consumer", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - sub leaks consume()", async () => {
Deno.test("consume - sub leaks", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

Expand Down Expand Up @@ -171,7 +225,7 @@ Deno.test("consumers - sub leaks consume()", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume drain", async () => {
Deno.test("consume - drain", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

Expand All @@ -198,7 +252,7 @@ Deno.test("consumers - consume drain", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume sync", async () => {
Deno.test("consume - sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
Expand All @@ -225,7 +279,7 @@ Deno.test("consumers - consume sync", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume stream not found request abort", async () => {
Deno.test("consume - stream not found request abort", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -258,7 +312,7 @@ Deno.test("consumers - consume stream not found request abort", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume consumer deleted request abort", async () => {
Deno.test("consume - consumer deleted request abort", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -294,7 +348,7 @@ Deno.test("consumers - consume consumer deleted request abort", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume consumer not found request abort", async () => {
Deno.test("consume - consumer not found request abort", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -328,7 +382,7 @@ Deno.test("consumers - consume consumer not found request abort", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume consumer bind", async () => {
Deno.test("consume - consumer bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down
63 changes: 13 additions & 50 deletions jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,42 +185,7 @@ Deno.test("consumers - push consumer not supported", async () => {
});

Deno.test("consumers - fetch heartbeats", async () => {
await consumerHbTest(true);
});

/**
* Setup a cluster that has N nodes with the first node being just a connection
* server - rest are JetStream - min number of servers is 3
* @param count
* @param debug
*/
async function setupDataConnCluster(
count = 4,
debug = false,
): Promise<NatsServer[]> {
if (count < 3) {
return Promise.reject(new Error("min cluster is 4"));
}
let servers = await NatsServer.jetstreamCluster(count, {}, debug);
await NatsServer.stopAll(servers);

servers[0].config.jetstream = "disabled";
for (let i = 1; i < servers.length; i++) {
await Deno.remove(servers[i].config.jetstream.store_dir, {
recursive: true,
});
}

const proms = servers.map((s) => {
return s.restart();
});
servers = await Promise.all(proms);
await NatsServer.dataClusterFormed(proms.slice(1));
return servers;
}

export async function consumerHbTest(fetch: boolean) {
const servers = await setupDataConnCluster(3);
const servers = await NatsServer.setupDataConnCluster(4);

const nc = await connect({ port: servers[0].port });
const { stream } = await initStream(nc);
Expand All @@ -232,24 +197,22 @@ export async function consumerHbTest(fetch: boolean) {

const js = nc.jetstream();
const c = await js.consumers.get(stream, "a");
const iter: ConsumerMessages = fetch
? await c.fetch({
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
})
: await c.consume({
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
});
const iter: ConsumerMessages = await c.fetch({
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
});

const buf: Promise<void>[] = [];
// stop the data serverss
setTimeout(() => {
servers[1].stop();
servers[2].stop();
buf.push(servers[1].stop());
buf.push(servers[2].stop());
buf.push(servers[3].stop());
}, 1000);

await Promise.all(buf);

const d = deferred<ConsumerStatus>();

await (async () => {
Expand All @@ -273,7 +236,7 @@ export async function consumerHbTest(fetch: boolean) {

await nc.close();
await NatsServer.stopAll(servers, true);
}
});

Deno.test("consumers - bad options", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { NatsConnectionImpl } from "../../nats-base-client/nats.ts";
import { syncIterator } from "../../nats-base-client/core.ts";
import { PullConsumerMessagesImpl } from "../consumer.ts";

Deno.test("consumers - fetch no messages", async () => {
Deno.test("fetch - no messages", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const { stream } = await initStream(nc);
Expand All @@ -58,7 +58,7 @@ Deno.test("consumers - fetch no messages", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch less messages", async () => {
Deno.test("fetch - less messages", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const { stream, subj } = await initStream(nc);
Expand All @@ -83,7 +83,7 @@ Deno.test("consumers - fetch less messages", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch exactly messages", async () => {
Deno.test("fetch - exactly messages", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const { stream, subj } = await initStream(nc);
Expand Down Expand Up @@ -115,7 +115,7 @@ Deno.test("consumers - fetch exactly messages", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch consumer not found", async () => {
Deno.test("fetch - consumer not found", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["hello"] });
Expand Down Expand Up @@ -149,7 +149,7 @@ Deno.test("consumers - fetch consumer not found", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch deleted consumer", async () => {
Deno.test("fetch - deleted consumer", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });
Expand Down Expand Up @@ -184,7 +184,7 @@ Deno.test("consumers - fetch deleted consumer", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch stream not found", async () => {
Deno.test("fetch - stream not found", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -216,7 +216,7 @@ Deno.test("consumers - fetch stream not found", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch listener leaks", async () => {
Deno.test("fetch - listener leaks", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
Expand Down Expand Up @@ -256,7 +256,7 @@ Deno.test("consumers - fetch listener leaks", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch sync", async () => {
Deno.test("fetch - sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
Expand All @@ -282,7 +282,7 @@ Deno.test("consumers - fetch sync", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch consumer bind", async () => {
Deno.test("fetch - consumer bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down
10 changes: 5 additions & 5 deletions jetstream/tests/jetream409_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async function expectPullSubscribeCallbackError(
}

Deno.test("409 - max_batch", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -129,7 +129,7 @@ Deno.test("409 - max_batch", async () => {
});

Deno.test("409 - max_expires", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -163,7 +163,7 @@ Deno.test("409 - max_expires", async () => {
});

Deno.test("409 - max_bytes", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.8.3")) {
return;
}
Expand Down Expand Up @@ -200,7 +200,7 @@ Deno.test("409 - max_bytes", async () => {
});

Deno.test("409 - max msg size", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.9.0")) {
return;
}
Expand Down Expand Up @@ -236,7 +236,7 @@ Deno.test("409 - max msg size", async () => {
});

Deno.test("409 - max waiting", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
Expand Down
Loading

0 comments on commit 1c061a9

Please sign in to comment.