Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CI] cleanup of test data and configurations #680

Merged
merged 6 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
TMPDIR: ${{ runner.temp }}
CI: true
NGS_CI_USER: ${{ secrets.NGS_CI_USER }}
run: deno test --allow-all --unstable --failfast --coverage=./cov
run: deno test --allow-all --unstable --parallel --fail-fast --coverage=./cov

- name: Build nats.js
run: deno bundle --unstable src/connect.ts nats.js
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import {
cleanup,
jetstreamServerConf,
NatsServer,
setup,
} from "../../tests/helpers/mod.ts";
import { setupStreamAndConsumer } from "../../examples/jetstream/util.ts";
Expand All @@ -25,13 +26,17 @@ import {
assertExists,
assertRejects,
} from "https://deno.land/std@0.221.0/assert/mod.ts";
import { consumerHbTest } from "./consumers_test.ts";
import { initStream } from "./jstest_util.ts";
import { AckPolicy, DeliverPolicy } from "../jsapi_types.ts";
import { deadline, deferred, delay } from "../../nats-base-client/util.ts";
import { nanos } from "../jsutil.ts";
import { ConsumerEvents, PullConsumerMessagesImpl } from "../consumer.ts";
import {
ConsumerEvents,
ConsumerStatus,
PullConsumerMessagesImpl,
} from "../consumer.ts";
import { syncIterator } from "../../nats-base-client/core.ts";
import { connect } from "../../src/connect.ts";

Deno.test("consumers - consume", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
Expand Down Expand Up @@ -88,11 +93,60 @@ Deno.test("consumers - consume callback rejects iter", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume heartbeats", async () => {
await consumerHbTest(false);
Deno.test("consume - heartbeats", async () => {
const servers = await NatsServer.setupDataConnCluster(4);
const nc = await connect({ port: servers[0].port });
const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: "a",
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
const c = await js.consumers.get(stream, "a");
const iter = await c.consume({
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
});

const buf: Promise<void>[] = [];
// stop the data serverss
setTimeout(() => {
buf.push(servers[1].stop());
buf.push(servers[2].stop());
buf.push(servers[3].stop());
}, 1000);

await Promise.all(buf);

const d = deferred<ConsumerStatus>();

await (async () => {
const status = await iter.status();
for await (const s of status) {
d.resolve(s);
iter.stop();
break;
}
})();

await (async () => {
for await (const _r of iter) {
// nothing
}
})();

const cs = await d;
assertEquals(cs.type, ConsumerEvents.HeartbeatsMissed);
assertEquals(cs.data, 2);

await nc.close();
await NatsServer.stopAll(servers, true);
});

Deno.test("consumers - consume deleted consumer", async () => {
Deno.test("consume - deleted consumer", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -142,7 +196,7 @@ Deno.test("consumers - consume deleted consumer", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - sub leaks consume()", async () => {
Deno.test("consume - sub leaks", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

Expand Down Expand Up @@ -171,7 +225,7 @@ Deno.test("consumers - sub leaks consume()", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume drain", async () => {
Deno.test("consume - drain", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

Expand All @@ -198,7 +252,7 @@ Deno.test("consumers - consume drain", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume sync", async () => {
Deno.test("consume - sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
Expand All @@ -225,7 +279,7 @@ Deno.test("consumers - consume sync", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume stream not found request abort", async () => {
Deno.test("consume - stream not found request abort", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -258,7 +312,7 @@ Deno.test("consumers - consume stream not found request abort", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume consumer deleted request abort", async () => {
Deno.test("consume - consumer deleted request abort", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -294,7 +348,7 @@ Deno.test("consumers - consume consumer deleted request abort", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume consumer not found request abort", async () => {
Deno.test("consume - consumer not found request abort", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -328,7 +382,7 @@ Deno.test("consumers - consume consumer not found request abort", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - consume consumer bind", async () => {
Deno.test("consume - consumer bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down
63 changes: 13 additions & 50 deletions jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,42 +185,7 @@ Deno.test("consumers - push consumer not supported", async () => {
});

Deno.test("consumers - fetch heartbeats", async () => {
await consumerHbTest(true);
});

/**
* Setup a cluster that has N nodes with the first node being just a connection
* server - rest are JetStream - min number of servers is 3
* @param count
* @param debug
*/
async function setupDataConnCluster(
count = 4,
debug = false,
): Promise<NatsServer[]> {
if (count < 3) {
return Promise.reject(new Error("min cluster is 4"));
}
let servers = await NatsServer.jetstreamCluster(count, {}, debug);
await NatsServer.stopAll(servers);

servers[0].config.jetstream = "disabled";
for (let i = 1; i < servers.length; i++) {
await Deno.remove(servers[i].config.jetstream.store_dir, {
recursive: true,
});
}

const proms = servers.map((s) => {
return s.restart();
});
servers = await Promise.all(proms);
await NatsServer.dataClusterFormed(proms.slice(1));
return servers;
}

export async function consumerHbTest(fetch: boolean) {
const servers = await setupDataConnCluster(3);
const servers = await NatsServer.setupDataConnCluster(4);

const nc = await connect({ port: servers[0].port });
const { stream } = await initStream(nc);
Expand All @@ -232,24 +197,22 @@ export async function consumerHbTest(fetch: boolean) {

const js = nc.jetstream();
const c = await js.consumers.get(stream, "a");
const iter: ConsumerMessages = fetch
? await c.fetch({
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
})
: await c.consume({
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
});
const iter: ConsumerMessages = await c.fetch({
max_messages: 100,
idle_heartbeat: 1000,
expires: 30000,
});

const buf: Promise<void>[] = [];
// stop the data serverss
setTimeout(() => {
servers[1].stop();
servers[2].stop();
buf.push(servers[1].stop());
buf.push(servers[2].stop());
buf.push(servers[3].stop());
}, 1000);

await Promise.all(buf);

const d = deferred<ConsumerStatus>();

await (async () => {
Expand All @@ -273,7 +236,7 @@ export async function consumerHbTest(fetch: boolean) {

await nc.close();
await NatsServer.stopAll(servers, true);
}
});

Deno.test("consumers - bad options", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { NatsConnectionImpl } from "../../nats-base-client/nats.ts";
import { syncIterator } from "../../nats-base-client/core.ts";
import { PullConsumerMessagesImpl } from "../consumer.ts";

Deno.test("consumers - fetch no messages", async () => {
Deno.test("fetch - no messages", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const { stream } = await initStream(nc);
Expand All @@ -58,7 +58,7 @@ Deno.test("consumers - fetch no messages", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch less messages", async () => {
Deno.test("fetch - less messages", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const { stream, subj } = await initStream(nc);
Expand All @@ -83,7 +83,7 @@ Deno.test("consumers - fetch less messages", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch exactly messages", async () => {
Deno.test("fetch - exactly messages", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const { stream, subj } = await initStream(nc);
Expand Down Expand Up @@ -115,7 +115,7 @@ Deno.test("consumers - fetch exactly messages", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch consumer not found", async () => {
Deno.test("fetch - consumer not found", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["hello"] });
Expand Down Expand Up @@ -149,7 +149,7 @@ Deno.test("consumers - fetch consumer not found", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch deleted consumer", async () => {
Deno.test("fetch - deleted consumer", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });
Expand Down Expand Up @@ -184,7 +184,7 @@ Deno.test("consumers - fetch deleted consumer", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch stream not found", async () => {
Deno.test("fetch - stream not found", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -216,7 +216,7 @@ Deno.test("consumers - fetch stream not found", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch listener leaks", async () => {
Deno.test("fetch - listener leaks", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
Expand Down Expand Up @@ -256,7 +256,7 @@ Deno.test("consumers - fetch listener leaks", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch sync", async () => {
Deno.test("fetch - sync", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
Expand All @@ -282,7 +282,7 @@ Deno.test("consumers - fetch sync", async () => {
await cleanup(ns, nc);
});

Deno.test("consumers - fetch consumer bind", async () => {
Deno.test("fetch - consumer bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
Expand Down
10 changes: 5 additions & 5 deletions jetstream/tests/jetream409_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async function expectPullSubscribeCallbackError(
}

Deno.test("409 - max_batch", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -129,7 +129,7 @@ Deno.test("409 - max_batch", async () => {
});

Deno.test("409 - max_expires", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
Expand Down Expand Up @@ -163,7 +163,7 @@ Deno.test("409 - max_expires", async () => {
});

Deno.test("409 - max_bytes", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.8.3")) {
return;
}
Expand Down Expand Up @@ -200,7 +200,7 @@ Deno.test("409 - max_bytes", async () => {
});

Deno.test("409 - max msg size", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.9.0")) {
return;
}
Expand Down Expand Up @@ -236,7 +236,7 @@ Deno.test("409 - max msg size", async () => {
});

Deno.test("409 - max waiting", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
Expand Down
Loading
Loading