Skip to content

Commit

Permalink
[FEAT] allow access to consumer info in watch for internal extenders
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Nov 4, 2021
1 parent c70cc01 commit 97fa597
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 2 deletions.
8 changes: 6 additions & 2 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ export class JetStreamClientImpl extends BaseApiClient
);
}
}

jsi.last = info;
jsi.config = info.config;
jsi.attached = true;
}
Expand Down Expand Up @@ -504,6 +504,7 @@ export class JetStreamClientImpl extends BaseApiClient
const ci = await this.api.add(jsi.stream, jsi.config);
jsi.name = ci.name;
jsi.config = ci.config;
jsi.last = ci;
}

static ingestionFn(
Expand Down Expand Up @@ -630,7 +631,9 @@ class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
const jinfo = this.sub.info as JetStreamSubscriptionInfo;
const name = jinfo.config.durable_name || jinfo.name;
const subj = `${jinfo.api.prefix}.CONSUMER.INFO.${jinfo.stream}.${name}`;
return await jinfo.api._request(subj) as ConsumerInfo;
const ci = await jinfo.api._request(subj) as ConsumerInfo;
jinfo.last = ci;
return ci;
}
}

Expand Down Expand Up @@ -669,6 +672,7 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl

interface JetStreamSubscriptionInfo extends ConsumerOpts {
api: BaseApiClient;
last: ConsumerInfo;
attached: boolean;
deliver: string;
bind: boolean;
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ export class Bucket implements KV, KvRemove {
});

const sub = await this.js.subscribe(subj, copts);
qi._data = sub;
qi.iterClosed.then(() => {
sub.unsubscribe();
});
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/queued_iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
protocolFilterFn?: ProtocolFilterFn<T>;
dispatchedFn?: DispatchedFn<T>;
ctx?: unknown;
_data?: unknown; //data is for use by extenders in any way they like
private err?: Error;

constructor() {
Expand Down
27 changes: 27 additions & 0 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import {
validateKey,
} from "../nats-base-client/kv.ts";
import { notCompatible } from "./helpers/mod.ts";
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
import { JetStreamSubscriptionInfoable } from "../nats-base-client/jsclient.ts";

Deno.test("kv - key validation", () => {
const bad = [
Expand Down Expand Up @@ -660,3 +662,28 @@ Deno.test("kv - update key", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - internal consumer", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}

async function getCount(name: string): Promise<number> {
const b = await Bucket.create(nc, name) as Bucket;
let watch = await b.watch() as QueuedIteratorImpl<unknown>;
const sub = watch._data as JetStreamSubscriptionInfoable;
return sub?.info?.last?.num_pending || 0;
}

const name = nuid.next();
const b = await Bucket.create(nc, name) as Bucket;
assertEquals(await getCount(name), 0);

await b.put("a", Empty);
assertEquals(await getCount(name), 1);

await cleanup(ns, nc);
});

0 comments on commit 97fa597

Please sign in to comment.