Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add traceresponse headers for asgi apps (FastAPI, Starlette) #817

Merged
merged 2 commits into from
Dec 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.8.0-0.27b0...HEAD)

### Added

- `opentelemetry-instrumentation-asgi` now returns a `traceresponse` response header.
([#817](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/817))

### Fixed

- `opentelemetry-instrumentation-flask` Flask: Conditionally create SERVER spans
Expand All @@ -28,6 +33,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [1.7.1-0.26b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.7.0-0.26b0) - 2021-11-11

### Added
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't included in the last release but seemed like the right format


- `opentelemetry-instrumentation-aws-lambda` Add instrumentation for AWS Lambda Service - pkg metadata files (Part 1/2)
([#739](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/739))
- Add support for Python 3.10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,14 @@ def client_response_hook(span: Span, message: dict):

from opentelemetry import context, trace
from opentelemetry.instrumentation.asgi.version import __version__ # noqa
from opentelemetry.instrumentation.propagators import (
get_global_response_propagator,
)
from opentelemetry.instrumentation.utils import http_status_to_status_code
from opentelemetry.propagate import extract
from opentelemetry.propagators.textmap import Getter
from opentelemetry.propagators.textmap import Getter, Setter
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span
from opentelemetry.trace import Span, set_span_in_context
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import remove_url_credentials

Expand Down Expand Up @@ -152,6 +155,30 @@ def keys(self, carrier: dict) -> typing.List[str]:
asgi_getter = ASGIGetter()


class ASGISetter(Setter):
def set(
self, carrier: dict, key: str, value: str
) -> None: # pylint: disable=no-self-use
"""Sets response header values on an ASGI scope according to `the spec <https://asgi.readthedocs.io/en/latest/specs/www.html#response-start-send-event>`_.

Args:
carrier: ASGI scope object
key: response header name to set
value: response header value
Returns:
None
"""
headers = carrier.get("headers")
if not headers:
headers = []
carrier["headers"] = headers

headers.append([key.lower().encode(), value.encode()])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the Access-Control-Expose-Headers usually is cased, but the docs explicitly say lowercase.

Copy link
Contributor

@lzchen lzchen Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a stupid question but why do we encode the key and values in the header? Is this because asgi requires them to be byte strings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lzchen good question, it's non-obvious. From the spec:

Keys:
type (Unicode string) – "http.response.start".
status (int) – HTTP status code.
headers (Iterable[[byte string, byte string]]) – An iterable of [name, value] two-item iterables, where name is the > header name, and value is the header value. Order must be preserved in the HTTP response. Header names must be lowercased. Optional; if missing defaults to an empty list. Pseudo headers (present in HTTP/2 and HTTP/3) must not be present.

Important parts:

  • headers (Iterable[[byte string, byte string]]) - spec says they should be encoded
  • Header names must be lowercased



asgi_setter = ASGISetter()


def collect_request_attributes(scope):
"""Collects HTTP request attributes from the ASGI scope and returns a
dictionary to be used as span creation attributes."""
Expand Down Expand Up @@ -295,54 +322,84 @@ async def __call__(self, scope, receive, send):
return await self.app(scope, receive, send)

token = context.attach(extract(scope, getter=asgi_getter))
span_name, additional_attributes = self.default_span_details(scope)
server_span_name, additional_attributes = self.default_span_details(
scope
)

try:
with self.tracer.start_as_current_span(
span_name,
server_span_name,
kind=trace.SpanKind.SERVER,
) as span:
if span.is_recording():
) as server_span:
if server_span.is_recording():
attributes = collect_request_attributes(scope)
attributes.update(additional_attributes)
for key, value in attributes.items():
span.set_attribute(key, value)
server_span.set_attribute(key, value)

if callable(self.server_request_hook):
self.server_request_hook(span, scope)

@wraps(receive)
async def wrapped_receive():
with self.tracer.start_as_current_span(
" ".join((span_name, scope["type"], "receive"))
) as receive_span:
if callable(self.client_request_hook):
self.client_request_hook(receive_span, scope)
message = await receive()
if receive_span.is_recording():
if message["type"] == "websocket.receive":
set_status_code(receive_span, 200)
receive_span.set_attribute("type", message["type"])
return message

@wraps(send)
async def wrapped_send(message):
with self.tracer.start_as_current_span(
" ".join((span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, message)
if send_span.is_recording():
if message["type"] == "http.response.start":
status_code = message["status"]
set_status_code(span, status_code)
set_status_code(send_span, status_code)
elif message["type"] == "websocket.send":
set_status_code(span, 200)
set_status_code(send_span, 200)
send_span.set_attribute("type", message["type"])
await send(message)

await self.app(scope, wrapped_receive, wrapped_send)
self.server_request_hook(server_span, scope)

otel_receive = self._get_otel_receive(
server_span_name, scope, receive
)

otel_send = self._get_otel_send(
server_span,
server_span_name,
scope,
send,
)

await self.app(scope, otel_receive, otel_send)
finally:
context.detach(token)

def _get_otel_receive(self, server_span_name, scope, receive):
@wraps(receive)
async def otel_receive():
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "receive"))
) as receive_span:
if callable(self.client_request_hook):
self.client_request_hook(receive_span, scope)
message = await receive()
if receive_span.is_recording():
if message["type"] == "websocket.receive":
set_status_code(receive_span, 200)
receive_span.set_attribute("type", message["type"])
return message

return otel_receive

def _get_otel_send(self, server_span, server_span_name, scope, send):
@wraps(send)
async def otel_send(message):
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, message)
if send_span.is_recording():
if message["type"] == "http.response.start":
status_code = message["status"]
set_status_code(server_span, status_code)
set_status_code(send_span, status_code)
elif message["type"] == "websocket.send":
set_status_code(server_span, 200)
set_status_code(send_span, 200)
send_span.set_attribute("type", message["type"])

propagator = get_global_response_propagator()
if propagator:
propagator.inject(
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
Comment on lines +397 to +399
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this is a bit obtuse but I'm sure there is a good extensibility reason for it. What I really wanted to do was:

propagator.inject(
    message,
    span=server_span,
    setter=asgi_setter,
)

But for now I will consider this yak shaved!

setter=asgi_setter,
)

await send(message)

return otel_send
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

import opentelemetry.instrumentation.asgi as otel_asgi
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.propagators import (
TraceResponsePropagator,
get_global_response_propagator,
set_global_response_propagator,
)
from opentelemetry.sdk import resources
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.asgitestutil import (
AsgiTestBase,
setup_testing_defaults,
)
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import format_span_id, format_trace_id


async def http_app(scope, receive, send):
Expand Down Expand Up @@ -287,6 +293,38 @@ def update_expected_user_agent(expected):
outputs = self.get_all_output()
self.validate_outputs(outputs, modifiers=[update_expected_user_agent])

def test_traceresponse_header(self):
"""Test a traceresponse header is sent when a global propagator is set."""

orig = get_global_response_propagator()
set_global_response_propagator(TraceResponsePropagator())

app = otel_asgi.OpenTelemetryMiddleware(simple_asgi)
self.seed_app(app)
self.send_default_request()

span = self.memory_exporter.get_finished_spans()[-1]
self.assertEqual(trace_api.SpanKind.SERVER, span.kind)

response_start, response_body, *_ = self.get_all_output()
self.assertEqual(response_body["body"], b"*")
self.assertEqual(response_start["status"], 200)

traceresponse = "00-{0}-{1}-01".format(
format_trace_id(span.get_span_context().trace_id),
format_span_id(span.get_span_context().span_id),
)
self.assertListEqual(
response_start["headers"],
[
[b"Content-Type", b"text/plain"],
[b"traceresponse", f"{traceresponse}".encode()],
[b"access-control-expose-headers", b"traceresponse"],
],
)

set_global_response_propagator(orig)

def test_websocket(self):
self.scope = {
"type": "websocket",
Expand Down Expand Up @@ -359,6 +397,46 @@ def test_websocket(self):
self.assertEqual(span.kind, expected["kind"])
self.assertDictEqual(dict(span.attributes), expected["attributes"])

def test_websocket_traceresponse_header(self):
"""Test a traceresponse header is set for websocket messages"""

orig = get_global_response_propagator()
set_global_response_propagator(TraceResponsePropagator())

self.scope = {
"type": "websocket",
"http_version": "1.1",
"scheme": "ws",
"path": "/",
"query_string": b"",
"headers": [],
"client": ("127.0.0.1", 32767),
"server": ("127.0.0.1", 80),
}
app = otel_asgi.OpenTelemetryMiddleware(simple_asgi)
self.seed_app(app)
self.send_input({"type": "websocket.connect"})
self.send_input({"type": "websocket.receive", "text": "ping"})
self.send_input({"type": "websocket.disconnect"})
_, socket_send, *_ = self.get_all_output()

span = self.memory_exporter.get_finished_spans()[-1]
self.assertEqual(trace_api.SpanKind.SERVER, span.kind)

traceresponse = "00-{0}-{1}-01".format(
format_trace_id(span.get_span_context().trace_id),
format_span_id(span.get_span_context().span_id),
)
self.assertListEqual(
socket_send["headers"],
[
[b"traceresponse", f"{traceresponse}".encode()],
[b"access-control-expose-headers", b"traceresponse"],
],
)

set_global_response_propagator(orig)

def test_lifespan(self):
self.scope["type"] = "lifespan"
app = otel_asgi.OpenTelemetryMiddleware(simple_asgi)
Expand Down