Skip to content

Commit

Permalink
feat: concurrency mode (#77)
Browse files Browse the repository at this point in the history
* init

* update

* update

* update

* p-limit

* p-limit

* readme
  • Loading branch information
Aslemammad committed Apr 21, 2024
1 parent 8e702ef commit 86866ab
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 46 deletions.
1 change: 1 addition & 0 deletions .eslintrc.json
Expand Up @@ -17,6 +17,7 @@
"commonjs": true
},
"rules": {
"import/no-extraneous-dependencies": "off",
"no-restricted-syntax": "off",
"no-await-in-loop": "off",
"no-plusplus": "off",
Expand Down
21 changes: 20 additions & 1 deletion README.md
Expand Up @@ -143,8 +143,9 @@ export type Hook = (task: Task, mode: "warmup" | "run") => void | Promise<void>;
```

- `async run()`: run the added tasks that were registered using the `add` method
- `async runConcurrently(limit: number = Infinity)`: similar to the `run` method but runs concurrently rather than sequentially
- `async runConcurrently(threshold: number = Infinity, mode: "bench" | "task" = "bench")`: similar to the `run` method but runs concurrently rather than sequentially. See the [Concurrency](#Concurrency) section.
- `async warmup()`: warm up the benchmark tasks
- `async warmupConcurrently(threshold: number = Infinity, mode: "bench" | "task" = "bench")`: warm up the benchmark tasks concurrently
- `reset()`: reset each task and remove its result
- `add(name: string, fn: Fn, opts?: FnOpts)`: add a benchmark task to the task map
- `Fn`: `() => any | Promise<any>`
Expand Down Expand Up @@ -372,6 +373,24 @@ import { hrtimeNow } from 'tinybench';
```
It may make your benchmarks slower, check #42.

## Concurrency

- When `mode` is set to `null` (default), concurrency is disabled.
- When `mode` is set to 'task', each task's iterations (calls of a task function) run concurrently.
- When `mode` is set to 'bench', different tasks within the bench run concurrently. Concurrent cycles.

```ts
// options way (recommended)
bench.threshold = 10 // The maximum number of concurrent tasks to run. Defaults to Infinity.
bench.concurrency = "task" // The concurrency mode to determine how tasks are run.
// await bench.warmup()
await bench.run()

// standalone method way
// await bench.warmupConcurrently(10, "task")
await bench.runConcurrently(10, "task") // with runConcurrently, mode is set to 'bench' by default
```

## Prior art

- [Benchmark.js](https://github.com/bestiejs/benchmark.js)
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -39,6 +39,7 @@
"eslint-config-airbnb-base": "^15.0.0",
"eslint-plugin-import": "^2.26.0",
"nano-staged": "^0.5.0",
"p-limit": "^4.0.0",
"size-limit": "^7.0.8",
"tsup": "^5.11.7",
"typescript": "^5.2.2",
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 65 additions & 23 deletions src/bench.ts
@@ -1,3 +1,4 @@
import pLimit from 'p-limit';
import type {
Hook,
Options,
Expand All @@ -24,6 +25,20 @@ export default class Bench extends EventTarget {

_todos: Map<string, Task> = new Map();

/**
* Executes tasks concurrently based on the specified concurrency mode.
*
* - When `mode` is set to `null` (default), concurrency is disabled.
* - When `mode` is set to 'task', each task's iterations (calls of a task function) run concurrently.
* - When `mode` is set to 'bench', different tasks within the bench run concurrently.
*/
concurrency: 'task' | 'bench' | null = null;

/**
* The maximum number of concurrent tasks to run. Defaults to Infinity.
*/
threshold = Infinity;

signal?: AbortSignal;

throws: boolean;
Expand Down Expand Up @@ -67,7 +82,7 @@ export default class Bench extends EventTarget {
}
}

runTask(task: Task) {
private runTask(task: Task) {
if (this.signal?.aborted) return task;
return task.run();
}
Expand All @@ -77,7 +92,11 @@ export default class Bench extends EventTarget {
* {@link add} method.
* Note: This method does not do any warmup. Call {@link warmup} for that.
*/
async run() {
async run(): Promise<Task[]> {
if (this.concurrency === 'bench') {
// TODO: in the next major, we should remove runConcurrently
return this.runConcurrently(this.threshold, this.concurrency);
}
this.dispatchEvent(createBenchEvent('start'));
const values: Task[] = [];
for (const task of [...this._tasks.values()]) {
Expand All @@ -88,32 +107,26 @@ export default class Bench extends EventTarget {
}

/**
* similar to the {@link run} method but runs concurrently rather than sequentially
* default limit is Infinity
* See Bench.{@link concurrency}
*/
async runConcurrently(limit = Infinity) {
this.dispatchEvent(createBenchEvent('start'));
async runConcurrently(threshold = Infinity, mode: NonNullable<Bench['concurrency']> = 'bench'): Promise<Task[]> {
this.threshold = threshold;
this.concurrency = mode;

const remainingTasks = [...this._tasks.values()];
const values: Task[] = [];
if (mode === 'task') {
return this.run();
}

const handleConcurrency = async () => {
while (remainingTasks.length > 0) {
const runningTasks: (Promise<Task> | Task)[] = [];
this.dispatchEvent(createBenchEvent('start'));

// Start tasks up to the concurrency limit
while (runningTasks.length < limit && remainingTasks.length > 0) {
const task = remainingTasks.pop()!;
runningTasks.push(this.runTask(task));
}
const limit = pLimit(threshold);

// Wait for all running tasks to complete
const completedTasks = await Promise.all(runningTasks);
values.push(...completedTasks);
}
};
const promises: Promise<Task>[] = [];
for (const task of [...this._tasks.values()]) {
promises.push(limit(() => this.runTask(task)));
}

await handleConcurrency();
const values = await Promise.all(promises);

this.dispatchEvent(createBenchEvent('complete'));

Expand All @@ -124,13 +137,42 @@ export default class Bench extends EventTarget {
* warmup the benchmark tasks.
* This is not run by default by the {@link run} method.
*/
async warmup() {
async warmup(): Promise<void> {
if (this.concurrency === 'bench') {
// TODO: in the next major, we should remove *Concurrently methods
await this.warmupConcurrently(this.threshold, this.concurrency);
return;
}
this.dispatchEvent(createBenchEvent('warmup'));
for (const [, task] of this._tasks) {
await task.warmup();
}
}

/**
* warmup the benchmark tasks concurrently.
* This is not run by default by the {@link runConcurrently} method.
*/
async warmupConcurrently(threshold = Infinity, mode: NonNullable<Bench['concurrency']> = 'bench'): Promise<void> {
this.threshold = threshold;
this.concurrency = mode;

if (mode === 'task') {
await this.warmup();
return;
}

this.dispatchEvent(createBenchEvent('warmup'));
const limit = pLimit(threshold);
const promises: Promise<void>[] = [];

for (const [, task] of this._tasks) {
promises.push(limit(() => task.warmup()));
}

await Promise.all(promises);
}

/**
* reset each task and remove its result
*/
Expand Down
56 changes: 36 additions & 20 deletions src/task.ts
@@ -1,3 +1,4 @@
import pLimit from 'p-limit';
import type {
Fn,
TaskEvents,
Expand Down Expand Up @@ -52,6 +53,8 @@ export default class Task extends EventTarget {
}

private async loop(time: number, iterations: number): Promise<{ error?: unknown, samples?: number[] }> {
const concurrent = this.bench.concurrency === 'task';
const { threshold } = this.bench;
let totalTime = 0; // ms
const samples: number[] = [];
if (this.opts.beforeAll != null) {
Expand All @@ -63,33 +66,46 @@ export default class Task extends EventTarget {
}
const isAsync = await isAsyncTask(this);

const executeTask = async () => {
if (this.opts.beforeEach != null) {
await this.opts.beforeEach.call(this);
}

let taskTime = 0;
if (isAsync) {
const taskStart = this.bench.now();
await this.fn.call(this);
taskTime = this.bench.now() - taskStart;
} else {
const taskStart = this.bench.now();
this.fn.call(this);
taskTime = this.bench.now() - taskStart;
}

samples.push(taskTime);
totalTime += taskTime;

if (this.opts.afterEach != null) {
await this.opts.afterEach.call(this);
}
};

const limit = pLimit(threshold);
try {
const promises: Promise<void>[] = []; // only for task level concurrency
while (
(totalTime < time || samples.length < iterations)
(totalTime < time || ((samples.length + limit.activeCount + limit.pendingCount) < iterations))
&& !this.bench.signal?.aborted
) {
if (this.opts.beforeEach != null) {
await this.opts.beforeEach.call(this);
}

let taskTime = 0;
if (isAsync) {
const taskStart = this.bench.now();
await this.fn.call(this);
taskTime = this.bench.now() - taskStart;
if (concurrent) {
promises.push(limit(executeTask));
} else {
const taskStart = this.bench.now();
this.fn.call(this);
taskTime = this.bench.now() - taskStart;
}

samples.push(taskTime);
totalTime += taskTime;

if (this.opts.afterEach != null) {
await this.opts.afterEach.call(this);
await executeTask();
}
}
if (promises.length) {
await Promise.all(promises);
}
} catch (error) {
return { error };
}
Expand Down
69 changes: 67 additions & 2 deletions test/sequential.test.ts
Expand Up @@ -29,10 +29,11 @@ test('sequential', async () => {
expect(isFirstTaskDefined).toBe(true);
});

test('concurrent', async () => {
test.each(['warmup', 'run'])('%s concurrent (bench level)', async (mode) => {
const concurrentBench = new Bench({
time: 0,
iterations: 100,
throws: true,
});

let shouldBeDefined1: true;
Expand All @@ -52,7 +53,12 @@ test('concurrent', async () => {
shouldNotBeDefinedFirst2 = true;
});

concurrentBench.runConcurrently();
if (mode === 'warmup') {
concurrentBench.warmupConcurrently();
} else {
concurrentBench.runConcurrently();
}

await setTimeout(0);
expect(shouldBeDefined1!).toBeDefined();
expect(shouldBeDefined2!).toBeDefined();
Expand All @@ -62,3 +68,62 @@ test('concurrent', async () => {
expect(shouldNotBeDefinedFirst1!).toBeDefined();
expect(shouldNotBeDefinedFirst2!).toBeDefined();
});

test.each(['warmup', 'run'])('%s concurrent (task level)', async (mode) => {
const iterations = 10;
const concurrentBench = new Bench({
time: 0,
warmupTime: 0,
iterations,
warmupIterations: iterations,
});
const key = 'sample 1';

const runs = { value: 0 };
concurrentBench
.add(key, async () => {
runs.value++;
await setTimeout(10);
// all task function should be here after 10ms
expect(runs.value).toEqual(iterations);
await setTimeout(10);
});

if (mode === 'warmup') {
await concurrentBench.warmup();
} else {
await concurrentBench.run();
for (const result of concurrentBench.results) {
expect(result?.error).toMatch(/AssertionError/);
}
}
expect(concurrentBench.getTask(key)!.runs).toEqual(0);

concurrentBench.reset();
runs.value = 0;

if (mode === 'warmup') {
await concurrentBench.warmupConcurrently();
} else {
await concurrentBench.runConcurrently();
for (const result of concurrentBench.results) {
expect(result?.error).toMatch(/AssertionError/);
}
}
expect(concurrentBench.getTask(key)!.runs).toEqual(0);
concurrentBench.reset();
runs.value = 0;

if (mode === 'warmup') {
await concurrentBench.warmupConcurrently(Infinity, 'task');
expect(runs.value).toEqual(10);
} else {
await concurrentBench.runConcurrently(Infinity, 'task');

for (const result of concurrentBench.results) {
expect(result?.error).toBeUndefined();
}
expect(runs.value).toEqual(10);
expect(concurrentBench.getTask(key)!.runs).toEqual(10);
}
});

0 comments on commit 86866ab

Please sign in to comment.