diff --git a/jetstream/jsclient.ts b/jetstream/jsclient.ts index b4d614a8..45c8c44d 100644 --- a/jetstream/jsclient.ts +++ b/jetstream/jsclient.ts @@ -118,7 +118,7 @@ class ViewsImpl implements Views { ); } if (opts.bindOnly) { - return Bucket.bind(this.js, name); + return Bucket.bind(this.js, name, opts); } return Bucket.create(this.js, name, opts); diff --git a/jetstream/kv.ts b/jetstream/kv.ts index d7b62f99..158ba08b 100644 --- a/jetstream/kv.ts +++ b/jetstream/kv.ts @@ -207,13 +207,19 @@ export class Bucket implements KV, KvRemove { static async bind( js: JetStreamClient, name: string, - opts: Partial<{ codec: KvCodecs }> = {}, + opts: Partial = {}, ): Promise { const jsm = await js.jetstreamManager(); - const info = await jsm.streams.info(`${kvPrefix}${name}`); - validateBucket(info.config.name); + const info = { + config: { + allow_direct: opts.allow_direct, + }, + } as StreamInfo; + validateBucket(name); const bucket = new Bucket(name, js, jsm); + info.config.name = opts.streamName ?? bucket.bucketName(); Object.assign(bucket, info); + bucket.stream = info.config.name; bucket.codec = opts.codec || NoopKvCodecs(); bucket.direct = info.config.allow_direct ?? false; bucket.initializePrefixes(info); diff --git a/jetstream/tests/kv_test.ts b/jetstream/tests/kv_test.ts index 2671a432..0f441d9c 100644 --- a/jetstream/tests/kv_test.ts +++ b/jetstream/tests/kv_test.ts @@ -191,13 +191,6 @@ Deno.test("kv - bind to existing KV", async () => { const status = await kv.status(); assertEquals(status.bucket, `${n}`); await crud(kv); - await assertRejects( - async () => { - await js.views.kv("does_not_exist", { bindOnly: true }); - }, - NatsError, - "stream not found", - ); await cleanup(ns, nc); }); @@ -567,8 +560,7 @@ Deno.test("kv - ttl", async () => { const e = await b.get("x"); assert(e); assertEquals(sc.decode(e.value), "hello"); - - await delay(1500); + await delay(2000); assertEquals(await b.get("x"), null); await cleanup(ns, nc); @@ -2014,3 +2006,33 @@ Deno.test("kv - purge key if revision", async () => { await b.purge("a", { previousSeq: seq }); await cleanup(ns, nc); }); + +Deno.test("kv - bind no info", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + await js.views.kv("A"); + + const d = deferred(); + nc.subscribe("$JS.API.STREAM.INFO.>", { + callback: (_err, msg) => { + d.reject(new Error("saw stream info")); + }, + }); + + const kv = await js.views.kv("A", { bindOnly: true, allow_direct: true }); + await kv.put("a", "hello"); + const e = await kv.get("a"); + assertEquals(e?.string(), "hello"); + await kv.delete("a"); + + d.resolve(); + // shouldn't have rejected earlier + await d; + + await cleanup(ns, nc); +});