Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change temporality for Counter and UpDownCounter to CUMULATIVE #1384

Merged
merged 1 commit into from Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions exporter/opentelemetry-exporter-otlp/CHANGELOG.md
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Change temporality for Counter and UpDownCounter
([#1384](https://github.com/open-telemetry/opentelemetry-python/pull/1384))
- Add Gzip compression for exporter
([#1141](https://github.com/open-telemetry/opentelemetry-python/pull/1141))
- OTLP exporter: Handle error case when no credentials supplied
Expand Down
Expand Up @@ -15,8 +15,7 @@
"""OTLP Metrics Exporter"""

import logging
import os
from typing import List, Optional, Sequence, Type, TypeVar, Union
from typing import List, Optional, Sequence, Type, TypeVar

from grpc import ChannelCredentials

Expand Down Expand Up @@ -71,7 +70,9 @@


def _get_data_points(
export_record: ExportRecord, data_point_class: Type[DataPointT]
export_record: ExportRecord,
data_point_class: Type[DataPointT],
aggregation_temporality: int,
) -> List[DataPointT]:

if isinstance(export_record.aggregator, SumAggregator):
Expand All @@ -91,16 +92,23 @@ def _get_data_points(
elif isinstance(export_record.aggregator, ValueObserverAggregator):
value = export_record.aggregator.checkpoint.last

if aggregation_temporality == (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
):
start_time_unix_nano = export_record.aggregator.first_timestamp
else:
start_time_unix_nano = (
export_record.aggregator.initial_checkpoint_timestamp
)

return [
data_point_class(
labels=[
StringKeyValue(key=str(label_key), value=str(label_value))
for label_key, label_value in export_record.labels
],
value=value,
start_time_unix_nano=(
export_record.aggregator.initial_checkpoint_timestamp
),
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=(export_record.aggregator.last_update_timestamp),
)
]
Expand Down Expand Up @@ -215,25 +223,35 @@ def _translate_data(
data_point_class = type_class[value_type]["data_point_class"]

if isinstance(export_record.instrument, Counter):

aggregation_temporality = (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
)

otlp_metric_data = sum_class(
data_points=_get_data_points(
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
export_record,
data_point_class,
aggregation_temporality,
),
aggregation_temporality=aggregation_temporality,
is_monotonic=True,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(export_record.instrument, UpDownCounter):

aggregation_temporality = (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
)

otlp_metric_data = sum_class(
data_points=_get_data_points(
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
export_record,
data_point_class,
aggregation_temporality,
),
aggregation_temporality=aggregation_temporality,
is_monotonic=False,
)
argument = type_class[value_type]["sum"]["argument"]
Expand All @@ -243,33 +261,45 @@ def _translate_data(
continue

elif isinstance(export_record.instrument, SumObserver):

aggregation_temporality = (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
)

otlp_metric_data = sum_class(
data_points=_get_data_points(
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
export_record,
data_point_class,
aggregation_temporality,
),
aggregation_temporality=aggregation_temporality,
is_monotonic=True,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(export_record.instrument, UpDownSumObserver):

aggregation_temporality = (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
)

otlp_metric_data = sum_class(
data_points=_get_data_points(
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
export_record,
data_point_class,
aggregation_temporality,
),
aggregation_temporality=aggregation_temporality,
is_monotonic=False,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(export_record.instrument, (ValueObserver)):
otlp_metric_data = gauge_class(
data_points=_get_data_points(
export_record, data_point_class
export_record,
data_point_class,
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA,
)
)
argument = type_class[value_type]["gauge"]["argument"]
Expand Down
Expand Up @@ -46,7 +46,9 @@


class TestOTLPMetricExporter(TestCase):
def setUp(self):
@patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def setUp(self, mock_time_ns): # pylint: disable=arguments-differ
mock_time_ns.configure_mock(**{"return_value": 1})
self.exporter = OTLPMetricsExporter(insecure=True)
resource = SDKResource(OrderedDict([("a", 1), ("b", False)]))

Expand Down Expand Up @@ -95,12 +97,9 @@ def test_no_credentials_error(self):
with self.assertRaises(ValueError):
OTLPMetricsExporter()

@patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_translate_metrics(self, mock_time_ns):
def test_translate_metrics(self):
# pylint: disable=no-member

mock_time_ns.configure_mock(**{"return_value": 1})

self.counter_export_record.aggregator.checkpoint = 1
self.counter_export_record.aggregator.initial_checkpoint_timestamp = 1
self.counter_export_record.aggregator.last_update_timestamp = 1
Expand Down Expand Up @@ -137,7 +136,7 @@ def test_translate_metrics(self, mock_time_ns):
)
],
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
),
is_monotonic=True,
),
Expand Down
Expand Up @@ -34,6 +34,7 @@ def __init__(self, config=None):
self._lock = threading.Lock()
self.last_update_timestamp = 0
self.initial_checkpoint_timestamp = 0
self.first_timestamp = time_ns()
self.checkpointed = True
if config is not None:
self.config = config
Expand Down