Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGE] by default kv enables allow_direct unless client opts-out. #361

Merged
merged 1 commit into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import { millis, nanos } from "./jsutil.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { headers, MsgHdrs } from "./headers.ts";
import { consumerOpts, deferred, ErrorCode } from "./mod.ts";
import { compare, parseSemVer } from "./semver.ts";
import { compare, Feature, parseSemVer } from "./semver.ts";
import { JetStreamManagerImpl } from "./jsm.ts";

export function Base64KeyCodec(): KvCodec<string> {
Expand Down Expand Up @@ -248,8 +248,27 @@ export class Bucket implements KV, KvRemove {
const discardNew = have ? compare(have, parseSemVer("2.7.2")) >= 0 : false;
sc.discard = discardNew ? DiscardPolicy.New : DiscardPolicy.Old;

const direct = have ? compare(have, parseSemVer("2.8.5")) >= 0 : false;
sc.allow_direct = opts.allow_direct ? direct : false;
const { ok: direct, min } = nci.protocol.features.get(
Feature.JS_ALLOW_DIRECT,
);
if (!direct && opts.allow_direct === true) {
const v = have
? `${have!.major}.${have!.minor}.${have!.micro}`
: "unknown";
return Promise.reject(
new Error(
`allow_direct is not available on server version ${v} - requires ${min}`,
),
);
}
// if we are given allow_direct we use it, otherwise what
// the server supports - in creation this will always rule,
// but allows the client to opt-in even if it is already
// available on the stream
opts.allow_direct = typeof opts.allow_direct === "boolean"
? opts.allow_direct
: direct;
sc.allow_direct = opts.allow_direct;
this.direct = sc.allow_direct;

sc.num_replicas = bo.replicas;
Expand All @@ -259,7 +278,10 @@ export class Bucket implements KV, KvRemove {
sc.allow_rollup_hdrs = true;

try {
await this.jsm.streams.info(sc.name);
const info = await this.jsm.streams.info(sc.name);
if (!info.config.allow_direct && this.direct === true) {
this.direct = false;
}
} catch (err) {
if (err.message === "stream not found") {
await this.jsm.streams.add(sc);
Expand Down
2 changes: 2 additions & 0 deletions nats-base-client/semver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export function compare(a: SemVer, b: SemVer): number {
export enum Feature {
JS_PULL_MAX_BYTES = "js_pull_max_bytes",
JS_NEW_CONSUMER_CREATE_API = "js_new_consumer_create",
JS_ALLOW_DIRECT = "js_allow_direct",
}

type FeatureVersion = {
Expand All @@ -41,6 +42,7 @@ export class Features {

this.set(Feature.JS_PULL_MAX_BYTES, "2.8.3");
this.set(Feature.JS_NEW_CONSUMER_CREATE_API, "2.9.0");
this.set(Feature.JS_ALLOW_DIRECT, "2.9.0");
}

set(f: Feature, requires: string) {
Expand Down
63 changes: 62 additions & 1 deletion tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
delay,
DiscardPolicy,
Empty,
KvOptions,
nanos,
NatsConnectionImpl,
NatsError,
Expand Down Expand Up @@ -57,6 +58,7 @@ import { Lock, NatsServer, notCompatible } from "./helpers/mod.ts";
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
import { JetStreamSubscriptionInfoable } from "../nats-base-client/jsclient.ts";
import { connect } from "../src/mod.ts";
import { Features } from "../nats-base-client/semver.ts";

Deno.test("kv - key validation", () => {
const bad = [
Expand Down Expand Up @@ -1359,12 +1361,71 @@ Deno.test("kv - replicas", async () => {
await NatsServer.stopAll(servers);
});

Deno.test("kv - allow direct", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

if (await notCompatible(ns, nc, "2.9.0")) {
return;
}
const js = nc.jetstream();
const jsm = await nc.jetstreamManager();

async function test(
name: string,
opts: Partial<KvOptions>,
direct: boolean,
): Promise<void> {
const kv = await js.views.kv(name, opts) as Bucket;
assertEquals(kv.direct, direct);
const si = await jsm.streams.info(kv.bucketName());
assertEquals(si.config.allow_direct, direct);
}

// default is not specified but allowed by the server
await test(nuid.next(), { history: 1 }, true);
// user opted to no direct
await test(nuid.next(), { history: 1, allow_direct: false }, false);
// user opted for direct
await test(nuid.next(), { history: 1, allow_direct: true }, true);

// now we create a kv that enables it
const xkv = await js.views.kv("X") as Bucket;
assertEquals(xkv.direct, true);

// but the client opts-out of the direct
const xc = await js.views.kv("X", { allow_direct: false }) as Bucket;
assertEquals(xc.direct, false);

// now the creator disables it, but the client wants it
const ykv = await js.views.kv("Y", { allow_direct: false }) as Bucket;
assertEquals(ykv.direct, false);
const yc = await js.views.kv("Y", { allow_direct: true }) as Bucket;
assertEquals(yc.direct, false);

// now let's pretend we got a server that doesn't support it
const nci = nc as NatsConnectionImpl;
nci.protocol.features = new Features({ major: 2, minor: 8, micro: 0 });
nci.info!.version = "2.8.0";

await assertRejects(
async () => {
await test(nuid.next(), { history: 1, allow_direct: true }, false);
},
Error,
"allow_direct is not available on server version",
);

await cleanup(ns, nc);
});

Deno.test("kv - direct message", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

if (await notCompatible(ns, nc, "2.8.5")) {
if (await notCompatible(ns, nc, "2.9.0")) {
return;
}

Expand Down