Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGE] remove Result<T> from simplification apis #503

Merged
merged 2 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ bundle:
deno bundle --log-level info --unstable src/mod.ts ./nats.js

fmt:
deno fmt src/ doc/ bin/ nats-base-client/ examples/ tests/ jetstream.md README.md services.md
deno fmt src/ doc/ bin/ nats-base-client/ examples/ tests/ debug/ jetstream.md README.md services.md
22 changes: 10 additions & 12 deletions nats-base-client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
FetchOptions,
JsMsg,
ReplayPolicy,
Result,
} from "./types.ts";
import { timeout } from "./util.ts";
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
Expand Down Expand Up @@ -228,23 +227,22 @@ export class OrderedPullConsumerImpl implements Consumer {

internalHandler(serial: number) {
// this handler will be noop if the consumer's serial changes
return (r: Result<JsMsg>): void => {
return (m: JsMsg): void => {
if (this.serial !== serial) {
return;
}
if (!r.isError) {
const dseq = r.value.info.deliverySequence;
if (dseq !== this.cursor.deliver_seq + 1) {
this.reset(this.opts);
return;
}
this.cursor.deliver_seq = dseq;
this.cursor.stream_seq = r.value.info.streamSequence;
const dseq = m.info.deliverySequence;
if (dseq !== this.cursor.deliver_seq + 1) {
this.reset(this.opts);
return;
}
this.cursor.deliver_seq = dseq;
this.cursor.stream_seq = m.info.streamSequence;

if (this.userCallback) {
this.userCallback(r);
this.userCallback(m);
} else {
this.iter?.push(r);
this.iter?.push(m);
}
};
}
Expand Down
54 changes: 22 additions & 32 deletions nats-base-client/consumermessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import type {
ConsumerStatus,
FetchOptions,
PullOptions,
Result,
Subscription,
} from "./types.ts";
import {
Expand All @@ -41,7 +40,7 @@ import { MsgImpl } from "./msg.ts";
import { Timeout } from "./util.ts";
import { toJsMsg } from "./jsmsg.ts";

export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
implements ConsumerMessages {
consumer: PullConsumerImpl;
opts: Record<string, number>;
Expand All @@ -66,14 +65,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
super();
this.consumer = c;

const args = this.parseOptions(opts, refilling);
if (args.isError) {
throw args.error;
}
this.opts = this.parseOptions(opts, refilling);
this.callback = (opts as ConsumeCallback).callback || null;
this.noIterator = typeof this.callback === "function";

this.opts = args.value!;
this.monitor = null;
this.pong = null;
this.pending = { msgs: 0, bytes: 0, requests: 0 };
Expand Down Expand Up @@ -150,9 +144,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
// we got a bad request - no progress here
if (code === 400) {
const error = toErr();
this._push({
isError: true,
error,
//@ts-ignore: fn
this._push(() => {
this.stop(error);
});
} else if (code === 409 && description === "consumer deleted") {
const error = toErr();
Expand All @@ -166,7 +160,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
}
} else {
// push the user message
this._push({ isError: false, value: toJsMsg(msg) });
this._push(toJsMsg(msg));
this.received++;
if (this.pending.msgs) {
this.pending.msgs--;
Expand Down Expand Up @@ -249,7 +243,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
this.pull(this.pullOptions());
}

_push(r: Result<JsMsg>) {
_push(r: JsMsg) {
if (!this.callback) {
super.push(r);
} else {
Expand All @@ -261,18 +255,20 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
fn();
}
} catch (err) {
this.callback({ isError: true, error: err });
this.stop(err);
}
}
}

notify(type: ConsumerEvents | ConsumerDebugEvents, data: unknown) {
if (this.listeners.length > 0) {
this.listeners.forEach((l) => {
if (!(l as QueuedIteratorImpl<ConsumerStatus>).done) {
l.push({ type, data });
}
});
(() => {
this.listeners.forEach((l) => {
if (!(l as QueuedIteratorImpl<ConsumerStatus>).done) {
l.push({ type, data });
}
});
})();
}
}

Expand Down Expand Up @@ -381,18 +377,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
parseOptions(
opts: PullConsumerOptions,
refilling = false,
): Result<Record<string, number>> {
): Record<string, number> {
const args = (opts || {}) as Record<string, number>;
args.max_messages = args.max_messages || 0;
args.max_bytes = args.max_bytes || 0;

if (args.max_messages !== 0 && args.max_bytes !== 0) {
return {
isError: true,
error: new Error(
`only specify one of max_messages or max_bytes`,
),
};
throw new Error(
`only specify one of max_messages or max_bytes`,
);
}

// we must have at least one limit - default to 100 msgs
Expand All @@ -410,10 +403,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>

args.expires = args.expires || 30_000;
if (args.expires < 1000) {
return {
isError: true,
error: new Error("expires should be at least 1000ms"),
};
throw new Error("expires should be at least 1000ms");
}

// require idle_heartbeat
Expand All @@ -430,7 +420,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
args.threshold_bytes = args.threshold_bytes || minBytes;
}

return { value: args, isError: false };
return args;
}

status(): Promise<AsyncIterable<ConsumerStatus>> {
Expand All @@ -440,7 +430,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<Result<JsMsg>>
}
}

export class OrderedConsumerMessages extends QueuedIteratorImpl<Result<JsMsg>>
export class OrderedConsumerMessages extends QueuedIteratorImpl<JsMsg>
implements ConsumerMessages {
src!: PullConsumerMessagesImpl;

Expand Down
1 change: 0 additions & 1 deletion nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ export type {
Republish,
RequestManyOptions,
RequestOptions,
Result,
RoKV,
SeqMsgRequest,
SequenceInfo,
Expand Down
1 change: 0 additions & 1 deletion nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ export type {
RepublishHeaders,
RequestManyOptions,
RequestOptions,
Result,
RoKV,
SchemaInfo,
SeqMsgRequest,
Expand Down
36 changes: 21 additions & 15 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2642,6 +2642,10 @@ export interface ConsumerUpdateConfig {
metadata?: Record<string, string>;
}

export type Ordered = {
ordered: true;
};

export type ConsumeBytes =
& MaxBytes
& Partial<MaxMessages>
Expand All @@ -2657,6 +2661,8 @@ export type ConsumeMessages =
& IdleHeartbeat
& ConsumeCallback;

export type ConsumeOptions = ConsumeBytes | ConsumeMessages;

/**
* Options for fetching
*/
Expand All @@ -2675,7 +2681,6 @@ export type FetchMessages =
& IdleHeartbeat;

export type FetchOptions = FetchBytes | FetchMessages;
export type ConsumeOptions = ConsumeBytes | ConsumeMessages;

export type PullConsumerOptions = FetchOptions | ConsumeOptions;

Expand Down Expand Up @@ -2736,7 +2741,7 @@ export type IdleHeartbeat = {
idle_heartbeat?: number;
};

export type ConsumerCallbackFn = (r: Result<JsMsg>) => void;
export type ConsumerCallbackFn = (r: JsMsg) => void;
export type ConsumeCallback = {
/**
* Process messages using a callback instead of an iterator. Note that when using callbacks,
Expand Down Expand Up @@ -2791,6 +2796,15 @@ export interface ConsumerStatus {
data: unknown;
}

export interface ExportedConsumer {
fetch(
opts?: FetchOptions,
): Promise<ConsumerMessages>;
consume(
opts?: ConsumeOptions,
): Promise<ConsumerMessages>;
}

export interface Consumer extends ExportedConsumer {
info(cached?: boolean): Promise<ConsumerInfo>;
delete(): Promise<boolean>;
Expand All @@ -2800,31 +2814,23 @@ export interface Close {
close(): Promise<void>;
}

export type ValueResult<T> = {
type ValueResult<T> = {
isError: false;
value: T;
};

export type ErrorResult = {
type ErrorResult = {
isError: true;
error: Error;
};

/**
* Result is a value that may have resulted in an error.
*/
export type Result<T> = ValueResult<T> | ErrorResult;
export interface ConsumerMessages extends QueuedIterator<Result<JsMsg>>, Close {
status(): Promise<AsyncIterable<ConsumerStatus>>;
}
type Result<T> = ValueResult<T> | ErrorResult;

export interface ExportedConsumer {
fetch(
opts?: FetchOptions,
): Promise<ConsumerMessages>;
consume(
opts?: ConsumeOptions,
): Promise<ConsumerMessages>;
export interface ConsumerMessages extends QueuedIterator<JsMsg>, Close {
status(): Promise<AsyncIterable<ConsumerStatus>>;
}

export interface StreamNames {
Expand Down
Loading
Loading