Skip to content

Commit

Permalink
[FEAT] [EXPERIMENTAL] noMux support for requestMany
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jul 28, 2022
1 parent 2c67f4c commit a9cc69b
Show file tree
Hide file tree
Showing 3 changed files with 377 additions and 27 deletions.
156 changes: 129 additions & 27 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,47 +174,149 @@ export class NatsConnectionImpl implements NatsConnection {
return Promise.reject(new NatsError("timeout", ErrorCode.InvalidOption));
}

// the iterator for user results
const qi = new QueuedIteratorImpl<Msg | Error>();
const stop = () => {
qi.stop();
};
function stop() {
//@ts-ignore: stop function
qi.push(() => {
qi.stop();
});
}

const callback = (err: Error | null, msg: Msg | null) => {
// callback for the subscription or the mux handler
// simply pushes errors and messages into the iterator
function callback(err: Error | null, msg: Msg | null) {
if (err || msg === null) {
// FIXME: the stop function should not require commenting
if (err !== null) {
qi.push(err);
}
//@ts-ignore: stop function after consuming
qi.push(stop);
stop();
} else {
qi.push(msg);
}
};
}

const rmo = opts as RequestManyOptionsInternal;
rmo.callback = callback;
if (opts.noMux) {
// we setup a subscription and manage it
const stack = new Error().stack;
let max = typeof opts.maxMessages === "number" && opts.maxMessages > 0
? opts.maxMessages
: -1;

const sub = this.subscribe(createInbox(this.options.inboxPrefix), {
callback: (err, msg) => {
// we only expect runtime errors or a no responders
if (
msg.data.length === 0 &&
msg?.headers?.status === ErrorCode.NoResponders
) {
err = NatsError.errorForCode(ErrorCode.NoResponders);
}
// augment any error with the current stack to provide context
// for the error on the suer code
if (err) {
err.stack += `\n\n${stack}`;
cancel(err);
return;
}
// push the message
callback(null, msg);
// see if the m request is completed
if (opts.strategy === RequestStrategy.Count) {
max--;
if (max === 0) {
cancel();
}
}
if (opts.strategy === RequestStrategy.JitterTimer) {
clearTimers();
timer = setTimeout(() => {
cancel();
}, 300);
}
if (opts.strategy === RequestStrategy.SentinelMsg) {
if (msg && msg.data.length === 0) {
cancel();
}
}
},
});

qi.iterClosed.then(() => {
r.cancel();
}).catch((err) => {
r.cancel(err);
});
sub.closed
.then(() => {
stop();
})
.catch((err: Error) => {
qi.push(err);
stop();
});

const r = new RequestMany(this.protocol.muxSubscriptions, subject, rmo);
this.protocol.request(r);
const cancel = (err?: Error) => {
if (err) {
qi.push(err);
}
clearTimers();
sub.drain()
.then(() => {
stop();
})
.catch((_err: Error) => {
stop();
});
};

qi.iterClosed
.then(() => {
clearTimers();
sub?.unsubscribe();
})
.catch((_err) => {
clearTimers();
sub?.unsubscribe();
});

try {
this.publish(
subject,
data,
{
reply: `${this.protocol.muxSubscriptions.baseInbox}${r.token}`,
headers: opts.headers,
},
);
} catch (err) {
r.cancel(err);
try {
this.publish(subject, Empty, { reply: sub.getSubject() });
} catch (err) {
cancel(err);
}

let timer = setTimeout(() => {
cancel();
}, opts.maxWait);

const clearTimers = () => {
if (timer) {
clearTimeout(timer);
}
};
} else {
// the ingestion is the RequestMany
const rmo = opts as RequestManyOptionsInternal;
rmo.callback = callback;

qi.iterClosed.then(() => {
r.cancel();
}).catch((err) => {
r.cancel(err);
});

const r = new RequestMany(this.protocol.muxSubscriptions, subject, rmo);
this.protocol.request(r);

try {
this.publish(
subject,
data,
{
reply: `${this.protocol.muxSubscriptions.baseInbox}${r.token}`,
headers: opts.headers,
},
);
} catch (err) {
r.cancel(err);
}
}

return Promise.resolve(qi);
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ export interface RequestManyOptions {
maxWait: number;
headers?: MsgHdrs;
maxMessages?: number;
noMux?: boolean;
}

export interface PublishOptions {
Expand Down
Loading

0 comments on commit a9cc69b

Please sign in to comment.