Skip to content

Commit

Permalink
[FEAT] added mechanism where a Feature can be disabled by the user.…
Browse files Browse the repository at this point in the history
… This requires casting the connection to a `NatsConnectionImpl`, and then accessing the `Features` controller as `nc.features.disable(Feature)`. This is an experimental feature, currently supporting a very small number of options
  • Loading branch information
aricart committed Sep 16, 2022
1 parent 6164d01 commit 676eb9f
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 27 deletions.
4 changes: 2 additions & 2 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ export class JetStreamClientImpl extends BaseApiClient
const args: Partial<PullOptions> = {};
args.batch = opts.batch || 1;
if (max_bytes) {
const fv = this.nc.protocol.features.get(Feature.JS_PULL_MAX_BYTES);
const fv = this.nc.features.get(Feature.JS_PULL_MAX_BYTES);
if (!fv.ok) {
throw new Error(
`max_bytes is only supported on servers ${fv.min} or better`,
Expand Down Expand Up @@ -827,7 +827,7 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl
args.batch = opts.batch || 1;
args.no_wait = opts.no_wait || false;
if ((opts.max_bytes ?? 0) > 0) {
const fv = this.js.nc.protocol.features.get(Feature.JS_PULL_MAX_BYTES);
const fv = this.js.nc.features.get(Feature.JS_PULL_MAX_BYTES);
if (!fv.ok) {
throw new Error(
`max_bytes is only supported on servers ${fv.min} or better`,
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/jsmconsumer_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI {
}

const nci = this.nc as NatsConnectionImpl;
const { min, ok: newAPI } = nci.protocol.features.get(
const { min, ok: newAPI } = nci.features.get(
Feature.JS_NEW_CONSUMER_CREATE_API,
);

Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ export class Bucket implements KV, KvRemove {
const discardNew = have ? compare(have, parseSemVer("2.7.2")) >= 0 : false;
sc.discard = discardNew ? DiscardPolicy.New : DiscardPolicy.Old;

const { ok: direct, min } = nci.protocol.features.get(
const { ok: direct, min } = nci.features.get(
Feature.JS_ALLOW_DIRECT,
);
if (!direct && opts.allow_direct === true) {
Expand Down
6 changes: 5 additions & 1 deletion nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
} from "./types.ts";

import type { SemVer } from "./semver.ts";
import { parseSemVer } from "./semver.ts";
import { Features, parseSemVer } from "./semver.ts";

import { parseOptions } from "./options.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
Expand Down Expand Up @@ -502,4 +502,8 @@ export class NatsConnectionImpl implements NatsConnection {
await this.flush();
return Date.now() - start;
}

get features(): Features {
return this.protocol.features;
}
}
2 changes: 1 addition & 1 deletion nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
? undefined
: this.servers.update(info);
if (!this.infoReceived) {
this.features = new Features(parseSemVer(info.version));
this.features.update(parseSemVer(info.version));
this.infoReceived = true;
if (this.transport.isEncrypted()) {
this.servers.updateTLSName();
Expand Down
57 changes: 54 additions & 3 deletions nats-base-client/semver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,83 @@ type FeatureVersion = {
};

export class Features {
server: SemVer;
server!: SemVer;
features: Map<Feature, FeatureVersion>;
disabled: Feature[];
constructor(v: SemVer) {
this.features = new Map<Feature, FeatureVersion>();
this.server = v;
this.disabled = [];
this.update(v);
}

/**
* Removes all disabled entries
*/
resetDisabled() {
this.disabled.length = 0;
this.update(this.server);
}

/**
* Disables a particular feature.
* @param f
*/
disable(f: Feature) {
this.disabled.push(f);
this.update(this.server);
}

isDisabled(f: Feature) {
return this.disabled.indexOf(f) !== -1;
}

update(v: SemVer | string) {
if (typeof v === "string") {
v = parseSemVer(v);
}
this.server = v;
this.set(Feature.JS_PULL_MAX_BYTES, "2.8.3");
this.set(Feature.JS_NEW_CONSUMER_CREATE_API, "2.9.0");
this.set(Feature.JS_ALLOW_DIRECT, "2.9.0");

this.disabled.forEach((f) => {
this.features.delete(f);
});
}

/**
* Register a feature that requires a particular server version.
* @param f
* @param requires
*/
set(f: Feature, requires: string) {
this.features.set(f, {
min: requires,
ok: compare(this.server, parseSemVer(requires)) >= 0,
});
}

/**
* Returns whether the feature is available and the min server
* version that supports it.
* @param f
*/
get(f: Feature): FeatureVersion {
return this.features.get(f) || { min: "unknown", ok: false };
}

/**
* Returns true if the feature is supported
* @param f
*/
supports(f: Feature): boolean {
return this.get(f).ok;
return this.get(f)?.ok || false;
}

/**
* Returns true if the server is at least the specified version
* @param v
*/
require(v: SemVer | string): boolean {
if (typeof v === "string") {
v = parseSemVer(v);
Expand Down
4 changes: 2 additions & 2 deletions tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1170,8 +1170,8 @@ Deno.test("basics - server version", async () => {
assertEquals(nci.protocol.features.require("3.0.0"), false);
assertEquals(nci.protocol.features.require("2.8.2"), true);

const ok = nci.protocol.features.require("2.8.3");
const bytes = nci.protocol.features.get(Feature.JS_PULL_MAX_BYTES);
const ok = nci.features.require("2.8.3");
const bytes = nci.features.get(Feature.JS_PULL_MAX_BYTES);
assertEquals(ok, bytes.ok);
assertEquals(bytes.min, "2.8.3");
assertEquals(ok, nci.protocol.features.supports(Feature.JS_PULL_MAX_BYTES));
Expand Down
3 changes: 1 addition & 2 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ import {
isHeartbeatMsg,
Js409Errors,
} from "../nats-base-client/jsutil.ts";
import { Features } from "../nats-base-client/semver.ts";
import { assertIsError } from "https://deno.land/std@0.138.0/testing/asserts.ts";

function callbackConsume(debug = false): JsMsgCallback {
Expand Down Expand Up @@ -3315,7 +3314,7 @@ Deno.test("jetstream - pull consumer max_bytes rejected on old servers", async (

// change the version of the server to fail pull with max bytes
const nci = nc as NatsConnectionImpl;
nci.protocol.features = new Features({ major: 2, minor: 7, micro: 0 });
nci.features.update("2.7.0");

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
Expand Down
42 changes: 32 additions & 10 deletions tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
JSONCodec,
jwtAuthenticator,
nanos,
NatsConnection,
NatsConnectionImpl,
NatsError,
nkeys,
Expand Down Expand Up @@ -65,7 +66,7 @@ import {
import { StreamUpdateConfig } from "../nats-base-client/types.ts";
import { JetStreamManagerImpl } from "../nats-base-client/jsm.ts";
import { assertExists } from "https://deno.land/std@0.75.0/testing/asserts.ts";
import { Features } from "../nats-base-client/semver.ts";
import { Feature } from "../nats-base-client/semver.ts";

const StreamNameRequired = "stream name required";
const ConsumerNameRequired = "durable name required";
Expand Down Expand Up @@ -1307,15 +1308,7 @@ Deno.test("jsm - consumer name", async () => {
await cleanup(ns, nc);
});

Deno.test("jsm - consumer name apis are not used on old servers", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

// change the version of the server to force legacy apis
const nci = nc as NatsConnectionImpl;
nci.protocol.features = new Features({ major: 2, minor: 7, micro: 0 });

async function testConsumerNameAPI(nc: NatsConnection) {
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name: "A",
Expand Down Expand Up @@ -1374,6 +1367,35 @@ Deno.test("jsm - consumer name apis are not used on old servers", async () => {
ack_policy: AckPolicy.Explicit,
durable_name: "b",
}, "$JS.API.CONSUMER.DURABLE.CREATE.A.b");
}

Deno.test("jsm - consumer name apis are not used on old servers", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.7.0")) {
return;
}

// change the version of the server to force legacy apis
const nci = nc as NatsConnectionImpl;
nci.features.update("2.7.0");
await testConsumerNameAPI(nc);

await cleanup(ns, nc);
});

Deno.test("jsm - consumer name apis are not used when disabled", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.9.0")) {
return;
}

const nci = nc as NatsConnectionImpl;
nci.features.disable(Feature.JS_NEW_CONSUMER_CREATE_API);
await testConsumerNameAPI(nc);

await cleanup(ns, nc);
});
Expand Down
3 changes: 1 addition & 2 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import { Lock, NatsServer, notCompatible } from "./helpers/mod.ts";
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
import { JetStreamSubscriptionInfoable } from "../nats-base-client/jsclient.ts";
import { connect } from "../src/mod.ts";
import { Features } from "../nats-base-client/semver.ts";

Deno.test("kv - key validation", () => {
const bad = [
Expand Down Expand Up @@ -1406,7 +1405,7 @@ Deno.test("kv - allow direct", async () => {

// now let's pretend we got a server that doesn't support it
const nci = nc as NatsConnectionImpl;
nci.protocol.features = new Features({ major: 2, minor: 8, micro: 0 });
nci.features.update("2.8.0");
nci.info!.version = "2.8.0";

await assertRejects(
Expand Down
42 changes: 40 additions & 2 deletions tests/semver_test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { compare, parseSemVer } from "../nats-base-client/semver.ts";
import { assertEquals } from "https://deno.land/std@0.152.0/testing/asserts.ts";
import {
compare,
Feature,
Features,
parseSemVer,
} from "../nats-base-client/semver.ts";
import {
assert,
assertEquals,
assertFalse,
} from "https://deno.land/std@0.152.0/testing/asserts.ts";

Deno.test("semver", () => {
const pt: { a: string; b: string; r: number }[] = [
Expand All @@ -20,3 +29,32 @@ Deno.test("semver", () => {
);
});
});

Deno.test("semver - feature basics", () => {
// sanity version check
const f = new Features(parseSemVer("4.0.0"));
assert(f.require("4.0.0"));
assertFalse(f.require("5.0.0"));

// change the version to 2.8.3
f.update(parseSemVer("2.8.3"));
let info = f.get(Feature.JS_PULL_MAX_BYTES);
assertEquals(info.ok, true);
assertEquals(info.min, "2.8.3");
assert(f.supports(Feature.JS_PULL_MAX_BYTES));
assertFalse(f.isDisabled(Feature.JS_PULL_MAX_BYTES));
// disable the feature
f.disable(Feature.JS_PULL_MAX_BYTES);
assert(f.isDisabled(Feature.JS_PULL_MAX_BYTES));
assertFalse(f.supports(Feature.JS_PULL_MAX_BYTES));
info = f.get(Feature.JS_PULL_MAX_BYTES);
assertEquals(info.ok, false);
assertEquals(info.min, "unknown");

// remove all disablements
f.resetDisabled();
assert(f.supports(Feature.JS_PULL_MAX_BYTES));

f.update(parseSemVer("2.8.2"));
assertFalse(f.supports(Feature.JS_PULL_MAX_BYTES));
});

0 comments on commit 676eb9f

Please sign in to comment.