Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream/consumer info timestamps, stream configuration ` #561

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions jetstream/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ export interface StreamInfo extends ApiPaged {
* closer and faster to access.
*/
alternates?: StreamAlternate[];
/**
* The ISO timestamp when the StreamInfo was generated. This field is only available
* on servers 2.10.x or better
*/
"ts"?: string;
}

export interface SubjectTransformConfig {
Expand Down Expand Up @@ -125,6 +130,13 @@ export interface StreamConfig extends StreamUpdateConfig {
* Can only be set on already created streams via the Update API
*/
sealed: boolean;

/**
* Sets the first sequence number used by the stream. This property can only be
* specified when creating the stream, and likely is not valid on mirrors etc,
* as it may disrupt the synchronization logic.
*/
"first_seq": number;
}

/**
Expand Down Expand Up @@ -623,9 +635,9 @@ export interface ConsumerInfo {
*/
name: string;
/**
* The time the Consumer was created
* The ISO timestamp when the Consumer was created
*/
created: Nanos;
created: string;
/**
* The consumer configuration
*/
Expand Down Expand Up @@ -663,6 +675,11 @@ export interface ConsumerInfo {
* Indicates if any client is connected and receiving messages from a push consumer
*/
"push_bound": boolean;
/**
* The ISO timestamp when the ConsumerInfo was generated. This field is only available
* on servers 2.10.x or better
*/
"ts"?: string;
}

export interface ConsumerListResponse extends ApiResponse, ApiPaged {
Expand Down
6 changes: 6 additions & 0 deletions jetstream/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
throw new Error(`stream 'metadata' requires server ${min}`);
}
}
if (cfg.first_seq) {
const { min, ok } = nci.features.get(Feature.JS_STREAM_FIRST_SEQ);
if (!ok) {
throw new Error(`stream 'first_seq' requires server ${min}`);
}
}
validateStreamName(cfg.name);
cfg.mirror = convertStreamSourceDomain(cfg.mirror);
//@ts-ignore: the sources are either set or not - so no item should be undefined in the list
Expand Down
5 changes: 4 additions & 1 deletion jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,11 @@ Deno.test("consumers - info", async () => {
const c = await js.consumers.get(stream, "b");
// retrieve the cached consumer - no messages
const cached = await c.info(false);
cached.ts = "";
assertEquals(cached.num_pending, 0);
assertEquals(await jsm.consumers.info(stream, "b"), cached);
const updated = await jsm.consumers.info(stream, "b");
updated.ts = "";
assertEquals(updated, cached);

// add a message, retrieve the cached one - still not updated
await js.publish(subj);
Expand Down
44 changes: 43 additions & 1 deletion jetstream/tests/streams_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* limitations under the License.
*/

import { NatsServer } from "../../tests/helpers/mod.ts";
import { NatsServer, notCompatible } from "../../tests/helpers/mod.ts";
import { AckPolicy, connect, JSONCodec } from "../../src/mod.ts";
import {
assertEquals,
Expand All @@ -27,6 +27,7 @@ import {
setup,
} from "../../tests/helpers/mod.ts";
import { initStream } from "./jstest_util.ts";
import { NatsConnectionImpl } from "../../nats-base-client/nats.ts";

Deno.test("streams - get", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
Expand Down Expand Up @@ -186,3 +187,44 @@ Deno.test("streams - delete message", async () => {

await cleanup(ns, nc);
});

Deno.test("streams - first_seq", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}

const jsm = await nc.jetstreamManager();
const si = await jsm.streams.add({
name: "test",
first_seq: 50,
subjects: ["foo"],
});
assertEquals(si.config.first_seq, 50);

const pa = await nc.jetstream().publish("foo");
assertEquals(pa.seq, 50);

await cleanup(ns, nc);
});

Deno.test("streams - first_seq fails if wrong server", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const nci = nc as NatsConnectionImpl;
nci.features.update("2.9.2");

const jsm = await nc.jetstreamManager();
await assertRejects(
async () => {
await jsm.streams.add({
name: "test",
first_seq: 50,
subjects: ["foo"],
});
},
Error,
"stream 'first_seq' requires server 2.10.0",
);

await cleanup(ns, nc);
});
2 changes: 2 additions & 0 deletions nats-base-client/semver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export enum Feature {
JS_SIMPLIFICATION = "js_simplification",
JS_STREAM_CONSUMER_METADATA = "js_stream_consumer_metadata",
JS_CONSUMER_FILTER_SUBJECTS = "js_consumer_filter_subjects",
JS_STREAM_FIRST_SEQ = "js_stream_first_seq",
}

type FeatureVersion = {
Expand Down Expand Up @@ -99,6 +100,7 @@ export class Features {
this.set(Feature.JS_SIMPLIFICATION, "2.9.4");
this.set(Feature.JS_STREAM_CONSUMER_METADATA, "2.10.0");
this.set(Feature.JS_CONSUMER_FILTER_SUBJECTS, "2.10.0");
this.set(Feature.JS_STREAM_FIRST_SEQ, "2.10.0");

this.disabled.forEach((f) => {
this.features.delete(f);
Expand Down
Loading