From 08637104a913e539115dac6d69200c3b3bc01aad Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 2 May 2023 15:38:10 -0500 Subject: [PATCH] [CHANGE] remove Result from simplification apis (#503) --- Makefile | 2 +- nats-base-client/consumer.ts | 22 ++- nats-base-client/consumermessages.ts | 54 +++---- nats-base-client/internal_mod.ts | 1 - nats-base-client/mod.ts | 1 - nats-base-client/types.ts | 36 +++-- tests/consumers_test.ts | 84 ++++------- tests/consumersordered_test.ts | 204 +++++++++------------------ 8 files changed, 148 insertions(+), 256 deletions(-) diff --git a/Makefile b/Makefile index 1f06630e..9d4c4536 100644 --- a/Makefile +++ b/Makefile @@ -23,4 +23,4 @@ bundle: deno bundle --log-level info --unstable src/mod.ts ./nats.js fmt: - deno fmt src/ doc/ bin/ nats-base-client/ examples/ tests/ jetstream.md README.md services.md + deno fmt src/ doc/ bin/ nats-base-client/ examples/ tests/ debug/ jetstream.md README.md services.md diff --git a/nats-base-client/consumer.ts b/nats-base-client/consumer.ts index 0aaecf95..04dc9e3e 100644 --- a/nats-base-client/consumer.ts +++ b/nats-base-client/consumer.ts @@ -27,7 +27,6 @@ import { FetchOptions, JsMsg, ReplayPolicy, - Result, } from "./types.ts"; import { timeout } from "./util.ts"; import { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; @@ -228,23 +227,22 @@ export class OrderedPullConsumerImpl implements Consumer { internalHandler(serial: number) { // this handler will be noop if the consumer's serial changes - return (r: Result): void => { + return (m: JsMsg): void => { if (this.serial !== serial) { return; } - if (!r.isError) { - const dseq = r.value.info.deliverySequence; - if (dseq !== this.cursor.deliver_seq + 1) { - this.reset(this.opts); - return; - } - this.cursor.deliver_seq = dseq; - this.cursor.stream_seq = r.value.info.streamSequence; + const dseq = m.info.deliverySequence; + if (dseq !== this.cursor.deliver_seq + 1) { + this.reset(this.opts); + return; } + this.cursor.deliver_seq = dseq; + this.cursor.stream_seq = m.info.streamSequence; + if (this.userCallback) { - this.userCallback(r); + this.userCallback(m); } else { - this.iter?.push(r); + this.iter?.push(m); } }; } diff --git a/nats-base-client/consumermessages.ts b/nats-base-client/consumermessages.ts index 3c27c634..e2041da4 100644 --- a/nats-base-client/consumermessages.ts +++ b/nats-base-client/consumermessages.ts @@ -20,7 +20,6 @@ import type { ConsumerStatus, FetchOptions, PullOptions, - Result, Subscription, } from "./types.ts"; import { @@ -41,7 +40,7 @@ import { MsgImpl } from "./msg.ts"; import { Timeout } from "./util.ts"; import { toJsMsg } from "./jsmsg.ts"; -export class PullConsumerMessagesImpl extends QueuedIteratorImpl> +export class PullConsumerMessagesImpl extends QueuedIteratorImpl implements ConsumerMessages { consumer: PullConsumerImpl; opts: Record; @@ -66,14 +65,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> super(); this.consumer = c; - const args = this.parseOptions(opts, refilling); - if (args.isError) { - throw args.error; - } + this.opts = this.parseOptions(opts, refilling); this.callback = (opts as ConsumeCallback).callback || null; this.noIterator = typeof this.callback === "function"; - - this.opts = args.value!; this.monitor = null; this.pong = null; this.pending = { msgs: 0, bytes: 0, requests: 0 }; @@ -150,9 +144,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> // we got a bad request - no progress here if (code === 400) { const error = toErr(); - this._push({ - isError: true, - error, + //@ts-ignore: fn + this._push(() => { + this.stop(error); }); } else if (code === 409 && description === "consumer deleted") { const error = toErr(); @@ -166,7 +160,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> } } else { // push the user message - this._push({ isError: false, value: toJsMsg(msg) }); + this._push(toJsMsg(msg)); this.received++; if (this.pending.msgs) { this.pending.msgs--; @@ -249,7 +243,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> this.pull(this.pullOptions()); } - _push(r: Result) { + _push(r: JsMsg) { if (!this.callback) { super.push(r); } else { @@ -261,18 +255,20 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> fn(); } } catch (err) { - this.callback({ isError: true, error: err }); + this.stop(err); } } } notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) { if (this.listeners.length > 0) { - this.listeners.forEach((l) => { - if (!(l as QueuedIteratorImpl).done) { - l.push({ type, data }); - } - }); + (() => { + this.listeners.forEach((l) => { + if (!(l as QueuedIteratorImpl).done) { + l.push({ type, data }); + } + }); + })(); } } @@ -381,18 +377,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> parseOptions( opts: PullConsumerOptions, refilling = false, - ): Result> { + ): Record { const args = (opts || {}) as Record; args.max_messages = args.max_messages || 0; args.max_bytes = args.max_bytes || 0; if (args.max_messages !== 0 && args.max_bytes !== 0) { - return { - isError: true, - error: new Error( - `only specify one of max_messages or max_bytes`, - ), - }; + throw new Error( + `only specify one of max_messages or max_bytes`, + ); } // we must have at least one limit - default to 100 msgs @@ -410,10 +403,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> args.expires = args.expires || 30_000; if (args.expires < 1000) { - return { - isError: true, - error: new Error("expires should be at least 1000ms"), - }; + throw new Error("expires should be at least 1000ms"); } // require idle_heartbeat @@ -430,7 +420,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> args.threshold_bytes = args.threshold_bytes || minBytes; } - return { value: args, isError: false }; + return args; } status(): Promise> { @@ -440,7 +430,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl> } } -export class OrderedConsumerMessages extends QueuedIteratorImpl> +export class OrderedConsumerMessages extends QueuedIteratorImpl implements ConsumerMessages { src!: PullConsumerMessagesImpl; diff --git a/nats-base-client/internal_mod.ts b/nats-base-client/internal_mod.ts index 11a6da56..563e636f 100644 --- a/nats-base-client/internal_mod.ts +++ b/nats-base-client/internal_mod.ts @@ -76,7 +76,6 @@ export type { Republish, RequestManyOptions, RequestOptions, - Result, RoKV, SeqMsgRequest, SequenceInfo, diff --git a/nats-base-client/mod.ts b/nats-base-client/mod.ts index 6910842c..d5ec9e25 100644 --- a/nats-base-client/mod.ts +++ b/nats-base-client/mod.ts @@ -139,7 +139,6 @@ export type { RepublishHeaders, RequestManyOptions, RequestOptions, - Result, RoKV, SchemaInfo, SeqMsgRequest, diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 9b8c64f7..d3a19f33 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -2642,6 +2642,10 @@ export interface ConsumerUpdateConfig { metadata?: Record; } +export type Ordered = { + ordered: true; +}; + export type ConsumeBytes = & MaxBytes & Partial @@ -2657,6 +2661,8 @@ export type ConsumeMessages = & IdleHeartbeat & ConsumeCallback; +export type ConsumeOptions = ConsumeBytes | ConsumeMessages; + /** * Options for fetching */ @@ -2675,7 +2681,6 @@ export type FetchMessages = & IdleHeartbeat; export type FetchOptions = FetchBytes | FetchMessages; -export type ConsumeOptions = ConsumeBytes | ConsumeMessages; export type PullConsumerOptions = FetchOptions | ConsumeOptions; @@ -2736,7 +2741,7 @@ export type IdleHeartbeat = { idle_heartbeat?: number; }; -export type ConsumerCallbackFn = (r: Result) => void; +export type ConsumerCallbackFn = (r: JsMsg) => void; export type ConsumeCallback = { /** * Process messages using a callback instead of an iterator. Note that when using callbacks, @@ -2791,6 +2796,15 @@ export interface ConsumerStatus { data: unknown; } +export interface ExportedConsumer { + fetch( + opts?: FetchOptions, + ): Promise; + consume( + opts?: ConsumeOptions, + ): Promise; +} + export interface Consumer extends ExportedConsumer { info(cached?: boolean): Promise; delete(): Promise; @@ -2800,12 +2814,12 @@ export interface Close { close(): Promise; } -export type ValueResult = { +type ValueResult = { isError: false; value: T; }; -export type ErrorResult = { +type ErrorResult = { isError: true; error: Error; }; @@ -2813,18 +2827,10 @@ export type ErrorResult = { /** * Result is a value that may have resulted in an error. */ -export type Result = ValueResult | ErrorResult; -export interface ConsumerMessages extends QueuedIterator>, Close { - status(): Promise>; -} +type Result = ValueResult | ErrorResult; -export interface ExportedConsumer { - fetch( - opts?: FetchOptions, - ): Promise; - consume( - opts?: ConsumeOptions, - ): Promise; +export interface ConsumerMessages extends QueuedIterator, Close { + status(): Promise>; } export interface StreamNames { diff --git a/tests/consumers_test.ts b/tests/consumers_test.ts index 49bf9f2b..22c67185 100644 --- a/tests/consumers_test.ts +++ b/tests/consumers_test.ts @@ -38,7 +38,6 @@ import { } from "../nats-base-client/types.ts"; import { StringCodec } from "../nats-base-client/codec.ts"; import { deferred, nanos, nuid } from "../nats-base-client/mod.ts"; -import { fail } from "https://deno.land/std@0.179.0/testing/asserts.ts"; import { NatsServer } from "./helpers/launcher.ts"; import { connect } from "../src/connect.ts"; import { PullConsumerMessagesImpl } from "../nats-base-client/consumermessages.ts"; @@ -176,13 +175,8 @@ Deno.test("consumers - fetch no messages", async () => { max_messages: 100, expires: 1000, }); - for await (const o of iter) { - if (o.isError) { - console.error(o.error); - continue; - } - console.log(o.value.seq); - o.value?.ack(); + for await (const m of iter) { + m.ack(); } assertEquals(iter.getReceived(), 0); assertEquals(iter.getProcessed(), 0); @@ -206,12 +200,8 @@ Deno.test("consumers - fetch less messages", async () => { const consumer = await js.consumers.get(stream, "b"); assertEquals((await consumer.info(true)).num_pending, 1); const iter = await consumer.fetch({ expires: 1000, max_messages: 10 }); - for await (const o of iter) { - if (o.isError) { - console.error(o.error); - continue; - } - o.value.ack(); + for await (const m of iter) { + m.ack(); } assertEquals(iter.getReceived(), 1); assertEquals(iter.getProcessed(), 1); @@ -242,12 +232,8 @@ Deno.test("consumers - fetch exactly messages", async () => { assertEquals((await consumer.info(true)).num_pending, 200); const iter = await consumer.fetch({ expires: 5000, max_messages: 100 }); - for await (const o of iter) { - if (o.isError) { - fail(`failed with ${o.error}`); - } else { - o.value.ack(); - } + for await (const m of iter) { + m.ack(); } assertEquals(iter.getReceived(), 100); assertEquals(iter.getProcessed(), 100); @@ -280,18 +266,14 @@ Deno.test("consumers - consume", async () => { expires: 10_000, max_messages: 50_000, }); - for await (const o of iter) { - if (o.isError) { - fail(`failed with ${o.error}`); - } else { - o.value.ack(); - if (o.value.seq === count) { - const millis = Date.now() - start; - console.log( - `consumer: ${millis}ms - ${count / (millis / 1000)} msgs/sec`, - ); - break; - } + for await (const m of iter) { + m.ack(); + if (m.seq === count) { + const millis = Date.now() - start; + console.log( + `consumer: ${millis}ms - ${count / (millis / 1000)} msgs/sec`, + ); + break; } } assertEquals(iter.getReceived(), count); @@ -319,12 +301,8 @@ Deno.test("consumers - consume callback rejects iter", async () => { const iter = await consumer.consume({ expires: 10_000, max_messages: 50_000, - callback: (r) => { - if (r.isError) { - fail(`failed with ${r.error}`); - } else { - r.value.ack(); - } + callback: (m) => { + m.ack(); }, }); await assertRejects( @@ -712,13 +690,9 @@ Deno.test("consumers - threshold_messages", async () => { } })().then(); - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - if (r.value.info.pending === 0) { - iter.stop(); - } + for await (const m of iter) { + if (m.info.pending === 0) { + iter.stop(); } } @@ -777,19 +751,13 @@ Deno.test("consumers - threshold_messages bytes", async () => { } })().then(); - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - // console.log(((r.value as JsMsgImpl).msg as MsgImpl).size()) - const seq = r.value.seq; - a[seq] = true; - r.value.ack(); - if (r.value.info.pending === 0) { - setTimeout(() => { - iter.stop(); - }, 1000); - } + for await (const m of iter) { + a[m.seq] = true; + m.ack(); + if (m.info.pending === 0) { + setTimeout(() => { + iter.stop(); + }, 1000); } } diff --git a/tests/consumersordered_test.ts b/tests/consumersordered_test.ts index 9e7c3bc8..81ceb6ec 100644 --- a/tests/consumersordered_test.ts +++ b/tests/consumersordered_test.ts @@ -18,10 +18,9 @@ import { assertEquals, assertExists, assertRejects, - fail, } from "https://deno.land/std@0.125.0/testing/asserts.ts"; import { OrderedPullConsumerImpl } from "../nats-base-client/consumer.ts"; -import { DeliverPolicy, JsMsg, Result } from "../nats-base-client/types.ts"; +import { DeliverPolicy, JsMsg } from "../nats-base-client/types.ts"; import { deferred } from "../nats-base-client/mod.ts"; import { notCompatible } from "./helpers/mod.ts"; import { delay } from "../nats-base-client/util.ts"; @@ -72,23 +71,15 @@ Deno.test("ordered - fetch", async () => { assertExists(oc); let iter = await oc.fetch({ max_messages: 1 }); - for await (const r of iter) { - if (r.isError) { - throw r.error; - } else { - assertEquals(r.value.subject, "test.a"); - assertEquals(r.value.seq, 1); - } + for await (const m of iter) { + assertEquals(m.subject, "test.a"); + assertEquals(m.seq, 1); } iter = await oc.fetch({ max_messages: 1 }); - for await (const r of iter) { - if (r.isError) { - throw r.error; - } else { - assertEquals(r.value.subject, "test.b"); - assertEquals(r.value.seq, 2); - } + for await (const m of iter) { + assertEquals(m.subject, "test.b"); + assertEquals(m.seq, 2); } await cleanup(ns, nc); @@ -113,20 +104,16 @@ Deno.test("ordered - fetch reset", async () => { const seen: number[] = new Array(3).fill(0); let done = deferred(); - const callback = (r: Result) => { - if (r.isError) { - throw r.error; - } else { - const idx = r.value.seq - 1; - seen[idx]++; - // mess with the internals so we see these again - if (seen[idx] === 1) { - oc.cursor.deliver_seq--; - oc.cursor.stream_seq--; - } - iter.stop(); - done.resolve(); + const callback = (m: JsMsg) => { + const idx = m.seq - 1; + seen[idx]++; + // mess with the internals so we see these again + if (seen[idx] === 1) { + oc.cursor.deliver_seq--; + oc.cursor.stream_seq--; } + iter.stop(); + done.resolve(); }; let iter = await oc.fetch({ @@ -201,21 +188,17 @@ Deno.test("ordered - consume reset", async () => { const seen: number[] = new Array(3).fill(0); const done = deferred(); - const callback = (r: Result) => { - if (r.isError) { - throw r.error; - } else { - const idx = r.value.seq - 1; - seen[idx]++; - // mess with the internals so we see these again - if (seen[idx] === 1) { - oc.cursor.deliver_seq--; - oc.cursor.stream_seq--; - } - if (r.value.info.pending === 0) { - iter.stop(); - done.resolve(); - } + const callback = (r: JsMsg) => { + const idx = r.seq - 1; + seen[idx]++; + // mess with the internals so we see these again + if (seen[idx] === 1) { + oc.cursor.deliver_seq--; + oc.cursor.stream_seq--; + } + if (r.info.pending === 0) { + iter.stop(); + done.resolve(); } }; @@ -248,13 +231,9 @@ Deno.test("ordered - consume", async () => { assertExists(oc); const iter = await oc.consume({ max_messages: 1 }); - for await (const r of iter) { - if (r.isError) { - throw r.error; - } else { - if (r.value.info.pending === 0) { - break; - } + for await (const m of iter) { + if (m.info.pending === 0) { + break; } } @@ -279,16 +258,13 @@ Deno.test("ordered - filters consume", async () => { assertExists(oc); const iter = await oc.consume(); - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals("test.b", r.value.subject); - if (r.value.info.pending === 0) { - break; - } + for await (const m of iter) { + assertEquals("test.b", m.subject); + if (m.info.pending === 0) { + break; } } + assertEquals(iter.getProcessed(), 1); await cleanup(ns, nc); @@ -312,12 +288,8 @@ Deno.test("ordered - filters fetch", async () => { assertExists(oc); const iter = await oc.fetch({ expires: 1000 }); - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals("test.b", r.value.subject); - } + for await (const m of iter) { + assertEquals("test.b", m.subject); } assertEquals(iter.getProcessed(), 1); @@ -419,12 +391,8 @@ Deno.test("ordered - last per subject", async () => { }); let iter = await oc.fetch({ max_messages: 1 }); await (async () => { - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 2); - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 2); } })(); @@ -433,13 +401,9 @@ Deno.test("ordered - last per subject", async () => { }); iter = await oc.consume({ max_messages: 1 }); await (async () => { - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 2); - break; - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 2); + break; } })(); @@ -466,12 +430,8 @@ Deno.test("ordered - start sequence", async () => { let iter = await oc.fetch({ max_messages: 1 }); await (async () => { - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 2); - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 2); } })(); @@ -481,13 +441,9 @@ Deno.test("ordered - start sequence", async () => { iter = await oc.consume({ max_messages: 1 }); await (async () => { for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 2); - assertEquals(r.value.subject, "test.b"); - break; - } + assertEquals(r.info.streamSequence, 2); + assertEquals(r.subject, "test.b"); + break; } })(); @@ -514,13 +470,9 @@ Deno.test("ordered - last", async () => { let iter = await oc.fetch({ max_messages: 1 }); await (async () => { - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 2); - assertEquals(r.value.subject, "test.b"); - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 2); + assertEquals(m.subject, "test.b"); } })(); @@ -529,14 +481,10 @@ Deno.test("ordered - last", async () => { }); iter = await oc.consume({ max_messages: 1 }); await (async () => { - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 2); - assertEquals(r.value.subject, "test.b"); - break; - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 2); + assertEquals(m.subject, "test.b"); + break; } })(); @@ -564,13 +512,9 @@ Deno.test("ordered - new", async () => { let iter = await oc.fetch({ max_messages: 1 }); await (async () => { await js.publish("test.c"); - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 3); - assertEquals(r.value.subject, "test.c"); - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 3); + assertEquals(m.subject, "test.c"); } })(); @@ -580,14 +524,10 @@ Deno.test("ordered - new", async () => { iter = await oc.consume({ max_messages: 1 }); await (async () => { await js.publish("test.d"); - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 4); - assertEquals(r.value.subject, "test.d"); - break; - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 4); + assertEquals(m.subject, "test.d"); + break; } })(); @@ -620,13 +560,9 @@ Deno.test("ordered - start time", async () => { let iter = await oc.fetch({ max_messages: 1 }); await (async () => { - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 3); - assertEquals(r.value.subject, "test.c"); - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 3); + assertEquals(m.subject, "test.c"); } })(); @@ -636,14 +572,10 @@ Deno.test("ordered - start time", async () => { }); iter = await oc.consume({ max_messages: 1 }); await (async () => { - for await (const r of iter) { - if (r.isError) { - fail(r.error.message); - } else { - assertEquals(r.value.info.streamSequence, 3); - assertEquals(r.value.subject, "test.c"); - break; - } + for await (const m of iter) { + assertEquals(m.info.streamSequence, 3); + assertEquals(m.subject, "test.c"); + break; } })();