Skip to content

Commit

Permalink
[FIX] [CONSUMERS] fixed subscription leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed May 18, 2023
1 parent 17d3a76 commit fa87854
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 3 deletions.
1 change: 1 addition & 0 deletions nats-base-client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export class PullConsumerImpl implements Consumer {
(s.data as number) >= 2
) {
d.reject(new Error("consumer missed heartbeats"));
break;
}
}
})().catch();
Expand Down
7 changes: 6 additions & 1 deletion nats-base-client/consumermessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
});

const { sub } = this;
if (sub) {
sub.unsubscribe();
}

this.sub = c.api.nc.subscribe(this.inbox, {
callback: (err, msg) => {
if (err) {
Expand Down Expand Up @@ -333,7 +338,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
if (bytesLeft) {
discard.bytesLeft = parseInt(bytesLeft);
}
// FIXME: batch complete header goes here...

return discard;
}
Expand Down Expand Up @@ -363,6 +367,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}

stop(err?: Error) {
this.sub?.unsubscribe();
this.clearTimers();
//@ts-ignore: fn
this._push(() => {
Expand Down
73 changes: 73 additions & 0 deletions tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,3 +836,76 @@ Deno.test("consumers - next", async () => {

await cleanup(ns, nc);
});

Deno.test("consumers - sub leaks next()", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
const js = nc.jetstream();
const c = await js.consumers.get(stream, stream);
await c.next({ expires: 1000 });
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("consumers - sub leaks fetch()", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
const js = nc.jetstream();
const c = await js.consumers.get(stream, stream);
const iter = await c.fetch({ expires: 1000 });
const done = (async () => {
for await (const _m of iter) {
// nothing
}
})().then();
await done;
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("consumers - sub leaks consume()", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
const js = nc.jetstream();
const c = await js.consumers.get(stream, stream);
const iter = await c.consume({ expires: 30000 });
const done = (async () => {
for await (const _m of iter) {
// nothing
}
})().then();
setTimeout(() => {
iter.close();
}, 1000);

await done;
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});
79 changes: 77 additions & 2 deletions tests/consumersordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
* limitations under the License.
*/

import { cleanup, jetstreamServerConf, setup } from "./jstest_util.ts";
import {
cleanup,
initStream,
jetstreamServerConf,
setup,
} from "./jstest_util.ts";
import {
assertEquals,
assertExists,
assertRejects,
} from "https://deno.land/std@0.125.0/testing/asserts.ts";
import { OrderedPullConsumerImpl } from "../nats-base-client/consumer.ts";
import { DeliverPolicy, JsMsg } from "../nats-base-client/types.ts";
import { AckPolicy, DeliverPolicy, JsMsg } from "../nats-base-client/types.ts";
import { deferred } from "../nats-base-client/mod.ts";
import { notCompatible } from "./helpers/mod.ts";
import { delay } from "../nats-base-client/util.ts";
Expand Down Expand Up @@ -608,3 +613,73 @@ Deno.test("ordered - next", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered - sub leaks next()", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const { stream } = await initStream(nc);

//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
const js = nc.jetstream();
const c = await js.consumers.get(stream);
await c.next({ expires: 1000 });
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("ordered - sub leaks fetch()", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const { stream } = await initStream(nc);

//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
const js = nc.jetstream();
const c = await js.consumers.get(stream);
const iter = await c.fetch({ expires: 1000 });
const done = (async () => {
for await (const _m of iter) {
// nothing
}
})().then();
await done;
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

Deno.test("ordered - sub leaks consume()", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}
const { stream } = await initStream(nc);

//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
const js = nc.jetstream();
const c = await js.consumers.get(stream);
const iter = await c.consume({ expires: 30000 });
const done = (async () => {
for await (const _m of iter) {
// nothing
}
})().then();
setTimeout(() => {
iter.close();
}, 1000);

await done;
//@ts-ignore: test
assertEquals(nc.protocol.subscriptions.size(), 1);
await cleanup(ns, nc);
});

0 comments on commit fa87854

Please sign in to comment.