Skip to content

Commit

Permalink
[FEAT] direct batch
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Feb 29, 2024
1 parent faf2c3e commit 7d27088
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 1 deletion.
8 changes: 8 additions & 0 deletions jetstream/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,14 @@ export type DirectMsgRequest =
| LastForMsgRequest
| NextMsgRequest;

export type DirectBatchOptions = {
batch?: number;
max_bytes?: number;
multi_last: string[];
up_to_seq?: number;
up_to_time?: Date | string;
};

export interface StreamState {
/**
* Number of messages stored in the Stream
Expand Down
70 changes: 70 additions & 0 deletions jetstream/jsm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ import {
Msg,
MsgHdrs,
NatsConnection,
QueuedIterator,
RequestStrategy,
ReviverFn,
} from "../nats-base-client/core.ts";
import {
AccountInfoResponse,
ApiResponse,
DirectBatchOptions,
DirectMsgRequest,
JetStreamAccountStats,
LastForMsgRequest,
Expand Down Expand Up @@ -84,6 +87,73 @@ export class DirectStreamAPIImpl extends BaseApiClient
const dm = new DirectMsgImpl(r);
return Promise.resolve(dm);
}

async getBatch(
stream: string,
opts: DirectBatchOptions,
): Promise<QueuedIterator<StoredMsg>> {
validateStreamName(stream);
const pre = this.opts.apiPrefix || "$JS.API";
const subj = `${pre}.DIRECT.GET.${stream}`;
opts = opts || {};
if (!Array.isArray(opts.multi_last) || opts.multi_last.length === 0) {
return Promise.reject("multi_last is required");
}
const payload = JSON.stringify(opts, (key, value) => {
if (key === "up_to_time" && value instanceof Date) {
return value.toISOString();
}
return value;
});

const iter = new QueuedIteratorImpl<StoredMsg>();

const raw = await this.nc.requestMany(
subj,
payload,
{
strategy: RequestStrategy.SentinelMsg,
},
);

(async () => {
let gotFirst = false;
let badServer = false;
let badRequest: string | undefined;
for await (const m of raw) {
if (!gotFirst) {
gotFirst = true;
const code = m.headers?.code || 0;
if (code !== 0 && code < 200 || code > 299) {
badRequest = m.headers?.description.toLowerCase();
break;
}
// inspect the message and make sure that we have a supported server
const v = m.headers?.get("Nats-Num-Pending");
if (v === "") {
badServer = true;
break;
}
}
if (m.data.length === 0) {
break;
}
iter.push(new DirectMsgImpl(m));
}
//@ts-ignore: term function
iter.push((): void => {
if (badServer) {
throw new Error("batch direct get not supported by the server");
}
if (badRequest) {
throw new Error(`bad request: ${badRequest}`);
}
iter.stop();
});
})();

return Promise.resolve(iter);
}
}

export class DirectMsgImpl implements DirectMsg {
Expand Down
71 changes: 70 additions & 1 deletion jetstream/tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
fail,
} from "https://deno.land/std@0.200.0/assert/mod.ts";

import { NatsConnectionImpl } from "../../nats-base-client/internal_mod.ts";
import { NatsConnectionImpl } from "../../nats-base-client/nats.ts";
import {
deferred,
Empty,
Expand Down Expand Up @@ -2731,3 +2731,72 @@ Deno.test("jsm - api check ok", async () => {
assertEquals(count, 3);
await cleanup(ns, nc);
});

Deno.test("jsm - batch direct get multi_last", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.11.0")) {
return;
}
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream();
await Promise.all([
js.publish("a.foo", "foo"),
js.publish("a.bar", "bar"),
js.publish("a.baz", "baz"),
]);

const iter = await jsm.direct.getBatch("A", {
multi_last: ["a.foo", "a.baz"],
});

const keys = [];
for await (const m of iter) {
keys.push(m.subject);
}
assertEquals(keys.length, 2);
assertArrayIncludes(keys, ["a.foo", "a.baz"]);

await cleanup(ns, nc);
});

Deno.test("jsm - batch direct get batch", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.11.0")) {
return;
}
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream();
await Promise.all([
js.publish("a.foo", "foo"),
js.publish("a.bar", "bar"),
js.publish("a.baz", "baz"),
js.publish("a.foobar", "foobar"),
]);

const iter = await jsm.direct.getBatch("A", {
batch: 3,
multi_last: [">"],
});

const buf = [];
for await (const m of iter) {
buf.push(m);
}
assertEquals(buf.length, 3);

await cleanup(ns, nc);
});
11 changes: 11 additions & 0 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
ConsumerInfo,
defaultConsumer,
DeliverPolicy,
DirectBatchOptions,
DirectMsgRequest,
JetStreamAccountStats,
MsgRequest,
Expand Down Expand Up @@ -764,6 +765,16 @@ export interface DirectStreamAPI {
* @param query
*/
getMessage(stream: string, query: DirectMsgRequest): Promise<StoredMsg>;

/**
* Retrieves all last subject messages for the specified subjects
* @param stream
* @param opts
*/
getBatch(
stream: string,
opts: DirectBatchOptions,
): Promise<QueuedIterator<StoredMsg>>;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions nats-base-client/semver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export enum Feature {
JS_STREAM_SOURCE_SUBJECT_TRANSFORM = "js_stream_source_subject_transform",
JS_STREAM_COMPRESSION = "js_stream_compression",
JS_DEFAULT_CONSUMER_LIMITS = "js_default_consumer_limits",
JS_BATCH_DIRECT_GET = "js_batch_direct_get",
}

type FeatureVersion = {
Expand Down Expand Up @@ -109,6 +110,7 @@ export class Features {
this.set(Feature.JS_STREAM_SOURCE_SUBJECT_TRANSFORM, "2.10.0");
this.set(Feature.JS_STREAM_COMPRESSION, "2.10.0");
this.set(Feature.JS_DEFAULT_CONSUMER_LIMITS, "2.10.0");
this.set(Feature.JS_BATCH_DIRECT_GET, "2.11.0");

this.disabled.forEach((f) => {
this.features.delete(f);
Expand Down

0 comments on commit 7d27088

Please sign in to comment.