Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select histogram aggregation with environment variables #3265

Merged
merged 9 commits into from Apr 28, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Select histogram aggregation with an environment variable
([#3265](https://github.com/open-telemetry/opentelemetry-python/pull/3265))
- Move Protobuf encoding to its own package
([#3169](https://github.com/open-telemetry/opentelemetry-python/pull/3169))
- Add experimental feature to detect resource detectors in auto instrumentation
Expand Down
Expand Up @@ -13,9 +13,27 @@
# limitations under the License.
import logging

from opentelemetry.sdk.metrics.export import (
MetricExporter,
)
from os import environ
from opentelemetry.sdk.metrics import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_encode_attributes,
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
)
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
Expand All @@ -24,17 +42,118 @@
from opentelemetry.sdk.metrics.export import (
MetricsData,
Gauge,
Histogram,
Histogram as HistogramType,
Sum,
ExponentialHistogram as ExponentialHistogramType,
)
from typing import Dict
from opentelemetry.proto.resource.v1.resource_pb2 import (
Resource as PB2Resource,
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION,
)
from opentelemetry.sdk.metrics.view import (
ExponentialBucketHistogramAggregation,
ExplicitBucketHistogramAggregation,
)

_logger = logging.getLogger(__name__)


class OTLPMetricExporterMixin:
def _common_configuration(
self,
preferred_temporality: Dict[type, AggregationTemporality] = None,
) -> None:

instrument_class_temporality = {}

otel_exporter_otlp_metrics_temporality_preference = (
environ.get(
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
"CUMULATIVE",
)
.upper()
.strip()
)

if otel_exporter_otlp_metrics_temporality_preference == "DELTA":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.DELTA,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

elif otel_exporter_otlp_metrics_temporality_preference == "LOWMEMORY":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

else:
if otel_exporter_otlp_metrics_temporality_preference != (
"CUMULATIVE"
):
_logger.warning(
"Unrecognized OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE"
" value found: "
f"{otel_exporter_otlp_metrics_temporality_preference}, "
"using CUMULATIVE"
)
instrument_class_temporality = {
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.CUMULATIVE,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

instrument_class_temporality.update(preferred_temporality or {})

otel_exporter_otlp_metrics_default_histogram_aggregation = environ.get(
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION,
"explicit_bucket_histogram",
)

if otel_exporter_otlp_metrics_default_histogram_aggregation == (
"base2_exponential_bucket_histogram"
):

histogram_aggregation_type = ExponentialBucketHistogramAggregation

else:

if otel_exporter_otlp_metrics_default_histogram_aggregation != (
"explicit_bucket_histogram"
):

_logger.warning(
(
"Invalid value for %s: %s, using explicit bucket "
"histogram aggregation"
),
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION,
otel_exporter_otlp_metrics_default_histogram_aggregation,
)

histogram_aggregation_type = ExplicitBucketHistogramAggregation

MetricExporter.__init__(
self,
preferred_temporality=instrument_class_temporality,
preferred_aggregation={Histogram: histogram_aggregation_type()},
)


def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
resource_metrics_dict = {}

Expand Down Expand Up @@ -85,7 +204,7 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
pt.as_double = data_point.value
pb2_metric.gauge.data_points.append(pt)

elif isinstance(metric.data, Histogram):
elif isinstance(metric.data, HistogramType):
for data_point in metric.data.data_points:
pt = pb2.HistogramDataPoint(
attributes=_encode_attributes(
Expand Down
Expand Up @@ -47,15 +47,6 @@
OTEL_EXPORTER_OTLP_METRICS_HEADERS,
OTEL_EXPORTER_OTLP_METRICS_INSECURE,
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
)
from opentelemetry.sdk.metrics import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
Expand All @@ -73,13 +64,17 @@
Sum,
ExponentialHistogram as ExponentialHistogramType,
)
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
OTLPMetricExporterMixin,
)

_logger = getLogger(__name__)


class OTLPMetricExporter(
MetricExporter,
OTLPExporterMixin[Metric, ExportMetricsServiceRequest, MetricExportResult],
OTLPMetricExporterMixin,
):
"""OTLP metric exporter

Expand Down Expand Up @@ -132,63 +127,7 @@ def __init__(
else compression
)

instrument_class_temporality = {}

otel_exporter_otlp_metrics_temporality_preference = (
environ.get(
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
"CUMULATIVE",
)
.upper()
.strip()
)

if otel_exporter_otlp_metrics_temporality_preference == "DELTA":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.DELTA,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

elif otel_exporter_otlp_metrics_temporality_preference == "LOWMEMORY":
instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

else:
if otel_exporter_otlp_metrics_temporality_preference != (
"CUMULATIVE"
):
_logger.warning(
"Unrecognized OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE"
" value found: "
f"{otel_exporter_otlp_metrics_temporality_preference}, "
"using CUMULATIVE"
)
instrument_class_temporality = {
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.CUMULATIVE,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

instrument_class_temporality.update(preferred_temporality or {})

MetricExporter.__init__(
self,
preferred_temporality=instrument_class_temporality,
preferred_aggregation=preferred_aggregation,
)
self._common_configuration(preferred_temporality)

OTLPExporterMixin.__init__(
self,
Expand Down
Expand Up @@ -44,6 +44,7 @@
OTEL_EXPORTER_OTLP_COMPRESSION,
OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_HEADERS,
OTEL_EXPORTER_OTLP_METRICS_INSECURE,
Expand All @@ -68,6 +69,10 @@
ResourceMetrics,
ScopeMetrics,
)
from opentelemetry.sdk.metrics.view import (
ExplicitBucketHistogramAggregation,
ExponentialBucketHistogramAggregation,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import (
InstrumentationScope as SDKInstrumentationScope,
Expand Down Expand Up @@ -872,6 +877,57 @@ def test_aggregation_temporality(self):
AggregationTemporality.CUMULATIVE,
)

def test_exponential_explicit_bucket_histogram(self):

self.assertIsInstance(
# pylint: disable=protected-access
OTLPMetricExporter()._preferred_aggregation[Histogram],
ExplicitBucketHistogramAggregation,
)

with patch.dict(
environ,
{
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION: "base2_exponential_bucket_histogram"
},
):
self.assertIsInstance(
# pylint: disable=protected-access
OTLPMetricExporter()._preferred_aggregation[Histogram],
ExponentialBucketHistogramAggregation,
)

with patch.dict(
environ,
{OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION: "abc"},
):
with self.assertLogs(level=WARNING) as log:
self.assertIsInstance(
# pylint: disable=protected-access
OTLPMetricExporter()._preferred_aggregation[Histogram],
ExplicitBucketHistogramAggregation,
)
self.assertIn(
(
"Invalid value for OTEL_EXPORTER_OTLP_METRICS_DEFAULT_"
"HISTOGRAM_AGGREGATION: abc, using explicit bucket "
"histogram aggregation"
),
log.output[0],
)

with patch.dict(
environ,
{
OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION: "explicit_bucket_histogram"
},
):
self.assertIsInstance(
# pylint: disable=protected-access
OTLPMetricExporter()._preferred_aggregation[Histogram],
ExplicitBucketHistogramAggregation,
)


def _resource_metrics(
index: int, scope_metrics: List[ScopeMetrics]
Expand Down