Skip to content

Commit

Permalink
FIX #476 - added json()/string() utilities to Msg, JsMsg, KvEntry, St…
Browse files Browse the repository at this point in the history
…oredMsg, DirectMsg this allows clients to decode payloads easier while still requiring accountability on the client (as decoding can fail)
  • Loading branch information
aricart committed Feb 6, 2023
1 parent 839642f commit d421c5a
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 26 deletions.
16 changes: 15 additions & 1 deletion nats-base-client/jsmdirect_api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The NATS Authors
* Copyright 2022-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -27,6 +27,8 @@ import {
} from "./types.ts";
import { checkJsError, validateStreamName } from "./jsutil.ts";
import { MsgHdrs } from "./headers.ts";
import { Codec, JSONCodec } from "./codec.ts";
import { TD } from "./encoders.ts";

export class DirectStreamAPIImpl extends BaseApiClient
implements DirectStreamAPI {
Expand Down Expand Up @@ -71,6 +73,7 @@ export class DirectStreamAPIImpl extends BaseApiClient
export class DirectMsgImpl implements DirectMsg {
data: Uint8Array;
header: MsgHdrs;
static jc?: Codec<unknown>;

constructor(m: Msg) {
if (!m.headers) {
Expand All @@ -96,4 +99,15 @@ export class DirectMsgImpl implements DirectMsg {
get stream(): string {
return this.header.get(DirectMsgHeaders.Stream);
}

json<T = unknown>(): T {
if (!DirectMsgImpl.jc) {
DirectMsgImpl.jc = JSONCodec();
}
return DirectMsgImpl.jc.decode(this.data) as T;
}

string(): string {
return TD.decode(this.data);
}
}
10 changes: 9 additions & 1 deletion nats-base-client/jsmsg.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -193,4 +193,12 @@ export class JsMsgImpl implements JsMsg {
term() {
this.doAck(TERM);
}

json<T = unknown>(): T {
return this.msg.json();
}

string(): string {
return this.msg.string();
}
}
16 changes: 15 additions & 1 deletion nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -46,6 +46,8 @@ import { validateStreamName } from "./jsutil.ts";
import { headers, MsgHdrs, MsgHdrsImpl } from "./headers.ts";
import { kvPrefix, KvStatusImpl } from "./kv.ts";
import { ObjectStoreStatusImpl, osPrefix } from "./objectstore.ts";
import { Codec, JSONCodec } from "./codec.ts";
import { TD } from "./encoders.ts";

export function convertStreamSourceDomain(s?: StreamSource) {
if (s === undefined) {
Expand Down Expand Up @@ -309,6 +311,7 @@ export class StoredMsgImpl implements StoredMsg {
data: Uint8Array;
time: Date;
header: MsgHdrs;
static jc?: Codec<unknown>;

constructor(smr: StreamMsgResponse) {
this.subject = smr.message.subject;
Expand All @@ -332,4 +335,15 @@ export class StoredMsgImpl implements StoredMsg {
}
return bytes;
}

json<T = unknown>(): T {
if (!StoredMsgImpl.jc) {
StoredMsgImpl.jc = JSONCodec();
}
return StoredMsgImpl.jc.decode(this.data) as T;
}

string(): string {
return TD.decode(this.data);
}
}
124 changes: 103 additions & 21 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 The NATS Authors
* Copyright 2021-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -434,30 +434,12 @@ export class Bucket implements KV, KvRemove {
}

smToEntry(sm: StoredMsg): KvEntry {
return {
bucket: this.bucket,
key: sm.subject.substring(this.prefixLen),
value: sm.data,
delta: 0,
created: sm.time,
revision: sm.seq,
operation: sm.header.get(kvOperationHdr) as OperationType || "PUT",
length: this.dataLen(sm.data, sm.header),
};
return new KvStoredEntryImpl(this.bucket, this.prefixLen, sm);
}

jmToEntry(jm: JsMsg): KvEntry {
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
return {
bucket: this.bucket,
key: key,
value: jm.data,
created: new Date(millis(jm.info.timestampNanos)),
revision: jm.seq,
operation: jm.headers?.get(kvOperationHdr) as OperationType || "PUT",
delta: jm.info.pending,
length: this.dataLen(jm.data, jm.headers),
} as KvEntry;
return new KvJsMsgEntryImpl(this.bucket, key, jm);
}

create(k: string, data: Uint8Array): Promise<number> {
Expand Down Expand Up @@ -920,3 +902,103 @@ export class KvStatusImpl implements KvStatus {
return this.si.state.bytes;
}
}

class KvStoredEntryImpl implements KvEntry {
bucket: string;
sm: StoredMsg;
prefixLen: number;

constructor(bucket: string, prefixLen: number, sm: StoredMsg) {
this.bucket = bucket;
this.prefixLen = prefixLen;
this.sm = sm;
}

get key(): string {
return this.sm.subject.substring(this.prefixLen);
}

get value(): Uint8Array {
return this.sm.data;
}

get delta(): number {
return 0;
}

get created(): Date {
return this.sm.time;
}

get revision(): number {
return this.sm.seq;
}

get operation(): OperationType {
return this.sm.header.get(kvOperationHdr) as OperationType || "PUT";
}

get length(): number {
const slen = this.sm.header.get(JsHeaders.MessageSizeHdr) || "";
if (slen !== "") {
return parseInt(slen, 10);
}
return this.sm.data.length;
}

json<T>(): T {
return this.sm.json();
}

string(): string {
return this.sm.string();
}
}

class KvJsMsgEntryImpl implements KvEntry {
bucket: string;
key: string;
sm: JsMsg;

constructor(bucket: string, key: string, sm: JsMsg) {
this.bucket = bucket;
this.key = key;
this.sm = sm;
}

get value(): Uint8Array {
return this.sm.data;
}

get created(): Date {
return new Date(millis(this.sm.info.timestampNanos));
}

get revision(): number {
return this.sm.seq;
}

get operation(): OperationType {
return this.sm.headers?.get(kvOperationHdr) as OperationType || "PUT";
}

get delta(): number {
return this.sm.info.pending;
}

get length(): number {
const slen = this.sm.headers?.get(JsHeaders.MessageSizeHdr) || "";
if (slen !== "") {
return parseInt(slen, 10);
}
return this.sm.data.length;
}

json<T>(): T {
return this.sm.json();
}

string(): string {
return this.sm.string();
}
}
15 changes: 14 additions & 1 deletion nats-base-client/msg.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 The NATS Authors
* Copyright 2020-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -18,6 +18,7 @@ import type { Publisher } from "./protocol.ts";
import type { MsgArg } from "./parser.ts";
import { TD } from "./encoders.ts";
import { ErrorCode, NatsError } from "./error.ts";
import { Codec, JSONCodec } from "./codec.ts";

export function isRequestError(msg: Msg): NatsError | null {
// NATS core only considers errors 503s on messages that have no payload
Expand All @@ -36,6 +37,7 @@ export class MsgImpl implements Msg {
_reply!: string;
_subject!: string;
publisher: Publisher;
static jc: Codec<unknown>;

constructor(msg: MsgArg, data: Uint8Array, publisher: Publisher) {
this._msg = msg;
Expand Down Expand Up @@ -98,4 +100,15 @@ export class MsgImpl implements Msg {
const payloadAndHeaders = this._msg.size === -1 ? 0 : this._msg.size;
return subj + reply + payloadAndHeaders;
}

json<T = unknown>(): T {
if (!MsgImpl.jc) {
MsgImpl.jc = JSONCodec();
}
return MsgImpl.jc.decode(this.data) as T;
}

string(): string {
return TD.decode(this.data);
}
}
8 changes: 8 additions & 0 deletions nats-base-client/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ export class ServiceMsgImpl implements ServiceMsg {
opts.headers?.set(ServiceErrorHeader, description);
return this.msg.respond(data, opts);
}

json<T = unknown>(): T {
return this.msg.json();
}

string(): string {
return this.msg.string();
}
}

export interface ServiceGroup {
Expand Down
46 changes: 46 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,18 @@ export interface Msg {
* @param opts
*/
respond(data?: Uint8Array, opts?: PublishOptions): boolean;

/**
* Convenience method to parse the message payload as JSON. This method
* will throw an exception if there's a parsing error;
*/
json<T>(): T;

/**
* Convenience method to parse the message payload as string. This method
* may throw an exception if there's a conversion error
*/
string(): string;
}

/**
Expand Down Expand Up @@ -1547,6 +1559,16 @@ export interface JsMsg {
* that the acknowledgement was received.
*/
ackAck(): Promise<boolean>;
/**
* Convenience method to parse the message payload as JSON. This method
* will throw an exception if there's a parsing error;
*/
json<T>(): T;
/**
* Convenience method to parse the message payload as string. This method
* may throw an exception if there's a conversion error
*/
string(): string;
}

export interface DeliveryInfo {
Expand Down Expand Up @@ -1617,6 +1639,18 @@ export interface StoredMsg {
* The time the message was received
*/
time: Date;

/**
* Convenience method to parse the message payload as JSON. This method
* will throw an exception if there's a parsing error;
*/
json<T>(): T;

/**
* Convenience method to parse the message payload as string. This method
* may throw an exception if there's a conversion error
*/
string(): string;
}

export interface DirectMsg extends StoredMsg {
Expand Down Expand Up @@ -2587,6 +2621,18 @@ export interface KvEntry {
delta?: number;
operation: "PUT" | "DEL" | "PURGE";
length: number;

/**
* Convenience method to parse the entry payload as JSON. This method
* will throw an exception if there's a parsing error;
*/
json<T>(): T;

/**
* Convenience method to parse the entry payload as string. This method
* may throw an exception if there's a conversion error
*/
string(): string;
}

/**
Expand Down

0 comments on commit d421c5a

Please sign in to comment.