Skip to content

Commit

Permalink
Rename AwsXRayFormat to AwsXRayPropagator in its own file + Add N…
Browse files Browse the repository at this point in the history
…athanielRN as Propagator owner
  • Loading branch information
NathanielRN committed Oct 12, 2021
1 parent e8af7a3 commit ae20787
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 322 deletions.
5 changes: 4 additions & 1 deletion .github/component_owners.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ components:
instrumentation/opentelemetry-instrumentation-pika:
- oxeye-nikolay
- nikosokolik


propagator/opentelemetry-propagator-aws-xray:
- NathanielRN

sdk-extension/opentelemetry-sdk-extension-aws:
- NathanielRN
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#720](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/720))
- `opentelemetry-instrumentation-sqlalchemy` Respect provided tracer provider when instrumenting SQLAlchemy
([#728](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/728))
- `opentelemetry-propagators-aws-xray` Rename `AwsXRayFormat` to `AwsXRayPropagator`
([#729](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/729))


### Changed
Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_simple_start_span(benchmark):
Make sure the test file is under the `tests/performance/benchmarks/` folder of
the package it is benchmarking and further has a path that corresponds to the
file in the package it is testing. Make sure that the file name begins with
`test_benchmark_`. (e.g. `sdk-extension/opentelemetry-sdk-extension-aws/tests/performance/benchmarks/trace/propagation/test_benchmark_aws_xray_format.py`)
`test_benchmark_`. (e.g. `propagator/opentelemetry-propagator-aws-xray/tests/performance/benchmarks/trace/propagation/test_benchmark_aws_xray_propagator.py`)

## Pull Requests

Expand Down
8 changes: 4 additions & 4 deletions docs/nitpick-exceptions.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ class_references=
opentelemetry.propagators.textmap.Setter
opentelemetry.propagators.textmap.Getter
opentelemetry.propagators.textmap.TextMapPropagator
; - AwsXRayFormat
; - AwsXRayPropagator
opentelemetry.propagators.textmap.DefaultGetter
; API
opentelemetry.propagators.textmap.Getter
; - DatadogFormat
; - AWSXRayFormat
; - AWSXRayPropagator
opentelemetry.sdk.trace.id_generator.IdGenerator
; - AwsXRayIdGenerator
TextMapPropagator
CarrierT
Setter
Getter
; - AwsXRayFormat.extract
; - AwsXRayPropagator.extract
; httpx changes __module__ causing Sphinx to error and no Sphinx site is available
httpx.Client
httpx.AsyncClient
Expand All @@ -29,7 +29,7 @@ class_references=
anys=
; API
opentelemetry.propagators.textmap.TextMapPropagator.fields
; - AWSXRayFormat
; - AWSXRayPropagator
TraceId
; - AwsXRayIdGenerator
TraceIdRatioBased
Expand Down
4 changes: 2 additions & 2 deletions propagator/opentelemetry-propagator-aws-xray/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ Or by setting this propagator in your instrumented application:
.. code-block:: python
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.aws import AwsXRayFormat
from opentelemetry.propagators.aws import AwsXRayPropagator
set_global_textmap(AwsXRayFormat())
set_global_textmap(AwsXRayPropagator())
References
Expand Down
2 changes: 1 addition & 1 deletion propagator/opentelemetry-propagator-aws-xray/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ install_requires =

[options.entry_points]
opentelemetry_propagator =
xray = opentelemetry.propagators.aws:AwsXRayFormat
xray = opentelemetry.propagators.aws:AwsXRayPropagator

[options.extras_require]
test =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,312 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""
AWS X-Ray Propagator
--------------------
from opentelemetry.propagators.aws.aws_xray_propagator import AwsXRayPropagator

The **AWS X-Ray Propagator** provides a propagator that when used, adds a `trace
header`_ to outgoing traces that is compatible with the AWS X-Ray backend service.
This allows the trace context to be propagated when a trace spans multiple AWS
services.
The same propagator setup is used to extract a context sent by external systems
so that child span have the correct parent context.
**NOTE**: Because the parent context parsed from the ``X-Amzn-Trace-Id`` header
assumes the context is _not_ sampled by default, users should make sure to add
``Sampled=1`` to their ``X-Amzn-Trace-Id`` headers so that the child spans are
sampled.
Usage
-----
Use the provided AWS X-Ray Propagator to inject the necessary context into
traces sent to external systems.
This can be done by either setting this environment variable:
::
export OTEL_PROPAGATORS = xray
Or by setting this propagator in your instrumented application:
.. code-block:: python
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.aws import AwsXRayFormat
set_global_textmap(AwsXRayFormat())
API
---
.. _trace header: https://docs.aws.amazon.com/xray/latest/devguide/xray-concepts.html#xray-concepts-tracingheader
"""

import logging
import typing

from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.propagators.textmap import (
CarrierT,
Getter,
Setter,
TextMapPropagator,
default_getter,
default_setter,
)

TRACE_HEADER_KEY = "X-Amzn-Trace-Id"
KV_PAIR_DELIMITER = ";"
KEY_AND_VALUE_DELIMITER = "="

TRACE_ID_KEY = "Root"
TRACE_ID_LENGTH = 35
TRACE_ID_VERSION = "1"
TRACE_ID_DELIMITER = "-"
TRACE_ID_DELIMITER_INDEX_1 = 1
TRACE_ID_DELIMITER_INDEX_2 = 10
TRACE_ID_FIRST_PART_LENGTH = 8

PARENT_ID_KEY = "Parent"
PARENT_ID_LENGTH = 16

SAMPLED_FLAG_KEY = "Sampled"
SAMPLED_FLAG_LENGTH = 1
IS_SAMPLED = "1"
NOT_SAMPLED = "0"


_logger = logging.getLogger(__name__)


class AwsParseTraceHeaderError(Exception):
def __init__(self, message):
super().__init__()
self.message = message


class AwsXRayFormat(TextMapPropagator):
"""Propagator for the AWS X-Ray Trace Header propagation protocol.
See: https://docs.aws.amazon.com/xray/latest/devguide/xray-concepts.html#xray-concepts-tracingheader
"""

# AWS

def extract(
self,
carrier: CarrierT,
context: typing.Optional[Context] = None,
getter: Getter = default_getter,
) -> Context:
if context is None:
context = Context()

trace_header_list = getter.get(carrier, TRACE_HEADER_KEY)

if not trace_header_list or len(trace_header_list) != 1:
return context

trace_header = trace_header_list[0]

if not trace_header:
return context

try:
(
trace_id,
span_id,
sampled,
) = AwsXRayFormat._extract_span_properties(trace_header)
except AwsParseTraceHeaderError as err:
_logger.debug(err.message)
return context

options = 0
if sampled:
options |= trace.TraceFlags.SAMPLED

span_context = trace.SpanContext(
trace_id=trace_id,
span_id=span_id,
is_remote=True,
trace_flags=trace.TraceFlags(options),
trace_state=trace.TraceState(),
)

if not span_context.is_valid:
_logger.debug(
"Invalid Span Extracted. Inserting INVALID span into provided context."
)
return context

return trace.set_span_in_context(
trace.NonRecordingSpan(span_context), context=context
)

@staticmethod
def _extract_span_properties(trace_header):
trace_id = trace.INVALID_TRACE_ID
span_id = trace.INVALID_SPAN_ID
sampled = False

for kv_pair_str in trace_header.split(KV_PAIR_DELIMITER):
try:
key_str, value_str = kv_pair_str.split(KEY_AND_VALUE_DELIMITER)
key, value = key_str.strip(), value_str.strip()
except ValueError as ex:
raise AwsParseTraceHeaderError(
(
"Error parsing X-Ray trace header. Invalid key value pair: %s. Returning INVALID span context.",
kv_pair_str,
)
) from ex
if key == TRACE_ID_KEY:
if not AwsXRayFormat._validate_trace_id(value):
raise AwsParseTraceHeaderError(
(
"Invalid TraceId in X-Ray trace header: '%s' with value '%s'. Returning INVALID span context.",
TRACE_HEADER_KEY,
trace_header,
)
)

try:
trace_id = AwsXRayFormat._parse_trace_id(value)
except ValueError as ex:
raise AwsParseTraceHeaderError(
(
"Invalid TraceId in X-Ray trace header: '%s' with value '%s'. Returning INVALID span context.",
TRACE_HEADER_KEY,
trace_header,
)
) from ex
elif key == PARENT_ID_KEY:
if not AwsXRayFormat._validate_span_id(value):
raise AwsParseTraceHeaderError(
(
"Invalid ParentId in X-Ray trace header: '%s' with value '%s'. Returning INVALID span context.",
TRACE_HEADER_KEY,
trace_header,
)
)

try:
span_id = AwsXRayFormat._parse_span_id(value)
except ValueError as ex:
raise AwsParseTraceHeaderError(
(
"Invalid TraceId in X-Ray trace header: '%s' with value '%s'. Returning INVALID span context.",
TRACE_HEADER_KEY,
trace_header,
)
) from ex
elif key == SAMPLED_FLAG_KEY:
if not AwsXRayFormat._validate_sampled_flag(value):
raise AwsParseTraceHeaderError(
(
"Invalid Sampling flag in X-Ray trace header: '%s' with value '%s'. Returning INVALID span context.",
TRACE_HEADER_KEY,
trace_header,
)
)

sampled = AwsXRayFormat._parse_sampled_flag(value)

return trace_id, span_id, sampled

@staticmethod
def _validate_trace_id(trace_id_str):
return (
len(trace_id_str) == TRACE_ID_LENGTH
and trace_id_str.startswith(TRACE_ID_VERSION)
and trace_id_str[TRACE_ID_DELIMITER_INDEX_1] == TRACE_ID_DELIMITER
and trace_id_str[TRACE_ID_DELIMITER_INDEX_2] == TRACE_ID_DELIMITER
)

@staticmethod
def _parse_trace_id(trace_id_str):
timestamp_subset = trace_id_str[
TRACE_ID_DELIMITER_INDEX_1 + 1 : TRACE_ID_DELIMITER_INDEX_2
]
unique_id_subset = trace_id_str[
TRACE_ID_DELIMITER_INDEX_2 + 1 : TRACE_ID_LENGTH
]
return int(timestamp_subset + unique_id_subset, 16)

@staticmethod
def _validate_span_id(span_id_str):
return len(span_id_str) == PARENT_ID_LENGTH

@staticmethod
def _parse_span_id(span_id_str):
return int(span_id_str, 16)

@staticmethod
def _validate_sampled_flag(sampled_flag_str):
return len(
sampled_flag_str
) == SAMPLED_FLAG_LENGTH and sampled_flag_str in (
IS_SAMPLED,
NOT_SAMPLED,
)

@staticmethod
def _parse_sampled_flag(sampled_flag_str):
return sampled_flag_str[0] == IS_SAMPLED

def inject(
self,
carrier: CarrierT,
context: typing.Optional[Context] = None,
setter: Setter = default_setter,
) -> None:
span = trace.get_current_span(context=context)

span_context = span.get_span_context()
if not span_context.is_valid:
return

otel_trace_id = f"{span_context.trace_id:032x}"
xray_trace_id = TRACE_ID_DELIMITER.join(
[
TRACE_ID_VERSION,
otel_trace_id[:TRACE_ID_FIRST_PART_LENGTH],
otel_trace_id[TRACE_ID_FIRST_PART_LENGTH:],
]
)

parent_id = f"{span_context.span_id:016x}"

sampling_flag = (
IS_SAMPLED
if span_context.trace_flags & trace.TraceFlags.SAMPLED
else NOT_SAMPLED
)

# TODO: Add OT trace state to the X-Ray trace header

trace_header = KV_PAIR_DELIMITER.join(
[
KEY_AND_VALUE_DELIMITER.join([key, value])
for key, value in [
(TRACE_ID_KEY, xray_trace_id),
(PARENT_ID_KEY, parent_id),
(SAMPLED_FLAG_KEY, sampling_flag),
]
]
)

setter.set(
carrier, TRACE_HEADER_KEY, trace_header,
)

@property
def fields(self):
"""Returns a set with the fields set in `inject`."""

return {TRACE_HEADER_KEY}
__all__ = ["AwsXRayPropagator"]
Loading

0 comments on commit ae20787

Please sign in to comment.