diff --git a/nats-base-client/kv.ts b/nats-base-client/kv.ts index a9d9e2fa..0e9f9dda 100644 --- a/nats-base-client/kv.ts +++ b/nats-base-client/kv.ts @@ -251,7 +251,7 @@ export class Bucket implements KV { await this.js.publish(this.subjectForKey(k), Empty, { headers: h }); } - async consumerOn(k: string, lastOnly = false): Promise { + consumerOn(k: string, lastOnly = false): Promise { const ji = this.js as JetStreamClientImpl; const nc = ji.nc; const inbox = createInbox(nc.options.inboxPrefix); @@ -264,24 +264,7 @@ export class Bucket implements KV { "filter_subject": this.subjectForKey(k), "flow_control": k === "*", }; - try { - const ci = await this.jsm.consumers.add(this.stream, opts); - return ci; - } catch (err) { - if ( - err.message === "invalid json" && - opts.deliver_policy === DeliverPolicy.LastPerSubject - ) { - // FIXME: this here while supported server becomes available - console.error( - `\u001B[33m KV feature running on a non-supported server \u001B[0m`, - ); - opts.deliver_policy = DeliverPolicy.Last; - return await this.jsm.consumers.add(this.stream, opts); - } else { - throw err; - } - } + return this.jsm.consumers.add(this.stream, opts); } async history(k: string): Promise> { diff --git a/tests/kv_test.ts b/tests/kv_test.ts index 70f4d59f..8ca4978e 100644 --- a/tests/kv_test.ts +++ b/tests/kv_test.ts @@ -12,11 +12,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +import { yellow } from "https://deno.land/std@0.95.0/fmt/colors.ts"; import { cleanup, jetstreamServerConf, setup } from "./jstest_util.ts"; import { deferred, Empty, + NatsConnection, NatsConnectionImpl, nuid, StringCodec, @@ -28,9 +29,14 @@ import { import { Bucket, Entry } from "../nats-base-client/kv.ts"; import { EncodedBucket } from "../nats-base-client/ekv.ts"; +import { NatsServer } from "./helpers/launcher.ts"; +import { compare, parseSemVer } from "./helpers/mod.ts"; Deno.test("kv - init creates stream", async () => { const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc)) { + return; + } const jsm = await nc.jetstreamManager(); let streams = await jsm.streams.list().next(); assertEquals(streams.length, 0); @@ -49,6 +55,9 @@ Deno.test("kv - crud", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); + if (await notCompatible(ns, nc)) { + return; + } const sc = StringCodec(); const jsm = await nc.jetstreamManager(); let streams = await jsm.streams.list().next(); @@ -93,6 +102,9 @@ Deno.test("kv - history", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); + if (await notCompatible(ns, nc)) { + return; + } const n = nuid.next(); const bucket = await Bucket.create(nc, n, { history: 2 }); let status = await bucket.status(); @@ -113,7 +125,9 @@ Deno.test("kv - empty iterator ends", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - + if (await notCompatible(ns, nc)) { + return; + } const n = nuid.next(); const bucket = await Bucket.create(nc, n); const h = await bucket.history("k"); @@ -129,7 +143,9 @@ Deno.test("kv - key watch", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - + if (await notCompatible(ns, nc)) { + return; + } const n = nuid.next(); const bucket = await Bucket.create(nc, n); const iter = await bucket.watch({ key: "k" }); @@ -159,6 +175,9 @@ Deno.test("kv - bucket watch", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); + if (await notCompatible(ns, nc)) { + return; + } const sc = StringCodec(); const m: Map = new Map(); const n = nuid.next(); @@ -199,6 +218,9 @@ Deno.test("kv - keys", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); + if (await notCompatible(ns, nc)) { + return; + } const sc = StringCodec(); const bucket = await Bucket.create(nc, nuid.next()); @@ -224,6 +246,9 @@ Deno.test("encoded kv - crud", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); + if (await notCompatible(ns, nc)) { + return; + } const sc = StringCodec(); const jsm = await nc.jetstreamManager(); let streams = await jsm.streams.list().next(); @@ -272,6 +297,9 @@ Deno.test("kv - not found", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); + if (await notCompatible(ns, nc)) { + return; + } const b = await Bucket.create(nc, nuid.next()) as Bucket; assertEquals(await b.get("x"), null); @@ -282,3 +310,19 @@ Deno.test("kv - not found", async () => { await cleanup(ns, nc); }); + +async function notCompatible( + ns: NatsServer, + nc: NatsConnection, +): Promise { + const varz = await ns.varz() as unknown as Record; + const sv = parseSemVer(varz.version); + if (compare(sv, parseSemVer("2.3.3")) < 0) { + console.error( + yellow("skipping KV test as server doesn't support it"), + ); + await cleanup(ns, nc); + return true; + } + return false; +}