Skip to content

Commit

Permalink
[CHANGE] ordered consumers can not be created by calling `js.consumer…
Browse files Browse the repository at this point in the history
…s.get()` (#505)
  • Loading branch information
aricart committed May 8, 2023
1 parent b22aef0 commit acd6837
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 38 deletions.
8 changes: 7 additions & 1 deletion nats-base-client/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ export class ConsumersImpl implements Consumers {
return Promise.resolve();
}

async get(stream: string, name: string): Promise<Consumer> {
async get(
stream: string,
name: string | Partial<OrderedConsumerOptions> = {},
): Promise<Consumer> {
if (typeof name === "object") {
return this.ordered(stream, name);
}
// check we have support for pending msgs and header notifications
await this.checkVersion();

Expand Down
25 changes: 10 additions & 15 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1075,26 +1075,21 @@ export interface JetStreamClient {
export interface Consumers {
/**
* Returns the Consumer configured for the specified stream having the specified name.
* Consumers are created with {@link JetStreamManager}.
* Consumers are typically created with {@link JetStreamManager}. If no name is specified,
* the Consumers API will return an ordered consumer.
*
* {@link Consumer}.
* @param stream
* @param name
*/
get(stream: string, name: string): Promise<Consumer>;

/**
* Returns an ordered consumer for the specified stream. Ordered consumers
* track that messages must be delivered in order. If there's any inconsistency
* the ordered consumer will recreate the underlying consumer at the appropiate
* sequence.
* An ordered consumer expects messages to be delivered in order. If there's
* any inconsistency, the ordered consumer will recreate the underlying consumer at the
* correct sequence. Note that ordered consumers don't yield messages that can be acked
* because the client can simply recreate the consumer.
*
* {@link Consumer}.
* @param stream
* @param opts
* @param name or OrderedConsumer options - if not specified an ordered consumer is returned.
*/
ordered(
get(
stream: string,
opts?: Partial<OrderedConsumerOptions>,
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Deno.test("consumers - min supported server", async () => {

await assertRejects(
async () => {
await js.consumers.ordered(stream);
await js.consumers.get(stream);
},
Error,
"consumers framework is only supported on servers",
Expand Down
42 changes: 21 additions & 21 deletions tests/consumersordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Deno.test("ordered - get", async () => {

await assertRejects(
async () => {
await js.consumers.ordered("a");
await js.consumers.get("a");
},
Error,
"stream not found",
Expand All @@ -44,7 +44,7 @@ Deno.test("ordered - get", async () => {
await jsm.streams.add({ name: "test", subjects: ["test"] });
await js.publish("test");

const oc = await js.consumers.ordered("test") as OrderedPullConsumerImpl;
const oc = await js.consumers.get("test") as OrderedPullConsumerImpl;
assertExists(oc);

const ci = await oc.info();
Expand All @@ -67,7 +67,7 @@ Deno.test("ordered - fetch", async () => {
await js.publish("test.b");
await js.publish("test.c");

const oc = await js.consumers.ordered("test") as OrderedPullConsumerImpl;
const oc = await js.consumers.get("test") as OrderedPullConsumerImpl;
assertExists(oc);

let iter = await oc.fetch({ max_messages: 1 });
Expand Down Expand Up @@ -98,7 +98,7 @@ Deno.test("ordered - fetch reset", async () => {
await js.publish("test.b");
await js.publish("test.c");

const oc = await js.consumers.ordered("test") as OrderedPullConsumerImpl;
const oc = await js.consumers.get("test") as OrderedPullConsumerImpl;
assertExists(oc);

const seen: number[] = new Array(3).fill(0);
Expand Down Expand Up @@ -182,7 +182,7 @@ Deno.test("ordered - consume reset", async () => {
await js.publish("test.b");
await js.publish("test.c");

const oc = await js.consumers.ordered("test") as OrderedPullConsumerImpl;
const oc = await js.consumers.get("test") as OrderedPullConsumerImpl;
assertExists(oc);

const seen: number[] = new Array(3).fill(0);
Expand Down Expand Up @@ -227,7 +227,7 @@ Deno.test("ordered - consume", async () => {
await js.publish("test.b");
await js.publish("test.c");

const oc = await js.consumers.ordered("test") as OrderedPullConsumerImpl;
const oc = await js.consumers.get("test") as OrderedPullConsumerImpl;
assertExists(oc);

const iter = await oc.consume({ max_messages: 1 });
Expand All @@ -254,7 +254,7 @@ Deno.test("ordered - filters consume", async () => {
await js.publish("test.b");
await js.publish("test.c");

const oc = await js.consumers.ordered("test", { filterSubjects: ["test.b"] });
const oc = await js.consumers.get("test", { filterSubjects: ["test.b"] });
assertExists(oc);

const iter = await oc.consume();
Expand Down Expand Up @@ -284,7 +284,7 @@ Deno.test("ordered - filters fetch", async () => {
await js.publish("test.b");
await js.publish("test.c");

const oc = await js.consumers.ordered("test", { filterSubjects: ["test.b"] });
const oc = await js.consumers.get("test", { filterSubjects: ["test.b"] });
assertExists(oc);

const iter = await oc.fetch({ expires: 1000 });
Expand All @@ -305,7 +305,7 @@ Deno.test("ordered - fetch reject consumer type change or concurrency", async ()
await jsm.streams.add({ name: "test", subjects: ["test.*"] });

const js = nc.jetstream();
const oc = await js.consumers.ordered("test");
const oc = await js.consumers.get("test");
const iter = await oc.fetch({ expires: 3000 });
(async () => {
for await (const _r of iter) {
Expand Down Expand Up @@ -343,7 +343,7 @@ Deno.test("ordered - consume reject consumer type change or concurrency", async
await jsm.streams.add({ name: "test", subjects: ["test.*"] });

const js = nc.jetstream();
const oc = await js.consumers.ordered("test");
const oc = await js.consumers.get("test");
const iter = await oc.consume({ expires: 3000 });
(async () => {
for await (const _r of iter) {
Expand Down Expand Up @@ -386,7 +386,7 @@ Deno.test("ordered - last per subject", async () => {
js.publish("test.a"),
]);

let oc = await js.consumers.ordered("test", {
let oc = await js.consumers.get("test", {
deliver_policy: DeliverPolicy.LastPerSubject,
});
let iter = await oc.fetch({ max_messages: 1 });
Expand All @@ -396,7 +396,7 @@ Deno.test("ordered - last per subject", async () => {
}
})();

oc = await js.consumers.ordered("test", {
oc = await js.consumers.get("test", {
deliver_policy: DeliverPolicy.LastPerSubject,
});
iter = await oc.consume({ max_messages: 1 });
Expand Down Expand Up @@ -424,7 +424,7 @@ Deno.test("ordered - start sequence", async () => {
js.publish("test.b"),
]);

let oc = await js.consumers.ordered("test", {
let oc = await js.consumers.get("test", {
opt_start_seq: 2,
});

Expand All @@ -435,7 +435,7 @@ Deno.test("ordered - start sequence", async () => {
}
})();

oc = await js.consumers.ordered("test", {
oc = await js.consumers.get("test", {
opt_start_seq: 2,
});
iter = await oc.consume({ max_messages: 1 });
Expand Down Expand Up @@ -464,7 +464,7 @@ Deno.test("ordered - last", async () => {
js.publish("test.b"),
]);

let oc = await js.consumers.ordered("test", {
let oc = await js.consumers.get("test", {
deliver_policy: DeliverPolicy.Last,
});

Expand All @@ -476,7 +476,7 @@ Deno.test("ordered - last", async () => {
}
})();

oc = await js.consumers.ordered("test", {
oc = await js.consumers.get("test", {
deliver_policy: DeliverPolicy.Last,
});
iter = await oc.consume({ max_messages: 1 });
Expand Down Expand Up @@ -505,7 +505,7 @@ Deno.test("ordered - new", async () => {
js.publish("test.b"),
]);

let oc = await js.consumers.ordered("test", {
let oc = await js.consumers.get("test", {
deliver_policy: DeliverPolicy.New,
});

Expand All @@ -518,7 +518,7 @@ Deno.test("ordered - new", async () => {
}
})();

oc = await js.consumers.ordered("test", {
oc = await js.consumers.get("test", {
deliver_policy: DeliverPolicy.New,
});
iter = await oc.consume({ max_messages: 1 });
Expand Down Expand Up @@ -551,7 +551,7 @@ Deno.test("ordered - start time", async () => {
await delay(500);
const date = new Date().toISOString();

let oc = await js.consumers.ordered("test", {
let oc = await js.consumers.get("test", {
deliver_policy: DeliverPolicy.StartTime,
opt_start_time: date,
});
Expand All @@ -566,7 +566,7 @@ Deno.test("ordered - start time", async () => {
}
})();

oc = await js.consumers.ordered("test", {
oc = await js.consumers.get("test", {
deliver_policy: DeliverPolicy.StartTime,
opt_start_time: date,
});
Expand All @@ -591,7 +591,7 @@ Deno.test("ordered - next", async () => {
await jsm.streams.add({ name: "test", subjects: ["test"] });
const js = nc.jetstream();

const c = await js.consumers.ordered("test");
const c = await js.consumers.get("test");
let m = await c.next({ expires: 1000 });
assertEquals(m, null);

Expand Down

0 comments on commit acd6837

Please sign in to comment.