Skip to content

Commit

Permalink
disable KV tests on older servers
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jul 29, 2021
1 parent 5921009 commit 4a936e1
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 22 deletions.
21 changes: 2 additions & 19 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ export class Bucket implements KV {
await this.js.publish(this.subjectForKey(k), Empty, { headers: h });
}

async consumerOn(k: string, lastOnly = false): Promise<ConsumerInfo> {
consumerOn(k: string, lastOnly = false): Promise<ConsumerInfo> {
const ji = this.js as JetStreamClientImpl;
const nc = ji.nc;
const inbox = createInbox(nc.options.inboxPrefix);
Expand All @@ -264,24 +264,7 @@ export class Bucket implements KV {
"filter_subject": this.subjectForKey(k),
"flow_control": k === "*",
};
try {
const ci = await this.jsm.consumers.add(this.stream, opts);
return ci;
} catch (err) {
if (
err.message === "invalid json" &&
opts.deliver_policy === DeliverPolicy.LastPerSubject
) {
// FIXME: this here while supported server becomes available
console.error(
`\u001B[33m KV feature running on a non-supported server \u001B[0m`,
);
opts.deliver_policy = DeliverPolicy.Last;
return await this.jsm.consumers.add(this.stream, opts);
} else {
throw err;
}
}
return this.jsm.consumers.add(this.stream, opts);
}

async history(k: string): Promise<QueuedIterator<Entry>> {
Expand Down
50 changes: 47 additions & 3 deletions tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { yellow } from "https://deno.land/std@0.95.0/fmt/colors.ts";
import { cleanup, jetstreamServerConf, setup } from "./jstest_util.ts";
import {
deferred,
Empty,
NatsConnection,
NatsConnectionImpl,
nuid,
StringCodec,
Expand All @@ -28,9 +29,14 @@ import {

import { Bucket, Entry } from "../nats-base-client/kv.ts";
import { EncodedBucket } from "../nats-base-client/ekv.ts";
import { NatsServer } from "./helpers/launcher.ts";
import { compare, parseSemVer } from "./helpers/mod.ts";

Deno.test("kv - init creates stream", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc)) {
return;
}
const jsm = await nc.jetstreamManager();
let streams = await jsm.streams.list().next();
assertEquals(streams.length, 0);
Expand All @@ -49,6 +55,9 @@ Deno.test("kv - crud", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc)) {
return;
}
const sc = StringCodec();
const jsm = await nc.jetstreamManager();
let streams = await jsm.streams.list().next();
Expand Down Expand Up @@ -93,6 +102,9 @@ Deno.test("kv - history", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc)) {
return;
}
const n = nuid.next();
const bucket = await Bucket.create(nc, n, { history: 2 });
let status = await bucket.status();
Expand All @@ -113,7 +125,9 @@ Deno.test("kv - empty iterator ends", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

if (await notCompatible(ns, nc)) {
return;
}
const n = nuid.next();
const bucket = await Bucket.create(nc, n);
const h = await bucket.history("k");
Expand All @@ -129,7 +143,9 @@ Deno.test("kv - key watch", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);

if (await notCompatible(ns, nc)) {
return;
}
const n = nuid.next();
const bucket = await Bucket.create(nc, n);
const iter = await bucket.watch({ key: "k" });
Expand Down Expand Up @@ -159,6 +175,9 @@ Deno.test("kv - bucket watch", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc)) {
return;
}
const sc = StringCodec();
const m: Map<string, string> = new Map();
const n = nuid.next();
Expand Down Expand Up @@ -199,6 +218,9 @@ Deno.test("kv - keys", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc)) {
return;
}
const sc = StringCodec();
const bucket = await Bucket.create(nc, nuid.next());

Expand All @@ -224,6 +246,9 @@ Deno.test("encoded kv - crud", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc)) {
return;
}
const sc = StringCodec();
const jsm = await nc.jetstreamManager();
let streams = await jsm.streams.list().next();
Expand Down Expand Up @@ -272,6 +297,9 @@ Deno.test("kv - not found", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc)) {
return;
}

const b = await Bucket.create(nc, nuid.next()) as Bucket;
assertEquals(await b.get("x"), null);
Expand All @@ -282,3 +310,19 @@ Deno.test("kv - not found", async () => {

await cleanup(ns, nc);
});

async function notCompatible(
ns: NatsServer,
nc: NatsConnection,
): Promise<boolean> {
const varz = await ns.varz() as unknown as Record<string, string>;
const sv = parseSemVer(varz.version);
if (compare(sv, parseSemVer("2.3.3")) < 0) {
console.error(
yellow("skipping KV test as server doesn't support it"),
);
await cleanup(ns, nc);
return true;
}
return false;
}

0 comments on commit 4a936e1

Please sign in to comment.