Skip to content

Commit

Permalink
Fix after CR and change the test format
Browse files Browse the repository at this point in the history
  • Loading branch information
shalevr committed Sep 22, 2022
1 parent d70acc2 commit 033c62c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,14 @@ def _uninstrument(self, **kwargs):
self.patched_handlers = []


def patch_handler_class(self, cls, request_hook=None):
def patch_handler_class(instrumentation, cls, request_hook=None):
if getattr(cls, _OTEL_PATCHED_KEY, False):
return False

setattr(cls, _OTEL_PATCHED_KEY, True)
_wrap(cls, "prepare", partial(_prepare, self, request_hook))
_wrap(cls, "on_finish", partial(_on_finish, self))
_wrap(cls, "log_exception", partial(_log_exception, self))
_wrap(cls, "prepare", partial(_prepare, instrumentation, request_hook))
_wrap(cls, "on_finish", partial(_on_finish, instrumentation))
_wrap(cls, "log_exception", partial(_log_exception, instrumentation))
return True


Expand All @@ -319,28 +319,36 @@ def _wrap(cls, method_name, wrapper):
wrapt.apply_patch(cls, method_name, wrapper)


def _prepare(self, request_hook, func, handler, args, kwargs):
def _prepare(instrumentation, request_hook, func, handler, args, kwargs):
instrumentation.start_time = default_timer()
request = handler.request
if _excluded_urls.url_disabled(request.uri):
return func(*args, **kwargs)
ctx = _start_span(self, handler)

_record_prepare_metrics(instrumentation, handler)

ctx = _start_span(instrumentation.tracer, handler)
if request_hook:
request_hook(ctx.span, handler)
return func(*args, **kwargs)


def _on_finish(self, func, handler, args, kwargs):
def _on_finish(instrumentation, func, handler, args, kwargs):
response = func(*args, **kwargs)
_finish_span(self, handler)

_record_on_finish_metrics(instrumentation, handler)

_finish_span(instrumentation.tracer, handler)

return response


def _log_exception(self, func, handler, args, kwargs):
def _log_exception(instrumentation, func, handler, args, kwargs):
error = None
if len(args) == 3:
error = args[1]

_finish_span(self, handler, error)
_finish_span(instrumentation, handler, error)
return func(*args, **kwargs)


Expand Down Expand Up @@ -406,11 +414,9 @@ def _get_full_handler_name(handler):
return f"{klass.__module__}.{klass.__qualname__}"


def _start_span(self, handler) -> _TraceContext:
start_time = default_timer()

def _start_span(tracer, handler) -> _TraceContext:
span, token = _start_internal_or_server_span(
tracer=self.tracer,
tracer=tracer,
span_name=_get_operation_name(handler, handler.request),
start_time=time_ns(),
context_carrier=handler.request.headers,
Expand All @@ -429,14 +435,6 @@ def _start_span(self, handler) -> _TraceContext:
if len(custom_attributes) > 0:
span.set_attributes(custom_attributes)

metric_attributes = _create_metric_attributes(handler)
request_size = len(handler.request.body)

self.request_size_histogram.record(
request_size, attributes=metric_attributes
)
self.active_requests_histogram.add(1, attributes=metric_attributes)

activation = trace.use_span(span, end_on_exit=True)
activation.__enter__() # pylint: disable=E1101
ctx = _TraceContext(activation, span, token)
Expand All @@ -449,15 +447,10 @@ def _start_span(self, handler) -> _TraceContext:
if propagator:
propagator.inject(handler, setter=response_propagation_setter)

elapsed_time = round((default_timer() - start_time) * 1000)

self.duration_histogram.record(elapsed_time, attributes=metric_attributes)
self.active_requests_histogram.add(-1, attributes=metric_attributes)

return ctx


def _finish_span(self, handler, error=None):
def _finish_span(tracer, handler, error=None):
status_code = handler.get_status()
reason = getattr(handler, "_reason")
finish_args = (None, None, None)
Expand All @@ -467,7 +460,7 @@ def _finish_span(self, handler, error=None):
if isinstance(error, tornado.web.HTTPError):
status_code = error.status_code
if not ctx and status_code == 404:
ctx = _start_span(self, handler)
ctx = _start_span(tracer, handler)
else:
status_code = 500
reason = None
Expand Down Expand Up @@ -508,13 +501,51 @@ def _finish_span(self, handler, error=None):
delattr(handler, _HANDLER_CONTEXT_KEY)


def _create_metric_attributes(handler):
def _record_prepare_metrics(instrumentation, handler):
request_size = len(handler.request.body)
metric_attributes = _create_metric_attributes(handler)

instrumentation.request_size_histogram.record(
request_size, attributes=metric_attributes
)

active_requests_attributes = _create_active_requests_attributes(
handler.request
)
instrumentation.active_requests_histogram.add(
1, attributes=active_requests_attributes
)


def _record_on_finish_metrics(instrumentation, handler):
elapsed_time = round((default_timer() - instrumentation.start_time) * 1000)

metric_attributes = _create_metric_attributes(handler)
instrumentation.duration_histogram.record(
elapsed_time, attributes=metric_attributes
)

active_requests_attributes = _create_active_requests_attributes(
handler.request
)
instrumentation.active_requests_histogram.add(
-1, attributes=active_requests_attributes
)


def _create_active_requests_attributes(request):
metric_attributes = {
SpanAttributes.HTTP_METHOD: handler.request.method,
SpanAttributes.HTTP_SCHEME: handler.request.protocol,
SpanAttributes.HTTP_STATUS_CODE: handler.get_status(),
SpanAttributes.HTTP_FLAVOR: handler.request.version,
SpanAttributes.HTTP_HOST: handler.request.host,
SpanAttributes.HTTP_METHOD: request.method,
SpanAttributes.HTTP_SCHEME: request.protocol,
SpanAttributes.HTTP_FLAVOR: request.version,
SpanAttributes.HTTP_HOST: request.host,
}

return metric_attributes


def _create_metric_attributes(handler):
metric_attributes = _create_active_requests_attributes(handler.request)
metric_attributes[SpanAttributes.HTTP_STATUS_CODE] = handler.get_status()

return metric_attributes
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,13 @@ def _finish_tracing_callback(
status_code = None
description = None
exc = future.exception()
response = future.result()
if span.is_recording() and exc:
if isinstance(exc, HTTPError):
status_code = exc.code
description = f"{type(exc).__name__}: {exc}"
else:
status_code = future.result().code
status_code = response.code

if status_code is not None:
span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code)
Expand All @@ -114,7 +115,6 @@ def _finish_tracing_callback(
description=description,
)
)
response = future.result()
metric_attributes = _create_metric_attributes(response)
response_size = int(response.headers["Content-Length"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from opentelemetry import trace
from opentelemetry.instrumentation.tornado import TornadoInstrumentor
from opentelemetry.sdk.metrics.export import HistogramDataPoint
from opentelemetry.test.test_base import TestBase

from .tornado_test_app import make_app
Expand All @@ -31,6 +32,32 @@ def get_app(self):
app = make_app(tracer)
return app

def get_sorted_metrics(self):
resource_metrics = (
self.memory_metrics_reader.get_metrics_data().resource_metrics
)
for metrics in resource_metrics:
for scope_metrics in metrics.scope_metrics:
all_metrics = [metric for metric in scope_metrics.metrics]
return self.sorted_metrics(all_metrics)

@staticmethod
def sorted_metrics(metrics):
"""
Sorts metrics by metric name.
"""
return sorted(
metrics,
key=lambda m: m.name,
)

def assertMetricHasAttributes(self, metric, expected_attributes):
for data_point in metric.data.data_points:
self.assertDictEqual(
expected_attributes,
dict(data_point.attributes),
)

def setUp(self):
super().setUp()
TornadoInstrumentor().instrument(
Expand All @@ -51,64 +78,60 @@ def test_basic_metrics(self):
response = self.fetch("/")
client_duration_estimated = (default_timer() - start_time) * 1000

expected_attributes = {
"http.status_code": 200,
"http.method": "GET",
"http.flavor": "HTTP/1.1",
"http.scheme": "http",
"http.host": response.request.headers["host"],
}
expected_response_attributes = {
"http.status_code": response.code,
"http.method": "GET",
"http.url": self.get_url("/"),
}
expected_data = {
"http.server.request.size": 0,
"http.server.response.size": int(
response.headers["Content-Length"]
),
}
expected_metrics = [
"http.server.duration",
"http.server.request.size",
"http.server.response.size",
"http.server.active_requests",
]
metrics = self.get_sorted_metrics()
self.assertEqual(len(metrics), 4)
active_request, duration, request_size, response_size = metrics

self.assertEqual(active_request.name, "http.server.active_requests")
self.assertMetricHasAttributes(
active_request,
{
"http.method": "GET",
"http.flavor": "HTTP/1.1",
"http.scheme": "http",
"http.host": response.request.headers["host"],
},
)

resource_metrics = (
self.memory_metrics_reader.get_metrics_data().resource_metrics
self.assertEqual(duration.name, "http.server.duration")
for data_point in duration.data.data_points:
self.assertAlmostEqual(
data_point.sum,
client_duration_estimated,
delta=200,
)
self.assertMetricHasAttributes(
duration,
{
"http.status_code": 201,
"http.method": "GET",
"http.flavor": "HTTP/1.1",
"http.scheme": "http",
"http.host": response.request.headers["host"],
},
)

self.assertEqual(request_size.name, "http.server.request.size")
self.assertMetricHasAttributes(
request_size,
{
"http.status_code": 200,
"http.method": "GET",
"http.flavor": "HTTP/1.1",
"http.scheme": "http",
"http.host": response.request.headers["host"],
},
)

self.assertEqual(response_size.name, "http.server.response.size")
self.assertMetricHasAttributes(
response_size,
{
"http.status_code": response.code,
"http.method": "GET",
"http.url": self.get_url("/"),
},
)
for metrics in resource_metrics:
for scope_metrics in metrics.scope_metrics:
self.assertEqual(
len(scope_metrics.metrics), len(expected_metrics)
)
for metric in scope_metrics.metrics:
for data_point in metric.data.data_points:
if metric.name in expected_data:
self.assertEqual(
data_point.sum, expected_data[metric.name]
)

self.assertIn(metric.name, expected_metrics)
if metric.name == "http.server.duration":
self.assertAlmostEqual(
data_point.sum,
client_duration_estimated,
delta=1000,
)

if metric.name == "http.server.response.size":
self.assertDictEqual(
expected_response_attributes,
dict(data_point.attributes),
)
else:
self.assertDictEqual(
expected_attributes,
dict(data_point.attributes),
)

def test_metric_uninstrument(self):
self.fetch("/")
Expand All @@ -119,6 +142,6 @@ def test_metric_uninstrument(self):
for resource_metric in metrics_list.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
if metric.name != "http.server.active_requests":
for point in list(metric.data.data_points):
for point in list(metric.data.data_points):
if isinstance(point, HistogramDataPoint):
self.assertEqual(point.count, 1)

0 comments on commit 033c62c

Please sign in to comment.