Skip to content

Commit

Permalink
Refactor view conflict handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Aug 2, 2022
1 parent 43288ca commit 0a565b1
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def __init__(
) -> None:
self._lock = RLock()
self._sdk_config = sdk_config
self._instrument_view_instrument_matches: Dict[
Instrument, List[_ViewInstrumentMatch]
self._instrumentation_scope_instrument_view_instrument_matches: Dict[
InstrumentationScope, Dict[Instrument, List[_ViewInstrumentMatch]]
] = {}
self._instrument_class_temporality = instrument_class_temporality
self._instrument_class_aggregation = instrument_class_aggregation
Expand All @@ -79,13 +79,33 @@ def _get_or_init_view_instrument_match(
# Optimistically get the relevant views for the given instrument. Once set for a given
# instrument, the mapping will never change

if instrument in self._instrument_view_instrument_matches:
return self._instrument_view_instrument_matches[instrument]
if instrument.instrumentation_scope in (
self._instrumentation_scope_instrument_view_instrument_matches
) and instrument in (
self._instrumentation_scope_instrument_view_instrument_matches[
instrument.instrumentation_scope
]
):
return (
self._instrumentation_scope_instrument_view_instrument_matches[
instrument.instrumentation_scope
][instrument]
)

with self._lock:
# double check if it was set before we held the lock
if instrument in self._instrument_view_instrument_matches:
return self._instrument_view_instrument_matches[instrument]
if instrument.instrumentation_scope in (
self._instrumentation_scope_instrument_view_instrument_matches
) and instrument in (
self._instrumentation_scope_instrument_view_instrument_matches[
instrument.instrumentation_scope
]
):
return self._instrumentation_scope_instrument_view_instrument_matches[
instrument.instrumentation_scope
][
instrument
]

# not present, hold the lock and add a new mapping
view_instrument_matches = []
Expand All @@ -105,9 +125,6 @@ def _get_or_init_view_instrument_match(
),
)
)
self._instrument_view_instrument_matches[
instrument
] = view_instrument_matches

return view_instrument_matches

Expand Down Expand Up @@ -139,83 +156,93 @@ def collect(self) -> MetricsData:
) = {}

for (
instrument,
view_instrument_matches,
) in self._instrument_view_instrument_matches.items():
aggregation_temporality = self._instrument_class_temporality[
instrument.__class__
]
instrument_view_instrument_matches
) in (
self._instrumentation_scope_instrument_view_instrument_matches.values()
):
for (
instrument,
view_instrument_matches,
) in instrument_view_instrument_matches.items():
aggregation_temporality = (
self._instrument_class_temporality[
instrument.__class__
]
)

metrics: List[Metric] = []
metrics: List[Metric] = []

for view_instrument_match in view_instrument_matches:
for view_instrument_match in view_instrument_matches:

if isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_SumAggregation,
):
data = Sum(
aggregation_temporality=aggregation_temporality,
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
is_monotonic=isinstance(
instrument, (Counter, ObservableCounter)
),
)
elif isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_LastValueAggregation,
):
data = Gauge(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
if isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_SumAggregation,
):
data = Sum(
aggregation_temporality=aggregation_temporality,
data_points=view_instrument_match.collect(
aggregation_temporality,
collection_start_nanos,
),
is_monotonic=isinstance(
instrument, (Counter, ObservableCounter)
),
)
)
elif isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_ExplicitBucketHistogramAggregation,
):
data = Histogram(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
aggregation_temporality=aggregation_temporality,
)
elif isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_DropAggregation,
):
continue

metrics.append(
Metric(
elif isinstance(
# pylint: disable=protected-access
name=view_instrument_match._name,
description=view_instrument_match._description,
unit=view_instrument_match._instrument.unit,
data=data,
view_instrument_match._aggregation,
_LastValueAggregation,
):
data = Gauge(
data_points=view_instrument_match.collect(
aggregation_temporality,
collection_start_nanos,
)
)
elif isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_ExplicitBucketHistogramAggregation,
):
data = Histogram(
data_points=view_instrument_match.collect(
aggregation_temporality,
collection_start_nanos,
),
aggregation_temporality=aggregation_temporality,
)
elif isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_DropAggregation,
):
continue

metrics.append(
Metric(
# pylint: disable=protected-access
name=view_instrument_match._name,
description=view_instrument_match._description,
unit=view_instrument_match._instrument.unit,
data=data,
)
)
)

if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
):
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
] = ScopeMetrics(
scope=instrument.instrumentation_scope,
metrics=metrics,
schema_url=instrument.instrumentation_scope.schema_url,
)
else:
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
].metrics.extend(metrics)
if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
):
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
] = ScopeMetrics(
scope=instrument.instrumentation_scope,
metrics=metrics,
schema_url=instrument.instrumentation_scope.schema_url,
)
else:
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
].metrics.extend(metrics)

return MetricsData(
resource_metrics=[
Expand All @@ -229,11 +256,30 @@ def collect(self) -> MetricsData:
]
)

# pylint: disable=too-many-branches
def _handle_view_instrument_match(
self,
instrument: Instrument,
view_instrument_matches: List["_ViewInstrumentMatch"],
) -> None:

if instrument.instrumentation_scope not in (
self._instrumentation_scope_instrument_view_instrument_matches
):
self._instrumentation_scope_instrument_view_instrument_matches[
instrument.instrumentation_scope
] = {}

if instrument not in (
self._instrumentation_scope_instrument_view_instrument_matches[
instrument.instrumentation_scope
]
):
self._instrumentation_scope_instrument_view_instrument_matches[
instrument.instrumentation_scope
][instrument] = view_instrument_matches

# pylint: disable=too-many-nested-blocks
for view in self._sdk_config.views:
# pylint: disable=protected-access
if not view._match(instrument):
Expand All @@ -252,20 +298,64 @@ def _handle_view_instrument_match(

for (
existing_view_instrument_matches
) in self._instrument_view_instrument_matches.values():
) in self._instrumentation_scope_instrument_view_instrument_matches[
instrument.instrumentation_scope
].values():
for (
existing_view_instrument_match
) in existing_view_instrument_matches:
if existing_view_instrument_match.conflicts(
new_view_instrument_match

conflict_messages = []

if (
existing_view_instrument_match._name
== new_view_instrument_match._name
):

_logger.warning(
"Views %s and %s will cause conflicting "
"metrics identities",
existing_view_instrument_match._view,
new_view_instrument_match._view,
)
if (
existing_view_instrument_match._description
!= new_view_instrument_match._description
):
conflict_messages.append("description")

# The aggregation class is being used here instead of
# data point type since they are functionally
# equivalent.
if (
existing_view_instrument_match._aggregation.__class__
!= new_view_instrument_match._aggregation.__class__
):
conflict_messages.append("aggregation")

elif isinstance(
existing_view_instrument_match._aggregation,
_SumAggregation,
):
if (
existing_view_instrument_match._aggregation._instrument_is_monotonic
!= new_view_instrument_match._aggregation._instrument_is_monotonic
):
conflict_messages.append("monotonicity")

if (
existing_view_instrument_match._aggregation._instrument_temporality
!= new_view_instrument_match._aggregation._instrument_temporality
):
conflict_messages.append(
"aggregation temporality"
)

if conflict_messages:

_logger.warning(
"Views named %s will cause conflicting "
"metrics identities for instrument %s because "
"of different %s. Rename a view named %s.",
existing_view_instrument_match._view._name,
instrument.name,
",".join(conflict_messages),
new_view_instrument_match._view._name,
)

view_instrument_matches.append(new_view_instrument_match)

Expand Down

0 comments on commit 0a565b1

Please sign in to comment.