Skip to content

Commit

Permalink
Merge pull request #531 from nats-io/fix-consumer-drain
Browse files Browse the repository at this point in the history
[FIX] connection drain for consume messages iterators
  • Loading branch information
aricart committed Jun 6, 2023
2 parents a0d00a5 + 40fff42 commit d8191c9
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 1 deletion.
11 changes: 11 additions & 0 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
PullOptions,
ReplayPolicy,
} from "./jsapi_types.ts";
import { SubscriptionImpl } from "../nats-base-client/protocol.ts";

enum PullConsumerType {
Unset = -1,
Expand Down Expand Up @@ -371,6 +372,16 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
},
});

this.sub.closed.then(() => {
// for ordered consumer we cannot break the iterator
if ((this.sub as SubscriptionImpl).draining) {
// @ts-ignore: we are pushing the pull fn
this._push(() => {
this.stop();
});
}
});

if (idle_heartbeat) {
this.monitor = new core.IdleHeartbeat(idle_heartbeat, (data): boolean => {
// for the pull consumer - missing heartbeats may be corrected
Expand Down
28 changes: 28 additions & 0 deletions jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
ConsumerStatus,
PullConsumerMessagesImpl,
} from "../consumer.ts";
import { deadline } from "../../nats-base-client/util.ts";

Deno.test("consumers - min supported server", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
Expand Down Expand Up @@ -915,3 +916,30 @@ Deno.test("consumers - sub leaks consume()", async () => {
assertEquals(nc.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("consumers - consume drain", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});
//@ts-ignore: test
const js = nc.jetstream();
const c = await js.consumers.get(stream, stream);
const iter = await c.consume({ expires: 30000 });
setTimeout(() => {
nc.drain();
}, 100);
const done = (async () => {
for await (const _m of iter) {
// nothing
}
})().then();

await deadline(done, 1000);

await cleanup(ns, nc);
});
23 changes: 22 additions & 1 deletion jetstream/tests/consumersordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
notCompatible,
setup,
} from "../../tests/helpers/mod.ts";
import { delay } from "../../nats-base-client/util.ts";
import { deadline, delay } from "../../nats-base-client/util.ts";

Deno.test("ordered - get", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
Expand Down Expand Up @@ -632,3 +632,24 @@ Deno.test("ordered - sub leaks consume()", async () => {
assertEquals(nc.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("ordered - consume drain", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

const js = nc.jetstream();
const c = await js.consumers.get(stream);
const iter = await c.consume({ expires: 30000 });
setTimeout(() => {
nc.drain();
}, 100);
const done = (async () => {
for await (const _m of iter) {
// nothing
}
})().then();

await deadline(done, 1000);

await cleanup(ns, nc);
});
1 change: 1 addition & 0 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
return Promise.reject(NatsError.errorForCode(ErrorCode.SubClosed));
}
if (!this.drained) {
this.draining = true;
this.protocol.unsub(this);
this.drained = this.protocol.flush(deferred<void>())
.then(() => {
Expand Down

0 comments on commit d8191c9

Please sign in to comment.