From cf7beda51c91f8f74fe5804fd239380d709993b5 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <7019101+cwillisf@users.noreply.github.com> Date: Wed, 12 Nov 2025 15:18:10 -0800 Subject: [PATCH 1/5] feat: add QueueManager to task-herder --- packages/task-herder/src/QueueManager.ts | 63 ++++++ packages/task-herder/src/TaskQueue.ts | 15 ++ packages/task-herder/src/index.ts | 1 + packages/task-herder/test/manager.test.ts | 239 ++++++++++++++++++++++ 4 files changed, 318 insertions(+) create mode 100644 packages/task-herder/src/QueueManager.ts create mode 100644 packages/task-herder/test/manager.test.ts diff --git a/packages/task-herder/src/QueueManager.ts b/packages/task-herder/src/QueueManager.ts new file mode 100644 index 0000000000..7bd7ae1f4d --- /dev/null +++ b/packages/task-herder/src/QueueManager.ts @@ -0,0 +1,63 @@ +import { TaskQueue, type QueueOptions } from './TaskQueue' + +/** + * Manages multiple named task queues with shared or individual configurations. Each queue can be accessed by a + * unique name or other identifier. For example, this can manage a separate queue for each of several different + * servers, using the server's DNS name as the key. + */ +export class QueueManager { + private readonly queues: Map + private readonly defaultOptions: QueueOptions + + /** + * @param defaultOptions The options that will be used to initialize each queue. + * These can be overridden later on a per-queue basis. + * @param iterable An optional iterable of key-value pairs to initialize the manager with existing queues. + */ + constructor(defaultOptions: QueueOptions, iterable?: Iterable | null) { + this.queues = new Map(iterable) + this.defaultOptions = defaultOptions + } + + /** + * Create a new task queue with the given identifier. If a queue with that identifier already exists, it will be + * replaced. If you need to cancel tasks in that queue before replacing it, do so manually first. + * @param id The identifier for the queue. + * @param overrides Optional overrides for the default QueueOptions for this specific queue. + * @returns The newly created TaskQueue. + */ + public create(id: T, overrides: Partial = {}): TaskQueue { + const queue = new TaskQueue({ ...this.defaultOptions, ...overrides }) + this.queues.set(id, queue) + return queue + } + + /** + * Get the task queue for the given identifier. + * @param id The identifier for the queue. + * @returns The TaskQueue queue associated with the given identifier, or undefined if none exists. + */ + public get(id: T): TaskQueue | undefined { + return this.queues.get(id) + } + + /** + * Get the task queue for the given identifier, creating it if it does not already exist. + * @param id The identifier for the queue. + * @param overrides Optional overrides for the default QueueOptions for this specific queue. Only used if the queue + * did not already exist. + * @returns The TaskQueue queue associated with the given identifier. + */ + public getOrCreate(id: T, overrides: Partial = {}): TaskQueue { + return this.get(id) ?? this.create(id, overrides) + } + + /** + * @returns A copy of the default queue options. Used primarily for testing and inspection. + */ + public options(): Readonly { + return { + ...this.defaultOptions, + } + } +} diff --git a/packages/task-herder/src/TaskQueue.ts b/packages/task-herder/src/TaskQueue.ts index 353e615e44..4d583cc1dc 100644 --- a/packages/task-herder/src/TaskQueue.ts +++ b/packages/task-herder/src/TaskQueue.ts @@ -50,6 +50,21 @@ export class TaskQueue { return this.pendingTaskRecords.length } + /** + * @returns The current configuration options of the queue. Used primarily for testing and inspection. + * Note that the `startingTokens` value returned here reflects the current token count, which is only guaranteed to + * match the originally configured starting tokens value if no time has passed and no tasks have been processed. + */ + get options(): Readonly { + return { + burstLimit: this.burstLimit, + sustainRate: this.sustainRate, + startingTokens: this.tokenCount, + queueCostLimit: this.queueCostLimit, + concurrency: this.concurrencyLimiter.concurrency, + } + } + /** * Adds a task to the queue. The task will first wait until enough tokens are available, then will wait its turn in * the concurrency queue. diff --git a/packages/task-herder/src/index.ts b/packages/task-herder/src/index.ts index be8af10984..bc606dca61 100644 --- a/packages/task-herder/src/index.ts +++ b/packages/task-herder/src/index.ts @@ -1,2 +1,3 @@ export { CancelReason } from './CancelReason' export { type QueueOptions, TaskQueue } from './TaskQueue' +export { QueueManager } from './QueueManager' diff --git a/packages/task-herder/test/manager.test.ts b/packages/task-herder/test/manager.test.ts new file mode 100644 index 0000000000..d0308dff08 --- /dev/null +++ b/packages/task-herder/test/manager.test.ts @@ -0,0 +1,239 @@ +import { describe, expect, it } from 'vitest' +import { QueueManager } from '../src/QueueManager' +import { TaskQueue } from '../src/TaskQueue' + +// This suite verifies QueueManager's API without executing queue tasks. + +describe('QueueManager', () => { + describe('constructor', () => { + it('should create a manager with default options', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 1000, + startingTokens: 3, + }) + expect(manager).toBeInstanceOf(QueueManager) + + const options = manager.options() + expect(options).toEqual({ + burstLimit: 5, + sustainRate: 1000, + startingTokens: 3, + }) + }) + + it('should accept an iterable of existing queues', () => { + const existingQueue = new TaskQueue({ + burstLimit: 10, + sustainRate: 100, + }) + const manager = new QueueManager({ burstLimit: 5, sustainRate: 50 }, [['existing', existingQueue]]) + const retrieved = manager.get('existing') + expect(retrieved).toBe(existingQueue) + expect(retrieved?.options).toEqual({ + burstLimit: 10, + concurrency: 1, + queueCostLimit: Infinity, + startingTokens: 10, + sustainRate: 100, + }) + }) + }) + + describe('create()', () => { + it('should create a queue with default options when no overrides given', () => { + const manager = new QueueManager({ + burstLimit: 2, + sustainRate: 1000, + startingTokens: 1, + concurrency: 1, + }) + + const queue = manager.create('test') + expect(queue).toBeInstanceOf(TaskQueue) + expect(queue.options).toEqual({ + burstLimit: 2, + sustainRate: 1000, + startingTokens: 1, + queueCostLimit: Infinity, + concurrency: 1, + }) + }) + + it('should override default options when overrides are provided', () => { + const manager = new QueueManager({ + burstLimit: 2, + sustainRate: 1000, + startingTokens: 1, + }) + + // Override startingTokens to 5 and allow two concurrent tasks + const queue = manager.create('test', { startingTokens: 5, concurrency: 2 }) + expect(queue.options).toEqual({ + burstLimit: 2, + sustainRate: 1000, + startingTokens: 5, + queueCostLimit: Infinity, + concurrency: 2, + }) + }) + + it('should replace an existing queue with the same id', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 1000, + }) + + const queue1 = manager.create('test') + // Create a new queue with the same id - should replace + const queue2 = manager.create('test') + expect(queue2).not.toBe(queue1) + expect(manager.get('test')).toBe(queue2) + }) + }) + + describe('get()', () => { + it('should return a queue that was created', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 100, + }) + + const created = manager.create('test') + const retrieved = manager.get('test') + + expect(retrieved).toBe(created) + }) + + it('should return undefined for a non-existent queue', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 100, + }) + + const result = manager.get('nonExistent') + expect(result).toBeUndefined() + }) + + it('should work with non-string keys', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 100, + }) + + const queue = manager.create(42) + const retrieved = manager.get(42) + expect(retrieved).toBe(queue) + }) + }) + + describe('getOrCreate()', () => { + it('should return existing queue if one exists', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 1000, + startingTokens: 3, + }) + + const queue1 = manager.create('test') + // getOrCreate should return the same queue + const queue2 = manager.getOrCreate('test', { startingTokens: 10 }) + expect(queue2).toBe(queue1) + }) + + it('should create new queue with default options if it does not exist', () => { + const manager = new QueueManager({ + burstLimit: 3, + sustainRate: 1000, + startingTokens: 2, + concurrency: 2, + }) + + const queue = manager.getOrCreate('test') + expect(queue.options).toEqual({ + burstLimit: 3, + sustainRate: 1000, + startingTokens: 2, + queueCostLimit: Infinity, + concurrency: 2, + }) + }) + + it('should create new queue with overrides if it does not exist', () => { + const manager = new QueueManager({ + burstLimit: 3, + sustainRate: 1000, + startingTokens: 0, + }) + + // Override to give 10 starting tokens and allow two concurrent tasks + const queue = manager.getOrCreate('test', { startingTokens: 10, concurrency: 2 }) + expect(queue.options).toEqual({ + burstLimit: 3, + sustainRate: 1000, + startingTokens: 10, + queueCostLimit: Infinity, + concurrency: 2, + }) + }) + + it('should ignore overrides if queue already exists', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 1000, + startingTokens: 1, + concurrency: 1, + }) + + const queue1 = manager.create('test') + // getOrCreate with different overrides should return existing queue + const queue2 = manager.getOrCreate('test', { startingTokens: 10 }) + expect(queue2).toBe(queue1) + // Options should still match original queue (startingTokens=1) + expect(queue1.options.startingTokens).toBe(1) + }) + }) + + describe('multiple queues', () => { + it('should manage multiple independent queues', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 1000, + startingTokens: 2, + concurrency: 2, + }) + + const queue1 = manager.create('queue1') + const queue2 = manager.create('queue2') + expect(queue1.options).toEqual({ + burstLimit: 5, + sustainRate: 1000, + startingTokens: 2, + queueCostLimit: Infinity, + concurrency: 2, + }) + expect(queue2.options).toEqual({ + burstLimit: 5, + sustainRate: 1000, + startingTokens: 2, + queueCostLimit: Infinity, + concurrency: 2, + }) + expect(queue1).not.toBe(queue2) + }) + + it('should allow different overrides per queue', () => { + const manager = new QueueManager({ + burstLimit: 5, + sustainRate: 1000, + startingTokens: 1, + concurrency: 1, + }) + + const queue1 = manager.create('queue1', { startingTokens: 0 }) + const queue2 = manager.create('queue2', { startingTokens: 5 }) + expect(queue1.options.startingTokens).toBe(0) + expect(queue2.options.startingTokens).toBe(5) + }) + }) +}) From 40473adcb7dec566b6478a2bebc1a7ea6f9b9570 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <7019101+cwillisf@users.noreply.github.com> Date: Wed, 3 Dec 2025 10:01:45 -0800 Subject: [PATCH 2/5] build: pick up vendor chunks from scratch-storage --- packages/scratch-gui/webpack.config.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/scratch-gui/webpack.config.js b/packages/scratch-gui/webpack.config.js index 34e2eda2e3..f1c4d9c129 100644 --- a/packages/scratch-gui/webpack.config.js +++ b/packages/scratch-gui/webpack.config.js @@ -91,6 +91,11 @@ const baseConfig = new ScratchWebpackConfigBuilder( from: 'chunks/fetch-worker.*.{js,js.map}', noErrorOnMissing: true }, + { + context: '../../node_modules/scratch-storage/dist/web', + from: 'chunks/vendors-*.{js,js.map}', + noErrorOnMissing: true + }, { from: '../../node_modules/@mediapipe/face_detection', to: 'chunks/mediapipe/face_detection' From 0f92cdeb81480418ec1b3e0a7324b534a6683753 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <7019101+cwillisf@users.noreply.github.com> Date: Wed, 3 Dec 2025 10:04:10 -0800 Subject: [PATCH 3/5] fix: fix and simplify concurrency logic --- package-lock.json | 30 ------------- packages/task-herder/package.json | 3 -- packages/task-herder/src/TaskQueue.ts | 64 +++++++++++++++------------ 3 files changed, 36 insertions(+), 61 deletions(-) diff --git a/package-lock.json b/package-lock.json index 8c2b76c3f1..6b3c6fda04 100644 --- a/package-lock.json +++ b/package-lock.json @@ -42423,9 +42423,6 @@ "name": "@scratch/task-herder", "version": "0.0.0", "license": "AGPL-3.0-only", - "dependencies": { - "p-limit": "7.2.0" - }, "devDependencies": { "@vitest/coverage-v8": "4.0.14", "eslint": "9.39.1", @@ -42559,33 +42556,6 @@ "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", "dev": true, "license": "MIT" - }, - "packages/task-herder/node_modules/p-limit": { - "version": "7.2.0", - "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-7.2.0.tgz", - "integrity": "sha512-ATHLtwoTNDloHRFFxFJdHnG6n2WUeFjaR8XQMFdKIv0xkXjrER8/iG9iu265jOM95zXHAfv9oTkqhrfbIzosrQ==", - "license": "MIT", - "dependencies": { - "yocto-queue": "^1.2.1" - }, - "engines": { - "node": ">=20" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, - "packages/task-herder/node_modules/yocto-queue": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.2.1.tgz", - "integrity": "sha512-AyeEbWOu/TAXdxlV9wmGcR0+yh2j3vYPGOECcIj2S7MkrLyC7ne+oye2BKTItt0ii2PHk4cDy+95+LshzbXnGg==", - "license": "MIT", - "engines": { - "node": ">=12.20" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } } } } diff --git a/packages/task-herder/package.json b/packages/task-herder/package.json index 5bd8bd0107..daac34aeef 100644 --- a/packages/task-herder/package.json +++ b/packages/task-herder/package.json @@ -48,8 +48,5 @@ }, "overrides": { "vite": "npm:rolldown-vite@7.2.9" - }, - "dependencies": { - "p-limit": "7.2.0" } } diff --git a/packages/task-herder/src/TaskQueue.ts b/packages/task-herder/src/TaskQueue.ts index 4d583cc1dc..9e455d0b44 100644 --- a/packages/task-herder/src/TaskQueue.ts +++ b/packages/task-herder/src/TaskQueue.ts @@ -1,5 +1,5 @@ -import pLimit, { type LimitFunction } from 'p-limit' import { CancelReason } from './CancelReason' +import { PromiseWithResolvers } from './PromiseWithResolvers' import { TaskRecord, type TaskOptions } from './TaskRecord' export interface QueueOptions { @@ -29,20 +29,22 @@ export class TaskQueue { private readonly burstLimit: number private readonly sustainRate: number private readonly queueCostLimit: number - private readonly concurrencyLimiter: LimitFunction - private readonly boundRunTasks = this.runTasks.bind(this) + private readonly concurrencyLimit: number private tokenCount: number + private runningTasks = 0 private pendingTaskRecords: TaskRecord[] = [] - private timeout: number | null = null private lastRefillTime: number = Date.now() + private onTaskAdded = PromiseWithResolvers().resolve // start with a no-op of correct type + private onTaskFinished = PromiseWithResolvers().resolve // start with a no-op of correct type constructor(options: QueueOptions) { this.burstLimit = options.burstLimit this.sustainRate = options.sustainRate this.tokenCount = options.startingTokens ?? options.burstLimit this.queueCostLimit = options.queueCostLimit ?? Infinity - this.concurrencyLimiter = pLimit(options.concurrency ?? 1) + this.concurrencyLimit = options.concurrency ?? 1 + void this.runTasks() } /** @returns The number of tasks currently in the queue */ @@ -61,7 +63,7 @@ export class TaskQueue { sustainRate: this.sustainRate, startingTokens: this.tokenCount, queueCostLimit: this.queueCostLimit, - concurrency: this.concurrencyLimiter.concurrency, + concurrency: this.concurrencyLimit, } } @@ -92,10 +94,7 @@ export class TaskQueue { this.cancel(taskRecord.promise, new Error(CancelReason.Aborted)) }) - // If the queue was empty, we need to prime the pump - if (this.pendingTaskRecords.length === 1) { - void this.runTasks() - } + this.onTaskAdded() return taskRecord.promise } @@ -111,9 +110,6 @@ export class TaskQueue { if (taskIndex !== -1) { const [taskRecord] = this.pendingTaskRecords.splice(taskIndex, 1) taskRecord.cancel(reason ?? new Error(CancelReason.Cancel)) - if (taskIndex === 0 && this.pendingTaskRecords.length > 0) { - void this.runTasks() - } return true } return false @@ -125,10 +121,6 @@ export class TaskQueue { * @returns The number of tasks that were cancelled. */ cancelAll(reason?: Error): number { - if (this.timeout !== null) { - clearTimeout(this.timeout) - this.timeout = null - } const oldTasks = this.pendingTaskRecords this.pendingTaskRecords = [] reason = reason ?? new Error(CancelReason.Cancel) @@ -179,17 +171,15 @@ export class TaskQueue { /** * Run tasks from the queue as tokens become available. */ - private runTasks(): void { - if (this.timeout !== null) { - clearTimeout(this.timeout) - this.timeout = null - } - + private async runTasks(): Promise { for (;;) { const nextRecord = this.pendingTaskRecords.shift() if (!nextRecord) { // No more tasks to run - return + const { promise, resolve } = PromiseWithResolvers() + this.onTaskAdded = resolve + await promise // wait until a task is added + continue // then try again } if (nextRecord.cost > this.burstLimit) { @@ -200,16 +190,34 @@ export class TaskQueue { // Refill before each task in case the time it took for the last task to run was enough to afford the next. if (this.refillAndSpend(nextRecord.cost)) { - // Run the task within the concurrency limiter - void this.concurrencyLimiter(nextRecord.run) + if (this.runningTasks >= this.concurrencyLimit) { + const { promise, resolve } = PromiseWithResolvers() + this.onTaskFinished = resolve + await promise // wait until a task finishes + // then we know there's room for at least one more task + } + void this.runTask(nextRecord) } else { // We can't currently afford this task. Put it back and wait until we can, then try again. this.pendingTaskRecords.unshift(nextRecord) const tokensNeeded = Math.max(nextRecord.cost - this.tokenCount, 0) const estimatedWait = Math.ceil((1000 * tokensNeeded) / this.sustainRate) - this.timeout = setTimeout(this.boundRunTasks, estimatedWait) - return + await new Promise(resolve => setTimeout(resolve, estimatedWait)) } } } + + /** + * Run a task record right now, managing the running tasks count. + * @param taskRecord The task that should run. + */ + private async runTask(taskRecord: TaskRecord): Promise { + this.runningTasks++ + try { + await taskRecord.run() + } finally { + this.runningTasks-- + this.onTaskFinished() + } + } } From 1be24de9effde01970778d139a9443526cd0febe Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <7019101+cwillisf@users.noreply.github.com> Date: Wed, 3 Dec 2025 14:12:29 -0800 Subject: [PATCH 4/5] fix: mark loadBitmap canvas as willReadFrequently This fixes a Chrome crash when loading assets with moderately high concurrency. It's also allegedly a good idea for performance. --- packages/scratch-render/src/BitmapSkin.js | 7 ++++--- packages/scratch-vm/src/import/load-costume.js | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/scratch-render/src/BitmapSkin.js b/packages/scratch-render/src/BitmapSkin.js index 7aa7096d70..fda96a8325 100644 --- a/packages/scratch-render/src/BitmapSkin.js +++ b/packages/scratch-render/src/BitmapSkin.js @@ -69,9 +69,10 @@ class BitmapSkin extends Skin { // memory. let textureData = bitmapData; if (bitmapData instanceof HTMLCanvasElement) { - // Given a HTMLCanvasElement get the image data to pass to webgl and - // Silhouette. - const context = bitmapData.getContext('2d'); + // Given a HTMLCanvasElement, get the image data to pass to WebGL and the Silhouette. The 2D context was + // likely already created, so `willReadFrequently` is likely ignored here, but it's good documentation and + // may help if the code path changes someday. + const context = bitmapData.getContext('2d', {willReadFrequently: true}); textureData = context.getImageData(0, 0, bitmapData.width, bitmapData.height); } diff --git a/packages/scratch-vm/src/import/load-costume.js b/packages/scratch-vm/src/import/load-costume.js index e430ad3157..25dba39465 100644 --- a/packages/scratch-vm/src/import/load-costume.js +++ b/packages/scratch-vm/src/import/load-costume.js @@ -145,7 +145,7 @@ const fetchBitmapCanvas_ = function (costume, runtime, rotationCenter) { mergeCanvas.width = baseImageElement.width; mergeCanvas.height = baseImageElement.height; - const ctx = mergeCanvas.getContext('2d'); + const ctx = mergeCanvas.getContext('2d', {willReadFrequently: true}); ctx.drawImage(baseImageElement, 0, 0); if (textImageElement) { ctx.drawImage(textImageElement, 0, 0); From 6082f36a9b044bea09e57ba9f7f782331ad45993 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <7019101+cwillisf@users.noreply.github.com> Date: Thu, 4 Dec 2025 10:32:12 -0800 Subject: [PATCH 5/5] docs: fix comment grammar --- packages/task-herder/src/QueueManager.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/task-herder/src/QueueManager.ts b/packages/task-herder/src/QueueManager.ts index 7bd7ae1f4d..2ae54b57ba 100644 --- a/packages/task-herder/src/QueueManager.ts +++ b/packages/task-herder/src/QueueManager.ts @@ -35,7 +35,7 @@ export class QueueManager { /** * Get the task queue for the given identifier. * @param id The identifier for the queue. - * @returns The TaskQueue queue associated with the given identifier, or undefined if none exists. + * @returns The TaskQueue associated with the given identifier, or undefined if none exists. */ public get(id: T): TaskQueue | undefined { return this.queues.get(id) @@ -46,7 +46,7 @@ export class QueueManager { * @param id The identifier for the queue. * @param overrides Optional overrides for the default QueueOptions for this specific queue. Only used if the queue * did not already exist. - * @returns The TaskQueue queue associated with the given identifier. + * @returns The TaskQueue associated with the given identifier. */ public getOrCreate(id: T, overrides: Partial = {}): TaskQueue { return this.get(id) ?? this.create(id, overrides)