From f30083c47ba0fba804d3da5b6f67daa69beddac7 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 17 May 2022 18:21:14 -0500 Subject: [PATCH] [FEAT] nc#rtt() returns the number of millis for a round trip to the server [FEAT] jsm#getAccountInfo() now reports tiered usage and limits [TEST] added a test validating underlying jetstream error codes --- nats-base-client/nats.ts | 9 ++++++ nats-base-client/types.ts | 32 ++++++++++++------ tests/basics_test.ts | 29 +++++++++++++++++ tests/jetstream_test.ts | 28 ++++++++++++++++ tests/jsm_test.ts | 68 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 156 insertions(+), 10 deletions(-) diff --git a/nats-base-client/nats.ts b/nats-base-client/nats.ts index 962e1c3b..51e72106 100644 --- a/nats-base-client/nats.ts +++ b/nats-base-client/nats.ts @@ -315,4 +315,13 @@ export class NatsConnectionImpl implements NatsConnection { const info = this.info; return info ? parseSemVer(info.version) : undefined; } + + async rtt(): Promise { + if (!this.protocol._closed && !this.protocol.connected) { + throw NatsError.errorForCode(ErrorCode.Disconnect); + } + const start = Date.now(); + await this.flush(); + return Date.now() - start; + } } diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index e2dd2e63..c3c3d348 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -78,6 +78,7 @@ export interface NatsConnection { jetstreamManager(opts?: JetStreamOptions): Promise; jetstream(opts?: JetStreamOptions): JetStreamClient; + rtt(): Promise; } export interface ConnectionOptions { @@ -770,14 +771,34 @@ export interface MsgDeleteRequest extends SeqMsgRequest { "no_erase"?: boolean; } -export interface JetStreamAccountStats { +export interface AccountLimits { + "max_memory": number; + "max_storage": number; + "max_streams": number; + "max_consumers": number; + "memory_max_stream_bytes": number; + "storage_max_stream_bytes": number; + "max_bytes_required": number; +} + +export interface JetStreamUsage { memory: number; storage: number; streams: number; consumers: number; - api: JetStreamApiStats; +} + +export interface JetStreamUsageAccountLimits extends JetStreamUsage { limits: AccountLimits; +} + +export interface JetStreamAccountStats extends JetStreamUsageAccountLimits { + api: JetStreamApiStats; domain?: string; + tiers?: { + R1?: JetStreamUsageAccountLimits; + R3?: JetStreamUsageAccountLimits; + }; } export interface JetStreamApiStats { @@ -788,13 +809,6 @@ export interface JetStreamApiStats { export interface AccountInfoResponse extends ApiResponse, JetStreamAccountStats {} -export interface AccountLimits { - "max_memory": number; - "max_storage": number; - "max_streams": number; - "max_consumers": number; -} - export interface ConsumerConfig extends ConsumerUpdateConfig { "ack_policy": AckPolicy; "deliver_policy": DeliverPolicy; diff --git a/tests/basics_test.ts b/tests/basics_test.ts index 51d211ba..1aa6b726 100644 --- a/tests/basics_test.ts +++ b/tests/basics_test.ts @@ -923,3 +923,32 @@ Deno.test("basics - port and server are mutually exclusive", async () => { undefined, ); }); + +Deno.test("basics - rtt", async () => { + const { ns, nc } = await setup({}, { + maxReconnectAttempts: 5, + reconnectTimeWait: 250, + }); + const rtt = await nc.rtt(); + assert(rtt > 0); + + await ns.stop(); + await delay(500); + await assertRejects( + async () => { + await nc.rtt(); + }, + Error, + ErrorCode.Disconnect, + ); + + await nc.closed(); + + await assertRejects( + async () => { + await nc.rtt(); + }, + Error, + ErrorCode.ConnectionClosed, + ); +}); diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index b84e3e0c..166e4a4b 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -3226,3 +3226,31 @@ Deno.test("jetstream - push bound", async () => { await cleanup(ns, nc, nc2); }); + +Deno.test("jetstream - detailed errors", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + const jsm = await nc.jetstreamManager(); + + await assertRejects(async () => { + return jsm.streams.add({ + name: "test", + num_replicas: 3, + subjects: ["foo"], + }); + }, (err: Error) => { + const ne = err as NatsError; + assert(ne.api_error); + assertEquals( + ne.message, + "replicas > 1 not supported in non-clustered mode", + ); + assertEquals( + ne.api_error.description, + "replicas > 1 not supported in non-clustered mode", + ); + assertEquals(ne.api_error.code, 500); + assertEquals(ne.api_error.err_code, 10074); + }); + + await cleanup(ns, nc); +}); diff --git a/tests/jsm_test.ts b/tests/jsm_test.ts index a0bdde70..c8467e8b 100644 --- a/tests/jsm_test.ts +++ b/tests/jsm_test.ts @@ -28,8 +28,10 @@ import { ErrorCode, headers, JSONCodec, + jwtAuthenticator, nanos, NatsError, + nkeys, nuid, StreamConfig, StreamInfo, @@ -43,8 +45,17 @@ import { setup, } from "./jstest_util.ts"; import { connect } from "../src/mod.ts"; -import { assertThrowsAsyncErrorCode, notCompatible } from "./helpers/mod.ts"; +import { + assertThrowsAsyncErrorCode, + NatsServer, + notCompatible, +} from "./helpers/mod.ts"; import { validateName } from "../nats-base-client/jsutil.ts"; +import { + encodeAccount, + encodeOperator, + encodeUser, +} from "https://raw.githubusercontent.com/nats-io/jwt.js/main/src/jwt.ts"; const StreamNameRequired = "stream name required"; const ConsumerNameRequired = "durable name required"; @@ -1019,3 +1030,58 @@ Deno.test("jsm - stream info subjects", async () => { await cleanup(ns, nc); }); + +Deno.test("jsm - account limits", async () => { + const O = nkeys.createOperator(); + const SYS = nkeys.createAccount(); + const A = nkeys.createAccount(); + + const resolver: Record = {}; + resolver[A.getPublicKey()] = await encodeAccount("A", A, { + limits: { + conn: -1, + subs: -1, + tiered_limits: { + R1: { + disk_storage: 1024 * 1024, + consumer: -1, + streams: -1, + }, + }, + }, + }, { signer: O }); + resolver[SYS.getPublicKey()] = await encodeAccount("SYS", SYS, { + limits: { + conn: -1, + subs: -1, + }, + }, { signer: O }); + + const conf = { + operator: await encodeOperator("O", O, { + system_account: SYS.getPublicKey(), + }), + resolver: "MEMORY", + "resolver_preload": resolver, + }; + + const ns = await NatsServer.start(jetstreamServerConf(conf, true), true); + + const U = nkeys.createUser(); + const ujwt = await encodeUser("U", U, A, { bearer_token: true }); + + const nc = await connect({ + port: ns.port, + maxReconnectAttempts: -1, + authenticator: jwtAuthenticator(ujwt), + }); + + const jsm = await nc.jetstreamManager(); + + const ai = await jsm.getAccountInfo(); + assertEquals(ai.tiers?.R1?.limits.max_storage, 1024 * 1024); + assertEquals(ai.tiers?.R1?.limits.max_consumers, -1); + assertEquals(ai.tiers?.R1?.limits.max_streams, -1); + + await cleanup(ns, nc); +});