Skip to content

Commit

Permalink
metrics: Implement release for handles and observers (#435)
Browse files Browse the repository at this point in the history
This commit implements a solution for releasing instrument handles and
observers.

For the handles it is based on a ref count that is increased each time the
handled is acquired, when the ref count reaches 0 the handle is removed on
collection time.  The direct call convention is updated to release the handle
after it has been updated.

The observer instrument is only updated on collection time, so it can be
removed as soon as the user request to do so.
  • Loading branch information
mauriciovasquezbernal committed Mar 17, 2020
1 parent 6bfc48b commit 4e551ba
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 26 deletions.
9 changes: 7 additions & 2 deletions docs/examples/metrics/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@
# Therefore, getting a bound metric instrument using the same set of labels
# will yield the same bound metric instrument.
bound_counter = counter.bind(label_set)
bound_counter.add(100)
for i in range(1000):
bound_counter.add(i)

# You can release the bound instrument we you are done
bound_counter.release()

# Direct metric usage
# You can record metrics directly using the metric instrument. You pass in a
Expand All @@ -79,4 +83,5 @@
# (metric, value) pairs. The value would be recorded for each metric using the
# specified labelset for each.
meter.record_batch(label_set, [(counter, 50), (counter2, 70)])
time.sleep(100)

time.sleep(10)
14 changes: 14 additions & 0 deletions opentelemetry-api/src/opentelemetry/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def record(self, value: ValueT) -> None:
value: The value to record to the bound metric instrument.
"""

def release(self) -> None:
"""No-op implementation of release."""


class BoundCounter:
def add(self, value: ValueT) -> None:
Expand Down Expand Up @@ -350,6 +353,14 @@ def register_observer(
Returns: A new ``Observer`` metric instrument.
"""

@abc.abstractmethod
def unregister_observer(self, observer: "Observer") -> None:
"""Unregisters an ``Observer`` metric instrument.
Args:
observer: The observer to unregister.
"""

@abc.abstractmethod
def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
"""Gets a `LabelSet` with the given labels.
Expand Down Expand Up @@ -396,6 +407,9 @@ def register_observer(
) -> "Observer":
return DefaultObserver()

def unregister_observer(self, observer: "Observer") -> None:
pass

def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
# pylint: disable=no-self-use
return DefaultLabelSet()
Expand Down
8 changes: 7 additions & 1 deletion opentelemetry-api/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def test_measure_record(self):
measure.record(1, label_set)

def test_default_bound_metric(self):
metrics.DefaultBoundInstrument()
bound_instrument = metrics.DefaultBoundInstrument()
bound_instrument.release()

def test_bound_counter(self):
bound_counter = metrics.BoundCounter()
Expand All @@ -59,3 +60,8 @@ def test_bound_counter(self):
def test_bound_measure(self):
bound_measure = metrics.BoundMeasure()
bound_measure.record(1)

def test_observer(self):
observer = metrics.DefaultObserver()
label_set = metrics.LabelSet()
observer.observe(1, label_set)
5 changes: 5 additions & 0 deletions opentelemetry-api/tests/test_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ def test_register_observer(self):
observer = meter.register_observer(callback, "", "", "", int, (), True)
self.assertIsInstance(observer, metrics.DefaultObserver)

def test_unregister_observer(self):
meter = metrics.DefaultMeter()
observer = metrics.DefaultObserver()
meter.unregister_observer(observer)

def test_get_label_set(self):
meter = metrics.DefaultMeter()
label_set = meter.get_label_set({})
Expand Down
88 changes: 66 additions & 22 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import threading
from typing import Dict, Sequence, Tuple, Type

from opentelemetry import metrics as metrics_api
Expand Down Expand Up @@ -71,6 +72,8 @@ def __init__(
self.enabled = enabled
self.aggregator = aggregator
self.last_update_timestamp = time_ns()
self._ref_count = 0
self._ref_count_lock = threading.Lock()

def _validate_update(self, value: metrics_api.ValueT) -> bool:
if not self.enabled:
Expand All @@ -86,6 +89,21 @@ def update(self, value: metrics_api.ValueT):
self.last_update_timestamp = time_ns()
self.aggregator.update(value)

def release(self):
self.decrease_ref_count()

def decrease_ref_count(self):
with self._ref_count_lock:
self._ref_count -= 1

def increase_ref_count(self):
with self._ref_count_lock:
self._ref_count += 1

def ref_count(self):
with self._ref_count_lock:
return self._ref_count

def __repr__(self):
return '{}(data="{}", last_update_timestamp={})'.format(
type(self).__name__,
Expand Down Expand Up @@ -137,18 +155,21 @@ def __init__(
self.label_keys = label_keys
self.enabled = enabled
self.bound_instruments = {}
self.bound_instruments_lock = threading.Lock()

def bind(self, label_set: LabelSet) -> BaseBoundInstrument:
"""See `opentelemetry.metrics.Metric.bind`."""
bound_instrument = self.bound_instruments.get(label_set)
if not bound_instrument:
bound_instrument = self.BOUND_INSTR_TYPE(
self.value_type,
self.enabled,
# Aggregator will be created based off type of metric
self.meter.batcher.aggregator_for(self.__class__),
)
self.bound_instruments[label_set] = bound_instrument
with self.bound_instruments_lock:
bound_instrument = self.bound_instruments.get(label_set)
if bound_instrument is None:
bound_instrument = self.BOUND_INSTR_TYPE(
self.value_type,
self.enabled,
# Aggregator will be created based off type of metric
self.meter.batcher.aggregator_for(self.__class__),
)
self.bound_instruments[label_set] = bound_instrument
bound_instrument.increase_ref_count()
return bound_instrument

def __repr__(self):
Expand All @@ -167,7 +188,9 @@ class Counter(Metric, metrics_api.Counter):

def add(self, value: metrics_api.ValueT, label_set: LabelSet) -> None:
"""See `opentelemetry.metrics.Counter.add`."""
self.bind(label_set).add(value)
bound_intrument = self.bind(label_set)
bound_intrument.add(value)
bound_intrument.release()

UPDATE_FUNCTION = add

Expand All @@ -179,7 +202,9 @@ class Measure(Metric, metrics_api.Measure):

def record(self, value: metrics_api.ValueT, label_set: LabelSet) -> None:
"""See `opentelemetry.metrics.Measure.record`."""
self.bind(label_set).record(value)
bound_intrument = self.bind(label_set)
bound_intrument.record(value)
bound_intrument.release()

UPDATE_FUNCTION = record

Expand Down Expand Up @@ -279,6 +304,7 @@ def __init__(
self.metrics = set()
self.observers = set()
self.batcher = UngroupedBatcher(stateful)
self.observers_lock = threading.Lock()
self.resource = resource

def collect(self) -> None:
Expand All @@ -294,26 +320,39 @@ def collect(self) -> None:

def _collect_metrics(self) -> None:
for metric in self.metrics:
if metric.enabled:
if not metric.enabled:
continue

to_remove = []

with metric.bound_instruments_lock:
for label_set, bound_instr in metric.bound_instruments.items():
# TODO: Consider storing records in memory?
record = Record(metric, label_set, bound_instr.aggregator)
# Checkpoints the current aggregators
# Applies different batching logic based on type of batcher
self.batcher.process(record)

if bound_instr.ref_count() == 0:
to_remove.append(label_set)

# Remove handles that were released
for label_set in to_remove:
del metric.bound_instruments[label_set]

def _collect_observers(self) -> None:
for observer in self.observers:
if not observer.enabled:
continue
with self.observers_lock:
for observer in self.observers:
if not observer.enabled:
continue

# TODO: capture timestamp?
if not observer.run():
continue
# TODO: capture timestamp?
if not observer.run():
continue

for label_set, aggregator in observer.aggregators.items():
record = Record(observer, label_set, aggregator)
self.batcher.process(record)
for label_set, aggregator in observer.aggregators.items():
record = Record(observer, label_set, aggregator)
self.batcher.process(record)

def record_batch(
self,
Expand Down Expand Up @@ -368,9 +407,14 @@ def register_observer(
label_keys,
enabled,
)
self.observers.add(ob)
with self.observers_lock:
self.observers.add(ob)
return ob

def unregister_observer(self, observer: "Observer") -> None:
with self.observers_lock:
self.observers.remove(observer)

def get_label_set(self, labels: Dict[str, str]):
"""See `opentelemetry.metrics.Meter.create_metric`.
Expand Down
72 changes: 71 additions & 1 deletion opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_collect(self):
)
kvp = {"key1": "value1"}
label_set = meter.get_label_set(kvp)
counter.add(label_set, 1.0)
counter.add(1.0, label_set)
meter.metrics.add(counter)
meter.collect()
self.assertTrue(batcher_mock.process.called)
Expand Down Expand Up @@ -179,6 +179,18 @@ def test_register_observer(self):
self.assertEqual(observer.label_keys, ())
self.assertTrue(observer.enabled)

def test_unregister_observer(self):
meter = metrics.MeterProvider().get_meter(__name__)

callback = mock.Mock()

observer = meter.register_observer(
callback, "name", "desc", "unit", int, (), True
)

meter.unregister_observer(observer)
self.assertEqual(len(meter.observers), 0)

def test_get_label_set(self):
meter = metrics.MeterProvider().get_meter(__name__)
kvp = {"environment": "staging", "a": "z"}
Expand All @@ -193,6 +205,64 @@ def test_get_label_set_empty(self):
label_set = meter.get_label_set(kvp)
self.assertEqual(label_set, metrics.EMPTY_LABEL_SET)

def test_direct_call_release_bound_instrument(self):
meter = metrics.MeterProvider().get_meter(__name__)
label_keys = ("key1",)
kvp = {"key1": "value1"}
label_set = meter.get_label_set(kvp)

counter = metrics.Counter(
"name", "desc", "unit", float, meter, label_keys
)
meter.metrics.add(counter)
counter.add(4.0, label_set)

measure = metrics.Measure(
"name", "desc", "unit", float, meter, label_keys
)
meter.metrics.add(measure)
measure.record(42.0, label_set)

self.assertEqual(len(counter.bound_instruments), 1)
self.assertEqual(len(measure.bound_instruments), 1)

meter.collect()

self.assertEqual(len(counter.bound_instruments), 0)
self.assertEqual(len(measure.bound_instruments), 0)

def test_release_bound_instrument(self):
meter = metrics.MeterProvider().get_meter(__name__)
label_keys = ("key1",)
kvp = {"key1": "value1"}
label_set = meter.get_label_set(kvp)

counter = metrics.Counter(
"name", "desc", "unit", float, meter, label_keys
)
meter.metrics.add(counter)
bound_counter = counter.bind(label_set)
bound_counter.add(4.0)

measure = metrics.Measure(
"name", "desc", "unit", float, meter, label_keys
)
meter.metrics.add(measure)
bound_measure = measure.bind(label_set)
bound_measure.record(42)

bound_counter.release()
bound_measure.release()

# be sure that bound instruments are only released after collection
self.assertEqual(len(counter.bound_instruments), 1)
self.assertEqual(len(measure.bound_instruments), 1)

meter.collect()

self.assertEqual(len(counter.bound_instruments), 0)
self.assertEqual(len(measure.bound_instruments), 0)


class TestMetric(unittest.TestCase):
def test_bind(self):
Expand Down

0 comments on commit 4e551ba

Please sign in to comment.