Skip to content

Commit

Permalink
Merge branch 'dev' into js-s2
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Feb 8, 2023
2 parents 067f90f + 06e1a14 commit 3394a7e
Show file tree
Hide file tree
Showing 19 changed files with 663 additions and 86 deletions.
29 changes: 22 additions & 7 deletions nats-base-client/authenticator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 The NATS Authors
* Copyright 2020-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -73,22 +73,37 @@ export interface Authenticator {
(nonce?: string): Auth;
}

function multiAuthenticator(authenticators: Authenticator[]) {
return (nonce?: string): Auth => {
let auth: Partial<NoAuth & TokenAuth & UserPass & NKeyAuth & JwtAuth> = {};
authenticators.forEach((a) => {
const args = a(nonce) || {};
auth = Object.assign(auth, args);
});
return auth as Auth;
};
}

export function buildAuthenticator(
opts: ConnectionOptions,
): Authenticator {
const buf: Authenticator[] = [];
// jwtAuthenticator is created by the user, since it
// will require possibly reading files which
// some of the clients are simply unable to do
if (opts.authenticator) {
return opts.authenticator;
if (typeof opts.authenticator === "function") {
buf.push(opts.authenticator);
}
if (Array.isArray(opts.authenticator)) {
buf.push(...opts.authenticator);
}
if (opts.token) {
return tokenAuthenticator(opts.token);
buf.push(tokenAuthenticator(opts.token));
}
if (opts.user) {
return usernamePasswordAuthenticator(opts.user, opts.pass);
buf.push(usernamePasswordAuthenticator(opts.user, opts.pass));
}
return noAuthFn();
return buf.length === 0 ? noAuthFn() : multiAuthenticator(buf);
}

export function noAuthFn(): Authenticator {
Expand Down Expand Up @@ -174,7 +189,7 @@ export function jwtAuthenticator(
/**
* Returns an Authenticator function that returns a JwtAuth.
* This is a convenience Authenticator that parses the
* specifid creds and delegates to the jwtAuthenticator.
* specified creds and delegates to the jwtAuthenticator.
* @param {Uint8Array | () => Uint8Array } creds - the contents of a creds file or a function that returns the creds
* @returns {JwtAuth}
*/
Expand Down
16 changes: 15 additions & 1 deletion nats-base-client/jsmdirect_api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The NATS Authors
* Copyright 2022-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -27,6 +27,8 @@ import {
} from "./types.ts";
import { checkJsError, validateStreamName } from "./jsutil.ts";
import { MsgHdrs } from "./headers.ts";
import { Codec, JSONCodec } from "./codec.ts";
import { TD } from "./encoders.ts";

export class DirectStreamAPIImpl extends BaseApiClient
implements DirectStreamAPI {
Expand Down Expand Up @@ -71,6 +73,7 @@ export class DirectStreamAPIImpl extends BaseApiClient
export class DirectMsgImpl implements DirectMsg {
data: Uint8Array;
header: MsgHdrs;
static jc?: Codec<unknown>;

constructor(m: Msg) {
if (!m.headers) {
Expand All @@ -96,4 +99,15 @@ export class DirectMsgImpl implements DirectMsg {
get stream(): string {
return this.header.get(DirectMsgHeaders.Stream);
}

json<T = unknown>(): T {
if (!DirectMsgImpl.jc) {
DirectMsgImpl.jc = JSONCodec();
}
return DirectMsgImpl.jc.decode(this.data) as T;
}

string(): string {
return TD.decode(this.data);
}
}
10 changes: 9 additions & 1 deletion nats-base-client/jsmsg.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -193,4 +193,12 @@ export class JsMsgImpl implements JsMsg {
term() {
this.doAck(TERM);
}

json<T = unknown>(): T {
return this.msg.json();
}

string(): string {
return this.msg.string();
}
}
16 changes: 15 additions & 1 deletion nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -46,6 +46,8 @@ import { validateStreamName } from "./jsutil.ts";
import { headers, MsgHdrs, MsgHdrsImpl } from "./headers.ts";
import { kvPrefix, KvStatusImpl } from "./kv.ts";
import { ObjectStoreStatusImpl, osPrefix } from "./objectstore.ts";
import { Codec, JSONCodec } from "./codec.ts";
import { TD } from "./encoders.ts";

export function convertStreamSourceDomain(s?: StreamSource) {
if (s === undefined) {
Expand Down Expand Up @@ -309,6 +311,7 @@ export class StoredMsgImpl implements StoredMsg {
data: Uint8Array;
time: Date;
header: MsgHdrs;
static jc?: Codec<unknown>;

constructor(smr: StreamMsgResponse) {
this.subject = smr.message.subject;
Expand All @@ -332,4 +335,15 @@ export class StoredMsgImpl implements StoredMsg {
}
return bytes;
}

json<T = unknown>(): T {
if (!StoredMsgImpl.jc) {
StoredMsgImpl.jc = JSONCodec();
}
return StoredMsgImpl.jc.decode(this.data) as T;
}

string(): string {
return TD.decode(this.data);
}
}
22 changes: 19 additions & 3 deletions nats-base-client/jsutil.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -28,11 +28,27 @@ import { MsgImpl } from "./msg.ts";
import { Publisher } from "./protocol.ts";

export function validateDurableName(name?: string) {
return validateName("durable", name);
return minValidation("durable", name);
}

export function validateStreamName(name?: string) {
return validateName("stream", name);
return minValidation("stream", name);
}

function minValidation(context: string, name = "") {
// minimum validation on streams/consumers matches nats cli
if (name === "") {
throw Error(`${context} name required`);
}
const bad = [".", "*", ">", "/", "\\"];
bad.forEach((v) => {
if (name.indexOf(v) !== -1) {
throw Error(
`invalid ${context} name - ${context} name cannot contain '${v}'`,
);
}
});
return "";
}

export function validateName(context: string, name = "") {
Expand Down
124 changes: 103 additions & 21 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -434,30 +434,12 @@ export class Bucket implements KV, KvRemove {
}

smToEntry(sm: StoredMsg): KvEntry {
return {
bucket: this.bucket,
key: sm.subject.substring(this.prefixLen),
value: sm.data,
delta: 0,
created: sm.time,
revision: sm.seq,
operation: sm.header.get(kvOperationHdr) as OperationType || "PUT",
length: this.dataLen(sm.data, sm.header),
};
return new KvStoredEntryImpl(this.bucket, this.prefixLen, sm);
}

jmToEntry(jm: JsMsg): KvEntry {
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
return {
bucket: this.bucket,
key: key,
value: jm.data,
created: new Date(millis(jm.info.timestampNanos)),
revision: jm.seq,
operation: jm.headers?.get(kvOperationHdr) as OperationType || "PUT",
delta: jm.info.pending,
length: this.dataLen(jm.data, jm.headers),
} as KvEntry;
return new KvJsMsgEntryImpl(this.bucket, key, jm);
}

create(k: string, data: Uint8Array): Promise<number> {
Expand Down Expand Up @@ -920,3 +902,103 @@ export class KvStatusImpl implements KvStatus {
return this.si.state.bytes;
}
}

class KvStoredEntryImpl implements KvEntry {
bucket: string;
sm: StoredMsg;
prefixLen: number;

constructor(bucket: string, prefixLen: number, sm: StoredMsg) {
this.bucket = bucket;
this.prefixLen = prefixLen;
this.sm = sm;
}

get key(): string {
return this.sm.subject.substring(this.prefixLen);
}

get value(): Uint8Array {
return this.sm.data;
}

get delta(): number {
return 0;
}

get created(): Date {
return this.sm.time;
}

get revision(): number {
return this.sm.seq;
}

get operation(): OperationType {
return this.sm.header.get(kvOperationHdr) as OperationType || "PUT";
}

get length(): number {
const slen = this.sm.header.get(JsHeaders.MessageSizeHdr) || "";
if (slen !== "") {
return parseInt(slen, 10);
}
return this.sm.data.length;
}

json<T>(): T {
return this.sm.json();
}

string(): string {
return this.sm.string();
}
}

class KvJsMsgEntryImpl implements KvEntry {
bucket: string;
key: string;
sm: JsMsg;

constructor(bucket: string, key: string, sm: JsMsg) {
this.bucket = bucket;
this.key = key;
this.sm = sm;
}

get value(): Uint8Array {
return this.sm.data;
}

get created(): Date {
return new Date(millis(this.sm.info.timestampNanos));
}

get revision(): number {
return this.sm.seq;
}

get operation(): OperationType {
return this.sm.headers?.get(kvOperationHdr) as OperationType || "PUT";
}

get delta(): number {
return this.sm.info.pending;
}

get length(): number {
const slen = this.sm.headers?.get(JsHeaders.MessageSizeHdr) || "";
if (slen !== "") {
return parseInt(slen, 10);
}
return this.sm.data.length;
}

json<T>(): T {
return this.sm.json();
}

string(): string {
return this.sm.string();
}
}
15 changes: 14 additions & 1 deletion nats-base-client/msg.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 The NATS Authors
* Copyright 2020-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -18,6 +18,7 @@ import type { Publisher } from "./protocol.ts";
import type { MsgArg } from "./parser.ts";
import { TD } from "./encoders.ts";
import { ErrorCode, NatsError } from "./error.ts";
import { Codec, JSONCodec } from "./codec.ts";

export function isRequestError(msg: Msg): NatsError | null {
// NATS core only considers errors 503s on messages that have no payload
Expand All @@ -36,6 +37,7 @@ export class MsgImpl implements Msg {
_reply!: string;
_subject!: string;
publisher: Publisher;
static jc: Codec<unknown>;

constructor(msg: MsgArg, data: Uint8Array, publisher: Publisher) {
this._msg = msg;
Expand Down Expand Up @@ -98,4 +100,15 @@ export class MsgImpl implements Msg {
const payloadAndHeaders = this._msg.size === -1 ? 0 : this._msg.size;
return subj + reply + payloadAndHeaders;
}

json<T = unknown>(): T {
if (!MsgImpl.jc) {
MsgImpl.jc = JSONCodec();
}
return MsgImpl.jc.decode(this.data) as T;
}

string(): string {
return TD.decode(this.data);
}
}
Loading

0 comments on commit 3394a7e

Please sign in to comment.