Skip to content

Commit

Permalink
[FEAT] nc#rtt() returns the number of millis for a round trip to the …
Browse files Browse the repository at this point in the history
…server

[FEAT] jsm#getAccountInfo() now reports tiered usage and limits
[TEST] added a test validating underlying jetstream error codes
  • Loading branch information
aricart committed May 17, 2022
1 parent 3cbd24d commit f30083c
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 10 deletions.
9 changes: 9 additions & 0 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,4 +315,13 @@ export class NatsConnectionImpl implements NatsConnection {
const info = this.info;
return info ? parseSemVer(info.version) : undefined;
}

async rtt(): Promise<number> {
if (!this.protocol._closed && !this.protocol.connected) {
throw NatsError.errorForCode(ErrorCode.Disconnect);
}
const start = Date.now();
await this.flush();
return Date.now() - start;
}
}
32 changes: 23 additions & 9 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export interface NatsConnection {

jetstreamManager(opts?: JetStreamOptions): Promise<JetStreamManager>;
jetstream(opts?: JetStreamOptions): JetStreamClient;
rtt(): Promise<number>;
}

export interface ConnectionOptions {
Expand Down Expand Up @@ -770,14 +771,34 @@ export interface MsgDeleteRequest extends SeqMsgRequest {
"no_erase"?: boolean;
}

export interface JetStreamAccountStats {
export interface AccountLimits {
"max_memory": number;
"max_storage": number;
"max_streams": number;
"max_consumers": number;
"memory_max_stream_bytes": number;
"storage_max_stream_bytes": number;
"max_bytes_required": number;
}

export interface JetStreamUsage {
memory: number;
storage: number;
streams: number;
consumers: number;
api: JetStreamApiStats;
}

export interface JetStreamUsageAccountLimits extends JetStreamUsage {
limits: AccountLimits;
}

export interface JetStreamAccountStats extends JetStreamUsageAccountLimits {
api: JetStreamApiStats;
domain?: string;
tiers?: {
R1?: JetStreamUsageAccountLimits;
R3?: JetStreamUsageAccountLimits;
};
}

export interface JetStreamApiStats {
Expand All @@ -788,13 +809,6 @@ export interface JetStreamApiStats {
export interface AccountInfoResponse
extends ApiResponse, JetStreamAccountStats {}

export interface AccountLimits {
"max_memory": number;
"max_storage": number;
"max_streams": number;
"max_consumers": number;
}

export interface ConsumerConfig extends ConsumerUpdateConfig {
"ack_policy": AckPolicy;
"deliver_policy": DeliverPolicy;
Expand Down
29 changes: 29 additions & 0 deletions tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -923,3 +923,32 @@ Deno.test("basics - port and server are mutually exclusive", async () => {
undefined,
);
});

Deno.test("basics - rtt", async () => {
const { ns, nc } = await setup({}, {
maxReconnectAttempts: 5,
reconnectTimeWait: 250,
});
const rtt = await nc.rtt();
assert(rtt > 0);

await ns.stop();
await delay(500);
await assertRejects(
async () => {
await nc.rtt();
},
Error,
ErrorCode.Disconnect,
);

await nc.closed();

await assertRejects(
async () => {
await nc.rtt();
},
Error,
ErrorCode.ConnectionClosed,
);
});
28 changes: 28 additions & 0 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3226,3 +3226,31 @@ Deno.test("jetstream - push bound", async () => {

await cleanup(ns, nc, nc2);
});

Deno.test("jetstream - detailed errors", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const jsm = await nc.jetstreamManager();

await assertRejects(async () => {
return jsm.streams.add({
name: "test",
num_replicas: 3,
subjects: ["foo"],
});
}, (err: Error) => {
const ne = err as NatsError;
assert(ne.api_error);
assertEquals(
ne.message,
"replicas > 1 not supported in non-clustered mode",
);
assertEquals(
ne.api_error.description,
"replicas > 1 not supported in non-clustered mode",
);
assertEquals(ne.api_error.code, 500);
assertEquals(ne.api_error.err_code, 10074);
});

await cleanup(ns, nc);
});
68 changes: 67 additions & 1 deletion tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import {
ErrorCode,
headers,
JSONCodec,
jwtAuthenticator,
nanos,
NatsError,
nkeys,
nuid,
StreamConfig,
StreamInfo,
Expand All @@ -43,8 +45,17 @@ import {
setup,
} from "./jstest_util.ts";
import { connect } from "../src/mod.ts";
import { assertThrowsAsyncErrorCode, notCompatible } from "./helpers/mod.ts";
import {
assertThrowsAsyncErrorCode,
NatsServer,
notCompatible,
} from "./helpers/mod.ts";
import { validateName } from "../nats-base-client/jsutil.ts";
import {
encodeAccount,
encodeOperator,
encodeUser,
} from "https://raw.githubusercontent.com/nats-io/jwt.js/main/src/jwt.ts";

const StreamNameRequired = "stream name required";
const ConsumerNameRequired = "durable name required";
Expand Down Expand Up @@ -1019,3 +1030,58 @@ Deno.test("jsm - stream info subjects", async () => {

await cleanup(ns, nc);
});

Deno.test("jsm - account limits", async () => {
const O = nkeys.createOperator();
const SYS = nkeys.createAccount();
const A = nkeys.createAccount();

const resolver: Record<string, string> = {};
resolver[A.getPublicKey()] = await encodeAccount("A", A, {
limits: {
conn: -1,
subs: -1,
tiered_limits: {
R1: {
disk_storage: 1024 * 1024,
consumer: -1,
streams: -1,
},
},
},
}, { signer: O });
resolver[SYS.getPublicKey()] = await encodeAccount("SYS", SYS, {
limits: {
conn: -1,
subs: -1,
},
}, { signer: O });

const conf = {
operator: await encodeOperator("O", O, {
system_account: SYS.getPublicKey(),
}),
resolver: "MEMORY",
"resolver_preload": resolver,
};

const ns = await NatsServer.start(jetstreamServerConf(conf, true), true);

const U = nkeys.createUser();
const ujwt = await encodeUser("U", U, A, { bearer_token: true });

const nc = await connect({
port: ns.port,
maxReconnectAttempts: -1,
authenticator: jwtAuthenticator(ujwt),
});

const jsm = await nc.jetstreamManager();

const ai = await jsm.getAccountInfo();
assertEquals(ai.tiers?.R1?.limits.max_storage, 1024 * 1024);
assertEquals(ai.tiers?.R1?.limits.max_consumers, -1);
assertEquals(ai.tiers?.R1?.limits.max_streams, -1);

await cleanup(ns, nc);
});

0 comments on commit f30083c

Please sign in to comment.