Skip to content

Commit

Permalink
Add a note about force_flush implementation for OTLP exporters (#3262)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Jun 13, 2023
1 parent a5520e8 commit 5ec5064
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ def _translate_data(
def export(self, batch: Sequence[LogData]) -> LogExportResult:
return self._export(batch)

def shutdown(self) -> None:
pass
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True

@property
def _exporting(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@

import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union
from typing import ( # noqa: F401
Any,
Callable,
Dict,
Generic,
List,
Optional,
Tuple,
Union,
)
from typing import Sequence as TypingSequence
from typing import TypeVar
from urllib.parse import urlparse
Expand All @@ -45,7 +54,7 @@
from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_HEADERS,
)
from opentelemetry.proto.common.v1.common_pb2 import (
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
AnyValue,
ArrayValue,
KeyValue,
Expand Down Expand Up @@ -97,44 +106,6 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]:
return _ENVIRON_TO_COMPRESSION[environ_value]


def _translate_value(value: Any) -> KeyValue:
if isinstance(value, bool):
any_value = AnyValue(bool_value=value)

elif isinstance(value, str):
any_value = AnyValue(string_value=value)

elif isinstance(value, int):
any_value = AnyValue(int_value=value)

elif isinstance(value, float):
any_value = AnyValue(double_value=value)

elif isinstance(value, Sequence):
any_value = AnyValue(
array_value=ArrayValue(values=[_translate_value(v) for v in value])
)

# Tracing specs currently does not support Mapping type attributes
# elif isinstance(value, Mapping):
# any_value = AnyValue(
# kvlist_value=KeyValueList(
# values=[
# _translate_key_values(str(k), v) for k, v in value.items()
# ]
# )
# )

else:
raise Exception(f"Invalid type {type(value)} of value {value}")

return any_value


def _translate_key_values(key: str, value: Any) -> KeyValue:
return KeyValue(key=key, value=_translate_value(value))


@deprecated(
version="1.18.0",
reason="Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead",
Expand Down Expand Up @@ -271,17 +242,6 @@ def _translate_data(
) -> ExportServiceRequestT:
pass

def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
output = []
if attributes:

for key, value in attributes.items():
try:
output.append(_translate_key_values(key, value))
except Exception as error: # pylint: disable=broad-except
logger.exception(error)
return output

def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
) -> ExportResultT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,5 @@ def _exporting(self) -> str:
return "metrics"

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
from grpc import ChannelCredentials, Compression, StatusCode, server

from opentelemetry._logs import SeverityNumber
from opentelemetry.exporter.otlp.proto.common._internal import _encode_value
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import _translate_value
from opentelemetry.exporter.otlp.proto.grpc.version import __version__
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
Expand Down Expand Up @@ -367,7 +367,7 @@ def test_translate_log_data(self):
16,
"big",
),
body=_translate_value(
body=_encode_value(
"Zhengzhou, We have a heaviest rains in 1000 years"
),
attributes=[
Expand Down Expand Up @@ -426,7 +426,7 @@ def test_translate_multiple_logs(self):
16,
"big",
),
body=_translate_value(
body=_encode_value(
"Zhengzhou, We have a heaviest rains in 1000 years"
),
attributes=[
Expand Down Expand Up @@ -463,13 +463,13 @@ def test_translate_multiple_logs(self):
16,
"big",
),
body=_translate_value(
body=_encode_value(
"Sydney, Opera House is closed"
),
attributes=[
KeyValue(
key="custom_attr",
value=_translate_value([1, 2, 3]),
value=_encode_value([1, 2, 3]),
),
],
flags=int(
Expand Down Expand Up @@ -508,7 +508,7 @@ def test_translate_multiple_logs(self):
16,
"big",
),
body=_translate_value(
body=_encode_value(
"Mumbai, Boil water before drinking"
),
attributes=[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ def test_shutdown(self):
warning.records[0].message,
"Exporter already shutdown, ignoring batch",
)
self.exporter = OTLPMetricExporter()

def test_shutdown_wait_last_export(self):
add_MetricsServiceServicer_to_server(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
from grpc import ChannelCredentials, Compression, StatusCode, server

from opentelemetry.attributes import BoundedAttributes
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
_is_backoff_v2,
_translate_key_values,
from opentelemetry.exporter.otlp.proto.common._internal import (
_encode_key_value,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
Expand Down Expand Up @@ -840,31 +840,31 @@ def test_span_status_translate(self):

# pylint:disable=no-member
def test_translate_key_values(self):
bool_value = _translate_key_values("bool_type", False)
bool_value = _encode_key_value("bool_type", False)
self.assertTrue(isinstance(bool_value, KeyValue))
self.assertEqual(bool_value.key, "bool_type")
self.assertTrue(isinstance(bool_value.value, AnyValue))
self.assertFalse(bool_value.value.bool_value)

str_value = _translate_key_values("str_type", "str")
str_value = _encode_key_value("str_type", "str")
self.assertTrue(isinstance(str_value, KeyValue))
self.assertEqual(str_value.key, "str_type")
self.assertTrue(isinstance(str_value.value, AnyValue))
self.assertEqual(str_value.value.string_value, "str")

int_value = _translate_key_values("int_type", 2)
int_value = _encode_key_value("int_type", 2)
self.assertTrue(isinstance(int_value, KeyValue))
self.assertEqual(int_value.key, "int_type")
self.assertTrue(isinstance(int_value.value, AnyValue))
self.assertEqual(int_value.value.int_value, 2)

double_value = _translate_key_values("double_type", 3.2)
double_value = _encode_key_value("double_type", 3.2)
self.assertTrue(isinstance(double_value, KeyValue))
self.assertEqual(double_value.key, "double_type")
self.assertTrue(isinstance(double_value.value, AnyValue))
self.assertEqual(double_value.value.double_value, 3.2)

seq_value = _translate_key_values("seq_type", ["asd", "123"])
seq_value = _encode_key_value("seq_type", ["asd", "123"])
self.assertTrue(isinstance(seq_value, KeyValue))
self.assertEqual(seq_value.key, "seq_type")
self.assertTrue(isinstance(seq_value.value, AnyValue))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
return LogExportResult.FAILURE
return LogExportResult.FAILURE

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True

def shutdown(self):
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring call")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def _exporting(self) -> str:
return "metrics"

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def shutdown(self):
self._shutdown = True

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True


Expand Down

0 comments on commit 5ec5064

Please sign in to comment.