Skip to content

Commit

Permalink
[fix/feat] consumer options don't provide access to all the options p…
Browse files Browse the repository at this point in the history
…ossible - provided a path for specifying a new consumer configuration to the builder, unblocking the requirement to pre-create those types of consumers. (#156)

FIX #153
  • Loading branch information
aricart committed May 14, 2021
1 parent 7a7a9e6 commit bb5379b
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 38 deletions.
2 changes: 1 addition & 1 deletion nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export type {
Consumer,
ConsumerConfig,
ConsumerOpts,
ConsumerOptsBuilder,
DeliveryInfo,
JetStreamAccountStats,
JetStreamApiStats,
Expand Down Expand Up @@ -64,7 +65,6 @@ export {
} from "./types.ts";

export { consumerOpts } from "./jsconsumeropts.ts";
export type { ConsumerOptsBuilder } from "./jsconsumeropts.ts";
export { toJsMsg } from "./jsmsg.ts";

export { DebugEvents, Empty, Events } from "./types.ts";
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { Timeout, timeout } from "./util.ts";
import { createInbox } from "./protocol.ts";
import { headers } from "./headers.ts";
import type { ConsumerOptsBuilder } from "./jsconsumeropts.ts";
import type { ConsumerOptsBuilder } from "./types.ts";
import { consumerOpts, isConsumerOptsBuilder } from "./jsconsumeropts.ts";

export interface JetStreamSubscriptionInfoable {
Expand Down
47 changes: 14 additions & 33 deletions nats-base-client/jsconsumeropts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,54 +17,36 @@ import {
AckPolicy,
ConsumerConfig,
ConsumerOpts,
ConsumerOptsBuilder,
DeliverPolicy,
JsMsgCallback,
Nanos,
} from "./types.ts";
import { defaultConsumer, validateDurableName } from "./jsutil.ts";

export interface ConsumerOptsBuilder {
deliverTo(subject: string): void;
manualAck(): void;
durable(name: string): void;
deliverAll(): void;
deliverLast(): void;
deliverNew(): void;
startSequence(seq: number): void;
startTime(time: Date | Nanos): void;
ackNone(): void;
ackAll(): void;
ackExplicit(): void;
maxDeliver(max: number): void;
maxAckPending(max: number): void;
maxWaiting(max: number): void;
maxMessages(max: number): void;
callback(fn: JsMsgCallback): void;

// FIXME: 503:
// maxRetries()
// retryBackoff()

// ackWait(time)
// replayOriginal()
// rateLimit(bytesPerSec)
}

export function consumerOpts(): ConsumerOptsBuilder {
return new ConsumerOptsBuilderImpl();
export function consumerOpts(
opts?: Partial<ConsumerConfig>,
): ConsumerOptsBuilder {
return new ConsumerOptsBuilderImpl(opts);
}

// FIXME: some items here that may need to be addressed
// 503s?
// maxRetries()
// retryBackoff()
// ackWait(time)
// replayOriginal()
// rateLimit(bytesPerSec)
export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
config: Partial<ConsumerConfig>;
mack: boolean;
stream: string;
callbackFn?: JsMsgCallback;
max?: number;

constructor() {
constructor(opts?: Partial<ConsumerConfig>) {
this.stream = "";
this.mack = false;
this.config = defaultConsumer("");
this.config = defaultConsumer("", opts || {});
// not set
this.config.ack_policy = AckPolicy.All;
}
Expand Down Expand Up @@ -132,7 +114,6 @@ export class ConsumerOptsBuilderImpl implements ConsumerOptsBuilder {
maxDeliver(max: number) {
this.config.max_deliver = max;
}

maxAckPending(max: number) {
this.config.max_ack_pending = max;
}
Expand Down
19 changes: 17 additions & 2 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,22 +325,37 @@ export interface ConsumerOpts {
}

export interface ConsumerOptsBuilder {
// deliverTo sets the subject a push consumer receives messages on
deliverTo(subject: string): void;
// prevents the consumer implementation from auto-acking messages
manualAck(): void;
// sets the durable name
durable(name: string): void;
// consumer will start at first available message on the stream
deliverAll(): void;
// consumer will start at the last message
deliverLast(): void;
// consumer will start with new messages (not yet in the stream)
deliverNew(): void;
// consumer will start at the message with the specified sequence
startSequence(seq: number): void;
// consumer will start with messages received on the specified time/date
startTime(time: Date | Nanos): void;
// the consumer will not ack messages
ackNone(): void;
// acking a message, implicitly acks all messages with a lower sequence
ackAll(): void;
// consumer will ack all messages
ackExplicit(): void;
// number of re-delivery attempts for a particular message
maxDeliver(max: number): void;
// max number of outstanding acks before the server stops sending new messages
maxAckPending(max: number): void;
// FIXME: pullMaxWaiting
// max count of outstanding messages scheduled via batch pulls (pulls are additive)
maxWaiting(max: number): void;
// standard nats subscribe option for the maximum number of messages to receive on the subscription
maxMessages(max: number): void;
// callback to process messages (or iterator is returned)
callback(fn: JsMsgCallback): void;
}

Expand Down Expand Up @@ -661,7 +676,7 @@ export interface ConsumerConfig {
"opt_start_seq"?: number;
"opt_start_time"?: string;
"ack_policy": AckPolicy;
"ack_wait"?: number;
"ack_wait"?: Nanos;
"max_deliver"?: number;
"filter_subject"?: string;
"replay_policy": ReplayPolicy;
Expand Down
2 changes: 1 addition & 1 deletion tests/consumeropts_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Deno.test("consumeropts - maxDeliver", () => {
assertEquals(args.config.max_deliver, 100);
});

Deno.test("consumeropts - maxAcPending", () => {
Deno.test("consumeropts - maxAckPending", () => {
const opts = consumerOpts() as ConsumerOptsBuilderImpl;
opts.maxAckPending(100);
assertEquals(opts.config.max_ack_pending, 100);
Expand Down

0 comments on commit bb5379b

Please sign in to comment.