Skip to content

Commit

Permalink
[FEAT] consumer.next() to retrieve single message
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed May 2, 2023
1 parent 0863710 commit d363bab
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 17 deletions.
61 changes: 60 additions & 1 deletion nats-base-client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import {
FetchMessages,
FetchOptions,
JsMsg,
NextOptions,
ReplayPolicy,
} from "./types.ts";
import { timeout } from "./util.ts";
import { deferred, timeout } from "./util.ts";
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
import {
OrderedConsumerMessages,
Expand Down Expand Up @@ -86,6 +87,38 @@ export class PullConsumerImpl implements Consumer {
return Promise.resolve(m);
}

next(
opts: NextOptions = { expires: 30_000 },
): Promise<JsMsg | null> {
const d = deferred<JsMsg | null>();
const fopts = opts as FetchMessages;
fopts.max_messages = 1;

const iter = new PullConsumerMessagesImpl(this, fopts, false);
(async () => {
for await (const m of iter) {
d.resolve(m);
break;
}
})().catch();
// FIXME: need some way to pad this correctly
const to = Math.round(iter.opts.expires * 1.05);
const timer = timeout(to);
iter.closed().then(() => {
d.resolve(null);
timer.cancel();
}).catch((err) => {
d.reject(err);
});
timer.catch((err) => {
d.resolve(null);
iter.close().catch();
});
iter.trackTimeout(timer);

return d;
}

delete(): Promise<boolean> {
const { stream_name, name } = this._info;
return this.api.delete(stream_name, name);
Expand Down Expand Up @@ -320,6 +353,32 @@ export class OrderedPullConsumerImpl implements Consumer {
return this.reset(opts, true);
}

async next(
opts: NextOptions = { expires: 30_000 },
): Promise<JsMsg | null> {
const d = deferred<JsMsg | null>();
const copts = opts as ConsumeOptions;
copts.max_messages = 1;
copts.callback = (m) => {
// we can clobber the callback, because they are not supported
// except on consume, which will fail when we try to fetch
this.userCallback = null;
d.resolve(m);
};
const iter = await this.fetch(
copts as FetchMessages,
) as OrderedConsumerMessages;
iter.iterClosed
.then(() => {
d.resolve(null);
})
.catch((err) => {
d.reject(err);
});

return d;
}

delete(): Promise<boolean> {
if (!this.currentConsumer) {
return Promise.resolve(false);
Expand Down
1 change: 0 additions & 1 deletion nats-base-client/consumermessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}

closed(): Promise<void> {
// FIXME: check what ordered consumer usage of this is by user code
return this.iterClosed;
}

Expand Down
20 changes: 5 additions & 15 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2646,6 +2646,8 @@ export type Ordered = {
ordered: true;
};

export type NextOptions = Expires;

export type ConsumeBytes =
& MaxBytes
& Partial<MaxMessages>
Expand Down Expand Up @@ -2797,6 +2799,9 @@ export interface ConsumerStatus {
}

export interface ExportedConsumer {
next(
opts?: NextOptions,
): Promise<JsMsg | null>;
fetch(
opts?: FetchOptions,
): Promise<ConsumerMessages>;
Expand All @@ -2814,21 +2819,6 @@ export interface Close {
close(): Promise<void>;
}

type ValueResult<T> = {
isError: false;
value: T;
};

type ErrorResult = {
isError: true;
error: Error;
};

/**
* Result is a value that may have resulted in an error.
*/
type Result<T> = ValueResult<T> | ErrorResult;

export interface ConsumerMessages extends QueuedIterator<JsMsg>, Close {
status(): Promise<AsyncIterable<ConsumerStatus>>;
}
Expand Down
15 changes: 15 additions & 0 deletions nats-base-client/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ export const CRLF = DataBuffer.fromAscii(CR_LF);
export const CR = new Uint8Array(CRLF)[0]; // 13
export const LF = new Uint8Array(CRLF)[1]; // 10

export type ValueResult<T> = {
isError: false;
value: T;
};

export type ErrorResult = {
isError: true;
error: Error;
};

/**
* Result is a value that may have resulted in an error.
*/
export type Result<T> = ValueResult<T> | ErrorResult;

export function isUint8Array(a: unknown): boolean {
return a instanceof Uint8Array;
}
Expand Down
36 changes: 36 additions & 0 deletions tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -781,3 +781,39 @@ Deno.test("consumers - threshold_messages bytes", async () => {

await cleanup(ns, nc);
});

Deno.test("consumers - next", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
const c = await js.consumers.get(stream, stream);
let ci = await c.info(true);
assertEquals(ci.num_pending, 0);

let m = await c.next({ expires: 1000 });
assertEquals(m, null);

await Promise.all([js.publish(subj), js.publish(subj)]);
ci = await c.info();
assertEquals(ci.num_pending, 2);

m = await c.next();
assertEquals(m?.seq, 1);
m?.ack();
await nc.flush();

ci = await c.info();
assertEquals(ci?.num_pending, 1);
m = await c.next();
assertEquals(m?.seq, 2);
m?.ack();

await cleanup(ns, nc);
});
27 changes: 27 additions & 0 deletions tests/consumersordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,30 @@ Deno.test("ordered - start time", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered - next", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "test", subjects: ["test"] });
const js = nc.jetstream();

const c = await js.consumers.ordered("test");
let m = await c.next({ expires: 1000 });
assertEquals(m, null);

await Promise.all([
js.publish("test"),
js.publish("test"),
]);

m = await c.next();
assertEquals(m?.seq, 1);

m = await c.next();
assertEquals(m?.seq, 2);

await cleanup(ns, nc);
});

0 comments on commit d363bab

Please sign in to comment.