Skip to content

Commit

Permalink
[FEAT] added ability to get() a stream name - this API returns an o…
Browse files Browse the repository at this point in the history
…bject that allows for easy access to alternates, best (best mirror as per nats-server), get an existing consumer (or ordered consumer) associated with the stream, and to manually retrieve a message. Some of this functionality is already available via JSM, this simply provides more convenient access. (#508)
  • Loading branch information
aricart committed May 11, 2023
1 parent 274dc50 commit d665542
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 9 deletions.
6 changes: 5 additions & 1 deletion nats-base-client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ export class PullConsumerImpl implements Consumer {
return Promise.resolve(this._info);
}
const { stream_name, name } = this._info;
return this.api.info(stream_name, name);
return this.api.info(stream_name, name)
.then((ci) => {
this._info = ci;
return this._info;
});
}
}

Expand Down
20 changes: 15 additions & 5 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type {
KvOptions,
ObjectStore,
ObjectStoreOptions,
Streams,
Views,
} from "./types.ts";
import {
Expand Down Expand Up @@ -87,6 +88,8 @@ import { Feature } from "./semver.ts";
import { ObjectStoreImpl } from "./objectstore.ts";
import { IdleHeartbeat } from "./idleheartbeat.ts";
import { ConsumersImpl } from "./consumers.ts";
import { StreamsImpl } from "./stream.ts";
import { StreamAPIImpl } from "./jsmstream_api.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand Down Expand Up @@ -123,11 +126,15 @@ class ViewsImpl implements Views {
export class JetStreamClientImpl extends BaseApiClient
implements JetStreamClient {
consumers: Consumers;
api: ConsumerAPI;
streams: Streams;
consumerAPI: ConsumerAPI;
streamAPI: StreamAPIImpl;
constructor(nc: NatsConnection, opts?: JetStreamOptions) {
super(nc, opts);
this.api = new ConsumerAPIImpl(nc, opts);
this.consumers = new ConsumersImpl(this.api);
this.consumerAPI = new ConsumerAPIImpl(nc, opts);
this.streamAPI = new StreamAPIImpl(nc, opts);
this.consumers = new ConsumersImpl(this.consumerAPI);
this.streams = new StreamsImpl(this.streamAPI);
}

get apiPrefix(): string {
Expand Down Expand Up @@ -546,7 +553,10 @@ export class JetStreamClientImpl extends BaseApiClient
jsi.attached = false;
if (jsi.config.durable_name) {
try {
const info = await this.api.info(jsi.stream, jsi.config.durable_name);
const info = await this.consumerAPI.info(
jsi.stream,
jsi.config.durable_name,
);
if (info) {
if (
info.config.filter_subject && info.config.filter_subject !== subject
Expand Down Expand Up @@ -646,7 +656,7 @@ export class JetStreamClientImpl extends BaseApiClient
replay_policy: ReplayPolicy.Instant,
}, jsi.config);

const ci = await this.api.add(jsi.stream, jsi.config);
const ci = await this.consumerAPI.add(jsi.stream, jsi.config);
if (
Array.isArray(
jsi.config.filter_subjects && !Array.isArray(ci.config.filter_subjects),
Expand Down
7 changes: 7 additions & 0 deletions nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
PurgeResponse,
PurgeTrimOpts,
StoredMsg,
Stream,
StreamAPI,
StreamConfig,
StreamInfo,
Expand All @@ -50,6 +51,7 @@ import { Codec, JSONCodec } from "./codec.ts";
import { TD } from "./encoders.ts";
import { Feature } from "./semver.ts";
import { NatsConnectionImpl } from "./nats.ts";
import { StreamImpl } from "./stream.ts";

export function convertStreamSourceDomain(s?: StreamSource) {
if (s === undefined) {
Expand Down Expand Up @@ -319,6 +321,11 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
const subj = `${this.prefix}.STREAM.NAMES`;
return new ListerImpl<string>(subj, listerFilter, this, payload);
}

async get(name: string): Promise<Stream> {
const si = await this.info(name);
return Promise.resolve(new StreamImpl(this, si));
}
}

export class StoredMsgImpl implements StoredMsg {
Expand Down
97 changes: 97 additions & 0 deletions nats-base-client/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
Consumer,
MsgRequest,
StoredMsg,
Stream,
StreamAlternate,
StreamAPI,
StreamInfo,
Streams,
} from "./types.ts";
import { StreamAPIImpl } from "./jsmstream_api.ts";
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
import { OrderedConsumerOptions } from "./consumer.ts";
import { ConsumersImpl } from "./consumers.ts";

export class StreamsImpl implements Streams {
api: StreamAPIImpl;

constructor(api: StreamAPI) {
this.api = api as StreamAPIImpl;
}

get(stream: string): Promise<Stream> {
return this.api.info(stream)
.then((si) => {
return new StreamImpl(this.api, si);
});
}
}

export class StreamImpl implements Stream {
api: StreamAPIImpl;
_info: StreamInfo;

constructor(api: StreamAPI, info: StreamInfo) {
this.api = api as StreamAPIImpl;
this._info = info;
}

get name(): string {
return this._info.config.name;
}

alternates(): Promise<StreamAlternate[]> {
return this.info()
.then((si) => {
return si.alternates ? si.alternates : [];
});
}

async best(): Promise<Stream> {
await this.info();
if (this._info.alternates) {
const asi = await this.api.info(this._info.alternates[0].name);
return new StreamImpl(this.api, asi);
} else {
return this;
}
}

info(cached = false): Promise<StreamInfo> {
if (cached) {
return Promise.resolve(this._info);
}
return this.api.info(this.name)
.then((si) => {
this._info = si;
return this._info;
});
}

getConsumer(
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer> {
return new ConsumersImpl(new ConsumerAPIImpl(this.api.nc, this.api.opts))
.get(this.name, name);
}

getMessage(query: MsgRequest): Promise<StoredMsg> {
return this.api.getMessage(this.name, query);
}
}
26 changes: 26 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,15 @@ export interface JetStreamClient {
* consumer use {@link JetStreamManager}.
*/
consumers: Consumers;

/**
* Returns the interface for accessing {@link Streams}.
*/
streams: Streams;
}

export interface Streams {
get(stream: string): Promise<Stream>;
}

export interface Consumers {
Expand Down Expand Up @@ -1480,6 +1489,12 @@ export interface StreamAPI {
* subject (can be wildcarded)
*/
names(subject?: string): Lister<string>;

/**
* Returns a Stream object
* @param name
*/
get(name: string): Promise<Stream>;
}

/**
Expand Down Expand Up @@ -2826,6 +2841,17 @@ export interface StreamNameBySubject {
subject: string;
}

export interface Stream {
name: string;
info(cached?: boolean): Promise<StreamInfo>;
alternates(): Promise<StreamAlternate[]>;
best(): Promise<Stream>;
getConsumer(
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;
getMessage(query: MsgRequest): Promise<StoredMsg>;
}

export enum JsHeaders {
/**
* Set if message is from a stream source - format is `stream seq`
Expand Down
26 changes: 24 additions & 2 deletions tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,33 @@ Deno.test("consumers - info", async () => {

const js = nc.jetstream();

const { stream } = await initStream(nc);
const { stream, subj } = await initStream(nc);
const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, { durable_name: "b" });
const c = await js.consumers.get(stream, "b");
assertEquals(await jsm.consumers.info(stream, "b"), await c.info());
// retrieve the cached consumer - no messages
const cached = await c.info(false);
assertEquals(cached.num_pending, 0);
assertEquals(await jsm.consumers.info(stream, "b"), cached);

// add a message, retrieve the cached one - still not updated
await js.publish(subj);
assertEquals(await c.info(true), cached);

// update - info and cached copies are updated
const ci = await c.info();
assertEquals(ci.num_pending, 1);
assertEquals((await c.info(true)).num_pending, 1);

await assertRejects(
async () => {
await jsm.consumers.delete(stream, "b");
await c.info();
},
Error,
"consumer not found",
);

await cleanup(ns, nc);
});

Expand Down
1 change: 1 addition & 0 deletions tests/helpers/launcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ export class NatsServer implements PortInfo {
conf.http = conf.http || "127.0.0.1:-1";
conf.leafnodes = conf.leafnodes || {};
conf.leafnodes.listen = conf.leafnodes.listen || "127.0.0.1:-1";
conf.server_tags = [`id:${nuid.next()}`];

return conf;
}
Expand Down
9 changes: 8 additions & 1 deletion tests/jstest_util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { AckPolicy, connect, nanos, PubAck } from "../src/mod.ts";
import { assert } from "https://deno.land/std@0.177.0/testing/asserts.ts";
import {
ConnectionOptions,
Empty,
extend,
NatsConnection,
nuid,
Expand Down Expand Up @@ -133,6 +134,7 @@ export async function createConsumer(
export type FillOptions = {
randomize: boolean;
suffixes: string[];
payload: number;
};

export function fill(
Expand All @@ -146,18 +148,23 @@ export function fill(
const options = Object.assign({}, {
randomize: false,
suffixes: "abcdefghijklmnopqrstuvwxyz".split(""),
payload: 0,
}, opts) as FillOptions;

function randomSuffix(): string {
const idx = Math.floor(Math.random() * options.suffixes.length);
return options.suffixes[idx];
}

const payload = options.payload === 0
? Empty
: new Uint8Array(options.payload);

const a = Array.from({ length: count }, (_, idx) => {
const subj = opts.randomize
? `${prefix}.${randomSuffix()}`
: `${prefix}.${options.suffixes[idx % options.suffixes.length]}`;
return js.publish(subj);
return js.publish(subj, payload);
});

return Promise.all(a);
Expand Down
Loading

0 comments on commit d665542

Please sign in to comment.