Skip to content

Commit

Permalink
[feat] added callback pre/post processing logic for ordered consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Oct 15, 2021
1 parent b2fa201 commit d3e3521
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 45 deletions.
35 changes: 14 additions & 21 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,6 @@ export class JetStreamClientImpl extends BaseApiClient
so.dispatchedFn = autoAckJsMsg;
}
if (jsi.callbackFn) {
//FIXME: need a callback here that uses the filters
// previously the protocol messages were filtered out
// now they are not.
so.callback = jsi.callbackFn;
}

Expand Down Expand Up @@ -556,19 +553,15 @@ class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
const subj = `${info.api.prefix}.CONSUMER.CREATE.${info.stream}`;

this.js._request(subj, this.info.config)
.catch(async (err) => {
this.sub.drain()
.then(() => {
const nerr = new NatsError(
`unable to recreate ordered consumer ${info.stream} at seq ${sseq}`,
ErrorCode.RequestError,
err,
);
this.sub.callback(nerr, {} as Msg);
})
.catch(() => {
// the sub should report it
});
.catch((err) => {
// to inform the subscription we inject an error this will
// be at after the last message if using an iterator.
const nerr = new NatsError(
`unable to recreate ordered consumer ${info.stream} at seq ${sseq}`,
ErrorCode.RequestError,
err,
);
this.sub.callback(nerr, {} as Msg);
});
}

Expand Down Expand Up @@ -658,11 +651,11 @@ interface JetStreamSubscriptionInfo extends ConsumerOpts {
api: BaseApiClient;
attached: boolean;
deliver: string;
ordered_consumer_sequence: { delivery_seq: number; stream_seq: number };
flow_control: {
heartbeat_count: number;
fc_count: number;
consumer_restarts: number;
"ordered_consumer_sequence": { "delivery_seq": number; "stream_seq": number };
"flow_control": {
"heartbeat_count": number;
"fc_count": number;
"consumer_restarts": number;
};
}

Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
if (this.noIterator) {
const uc = this.callback;

let ingestion = opts.ingestionFilterFn
const ingestion = opts.ingestionFilterFn
? opts.ingestionFilterFn
: (): IngestionFilterFnResult => {
return { ingest: true, protocol: false };
Expand Down
19 changes: 14 additions & 5 deletions nats-base-client/typedsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,20 @@ export class TypedSubscription<T> extends QueuedIteratorImpl<T>
const uh = opts.callback;
callback = (err: NatsError | null, msg: Msg) => {
const [jer, tm] = this.adapter(err, msg);
const ok = this.protocolFilterFn ? this.protocolFilterFn(tm) : true;
if (ok) {
uh(jer, tm);
if (this.dispatchedFn && tm) {
this.dispatchedFn(tm);
if (jer) {
uh(jer, null);
return;
}
const { ingest } = this.ingestionFilterFn
? this.ingestionFilterFn(tm, this)
: { ingest: true };
if (ingest) {
const ok = this.protocolFilterFn ? this.protocolFilterFn(tm) : true;
if (ok) {
uh(jer, tm);
if (this.dispatchedFn && tm) {
this.dispatchedFn(tm);
}
}
}
};
Expand Down
64 changes: 47 additions & 17 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2219,23 +2219,32 @@ Deno.test("jetstream - redelivery property works", async () => {
await cleanup(ns, nc);
});

Deno.test("jetstream - ordered consumer", async () => {
async function ocTest(
N: number,
S: number,
callback: boolean,
): Promise<void> {
if (N % 10 !== 0) {
throw new Error("N must be divisible by 10");
}

const storage = N * S + (1024 * 1024);
const { ns, nc } = await setup(jetstreamServerConf({
jetstream: {
max_file_store: 1024 * 1024 * 25,
max_file_store: storage,
},
}, true));
const { subj } = await initStream(nc);
const js = nc.jetstream();

const S = 1024;
const buf = new Uint8Array(S);
for (let i = 0; i < S; i++) {
buf[i] = "a".charCodeAt(0) + (i % 26);
}

const N = 500;
for (let i = 0; i < N; i++) {
// speed up the loading by sending 10 at time
const n = N / 10;
for (let i = 0; i < n; i++) {
await Promise.all([
js.publish(subj, buf),
js.publish(subj, buf),
Expand All @@ -2250,21 +2259,36 @@ Deno.test("jetstream - ordered consumer", async () => {
]);
}

const lock = Lock(N * 10, 1000 * 60);
const lock = Lock(N, 1000 * 60);
const opts = consumerOpts({ idle_heartbeat: nanos(1000) });
opts.orderedConsumer();
if (callback) {
opts.callback((err: NatsError | null, msg: JsMsg | null): void => {
if (err) {
fail(err.message);
return;
}
if (!msg) {
fail(`didn't expect to get null msg`);
return;
}
lock.unlock();
});
}

const sub = await js.subscribe(subj, opts);
(async () => {
for await (const jm of sub) {
lock.unlock();
}
})().then();
if (!callback) {
(async () => {
for await (const _jm of sub) {
lock.unlock();
}
})().then();
}
await lock;
//@ts-ignore: test
assertEquals(sub.sub.info.ordered_consumer_sequence.stream_seq, N * 10);
assertEquals(sub.sub.info.ordered_consumer_sequence.stream_seq, N);
//@ts-ignore: test
assertEquals(sub.sub.info.ordered_consumer_sequence.delivery_seq, N * 10);
assertEquals(sub.sub.info.ordered_consumer_sequence.delivery_seq, N);

await delay(3 * 1000);
// @ts-ignore: test
Expand All @@ -2277,8 +2301,6 @@ Deno.test("jetstream - ordered consumer", async () => {
// @ts-ignore: test
assert(sub.sub.info.flow_control.consumer_restarts >= 0);

//@ts-ignore

// @ts-ignore: test
assert(sub.sub.info.flow_control.heartbeat_count > 0);

Expand All @@ -2288,8 +2310,16 @@ Deno.test("jetstream - ordered consumer", async () => {
assertEquals(ci.config.ack_policy, AckPolicy.None);
assertEquals(ci.config.max_deliver, 1);
assertEquals(ci.num_pending, 0);
assertEquals(ci.delivered.consumer_seq, N * 10);
assertEquals(ci.delivered.stream_seq, N * 10);
assertEquals(ci.delivered.consumer_seq, N);
assertEquals(ci.delivered.stream_seq, N);

await cleanup(ns, nc);
}

Deno.test("jetstream - ordered consumer callback", async () => {
await ocTest(500, 1024, true);
});

Deno.test("jetstream - ordered consumer iterator", async () => {
await ocTest(500, 1024, false);
});
2 changes: 1 addition & 1 deletion tests/sub_extensions_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ Deno.test("extensions - filter called on callback", async () => {
const subj = createInbox();
const sub = nc.subscribe(subj, {
max: 3,
callback: (err, m) => {
callback: (_err, m) => {
processed.push(sc.decode(m.data));
},
}) as SubscriptionImpl;
Expand Down

0 comments on commit d3e3521

Please sign in to comment.