From 6cfac9a2a03512b0a226a9f7211b8fc177978d81 Mon Sep 17 00:00:00 2001 From: thinknathan Date: Sun, 4 Feb 2024 16:01:35 -0800 Subject: [PATCH 1/5] Refactor threads --- slice.cjs | 13 ++++--- src/slice.ts | 14 ++++--- src/utils/processImage.ts | 19 ++++++---- src/utils/processPath.ts | 11 ++++-- src/utils/workerPool.ts | 78 ++++++++++++++++++++++++++++++--------- utils/processImage.js | 12 +++--- utils/processPath.js | 5 ++- utils/workerPool.js | 65 ++++++++++++++++++++++++-------- 8 files changed, 155 insertions(+), 62 deletions(-) diff --git a/slice.cjs b/slice.cjs index fa8cb6e..02587bb 100644 --- a/slice.cjs +++ b/slice.cjs @@ -5,9 +5,10 @@ const yargs = require('yargs'); const os = require('os'); const processImage_1 = require('./utils/processImage'); const processPath_1 = require('./utils/processPath'); -function main() { +async function main() { + console.time('Done in'); // Parse command line arguments - const options = yargs + const options = await yargs(process.argv.slice(2)) .option('f', { alias: 'filename', describe: 'Input image filename', @@ -94,7 +95,8 @@ function main() { 'Uses bicubic interpolation instead of nearest neighbour if rescaling', type: 'boolean', default: false, - }).argv; + }) + .parse(); if (options.filename) { // Process a single image (0, processImage_1.sliceImage)(options); @@ -107,12 +109,13 @@ function main() { console.error(err); } numCores = Math.max(numCores - 1, 1); // Min 1 - numCores = Math.min(numCores, 16); // Max 16 - (0, processPath_1.processPath)(options.folderPath, options, numCores); + numCores = Math.min(numCores, 32); // Max 32 + await (0, processPath_1.processPath)(options.folderPath, options, numCores); } else { console.error( 'Error: Requires either `filename` or `folderPath`. Run `slice --help` for help.', ); } + console.timeEnd('Done in'); } main(); diff --git a/src/slice.ts b/src/slice.ts index d368427..34e5bfc 100644 --- a/src/slice.ts +++ b/src/slice.ts @@ -6,9 +6,11 @@ import * as os from 'os'; import { sliceImage } from './utils/processImage'; import { processPath } from './utils/processPath'; -function main() { +async function main() { + console.time('Done in'); + // Parse command line arguments - const options = yargs + const options = (await yargs(process.argv.slice(2)) .option('f', { alias: 'filename', describe: 'Input image filename', @@ -95,7 +97,8 @@ function main() { 'Uses bicubic interpolation instead of nearest neighbour if rescaling', type: 'boolean', default: false, - }).argv as unknown as Options; + }) + .parse()) as unknown as Options; if (options.filename) { // Process a single image @@ -109,13 +112,14 @@ function main() { console.error(err); } numCores = Math.max(numCores - 1, 1); // Min 1 - numCores = Math.min(numCores, 16); // Max 16 - processPath(options.folderPath, options, numCores); + numCores = Math.min(numCores, 32); // Max 32 + await processPath(options.folderPath, options, numCores); } else { console.error( 'Error: Requires either `filename` or `folderPath`. Run `slice --help` for help.', ); } + console.timeEnd('Done in'); } main(); diff --git a/src/utils/processImage.ts b/src/utils/processImage.ts index 724804c..8b5d3ba 100644 --- a/src/utils/processImage.ts +++ b/src/utils/processImage.ts @@ -1,7 +1,7 @@ import * as Jimp from 'jimp'; import * as fs from 'fs'; import * as path from 'path'; -import { workerData, isMainThread } from 'worker_threads'; +import { parentPort, isMainThread } from 'worker_threads'; function errorCallback(err: unknown) { if (err) { @@ -13,7 +13,6 @@ function errorCallback(err: unknown) { * Function to slice an image into smaller segments */ export function sliceImage(options: Options, skipExtCheck?: boolean): void { - console.time('Done in'); const { filename } = options; Jimp.read(filename!) .then((image) => { @@ -134,13 +133,19 @@ function continueSlicing(image: Jimp, options: Options): void { console.log(`Slice saved: ${outputFilename}`); } } - console.timeEnd('Done in'); } // If used as a worker thread, get file name from message if (!isMainThread) { - const { filePath, options } = workerData; - options.filename = filePath; - - sliceImage(options, true); + const workIsDone = () => parentPort?.postMessage('complete'); + + parentPort?.on( + 'message', + async (message: { filePath: string; options: Options }) => { + const { filePath, options } = message; + options.filename = filePath; + sliceImage(options, true); + workIsDone(); + }, + ); } diff --git a/src/utils/processPath.ts b/src/utils/processPath.ts index 7772b18..37a2c90 100644 --- a/src/utils/processPath.ts +++ b/src/utils/processPath.ts @@ -9,7 +9,7 @@ export async function processPath( directoryPath: string, options: Options, maxWorkers: number, -): Promise { +): Promise { const workerPool = new WorkerPool(maxWorkers); try { @@ -24,10 +24,13 @@ export async function processPath( workerPool.addTask(filePath, options); } } - - // Wait for all tasks to complete before exiting - workerPool.waitForCompletion(); } catch (err) { console.error(`Error reading directory: ${directoryPath}`, err); } + + await workerPool.allComplete(); + + workerPool.exitAll(); + + return true; } diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts index f7b844c..3c6c613 100644 --- a/src/utils/workerPool.ts +++ b/src/utils/workerPool.ts @@ -1,13 +1,48 @@ import { Worker } from 'worker_threads'; import * as path from 'path'; +type TWorker = Worker & { isIdle: boolean }; + /** * Manages a pool of worker threads for parallel processing of image files. */ export class WorkerPool { - private workers: Worker[] = []; + private workers: TWorker[] = []; private taskQueue: { filePath: string; options: Options }[] = []; private maxWorkers: number; + private completePromise?: Promise; + private completeResolve?: () => void; + private isComplete(): boolean { + return ( + this.taskQueue.length === 0 && + this.workers.every((worker) => worker.isIdle) + ); + } + + /** + * Terminate all workers in the pool. + */ + public exitAll(): void { + this.workers.forEach((worker) => worker.terminate()); + this.workers = []; + } + + /** + * Returns a promise that resolves when all work is done. + */ + public async allComplete(): Promise { + if (this.isComplete()) { + return Promise.resolve(); + } + + if (!this.completePromise) { + this.completePromise = new Promise((resolve) => { + this.completeResolve = resolve; + }); + } + + return this.completePromise; + } /** * Creates a new WorkerPool instance. @@ -25,18 +60,22 @@ export class WorkerPool { * @param options - Image processing options for the file. */ private createWorker(filePath: string, options: Options): void { - const worker = new Worker(path.join(__dirname, 'processImage.js'), { - workerData: { filePath, options }, - }); + const worker = new Worker( + path.join(__dirname, 'processImage.js'), + ) as TWorker; + + worker.postMessage({ filePath, options }); + worker.isIdle = false; // Listen for messages and errors from the worker - worker.on('message', (message) => { - console.log(message); + worker.on('message', () => { + worker.isIdle = true; this.processNextTask(); }); worker.on('error', (err) => { console.error(`Error in worker for file ${filePath}:`, err); + worker.isIdle = true; this.processNextTask(); }); @@ -49,7 +88,21 @@ export class WorkerPool { private processNextTask(): void { const nextTask = this.taskQueue.shift(); if (nextTask) { - this.createWorker(nextTask.filePath, nextTask.options); + if (this.workers.length < this.maxWorkers) { + this.createWorker(nextTask.filePath, nextTask.options); + } else { + const worker = this.workers.find((w) => w.isIdle); + if (worker) { + worker.postMessage(nextTask); + } else { + // Something went wrong, there are no idle workers somehow + throw Error('Could not find an idle worker.'); + } + } + } else if (this.isComplete() && this.completeResolve) { + this.completeResolve(); + this.completePromise = undefined; + this.completeResolve = undefined; } } @@ -66,15 +119,4 @@ export class WorkerPool { this.taskQueue.push({ filePath, options }); } } - - /** - * Waits for all tasks to complete before exiting. - */ - public waitForCompletion(): void { - this.workers.forEach((worker) => { - worker.on('exit', () => { - this.processNextTask(); - }); - }); - } } diff --git a/utils/processImage.js b/utils/processImage.js index 0941c82..1bab905 100644 --- a/utils/processImage.js +++ b/utils/processImage.js @@ -14,7 +14,6 @@ function errorCallback(err) { * Function to slice an image into smaller segments */ function sliceImage(options, skipExtCheck) { - console.time('Done in'); const { filename } = options; Jimp.read(filename) .then((image) => { @@ -110,11 +109,14 @@ function continueSlicing(image, options) { console.log(`Slice saved: ${outputFilename}`); } } - console.timeEnd('Done in'); } // If used as a worker thread, get file name from message if (!worker_threads_1.isMainThread) { - const { filePath, options } = worker_threads_1.workerData; - options.filename = filePath; - sliceImage(options, true); + const workIsDone = () => worker_threads_1.parentPort?.postMessage('complete'); + worker_threads_1.parentPort?.on('message', async (message) => { + const { filePath, options } = message; + options.filename = filePath; + sliceImage(options, true); + workIsDone(); + }); } diff --git a/utils/processPath.js b/utils/processPath.js index e62b879..950e255 100644 --- a/utils/processPath.js +++ b/utils/processPath.js @@ -19,11 +19,12 @@ async function processPath(directoryPath, options, maxWorkers) { workerPool.addTask(filePath, options); } } - // Wait for all tasks to complete before exiting - workerPool.waitForCompletion(); } catch (err) { console.error(`Error reading directory: ${directoryPath}`, err); } + await workerPool.allComplete(); + workerPool.exitAll(); + return true; } exports.processPath = processPath; diff --git a/utils/workerPool.js b/utils/workerPool.js index 508f5f0..b6d628f 100644 --- a/utils/workerPool.js +++ b/utils/workerPool.js @@ -7,6 +7,31 @@ const path = require("path"); * Manages a pool of worker threads for parallel processing of image files. */ class WorkerPool { + isComplete() { + return (this.taskQueue.length === 0 && + this.workers.every((worker) => worker.isIdle)); + } + /** + * Terminate all workers in the pool. + */ + exitAll() { + this.workers.forEach((worker) => worker.terminate()); + this.workers = []; + } + /** + * Returns a promise that resolves when all work is done. + */ + async allComplete() { + if (this.isComplete()) { + return Promise.resolve(); + } + if (!this.completePromise) { + this.completePromise = new Promise((resolve) => { + this.completeResolve = resolve; + }); + } + return this.completePromise; + } /** * Creates a new WorkerPool instance. * @@ -24,16 +49,17 @@ class WorkerPool { * @param options - Image processing options for the file. */ createWorker(filePath, options) { - const worker = new worker_threads_1.Worker(path.join(__dirname, 'processImage.js'), { - workerData: { filePath, options }, - }); + const worker = new worker_threads_1.Worker(path.join(__dirname, 'processImage.js')); + worker.postMessage({ filePath, options }); + worker.isIdle = false; // Listen for messages and errors from the worker - worker.on('message', (message) => { - console.log(message); + worker.on('message', () => { + worker.isIdle = true; this.processNextTask(); }); worker.on('error', (err) => { console.error(`Error in worker for file ${filePath}:`, err); + worker.isIdle = true; this.processNextTask(); }); this.workers.push(worker); @@ -44,7 +70,24 @@ class WorkerPool { processNextTask() { const nextTask = this.taskQueue.shift(); if (nextTask) { - this.createWorker(nextTask.filePath, nextTask.options); + if (this.workers.length < this.maxWorkers) { + this.createWorker(nextTask.filePath, nextTask.options); + } + else { + const worker = this.workers.find((w) => w.isIdle); + if (worker) { + worker.postMessage(nextTask); + } + else { + // Something went wrong, there are no idle workers somehow + throw Error('Could not find an idle worker.'); + } + } + } + else if (this.isComplete() && this.completeResolve) { + this.completeResolve(); + this.completePromise = undefined; + this.completeResolve = undefined; } } /** @@ -61,15 +104,5 @@ class WorkerPool { this.taskQueue.push({ filePath, options }); } } - /** - * Waits for all tasks to complete before exiting. - */ - waitForCompletion() { - this.workers.forEach((worker) => { - worker.on('exit', () => { - this.processNextTask(); - }); - }); - } } exports.WorkerPool = WorkerPool; From ad08124fd43f9e8e47a190a0663c214f14d52382 Mon Sep 17 00:00:00 2001 From: thinknathan Date: Sun, 4 Feb 2024 16:48:26 -0800 Subject: [PATCH 2/5] Fix idle check --- src/utils/processImage.ts | 9 ++++++--- src/utils/workerPool.ts | 3 ++- utils/processImage.js | 6 ++++-- utils/workerPool.js | 3 ++- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/utils/processImage.ts b/src/utils/processImage.ts index 8b5d3ba..b3aca74 100644 --- a/src/utils/processImage.ts +++ b/src/utils/processImage.ts @@ -9,6 +9,8 @@ function errorCallback(err: unknown) { } } +const workIsDone = () => parentPort?.postMessage('complete'); + /** * Function to slice an image into smaller segments */ @@ -133,19 +135,20 @@ function continueSlicing(image: Jimp, options: Options): void { console.log(`Slice saved: ${outputFilename}`); } } + + if (!isMainThread) { + workIsDone(); + } } // If used as a worker thread, get file name from message if (!isMainThread) { - const workIsDone = () => parentPort?.postMessage('complete'); - parentPort?.on( 'message', async (message: { filePath: string; options: Options }) => { const { filePath, options } = message; options.filename = filePath; sliceImage(options, true); - workIsDone(); }, ); } diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts index 3c6c613..25a00bf 100644 --- a/src/utils/workerPool.ts +++ b/src/utils/workerPool.ts @@ -64,8 +64,8 @@ export class WorkerPool { path.join(__dirname, 'processImage.js'), ) as TWorker; - worker.postMessage({ filePath, options }); worker.isIdle = false; + worker.postMessage({ filePath, options }); // Listen for messages and errors from the worker worker.on('message', () => { @@ -93,6 +93,7 @@ export class WorkerPool { } else { const worker = this.workers.find((w) => w.isIdle); if (worker) { + worker.isIdle = false; worker.postMessage(nextTask); } else { // Something went wrong, there are no idle workers somehow diff --git a/utils/processImage.js b/utils/processImage.js index 1bab905..04357f0 100644 --- a/utils/processImage.js +++ b/utils/processImage.js @@ -10,6 +10,7 @@ function errorCallback(err) { console.error(err); } } +const workIsDone = () => worker_threads_1.parentPort?.postMessage('complete'); /** * Function to slice an image into smaller segments */ @@ -109,14 +110,15 @@ function continueSlicing(image, options) { console.log(`Slice saved: ${outputFilename}`); } } + if (!worker_threads_1.isMainThread) { + workIsDone(); + } } // If used as a worker thread, get file name from message if (!worker_threads_1.isMainThread) { - const workIsDone = () => worker_threads_1.parentPort?.postMessage('complete'); worker_threads_1.parentPort?.on('message', async (message) => { const { filePath, options } = message; options.filename = filePath; sliceImage(options, true); - workIsDone(); }); } diff --git a/utils/workerPool.js b/utils/workerPool.js index b6d628f..7c7616b 100644 --- a/utils/workerPool.js +++ b/utils/workerPool.js @@ -50,8 +50,8 @@ class WorkerPool { */ createWorker(filePath, options) { const worker = new worker_threads_1.Worker(path.join(__dirname, 'processImage.js')); - worker.postMessage({ filePath, options }); worker.isIdle = false; + worker.postMessage({ filePath, options }); // Listen for messages and errors from the worker worker.on('message', () => { worker.isIdle = true; @@ -76,6 +76,7 @@ class WorkerPool { else { const worker = this.workers.find((w) => w.isIdle); if (worker) { + worker.isIdle = false; worker.postMessage(nextTask); } else { From fcf722f8a48b935e98e82d9ca204ad4830318b0c Mon Sep 17 00:00:00 2001 From: thinknathan Date: Sun, 4 Feb 2024 17:11:36 -0800 Subject: [PATCH 3/5] Bump version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index e6d657c..697e437 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-image-slice", - "version": "2.2.1", + "version": "2.2.2", "description": "Slices an input image into segments according to specified width and height", "repository": { "type": "git", From dbab8a3cd4b6cb427026822236fdfbba185e1e30 Mon Sep 17 00:00:00 2001 From: thinknathan Date: Sun, 4 Feb 2024 17:38:25 -0800 Subject: [PATCH 4/5] Use write correctly --- src/utils/processImage.ts | 31 ++++++++++++++++++++++--------- utils/processImage.js | 28 +++++++++++++++++++++------- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/utils/processImage.ts b/src/utils/processImage.ts index b3aca74..fa3b66e 100644 --- a/src/utils/processImage.ts +++ b/src/utils/processImage.ts @@ -9,7 +9,10 @@ function errorCallback(err: unknown) { } } -const workIsDone = () => parentPort?.postMessage('complete'); +/** + * Called on a worker thread to signal current work is complete + */ +const workerIsDone = () => parentPort?.postMessage('complete'); /** * Function to slice an image into smaller segments @@ -77,6 +80,8 @@ function continueSlicing(image: Jimp, options: Options): void { // Calculate the number of slices in both dimensions const horizontalSlices = Math.ceil(imageWidth / width); const verticalSlices = Math.ceil(imageHeight / height); + const totalSlices = horizontalSlices * verticalSlices; + let savedSlices = 0; // Create a folder for output if it doesn't exist const outputFolder = 'output'; @@ -99,6 +104,14 @@ function continueSlicing(image: Jimp, options: Options): void { const baseFilename = path.basename(filename!, path.extname(filename!)); const outputFilename = `${outputFolder}/${baseFilename}_${x}_${y}.png`; + const finishedSavingFile = () => { + console.log(`Slice saved: ${outputFilename}`); + savedSlices++; + if (savedSlices === totalSlices && !isMainThread) { + workerIsDone(); + } + }; + if (canvasWidth || canvasHeight) { // Calculate canvas dimensions const finalCanvasWidth = canvasWidth || width; @@ -121,7 +134,10 @@ function continueSlicing(image: Jimp, options: Options): void { cubic ? Jimp.RESIZE_BICUBIC : Jimp.RESIZE_NEAREST_NEIGHBOR, ); } - canvas.write(outputFilename, errorCallback); + canvas + .writeAsync(outputFilename) + .then(finishedSavingFile) + .catch(errorCallback); } else { if (scale !== 1) { slice.scale( @@ -129,16 +145,13 @@ function continueSlicing(image: Jimp, options: Options): void { cubic ? Jimp.RESIZE_BICUBIC : Jimp.RESIZE_NEAREST_NEIGHBOR, ); } - slice.write(outputFilename, errorCallback); + slice + .writeAsync(outputFilename) + .then(finishedSavingFile) + .catch(errorCallback); } - - console.log(`Slice saved: ${outputFilename}`); } } - - if (!isMainThread) { - workIsDone(); - } } // If used as a worker thread, get file name from message diff --git a/utils/processImage.js b/utils/processImage.js index 04357f0..5f8c608 100644 --- a/utils/processImage.js +++ b/utils/processImage.js @@ -10,7 +10,10 @@ function errorCallback(err) { console.error(err); } } -const workIsDone = () => worker_threads_1.parentPort?.postMessage('complete'); +/** + * Called on a worker thread to signal current work is complete + */ +const workerIsDone = () => worker_threads_1.parentPort?.postMessage('complete'); /** * Function to slice an image into smaller segments */ @@ -70,6 +73,8 @@ function continueSlicing(image, options) { // Calculate the number of slices in both dimensions const horizontalSlices = Math.ceil(imageWidth / width); const verticalSlices = Math.ceil(imageHeight / height); + const totalSlices = horizontalSlices * verticalSlices; + let savedSlices = 0; // Create a folder for output if it doesn't exist const outputFolder = 'output'; if (!fs.existsSync(outputFolder)) { @@ -86,6 +91,13 @@ function continueSlicing(image, options) { // Incorporate the input filename into the output filename const baseFilename = path.basename(filename, path.extname(filename)); const outputFilename = `${outputFolder}/${baseFilename}_${x}_${y}.png`; + const finishedSavingFile = () => { + console.log(`Slice saved: ${outputFilename}`); + savedSlices++; + if (savedSlices === totalSlices && !worker_threads_1.isMainThread) { + workerIsDone(); + } + }; if (canvasWidth || canvasHeight) { // Calculate canvas dimensions const finalCanvasWidth = canvasWidth || width; @@ -99,20 +111,22 @@ function continueSlicing(image, options) { if (scale !== 1) { canvas.scale(scale, cubic ? Jimp.RESIZE_BICUBIC : Jimp.RESIZE_NEAREST_NEIGHBOR); } - canvas.write(outputFilename, errorCallback); + canvas + .writeAsync(outputFilename) + .then(finishedSavingFile) + .catch(errorCallback); } else { if (scale !== 1) { slice.scale(scale, cubic ? Jimp.RESIZE_BICUBIC : Jimp.RESIZE_NEAREST_NEIGHBOR); } - slice.write(outputFilename, errorCallback); + slice + .writeAsync(outputFilename) + .then(finishedSavingFile) + .catch(errorCallback); } - console.log(`Slice saved: ${outputFilename}`); } } - if (!worker_threads_1.isMainThread) { - workIsDone(); - } } // If used as a worker thread, get file name from message if (!worker_threads_1.isMainThread) { From 901f88ed00052eb6b1a66e9d80daf5cbe3b87dfe Mon Sep 17 00:00:00 2001 From: thinknathan Date: Sun, 4 Feb 2024 17:46:50 -0800 Subject: [PATCH 5/5] Remove time report for now --- slice.cjs | 4 ++-- src/slice.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/slice.cjs b/slice.cjs index 02587bb..d8632c7 100644 --- a/slice.cjs +++ b/slice.cjs @@ -6,7 +6,7 @@ const os = require('os'); const processImage_1 = require('./utils/processImage'); const processPath_1 = require('./utils/processPath'); async function main() { - console.time('Done in'); + // console.time('Done in'); // Parse command line arguments const options = await yargs(process.argv.slice(2)) .option('f', { @@ -116,6 +116,6 @@ async function main() { 'Error: Requires either `filename` or `folderPath`. Run `slice --help` for help.', ); } - console.timeEnd('Done in'); + // console.timeEnd('Done in'); } main(); diff --git a/src/slice.ts b/src/slice.ts index 34e5bfc..8f3ea56 100644 --- a/src/slice.ts +++ b/src/slice.ts @@ -7,7 +7,7 @@ import { sliceImage } from './utils/processImage'; import { processPath } from './utils/processPath'; async function main() { - console.time('Done in'); + // console.time('Done in'); // Parse command line arguments const options = (await yargs(process.argv.slice(2)) @@ -119,7 +119,7 @@ async function main() { 'Error: Requires either `filename` or `folderPath`. Run `slice --help` for help.', ); } - console.timeEnd('Done in'); + // console.timeEnd('Done in'); } main();