Skip to content

Commit 6d93a1e

Browse files
fix: replace plimit with own promises limiter implementation to plug mem leak (#386)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org> Co-authored-by: Aras Abbasi <aras.abbasi@googlemail.com>
1 parent e63c4c5 commit 6d93a1e

File tree

12 files changed

+600
-89
lines changed

12 files changed

+600
-89
lines changed

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
"eslint-plugin-perfectionist": "^4.15.1",
6161
"nano-staged": "^0.8.0",
6262
"neostandard": "^0.12.2",
63-
"p-limit": "^7.2.0",
6463
"simple-git-hooks": "^2.13.1",
6564
"size-limit": "^11.2.0",
6665
"tsdown": "^0.16.0",

pnpm-lock.yaml

Lines changed: 0 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/bench.ts

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import pLimit from 'p-limit'
2-
31
import type {
42
AddEventListenerOptionsArgument,
53
BenchEvents,
@@ -26,7 +24,7 @@ import {
2624
defaultConvertTaskResultForConsoleTable,
2725
invariant,
2826
type JSRuntime,
29-
now,
27+
performanceNow,
3028
runtime,
3129
runtimeVersion,
3230
} from './utils'
@@ -109,10 +107,13 @@ export class Bench extends EventTarget {
109107
this.name = name
110108
this.runtime = runtime
111109
this.runtimeVersion = runtimeVersion
110+
this.concurrency = restOptions.concurrency ?? null
111+
this.threshold = restOptions.threshold ?? Infinity
112+
112113
this.opts = {
113114
...{
114115
iterations: defaultMinimumIterations,
115-
now,
116+
now: performanceNow,
116117
setup: emptyFunction,
117118
teardown: emptyFunction,
118119
throws: false,
@@ -122,6 +123,8 @@ export class Bench extends EventTarget {
122123
warmupTime: defaultMinimumWarmupTime,
123124
},
124125
...restOptions,
126+
...(restOptions.iterations !== undefined && restOptions.time === undefined && { time: 0 }),
127+
...(restOptions.warmupIterations !== undefined && restOptions.warmupTime === undefined && { warmupTime: 0 }),
125128
}
126129

127130
if (this.opts.signal) {
@@ -195,15 +198,23 @@ export class Bench extends EventTarget {
195198
if (this.opts.warmup) {
196199
await this.#warmupTasks()
197200
}
198-
let values: Task[] = []
201+
199202
this.dispatchEvent(new BenchEvent('start'))
203+
204+
let values: Task[] = []
205+
200206
if (this.concurrency === 'bench') {
201-
values = await this.#mapTasksConcurrently(task => task.run())
207+
const taskPromises = []
208+
for (const task of this.#tasks.values()) {
209+
taskPromises.push(task.run())
210+
}
211+
values = await Promise.all(taskPromises)
202212
} else {
203213
for (const task of this.#tasks.values()) {
204214
values.push(await task.run())
205215
}
206216
}
217+
207218
this.dispatchEvent(new BenchEvent('complete'))
208219
return values
209220
}
@@ -240,39 +251,18 @@ export class Bench extends EventTarget {
240251
return this.tasks.map(convert)
241252
}
242253

243-
/**
244-
* Applies a worker function to all registered tasks using the concurrency limit.
245-
*
246-
* Scheduling is handled via p-limit with the current threshold. The returned array preserves
247-
* the iteration order of the tasks. If any scheduled worker function rejects, the returned promise
248-
* rejects with the first error after the scheduled worker functions settle, as per Promise.all semantics.
249-
*
250-
* Notes:
251-
* - Concurrency is controlled by Bench.threshold (Infinity means unlimited).
252-
* - No measurements are performed here; measurements happen inside Task.
253-
* - Used internally by run() and warmupTasks() when concurrency === 'bench'.
254-
* @template R The resolved type produced by the worker function for each task.
255-
* @param workerFn A function invoked for each Task; it must return a Promise<R>.
256-
* @returns Promise that resolves to an array of results in the same order as task iteration.
257-
*/
258-
async #mapTasksConcurrently<R>(
259-
workerFn: (task: Task) => Promise<R>
260-
): Promise<R[]> {
261-
const limit = pLimit(Math.max(1, Math.floor(this.threshold)))
262-
const promises: Promise<R>[] = []
263-
for (const task of this.#tasks.values()) {
264-
promises.push(limit(() => workerFn(task)))
265-
}
266-
return Promise.all(promises)
267-
}
268-
269254
/**
270255
* warmup the benchmark tasks.
271256
*/
272257
async #warmupTasks (): Promise<void> {
273258
this.dispatchEvent(new BenchEvent('warmup'))
259+
274260
if (this.concurrency === 'bench') {
275-
await this.#mapTasksConcurrently(task => task.warmup())
261+
const taskPromises = []
262+
for (const task of this.#tasks.values()) {
263+
taskPromises.push(task.warmup())
264+
}
265+
await Promise.all(taskPromises)
276266
} else {
277267
for (const task of this.#tasks.values()) {
278268
await task.warmup()

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ export type {
1616
TaskResult,
1717
} from './types'
1818
export type { JSRuntime } from './utils'
19-
export { hrtimeNow, now, nToMs } from './utils'
19+
export { hrtimeNow, performanceNow as now, nToMs } from './utils'

src/task.ts

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import pLimit from 'p-limit'
2-
31
import type { Bench } from './bench'
42
import type {
53
AddEventListenerOptionsArgument,
@@ -14,6 +12,7 @@ import type {
1412
} from './types'
1513

1614
import { BenchEvent } from './event'
15+
import { withConcurrency } from './utils'
1716
import {
1817
getStatisticsSorted,
1918
invariant,
@@ -181,11 +180,11 @@ export class Task extends EventTarget {
181180
this.#result = { state: 'started' }
182181
this.dispatchEvent(new BenchEvent('start', this))
183182
await this.#bench.opts.setup(this, 'run')
184-
const { error, samples: latencySamples } = (await this.#benchmark(
183+
const { error, samples: latencySamples } = await this.#benchmark(
185184
'run',
186185
this.#bench.opts.time,
187186
this.#bench.opts.iterations
188-
))
187+
)
189188
await this.#bench.opts.teardown(this, 'run')
190189

191190
this.#processRunResult({ error, latencySamples })
@@ -325,37 +324,33 @@ export class Task extends EventTarget {
325324
}
326325
}
327326
}
328-
329-
try {
330-
const promises: Promise<void>[] = [] // only for task level concurrency
331-
let limit: ReturnType<typeof pLimit> | undefined // only for task level concurrency
332-
333-
if (this.#bench.concurrency === 'task') {
334-
limit = pLimit(Math.max(1, Math.floor(this.#bench.threshold)))
327+
if (this.#bench.concurrency === 'task') {
328+
try {
329+
await withConcurrency({
330+
fn: benchmarkTask,
331+
iterations,
332+
limit: Math.max(1, Math.floor(this.#bench.threshold)),
333+
now: this.#bench.opts.now,
334+
signal: this.#signal ?? this.#bench.opts.signal,
335+
time,
336+
})
337+
} catch (error) {
338+
return { error: toError(error) }
335339
}
336-
337-
while (
338-
// eslint-disable-next-line no-unmodified-loop-condition
339-
(totalTime < time ||
340-
samples.length + (limit?.activeCount ?? 0) + (limit?.pendingCount ?? 0) < iterations) &&
341-
!this.#aborted
342-
) {
343-
if (this.#bench.concurrency === 'task') {
344-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
345-
promises.push((limit!)(benchmarkTask))
346-
} else {
340+
this.#runs = samples.length
341+
} else {
342+
try {
343+
while (
344+
// eslint-disable-next-line no-unmodified-loop-condition
345+
(totalTime < time ||
346+
samples.length < iterations) &&
347+
!this.#aborted
348+
) {
347349
await benchmarkTask()
348350
}
351+
} catch (error) {
352+
return { error: toError(error) }
349353
}
350-
if (!this.#aborted && promises.length > 0) {
351-
await Promise.all(promises)
352-
} else if (promises.length > 0) {
353-
// Abort path
354-
// eslint-disable-next-line no-void
355-
void Promise.allSettled(promises)
356-
}
357-
} catch (error) {
358-
return { error: toError(error) }
359354
}
360355

361356
if (this.#fnOpts.afterAll != null) {

src/types.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ export type BenchEventsWithTask = Extract<BenchEvents, 'add' | 'cycle' | 'error'
2929
* Bench options
3030
*/
3131
export interface BenchOptions {
32+
/**
33+
* Executes tasks concurrently based on the specified concurrency mode.
34+
*
35+
* - When `mode` is set to `null` (default), concurrency is disabled.
36+
* - When `mode` is set to 'task', each task's iterations (calls of a task function) run concurrently.
37+
* - When `mode` is set to 'bench', different tasks within the bench run concurrently.
38+
*/
39+
concurrency?: 'bench' | 'task' | null
40+
3241
/**
3342
* number of times that a task should run if even the time option is finished
3443
* @default 64
@@ -60,6 +69,12 @@ export interface BenchOptions {
6069
*/
6170
teardown?: Hook
6271

72+
/**
73+
* The maximum number of concurrent tasks to run
74+
* @default Infinity
75+
*/
76+
threshold?: number
77+
6378
/**
6479
* Throws if a task fails
6580
* @default false

src/utils.ts

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
// Portions copyright QuiiBz. 2023-2024. All Rights Reserved.
33

44
import type { Task } from './task'
5-
import type { ConsoleTableConverter, Fn, Statistics } from './types'
5+
import type {
6+
ConsoleTableConverter,
7+
Fn,
8+
Statistics,
9+
} from './types'
610

711
import { emptyFunction, tTable } from './constants'
812

@@ -194,7 +198,7 @@ const hrtimeBigint: () => bigint = typeof (globalThis as { process?: { hrtime?:
194198
*/
195199
export const hrtimeNow = () => nToMs(Number(hrtimeBigint()))
196200

197-
export const now = performance.now.bind(performance)
201+
export const performanceNow = performance.now.bind(performance)
198202

199203
/**
200204
* Checks if a value is a promise-like object.
@@ -506,3 +510,74 @@ export const defaultConvertTaskResultForConsoleTable: ConsoleTableConverter = (
506510
}
507511
/* eslint-enable perfectionist/sort-objects */
508512
}
513+
514+
interface WithConcurrencyOptions<R> {
515+
fn: () => Promise<R>
516+
iterations: number
517+
limit: number
518+
now?: () => number
519+
signal?: AbortSignal
520+
time?: number
521+
}
522+
523+
/**
524+
* Creates a concurrency limiter that can execute functions with a maximum concurrency limit.
525+
* @param options - The resource containing the function to execute and other options
526+
* @returns A promise that resolves to an array of results.
527+
* @throws {Error} if a single error occurs during execution
528+
* @throws {AggregateError} if multiple errors occur during execution
529+
*/
530+
export const withConcurrency = async <R>(options: WithConcurrencyOptions<R>): Promise<R[]> => {
531+
const { fn, iterations, limit, now = performanceNow, signal, time = 0 } = options
532+
533+
const maxWorkers = iterations === 0 ? limit : Math.max(0, Math.min(limit, iterations))
534+
535+
const errors: Error[] = []
536+
const results: R[] = []
537+
538+
let isRunning = true
539+
let nextIndex = 0
540+
541+
const hasTimeLimit = Number.isFinite(time) && time > 0
542+
const hasIterationsLimit = iterations > 0
543+
let targetTime = 0
544+
545+
// Reduce checks based on provided limits to avoid tainting the benchmark results
546+
const doNext: () => boolean = hasIterationsLimit
547+
? hasTimeLimit
548+
? () => isRunning && (nextIndex++ < iterations) && ((now() < targetTime) || (isRunning = false))
549+
: () => isRunning && (nextIndex++ < iterations)
550+
: hasTimeLimit
551+
? () => isRunning && ((now() < targetTime) || (isRunning = false))
552+
: () => isRunning
553+
554+
const pushResult = (r: R) => { isRunning && results.push(r) }
555+
const pushError = (e: unknown) => { errors.push(toError(e)) }
556+
557+
const onAbort = () => (isRunning = false)
558+
559+
if (signal) {
560+
if (signal.aborted) return []
561+
signal.addEventListener('abort', onAbort)
562+
}
563+
564+
const worker = async () => {
565+
while (doNext()) {
566+
try {
567+
pushResult(await fn())
568+
} catch (err) {
569+
isRunning = false
570+
pushError(err)
571+
break
572+
}
573+
}
574+
}
575+
576+
if (hasTimeLimit) targetTime = now() + time
577+
const promises = Array.from({ length: maxWorkers }, () => worker())
578+
await Promise.allSettled(promises)
579+
580+
if (errors.length === 0) return results
581+
if (errors.length === 1) throw toError(errors[0])
582+
throw new AggregateError(errors, 'Multiple errors occurred during concurrent execution')
583+
}

0 commit comments

Comments
 (0)