Skip to content

Commit

Permalink
[CHANGES] service api
Browse files Browse the repository at this point in the history
[CHANGE] separated service error header into two headers, one with the code the other with the message (#424)
[CHANGE] clamped down on the service name to be a filesystem safe name
[CHANGE] STATUS is now STATS to better reflect its intention, things named with status, changed to be stats.
[CHANGE] PING now returns name and id of the service and is intended for client RTT calculations
[CHANGE] control subject (for invoking monitoring) can now specify a prefix (this replaces `$SRV` and is intended for cross account).
  • Loading branch information
aricart committed Dec 9, 2022
1 parent 86c41bc commit b54cdd1
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 55 deletions.
1 change: 1 addition & 0 deletions nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ export { compare, parseSemVer } from "./semver.ts";
export {
addService,
ServiceError,
ServiceErrorCodeHeader,
ServiceErrorHeader,
ServiceVerb,
} from "./service.ts";
Expand Down
14 changes: 14 additions & 0 deletions nats-base-client/jsutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ export function validName(name = ""): string {
return "";
}

export function strictValidName(name = ""): string {
if (name === "") {
throw Error(`name required`);
}
const bad = [".", "*", ">", "<", ":", '"', "/", "\\", "|", "?"];
for (let i = 0; i < bad.length; i++) {
const v = bad[i];
if (name.indexOf(v) !== -1) {
return `cannot contain '${v}'`;
}
}
return "";
}

export function defaultConsumer(
name: string,
opts: Partial<ConsumerConfig> = {},
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export {
RequestStrategy,
RetentionPolicy,
ServiceError,
ServiceErrorCodeHeader,
ServiceErrorHeader,
ServiceVerb,
StorageType,
Expand Down
119 changes: 71 additions & 48 deletions nats-base-client/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,24 @@ import {
Sub,
} from "./types.ts";
import { headers, JSONCodec, nuid } from "./mod.ts";
import { nanos, validName } from "./jsutil.ts";
import { nanos, strictValidName } from "./jsutil.ts";

// FIXME: implement service iterator

/**
* Services have common backplane subject pattern:
*
* `$SRV.PING|STATUS|INFO|SCHEMA` - pings or retrieves status for all services
* `$SRV.PING|STATUS|INFO|SCHEMA.<kind>` - pings or retrieves status for all services having the specified kind
* `$SRV.PING|STATUS|INFO|SCHEMA.<kind>.<id>` - pings or retrieves status of a particular service
* `$SRV.PING|STATS|INFO|SCHEMA` - pings or retrieves status for all services
* `$SRV.PING|STATS|INFO|SCHEMA.<kind>` - pings or retrieves status for all services having the specified kind
* `$SRV.PING|STATS|INFO|SCHEMA.<kind>.<id>` - pings or retrieves status of a particular service
*/
export const ServiceApiPrefix = "$SRV";
export const ServiceErrorHeader = "Nats-Service-Error";
export const ServiceErrorCodeHeader = "Nats-Service-Error-Code";

export enum ServiceVerb {
PING = "PING",
STATUS = "STATUS",
STATS = "STATS",
INFO = "INFO",
SCHEMA = "SCHEMA",
}
Expand Down Expand Up @@ -70,7 +73,7 @@ export type Service = {
stopped: boolean;
/**
* Returns the stats for the service.
* @param internal if true, aggregates status for the generated internal endpoints.
* @param internal if true, aggregates stats for the generated internal endpoints.
*/
stats(internal: boolean): ServiceStats;
/**
Expand Down Expand Up @@ -106,7 +109,7 @@ export type EndpointStats = {
*/
last_error?: Error;
/**
* A field that can be customized with any data as returned by status handler see {@link ServiceConfig}
* A field that can be customized with any data as returned by stats handler see {@link ServiceConfig}
*/
data?: unknown;
/**
Expand All @@ -126,11 +129,11 @@ export type ServiceSchema = {

export type ServiceInfo = {
/**
* The kind of the service reporting the status
* The kind of the service reporting the stats
*/
name: string;
/**
* The unique ID of the service reporting the status
* The unique ID of the service reporting the stats
*/
id: string;
/**
Expand Down Expand Up @@ -171,11 +174,11 @@ export type ServiceConfig = {
*/
endpoint: Endpoint;
/**
* A customized handler for the status of an endpoint. The
* A customized handler for the stats of an endpoint. The
* data returned by the endpoint will be serialized as is
* @param endpoint
*/
statusHandler?: (
statsHandler?: (
endpoint: Endpoint,
) => Promise<unknown | null>;
};
Expand Down Expand Up @@ -208,23 +211,23 @@ type ServiceSubscription<T = unknown> =
};

/**
* The status of a service
* The stats of a service
*/
export type ServiceStats = {
/**
* Name for the endpoint
*/
name: string;
/**
* The unique ID of the service reporting the status
* The unique ID of the service reporting the stats
*/
id: string;
/**
* The version identifier for the service
*/
version?: string;
/**
* An EndpointStatus per each endpoint on the service
* An EndpointStats per each endpoint on the service
*/
stats: EndpointStats[];
};
Expand Down Expand Up @@ -268,18 +271,30 @@ export class ServiceImpl implements Service {
internal: ServiceSubscription[];
stopped: boolean;
watched: Promise<void>[];
statuses: Map<Endpoint, EndpointStats>;
allStats: Map<Endpoint, EndpointStats>;
interval!: number;

static controlSubject(verb: ServiceVerb, name = "", id = "") {
/**
* @param verb
* @param name
* @param id
* @param prefix - this is only supplied by tooling when building control subject that crosses an account
*/
static controlSubject(
verb: ServiceVerb,
name = "",
id = "",
prefix?: string,
) {
const pre = prefix ?? ServiceApiPrefix;
if (name === "" && id === "") {
return `${ServiceApiPrefix}.${verb}`;
return `${pre}.${verb}`;
}
name = name.toUpperCase();
id = id.toUpperCase();
return id !== ""
? `${ServiceApiPrefix}.${verb}.${name}.${id}`
: `${ServiceApiPrefix}.${verb}.${name}`;
? `${pre}.${verb}.${name}.${id}`
: `${pre}.${verb}.${name}`;
}

constructor(
Expand All @@ -288,7 +303,7 @@ export class ServiceImpl implements Service {
) {
this.nc = nc;
this.config = config;
const n = validName(this.name);
const n = strictValidName(this.name);
if (n !== "") {
throw new Error(`name ${n}`);
}
Expand All @@ -299,7 +314,7 @@ export class ServiceImpl implements Service {
this.watched = [];
this._done = deferred();
this.stopped = false;
this.statuses = new Map<ServiceSubscription, EndpointStats>();
this.allStats = new Map<ServiceSubscription, EndpointStats>();

this.nc.closed()
.then(() => {
Expand Down Expand Up @@ -338,9 +353,11 @@ export class ServiceImpl implements Service {
const h = headers();
if (err instanceof ServiceError) {
const se = err as ServiceError;
h.set(ServiceErrorHeader, `${se.code} ${se.message}`);
h.set(ServiceErrorHeader, se.message);
h.set(ServiceErrorCodeHeader, `${se.code}`);
} else {
h.set(ServiceErrorHeader, `400 ${err.message}`);
h.set(ServiceErrorHeader, err.message);
h.set(ServiceErrorCodeHeader, "400");
}
return h;
}
Expand All @@ -355,7 +372,7 @@ export class ServiceImpl implements Service {
this.internal.push(sv);
}
const { name } = h as InternalEndpoint;
const status: EndpointStats = {
const stats: EndpointStats = {
name: name ? name : this.name,
num_requests: 0,
num_errors: 0,
Expand All @@ -364,14 +381,14 @@ export class ServiceImpl implements Service {
};

const countLatency = function (start: number) {
status.total_latency = nanos(Date.now() - start);
status.average_latency = Math.round(
status.total_latency / status.num_requests,
stats.total_latency = nanos(Date.now() - start);
stats.average_latency = Math.round(
stats.total_latency / stats.num_requests,
);
};
const countError = function (err: Error) {
status.num_errors++;
status.last_error = err;
stats.num_errors++;
stats.last_error = err;
};

sv.sub = this.nc.subscribe(subject, {
Expand All @@ -381,7 +398,7 @@ export class ServiceImpl implements Service {
return;
}
const start = Date.now();
status.num_requests++;
stats.num_requests++;
try {
handler(err, msg)
.catch((err) => {
Expand All @@ -398,7 +415,7 @@ export class ServiceImpl implements Service {
},
queue,
});
this.statuses.set(h, status);
this.allStats.set(h, stats);

sv.sub.closed
.then(() => {
Expand All @@ -420,31 +437,31 @@ export class ServiceImpl implements Service {

stats(internal = false): ServiceStats {
const ss: ServiceStats = {
// status: status ? status : null,
// stats: stats ? stats : null,
name: this.name,
id: this.id,
version: this.version,
stats: [],
};

// the status for the service handler
const status = this.statuses.get(this.handler);
if (status) {
if (typeof this.config.statusHandler === "function") {
// the stats for the service handler
const stats = this.allStats.get(this.handler);
if (stats) {
if (typeof this.config.statsHandler === "function") {
try {
status.data = this.config.statusHandler(this.handler as Endpoint);
stats.data = this.config.statsHandler(this.handler as Endpoint);
} catch (err) {
status.last_error = err;
stats.last_error = err;
}
}
ss.stats.push(status);
ss.stats.push(stats);
}

if (internal) {
this.internal.forEach((h) => {
const status = this.statuses.get(h);
if (status) {
ss.stats.push(status);
const stats = this.allStats.get(h);
if (stats) {
ss.stats.push(stats);
}
});
}
Expand Down Expand Up @@ -483,7 +500,7 @@ export class ServiceImpl implements Service {
}

start(): Promise<Service> {
const statusHandler = (err: Error | null, msg: Msg): Promise<void> => {
const statsHandler = (err: Error | null, msg: Msg): Promise<void> => {
if (err) {
this.close(err);
return Promise.reject(err);
Expand All @@ -498,8 +515,8 @@ export class ServiceImpl implements Service {
// ignored
}

const status = this.stats(internal);
msg?.respond(jc.encode(status));
const stats = this.stats(internal);
msg?.respond(jc.encode(stats));
return Promise.resolve();
};

Expand All @@ -519,8 +536,14 @@ export class ServiceImpl implements Service {
return Promise.resolve();
};

const ping = jc.encode({ name: this.name, id: this.id });
const pingHandler = (err: Error | null, msg: Msg): Promise<void> => {
return infoHandler(err, msg);
if (err) {
this.close(err).then().catch();
return Promise.reject(err);
}
msg.respond(ping);
return Promise.resolve();
};

const schemaHandler = (err: Error | null, msg: Msg): Promise<void> => {
Expand All @@ -534,7 +557,7 @@ export class ServiceImpl implements Service {
};

this.addInternalHandler(ServiceVerb.PING, pingHandler);
this.addInternalHandler(ServiceVerb.STATUS, statusHandler);
this.addInternalHandler(ServiceVerb.STATS, statsHandler);
this.addInternalHandler(ServiceVerb.INFO, infoHandler);
if (
this.config.schema?.request !== "" || this.config.schema?.response !== ""
Expand Down Expand Up @@ -584,7 +607,7 @@ export class ServiceImpl implements Service {
}

reset(): void {
const iter = this.statuses.values();
const iter = this.allStats.values();
for (const s of iter) {
s.average_latency = 0;
s.num_errors = 0;
Expand Down
Loading

0 comments on commit b54cdd1

Please sign in to comment.