diff --git a/package.json b/package.json index e6d657c..697e437 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-image-slice", - "version": "2.2.1", + "version": "2.2.2", "description": "Slices an input image into segments according to specified width and height", "repository": { "type": "git", diff --git a/slice.cjs b/slice.cjs index fa8cb6e..d8632c7 100644 --- a/slice.cjs +++ b/slice.cjs @@ -5,9 +5,10 @@ const yargs = require('yargs'); const os = require('os'); const processImage_1 = require('./utils/processImage'); const processPath_1 = require('./utils/processPath'); -function main() { +async function main() { + // console.time('Done in'); // Parse command line arguments - const options = yargs + const options = await yargs(process.argv.slice(2)) .option('f', { alias: 'filename', describe: 'Input image filename', @@ -94,7 +95,8 @@ function main() { 'Uses bicubic interpolation instead of nearest neighbour if rescaling', type: 'boolean', default: false, - }).argv; + }) + .parse(); if (options.filename) { // Process a single image (0, processImage_1.sliceImage)(options); @@ -107,12 +109,13 @@ function main() { console.error(err); } numCores = Math.max(numCores - 1, 1); // Min 1 - numCores = Math.min(numCores, 16); // Max 16 - (0, processPath_1.processPath)(options.folderPath, options, numCores); + numCores = Math.min(numCores, 32); // Max 32 + await (0, processPath_1.processPath)(options.folderPath, options, numCores); } else { console.error( 'Error: Requires either `filename` or `folderPath`. Run `slice --help` for help.', ); } + // console.timeEnd('Done in'); } main(); diff --git a/src/slice.ts b/src/slice.ts index d368427..8f3ea56 100644 --- a/src/slice.ts +++ b/src/slice.ts @@ -6,9 +6,11 @@ import * as os from 'os'; import { sliceImage } from './utils/processImage'; import { processPath } from './utils/processPath'; -function main() { +async function main() { + // console.time('Done in'); + // Parse command line arguments - const options = yargs + const options = (await yargs(process.argv.slice(2)) .option('f', { alias: 'filename', describe: 'Input image filename', @@ -95,7 +97,8 @@ function main() { 'Uses bicubic interpolation instead of nearest neighbour if rescaling', type: 'boolean', default: false, - }).argv as unknown as Options; + }) + .parse()) as unknown as Options; if (options.filename) { // Process a single image @@ -109,13 +112,14 @@ function main() { console.error(err); } numCores = Math.max(numCores - 1, 1); // Min 1 - numCores = Math.min(numCores, 16); // Max 16 - processPath(options.folderPath, options, numCores); + numCores = Math.min(numCores, 32); // Max 32 + await processPath(options.folderPath, options, numCores); } else { console.error( 'Error: Requires either `filename` or `folderPath`. Run `slice --help` for help.', ); } + // console.timeEnd('Done in'); } main(); diff --git a/src/utils/processImage.ts b/src/utils/processImage.ts index 724804c..fa3b66e 100644 --- a/src/utils/processImage.ts +++ b/src/utils/processImage.ts @@ -1,7 +1,7 @@ import * as Jimp from 'jimp'; import * as fs from 'fs'; import * as path from 'path'; -import { workerData, isMainThread } from 'worker_threads'; +import { parentPort, isMainThread } from 'worker_threads'; function errorCallback(err: unknown) { if (err) { @@ -9,11 +9,15 @@ function errorCallback(err: unknown) { } } +/** + * Called on a worker thread to signal current work is complete + */ +const workerIsDone = () => parentPort?.postMessage('complete'); + /** * Function to slice an image into smaller segments */ export function sliceImage(options: Options, skipExtCheck?: boolean): void { - console.time('Done in'); const { filename } = options; Jimp.read(filename!) .then((image) => { @@ -76,6 +80,8 @@ function continueSlicing(image: Jimp, options: Options): void { // Calculate the number of slices in both dimensions const horizontalSlices = Math.ceil(imageWidth / width); const verticalSlices = Math.ceil(imageHeight / height); + const totalSlices = horizontalSlices * verticalSlices; + let savedSlices = 0; // Create a folder for output if it doesn't exist const outputFolder = 'output'; @@ -98,6 +104,14 @@ function continueSlicing(image: Jimp, options: Options): void { const baseFilename = path.basename(filename!, path.extname(filename!)); const outputFilename = `${outputFolder}/${baseFilename}_${x}_${y}.png`; + const finishedSavingFile = () => { + console.log(`Slice saved: ${outputFilename}`); + savedSlices++; + if (savedSlices === totalSlices && !isMainThread) { + workerIsDone(); + } + }; + if (canvasWidth || canvasHeight) { // Calculate canvas dimensions const finalCanvasWidth = canvasWidth || width; @@ -120,7 +134,10 @@ function continueSlicing(image: Jimp, options: Options): void { cubic ? Jimp.RESIZE_BICUBIC : Jimp.RESIZE_NEAREST_NEIGHBOR, ); } - canvas.write(outputFilename, errorCallback); + canvas + .writeAsync(outputFilename) + .then(finishedSavingFile) + .catch(errorCallback); } else { if (scale !== 1) { slice.scale( @@ -128,19 +145,23 @@ function continueSlicing(image: Jimp, options: Options): void { cubic ? Jimp.RESIZE_BICUBIC : Jimp.RESIZE_NEAREST_NEIGHBOR, ); } - slice.write(outputFilename, errorCallback); + slice + .writeAsync(outputFilename) + .then(finishedSavingFile) + .catch(errorCallback); } - - console.log(`Slice saved: ${outputFilename}`); } } - console.timeEnd('Done in'); } // If used as a worker thread, get file name from message if (!isMainThread) { - const { filePath, options } = workerData; - options.filename = filePath; - - sliceImage(options, true); + parentPort?.on( + 'message', + async (message: { filePath: string; options: Options }) => { + const { filePath, options } = message; + options.filename = filePath; + sliceImage(options, true); + }, + ); } diff --git a/src/utils/processPath.ts b/src/utils/processPath.ts index 7772b18..37a2c90 100644 --- a/src/utils/processPath.ts +++ b/src/utils/processPath.ts @@ -9,7 +9,7 @@ export async function processPath( directoryPath: string, options: Options, maxWorkers: number, -): Promise { +): Promise { const workerPool = new WorkerPool(maxWorkers); try { @@ -24,10 +24,13 @@ export async function processPath( workerPool.addTask(filePath, options); } } - - // Wait for all tasks to complete before exiting - workerPool.waitForCompletion(); } catch (err) { console.error(`Error reading directory: ${directoryPath}`, err); } + + await workerPool.allComplete(); + + workerPool.exitAll(); + + return true; } diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts index f7b844c..25a00bf 100644 --- a/src/utils/workerPool.ts +++ b/src/utils/workerPool.ts @@ -1,13 +1,48 @@ import { Worker } from 'worker_threads'; import * as path from 'path'; +type TWorker = Worker & { isIdle: boolean }; + /** * Manages a pool of worker threads for parallel processing of image files. */ export class WorkerPool { - private workers: Worker[] = []; + private workers: TWorker[] = []; private taskQueue: { filePath: string; options: Options }[] = []; private maxWorkers: number; + private completePromise?: Promise; + private completeResolve?: () => void; + private isComplete(): boolean { + return ( + this.taskQueue.length === 0 && + this.workers.every((worker) => worker.isIdle) + ); + } + + /** + * Terminate all workers in the pool. + */ + public exitAll(): void { + this.workers.forEach((worker) => worker.terminate()); + this.workers = []; + } + + /** + * Returns a promise that resolves when all work is done. + */ + public async allComplete(): Promise { + if (this.isComplete()) { + return Promise.resolve(); + } + + if (!this.completePromise) { + this.completePromise = new Promise((resolve) => { + this.completeResolve = resolve; + }); + } + + return this.completePromise; + } /** * Creates a new WorkerPool instance. @@ -25,18 +60,22 @@ export class WorkerPool { * @param options - Image processing options for the file. */ private createWorker(filePath: string, options: Options): void { - const worker = new Worker(path.join(__dirname, 'processImage.js'), { - workerData: { filePath, options }, - }); + const worker = new Worker( + path.join(__dirname, 'processImage.js'), + ) as TWorker; + + worker.isIdle = false; + worker.postMessage({ filePath, options }); // Listen for messages and errors from the worker - worker.on('message', (message) => { - console.log(message); + worker.on('message', () => { + worker.isIdle = true; this.processNextTask(); }); worker.on('error', (err) => { console.error(`Error in worker for file ${filePath}:`, err); + worker.isIdle = true; this.processNextTask(); }); @@ -49,7 +88,22 @@ export class WorkerPool { private processNextTask(): void { const nextTask = this.taskQueue.shift(); if (nextTask) { - this.createWorker(nextTask.filePath, nextTask.options); + if (this.workers.length < this.maxWorkers) { + this.createWorker(nextTask.filePath, nextTask.options); + } else { + const worker = this.workers.find((w) => w.isIdle); + if (worker) { + worker.isIdle = false; + worker.postMessage(nextTask); + } else { + // Something went wrong, there are no idle workers somehow + throw Error('Could not find an idle worker.'); + } + } + } else if (this.isComplete() && this.completeResolve) { + this.completeResolve(); + this.completePromise = undefined; + this.completeResolve = undefined; } } @@ -66,15 +120,4 @@ export class WorkerPool { this.taskQueue.push({ filePath, options }); } } - - /** - * Waits for all tasks to complete before exiting. - */ - public waitForCompletion(): void { - this.workers.forEach((worker) => { - worker.on('exit', () => { - this.processNextTask(); - }); - }); - } } diff --git a/utils/processImage.js b/utils/processImage.js index 0941c82..5f8c608 100644 --- a/utils/processImage.js +++ b/utils/processImage.js @@ -10,11 +10,14 @@ function errorCallback(err) { console.error(err); } } +/** + * Called on a worker thread to signal current work is complete + */ +const workerIsDone = () => worker_threads_1.parentPort?.postMessage('complete'); /** * Function to slice an image into smaller segments */ function sliceImage(options, skipExtCheck) { - console.time('Done in'); const { filename } = options; Jimp.read(filename) .then((image) => { @@ -70,6 +73,8 @@ function continueSlicing(image, options) { // Calculate the number of slices in both dimensions const horizontalSlices = Math.ceil(imageWidth / width); const verticalSlices = Math.ceil(imageHeight / height); + const totalSlices = horizontalSlices * verticalSlices; + let savedSlices = 0; // Create a folder for output if it doesn't exist const outputFolder = 'output'; if (!fs.existsSync(outputFolder)) { @@ -86,6 +91,13 @@ function continueSlicing(image, options) { // Incorporate the input filename into the output filename const baseFilename = path.basename(filename, path.extname(filename)); const outputFilename = `${outputFolder}/${baseFilename}_${x}_${y}.png`; + const finishedSavingFile = () => { + console.log(`Slice saved: ${outputFilename}`); + savedSlices++; + if (savedSlices === totalSlices && !worker_threads_1.isMainThread) { + workerIsDone(); + } + }; if (canvasWidth || canvasHeight) { // Calculate canvas dimensions const finalCanvasWidth = canvasWidth || width; @@ -99,22 +111,28 @@ function continueSlicing(image, options) { if (scale !== 1) { canvas.scale(scale, cubic ? Jimp.RESIZE_BICUBIC : Jimp.RESIZE_NEAREST_NEIGHBOR); } - canvas.write(outputFilename, errorCallback); + canvas + .writeAsync(outputFilename) + .then(finishedSavingFile) + .catch(errorCallback); } else { if (scale !== 1) { slice.scale(scale, cubic ? Jimp.RESIZE_BICUBIC : Jimp.RESIZE_NEAREST_NEIGHBOR); } - slice.write(outputFilename, errorCallback); + slice + .writeAsync(outputFilename) + .then(finishedSavingFile) + .catch(errorCallback); } - console.log(`Slice saved: ${outputFilename}`); } } - console.timeEnd('Done in'); } // If used as a worker thread, get file name from message if (!worker_threads_1.isMainThread) { - const { filePath, options } = worker_threads_1.workerData; - options.filename = filePath; - sliceImage(options, true); + worker_threads_1.parentPort?.on('message', async (message) => { + const { filePath, options } = message; + options.filename = filePath; + sliceImage(options, true); + }); } diff --git a/utils/processPath.js b/utils/processPath.js index e62b879..950e255 100644 --- a/utils/processPath.js +++ b/utils/processPath.js @@ -19,11 +19,12 @@ async function processPath(directoryPath, options, maxWorkers) { workerPool.addTask(filePath, options); } } - // Wait for all tasks to complete before exiting - workerPool.waitForCompletion(); } catch (err) { console.error(`Error reading directory: ${directoryPath}`, err); } + await workerPool.allComplete(); + workerPool.exitAll(); + return true; } exports.processPath = processPath; diff --git a/utils/workerPool.js b/utils/workerPool.js index 508f5f0..7c7616b 100644 --- a/utils/workerPool.js +++ b/utils/workerPool.js @@ -7,6 +7,31 @@ const path = require("path"); * Manages a pool of worker threads for parallel processing of image files. */ class WorkerPool { + isComplete() { + return (this.taskQueue.length === 0 && + this.workers.every((worker) => worker.isIdle)); + } + /** + * Terminate all workers in the pool. + */ + exitAll() { + this.workers.forEach((worker) => worker.terminate()); + this.workers = []; + } + /** + * Returns a promise that resolves when all work is done. + */ + async allComplete() { + if (this.isComplete()) { + return Promise.resolve(); + } + if (!this.completePromise) { + this.completePromise = new Promise((resolve) => { + this.completeResolve = resolve; + }); + } + return this.completePromise; + } /** * Creates a new WorkerPool instance. * @@ -24,16 +49,17 @@ class WorkerPool { * @param options - Image processing options for the file. */ createWorker(filePath, options) { - const worker = new worker_threads_1.Worker(path.join(__dirname, 'processImage.js'), { - workerData: { filePath, options }, - }); + const worker = new worker_threads_1.Worker(path.join(__dirname, 'processImage.js')); + worker.isIdle = false; + worker.postMessage({ filePath, options }); // Listen for messages and errors from the worker - worker.on('message', (message) => { - console.log(message); + worker.on('message', () => { + worker.isIdle = true; this.processNextTask(); }); worker.on('error', (err) => { console.error(`Error in worker for file ${filePath}:`, err); + worker.isIdle = true; this.processNextTask(); }); this.workers.push(worker); @@ -44,7 +70,25 @@ class WorkerPool { processNextTask() { const nextTask = this.taskQueue.shift(); if (nextTask) { - this.createWorker(nextTask.filePath, nextTask.options); + if (this.workers.length < this.maxWorkers) { + this.createWorker(nextTask.filePath, nextTask.options); + } + else { + const worker = this.workers.find((w) => w.isIdle); + if (worker) { + worker.isIdle = false; + worker.postMessage(nextTask); + } + else { + // Something went wrong, there are no idle workers somehow + throw Error('Could not find an idle worker.'); + } + } + } + else if (this.isComplete() && this.completeResolve) { + this.completeResolve(); + this.completePromise = undefined; + this.completeResolve = undefined; } } /** @@ -61,15 +105,5 @@ class WorkerPool { this.taskQueue.push({ filePath, options }); } } - /** - * Waits for all tasks to complete before exiting. - */ - waitForCompletion() { - this.workers.forEach((worker) => { - worker.on('exit', () => { - this.processNextTask(); - }); - }); - } } exports.WorkerPool = WorkerPool;