Skip to content

Commit

Permalink
[WIP] service framework
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Oct 3, 2022
1 parent a75d862 commit 63eb594
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 0 deletions.
171 changes: 171 additions & 0 deletions nats-base-client/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright 2022 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Deferred, deferred } from "./util.ts";
import { Msg, NatsConnection } from "./types.ts";
import { JSONCodec } from "./mod.ts";

export type Srv = {
handlers: SvcHandler[];
status: Promise<SrvStatus[]>;
stop(): Promise<null | Error>;
done: Promise<null | Error>;
stopped: boolean;
heartbeatInterval: number;
};

export type SrvStatusEntry = {
requests: number;
errors: number;
uptime: number;
lastError?: string;
version: number;
};

export type SvcHandler<T = unknown> = {
name: string;
description: string;
schema: {
request: string;
response: string;
};
version: number;
subject: string;
handler: (err: Error, msg: T) => void;
statusHandler: () => Promise<SrvStatusEntry>;
stop(): Promise<void>;
queueGroup: string;
};

export type SrvStatus = {
name: string;
status: SrvStatusEntry | null;
error: Error | null;
};

export function addService(
nc: NatsConnection,
kind: string,
id: string,
handlers: SvcHandler[] = [],
): Promise<Srv> {
const s = new SrvImpl(nc, kind, id, handlers);
try {
return s.start();
} catch (err) {
return Promise.reject(err);
}
}

const jc = JSONCodec();

export class SrvImpl implements Srv {
nc: NatsConnection;
kind: string;
id: string;
_done: Deferred<Error | null>;
handlers: SvcHandler[];
heartbeatInterval: number;
stopped: boolean;

static controlSubject(kind: string, verb: string, id = "") {
return id !== "" ? `svc.${verb}.${kind}.${id}` : `svc.${verb}.${kind}`;
}

constructor(
nc: NatsConnection,
kind: string,
id: string,
handlers: SvcHandler[],
) {
this.nc = nc;
this.kind = kind;
this.id = id;
this.handlers = handlers;
this._done = deferred();
this.heartbeatInterval = 15 * 1000;
this.stopped = false;
}

start(): Promise<Srv> {
const pingHandler = (err: Error | null, msg: Msg) => {
if (err) {
this.close(err);
return;
}
msg.respond();
};

const statusHandler = async (msg: Msg) => {
const status = await this.status;
msg.respond(jc.encode(status));
};

this.nc.subscribe(`svc.PING.${this.kind}.${this.id}`, {
callback: pingHandler,
});

this.nc.subscribe(`svc.PING.${this.kind}`, {
callback: pingHandler,
});

const sh = this.nc.subscribe(`svc.STATUS.${this.kind}.${this.id}`);
(async () => {
for await (const m of sh) {
await statusHandler(m);
}
})();

this.nc.subscribe(`svc.STATUS.${this.kind}`);
(async () => {
for await (const m of sh) {
await statusHandler(m);
}
})();

return Promise.resolve(this);
}

close(err?: Error): Promise<null | Error> {
this.stopped = true;
this._done.resolve(err ? err : null);
return this._done;
}

get done(): Promise<null | Error> {
return this._done;
}

stop(): Promise<null | Error> {
this.stopped = true;
this._done.resolve();
return this._done;
}

get status(): Promise<SrvStatus[]> {
const proms = this.handlers.map((h) => {
const d = deferred<SrvStatus>();
h.statusHandler()
.then((s: SrvStatusEntry) => {
d.resolve({ name: h.name, status: s, error: null });
})
.catch((err) => {
d.resolve({ name: h.name, status: null, error: err });
});
return d;
});

return Promise.all(proms);
}
}
17 changes: 17 additions & 0 deletions tests/service_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { cleanup, setup } from "./jstest_util.ts";
import { addService, SrvImpl } from "../nats-base-client/service.ts";
import { assertRejects } from "https://deno.land/std@0.125.0/testing/asserts.ts";

Deno.test("svc - basics", async () => {
const { ns, nc } = await setup();

await addService(nc, "test", "a");

await nc.request(SrvImpl.controlSubject("test", "PING"));
await nc.request(SrvImpl.controlSubject("test", "PING", "a"));
await assertRejects(async () => {
await nc.request(SrvImpl.controlSubject("test", "PING", "b"));
});

await cleanup(ns, nc);
});

0 comments on commit 63eb594

Please sign in to comment.