Skip to content

Commit

Permalink
Merge pull request #653 from nats-io/pause-resume
Browse files Browse the repository at this point in the history
[FEAT] create paused consumer - jsm apis to pause and resume a consumer
  • Loading branch information
aricart committed Mar 1, 2024
2 parents 7bfb9fc + cd60b32 commit c3f2bfd
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 2 deletions.
19 changes: 19 additions & 0 deletions jetstream/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,19 @@ export interface ConsumerInfo {
* on servers 2.10.x or better
*/
"ts"?: string;

/**
* Set to true if the consumer is paused.
* This field is only available on servers 2.11.x or better
*/
paused?: boolean;

/**
* If the consumer was paused with a resume date, this field specifies the amount of time
* in nanoseconds remaining until the consumer will be automatically resumed. This field
* is only available on servers 2.11.x or better
*/
"pause_remaining": Nanos;
}

export interface ConsumerListResponse extends ApiResponse, ApiPaged {
Expand Down Expand Up @@ -910,6 +923,12 @@ export interface ConsumerConfig extends ConsumerUpdateConfig {
* How messages are played back to the Consumer
*/
"replay_policy": ReplayPolicy;

/**
* Creates a consumer that is initially paused, but will resume at the specified Date and time.
* Specified as an ISO date time string (Date#toISOString()).
*/
"pause_until"?: string;
}

export interface ConsumerUpdateConfig {
Expand Down
41 changes: 40 additions & 1 deletion jetstream/jsmconsumer_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import {
} from "./jsutil.ts";
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
import { Feature } from "../nats-base-client/semver.ts";
import { JetStreamOptions, NatsConnection } from "../nats-base-client/core.ts";
import {
JetStreamOptions,
Nanos,
NatsConnection,
} from "../nats-base-client/core.ts";
import {
ConsumerApiAction,
ConsumerConfig,
Expand Down Expand Up @@ -72,6 +76,17 @@ export interface ConsumerAPI {
* @param stream
*/
list(stream: string): Lister<ConsumerInfo>;

pause(
stream: string,
name: string,
until?: Date,
): Promise<{ paused: boolean; pause_until?: string }>;

resume(
stream: string,
name: string,
): Promise<{ paused: boolean; pause_until?: string }>;
}

export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI {
Expand Down Expand Up @@ -212,4 +227,28 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI {
const subj = `${this.prefix}.CONSUMER.LIST.${stream}`;
return new ListerImpl<ConsumerInfo>(subj, filter, this);
}

pause(
stream: string,
name: string,
until: Date,
): Promise<{ paused: boolean; pause_until: string; pause_remaining: Nanos }> {
const subj = `${this.prefix}.CONSUMER.PAUSE.${stream}.${name}`;
let payload = undefined;
const opts = {
pause_until: until.toISOString(),
};
return this._request(subj, opts) as Promise<
{ paused: boolean; pause_until: string; pause_remaining: Nanos }
>;
}

resume(
stream: string,
name: string,
): Promise<{ paused: boolean; pause_until?: string }> {
return this.pause(stream, name, new Date(0)) as Promise<
{ paused: boolean; pause_until?: string; pause_remaining: Nanos }
>;
}
}
56 changes: 55 additions & 1 deletion jetstream/tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ async function testConsumerNameAPI(nc: NatsConnection) {
return Promise.resolve(ci!);
}

// an named ephemeral
// a named ephemeral
await assertRejects(
async () => {
await addC({
Expand Down Expand Up @@ -2731,3 +2731,57 @@ Deno.test("jsm - api check ok", async () => {
assertEquals(count, 3);
await cleanup(ns, nc);
});

Deno.test("jsm - consumer create paused", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.11.0")) {
return;
}

const jsm = await nc.jetstreamManager();
jsm.streams.add({
name: "A",
subjects: ["a.>"],
});

const tomorrow = Date.now() + 24 * 60 * 60 * 1000;
const ci = await jsm.consumers.add("A", {
durable_name: "a",
ack_policy: AckPolicy.None,
pause_until: new Date(tomorrow).toISOString(),
});
assertEquals(ci.paused, true);

await cleanup(ns, nc);
});

Deno.test("jsm - pause/unpause", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.11.0")) {
return;
}

const jsm = await nc.jetstreamManager();
jsm.streams.add({
name: "A",
subjects: ["a.>"],
});

const ci = await jsm.consumers.add("A", {
durable_name: "a",
ack_policy: AckPolicy.None,
});
assertEquals(ci.paused, undefined);

let pi = await jsm.consumers.pause(
"A",
"a",
new Date(Date.now() + 24 * 60 * 60 * 1000),
);
assertEquals(pi.paused, true);

pi = await jsm.consumers.resume("A", "a");
assertEquals(pi.paused, false);

await cleanup(ns, nc);
});

0 comments on commit c3f2bfd

Please sign in to comment.