Skip to content

Commit

Permalink
Merge 37b5bc0 into fbd9a9f
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias-Keller committed Feb 21, 2021
2 parents fbd9a9f + 37b5bc0 commit a3defee
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 1 deletion.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ $env:DEBUG='puppeteer-cluster:*';node examples/minimal
* [cluster.task(taskFunction)](#clustertasktaskfunction)
* [cluster.queue([data] [, taskFunction])](#clusterqueuedata--taskfunction)
* [cluster.execute([data] [, taskFunction])](#clusterexecutedata--taskfunction)
* [cluster.getMonitorObject()](#clustergetmonitorobject)
* [cluster.idle()](#clusteridle)
* [cluster.close()](#clusterclose)

Expand Down Expand Up @@ -172,6 +173,7 @@ Emitted when a task is queued via [Cluster.queue] or [Cluster.execute]. The firs
- `skipDuplicateUrls` <[boolean]> If set to `true`, will skip URLs which were already crawled by the cluster. Defaults to `false`. If you use this field, the queued `data` must be your URL or `data` must be an object containing a field called `url`.
- `timeout` <[number]> Specify a timeout for all tasks. Defaults to `30000` (30 seconds).
- `monitor` <[boolean]> If set to `true`, will provide a small command line output to provide information about the crawling process. Defaults to `false`.
- `monitorObject` <[boolean]> If set to `true`, [cluster.getMonitorObject()](#clustergetmonitorobject) returns an object populated with information about the crawling process. Defaults to `false`.
- `workerCreationDelay` <[number]> Time between creation of two workers. Set this to a value like `100` (0.1 seconds) in case you want some time to pass before another worker is created. You can use this to prevent a network peak right at the start. Defaults to `0` (no delay).
- `puppeteer` <[Object]> In case you want to use a different puppeteer library (like [puppeteer-core](https://github.com/GoogleChrome/puppeteer/blob/master/docs/api.md#puppeteer-vs-puppeteer-core) or [puppeteer-extra](https://github.com/berstend/puppeteer-extra)), pass the object here. If not set, will default to using puppeteer. When using `puppeteer-core`, make sure to also provide `puppeteerOptions.executablePath`.
- returns: <[Promise]<[Cluster]>>
Expand Down Expand Up @@ -212,6 +214,10 @@ Be aware that this function only returns a Promise for backward compatibility re

Works like [Cluster.queue], but this function returns a Promise which will be resolved after the task is executed. That means, that the job is still queued, but the script will wait for it to be finished. In case an error happens during the execution, this function will reject the Promise with the thrown error. There will be no "taskerror" event fired. In addition, tasks queued via execute will ignore "retryLimit" and "retryDelay". For an example see the [Execute example](examples/execute.js).

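#### cluster.getMonitorObject()
- If the cluster option `monitorObject` is set to `true`, this method returns an object containing the information that the `monitor` option would print to the command line. It can be used for API calls or other use cases. If `monitorObject` is not enabled, most fields of the returned object are left at their empty defaults.
- returns: <[Object]>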

#### cluster.idle()
- returns: <[Promise]>

Expand Down
81 changes: 80 additions & 1 deletion src/Cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ interface ClusterOptions {
puppeteerOptions: LaunchOptions;
perBrowserOptions: LaunchOptions[] | undefined;
monitor: boolean;
monitorObject: boolean;
timeout: number;
retryLimit: number;
retryDelay: number;
Expand All @@ -30,6 +31,21 @@ interface ClusterOptions {
puppeteer: any;
}

/**
 * Snapshot of the cluster's progress as returned by `getMonitorObject()`.
 *
 * Numeric statistics that are meant for display are stored as pre-formatted
 * strings (produced with `toFixed` / `util.formatDuration`).
 */
interface MonitorObject {
    /** Timestamp (`Date.now()`) at which the snapshot was taken. */
    now: number;
    /** Milliseconds between the cluster's start time and `now`. */
    timeDiff: number;
    /** Number of targets that are neither queued nor currently being worked on. */
    doneTargets: number;
    /** Share of finished targets, scaled by 100 and formatted with two decimals. */
    donePercentage: string;
    /** Errors relative to finished targets, scaled by 100, two decimals. */
    errorPerc: string;
    /** `timeDiff` formatted as a duration string. */
    timeRunning: string;
    /** Estimated remaining time formatted as a duration string. */
    timeRemaining: string;
    /** CPU usage reported by the system monitor, formatted with one decimal. */
    cpuUsage: string;
    /** Memory usage reported by the system monitor, formatted with one decimal. */
    memoryUsage: string;
    /** Finished targets per second, formatted with two decimals (`'0'` if none finished). */
    pagesPerSecond: string;
    /** Number of running workers plus workers that are still starting. */
    workerCount: number;
    /** One status line per worker in the form `#<index> <WORK|IDLE> <target>`. */
    worker: string[];
}

type Partial<T> = {
[P in keyof T]?: T[P];
};
Expand All @@ -45,6 +61,7 @@ const DEFAULT_OPTIONS: ClusterOptions = {
},
perBrowserOptions: undefined,
monitor: false,
monitorObject: false,
timeout: 30 * 1000,
retryLimit: 0,
retryDelay: 0,
Expand Down Expand Up @@ -168,7 +185,7 @@ export default class Cluster<JobData = any, ReturnData = any> extends EventEmitt
throw new Error(`Unable to launch browser, error message: ${err.message}`);
}

if (this.options.monitor) {
if (this.options.monitor || this.options.monitorObject) {
await this.systemMonitor.init();
}

Expand Down Expand Up @@ -472,6 +489,68 @@ export default class Cluster<JobData = any, ReturnData = any> extends EventEmitt
debug('Closed');
}

/**
 * Builds a snapshot describing the current state of the cluster.
 *
 * When the `monitorObject` option is not enabled, only `now` and
 * `workerCount` are populated; every other field keeps its empty default.
 *
 * @returns the monitor snapshot with progress, timing, resource usage and
 * one status line per worker.
 */
public getMonitorObject(): MonitorObject {
    const snapshot: MonitorObject = {
        now: Date.now(),
        timeDiff: 0,
        doneTargets: 0,
        donePercentage: '',
        errorPerc: '',
        timeRunning: '',
        timeRemaining: '',
        cpuUsage: '',
        memoryUsage: '',
        pagesPerSecond: '',
        workerCount: this.workers.length + this.workersStarting,
        worker: [],
    };

    // Statistics are only gathered when the feature was requested.
    if (!this.options.monitorObject) {
        return snapshot;
    }

    const elapsedMillis = snapshot.now - this.startTime;
    // Everything that is neither waiting in the queue nor in progress counts as done.
    const finished = this.allTargetCount - this.jobQueue.size() - this.workersBusy.length;
    // With no targets at all the cluster is treated as fully done.
    const doneRatio = this.allTargetCount === 0 ? 1 : finished / this.allTargetCount;

    snapshot.timeDiff = elapsedMillis;
    snapshot.doneTargets = finished;
    snapshot.donePercentage = (100 * doneRatio).toFixed(2);
    snapshot.errorPerc = finished === 0
        ? '0.00'
        : (100 * this.errorCount / finished).toFixed(2);

    snapshot.timeRunning = util.formatDuration(elapsedMillis);

    // -1 is passed on when nothing is done yet and no estimate can be extrapolated.
    const remainingMillis = doneRatio === 0
        ? -1
        : (elapsedMillis / doneRatio) - elapsedMillis;
    snapshot.timeRemaining = util.formatDuration(remainingMillis);

    snapshot.cpuUsage = this.systemMonitor.getCpuUsage().toFixed(1);
    snapshot.memoryUsage = this.systemMonitor.getMemoryUsage().toFixed(1);

    snapshot.pagesPerSecond = finished === 0
        ? '0'
        : (finished * 1000 / elapsedMillis).toFixed(2);

    snapshot.worker = this.workers.map((current, index) => {
        // A worker that is not in the list of available workers is busy.
        const busy = this.workersAvail.indexOf(current) === -1;
        const state = busy ? 'WORK' : 'IDLE';
        let target = '';
        if (busy) {
            target = current.activeTarget
                ? current.activeTarget.getUrl() || 'UNKNOWN TARGET'
                : 'NO TARGET (should not be happening)';
        }
        return `#${index} ${state} ${target}`;
    });

    return snapshot;
}

private monitor(): void {
if (!this.display) {
this.display = new Display();
Expand Down
29 changes: 29 additions & 0 deletions test/Cluster.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,35 @@ describe('options', () => {
await cluster.idle();
await cluster.close();
});

// Verifies that getMonitorObject() returns populated data when the
// `monitorObject` option is enabled (and the console monitor is disabled).
test('monitorObject enabled', async () => {
    // Resolves after the given number of milliseconds.
    const pause = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

    const cluster = await Cluster.launch({
        concurrency: Cluster.CONCURRENCY_CONTEXT,
        puppeteerOptions: { args: ['--no-sandbox'] },
        maxConcurrency: 1,
        monitor: false,
        monitorObject: true,
    });
    // Any task error should fail the test instead of being swallowed.
    cluster.on('taskerror', (err) => {
        throw err;
    });

    // The task does nothing but wait for 550ms.
    cluster.task(async () => {
        await pause(550);
    });

    cluster.queue(TEST_URL);

    // NOTE(review): `output` is declared outside this test; resetting it looks
    // like a leftover from the console `monitor` test — confirm it is needed.
    output = '';
    // Wait 510ms (slightly less than the 550ms task duration) before sampling.
    await pause(510);
    const { workerCount, cpuUsage } = cluster.getMonitorObject();
    expect(workerCount).toBeGreaterThan(0);
    expect(cpuUsage.length).toBeGreaterThan(0);

    await cluster.idle();
    await cluster.close();
});
});

});
Expand Down

0 comments on commit a3defee

Please sign in to comment.