Skip to content

Commit

Permalink
Merge branch 'main' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Jun 13, 2023
2 parents 957be65 + 5ec5064 commit 0d0668e
Show file tree
Hide file tree
Showing 14 changed files with 95 additions and 71 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

- Add max_scale option to Exponential Bucket Histogram Aggregation [#3323](https://github.com/open-telemetry/opentelemetry-python/pull/3323))
- Use BoundedAttributes instead of raw dict to extract attributes from LogRecord and Support dropped_attributes_count in LogRecord ([#3310](https://github.com/open-telemetry/opentelemetry-python/pull/3310))

## Version 1.18.0/0.39b0 (2023-05-04)

- Select histogram aggregation with an environment variable
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ mypy-protobuf~=3.0.0
markupsafe==2.0.1
bleach==4.1.0 # This dependency was updated to a breaking version.
codespell==2.1.0
requests==2.28.1
requests==2.31.0
ruamel.yaml==0.17.21
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ def _translate_data(
def export(self, batch: Sequence[LogData]) -> LogExportResult:
return self._export(batch)

def shutdown(self) -> None:
pass
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True

@property
def _exporting(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@

import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union
from typing import ( # noqa: F401
Any,
Callable,
Dict,
Generic,
List,
Optional,
Tuple,
Union,
)
from typing import Sequence as TypingSequence
from typing import TypeVar
from urllib.parse import urlparse
Expand All @@ -45,7 +54,7 @@
from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_HEADERS,
)
from opentelemetry.proto.common.v1.common_pb2 import (
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
AnyValue,
ArrayValue,
KeyValue,
Expand Down Expand Up @@ -97,44 +106,6 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]:
return _ENVIRON_TO_COMPRESSION[environ_value]


def _translate_value(value: Any) -> KeyValue:
if isinstance(value, bool):
any_value = AnyValue(bool_value=value)

elif isinstance(value, str):
any_value = AnyValue(string_value=value)

elif isinstance(value, int):
any_value = AnyValue(int_value=value)

elif isinstance(value, float):
any_value = AnyValue(double_value=value)

elif isinstance(value, Sequence):
any_value = AnyValue(
array_value=ArrayValue(values=[_translate_value(v) for v in value])
)

# Tracing specs currently does not support Mapping type attributes
# elif isinstance(value, Mapping):
# any_value = AnyValue(
# kvlist_value=KeyValueList(
# values=[
# _translate_key_values(str(k), v) for k, v in value.items()
# ]
# )
# )

else:
raise Exception(f"Invalid type {type(value)} of value {value}")

return any_value


def _translate_key_values(key: str, value: Any) -> KeyValue:
return KeyValue(key=key, value=_translate_value(value))


@deprecated(
version="1.18.0",
reason="Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead",
Expand Down Expand Up @@ -271,17 +242,6 @@ def _translate_data(
) -> ExportServiceRequestT:
pass

def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
output = []
if attributes:

for key, value in attributes.items():
try:
output.append(_translate_key_values(key, value))
except Exception as error: # pylint: disable=broad-except
logger.exception(error)
return output

def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
) -> ExportResultT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,5 @@ def _exporting(self) -> str:
return "metrics"

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
from grpc import ChannelCredentials, Compression, StatusCode, server

from opentelemetry._logs import SeverityNumber
from opentelemetry.exporter.otlp.proto.common._internal import _encode_value
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import _translate_value
from opentelemetry.exporter.otlp.proto.grpc.version import __version__
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
Expand Down Expand Up @@ -367,7 +367,7 @@ def test_translate_log_data(self):
16,
"big",
),
body=_translate_value(
body=_encode_value(
"Zhengzhou, We have a heaviest rains in 1000 years"
),
attributes=[
Expand Down Expand Up @@ -426,7 +426,7 @@ def test_translate_multiple_logs(self):
16,
"big",
),
body=_translate_value(
body=_encode_value(
"Zhengzhou, We have a heaviest rains in 1000 years"
),
attributes=[
Expand Down Expand Up @@ -463,13 +463,13 @@ def test_translate_multiple_logs(self):
16,
"big",
),
body=_translate_value(
body=_encode_value(
"Sydney, Opera House is closed"
),
attributes=[
KeyValue(
key="custom_attr",
value=_translate_value([1, 2, 3]),
value=_encode_value([1, 2, 3]),
),
],
flags=int(
Expand Down Expand Up @@ -508,7 +508,7 @@ def test_translate_multiple_logs(self):
16,
"big",
),
body=_translate_value(
body=_encode_value(
"Mumbai, Boil water before drinking"
),
attributes=[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ def test_shutdown(self):
warning.records[0].message,
"Exporter already shutdown, ignoring batch",
)
self.exporter = OTLPMetricExporter()

def test_shutdown_wait_last_export(self):
add_MetricsServiceServicer_to_server(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
from grpc import ChannelCredentials, Compression, StatusCode, server

from opentelemetry.attributes import BoundedAttributes
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
_is_backoff_v2,
_translate_key_values,
from opentelemetry.exporter.otlp.proto.common._internal import (
_encode_key_value,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
Expand Down Expand Up @@ -840,31 +840,31 @@ def test_span_status_translate(self):

# pylint:disable=no-member
def test_translate_key_values(self):
bool_value = _translate_key_values("bool_type", False)
bool_value = _encode_key_value("bool_type", False)
self.assertTrue(isinstance(bool_value, KeyValue))
self.assertEqual(bool_value.key, "bool_type")
self.assertTrue(isinstance(bool_value.value, AnyValue))
self.assertFalse(bool_value.value.bool_value)

str_value = _translate_key_values("str_type", "str")
str_value = _encode_key_value("str_type", "str")
self.assertTrue(isinstance(str_value, KeyValue))
self.assertEqual(str_value.key, "str_type")
self.assertTrue(isinstance(str_value.value, AnyValue))
self.assertEqual(str_value.value.string_value, "str")

int_value = _translate_key_values("int_type", 2)
int_value = _encode_key_value("int_type", 2)
self.assertTrue(isinstance(int_value, KeyValue))
self.assertEqual(int_value.key, "int_type")
self.assertTrue(isinstance(int_value.value, AnyValue))
self.assertEqual(int_value.value.int_value, 2)

double_value = _translate_key_values("double_type", 3.2)
double_value = _encode_key_value("double_type", 3.2)
self.assertTrue(isinstance(double_value, KeyValue))
self.assertEqual(double_value.key, "double_type")
self.assertTrue(isinstance(double_value.value, AnyValue))
self.assertEqual(double_value.value.double_value, 3.2)

seq_value = _translate_key_values("seq_type", ["asd", "123"])
seq_value = _encode_key_value("seq_type", ["asd", "123"])
self.assertTrue(isinstance(seq_value, KeyValue))
self.assertEqual(seq_value.key, "seq_type")
self.assertTrue(isinstance(seq_value.value, AnyValue))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
return LogExportResult.FAILURE
return LogExportResult.FAILURE

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True

def shutdown(self):
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring call")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def _exporting(self) -> str:
return "metrics"

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def shutdown(self):
self._shutdown = True

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ def __init__(
# See the derivation here:
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exponential-bucket-histogram-aggregation)
max_size: int = 160,
max_scale: int = 20,
):
super().__init__(attributes)
# max_size is the maximum capacity of the positive and negative
Expand All @@ -403,6 +404,7 @@ def __init__(
)

self._max_size = max_size
self._max_scale = max_scale

# _sum is the sum of all the values aggregated by this aggregator.
self._sum = 0
Expand All @@ -428,7 +430,14 @@ def __init__(

# _mapping corresponds to the current scale, is shared by both the
# positive and negative buckets.
self._mapping = LogarithmMapping(LogarithmMapping._max_scale)

if self._max_scale > 20:
_logger.warning(
"max_scale is set to %s which is "
"larger than the recommended value of 20",
self._max_scale,
)
self._mapping = LogarithmMapping(self._max_scale)

self._instrument_temporality = AggregationTemporality.DELTA
self._start_time_unix_nano = start_time_unix_nano
Expand Down Expand Up @@ -941,9 +950,10 @@ class ExponentialBucketHistogramAggregation(Aggregation):
def __init__(
self,
max_size: int = 160,
max_scale: int = 20,
):

self._max_size = max_size
self._max_scale = max_scale

def _create_aggregation(
self,
Expand All @@ -955,6 +965,7 @@ def _create_aggregation(
attributes,
start_time_unix_nano,
max_size=self._max_size,
max_scale=self._max_scale,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from itertools import permutations
from logging import WARNING
from math import ldexp
from sys import float_info
from types import MethodType
Expand All @@ -37,6 +38,9 @@
LogarithmMapping,
)
from opentelemetry.sdk.metrics._internal.measurement import Measurement
from opentelemetry.sdk.metrics.view import (
ExponentialBucketHistogramAggregation,
)


def get_counts(buckets: Buckets) -> int:
Expand Down Expand Up @@ -77,6 +81,39 @@ def swap(


class TestExponentialBucketHistogramAggregation(TestCase):
@patch("opentelemetry.sdk.metrics._internal.aggregation.LogarithmMapping")
def test_create_aggregation(self, mock_logarithm_mapping):
exponential_bucket_histogram_aggregation = (
ExponentialBucketHistogramAggregation()
)._create_aggregation(Mock(), Mock(), Mock())

self.assertEqual(
exponential_bucket_histogram_aggregation._max_scale, 20
)

mock_logarithm_mapping.assert_called_with(20)

exponential_bucket_histogram_aggregation = (
ExponentialBucketHistogramAggregation(max_scale=10)
)._create_aggregation(Mock(), Mock(), Mock())

self.assertEqual(
exponential_bucket_histogram_aggregation._max_scale, 10
)

mock_logarithm_mapping.assert_called_with(10)

with self.assertLogs(level=WARNING):
exponential_bucket_histogram_aggregation = (
ExponentialBucketHistogramAggregation(max_scale=100)
)._create_aggregation(Mock(), Mock(), Mock())

self.assertEqual(
exponential_bucket_histogram_aggregation._max_scale, 100
)

mock_logarithm_mapping.assert_called_with(100)

def assertInEpsilon(self, first, second, epsilon):
self.assertLessEqual(first, (second * (1 + epsilon)))
self.assertGreaterEqual(first, (second * (1 - epsilon)))
Expand Down

0 comments on commit 0d0668e

Please sign in to comment.