Skip to content

Commit

Permalink
Merge pull request #668 from nats-io/reduce-ci
Browse files Browse the repository at this point in the history
[FEAT] added a bind option to the consume, fetch, next - note this option is not supported in ordered consumers.
  • Loading branch information
aricart committed Mar 23, 2024
2 parents d3ab912 + 6c67382 commit 12cfdb8
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 19 deletions.
73 changes: 59 additions & 14 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import {
} from "./jsapi_types.ts";
import { JsHeaders } from "./types.ts";
import { SubscriptionImpl } from "../nats-base-client/protocol.ts";
import { assertRejects } from "https://deno.land/std@0.200.0/assert/assert_rejects.ts";

enum PullConsumerType {
Unset = -1,
Expand All @@ -55,38 +56,42 @@ enum PullConsumerType {
export type Ordered = {
ordered: true;
};
export type NextOptions = Expires;
export type NextOptions = Expires & Bind;
export type ConsumeBytes =
& MaxBytes
& Partial<MaxMessages>
& Partial<ThresholdBytes>
& Expires
& IdleHeartbeat
& ConsumeCallback
& AbortOnMissingResource;
& AbortOnMissingResource
& Bind;
export type ConsumeMessages =
& Partial<MaxMessages>
& Partial<ThresholdMessages>
& Expires
& IdleHeartbeat
& ConsumeCallback
& AbortOnMissingResource;
& AbortOnMissingResource
& Bind;
export type ConsumeOptions = ConsumeBytes | ConsumeMessages;
/**
* Options for fetching
* Options for fetching bytes
*/
export type FetchBytes =
& MaxBytes
& Partial<MaxMessages>
& Expires
& IdleHeartbeat;
& IdleHeartbeat
& Bind;
/**
* Options for a c
* Options for fetching messages
*/
export type FetchMessages =
& Partial<MaxMessages>
& Expires
& IdleHeartbeat;
& IdleHeartbeat
& Bind;
export type FetchOptions = FetchBytes | FetchMessages;
export type PullConsumerOptions = FetchOptions | ConsumeOptions;
export type MaxMessages = {
Expand Down Expand Up @@ -132,10 +137,21 @@ export type Expires = {
expires?: number;
};

export type Bind = {
/**
* If set to true the client will not try to check on its consumer by issuing consumer info
* requests. This means that the client may not report consumer not found, etc., and will simply
* fail request for messages due to missed heartbeats. This option is exclusive of abort_on_missing_resource.
*
* This option is not valid on ordered consumers.
*/
bind?: boolean;
};

export type AbortOnMissingResource = {
/**
* If true, consume will abort if the stream or consumer is not found. Default is to recover
* once the stream/consumer is restored.
* once the stream/consumer is restored. This option is exclusive of bind.
*/
abort_on_missing_resource?: boolean;
};
Expand Down Expand Up @@ -252,7 +268,6 @@ export interface ExportedConsumer {

export interface Consumer extends ExportedConsumer {
info(cached?: boolean): Promise<ConsumerInfo>;

delete(): Promise<boolean>;
}

Expand Down Expand Up @@ -283,6 +298,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
forOrderedConsumer: boolean;
resetHandler?: () => void;
abortOnMissingResource?: boolean;
bind: boolean;

// callback: ConsumerCallbackFn;
constructor(
Expand All @@ -293,8 +309,10 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
super();
this.consumer = c;

const copts = opts as ConsumeOptions;

this.opts = this.parseOptions(opts, refilling);
this.callback = (opts as ConsumeCallback).callback || null;
this.callback = copts.callback || null;
this.noIterator = typeof this.callback === "function";
this.monitor = null;
this.pong = null;
Expand All @@ -304,8 +322,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.inbox = createInbox(c.api.nc.options.inboxPrefix);
this.listeners = [];
this.forOrderedConsumer = false;
this.abortOnMissingResource =
(opts as ConsumeOptions).abort_on_missing_resource === true;
this.abortOnMissingResource = copts.abort_on_missing_resource === true;
this.bind = copts.bind === true;

this.start();
}
Expand Down Expand Up @@ -525,7 +543,21 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
}

async resetPending(): Promise<boolean> {
resetPending(): Promise<boolean> {
return this.bind ? this.resetPendingNoInfo() : this.resetPendingWithInfo();
}

async resetPendingNoInfo(): Promise<boolean> {
// here we are blind - we won't do an info, so all we are doing
// is invalidating the previous request results.
this.pending.msgs = 0;
this.pending.bytes = 0;
this.pending.requests = 0;
this.pull(this.pullOptions());
return true;
}

async resetPendingWithInfo(): Promise<boolean> {
let notFound = 0;
let streamNotFound = 0;
const bo = backoff();
Expand Down Expand Up @@ -1122,6 +1154,10 @@ export class OrderedPullConsumerImpl implements Consumer {
max_messages: 100,
expires: 30_000,
} as ConsumeMessages): Promise<ConsumerMessages> {
const copts = opts as ConsumeOptions;
if (copts.bind) {
return Promise.reject(new Error("bind is not supported"));
}
if (this.type === PullConsumerType.Fetch) {
return Promise.reject(new Error("ordered consumer initialized as fetch"));
}
Expand All @@ -1142,6 +1178,11 @@ export class OrderedPullConsumerImpl implements Consumer {
fetch(
opts: FetchOptions = { max_messages: 100, expires: 30_000 },
): Promise<ConsumerMessages> {
const copts = opts as ConsumeOptions;
if (copts.bind) {
return Promise.reject(new Error("bind is not supported"));
}

if (this.type === PullConsumerType.Consume) {
return Promise.reject(
new Error("ordered consumer already initialized as consume"),
Expand All @@ -1168,9 +1209,13 @@ export class OrderedPullConsumerImpl implements Consumer {
async next(
opts: NextOptions = { expires: 30_000 },
): Promise<JsMsg | null> {
const d = deferred<JsMsg | null>();
const copts = opts as ConsumeOptions;
if (copts.bind) {
return Promise.reject(new Error("bind is not supported"));
}
copts.max_messages = 1;

const d = deferred<JsMsg | null>();
copts.callback = (m) => {
// we can clobber the callback, because they are not supported
// except on consume, which will fail when we try to fetch
Expand Down
67 changes: 64 additions & 3 deletions jetstream/tests/consumers_consume_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ import {
setup,
} from "../../tests/helpers/mod.ts";
import { setupStreamAndConsumer } from "../../examples/jetstream/util.ts";
import { assertEquals } from "https://deno.land/std@0.200.0/assert/assert_equals.ts";
import { assertRejects } from "https://deno.land/std@0.200.0/assert/assert_rejects.ts";
import {
assert,
assertEquals,
assertExists,
assertRejects,
} from "https://deno.land/std@0.200.0/assert/mod.ts";
import { consumerHbTest } from "./consumers_test.ts";
import { initStream } from "./jstest_util.ts";
import { AckPolicy, DeliverPolicy } from "../jsapi_types.ts";
import { deadline, deferred, delay } from "../../nats-base-client/util.ts";
import { nanos } from "../jsutil.ts";
import { ConsumerEvents, PullConsumerMessagesImpl } from "../consumer.ts";
import { syncIterator } from "../../nats-base-client/core.ts";
import { assertExists } from "https://deno.land/std@0.200.0/assert/assert_exists.ts";

Deno.test("consumers - consume", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
Expand Down Expand Up @@ -324,3 +327,61 @@ Deno.test("consumers - consume consumer not found request abort", async () => {

await cleanup(ns, nc);
});

Deno.test("consumers - consume consumer bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });

await jsm.consumers.add("A", {
durable_name: "a",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
const c = await js.consumers.get("A", "a");
await c.delete();

const cisub = nc.subscribe("$JS.API.CONSUMER.INFO.A.a", {
callback: () => {},
});

const iter = await c.consume({
expires: 1000,
bind: true,
});

let hbm = 0;
let cnf = 0;

(async () => {
for await (const s of await iter.status()) {
switch (s.type) {
case ConsumerEvents.HeartbeatsMissed:
hbm++;
if (hbm > 5) {
iter.stop();
}
break;
case ConsumerEvents.ConsumerNotFound:
cnf++;
break;
}
}
})().then();

const done = (async () => {
for await (const _ of iter) {
// nothing
}
})();

await done;
assert(hbm > 1);
assertEquals(cnf, 0);
assertEquals(cisub.getProcessed(), 0);

await cleanup(ns, nc);
});
41 changes: 40 additions & 1 deletion jetstream/tests/consumers_fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import { delay } from "../../nats-base-client/util.ts";
import { assertRejects } from "https://deno.land/std@0.200.0/assert/assert_rejects.ts";
import { nanos } from "../jsutil.ts";
import { NatsConnectionImpl } from "../../nats-base-client/nats.ts";
import { PullConsumerMessagesImpl } from "../consumer.ts";
import { ConsumerEvents, PullConsumerMessagesImpl } from "../consumer.ts";
import { syncIterator } from "../../nats-base-client/core.ts";
import { assertExists } from "https://deno.land/std@0.200.0/assert/assert_exists.ts";
import { assert } from "https://deno.land/std@0.200.0/assert/assert.ts";

Deno.test("consumers - fetch no messages", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
Expand Down Expand Up @@ -279,3 +280,41 @@ Deno.test("consumers - fetch sync", async () => {
assertEquals(await sync.next(), null);
await cleanup(ns, nc);
});

Deno.test("consumers - fetch consumer bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });

await jsm.consumers.add("A", {
durable_name: "a",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
await js.publish("a");

const c = await js.consumers.get("A", "a");
await c.delete();

const cisub = nc.subscribe("$JS.API.CONSUMER.INFO.A.a", {
callback: () => {},
});

const iter = await c.fetch({
expires: 1000,
bind: true,
});

const done = (async () => {
for await (const _ of iter) {
// nothing
}
})();

await done;
assertEquals(cisub.getProcessed(), 0);
await cleanup(ns, nc);
});
33 changes: 33 additions & 0 deletions jetstream/tests/consumers_next_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,36 @@ Deno.test("consumers - next stream not found", async () => {

await cleanup(ns, nc);
});

Deno.test("consumers - next consumer bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });

await jsm.consumers.add("A", {
durable_name: "a",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
await js.publish("a");

const c = await js.consumers.get("A", "a");
await c.delete();

const cisub = nc.subscribe("$JS.API.CONSUMER.INFO.A.a", {
callback: () => {},
});

const msg = await c.next({
expires: 1000,
bind: true,
});

assertEquals(msg, null);
assertEquals(cisub.getProcessed(), 0);

await cleanup(ns, nc);
});
Loading

0 comments on commit 12cfdb8

Please sign in to comment.