Skip to content

Commit

Permalink
Merge pull request #523 from nats-io/srv-info-changes
Browse files Browse the repository at this point in the history
Changes to service info and stats
  • Loading branch information
aricart committed May 31, 2023
2 parents f400c09 + 05eb6cd commit ace0fec
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 29 deletions.
18 changes: 10 additions & 8 deletions nats-base-client/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,12 @@ export type Endpoint = {
};
export type EndpointOptions = Partial<Endpoint>;

export type EndpointInfo = {
name: string;
subject: string;
metadata?: Record<string, string>;
};

export interface Dispatcher<T> {
push(v: T): void;
}
Expand Down Expand Up @@ -920,10 +926,6 @@ export type NamedEndpointStats = {
* Average processing_time is the total processing_time divided by the num_requests
*/
average_processing_time: Nanos;
/**
* Endpoint Metadata
*/
metadata?: Record<string, string>;
};
/**
* Statistics for an endpoint
Expand All @@ -940,14 +942,14 @@ export type ServiceInfo = ServiceIdentity & {
* Description for the service
*/
description: string;
/**
* Subject where the service can be invoked
*/
subjects: string[];
/**
* Service metadata
*/
metadata?: Record<string, string>;
/**
* Information about the Endpoints
*/
endpoints: EndpointInfo[];
};
export type ServiceConfig = {
/**
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export type {
ConnectionOptions,
Dispatcher,
Endpoint,
EndpointInfo,
EndpointOptions,
EndpointStats,
JwtAuth,
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export type {
Deferred,
DispatchedFn,
Endpoint,
EndpointInfo,
EndpointOptions,
EndpointStats,
IngestionFilterFn,
Expand Down
18 changes: 12 additions & 6 deletions nats-base-client/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { parseSemVer } from "./semver.ts";
import { Empty } from "./encoders.ts";
import {
Endpoint,
EndpointInfo,
EndpointOptions,
Msg,
MsgHdrs,
Expand All @@ -47,6 +48,7 @@ import {
ServiceVerb,
Sub,
} from "./core.ts";
import { ReviverFn } from "../jetstream/types.ts";

/**
* Services have common backplane subject pattern:
Expand Down Expand Up @@ -102,8 +104,8 @@ export class ServiceMsgImpl implements ServiceMsg {
return this.msg.respond(data, opts);
}

json<T = unknown>(): T {
return this.msg.json();
json<T = unknown>(reviver?: ReviverFn): T {
return this.msg.json(reviver);
}

string(): string {
Expand Down Expand Up @@ -320,7 +322,6 @@ export class ServiceImpl implements Service {
this.internal.push(sv);
}
sv.stats = new NamedEndpointStatsImpl(name, subject);
sv.stats.metadata = h.metadata;

const callback = handler
? (err: NatsError | null, msg: Msg) => {
Expand Down Expand Up @@ -371,11 +372,18 @@ export class ServiceImpl implements Service {
id: this.id,
version: this.version,
description: this.description,
subjects: this.subjects,
metadata: this.metadata,
endpoints: this.endpoints(),
} as ServiceInfo;
}

endpoints(): EndpointInfo[] {
return this.handlers.map((v) => {
const { subject, metadata, name } = v;
return { subject, metadata, name };
});
}

async stats(): Promise<ServiceStats> {
const endpoints: NamedEndpointStats[] = [];
for (const h of this.handlers) {
Expand Down Expand Up @@ -621,7 +629,6 @@ class NamedEndpointStatsImpl implements NamedEndpointStats {
processing_time,
last_error,
data,
metadata,
} = this;
return {
name,
Expand All @@ -632,7 +639,6 @@ class NamedEndpointStatsImpl implements NamedEndpointStats {
processing_time,
last_error,
data,
metadata,
};
}

Expand Down
28 changes: 18 additions & 10 deletions tests/helpers/service-check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async function invoke(nc: NatsConnection, name: string): Promise<void> {
const infos = await collect(await sc.info(name));

let proms = infos.map((v) => {
return nc.request(v.subjects[0]);
return nc.request(v.endpoints[0].subject);
});
let responses = await Promise.all(proms);
responses.forEach((m) => {
Expand All @@ -154,7 +154,7 @@ async function invoke(nc: NatsConnection, name: string): Promise<void> {

// the service should throw/register an error if "error" is specified as payload
proms = infos.map((v) => {
return nc.request(v.subjects[0], StringCodec().encode("error"));
return nc.request(v.endpoints[0].subject, StringCodec().encode("error"));
});
responses = await Promise.all(proms);
responses.forEach((m) => {
Expand All @@ -166,7 +166,10 @@ async function invoke(nc: NatsConnection, name: string): Promise<void> {
});

proms = infos.map((v, idx) => {
return nc.request(v.subjects[0], StringCodec().encode(`hello ${idx}`));
return nc.request(
v.endpoints[0].subject,
StringCodec().encode(`hello ${idx}`),
);
});
responses = await Promise.all(proms);
responses.forEach((m, idx) => {
Expand Down Expand Up @@ -269,10 +272,6 @@ const statsSchema: JSONSchemaType<ServiceStats> = {
processing_time: { type: "number" },
average_processing_time: { type: "number" },
data: { type: "string" },
metadata: {
type: "object",
minProperties: 1,
},
},
required: [
"num_requests",
Expand All @@ -281,7 +280,6 @@ const statsSchema: JSONSchemaType<ServiceStats> = {
"processing_time",
"average_processing_time",
"data",
"metadata",
],
},
},
Expand All @@ -305,13 +303,23 @@ const infoSchema: JSONSchemaType<ServiceInfo> = {
id: { type: "string" },
version: { type: "string" },
description: { type: "string" },
subjects: { type: "array", items: { type: "string" } },
endpoints: {
type: "array",
items: {
type: "object",
properties: {
name: { type: "string" },
subject: { type: "string" },
metadata: { type: "object", minProperties: 1 },
},
},
},
metadata: {
type: "object",
minProperties: 1,
},
},
required: ["type", "name", "id", "version", "subjects", "metadata"],
required: ["type", "name", "id", "version", "metadata", "endpoints"],
additionalProperties: false,
};

Expand Down
42 changes: 37 additions & 5 deletions tests/service_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,20 @@ Deno.test("service - client", async () => {
const info = infos[0];
assertEquals(info.type, ServiceResponseType.INFO);
assertEquals(info.description, srv.description);
assertEquals(info.subjects.length, srv.subjects.length);
assertArrayIncludes(info.subjects, srv.subjects);
assertEquals(info.endpoints.length, srv.endpoints().length);
assertArrayIncludes(
info.endpoints.map((e) => {
return e.subject;
}),
srv.subjects,
);
const r = info as unknown as Record<string, unknown>;
delete r.type;
delete r.version;
delete r.name;
delete r.id;
delete r.description;
delete r.subjects;
delete r.endpoints;
assertEquals(Object.keys(r).length, 0, JSON.stringify(r));
}

Expand Down Expand Up @@ -605,7 +610,7 @@ Deno.test("service - version must be semver", async () => {
assertEquals(info.name, srv.name);
assertEquals(info.version, "v1.2.3-hello");
assertEquals(info.description, srv.description);
assertEquals(info.subjects.length, 0);
assertEquals(info.endpoints.length, 0);

await cleanup(ns, nc);
});
Expand Down Expand Up @@ -847,7 +852,6 @@ Deno.test("service - metadata", async () => {
assertEquals(info.metadata, { service: "1" });
const stats = await srv.stats();
assertEquals(stats.endpoints?.length, 1);
assertEquals(stats.endpoints?.[0].metadata, { endpoint: "endpoint" });

await cleanup(ns, nc);
});
Expand All @@ -870,3 +874,31 @@ Deno.test("service - schema metadata", async () => {

await cleanup(ns, nc);
});

Deno.test("service - json reviver", async () => {
const { ns, nc } = await setup();
const srv = await nc.services.add({
name: "example",
version: "0.0.1",
metadata: { service: "1" },
});
srv.addGroup("group").addEndpoint("endpoint", {
handler: (_err, msg) => {
const d = msg.json<{ date: Date }>((k, v) => {
if (k === "date") {
return new Date(v);
}
return v;
});
assert(d.date instanceof Date);
msg.respond();
},
metadata: {
endpoint: "endpoint",
},
});

await nc.request("group.endpoint", JSONCodec().encode({ date: Date.now() }));

await cleanup(ns, nc);
});

0 comments on commit ace0fec

Please sign in to comment.