Skip to content

Commit

Permalink
Align collection start times (#2679)
Browse files Browse the repository at this point in the history
* Align collection start times

Fixes #2677

* Undo unnecessary change

* Use fixed values for more deterministic tests

* Align start times

* Rename variable

* Add time align test case

* Remove unnecessary views

* Add more test cases
  • Loading branch information
ocelotl authored May 13, 2022
1 parent e8fbb08 commit 91211b3
Show file tree
Hide file tree
Showing 7 changed files with 545 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from opentelemetry.sdk._metrics._internal.measurement import Measurement
from opentelemetry.sdk._metrics._internal.point import DataPointT
from opentelemetry.sdk._metrics._internal.view import View
from opentelemetry.util._time import _time_ns

_logger = getLogger(__name__)

Expand All @@ -39,6 +40,7 @@ def __init__(
instrument: Instrument,
instrument_class_aggregation: Dict[type, Aggregation],
):
self._start_time_unix_nano = _time_ns()
self._view = view
self._instrument = instrument
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
Expand All @@ -50,12 +52,12 @@ def __init__(
)
if not isinstance(self._view._aggregation, DefaultAggregation):
self._aggregation = self._view._aggregation._create_aggregation(
self._instrument, None
self._instrument, None, 0
)
else:
self._aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(self._instrument, None)
]._create_aggregation(self._instrument, None, 0)

def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
# pylint: disable=protected-access
Expand Down Expand Up @@ -103,21 +105,31 @@ def consume_measurement(self, measurement: Measurement) -> None:
):
aggregation = (
self._view._aggregation._create_aggregation(
self._instrument, attributes
self._instrument,
attributes,
self._start_time_unix_nano,
)
)
else:
aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(self._instrument, attributes)
]._create_aggregation(
self._instrument,
attributes,
self._start_time_unix_nano,
)
self._attributes_aggregation[attributes] = aggregation

self._attributes_aggregation[attributes].aggregate(measurement)

def collect(
self, aggregation_temporality: AggregationTemporality
self,
aggregation_temporality: AggregationTemporality,
collection_start_nanos: int,
) -> Iterable[DataPointT]:

with self._lock:
for aggregation in self._attributes_aggregation.values():
yield aggregation.collect(aggregation_temporality)
yield aggregation.collect(
aggregation_temporality, collection_start_nanos
)
Loading

0 comments on commit 91211b3

Please sign in to comment.