Skip to content

Commit

Permalink
Merge pull request #676 from nats-io/fix-ackAck
Browse files Browse the repository at this point in the history
ackAck() always returning false
  • Loading branch information
aricart authored Mar 28, 2024
2 parents bcdda74 + 3f797d5 commit 88face5
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ on:
jobs:
test:
name: ${{ matrix.config.kind }} ${{ matrix.config.os }}
runs-on: ubuntu-latest
runs-on: ubuntu-latest-4-cores
environment: CI
strategy:
matrix:
Expand Down
11 changes: 9 additions & 2 deletions jetstream/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { RequestOne } from "../nats-base-client/request.ts";
import { nanos } from "./jsutil.ts";
import { Msg, MsgHdrs, RequestOptions } from "../nats-base-client/core.ts";
import { DeliveryInfo, PullOptions } from "./jsapi_types.ts";
import { deferred } from "../nats-base-client/mod.ts";

export const ACK = Uint8Array.of(43, 65, 67, 75);
const NAK = Uint8Array.of(45, 78, 65, 75);
Expand Down Expand Up @@ -220,6 +221,7 @@ export class JsMsgImpl implements JsMsg {
// this has to dig into the internals as the message has access
// to the protocol but not the high-level client.
async ackAck(): Promise<boolean> {
const d = deferred<boolean>();
if (!this.didAck) {
this.didAck = true;
if (this.msg.reply) {
Expand All @@ -243,13 +245,18 @@ export class JsMsgImpl implements JsMsg {
}
try {
await Promise.race([r.timer, r.deferred]);
return true;
d.resolve(true);
} catch (err) {
r.cancel(err);
d.reject(err);
}
} else {
d.resolve(false);
}
} else {
d.resolve(false);
}
return false;
return d;
}

ack() {
Expand Down
95 changes: 94 additions & 1 deletion jetstream/tests/jsmsg_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@
*/
import {
assertEquals,
assertRejects,
fail,
} from "https://deno.land/std@0.200.0/assert/mod.ts";
import {
AckPolicy,
connect,
createInbox,
Empty,
Msg,
StorageType,
StringCodec,
} from "../../src/mod.ts";
import { nanos } from "../jsutil.ts";
import { parseInfo, toJsMsg } from "../jsmsg.ts";
import { JsMsgImpl, parseInfo, toJsMsg } from "../jsmsg.ts";
import {
cleanup,
jetstreamServerConf,
setup,
} from "../../tests/helpers/mod.ts";
import { JetStreamManagerImpl } from "../jsm.ts";
import { MsgImpl } from "../../nats-base-client/msg.ts";

Deno.test("jsmsg - parse", () => {
// "$JS.ACK.<stream>.<consumer>.<redeliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
Expand Down Expand Up @@ -132,3 +142,86 @@ Deno.test("jsmsg - acks", async () => {

await nc.close();
});

Deno.test("jsmsg - no ack consumer is ackAck 503", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream();
await js.publish("a.a");

await jsm.consumers.add("A", { durable_name: "a" });
const c = await js.consumers.get("A", "a");
const jm = await c.next();

await assertRejects(
(): Promise<boolean> => {
return jm!.ackAck();
},
Error,
"503",
);

await cleanup(ns, nc);
});

Deno.test("jsmsg - explicit consumer ackAck", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream();
await js.publish("a.a");

await jsm.consumers.add("A", {
durable_name: "a",
ack_policy: AckPolicy.Explicit,
});
const c = await js.consumers.get("A", "a");
const jm = await c.next();
assertEquals(await jm?.ackAck(), true);
assertEquals(await jm?.ackAck(), false);

await cleanup(ns, nc);
});

Deno.test("jsmsg - explicit consumer ackAck timeout", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream();
await js.publish("a.a");

await jsm.consumers.add("A", { durable_name: "a" });
const c = await js.consumers.get("A", "a");
const jm = await c.next();
// change the subject
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
nc.subscribe("xxxx");
await assertRejects(
(): Promise<boolean> => {
return jm!.ackAck();
},
Error,
"TIMEOUT",
);

await cleanup(ns, nc);
});

0 comments on commit 88face5

Please sign in to comment.