diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index 9d2d805bf25052..4a465e20ad53b9 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -4,9 +4,12 @@ const { ArrayPrototypeIndexOf, ArrayPrototypePush, ArrayPrototypeSplice, + FunctionPrototypeBind, ObjectCreate, ObjectGetPrototypeOf, ObjectSetPrototypeOf, + PromisePrototypeThen, + ReflectApply, SymbolHasInstance, } = primordials; @@ -23,11 +26,35 @@ const { triggerUncaughtException } = internalBinding('errors'); const { WeakReference } = internalBinding('util'); +function decRef(channel) { + channel._weak.decRef(); + if (channels[channel.name].getRef() === 0) { + delete channels[channel.name]; + } +} + +function markActive(channel) { + ObjectSetPrototypeOf(channel, ActiveChannel.prototype); + channel._subscribers = []; + channel._stores = new Map(); +} + +function maybeMarkInactive(channel) { + // When there are no more active subscribers, restore to fast prototype. + if (!channel._subscribers.length && !channel._stores.size) { + // eslint-disable-next-line no-use-before-define + ObjectSetPrototypeOf(channel, Channel.prototype); + channel._subscribers = undefined; + channel._stores = undefined; + } +} + // TODO(qard): should there be a C++ channel interface? class ActiveChannel { subscribe(subscription) { validateFunction(subscription, 'subscription'); ArrayPrototypePush(this._subscribers, subscription); + this._weak.incRef(); } unsubscribe(subscription) { @@ -36,11 +63,29 @@ class ActiveChannel { ArrayPrototypeSplice(this._subscribers, index, 1); - // When there are no more active subscribers, restore to fast prototype. - if (!this._subscribers.length) { - // eslint-disable-next-line no-use-before-define - ObjectSetPrototypeOf(this, Channel.prototype); + decRef(this); + maybeMarkInactive(this); + + return true; + } + + bindStore(store, transform) { + const replacing = this._stores.has(store); + this._stores.set(store, transform); + if (!replacing) { + this._weak.incRef(); } + } + + unbindStore(store) { + if (!this._stores.has(store)) { + return false; + } + + this._stores.delete(store); + + decRef(this); + maybeMarkInactive(this); return true; } @@ -61,11 +106,26 @@ class ActiveChannel { } } } + + runStores(data, fn, thisArg, ...args) { + this.publish(data); + + // Bind base fn first due to AsyncLocalStorage.run not having thisArg + fn = FunctionPrototypeBind(fn, thisArg, ...args); + + for (const [ store, transform ] of this._stores.entries()) { + fn = wrapStoreRun(store, data, fn, transform); + } + + return fn(); + } } class Channel { constructor(name) { this._subscribers = undefined; + this._stores = undefined; + this._weak = undefined; this.name = name; } @@ -76,8 +136,7 @@ class Channel { } subscribe(subscription) { - ObjectSetPrototypeOf(this, ActiveChannel.prototype); - this._subscribers = []; + markActive(this); this.subscribe(subscription); } @@ -85,11 +144,28 @@ class Channel { return false; } + bindStore(store, transform = (v) => v) { + markActive(this); + this.bindStore(store, transform); + } + + unbindStore() { + return false; + } + get hasSubscribers() { return false; } publish() {} + + runStores(data, fn, thisArg, ...args) { + return ReflectApply(fn, thisArg, args) + } +} + +function wrapStoreRun(store, data, next, transform = (v) => v) { + return () => store.run(transform(data), next); } const channels = ObjectCreate(null); @@ -105,27 +181,17 @@ function channel(name) { } channel = new Channel(name); - channels[name] = new WeakReference(channel); + channel._weak = new WeakReference(channel); + channels[name] = channel._weak; return channel; } function subscribe(name, subscription) { - const chan = channel(name); - channels[name].incRef(); - chan.subscribe(subscription); + return channel(name).subscribe(subscription); } function unsubscribe(name, subscription) { - const chan = channel(name); - if (!chan.unsubscribe(subscription)) { - return false; - } - - channels[name].decRef(); - if (channels[name].getRef() === 0) { - delete channels[name]; - } - return true; + return channel(name).unsubscribe(subscription); } function hasSubscribers(name) { @@ -139,10 +205,172 @@ function hasSubscribers(name) { return channel.hasSubscribers; } +function traceSync(channels, fn, ctx, thisArg, ...args) { + const { start, end, error } = channels; + + try { + const result = start.runStores(ctx, fn, thisArg, ...args); + ctx.result = result; + return result; + } catch (err) { + ctx.error = err; + error.publish(ctx); + throw err; + } finally { + end.publish(ctx); + } +} + +function traceCallback(channels, fn, position, ctx, thisArg, ...args) { + const { start, end, asyncEnd, error } = channels; + + function wrap (fn) { + return function wrappedCallback (err, res) { + if (err) { + ctx.error = err; + error.publish(ctx); + } else { + ctx.result = res; + } + + asyncEnd.publish(ctx); + if (fn) { + return ReflectApply(fn, this, arguments); + } + } + } + + if (position >= 0) { + args.splice(position, 1, wrap(args.at(position))); + } + + try { + return start.runStores(ctx, fn, thisArg, ...args); + } catch (err) { + ctx.error = err; + error.publish(ctx); + throw err; + } finally { + end.publish(ctx); + } +} + +function tracePromise(channels, fn, ctx, thisArg, ...args) { + const { asyncEnd, start, end, error } = channels; + + function reject(err) { + ctx.error = err; + error.publish(ctx); + asyncEnd.publish(ctx); + return Promise.reject(err); + } + + function resolve(result) { + ctx.result = result; + asyncEnd.publish(ctx); + return result; + } + + try { + const promise = start.runStores(ctx, fn, thisArg, ...args); + return PromisePrototypeThen(promise, resolve, reject); + } catch (err) { + ctx.error = err; + error.publish(ctx); + throw err; + } finally { + end.publish(ctx); + } +} + +class TracingChannel { + constructor(name) { + this.name = name; + this.channels = { + start: new Channel(`tracing:${name}:start`), + end: new Channel(`tracing:${name}:end`), + asyncEnd: new Channel(`tracing:${name}:asyncEnd`), + error: new Channel(`tracing:${name}:error`) + }; + } + + // Attach WeakReference to all the sub-channels so the liveness management + // in subscribe/unsubscribe keeps the TracingChannel the sub-channels are + // attached to alive. + set _weak(weak) { + for (const key in this.channels) { + this.channels[key]._weak = weak; + } + } + + get hasSubscribers() { + for (const key in this.channels) { + if (this.channels[key].hasSubscribers) { + return true; + } + } + return false; + } + + subscribe(handlers) { + for (const key in handlers) { + this.channels[key]?.subscribe(handlers[key]); + } + } + + unsubscribe(handlers) { + let done = true; + for (const key in handlers) { + const channel = this.channels[key]; + if (channel instanceof Channel && !channel.unsubscribe(handlers[key])) { + done = false; + } + } + return done; + } + + traceSync(fn, ctx = {}, thisArg, ...args) { + if (!this.hasSubscribers) return ReflectApply(fn, thisArg, args); + return traceSync(this.channels, fn, ctx, thisArg, ...args); + } + + tracePromise(fn, ctx = {}, thisArg, ...args) { + if (!this.hasSubscribers) return ReflectApply(fn, thisArg, args); + return tracePromise(this.channels, fn, ctx, thisArg, ...args); + } + + traceCallback(fn, position = 0, ctx = {}, thisArg, ...args) { + if (!this.hasSubscribers) return ReflectApply(fn, thisArg, args); + return traceCallback(this.channels, fn, position, ctx, thisArg, ...args); + } +} + +const tracingChannels = ObjectCreate(null); + +function tracingChannel(name) { + let channel; + const ref = tracingChannels[name]; + if (ref) channel = ref.get(); + if (channel) return channel; + + if (typeof name !== 'string' && typeof name !== 'symbol') { + throw new ERR_INVALID_ARG_TYPE('tracingChannel', ['string', 'symbol'], name); + } + + channel = new TracingChannel(name); + channel._weak = new WeakReference(channel); + tracingChannels[name] = channel._weak; + return channel; +} + module.exports = { channel, hasSubscribers, subscribe, + tracingChannel, + traceSync, + traceCallback, + tracePromise, unsubscribe, Channel }; diff --git a/test/parallel/test-diagnostics-channel-bind-store.js b/test/parallel/test-diagnostics-channel-bind-store.js new file mode 100644 index 00000000000000..5ecb553e4fb061 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-bind-store.js @@ -0,0 +1,67 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const { AsyncLocalStorage } = require('async_hooks'); + +let n = 0; +const thisArg = new Date(); +const inputs = [ + { foo: 'bar' }, + { baz: 'buz' } +]; + +const channel = dc.channel('test'); + +// Bind a storage directly to published data +const store1 = new AsyncLocalStorage(); +channel.bindStore(store1); + +// Bind a store with transformation of published data +const store2 = new AsyncLocalStorage(); +channel.bindStore(store2, common.mustCall((data) => { + assert.deepStrictEqual(data, inputs[n]); + return { data }; +}, 2)); + +// Regular subscribers should see publishes from runStores calls +channel.subscribe(common.mustCall((data) => { + assert.deepStrictEqual(data, inputs[n]); +}, 2)); + +// Verify stores are empty before run +assert.deepStrictEqual(store1.getStore(), undefined); +assert.deepStrictEqual(store2.getStore(), undefined); + +channel.runStores(inputs[n], common.mustCall(function (a, b) { + // Verify this and argument forwarding + assert.deepStrictEqual(this, thisArg); + assert.strictEqual(a, 1); + assert.strictEqual(b, 2); + + // Verify store 1 state matches input + assert.deepStrictEqual(store1.getStore(), inputs[n]); + + // Verify store 2 state has expected transformation + assert.deepStrictEqual(store2.getStore(), { data: inputs[n] }); +}), thisArg, 1, 2); + +// Verify stores are empty after run +assert.deepStrictEqual(store1.getStore(), undefined); +assert.deepStrictEqual(store2.getStore(), undefined); + +// Verify unbinding works +assert.ok(channel.unbindStore(store1)); + +// Verify unbinding a store that is not bound returns false +assert.ok(!channel.unbindStore(store1)); + +n++ +channel.runStores(inputs[n], common.mustCall(() => { + // Verify after unbinding store 1 will remain undefined + assert.deepStrictEqual(store1.getStore(), undefined); + + // Verify still bound store 2 receives expected data + assert.deepStrictEqual(store2.getStore(), { data: inputs[n] }); +})); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-async-error.js b/test/parallel/test-diagnostics-channel-tracing-channel-async-error.js new file mode 100644 index 00000000000000..97df6e79e3dc16 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-async-error.js @@ -0,0 +1,45 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); + +const expectedError = new Error('test'); +const input = { foo: 'bar' }; +const thisArg = { baz: 'buz' }; + +function check(found) { + assert.deepStrictEqual(found, input); +} + +const handlers = { + start: common.mustCall(check, 2), + end: common.mustCall(check, 2), + asyncEnd: common.mustCall(check, 2), + error: common.mustCall((found) => { + check(found); + assert.deepStrictEqual(found.error, expectedError); + }, 2) +}; + +channel.subscribe(handlers); + +channel.traceCallback(function (cb, err) { + assert.deepStrictEqual(this, thisArg); + setImmediate(cb, err); +}, 0, input, thisArg, common.mustCall((err, res) => { + assert.strictEqual(err, expectedError); + assert.deepStrictEqual(res, undefined); +}), expectedError); + +channel.tracePromise(function (value) { + assert.deepStrictEqual(this, thisArg); + return Promise.reject(value); +}, input, thisArg, expectedError).then( + common.mustNotCall(), + common.mustCall(value => { + assert.deepStrictEqual(value, expectedError); + }) +); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-async.js b/test/parallel/test-diagnostics-channel-tracing-channel-async.js new file mode 100644 index 00000000000000..b717ebf9c9e243 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-async.js @@ -0,0 +1,46 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); + +const expectedResult = { foo: 'bar' }; +const input = { foo: 'bar' }; +const thisArg = { baz: 'buz' }; + +function check(found) { + assert.deepStrictEqual(found, input); +} + +const handlers = { + start: common.mustCall(check, 2), + end: common.mustCall(check, 2), + asyncEnd: common.mustCall((found) => { + check(found); + assert.strictEqual(found.error, undefined); + assert.deepStrictEqual(found.result, expectedResult); + }, 2), + error: common.mustNotCall() +}; + +channel.subscribe(handlers); + +channel.traceCallback(function (cb, err, res) { + assert.deepStrictEqual(this, thisArg); + setImmediate(cb, err, res); +}, 0, input, thisArg, common.mustCall((err, res) => { + assert.strictEqual(err, null); + assert.deepStrictEqual(res, expectedResult); +}), null, expectedResult); + +channel.tracePromise(function (value) { + assert.deepStrictEqual(this, thisArg); + return Promise.resolve(value); +}, input, thisArg, expectedResult).then( + common.mustCall(value => { + assert.deepStrictEqual(value, expectedResult); + }), + common.mustNotCall() +); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-sync-error.js b/test/parallel/test-diagnostics-channel-tracing-channel-sync-error.js new file mode 100644 index 00000000000000..3ceb78c6f1f2d9 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-sync-error.js @@ -0,0 +1,38 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); + +const expectedError = new Error('test'); +const input = { foo: 'bar' }; +const thisArg = { baz: 'buz' }; + +function check(found) { + assert.deepStrictEqual(found, input); +} + +const handlers = { + start: common.mustCall(check), + end: common.mustCall(check), + asyncEnd: common.mustNotCall(), + error: common.mustCall((found) => { + check(found); + assert.deepStrictEqual(found.error, expectedError); + }) +}; + +channel.subscribe(handlers); +try { + channel.traceSync(function (err) { + assert.deepStrictEqual(this, thisArg); + assert.strictEqual(err, expectedError); + throw err; + }, input, thisArg, expectedError); + + throw new Error('It should not reach this error'); +} catch (error) { + assert.deepStrictEqual(error, expectedError); +} diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-sync.js b/test/parallel/test-diagnostics-channel-tracing-channel-sync.js new file mode 100644 index 00000000000000..3b640280fc2771 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-sync.js @@ -0,0 +1,37 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('test'); + +const expectedResult = { foo: 'bar' }; +const input = { foo: 'bar' }; + +function check(found) { + assert.deepStrictEqual(found, input); +} + +const handlers = { + start: common.mustCall(check), + end: common.mustCall((found) => { + check(found); + assert.deepStrictEqual(found.result, expectedResult); + }), + asyncEnd: common.mustNotCall(), + error: common.mustNotCall() +}; + +assert.strictEqual(channel.hasSubscribers, false); +channel.subscribe(handlers); +assert.strictEqual(channel.hasSubscribers, true); +channel.traceSync(() => { + return expectedResult; +}, input); + +channel.unsubscribe(handlers); +assert.strictEqual(channel.hasSubscribers, false); +channel.traceSync(() => { + return expectedResult; +}, input);