diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9ae01c0f..dac4f5d8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -30,7 +30,7 @@ jobs: deno-version: ${{ matrix.deno-version }} - name: Set NATS Server Version - run: echo "NATS_VERSION=v2.10.1" >> $GITHUB_ENV + run: echo "NATS_VERSION=v2.10.3" >> $GITHUB_ENV # this here because dns seems to be wedged on gha # - name: Add hosts to /etc/hosts diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index 455ffd15..21e6f339 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -52,6 +52,7 @@ import { } from "../consumer.ts"; import { deadline } from "../../nats-base-client/util.ts"; import { syncIterator } from "../../nats-base-client/core.ts"; +import { setupStreamAndConsumer } from "../../examples/jetstream/util.ts"; Deno.test("consumers - min supported server", async () => { const { ns, nc } = await setup(jetstreamServerConf({})); @@ -277,32 +278,20 @@ Deno.test("consumers - fetch exactly messages", async () => { }); Deno.test("consumers - consume", async () => { - const { ns, nc } = await setup(jetstreamServerConf({ - jetstream: { - max_memory_store: 1024 * 1024 * 1024, - }, - })); - - const count = 50_000; - const conf = await memStream(nc, count, 0, 5000); + const { ns, nc } = await setup(jetstreamServerConf()); - const jsm = await nc.jetstreamManager(); - await jsm.consumers.add(conf.stream, { - durable_name: "b", - ack_policy: AckPolicy.Explicit, - }); + const count = 1000; + const { stream, consumer } = await setupStreamAndConsumer(nc, count); + const js = nc.jetstream({ timeout: 30_000 }); + const c = await js.consumers.get(stream, consumer); + const ci = await c.info(); + assertEquals(ci.num_pending, count); const start = Date.now(); - const js = nc.jetstream(); - const consumer = await js.consumers.get(conf.stream, "b"); - assertEquals((await consumer.info(true)).num_pending, count); - const iter = await consumer.consume({ - expires: 10_000, - max_messages: 50_000, - }); + const iter = await c.consume({ expires: 2_000, max_messages: 10 }); for await (const m of iter) { m.ack(); - if (m.seq === count) { + if (m.info.pending === 0) { const millis = Date.now() - start; console.log( `consumer: ${millis}ms - ${count / (millis / 1000)} msgs/sec`, @@ -312,33 +301,23 @@ Deno.test("consumers - consume", async () => { } assertEquals(iter.getReceived(), count); assertEquals(iter.getProcessed(), count); - assertEquals((await consumer.info()).num_pending, 0); + assertEquals((await c.info()).num_pending, 0); await cleanup(ns, nc); }); Deno.test("consumers - consume callback rejects iter", async () => { - const { ns, nc } = await setup(jetstreamServerConf({ - jetstream: { - max_memory_store: 1024 * 1024 * 1024, - }, - })); - - const conf = await memStream(nc, 0); - const jsm = await nc.jetstreamManager(); - await jsm.consumers.add(conf.stream, { - durable_name: "b", - ack_policy: AckPolicy.Explicit, - }); - + const { ns, nc } = await setup(jetstreamServerConf()); + const { stream, consumer } = await setupStreamAndConsumer(nc, 0); const js = nc.jetstream(); - const consumer = await js.consumers.get(conf.stream, "b"); - const iter = await consumer.consume({ - expires: 10_000, - max_messages: 50_000, + const c = await js.consumers.get(stream, consumer); + const iter = await c.consume({ + expires: 5_000, + max_messages: 10_000, callback: (m) => { m.ack(); }, }); + await assertRejects( async () => { for await (const _o of iter) { @@ -361,39 +340,6 @@ Deno.test("consumers - fetch heartbeats", async () => { await consumerHbTest(true); }); -async function memStream( - nc: NatsConnection, - msgs = 1000, - size = 0, - batch = 10000, -): Promise<{ millis: number; stream: string; subj: string }> { - const jsm = await nc.jetstreamManager(); - const stream = nuid.next(); - const subj = nuid.next(); - await jsm.streams.add({ - name: stream, - subjects: [subj], - storage: StorageType.Memory, - }); - const payload = new Uint8Array(size); - - const js = nc.jetstream(); - const start = Date.now(); - const buf: Promise[] = []; - for (let i = 0; i < msgs; i++) { - buf.push(js.publish(subj, payload)); - if (buf.length === batch) { - await Promise.all(buf); - buf.length = 0; - } - } - if (buf.length) { - await Promise.all(buf); - buf.length = 0; - } - return { millis: Date.now() - start, subj, stream }; -} - /** * Setup a cluster that has N nodes with the first node being just a connection * server - rest are JetStream - min number of servers is 3 diff --git a/jetstream/tests/objectstore_test.ts b/jetstream/tests/objectstore_test.ts index 3d1f24c7..86a031d1 100644 --- a/jetstream/tests/objectstore_test.ts +++ b/jetstream/tests/objectstore_test.ts @@ -853,6 +853,7 @@ Deno.test("objectstore - hashtests", async () => { const oi = await os.put( { name: t.hash, options: { max_chunk_size: 9 } }, rs.stream(), + { timeout: 20_000 }, ); assertEquals(oi.digest, `${digestType}${t.hash}`); } diff --git a/tests/basics_test.ts b/tests/basics_test.ts index 3080fc7e..05827d80 100644 --- a/tests/basics_test.ts +++ b/tests/basics_test.ts @@ -1175,15 +1175,9 @@ Deno.test("basics - initial connect error", async () => { Deno.test("basics - close promise resolves", async () => { const ns = await NatsServer.start(); const nc = await connect({ port: ns.port, reconnect: false }); - setTimeout(() => { - ns.stop(); - }); - - await nc.closed().then((err) => { - assertEquals(err, undefined); - }).catch((err) => { - fail(err); - }); + const results = await Promise.all([nc.closed(), nc.close()]); + assertEquals(results[0], undefined); + await ns.stop(); }); Deno.test("basics - inbox prefixes cannot have wildcards", async () => { diff --git a/tests/heartbeats_test.ts b/tests/heartbeats_test.ts index 5e1502a1..9bebcf1e 100644 --- a/tests/heartbeats_test.ts +++ b/tests/heartbeats_test.ts @@ -66,8 +66,10 @@ Deno.test("heartbeat - timers fire", async () => { await delay(400); assert(hb.timer); hb.cancel(); + // we can have a timer still running here - we need to wait for lag + await delay(50); assertEquals(hb.timer, undefined); - assert(status.length >= 3, `status ${status.length} >= 3`); + assert(status.length >= 2, `status ${status.length} >= 2`); assertEquals(status[0].type, DebugEvents.PingTimer); }); @@ -82,7 +84,6 @@ Deno.test("heartbeat - errors fire on missed maxOut", async () => { const hb = new Heartbeat(ph, 100, 3); hb._schedule(); - await disconnect; assertEquals(hb.timer, undefined); assert(status.length >= 7, `${status.length} >= 7`); @@ -99,7 +100,7 @@ Deno.test("heartbeat - recovers from missed", async () => { const hb = new Heartbeat(ph, 100, 3); hb._schedule(); - await delay(800); + await delay(850); hb.cancel(); assertEquals(hb.timer, undefined); assert(status.length >= 6, `${status.length} >= 6`); diff --git a/tests/idleheartbeats_test.ts b/tests/idleheartbeats_test.ts index 48dc722a..e6302b29 100644 --- a/tests/idleheartbeats_test.ts +++ b/tests/idleheartbeats_test.ts @@ -102,8 +102,8 @@ Deno.test("idleheartbeat - timeout autocancel", async () => { // and resource leaks for a timer if not cleared. await d; - assert(h.count >= 7); assertEquals(h.cancelAfter, 2000); assertEquals(h.timer, 0); assertEquals(h.autoCancelTimer, 0); + assert(h.count >= 7, `${h.count} >= 7`); }); diff --git a/tests/mrequest_test.ts b/tests/mrequest_test.ts index d6a3aa9f..67cb74fb 100644 --- a/tests/mrequest_test.ts +++ b/tests/mrequest_test.ts @@ -101,7 +101,7 @@ async function requestManyJitter(noMux = false): Promise { } } const time = Date.now() - start; - assert(500 > time); + assert(1000 > time); assertEquals(iter.getProcessed(), 10); await cleanup(ns, nc); } diff --git a/tests/reconnect_test.ts b/tests/reconnect_test.ts index 1150f84b..2a858ac2 100644 --- a/tests/reconnect_test.ts +++ b/tests/reconnect_test.ts @@ -186,7 +186,7 @@ Deno.test("reconnect - indefinite reconnects", async () => { await srv.stop(); await lock; await srv.stop(); - assert(reconnects > 5); + assert(reconnects >= 5, `${reconnects} >= 5`); assert(reconnect); assertEquals(disconnects, 1); }); @@ -334,6 +334,9 @@ Deno.test("reconnect - close stops reconnects", async () => { // the promise will reject if deadline exceeds fail(err); }); + // await some more, because the close could have a timer pending that + // didn't complete flapping the test on resource leak + await delay(750); }); Deno.test("reconnect - stale connections don't close", async () => {