Skip to content

Commit

Permalink
Adding OT Collector metrics exporter (#454)
Browse files Browse the repository at this point in the history
Current implementation use OpenCensus receiver available in Collector, this
will need to be updated eventually to use OT receiver when is ready.

Fixes #344

Co-authored-by: Chris Kleinknecht <libc@google.com>
  • Loading branch information
hectorhdzg and c24t committed Mar 11, 2020
1 parent 4b6a52d commit dd8521e
Show file tree
Hide file tree
Showing 8 changed files with 508 additions and 3 deletions.
53 changes: 53 additions & 0 deletions examples/metrics/collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module serves as an example for a simple application using metrics
exporting to Collector
"""

from opentelemetry import metrics
from opentelemetry.ext.otcollector.metrics_exporter import (
CollectorMetricsExporter,
)
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics.export.controller import PushController

# Meter is responsible for creating and recording metrics
metrics.set_preferred_meter_provider_implementation(lambda _: MeterProvider())
meter = metrics.get_meter(__name__)
# exporter to export metrics to OT Collector
exporter = CollectorMetricsExporter(
service_name="basic-service", endpoint="localhost:55678"
)
# controller collects metrics created from meter and exports it via the
# exporter every interval
controller = PushController(meter, exporter, 5)

counter = meter.create_metric(
"requests",
"number of requests",
"requests",
int,
Counter,
("environment",),
)

# Labelsets are used to identify key-values that are associated with a specific
# metric that you want to record. These are useful for pre-aggregation and can
# be used to store custom dimensions pertaining to a metric
label_set = meter.get_label_set({"environment": "staging"})

counter.add(25, label_set)
input("Press any key to exit...")
18 changes: 18 additions & 0 deletions examples/metrics/docker/collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
receivers:
opencensus:
endpoint: "0.0.0.0:55678"

exporters:
prometheus:
endpoint: "0.0.0.0:8889"
logging: {}

processors:
batch:
queued_retry:

service:
pipelines:
metrics:
receivers: [opencensus]
exporters: [logging, prometheus]
19 changes: 19 additions & 0 deletions examples/metrics/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
version: "2"
services:

otel-collector:
image: omnition/opentelemetry-collector-contrib:latest
command: ["--config=/conf/collector-config.yaml", "--log-level=DEBUG"]
volumes:
- ./collector-config.yaml:/conf/collector-config.yaml
ports:
- "8889:8889" # Prometheus exporter metrics
- "55678:55678" # OpenCensus receiver

prometheus:
container_name: prometheus
image: prom/prometheus:latest
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
5 changes: 5 additions & 0 deletions examples/metrics/docker/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 5s
static_configs:
- targets: ['otel-collector:8889']
49 changes: 46 additions & 3 deletions ext/opentelemetry-ext-otcollector/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ OpenTelemetry Collector Exporter
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-ext-otcollector.svg
:target: https://pypi.org/project/opentelemetry-ext-otcollector/

This library allows to export data to `OpenTelemetry Collector <https://github.com/open-telemetry/opentelemetry-collector/>`_.
This library allows to export data to `OpenTelemetry Collector <https://github.com/open-telemetry/opentelemetry-collector/>`_ , currently using OpenCensus receiver in Collector side.

Installation
------------
Expand All @@ -16,8 +16,8 @@ Installation
pip install opentelemetry-ext-otcollector


Usage
-----
Traces Usage
------------

The **OpenTelemetry Collector Exporter** allows to export `OpenTelemetry`_ traces to `OpenTelemetry Collector`_.

Expand Down Expand Up @@ -48,6 +48,49 @@ The **OpenTelemetry Collector Exporter** allows to export `OpenTelemetry`_ trace
with tracer.start_as_current_span("foo"):
print("Hello world!")
Metrics Usage
-------------

The **OpenTelemetry Collector Exporter** allows to export `OpenTelemetry`_ metrics to `OpenTelemetry Collector`_.

.. code:: python
from opentelemetry import metrics
from opentelemetry.ext.otcollector.metrics_exporter import CollectorMetricsExporter
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics.export.controller import PushController
# create a CollectorMetricsExporter
collector_exporter = CollectorMetricsExporter(
# optional:
# endpoint="myCollectorUrl:55678",
# service_name="test_service",
# host_name="machine/container name",
)
# Meter is responsible for creating and recording metrics
metrics.set_preferred_meter_provider_implementation(lambda _: MeterProvider())
meter = metrics.get_meter(__name__)
# controller collects metrics created from meter and exports it via the
# exporter every interval
controller = PushController(meter, collector_exporter, 5)
counter = meter.create_metric(
"requests",
"number of requests",
"requests",
int,
Counter,
("environment",),
)
# Labelsets are used to identify key-values that are associated with a specific
# metric that you want to record. These are useful for pre-aggregation and can
# be used to store custom dimensions pertaining to a metric
label_set = meter.get_label_set({"environment": "staging"})
counter.add(25, label_set)
References
----------

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""OpenTelemetry Collector Metrics Exporter."""

import logging
from typing import Sequence

import grpc
from opencensus.proto.agent.metrics.v1 import (
metrics_service_pb2,
metrics_service_pb2_grpc,
)
from opencensus.proto.metrics.v1 import metrics_pb2

import opentelemetry.ext.otcollector.util as utils
from opentelemetry.sdk.metrics import Counter, Metric
from opentelemetry.sdk.metrics.export import (
MetricRecord,
MetricsExporter,
MetricsExportResult,
aggregate,
)

DEFAULT_ENDPOINT = "localhost:55678"

logger = logging.getLogger(__name__)


# pylint: disable=no-member
class CollectorMetricsExporter(MetricsExporter):
"""OpenTelemetry Collector metrics exporter.
Args:
endpoint: OpenTelemetry Collector OpenCensus receiver endpoint.
service_name: Name of Collector service.
host_name: Host name.
client: MetricsService client stub.
"""

def __init__(
self,
endpoint: str = DEFAULT_ENDPOINT,
service_name: str = None,
host_name: str = None,
client: metrics_service_pb2_grpc.MetricsServiceStub = None,
):
self.endpoint = endpoint
if client is None:
channel = grpc.insecure_channel(self.endpoint)
self.client = metrics_service_pb2_grpc.MetricsServiceStub(
channel=channel
)
else:
self.client = client

self.node = utils.get_node(service_name, host_name)

def export(
self, metric_records: Sequence[MetricRecord]
) -> MetricsExportResult:
try:
responses = self.client.Export(
self.generate_metrics_requests(metric_records)
)

# Read response
for _ in responses:
pass

except grpc.RpcError:
return MetricsExportResult.FAILED_RETRYABLE

return MetricsExportResult.SUCCESS

def shutdown(self) -> None:
pass

def generate_metrics_requests(
self, metrics: Sequence[MetricRecord]
) -> metrics_service_pb2.ExportMetricsServiceRequest:
collector_metrics = translate_to_collector(metrics)
service_request = metrics_service_pb2.ExportMetricsServiceRequest(
node=self.node, metrics=collector_metrics
)
yield service_request


# pylint: disable=too-many-branches
def translate_to_collector(
metric_records: Sequence[MetricRecord],
) -> Sequence[metrics_pb2.Metric]:
collector_metrics = []
for metric_record in metric_records:

label_values = []
label_keys = []
for label_tuple in metric_record.label_set.labels:
label_keys.append(metrics_pb2.LabelKey(key=label_tuple[0]))
label_values.append(
metrics_pb2.LabelValue(
has_value=label_tuple[1] is not None, value=label_tuple[1]
)
)

metric_descriptor = metrics_pb2.MetricDescriptor(
name=metric_record.metric.name,
description=metric_record.metric.description,
unit=metric_record.metric.unit,
type=get_collector_metric_type(metric_record.metric),
label_keys=label_keys,
)

timeseries = metrics_pb2.TimeSeries(
label_values=label_values,
points=[get_collector_point(metric_record)],
)
collector_metrics.append(
metrics_pb2.Metric(
metric_descriptor=metric_descriptor, timeseries=[timeseries]
)
)
return collector_metrics


# pylint: disable=no-else-return
def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor:
if isinstance(metric, Counter):
if metric.value_type == int:
return metrics_pb2.MetricDescriptor.CUMULATIVE_INT64
elif metric.value_type == float:
return metrics_pb2.MetricDescriptor.CUMULATIVE_DOUBLE
return metrics_pb2.MetricDescriptor.UNSPECIFIED


def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
point = metrics_pb2.Point(
timestamp=utils.proto_timestamp_from_time_ns(
metric_record.metric.bind(
metric_record.label_set
).last_update_timestamp
)
)
if metric_record.metric.value_type == int:
point.int64_value = metric_record.aggregator.checkpoint
elif metric_record.metric.value_type == float:
point.double_value = metric_record.aggregator.checkpoint
else:
raise TypeError(
"Unsupported metric type: {}".format(
metric_record.metric.value_type
)
)
return point

0 comments on commit dd8521e

Please sign in to comment.