Skip to content

Commit

Permalink
[chore] renamed NatsHeaders to MsgHdrsImpl (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jul 31, 2020
1 parent 3794bf0 commit a5909bb
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 41 deletions.
11 changes: 7 additions & 4 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@

- [X] Refactor core to share with nats.ws as a separate module
- [X] Remove callback from flush()
- [ ] Add timeout from subscription as option
- [ ] Subscription timeouts not notified via callback to the subscription
- [X] Add timeout from subscription as option
- [X] Subscription timeout, reject the iterator
- [X] Changed subscribe signature to just return the subjection (no promise)
- [X] Changed drain/sub.drain to return Promise<void>
- [X] Remove argument from flush()
- [X] Binary apis changed to be Uint8Array
- [X] Subscriptions as iterators
- [X] Stale connection


- [ ] Remove encoders from client, changing payload signatures to Uint8Arrays only.
- [ ] Package nuidjs as its own project
- [ ] Move nats-base-client to its own project
- [ ] Transport send batching
- [ ] Transport certificate authentication

## BUGS

Expand Down
1 change: 1 addition & 0 deletions doc/snippets/sub_timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const sub = nc.subscribe("hello", { timeout: 1000 });
} else {
console.log(`sub iterator got an error!`);
}
nc.close();
});

await nc.closed();
3 changes: 2 additions & 1 deletion examples/nats-pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import { parse } from "https://deno.land/std@0.61.0/flags/mod.ts";
import { ConnectionOptions, connect } from "../src/mod.ts";
import { delay, headers, MsgHdrs } from "../nats-base-client/mod.ts";
import { headers, MsgHdrs } from "../nats-base-client/mod.ts";
import { delay } from "../nats-base-client/internal_mod.ts";

const argv = parse(
Deno.args,
Expand Down
2 changes: 1 addition & 1 deletion examples/nats-req.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { parse } from "https://deno.land/std@0.61.0/flags/mod.ts";
import { ConnectionOptions, connect } from "../src/mod.ts";
import { delay } from "../nats-base-client/mod.ts";
import { delay } from "../nats-base-client/internal_mod.ts";

const argv = parse(
Deno.args,
Expand Down
34 changes: 17 additions & 17 deletions nats-base-client/headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ export interface MsgHdrs extends Iterable<[string, string[]]> {
}

export function headers(): MsgHdrs {
return new NatsHeaders();
return new MsgHdrsImpl();
}

export class NatsHeaders implements MsgHdrs {
export class MsgHdrsImpl implements MsgHdrs {
static CRLF = "\r\n";
static SEP = ":";
static HEADER = "NATS/1.0";
Expand All @@ -51,7 +51,7 @@ export class NatsHeaders implements MsgHdrs {
return count;
}

equals(mh: NatsHeaders) {
equals(mh: MsgHdrsImpl) {
if (
mh && this.headers.size === mh.headers.size &&
this.error === mh.error
Expand All @@ -74,18 +74,18 @@ export class NatsHeaders implements MsgHdrs {
return false;
}

static decode(a: Uint8Array): NatsHeaders {
const mh = new NatsHeaders();
static decode(a: Uint8Array): MsgHdrsImpl {
const mh = new MsgHdrsImpl();
const s = new TextDecoder().decode(a);
const lines = s.split(NatsHeaders.CRLF);
const lines = s.split(MsgHdrsImpl.CRLF);
const h = lines[0];
if (h !== NatsHeaders.HEADER) {
const str = h.replace(NatsHeaders.HEADER, "");
if (h !== MsgHdrsImpl.HEADER) {
const str = h.replace(MsgHdrsImpl.HEADER, "");
mh.error = parseInt(str, 10);
} else {
lines.slice(1).map((s) => {
if (s) {
const idx = s.indexOf(NatsHeaders.SEP);
const idx = s.indexOf(MsgHdrsImpl.SEP);
const k = s.slice(0, idx);
const v = s.slice(idx + 1);
mh.append(k, v);
Expand All @@ -99,7 +99,7 @@ export class NatsHeaders implements MsgHdrs {
if (this.headers.size === 0) {
return "";
}
let s = NatsHeaders.HEADER;
let s = MsgHdrsImpl.HEADER;
for (const [k, v] of this.headers) {
for (let i = 0; i < v.length; i++) {
s = `${s}\r\n${k}:${v[i]}`;
Expand Down Expand Up @@ -168,7 +168,7 @@ export class NatsHeaders implements MsgHdrs {
}

get(k: string): string {
const key = NatsHeaders.canonicalMIMEHeaderKey(k);
const key = MsgHdrsImpl.canonicalMIMEHeaderKey(k);
const a = this.headers.get(key);
return a ? a[0] : "";
}
Expand All @@ -178,14 +178,14 @@ export class NatsHeaders implements MsgHdrs {
}

set(k: string, v: string): void {
const key = NatsHeaders.canonicalMIMEHeaderKey(k);
const value = NatsHeaders.validHeaderValue(v);
const key = MsgHdrsImpl.canonicalMIMEHeaderKey(k);
const value = MsgHdrsImpl.validHeaderValue(v);
this.headers.set(key, [value]);
}

append(k: string, v: string): void {
const key = NatsHeaders.canonicalMIMEHeaderKey(k);
const value = NatsHeaders.validHeaderValue(v);
const key = MsgHdrsImpl.canonicalMIMEHeaderKey(k);
const value = MsgHdrsImpl.validHeaderValue(v);
let a = this.headers.get(key);
if (!a) {
a = [];
Expand All @@ -195,12 +195,12 @@ export class NatsHeaders implements MsgHdrs {
}

values(k: string): string[] {
const key = NatsHeaders.canonicalMIMEHeaderKey(k);
const key = MsgHdrsImpl.canonicalMIMEHeaderKey(k);
return this.headers.get(key) || [];
}

delete(k: string): void {
const key = NatsHeaders.canonicalMIMEHeaderKey(k);
const key = MsgHdrsImpl.canonicalMIMEHeaderKey(k);
this.headers.delete(key);
}
}
5 changes: 4 additions & 1 deletion nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export {
Subscription,
ServerInfo,
} from "./types.ts";
export {
MsgImpl,
} from "./msg.ts";
export {
SubscriptionImpl,
} from "./subscription.ts";
Expand Down Expand Up @@ -40,7 +43,7 @@ export {
timeout,
} from "./util.ts";
export {
NatsHeaders,
MsgHdrsImpl,
MsgHdrs,
headers,
} from "./headers.ts";
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ export {
headers,
jwtAuthenticator,
Msg,
MsgHdrs,
NatsConnection,
NatsError,
NatsHeaders,
nkeyAuthenticator,
Nuid,
Payload,
Expand Down
6 changes: 3 additions & 3 deletions nats-base-client/msgImpl.ts → nats-base-client/msg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* limitations under the License.
*/
import { Msg } from "./types.ts";
import { NatsHeaders } from "./headers.ts";
import { MsgHdrsImpl } from "./headers.ts";
import { Publisher } from "./protocol.ts";

export class MsgImpl implements Msg {
Expand All @@ -22,14 +22,14 @@ export class MsgImpl implements Msg {
sid!: number;
reply?: string;
data?: any;
headers?: NatsHeaders;
headers?: MsgHdrsImpl;

constructor(publisher: Publisher) {
this.publisher = publisher;
}

// eslint-ignore-next-line @typescript-eslint/no-explicit-any
respond(data?: any, headers?: NatsHeaders): boolean {
respond(data?: any, headers?: MsgHdrsImpl): boolean {
if (this.reply) {
this.publisher.publish(this.reply, data, { headers: headers });
return true;
Expand Down
6 changes: 3 additions & 3 deletions nats-base-client/msgbuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
*/
import { Msg, Payload } from "./types.ts";
import { ErrorCode, NatsError } from "./error.ts";
import { MsgImpl } from "./msgImpl.ts";
import { MsgImpl } from "./msg.ts";
import { CR_LF_LEN } from "./util.ts";
import { DataBuffer } from "./databuffer.ts";
import { NatsHeaders } from "./headers.ts";
import { MsgHdrsImpl } from "./headers.ts";
import { Publisher } from "./protocol.ts";

export class MsgBuffer {
Expand Down Expand Up @@ -60,7 +60,7 @@ export class MsgBuffer {
? this.buf.slice(0, this.headerLen)
: undefined;
if (headers) {
this.msg.headers = NatsHeaders.decode(headers);
this.msg.headers = MsgHdrsImpl.decode(headers);
}
this.msg.data = this.buf.slice(this.headerLen, this.buf.length - 2);

Expand Down
4 changes: 2 additions & 2 deletions nats-base-client/muxsubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import { Request } from "./request.ts";
import { Msg } from "./types.ts";
import { ErrorCode, NatsError } from "./error.ts";
import { NatsHeaders } from "./headers.ts";
import { MsgHdrsImpl } from "./headers.ts";
import { createInbox } from "./protocol.ts";

export class MuxSubscription {
Expand Down Expand Up @@ -61,7 +61,7 @@ export class MuxSubscription {
let r = this.get(token);
if (r) {
if (err === null && m.headers) {
const headers = m.headers as NatsHeaders;
const headers = m.headers as MsgHdrsImpl;
if (headers.error) {
err = new NatsError(
headers.error.toString(),
Expand Down
4 changes: 2 additions & 2 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import { Nuid } from "./nuid.ts";
import { DataBuffer } from "./databuffer.ts";
import { Server, Servers } from "./servers.ts";
import { QueuedIterator } from "./queued_iterator.ts";
import { MsgHdrs, NatsHeaders } from "./headers.ts";
import { MsgHdrs, MsgHdrsImpl } from "./headers.ts";
import { SubscriptionImpl } from "./subscription.ts";
import { Subscriptions } from "./subscriptions.ts";
import { MuxSubscription } from "./muxsubscription.ts";
Expand Down Expand Up @@ -492,7 +492,7 @@ export class ProtocolHandler {
if (!this.options.headers) {
throw new NatsError("headers", ErrorCode.SERVER_OPTION_NA);
}
const hdrs = options.headers as NatsHeaders;
const hdrs = options.headers as MsgHdrsImpl;
const h = hdrs.encode();
data = DataBuffer.concat(h, data);
len = data.length;
Expand Down
12 changes: 6 additions & 6 deletions tests/msgheaders_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import {
assertThrows,
assertArrayContains,
} from "https://deno.land/std@0.61.0/testing/asserts.ts";
import { NatsError, NatsHeaders } from "../src/mod.ts";
import { NatsError, MsgHdrsImpl } from "../nats-base-client/internal_mod.ts";

Deno.test("msgheaders - basics", () => {
const h = new NatsHeaders();
const h = new MsgHdrsImpl();
assertEquals(h.size(), 0);
assert(!h.has("foo"));
h.append("foo", "bar");
Expand All @@ -34,7 +34,7 @@ Deno.test("msgheaders - basics", () => {
h.delete("bar-foo");
assertEquals(h.size(), 3);

let header = NatsHeaders.canonicalMIMEHeaderKey("foo");
let header = MsgHdrsImpl.canonicalMIMEHeaderKey("foo");
assertEquals("Foo", header);
assert(h.has("Foo"));
assert(h.has("foo"));
Expand All @@ -43,22 +43,22 @@ Deno.test("msgheaders - basics", () => {
assertArrayContains(foos, ["bar", "bam"]);
assert(foos.indexOf("baz") === -1);

header = NatsHeaders.canonicalMIMEHeaderKey("foo-bar");
header = MsgHdrsImpl.canonicalMIMEHeaderKey("foo-bar");
assertEquals("Foo-Bar", header);
const foobars = h.values(header);
assertEquals(1, foobars.length);
assertArrayContains(foobars, ["baz"]);

const a = h.encode();
const hh = NatsHeaders.decode(a);
const hh = MsgHdrsImpl.decode(a);
assert(h.equals(hh));

hh.set("foo-bar-baz", "fbb");
assert(!h.equals(hh));
});

Deno.test("msgheaders - illegal key", () => {
const h = new NatsHeaders();
const h = new MsgHdrsImpl();
["bad:", "bad ", String.fromCharCode(127)].forEach((v) => {
assertThrows(() => {
h.set(v, "aaa");
Expand Down

0 comments on commit a5909bb

Please sign in to comment.