Skip to content

Commit

Permalink
feat(aggregators): implement histogram aggregator #927
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarchaud committed Apr 6, 2020
1 parent 55a011d commit 55a57e8
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import {
MetricRecord,
MetricKind,
Sum,
Distribution,
} from './types';
import { Distribution } from './aggregators'
import { ExportResult } from '@opentelemetry/base';

/**
Expand Down
96 changes: 96 additions & 0 deletions packages/opentelemetry-metrics/src/export/aggregators/histogram.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*!
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Aggregator, Point } from '../types';
import { HrTime } from '@opentelemetry/api';
import { hrTime } from '@opentelemetry/core';

export type Checkpoint = {
buckets: {
boundaries: number[];
counts: number[];
};
sum: number;
count: number;
};

/** Basic aggregator which calculates a Sum from individual measurements. */
export class HistogramAggregator implements Aggregator {
private _lastCheckpoint: Checkpoint;
private _currentCheckpoint: Checkpoint;
private _lastCheckpointTime: HrTime = [0, 0];
private _boundaries: number[];

constructor(boundaries: number[]) {
if (boundaries === undefined || boundaries.length === 0) {
throw new Error(`HistogramAggregator should be created with boundaries.`);
}
this._boundaries = boundaries.sort();
this._lastCheckpoint = this._newEmptyCheckpoint();
this._currentCheckpoint = this._newEmptyCheckpoint();
}

get sum() {
return this._lastCheckpoint.sum;
}

get count() {
return this._lastCheckpoint.count;
}

get checkpoint() {
return this._lastCheckpoint;
}

update(value: number): void {
this._currentCheckpoint.count += 1;
this._currentCheckpoint.sum += value;

for (let i = 0; i < this._boundaries.length; i++) {
if (value < this._boundaries[i]) {
this._currentCheckpoint.buckets.counts[i] += 1;
return;
}
}

// value is above all observed boundaries
this._currentCheckpoint.buckets.counts[this._boundaries.length] += 1;
}

resetCheckpoint(): void {
this._lastCheckpointTime = hrTime();
this._lastCheckpoint = this._currentCheckpoint;
this._currentCheckpoint = this._newEmptyCheckpoint();
}

toPoint(): Point {
return {
value: this._lastCheckpoint,
timestamp: this._lastCheckpointTime,
};
}

private _newEmptyCheckpoint(): Checkpoint {
return {
buckets: {
boundaries: this._boundaries,
counts: this._boundaries.map(() => 0).concat([0]),
},
sum: 0,
count: 0,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@

export * from './countersum';
export * from './observer';
export * from './measureexact';
export * from './measureexact';
export * from './histogram';
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@
* limitations under the License.
*/

import { Aggregator, Point, Distribution } from '../types';
import { Aggregator, Point } from '../types';
import { HrTime } from '@opentelemetry/api';
import { hrTime } from '@opentelemetry/core';

export interface Distribution {
min: number;
max: number;
count: number;
sum: number;
}

/** Basic aggregator keeping all raw values (events, sum, max and min). */
export class MeasureExactAggregator implements Aggregator {
private _distribution: Distribution;
Expand Down Expand Up @@ -46,4 +53,4 @@ export class MeasureExactAggregator implements Aggregator {
timestamp: this._lastUpdateTime,
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ export class ObserverAggregator implements Aggregator {
timestamp: this._lastUpdateTime,
};
}
}
}
10 changes: 2 additions & 8 deletions packages/opentelemetry-metrics/src/export/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { ValueType, HrTime, Labels } from '@opentelemetry/api';
import { ExportResult } from '@opentelemetry/base';
import { Distribution, Checkpoint } from './aggregators';

/** The kind of metric. */
export enum MetricKind {
Expand All @@ -30,13 +31,6 @@ export type Sum = number;
/** LastValue returns last value. */
export type LastValue = number;

export interface Distribution {
min: number;
max: number;
count: number;
sum: number;
}

export interface MetricRecord {
readonly descriptor: MetricDescriptor;
readonly labels: Labels;
Expand Down Expand Up @@ -80,6 +74,6 @@ export interface Aggregator {
}

export interface Point {
value: Sum | LastValue | Distribution;
value: Sum | LastValue | Distribution | Checkpoint;
timestamp: HrTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*!
* Copyright 2019, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import { HistogramAggregator } from '../../../src/export/aggregators';

describe('HistogramAggregator', () => {
describe('constructor()', () => {
it('should construct a histogramAggregator', () => {
assert.doesNotThrow(() => {
new HistogramAggregator([1, 2]);
});
});

it('should sort boundaries', () => {
const aggregator = new HistogramAggregator([500, 300, 700]);
assert.deepEqual(aggregator.checkpoint.buckets.boundaries, [
300,
500,
700,
]);
});

it('should throw if no boundaries are defined', () => {
// @ts-ignore
assert.throws(() => new HistogramAggregator());
assert.throws(() => new HistogramAggregator([]));
});
});

describe('.update()', () => {
it('should not update checkpoint', () => {
const aggregator = new HistogramAggregator([100, 200]);
aggregator.update(150);
assert.equal(aggregator.checkpoint.count, 0);
assert.equal(aggregator.checkpoint.sum, 0);
});

it('should update the second bucket', () => {
const aggregator = new HistogramAggregator([100, 200]);
aggregator.update(150);
aggregator.resetCheckpoint();
assert.equal(aggregator.checkpoint.count, 1);
assert.equal(aggregator.checkpoint.sum, 150);
assert.equal(aggregator.checkpoint.buckets.counts[0], 0);
assert.equal(aggregator.checkpoint.buckets.counts[1], 1);
assert.equal(aggregator.checkpoint.buckets.counts[2], 0);
});

it('should update the second bucket', () => {
const aggregator = new HistogramAggregator([100, 200]);
aggregator.update(50);
aggregator.resetCheckpoint();
assert.equal(aggregator.checkpoint.count, 1);
assert.equal(aggregator.checkpoint.sum, 50);
assert.equal(aggregator.checkpoint.buckets.counts[0], 1);
assert.equal(aggregator.checkpoint.buckets.counts[1], 0);
assert.equal(aggregator.checkpoint.buckets.counts[2], 0);
});

it('should update the third bucket since value is above all boundaries', () => {
const aggregator = new HistogramAggregator([100, 200]);
aggregator.update(250);
aggregator.resetCheckpoint();
assert.equal(aggregator.checkpoint.count, 1);
assert.equal(aggregator.checkpoint.sum, 250);
assert.equal(aggregator.checkpoint.buckets.counts[0], 0);
assert.equal(aggregator.checkpoint.buckets.counts[1], 0);
assert.equal(aggregator.checkpoint.buckets.counts[2], 1);
});
});

describe('.count', () => {
it('should return last checkpoint count', () => {
const aggregator = new HistogramAggregator([100]);
assert.equal(aggregator.count, aggregator.checkpoint.count);
aggregator.update(10);
aggregator.resetCheckpoint();
assert.equal(aggregator.checkpoint.count, 1);
assert.equal(aggregator.count, aggregator.checkpoint.count);
});
});

describe('.sum', () => {
it('should return last checkpoint sum', () => {
const aggregator = new HistogramAggregator([100]);
assert.equal(aggregator.sum, aggregator.checkpoint.sum);
aggregator.update(10);
aggregator.resetCheckpoint();
assert.equal(aggregator.checkpoint.sum, 10);
assert.deepEqual(aggregator.sum, aggregator.checkpoint.sum);
});
});

describe('.resetCheckpoint()', () => {
it('should create a empty checkoint by default', () => {
const aggregator = new HistogramAggregator([100]);
assert.deepEqual(aggregator.checkpoint.buckets.boundaries, [100]);
assert(aggregator.checkpoint.buckets.counts.every(count => count === 0));
// should contains one bucket for each boundary + one for values outside of the largest boundary
assert.equal(aggregator.checkpoint.buckets.counts.length, 2);
assert.deepEqual(aggregator.checkpoint.buckets.boundaries, [100]);
assert.equal(aggregator.checkpoint.count, 0);
assert.equal(aggregator.checkpoint.sum, 0);
});

it('should update checkpoint', () => {
const aggregator = new HistogramAggregator([100]);
aggregator.update(10);
aggregator.resetCheckpoint();
assert.equal(aggregator.checkpoint.count, 1);
assert.equal(aggregator.checkpoint.sum, 10);
assert.deepEqual(aggregator.checkpoint.buckets.boundaries, [100]);
assert.equal(aggregator.checkpoint.buckets.counts.length, 2);
assert.deepEqual(aggregator.checkpoint.buckets.counts, [1, 0]);
});
});

describe('.toPoint()', () => {
it('should return default checkpoint', () => {
const aggregator = new HistogramAggregator([100]);
assert.deepEqual(aggregator.toPoint().value, aggregator.checkpoint);
assert.deepEqual(aggregator.toPoint().timestamp, [0, 0]);
});

it('should return last checkpoint if updated', () => {
const aggregator = new HistogramAggregator([100]);
aggregator.update(100);
aggregator.resetCheckpoint();
assert.deepEqual(aggregator.toPoint().value, aggregator.checkpoint);
console.log(aggregator.toPoint().timestamp);
assert(
aggregator
.toPoint()
.timestamp.every(nbr => typeof nbr === 'number' && nbr !== 0)
);
});
});
});

0 comments on commit 55a57e8

Please sign in to comment.