Skip to content

Commit

Permalink
Vendor the aiohttp instrumentation code and add it to the aiohttp mid…
Browse files Browse the repository at this point in the history
…dleware list

Closes #3829
  • Loading branch information
decko committed May 16, 2023
1 parent e8bd313 commit 375a081
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES/3829.doc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Adds the Telemetry section to the docs with information about OpenTelemetry and its usage on Pulp.
1 change: 1 addition & 0 deletions CHANGES/3829.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implemented OpenTelemetry support and enabled tracing and metrics for the pulp-content app.
16 changes: 16 additions & 0 deletions docs/components.rst
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,19 @@ An example payload:
}],
"postgresqlVersion": 90200
}
.. _telemetry:

Telemetry Support
-----------------

New Pulp installations can obtain telemetry data, like the number of requests, active connections and latency response for both `pulp-api` and `pulp-content` using OpenTelemetry. You can read more about `OpenTelemetry here <https://opentelemetry.io>`_.

To enable it you will need to set the following environment variables:

* ``PULP_OTEL_ENABLED`` set to ``True``.
* ``OTEL_EXPORTER_OTLP_ENDPOINT`` set to the address of your OpenTelemetry Collector instance ex. ``http://otel-collector:4318``.
* ``OTEL_EXPORTER_OTLP_PROTOCOL`` set to ``http/protobuf``.

You will need to run an instance of OpenTelemetry Collector. You can read more about the `OpenTelemetry Collector here <https://opentelemetry.io/docs/collector/>`_.
5 changes: 4 additions & 1 deletion pulpcore/content/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@

from .handler import Handler # noqa: E402: module level not at top of file
from .authentication import authenticate # noqa: E402: module level not at top of file
from .instrumentation import ( # noqa: E402: module level not at top of file
middleware as instrumentation,
)


log = logging.getLogger(__name__)

app = web.Application(middlewares=[authenticate])
app = web.Application(middlewares=[authenticate, instrumentation])

CONTENT_MODULE_NAME = "content"

Expand Down
247 changes: 247 additions & 0 deletions pulpcore/content/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# This code is based on the original PR which could be found
# here. https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1800
# The idea is to remove this module when the PR or other alternative
# gets merged into opentelemetry-python-contrib

import urllib
from aiohttp import web
from multidict import CIMultiDictProxy
from timeit import default_timer

from opentelemetry import context, trace, metrics
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import http_status_to_status_code
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import get_excluded_urls
from opentelemetry.util.http import remove_url_credentials

from typing import Tuple


_SUPPRESS_HTTP_INSTRUMENTATION_KEY = "suppress_http_instrumentation"

_duration_attrs = [
SpanAttributes.HTTP_METHOD,
SpanAttributes.HTTP_HOST,
SpanAttributes.HTTP_SCHEME,
SpanAttributes.HTTP_STATUS_CODE,
SpanAttributes.HTTP_FLAVOR,
SpanAttributes.HTTP_SERVER_NAME,
SpanAttributes.NET_HOST_NAME,
SpanAttributes.NET_HOST_PORT,
SpanAttributes.HTTP_ROUTE,
]

_active_requests_count_attrs = [
SpanAttributes.HTTP_METHOD,
SpanAttributes.HTTP_HOST,
SpanAttributes.HTTP_SCHEME,
SpanAttributes.HTTP_FLAVOR,
SpanAttributes.HTTP_SERVER_NAME,
]

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
_excluded_urls = get_excluded_urls("AIOHTTP_SERVER")


def _parse_duration_attrs(req_attrs):
duration_attrs = {}
for attr_key in _duration_attrs:
if req_attrs.get(attr_key) is not None:
duration_attrs[attr_key] = req_attrs[attr_key]
return duration_attrs


def _parse_active_request_count_attrs(req_attrs):
active_requests_count_attrs = {}
for attr_key in _active_requests_count_attrs:
if req_attrs.get(attr_key) is not None:
active_requests_count_attrs[attr_key] = req_attrs[attr_key]
return active_requests_count_attrs


def get_default_span_details(request: web.Request) -> Tuple[str, dict]:
"""Default implementation for get_default_span_details
Args:
scope: the asgi scope dictionary
Returns:
a tuple of the span name, and any attributes to attach to the span.
"""
span_name = request.path.strip() or f"HTTP {request.method}"
return span_name, {}


def _get_view_func(request) -> str:
"""TODO: is this useful??"""
try:
return request.match_info.handler.__name__
except AttributeError:
return "unknown"


def collect_request_attributes(request: web.Request):
"""Collects HTTP request attributes from the ASGI scope and returns a
dictionary to be used as span creation attributes."""

server_host, port, http_url = (
request.url.host,
request.url.port,
str(request.url),
)
query_string = request.query_string
if query_string and http_url:
if isinstance(query_string, bytes):
query_string = query_string.decode("utf8")
http_url += "?" + urllib.parse.unquote(query_string)

result = {
SpanAttributes.HTTP_SCHEME: request.scheme,
SpanAttributes.HTTP_HOST: server_host,
SpanAttributes.NET_HOST_PORT: port,
SpanAttributes.HTTP_ROUTE: _get_view_func(request),
SpanAttributes.HTTP_FLAVOR: f"{request.version.major}.{request.version.minor}",
SpanAttributes.HTTP_TARGET: request.path,
SpanAttributes.HTTP_URL: remove_url_credentials(http_url),
}

http_method = request.method
if http_method:
result[SpanAttributes.HTTP_METHOD] = http_method

http_host_value_list = [request.host] if type(request.host) != list else request.host
if http_host_value_list:
result[SpanAttributes.HTTP_SERVER_NAME] = ",".join(http_host_value_list)
http_user_agent = request.headers.get("user-agent")
if http_user_agent:
result[SpanAttributes.HTTP_USER_AGENT] = http_user_agent

# remove None values
result = {k: v for k, v in result.items() if v is not None}

return result


def set_status_code(span, status_code):
"""Adds HTTP response attributes to span using the status_code argument."""
if not span.is_recording():
return
try:
status_code = int(status_code)
except ValueError:
span.set_status(
Status(
StatusCode.ERROR,
"Non-integer HTTP status: " + repr(status_code),
)
)
else:
span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code)
span.set_status(Status(http_status_to_status_code(status_code, server_span=True)))


class AiohttpGetter(Getter):
"""Extract current trace from headers"""

def get(self, carrier, key: str):
"""Getter implementation to retrieve a HTTP header value from the ASGI
scope.
Args:
carrier: ASGI scope object
key: header name in scope
Returns:
A list with a single string with the header value if it exists,
else None.
"""
headers: CIMultiDictProxy = carrier.headers
if not headers:
return None
return headers.getall(key, None)

def keys(self, carrier: dict):
return list(carrier.keys())


getter = AiohttpGetter()


@web.middleware
async def middleware(request, handler):
"""Middleware for aiohttp implementing tracing logic"""
if (
context.get_value("suppress_instrumentation")
or context.get_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY)
or _excluded_urls.url_disabled(request.url.path)
):
return await handler(request)

span_name, additional_attributes = get_default_span_details(request)

req_attrs = collect_request_attributes(request)
duration_attrs = _parse_duration_attrs(req_attrs)
active_requests_count_attrs = _parse_active_request_count_attrs(req_attrs)

duration_histogram = meter.create_histogram(
name=MetricInstruments.HTTP_SERVER_DURATION,
unit="ms",
description="measures the duration of the inbound HTTP request",
)

active_requests_counter = meter.create_up_down_counter(
name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS,
unit="requests",
description="measures the number of concurrent HTTP requests those are currently in flight",
)

with tracer.start_as_current_span(
span_name,
kind=trace.SpanKind.SERVER,
) as span:
attributes = collect_request_attributes(request)
attributes.update(additional_attributes)
span.set_attributes(attributes)
start = default_timer()
active_requests_counter.add(1, active_requests_count_attrs)
try:
resp = await handler(request)
set_status_code(span, resp.status)
except web.HTTPException as ex:
set_status_code(span, ex.status_code)
raise
finally:
duration = max(round((default_timer() - start) * 1000), 0)
duration_histogram.record(duration, duration_attrs)
active_requests_counter.add(-1, active_requests_count_attrs)
return resp


class _InstrumentedApplication(web.Application):
"""Insert tracing middleware"""

def __init__(self, *args, **kwargs):
middlewares = kwargs.pop("middlewares", [])
middlewares.insert(0, middleware)
kwargs["middlewares"] = middlewares
super().__init__(*args, **kwargs)


class AioHttpServerInstrumentor(BaseInstrumentor):
# pylint: disable=protected-access,attribute-defined-outside-init
"""An instrumentor for aiohttp.web.Application
See `BaseInstrumentor`
"""

def _instrument(self, **kwargs):
self._original_app = web.Application
setattr(web, "Application", _InstrumentedApplication)

def _uninstrument(self, **kwargs):
setattr(web, "Application", self._original_app)

def instrumentation_dependencies(self):
return self._instruments

0 comments on commit 375a081

Please sign in to comment.