Skip to content

Commit 491f390

Browse files
theanarkhaduh95
authored andcommitted
worker: add cpuUsage for worker
PR-URL: #59177 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Ilyas Shabi <ilyasshabi94@gmail.com>
1 parent 035da74 commit 491f390

File tree

9 files changed

+225
-1
lines changed

9 files changed

+225
-1
lines changed

doc/api/worker_threads.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1596,6 +1596,19 @@ added: v10.5.0
15961596
The `'online'` event is emitted when the worker thread has started executing
15971597
JavaScript code.
15981598

1599+
### `worker.cpuUsage([prev])`
1600+
1601+
<!-- YAML
1602+
added:
1603+
- REPLACEME
1604+
-->
1605+
1606+
* Returns: {Promise}
1607+
1608+
This method returns a `Promise` that will resolve to an object identical to [`process.threadCpuUsage()`][],
1609+
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
1610+
This methods allows the statistics to be observed from outside the actual thread.
1611+
15991612
### `worker.getHeapSnapshot([options])`
16001613

16011614
<!-- YAML
@@ -1949,6 +1962,7 @@ thread spawned will spawn another until the application crashes.
19491962
[`process.stderr`]: process.md#processstderr
19501963
[`process.stdin`]: process.md#processstdin
19511964
[`process.stdout`]: process.md#processstdout
1965+
[`process.threadCpuUsage()`]: process.md#processthreadcpuusagepreviousvalue
19521966
[`process.title`]: process.md#processtitle
19531967
[`require('node:worker_threads').isMainThread`]: #workerismainthread
19541968
[`require('node:worker_threads').parentPort.on('message')`]: #event-message

lib/internal/worker.js

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const {
88
Float64Array,
99
FunctionPrototypeBind,
1010
MathMax,
11+
NumberMAX_SAFE_INTEGER,
1112
ObjectEntries,
1213
Promise,
1314
PromiseResolve,
@@ -39,6 +40,7 @@ const {
3940
ERR_WORKER_INVALID_EXEC_ARGV,
4041
ERR_INVALID_ARG_TYPE,
4142
ERR_INVALID_ARG_VALUE,
43+
ERR_OPERATION_FAILED,
4244
} = errorCodes;
4345

4446
const workerIo = require('internal/worker/io');
@@ -59,7 +61,7 @@ const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker
5961
const { deserializeError } = require('internal/error_serdes');
6062
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6163
const { kEmptyObject, SymbolAsyncDispose } = require('internal/util');
62-
const { validateArray, validateString } = require('internal/validators');
64+
const { validateArray, validateString, validateObject, validateNumber } = require('internal/validators');
6365
const {
6466
throwIfBuildingSnapshot,
6567
} = require('internal/v8/startup_snapshot');
@@ -473,6 +475,37 @@ class Worker extends EventEmitter {
473475
};
474476
});
475477
}
478+
479+
cpuUsage(prev) {
480+
if (prev) {
481+
validateObject(prev, 'prev');
482+
validateNumber(prev.user, 'prev.user', 0, NumberMAX_SAFE_INTEGER);
483+
validateNumber(prev.system, 'prev.system', 0, NumberMAX_SAFE_INTEGER);
484+
}
485+
if (process.platform === 'sunos') {
486+
throw new ERR_OPERATION_FAILED('worker.cpuUsage() is not available on SunOS');
487+
}
488+
const taker = this[kHandle]?.cpuUsage();
489+
return new Promise((resolve, reject) => {
490+
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
491+
taker.ondone = (err, current) => {
492+
if (err !== null) {
493+
return reject(err);
494+
}
495+
if (prev) {
496+
resolve({
497+
user: current.user - prev.user,
498+
system: current.system - prev.system,
499+
});
500+
} else {
501+
resolve({
502+
user: current.user,
503+
system: current.system,
504+
});
505+
}
506+
};
507+
});
508+
}
476509
}
477510

478511
/**

src/async_wrap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ namespace node {
7878
V(UDPWRAP) \
7979
V(SIGINTWATCHDOG) \
8080
V(WORKER) \
81+
V(WORKERCPUUSAGE) \
8182
V(WORKERHEAPSNAPSHOT) \
8283
V(WORKERHEAPSTATISTICS) \
8384
V(WRITEWRAP) \

src/env_properties.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@
461461
V(tcp_constructor_template, v8::FunctionTemplate) \
462462
V(tty_constructor_template, v8::FunctionTemplate) \
463463
V(write_wrap_template, v8::ObjectTemplate) \
464+
V(worker_cpu_usage_taker_template, v8::ObjectTemplate) \
464465
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
465466
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
466467
V(x509_constructor_template, v8::FunctionTemplate)

src/node_worker.cc

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ using v8::Isolate;
3232
using v8::Local;
3333
using v8::Locker;
3434
using v8::Maybe;
35+
using v8::Name;
3536
using v8::Null;
3637
using v8::Number;
3738
using v8::Object;
@@ -811,6 +812,81 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
811812
}
812813
}
813814

815+
class WorkerCpuUsageTaker : public AsyncWrap {
816+
public:
817+
WorkerCpuUsageTaker(Environment* env, Local<Object> obj)
818+
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERCPUUSAGE) {}
819+
820+
SET_NO_MEMORY_INFO()
821+
SET_MEMORY_INFO_NAME(WorkerCpuUsageTaker)
822+
SET_SELF_SIZE(WorkerCpuUsageTaker)
823+
};
824+
825+
void Worker::CpuUsage(const FunctionCallbackInfo<Value>& args) {
826+
Worker* w;
827+
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
828+
829+
Environment* env = w->env();
830+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
831+
Local<Object> wrap;
832+
if (!env->worker_cpu_usage_taker_template()
833+
->NewInstance(env->context())
834+
.ToLocal(&wrap)) {
835+
return;
836+
}
837+
838+
BaseObjectPtr<WorkerCpuUsageTaker> taker =
839+
MakeDetachedBaseObject<WorkerCpuUsageTaker>(env, wrap);
840+
841+
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
842+
env](Environment* worker_env) mutable {
843+
auto cpu_usage_stats = std::make_unique<uv_rusage_t>();
844+
int err = uv_getrusage_thread(cpu_usage_stats.get());
845+
846+
env->SetImmediateThreadsafe(
847+
[taker = std::move(taker),
848+
cpu_usage_stats = std::move(cpu_usage_stats),
849+
err = err](Environment* env) mutable {
850+
Isolate* isolate = env->isolate();
851+
HandleScope handle_scope(isolate);
852+
Context::Scope context_scope(env->context());
853+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
854+
855+
Local<Value> argv[] = {
856+
Null(isolate),
857+
Undefined(isolate),
858+
};
859+
860+
if (err) {
861+
argv[0] = UVException(
862+
isolate, err, "uv_getrusage_thread", nullptr, nullptr, nullptr);
863+
} else {
864+
Local<Name> names[] = {
865+
FIXED_ONE_BYTE_STRING(isolate, "user"),
866+
FIXED_ONE_BYTE_STRING(isolate, "system"),
867+
};
868+
Local<Value> values[] = {
869+
Number::New(isolate,
870+
1e6 * cpu_usage_stats->ru_utime.tv_sec +
871+
cpu_usage_stats->ru_utime.tv_usec),
872+
Number::New(isolate,
873+
1e6 * cpu_usage_stats->ru_stime.tv_sec +
874+
cpu_usage_stats->ru_stime.tv_usec),
875+
};
876+
argv[1] = Object::New(
877+
isolate, Null(isolate), names, values, arraysize(names));
878+
}
879+
880+
taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
881+
},
882+
CallbackFlags::kUnrefed);
883+
});
884+
885+
if (scheduled) {
886+
args.GetReturnValue().Set(wrap);
887+
}
888+
}
889+
814890
class WorkerHeapStatisticsTaker : public AsyncWrap {
815891
public:
816892
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
@@ -1102,6 +1178,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
11021178
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
11031179
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
11041180
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);
1181+
SetProtoMethod(isolate, w, "cpuUsage", Worker::CpuUsage);
11051182

11061183
SetConstructorFunction(isolate, target, "Worker", w);
11071184
}
@@ -1134,6 +1211,19 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
11341211
wst->InstanceTemplate());
11351212
}
11361213

1214+
{
1215+
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
1216+
1217+
wst->InstanceTemplate()->SetInternalFieldCount(
1218+
WorkerCpuUsageTaker::kInternalFieldCount);
1219+
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
1220+
1221+
Local<String> wst_string =
1222+
FIXED_ONE_BYTE_STRING(isolate, "WorkerCpuUsageTaker");
1223+
wst->SetClassName(wst_string);
1224+
isolate_data->set_worker_cpu_usage_taker_template(wst->InstanceTemplate());
1225+
}
1226+
11371227
SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
11381228
}
11391229

@@ -1200,6 +1290,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
12001290
registry->Register(Worker::LoopIdleTime);
12011291
registry->Register(Worker::LoopStartTime);
12021292
registry->Register(Worker::GetHeapStatistics);
1293+
registry->Register(Worker::CpuUsage);
12031294
}
12041295

12051296
} // anonymous namespace

src/node_worker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class Worker : public AsyncWrap {
8080
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
8181
static void GetHeapStatistics(
8282
const v8::FunctionCallbackInfo<v8::Value>& args);
83+
static void CpuUsage(const v8::FunctionCallbackInfo<v8::Value>& args);
8384

8485
private:
8586
bool CreateEnvMessagePort(Environment* env);
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { isSunOS } = require('../common');
4+
const assert = require('assert');
5+
const {
6+
Worker,
7+
} = require('worker_threads');
8+
9+
function validate(result) {
10+
assert.ok(typeof result == 'object' && result !== null);
11+
assert.ok(result.user >= 0);
12+
assert.ok(result.system >= 0);
13+
assert.ok(Number.isFinite(result.user));
14+
assert.ok(Number.isFinite(result.system));
15+
}
16+
17+
function check(worker) {
18+
[
19+
-1,
20+
1.1,
21+
NaN,
22+
undefined,
23+
{},
24+
[],
25+
null,
26+
function() {},
27+
Symbol(),
28+
true,
29+
Infinity,
30+
{ user: -1, system: 1 },
31+
{ user: 1, system: -1 },
32+
].forEach((value) => {
33+
try {
34+
worker.cpuUsage(value);
35+
} catch (e) {
36+
assert.ok(/ERR_OUT_OF_RANGE|ERR_INVALID_ARG_TYPE/i.test(e.code));
37+
}
38+
});
39+
}
40+
41+
const worker = new Worker(`
42+
const { parentPort } = require('worker_threads');
43+
parentPort.on('message', () => {});
44+
`, { eval: true });
45+
46+
// See test-process-threadCpuUsage-main-thread.js
47+
if (isSunOS) {
48+
assert.throws(
49+
() => worker.cpuUsage(),
50+
{
51+
code: 'ERR_OPERATION_FAILED',
52+
name: 'Error',
53+
message: 'Operation failed: worker.cpuUsage() is not available on SunOS'
54+
}
55+
);
56+
worker.terminate();
57+
} else {
58+
worker.on('online', common.mustCall(async () => {
59+
check(worker);
60+
61+
const prev = await worker.cpuUsage();
62+
validate(prev);
63+
64+
const curr = await worker.cpuUsage();
65+
validate(curr);
66+
67+
assert.ok(curr.user >= prev.user);
68+
assert.ok(curr.system >= prev.system);
69+
70+
const delta = await worker.cpuUsage(curr);
71+
validate(delta);
72+
73+
worker.terminate();
74+
}));
75+
76+
worker.once('exit', common.mustCall(async () => {
77+
await assert.rejects(worker.cpuUsage(), {
78+
code: 'ERR_WORKER_NOT_RUNNING'
79+
});
80+
}));
81+
}

test/sequential/test-async-wrap-getasyncid.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const { getSystemErrorName } = require('util');
6262
delete providers.SIGINTWATCHDOG;
6363
delete providers.WORKERHEAPSNAPSHOT;
6464
delete providers.WORKERHEAPSTATISTICS;
65+
delete providers.WORKERCPUUSAGE;
6566
delete providers.BLOBREADER;
6667
delete providers.RANDOMPRIMEREQUEST;
6768
delete providers.CHECKPRIMEREQUEST;

typings/internalBinding/worker.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ declare namespace InternalWorkerBinding {
1616
getResourceLimits(): Float64Array;
1717
takeHeapSnapshot(): object;
1818
getHeapStatistics(): Promise<object>;
19+
cpuUsage(): Promise<object>;
1920
loopIdleTime(): number;
2021
loopStartTime(): number;
2122
}

0 commit comments

Comments
 (0)