diff --git a/CHANGELOG.md b/CHANGELOG.md index d449ad83dc..63e669ceba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ ### :rocket: (Enhancement) +* feat(sdk-metrics): implement MetricProducer specification [#4007](https://github.com/open-telemetry/opentelemetry-js/pull/4007) + ### :bug: (Bug Fix) ### :books: (Refine Doc) diff --git a/packages/sdk-metrics/src/export/MetricReader.ts b/packages/sdk-metrics/src/export/MetricReader.ts index 3bc4c63a06..8aad601d70 100644 --- a/packages/sdk-metrics/src/export/MetricReader.ts +++ b/packages/sdk-metrics/src/export/MetricReader.ts @@ -18,7 +18,7 @@ import * as api from '@opentelemetry/api'; import { AggregationTemporality } from './AggregationTemporality'; import { MetricProducer } from './MetricProducer'; import { CollectionResult } from './MetricData'; -import { callWithTimeout } from '../utils'; +import { FlatMap, callWithTimeout } from '../utils'; import { InstrumentType } from '../InstrumentDescriptor'; import { CollectionOptions, @@ -45,6 +45,13 @@ export interface MetricReaderOptions { * not configured, cumulative is used for all instruments. */ aggregationTemporalitySelector?: AggregationTemporalitySelector; + /** + * **Note, this option is experimental**. Additional MetricProducers to use as a source of + * aggregated metric data in addition to the SDK's metric data. The resource returned by + * these MetricProducers is ignored; the SDK's resource will be used instead. + * @experimental + */ + metricProducers?: MetricProducer[]; } /** @@ -55,8 +62,10 @@ export abstract class MetricReader { // Tracks the shutdown state. // TODO: use BindOncePromise here once a new version of @opentelemetry/core is available. private _shutdown = false; - // MetricProducer used by this instance. - private _metricProducer?: MetricProducer; + // Additional MetricProducers which will be combined with the SDK's output + private _metricProducers: MetricProducer[]; + // MetricProducer used by this instance which produces metrics from the SDK + private _sdkMetricProducer?: MetricProducer; private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector; private readonly _aggregationSelector: AggregationSelector; @@ -66,20 +75,26 @@ export abstract class MetricReader { this._aggregationTemporalitySelector = options?.aggregationTemporalitySelector ?? DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR; + this._metricProducers = options?.metricProducers ?? []; } /** - * Set the {@link MetricProducer} used by this instance. + * Set the {@link MetricProducer} used by this instance. **This should only be called by the + * SDK and should be considered internal.** * + * To add additional {@link MetricProducer}s to a {@link MetricReader}, pass them to the + * constructor as {@link MetricReaderOptions.metricProducers}. + * + * @internal * @param metricProducer */ setMetricProducer(metricProducer: MetricProducer) { - if (this._metricProducer) { + if (this._sdkMetricProducer) { throw new Error( 'MetricReader can not be bound to a MeterProvider again.' ); } - this._metricProducer = metricProducer; + this._sdkMetricProducer = metricProducer; this.onInitialized(); } @@ -130,7 +145,7 @@ export abstract class MetricReader { * Collect all metrics from the associated {@link MetricProducer} */ async collect(options?: CollectionOptions): Promise { - if (this._metricProducer === undefined) { + if (this._sdkMetricProducer === undefined) { throw new Error('MetricReader is not bound to a MetricProducer'); } @@ -139,9 +154,37 @@ export abstract class MetricReader { throw new Error('MetricReader is shutdown'); } - return this._metricProducer.collect({ - timeoutMillis: options?.timeoutMillis, - }); + const [sdkCollectionResults, ...additionalCollectionResults] = + await Promise.all([ + this._sdkMetricProducer.collect({ + timeoutMillis: options?.timeoutMillis, + }), + ...this._metricProducers.map(producer => + producer.collect({ + timeoutMillis: options?.timeoutMillis, + }) + ), + ]); + + // Merge the results, keeping the SDK's Resource + const errors = sdkCollectionResults.errors.concat( + FlatMap(additionalCollectionResults, result => result.errors) + ); + const resource = sdkCollectionResults.resourceMetrics.resource; + const scopeMetrics = + sdkCollectionResults.resourceMetrics.scopeMetrics.concat( + FlatMap( + additionalCollectionResults, + result => result.resourceMetrics.scopeMetrics + ) + ); + return { + resourceMetrics: { + resource, + scopeMetrics, + }, + errors, + }; } /** diff --git a/packages/sdk-metrics/src/index.ts b/packages/sdk-metrics/src/index.ts index 4760da3ccc..c9623707f2 100644 --- a/packages/sdk-metrics/src/index.ts +++ b/packages/sdk-metrics/src/index.ts @@ -54,6 +54,8 @@ export { InMemoryMetricExporter } from './export/InMemoryMetricExporter'; export { ConsoleMetricExporter } from './export/ConsoleMetricExporter'; +export { MetricCollectOptions, MetricProducer } from './export/MetricProducer'; + export { InstrumentDescriptor, InstrumentType } from './InstrumentDescriptor'; export { MeterProvider, MeterProviderOptions } from './MeterProvider'; diff --git a/packages/sdk-metrics/test/export/MetricReader.test.ts b/packages/sdk-metrics/test/export/MetricReader.test.ts index 297622982d..c0643a60da 100644 --- a/packages/sdk-metrics/test/export/MetricReader.test.ts +++ b/packages/sdk-metrics/test/export/MetricReader.test.ts @@ -20,7 +20,13 @@ import { MeterProvider } from '../../src/MeterProvider'; import { assertRejects } from '../test-utils'; import { emptyResourceMetrics, TestMetricProducer } from './TestMetricProducer'; import { TestMetricReader } from './TestMetricReader'; -import { Aggregation, AggregationTemporality } from '../../src'; +import { + Aggregation, + AggregationTemporality, + DataPointType, + InstrumentType, + ScopeMetrics, +} from '../../src'; import { DEFAULT_AGGREGATION_SELECTOR, DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR, @@ -29,6 +35,39 @@ import { assertAggregationSelector, assertAggregationTemporalitySelector, } from './utils'; +import { defaultResource } from '../util'; +import { ValueType } from '@opentelemetry/api'; +import { Resource } from '@opentelemetry/resources'; + +const testScopeMetrics: ScopeMetrics[] = [ + { + scope: { + name: 'additionalMetricProducerMetrics', + }, + metrics: [ + { + aggregationTemporality: AggregationTemporality.CUMULATIVE, + dataPointType: DataPointType.SUM, + dataPoints: [ + { + attributes: {}, + value: 1, + startTime: [0, 0], + endTime: [1, 0], + }, + ], + descriptor: { + name: 'additionalCounter', + unit: '', + type: InstrumentType.COUNTER, + description: '', + valueType: ValueType.INT, + }, + isMonotonic: true, + }, + ], + }, +]; describe('MetricReader', () => { describe('setMetricProducer', () => { @@ -83,20 +122,91 @@ describe('MetricReader', () => { assertRejects(reader.collect(), /MetricReader is shutdown/); }); - it('should call MetricProduce.collect with timeout', async () => { + it('should call MetricProducer.collect with timeout', async () => { const reader = new TestMetricReader(); const producer = new TestMetricProducer(); reader.setMetricProducer(producer); - const collectStub = sinon.stub(producer, 'collect'); + const collectSpy = sinon.spy(producer, 'collect'); await reader.collect({ timeoutMillis: 20 }); - assert(collectStub.calledOnce); - const args = collectStub.args[0]; + assert(collectSpy.calledOnce); + const args = collectSpy.args[0]; assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]); await reader.shutdown(); }); + + it('should collect metrics from the SDK and the additional metricProducers', async () => { + const meterProvider = new MeterProvider({ resource: defaultResource }); + const additionalProducer = new TestMetricProducer({ + resourceMetrics: { + resource: new Resource({ + shouldBeDiscarded: 'should-be-discarded', + }), + scopeMetrics: testScopeMetrics, + }, + }); + const reader = new TestMetricReader({ + metricProducers: [additionalProducer], + }); + meterProvider.addMetricReader(reader); + + // Make a measurement + meterProvider + .getMeter('someSdkMetrics') + .createCounter('sdkCounter') + .add(5, { hello: 'world' }); + const collectionResult = await reader.collect(); + + assert.strictEqual(collectionResult.errors.length, 0); + // Should keep the SDK's Resource only + assert.deepStrictEqual( + collectionResult.resourceMetrics.resource, + defaultResource + ); + assert.strictEqual( + collectionResult.resourceMetrics.scopeMetrics.length, + 2 + ); + const [sdkScopeMetrics, additionalScopeMetrics] = + collectionResult.resourceMetrics.scopeMetrics; + + assert.strictEqual(sdkScopeMetrics.scope.name, 'someSdkMetrics'); + assert.strictEqual( + additionalScopeMetrics.scope.name, + 'additionalMetricProducerMetrics' + ); + + await reader.shutdown(); + }); + + it('should merge the errors from the SDK and all metricProducers', async () => { + const meterProvider = new MeterProvider(); + const reader = new TestMetricReader({ + metricProducers: [ + new TestMetricProducer({ errors: ['err1'] }), + new TestMetricProducer({ errors: ['err2'] }), + ], + }); + meterProvider.addMetricReader(reader); + + // Provide a callback throwing an error too + meterProvider + .getMeter('someSdkMetrics') + .createObservableCounter('sdkCounter') + .addCallback(result => { + throw 'errsdk'; + }); + const collectionResult = await reader.collect(); + + assert.deepStrictEqual(collectionResult.errors, [ + 'errsdk', + 'err1', + 'err2', + ]); + await reader.shutdown(); + }); }); describe('selectAggregation', () => { diff --git a/packages/sdk-metrics/test/export/TestMetricProducer.ts b/packages/sdk-metrics/test/export/TestMetricProducer.ts index 29137d62a4..d4865724c3 100644 --- a/packages/sdk-metrics/test/export/TestMetricProducer.ts +++ b/packages/sdk-metrics/test/export/TestMetricProducer.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { CollectionResult } from '../../src/export/MetricData'; +import { CollectionResult, ResourceMetrics } from '../../src/export/MetricData'; import { MetricProducer } from '../../src/export/MetricProducer'; import { defaultResource } from '../util'; @@ -24,10 +24,21 @@ export const emptyResourceMetrics = { }; export class TestMetricProducer implements MetricProducer { + private resourceMetrics: ResourceMetrics; + private errors: unknown[]; + + constructor(params?: { + resourceMetrics?: ResourceMetrics; + errors?: unknown[]; + }) { + this.resourceMetrics = params?.resourceMetrics ?? emptyResourceMetrics; + this.errors = params?.errors ?? []; + } + async collect(): Promise { return { - resourceMetrics: { resource: defaultResource, scopeMetrics: [] }, - errors: [], + resourceMetrics: this.resourceMetrics, + errors: this.errors, }; } } diff --git a/packages/sdk-metrics/test/export/TestMetricReader.ts b/packages/sdk-metrics/test/export/TestMetricReader.ts index 61727322a4..46fd41c045 100644 --- a/packages/sdk-metrics/test/export/TestMetricReader.ts +++ b/packages/sdk-metrics/test/export/TestMetricReader.ts @@ -31,7 +31,7 @@ export class TestMetricReader extends MetricReader { } getMetricCollector(): MetricCollector { - return this['_metricProducer'] as MetricCollector; + return this['_sdkMetricProducer'] as MetricCollector; } }