Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] allow micro to specify a queue name #590

Merged
merged 3 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion nats-base-client/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -823,13 +823,19 @@ export type Endpoint = {
* Optional metadata about the endpoint
*/
metadata?: Record<string, string>;
/**
* Optional queue group to run this particular endpoint in. The service's configuration
* queue configuration will be used. See {@link ServiceConfig.queue}.
*/
queue?: string;
};
export type EndpointOptions = Partial<Endpoint>;

export type EndpointInfo = {
name: string;
subject: string;
metadata?: Record<string, string>;
queue_group?: string;
};

export interface Dispatcher<T> {
Expand Down Expand Up @@ -864,9 +870,13 @@ export interface ServiceGroup {
* A group is a subject prefix from which endpoints can be added.
* Can be empty to allow for prefixes or tokens that are set at runtime
* without requiring editing of the service.
* Note that an optional queue can be specified, all endpoints added to
* the group, will use the specified queue unless the endpoint overrides it.
* see {@link EndpointOptions.queue} and {@link ServiceConfig.queue}.
* @param subject
* @param queue
*/
addGroup(subject?: string): ServiceGroup;
addGroup(subject?: string, queue?: string): ServiceGroup;
}

export type ServiceMetadata = {
Expand Down Expand Up @@ -933,6 +943,10 @@ export type NamedEndpointStats = {
* Average processing_time is the total processing_time divided by the num_requests
*/
average_processing_time: Nanos;
/**
* The queue group the endpoint is listening on
*/
queue_group?: string;
};
/**
* Statistics for an endpoint
Expand Down Expand Up @@ -984,6 +998,12 @@ export type ServiceConfig = {
* Optional metadata about the service
*/
metadata?: Record<string, string>;
/**
* Optional queue group to run the service in. By default,
* then queue name is "q". Note that this configuration will
* be the default for all endpoints and groups.
*/
queue?: string;
};
/**
* The stats of a service
Expand Down
44 changes: 31 additions & 13 deletions nats-base-client/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ export class ServiceMsgImpl implements ServiceMsg {

export class ServiceGroupImpl implements ServiceGroup {
subject: string;
queue: string;
srv: ServiceImpl;
constructor(parent: ServiceGroup, name = "") {
constructor(parent: ServiceGroup, name = "", queue = "") {
if (name !== "") {
validInternalToken("service group", name);
}
Expand All @@ -127,11 +128,15 @@ export class ServiceGroupImpl implements ServiceGroup {
} else if (parent instanceof ServiceGroupImpl) {
const sg = parent as ServiceGroupImpl;
this.srv = sg.srv;
if (queue === "" && sg.queue !== "") {
queue = sg.queue;
}
root = sg.subject;
} else {
throw new Error("unknown ServiceGroup type");
}
this.subject = this.calcSubject(root, name);
this.queue = queue;
}

calcSubject(root: string, name = ""): string {
Expand All @@ -149,16 +154,18 @@ export class ServiceGroupImpl implements ServiceGroup {
? { handler: opts, subject: name }
: opts;
validateName("endpoint", name);
let { subject, handler, metadata } = args;
let { subject, handler, metadata, queue } = args;
subject = subject || name;
queue = queue || this.queue;
validSubjectName("endpoint subject", subject);
subject = this.calcSubject(this.subject, subject);
const ne = { name, subject, handler, metadata };

const ne = { name, subject, queue, handler, metadata };
return this.srv._addEndpoint(ne);
}

addGroup(name = ""): ServiceGroup {
return new ServiceGroupImpl(this, name);
addGroup(name = "", queue = ""): ServiceGroup {
return new ServiceGroupImpl(this, name, queue);
}
}

Expand Down Expand Up @@ -244,9 +251,15 @@ export class ServiceImpl implements Service {
config: ServiceConfig = { name: "", version: "" },
) {
this.nc = nc;
this.config = config;
this.config = Object.assign({}, config);
if (!this.config.queue) {
this.config.queue = "q";
}

// this will throw if no name
validateName("name", this.config.name);
validateName("queue", this.config.queue);

// this will throw if not semver
parseSemVer(this.config.version);
this._id = nuid.next();
Expand Down Expand Up @@ -314,14 +327,15 @@ export class ServiceImpl implements Service {
internal = false,
): ServiceSubscription {
// internals don't use a queue
const queue = internal ? "" : "q";
const queue = internal ? "" : (h.queue ? h.queue : this.config.queue);
const { name, subject, handler } = h as NamedEndpoint;
const sv = h as ServiceSubscription;
sv.internal = internal;
if (internal) {
this.internal.push(sv);
}
sv.stats = new NamedEndpointStatsImpl(name, subject);
sv.stats = new NamedEndpointStatsImpl(name, subject, queue);
sv.queue = queue;

const callback = handler
? (err: NatsError | null, msg: Msg) => {
Expand Down Expand Up @@ -379,8 +393,8 @@ export class ServiceImpl implements Service {

endpoints(): EndpointInfo[] {
return this.handlers.map((v) => {
const { subject, metadata, name } = v;
return { subject, metadata, name };
const { subject, metadata, name, queue } = v;
return { subject, metadata, name, queue_group: queue };
});
}

Expand Down Expand Up @@ -541,8 +555,8 @@ export class ServiceImpl implements Service {
}
}

addGroup(name: string): ServiceGroup {
return new ServiceGroupImpl(this, name);
addGroup(name: string, queue?: string): ServiceGroup {
return new ServiceGroupImpl(this, name, queue);
}

addEndpoint(
Expand Down Expand Up @@ -585,14 +599,16 @@ class NamedEndpointStatsImpl implements NamedEndpointStats {
last_error?: string;
data?: unknown;
metadata?: Record<string, string>;
queue: string;

constructor(name: string, subject: string) {
constructor(name: string, subject: string, queue = "") {
this.name = name;
this.subject = subject;
this.average_processing_time = 0;
this.num_errors = 0;
this.num_requests = 0;
this.processing_time = 0;
this.queue = queue;
}
reset(qi?: QueuedIterator<unknown>) {
this.num_requests = 0;
Expand Down Expand Up @@ -629,6 +645,7 @@ class NamedEndpointStatsImpl implements NamedEndpointStats {
processing_time,
last_error,
data,
queue,
} = this;
return {
name,
Expand All @@ -639,6 +656,7 @@ class NamedEndpointStatsImpl implements NamedEndpointStats {
processing_time,
last_error,
data,
queue_group: queue,
};
}

Expand Down
2 changes: 2 additions & 0 deletions tests/helpers/service-check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ const statsSchema: JSONSchemaType<ServiceStats> = {
processing_time: { type: "number" },
average_processing_time: { type: "number" },
data: { type: "string" },
queue_group: { type: "string" },
},
required: [
"num_requests",
Expand All @@ -280,6 +281,7 @@ const statsSchema: JSONSchemaType<ServiceStats> = {
"processing_time",
"average_processing_time",
"data",
"queue_group",
],
},
},
Expand Down
112 changes: 112 additions & 0 deletions tests/service_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
import {
connect,
createInbox,
EndpointInfo,
ErrorCode,
JSONCodec,
Msg,
NatsConnection,
NatsError,
nuid,
QueuedIterator,
Expand All @@ -47,6 +49,7 @@ import {
ServiceVerb,
StringCodec,
} from "../src/mod.ts";
import { SubscriptionImpl } from "../nats-base-client/protocol.ts";

Deno.test("service - control subject", () => {
const test = (verb: ServiceVerb) => {
Expand Down Expand Up @@ -902,3 +905,112 @@ Deno.test("service - json reviver", async () => {

await cleanup(ns, nc);
});

async function testQueueName(nc: NatsConnection, subj: string, q?: string) {
const srv = await nc.services.add({
name: "example",
version: "0.0.1",
metadata: { service: "1" },
queue: q,
});

srv.addEndpoint(subj, {
handler: (_err, msg) => {
msg.respond();
},
});

const nci = nc as NatsConnectionImpl;
const sub = nci.protocol.subscriptions.all().find((s) => {
return s.subject === subj;
});
assertExists(sub);
assertEquals(sub.queue, q !== undefined ? q : "q");
}

Deno.test("service - custom queue group", async () => {
const { ns, nc } = await setup();
await testQueueName(nc, "a");
await testQueueName(nc, "b", "q1");
await assertRejects(
async () => {
await testQueueName(nc, "c", "one two");
},
Error,
"invalid queue name - queue name cannot contain ' '",
);
await assertRejects(
async () => {
await testQueueName(nc, "d", " ");
},
Error,
"invalid queue name - queue name cannot contain ' '",
);

await cleanup(ns, nc);
});

function getSubscriptionBySubject(
nc: NatsConnection,
subject: string,
): SubscriptionImpl | undefined {
const nci = nc as NatsConnectionImpl;
return nci.protocol.subscriptions.all().find((v) => {
return v.subject === subject;
});
}

function getEndpointInfo(
srv: ServiceImpl,
subject: string,
): EndpointInfo | undefined {
return srv.endpoints().find((v) => {
return v.subject === subject;
});
}

function checkQueueGroup(srv: Service, subj: string, queue: string) {
const service = srv as ServiceImpl;
const si = getSubscriptionBySubject(service.nc, subj);
assertExists(si);
assertEquals(si.queue, queue);
const ei = getEndpointInfo(service, subj);
assertExists(ei);
assertEquals(ei.queue_group, queue);
}

Deno.test("service - endpoint default queue group", async () => {
const { ns, nc } = await setup();
const srv = await nc.services.add({
name: "example",
version: "0.0.1",
metadata: { service: "1" },
}) as ServiceImpl;

// svc config doesn't specify a queue group so we expect q
srv.addEndpoint("a");
checkQueueGroup(srv, "a", "q");

// we add another group, no queue
const dg = srv.addGroup("G");
dg.addEndpoint("a");
checkQueueGroup(srv, "G.a", "q");

// now a group with a queue - we expect endpoints/and subgroups
// to use this unless they override
const g = srv.addGroup("g", "qq");
g.addEndpoint("a");
checkQueueGroup(srv, "g.a", "qq");
// override
g.addEndpoint("b", { queue: "bb" });
checkQueueGroup(srv, "g.b", "bb");
// add a subgroup without, should inherit
const g2 = g.addGroup("g");
g2.addEndpoint("a");
checkQueueGroup(srv, "g.g.a", "qq");
// and override
g2.addEndpoint("b", { queue: "bb" });
checkQueueGroup(srv, "g.g.b", "bb");

await cleanup(ns, nc);
});
Loading