Skip to content

Commit

Permalink
[bench] added flag to control pendingLimit
Browse files Browse the repository at this point in the history
[bench] added --rep, allowing --req/--rep to be run in different processes
[bench] changed Perf to use performance.now
  • Loading branch information
aricart committed May 31, 2023
1 parent ace0fec commit 12761ca
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 23 deletions.
27 changes: 24 additions & 3 deletions examples/bench.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const defaults = {
json: false,
csv: false,
csvheader: false,
pendingLimit: 32,
};

const argv = parse(
Expand All @@ -21,6 +22,7 @@ const argv = parse(
"d": ["debug"],
"p": ["payload"],
"i": ["iterations"],
"l": ["pendingLimit"],
},
default: defaults,
string: [
Expand All @@ -36,9 +38,9 @@ const argv = parse(
},
);

if (argv.h || argv.help || (!argv.sub && !argv.pub && !argv.req)) {
if (argv.h || argv.help || (!argv.sub && !argv.pub && !argv.req && !argv.rep)) {
console.log(
"usage: bench.ts [--json] [--csv] [--csvheader] [--callbacks] [--iterations <#loop: 1>] [--pub] [--sub] [--req (--asyncRequests)] [--count messages:1M] [--payload <#bytes>=128] [--server server] [--subject <subj>]\n",
"usage: bench.ts [--json] [--csv] [--csvheader] [--callbacks] [--iterations <#loop: 1>] [--pub] [--sub] [--req (--asyncRequests)] [--rep] [--count messages:1M] [--payload <#bytes>=128] [--server server] [--subject <subj>]\n",
);
Deno.exit(0);
}
Expand All @@ -47,6 +49,7 @@ const server = argv.server;
const count = parseInt(argv.count);
const bytes = parseInt(argv.payload);
const iters = parseInt(argv.iterations);
const pendingLimit = parseInt(argv.pendingLimit) * 1024;
const metrics = [];

for (let i = 0; i < iters; i++) {
Expand All @@ -59,13 +62,16 @@ for (let i = 0; i < iters; i++) {
pub: argv.pub,
sub: argv.sub,
req: argv.req,
rep: argv.rep,
subject: argv.subject,
};

nc.protocol.pendingLimit = pendingLimit;

const bench = new Bench(nc, opts);
const m = await bench.run();
metrics.push(...m);
await nc.close();
await nc.drain();
}

const reducer = (a, m) => {
Expand All @@ -90,6 +96,10 @@ if (!argv.json && !argv.csv) {
reducer,
new Metric("pubsub", 0),
);
const reqrep = metrics.filter((m) => m.name === "reqrep").reduce(
reducer,
new Metric("reqrep", 0),
);
const pub = metrics.filter((m) => m.name === "pub").reduce(
reducer,
new Metric("pub", 0),
Expand All @@ -103,9 +113,17 @@ if (!argv.json && !argv.csv) {
new Metric("req", 0),
);

const rep = metrics.filter((m) => m.name === "rep").reduce(
reducer,
new Metric("rep", 0),
);

if (pubsub && pubsub.msgs) {
console.log(pubsub.toString());
}
if (reqrep && reqrep.msgs) {
console.log(reqrep.toString());
}
if (pub && pub.msgs) {
console.log(pub.toString());
}
Expand All @@ -115,6 +133,9 @@ if (!argv.json && !argv.csv) {
if (req && req.msgs) {
console.log(req.toString());
}
if (rep && rep.msgs) {
console.log(rep.toString());
}
} else if (argv.json) {
console.log(JSON.stringify(metrics, null, 2));
} else if (argv.csv) {
Expand Down
77 changes: 58 additions & 19 deletions nats-base-client/bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,15 @@ export class Bench {
if (this.pub && this.sub) {
this.perf.measure("pubsub", "pubStart", "subStop");
}
if (this.req && this.rep) {
this.perf.measure("reqrep", "reqStart", "reqStop");
}

const measures = this.perf.getEntries();
const pubsub = measures.find((m) => m.name === "pubsub");
const reqrep = measures.find((m) => m.name === "reqrep");
const req = measures.find((m) => m.name === "req");
const rep = measures.find((m) => m.name === "rep");
const pub = measures.find((m) => m.name === "pub");
const sub = measures.find((m) => m.name === "sub");

Expand All @@ -175,6 +180,17 @@ export class Bench {
metrics.push(m);
}

if (reqrep) {
const { name, duration } = reqrep;
const m = new Metric(name, duration);
m.msgs = this.msgs * 2;
m.bytes = stats.inBytes + stats.outBytes;
m.lang = lang;
m.version = version;
m.payload = this.payload.length;
metrics.push(m);
}

if (pub) {
const { name, duration } = pub;
const m = new Metric(name, duration);
Expand All @@ -197,10 +213,21 @@ export class Bench {
metrics.push(m);
}

if (rep) {
const { name, duration } = rep;
const m = new Metric(name, duration);
m.msgs = this.msgs;
m.bytes = stats.inBytes + stats.outBytes;
m.lang = lang;
m.version = version;
m.payload = this.payload.length;
metrics.push(m);
}

if (req) {
const { name, duration } = req;
const m = new Metric(name, duration);
m.msgs = this.msgs * 2;
m.msgs = this.msgs;
m.bytes = stats.inBytes + stats.outBytes;
m.lang = lang;
m.version = version;
Expand All @@ -214,23 +241,6 @@ export class Bench {
async runCallbacks(): Promise<void> {
const jobs: Promise<void>[] = [];

if (this.req) {
const d = deferred<void>();
jobs.push(d);
const sub = this.nc.subscribe(
this.subject,
{
max: this.msgs,
callback: (_, m) => {
m.respond(this.payload);
if (sub.getProcessed() === this.msgs) {
d.resolve();
}
},
},
);
}

if (this.sub) {
const d = deferred<void>();
jobs.push(d);
Expand All @@ -251,6 +261,27 @@ export class Bench {
});
}

if (this.rep) {
const d = deferred<void>();
jobs.push(d);
let i = 0;
this.nc.subscribe(this.subject, {
max: this.msgs,
callback: (_, m) => {
m.respond(this.payload);
i++;
if (i === 1) {
this.perf.mark("repStart");
}
if (i === this.msgs) {
this.perf.mark("repStop");
this.perf.measure("rep", "repStart", "repStop");
d.resolve();
}
},
});
}

if (this.pub) {
const job = (async () => {
this.perf.mark("pubStart");
Expand Down Expand Up @@ -295,12 +326,20 @@ export class Bench {
async runAsync(): Promise<void> {
const jobs: Promise<void>[] = [];

if (this.req) {
if (this.rep) {
let first = false;
const sub = this.nc.subscribe(this.subject, { max: this.msgs });
const job = (async () => {
for await (const m of sub) {
if (!first) {
this.perf.mark("repStart");
first = true;
}
m.respond(this.payload);
}
await this.nc.flush();
this.perf.mark("repStop");
this.perf.measure("rep", "repStart", "repStop");
})();
jobs.push(job);
}
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export class Perf {
}

mark(key: string) {
this.timers.set(key, Date.now());
this.timers.set(key, performance.now());
}

measure(key: string, startKey: string, endKey: string) {
Expand Down

0 comments on commit 12761ca

Please sign in to comment.