Skip to content

Commit

Permalink
subscription iterators (#2)
Browse files Browse the repository at this point in the history
Removed callback API for subscription (still possible to provide one as `callback` subscription option.
`Subscription` objects are async iterators.
  • Loading branch information
aricart committed Jul 9, 2020
1 parent c85199e commit ec07630
Show file tree
Hide file tree
Showing 25 changed files with 857 additions and 589 deletions.
11 changes: 4 additions & 7 deletions examples/nats-sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ nc.addEventListener(
);

console.info(`subscribing to ${subject}`);
const sub = nc.subscribe(subject, (err, msg) => {
if (err) {
console.error(err);
return;
}
console.log(`[${sub.getReceived()}]: ${msg.subject}: ${msg.data}`);
});
const sub = nc.subscribe(subject);
for await (const m of sub) {
console.log(`[${sub.getReceived()}]: ${m.subject}: ${m.data}`);
}
6 changes: 3 additions & 3 deletions nats-base-client/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ export enum ErrorCode {
DISCONNECT = "DISCONNECT",
INVALID_PAYLOAD_TYPE = "INVALID_PAYLOAD",
NOT_FUNC = "NOT_FUNC",
UNKNOWN = "UNKNOWN_ERROR",
WSS_REQUIRED = "WSS_REQUIRED",
SERVER_OPTION_NA = "SERVER_OPT_NA",

SUB_CLOSED = "SUB_CLOSED",
SUB_DRAINING = "SUB_DRAINING",
TIMEOUT = "TIMEOUT",
UNKNOWN = "UNKNOWN_ERROR",
WSS_REQUIRED = "WSS_REQUIRED",

// emitted by the server
PERMISSIONS_VIOLATION = "PERMISSIONS_VIOLATION",
Expand Down
2 changes: 0 additions & 2 deletions nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ export {
Payload,
Req,
ServersChanged,
Sub,
SubscriptionOptions,
defaultReq,
defaultSub,
} from "./types.ts";
export {
Transport,
Expand Down
64 changes: 31 additions & 33 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/

//@ts-ignore
import { extend, isUint8Array } from "./util.ts";
import { deferred, extend, isUint8Array, timeout } from "./util.ts";
import {
Payload,
ConnectionOptions,
Expand All @@ -33,7 +33,7 @@ import {
import { ErrorCode, NatsError } from "./error.ts";
//@ts-ignore
import { Nuid } from "./nuid.ts";
import { DebugEvents, defaultReq, defaultSub } from "./types.ts";
import { DebugEvents, defaultReq } from "./types.ts";
import { parseOptions } from "./options.ts";

export const nuid = new Nuid();
Expand Down Expand Up @@ -105,7 +105,6 @@ export class NatsConnection extends EventTarget {

subscribe(
subject: string,
cb: (error: NatsError | null, msg: Msg) => void,
opts: SubscriptionOptions = {},
): Subscription {
if (this.isClosed()) {
Expand All @@ -119,16 +118,15 @@ export class NatsConnection extends EventTarget {
throw NatsError.errorForCode(ErrorCode.BAD_SUBJECT);
}

let s = defaultSub();
extend(s, opts);
s.subject = subject;
s.callback = cb;
return this.protocol.subscribe(s);
const sub = new Subscription(this.protocol, subject);
extend(sub, opts);
this.protocol.subscribe(sub);
return sub;
}

request(
subject: string,
timeout: number = 1000,
timeoutMillis: number = 1000,
data: any = undefined,
): Promise<Msg> {
if (this.isClosed()) {
Expand All @@ -145,31 +143,31 @@ export class NatsConnection extends EventTarget {
if (subject.length === 0) {
return Promise.reject(NatsError.errorForCode(ErrorCode.BAD_SUBJECT));
}

return new Promise<Msg>((resolve, reject) => {
let r = defaultReq();
let opts = { max: 1 } as RequestOptions;
extend(r, opts);
r.token = nuid.next();
//@ts-ignore
r.timeout = setTimeout(() => {
request.cancel();
reject(NatsError.errorForCode(ErrorCode.CONNECTION_TIMEOUT));
}, timeout);
r.callback = (err: Error | null, msg: Msg) => {
if (err) {
reject(msg);
} else {
resolve(msg);
}
};
let request = this.protocol.request(r);
this.publish(
subject,
data,
`${this.protocol.muxSubscriptions.baseInbox}${r.token}`,
);
const d = deferred<Msg>();
const to = timeout<Msg>(timeoutMillis);
const r = defaultReq();
const opts = { max: 1 } as RequestOptions;
extend(r, opts);
r.token = nuid.next();
r.callback = (err: Error | null, msg: Msg) => {
to.cancel();
if (err) {
d.reject(msg);
} else {
d.resolve(msg);
}
};
const request = this.protocol.request(r);
this.publish(
subject,
data,
`${this.protocol.muxSubscriptions.baseInbox}${r.token}`,
);
const p = Promise.race([to, d]);
p.catch(() => {
request.cancel();
});
return p;
}

/***
Expand Down
Loading

0 comments on commit ec07630

Please sign in to comment.