Skip to content

Commit

Permalink
Renaming batcher to processor (#1203)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrex committed Oct 6, 2020
1 parent f9218ca commit 8ccc5cf
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 76 deletions.
4 changes: 2 additions & 2 deletions docs/getting_started/prometheus_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
# Start Prometheus client
start_http_server(port=8000, addr="localhost")

batcher_mode = "stateful"
processor_mode = "stateful"
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__, batcher_mode == "stateful")
meter = metrics.get_meter(__name__, processor_mode == "stateful")
exporter = PrometheusMetricsExporter("MyAppPrefix")
controller = PushController(meter, exporter, 5)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
opentelemetry.sdk.metrics.export.batcher
opentelemetry.sdk.metrics.export.processor
==========================================

.. toctree::

metrics.export

.. automodule:: opentelemetry.sdk.metrics.export.batcher
.. automodule:: opentelemetry.sdk.metrics.export.processor
:members:
:undoc-members:
:show-inheritance:
2 changes: 1 addition & 1 deletion docs/sdk/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Submodules
.. toctree::

metrics.export.aggregate
metrics.export.batcher
metrics.export.processor
util.instrumentation

.. automodule:: opentelemetry.sdk.metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def translate_to_collector(

# If cumulative and stateful, explicitly set the start_timestamp to
# exporter start time.
if metric_record.instrument.meter.batcher.stateful:
if metric_record.instrument.meter.processor.stateful:
start_timestamp = exporter_start_timestamp
else:
start_timestamp = None
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
([#1105](https://github.com/open-telemetry/opentelemetry-python/pull/1120))
- Allow for Custom Trace and Span IDs Generation - `IdsGenerator` for TracerProvider
([#1153](https://github.com/open-telemetry/opentelemetry-python/pull/1153))
- Renaming metrics Batcher to Processor
([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203))

## Version 0.13b0

Expand Down
12 changes: 6 additions & 6 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
MetricsExporter,
)
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
from opentelemetry.sdk.metrics.export.batcher import Batcher
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.export.processor import Processor
from opentelemetry.sdk.metrics.view import (
ViewData,
ViewManager,
Expand Down Expand Up @@ -325,7 +325,7 @@ class ValueObserver(Observer, metrics_api.ValueObserver):


class Record:
"""Container class used for processing in the `Batcher`"""
"""Container class used for processing in the `Processor`"""

def __init__(
self,
Expand All @@ -352,7 +352,7 @@ def __init__(
instrumentation_info: "InstrumentationInfo",
):
self.instrumentation_info = instrumentation_info
self.batcher = Batcher(source.stateful)
self.processor = Processor(source.stateful)
self.resource = source.resource
self.metrics = set()
self.observers = set()
Expand All @@ -363,7 +363,7 @@ def __init__(
def collect(self) -> None:
"""Collects all the metrics created with this `Meter` for export.
Utilizes the batcher to create checkpoints of the current values in
Utilizes the processor to create checkpoints of the current values in
each aggregator belonging to the metrics that were created with this
meter instance.
"""
Expand All @@ -385,7 +385,7 @@ def _collect_metrics(self) -> None:
record = Record(
metric, view_data.labels, view_data.aggregator
)
self.batcher.process(record)
self.processor.process(record)

if bound_instrument.ref_count() == 0:
to_remove.append(labels)
Expand All @@ -405,7 +405,7 @@ def _collect_observers(self) -> None:

for labels, aggregator in observer.aggregators.items():
record = Record(observer, labels, aggregator)
self.batcher.process(record)
self.processor.process(record)

def record_batch(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def tick(self):
self.meter.collect()
# Export the collected metrics
token = attach(set_value("suppress_instrumentation", True))
self.exporter.export(self.meter.batcher.checkpoint_set())
self.exporter.export(self.meter.processor.checkpoint_set())
detach(token)
# Perform post-exporting logic
self.meter.batcher.finished_collection()
self.meter.processor.finished_collection()
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
from opentelemetry.sdk.util import get_dict_as_key


class Batcher:
"""Base class for all batcher types.
class Processor:
"""Base class for all processor types.
The batcher is responsible for storing the aggregators and aggregated
The processor is responsible for storing the aggregators and aggregated
values received from updates from metrics in the meter. The stored values
will be sent to an exporter for exporting.
"""

def __init__(self, stateful: bool):
self._batch_map = {}
# stateful=True indicates the batcher computes checkpoints from over
# the process lifetime. False indicates the batcher computes
# stateful=True indicates the processor computes checkpoints from over
# the process lifetime. False indicates the processor computes
# checkpoints which describe the updates of a single collection period
# (deltas)
self.stateful = stateful
Expand All @@ -38,7 +38,7 @@ def checkpoint_set(self) -> Sequence[MetricRecord]:
"""Returns a list of MetricRecords used for exporting.
The list of MetricRecords is a snapshot created from the current
data in all of the aggregators in this batcher.
data in all of the aggregators in this processor.
"""
metric_records = []
# pylint: disable=W0612
Expand All @@ -52,7 +52,7 @@ def checkpoint_set(self) -> Sequence[MetricRecord]:
def finished_collection(self):
"""Performs certain post-export logic.
For batchers that are stateless, resets the batch map.
For processors that are stateless, resets the batch map.
"""
if not self.stateful:
self._batch_map = {}
Expand Down
78 changes: 39 additions & 39 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
SumAggregator,
ValueObserverAggregator,
)
from opentelemetry.sdk.metrics.export.batcher import Batcher
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.export.processor import Processor


# pylint: disable=protected-access
Expand Down Expand Up @@ -61,10 +61,10 @@ def test_export(self):
mock_stdout.write.assert_any_call(result)


class TestBatcher(unittest.TestCase):
class TestProcessor(unittest.TestCase):
def test_checkpoint_set(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -73,21 +73,21 @@ def test_checkpoint_set(self):
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
records = batcher.checkpoint_set()
processor._batch_map = _batch_map
records = processor.checkpoint_set()
self.assertEqual(len(records), 1)
self.assertEqual(records[0].instrument, metric)
self.assertEqual(records[0].labels, labels)
self.assertEqual(records[0].aggregator, aggregator)

def test_checkpoint_set_empty(self):
batcher = Batcher(True)
records = batcher.checkpoint_set()
processor = Processor(True)
records = processor.checkpoint_set()
self.assertEqual(len(records), 0)

def test_finished_collection_stateless(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(False)
processor = Processor(False)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -96,13 +96,13 @@ def test_finished_collection_stateless(self):
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
batcher.finished_collection()
self.assertEqual(len(batcher._batch_map), 0)
processor._batch_map = _batch_map
processor.finished_collection()
self.assertEqual(len(processor._batch_map), 0)

def test_finished_collection_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -111,13 +111,13 @@ def test_finished_collection_stateful(self):
labels = ()
_batch_map = {}
_batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator
batcher._batch_map = _batch_map
batcher.finished_collection()
self.assertEqual(len(batcher._batch_map), 1)
processor._batch_map = _batch_map
processor.finished_collection()
self.assertEqual(len(processor._batch_map), 1)

def test_batcher_process_exists(self):
def test_processor_process_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
aggregator2 = SumAggregator()
metric = metrics.Counter(
Expand All @@ -128,17 +128,17 @@ def test_batcher_process_exists(self):
batch_key = (metric, SumAggregator, tuple(), labels)
_batch_map[batch_key] = aggregator
aggregator2.update(1.0)
batcher._batch_map = _batch_map
processor._batch_map = _batch_map
record = metrics.Record(metric, labels, aggregator2)
batcher.process(record)
self.assertEqual(len(batcher._batch_map), 1)
self.assertIsNotNone(batcher._batch_map.get(batch_key))
self.assertEqual(batcher._batch_map.get(batch_key).current, 0)
self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0)
processor.process(record)
self.assertEqual(len(processor._batch_map), 1)
self.assertIsNotNone(processor._batch_map.get(batch_key))
self.assertEqual(processor._batch_map.get(batch_key).current, 0)
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)

def test_batcher_process_not_exists(self):
def test_processor_process_not_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -147,17 +147,17 @@ def test_batcher_process_not_exists(self):
_batch_map = {}
batch_key = (metric, SumAggregator, tuple(), labels)
aggregator.update(1.0)
batcher._batch_map = _batch_map
processor._batch_map = _batch_map
record = metrics.Record(metric, labels, aggregator)
batcher.process(record)
self.assertEqual(len(batcher._batch_map), 1)
self.assertIsNotNone(batcher._batch_map.get(batch_key))
self.assertEqual(batcher._batch_map.get(batch_key).current, 0)
self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0)
processor.process(record)
self.assertEqual(len(processor._batch_map), 1)
self.assertIsNotNone(processor._batch_map.get(batch_key))
self.assertEqual(processor._batch_map.get(batch_key).current, 0)
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)

def test_batcher_process_not_stateful(self):
def test_processor_process_not_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
processor = Processor(True)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -166,13 +166,13 @@ def test_batcher_process_not_stateful(self):
_batch_map = {}
batch_key = (metric, SumAggregator, tuple(), labels)
aggregator.update(1.0)
batcher._batch_map = _batch_map
processor._batch_map = _batch_map
record = metrics.Record(metric, labels, aggregator)
batcher.process(record)
self.assertEqual(len(batcher._batch_map), 1)
self.assertIsNotNone(batcher._batch_map.get(batch_key))
self.assertEqual(batcher._batch_map.get(batch_key).current, 0)
self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0)
processor.process(record)
self.assertEqual(len(processor._batch_map), 1)
self.assertIsNotNone(processor._batch_map.get(batch_key))
self.assertEqual(processor._batch_map.get(batch_key).current, 0)
self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0)


class TestSumAggregator(unittest.TestCase):
Expand Down
Loading

0 comments on commit 8ccc5cf

Please sign in to comment.