Skip to content

Commit

Permalink
[CHANGE] remove Result<T> from simplification apis (#503)
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed May 2, 2023
1 parent 888d04d commit 0863710
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 256 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ bundle:
deno bundle --log-level info --unstable src/mod.ts ./nats.js

fmt:
deno fmt src/ doc/ bin/ nats-base-client/ examples/ tests/ jetstream.md README.md services.md
deno fmt src/ doc/ bin/ nats-base-client/ examples/ tests/ debug/ jetstream.md README.md services.md
22 changes: 10 additions & 12 deletions nats-base-client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
FetchOptions,
JsMsg,
ReplayPolicy,
Result,
} from "./types.ts";
import { timeout } from "./util.ts";
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
Expand Down Expand Up @@ -228,23 +227,22 @@ export class OrderedPullConsumerImpl implements Consumer {

internalHandler(serial: number) {
// this handler will be noop if the consumer's serial changes
return (r: Result<JsMsg>): void => {
return (m: JsMsg): void => {
if (this.serial !== serial) {
return;
}
if (!r.isError) {
const dseq = r.value.info.deliverySequence;
if (dseq !== this.cursor.deliver_seq + 1) {
this.reset(this.opts);
return;
}
this.cursor.deliver_seq = dseq;
this.cursor.stream_seq = r.value.info.streamSequence;
const dseq = m.info.deliverySequence;
if (dseq !== this.cursor.deliver_seq + 1) {
this.reset(this.opts);
return;
}
this.cursor.deliver_seq = dseq;
this.cursor.stream_seq = m.info.streamSequence;

if (this.userCallback) {
this.userCallback(r);
this.userCallback(m);
} else {
this.iter?.push(r);
this.iter?.push(m);
}
};
}
Expand Down
54 changes: 22 additions & 32 deletions nats-base-client/consumermessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import type {
ConsumerStatus,
FetchOptions,
PullOptions,
Result,
Subscription,
} from "./types.ts";
import {
Expand All @@ -41,7 +40,7 @@ import { MsgImpl } from "./msg.ts";
import { Timeout } from "./util.ts";
import { toJsMsg } from "./jsmsg.ts";

export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
implements ConsumerMessages {
consumer: PullConsumerImpl;
opts: Record<string, number>;
Expand All @@ -66,14 +65,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
super();
this.consumer = c;

const args = this.parseOptions(opts, refilling);
if (args.isError) {
throw args.error;
}
this.opts = this.parseOptions(opts, refilling);
this.callback = (opts as ConsumeCallback).callback || null;
this.noIterator = typeof this.callback === "function";

this.opts = args.value!;
this.monitor = null;
this.pong = null;
this.pending = { msgs: 0, bytes: 0, requests: 0 };
Expand Down Expand Up @@ -150,9 +144,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
// we got a bad request - no progress here
if (code === 400) {
const error = toErr();
this._push({
isError: true,
error,
//@ts-ignore: fn
this._push(() => {
this.stop(error);
});
} else if (code === 409 && description === "consumer deleted") {
const error = toErr();
Expand All @@ -166,7 +160,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
}
} else {
// push the user message
this._push({ isError: false, value: toJsMsg(msg) });
this._push(toJsMsg(msg));
this.received++;
if (this.pending.msgs) {
this.pending.msgs--;
Expand Down Expand Up @@ -249,7 +243,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
this.pull(this.pullOptions());
}

_push(r: Result<JsMsg>) {
_push(r: JsMsg) {
if (!this.callback) {
super.push(r);
} else {
Expand All @@ -261,18 +255,20 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
fn();
}
} catch (err) {
this.callback({ isError: true, error: err });
this.stop(err);
}
}
}

notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) {
if (this.listeners.length > 0) {
this.listeners.forEach((l) => {
if (!(l as QueuedIteratorImpl<ConsumerStatus>).done) {
l.push({ type, data });
}
});
(() => {
this.listeners.forEach((l) => {
if (!(l as QueuedIteratorImpl<ConsumerStatus>).done) {
l.push({ type, data });
}
});
})();
}
}

Expand Down Expand Up @@ -381,18 +377,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
parseOptions(
opts: PullConsumerOptions,
refilling = false,
): Result<Record<string, number>> {
): Record<string, number> {
const args = (opts || {}) as Record<string, number>;
args.max_messages = args.max_messages || 0;
args.max_bytes = args.max_bytes || 0;

if (args.max_messages !== 0 && args.max_bytes !== 0) {
return {
isError: true,
error: new Error(
`only specify one of max_messages or max_bytes`,
),
};
throw new Error(
`only specify one of max_messages or max_bytes`,
);
}

// we must have at least one limit - default to 100 msgs
Expand All @@ -410,10 +403,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>

args.expires = args.expires || 30_000;
if (args.expires < 1000) {
return {
isError: true,
error: new Error("expires should be at least 1000ms"),
};
throw new Error("expires should be at least 1000ms");
}

// require idle_heartbeat
Expand All @@ -430,7 +420,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
args.threshold_bytes = args.threshold_bytes || minBytes;
}

return { value: args, isError: false };
return args;
}

status(): Promise<AsyncIterable<ConsumerStatus>> {
Expand All @@ -440,7 +430,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
}
}

export class OrderedConsumerMessages extends QueuedIteratorImpl<Result<JsMsg>>
export class OrderedConsumerMessages extends QueuedIteratorImpl<JsMsg>
implements ConsumerMessages {
src!: PullConsumerMessagesImpl;

Expand Down
1 change: 0 additions & 1 deletion nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ export type {
Republish,
RequestManyOptions,
RequestOptions,
Result,
RoKV,
SeqMsgRequest,
SequenceInfo,
Expand Down
1 change: 0 additions & 1 deletion nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ export type {
RepublishHeaders,
RequestManyOptions,
RequestOptions,
Result,
RoKV,
SchemaInfo,
SeqMsgRequest,
Expand Down
36 changes: 21 additions & 15 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2642,6 +2642,10 @@ export interface ConsumerUpdateConfig {
metadata?: Record<string, string>;
}

export type Ordered = {
ordered: true;
};

export type ConsumeBytes =
& MaxBytes
& Partial<MaxMessages>
Expand All @@ -2657,6 +2661,8 @@ export type ConsumeMessages =
& IdleHeartbeat
& ConsumeCallback;

export type ConsumeOptions = ConsumeBytes | ConsumeMessages;

/**
* Options for fetching
*/
Expand All @@ -2675,7 +2681,6 @@ export type FetchMessages =
& IdleHeartbeat;

export type FetchOptions = FetchBytes | FetchMessages;
export type ConsumeOptions = ConsumeBytes | ConsumeMessages;

export type PullConsumerOptions = FetchOptions | ConsumeOptions;

Expand Down Expand Up @@ -2736,7 +2741,7 @@ export type IdleHeartbeat = {
idle_heartbeat?: number;
};

export type ConsumerCallbackFn = (r: Result<JsMsg>) => void;
export type ConsumerCallbackFn = (r: JsMsg) => void;
export type ConsumeCallback = {
/**
* Process messages using a callback instead of an iterator. Note that when using callbacks,
Expand Down Expand Up @@ -2791,6 +2796,15 @@ export interface ConsumerStatus {
data: unknown;
}

export interface ExportedConsumer {
fetch(
opts?: FetchOptions,
): Promise<ConsumerMessages>;
consume(
opts?: ConsumeOptions,
): Promise<ConsumerMessages>;
}

export interface Consumer extends ExportedConsumer {
info(cached?: boolean): Promise<ConsumerInfo>;
delete(): Promise<boolean>;
Expand All @@ -2800,31 +2814,23 @@ export interface Close {
close(): Promise<void>;
}

export type ValueResult<T> = {
type ValueResult<T> = {
isError: false;
value: T;
};

export type ErrorResult = {
type ErrorResult = {
isError: true;
error: Error;
};

/**
* Result is a value that may have resulted in an error.
*/
export type Result<T> = ValueResult<T> | ErrorResult;
export interface ConsumerMessages extends QueuedIterator<Result<JsMsg>>, Close {
status(): Promise<AsyncIterable<ConsumerStatus>>;
}
type Result<T> = ValueResult<T> | ErrorResult;

export interface ExportedConsumer {
fetch(
opts?: FetchOptions,
): Promise<ConsumerMessages>;
consume(
opts?: ConsumeOptions,
): Promise<ConsumerMessages>;
export interface ConsumerMessages extends QueuedIterator<JsMsg>, Close {
status(): Promise<AsyncIterable<ConsumerStatus>>;
}

export interface StreamNames {
Expand Down

0 comments on commit 0863710

Please sign in to comment.