Skip to content

Commit

Permalink
Fixes and improvements to botocore instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanielRN committed Nov 14, 2020
1 parent 7513d7b commit 9b90b01
Show file tree
Hide file tree
Showing 4 changed files with 336 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
from wrapt import wrap_function_wrapper

from opentelemetry.instrumentation.boto.version import __version__
from opentelemetry.instrumentation.botocore import add_span_arg_tags, unwrap
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.sdk.trace import Resource
from opentelemetry.trace import SpanKind, get_tracer

Expand Down Expand Up @@ -204,3 +204,46 @@ def _patched_auth_request(self, original_func, instance, args, kwargs):
args,
kwargs,
)


def add_span_arg_tags(span, endpoint_name, args, args_names, args_traced):
def truncate_arg_value(value, max_len=1024):
"""Truncate values which are bytes and greater than `max_len`.
Useful for parameters like "Body" in `put_object` operations.
"""
if isinstance(value, bytes) and len(value) > max_len:
return b"..."

return value

def flatten_dict(dict_, sep=".", prefix=""):
"""
Returns a normalized dict of depth 1 with keys in order of embedding
"""
# adapted from https://stackoverflow.com/a/19647596
return (
{
prefix + sep + k if prefix else k: v
for kk, vv in dict_.items()
for k, v in flatten_dict(vv, sep, kk).items()
}
if isinstance(dict_, dict)
else {prefix: dict_}
)

if not span.is_recording():
return

if endpoint_name not in {"kms", "sts"}:
tags = dict(
(name, value)
for (name, value) in zip(args_names, args)
if name in args_traced
)
tags = flatten_dict(tags)
for key, value in {
k: truncate_arg_value(v)
for k, v in tags.items()
if k not in {"s3": ["params.Body"]}.get(endpoint_name, [])
}.items():
span.set_attribute(key, value)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
([#181](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/181))
- Make botocore instrumentation check if instrumentation has been suppressed
([#182](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/182))
- Botocore SpanKind as CLIENT and modify existing traced attributes
([#150])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/150)

## Version 0.13b0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@
import logging

from botocore.client import BaseClient
from botocore.exceptions import ClientError, ParamValidationError
from wrapt import ObjectProxy, wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry import propagators
from opentelemetry.instrumentation.botocore.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.sdk.trace import Resource
from opentelemetry.trace import SpanKind, get_tracer

Expand All @@ -75,15 +77,13 @@ def _patched_endpoint_prepare_request(wrapped, instance, args, kwargs):


class BotocoreInstrumentor(BaseInstrumentor):
"""A instrumentor for Botocore
"""An instrumentor for Botocore.
See `BaseInstrumentor`
"""

def _instrument(self, **kwargs):

# FIXME should the tracer provider be accessed via Configuration
# instead?
# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
Expand All @@ -104,137 +104,66 @@ def _instrument(self, **kwargs):
def _uninstrument(self, **kwargs):
unwrap(BaseClient, "_make_api_call")

# pylint: disable=too-many-branches
def _patched_api_call(self, original_func, instance, args, kwargs):
if context_api.get_value("suppress_instrumentation"):
return original_func(*args, **kwargs)

endpoint_name = deep_getattr(instance, "_endpoint._endpoint_prefix")
# pylint: disable=protected-access
service_name = instance._service_model.service_name
operation_name, api_params = args

error = None
result = None

with self._tracer.start_as_current_span(
"{}.command".format(endpoint_name), kind=SpanKind.CONSUMER,
"{}".format(service_name), kind=SpanKind.CLIENT,
) as span:

operation = None
if args and span.is_recording():
operation = args[0]
span.resource = Resource(
attributes={
"endpoint": endpoint_name,
"operation": operation.lower(),
}
)

else:
span.resource = Resource(
attributes={"endpoint": endpoint_name}
)

add_span_arg_tags(
span,
endpoint_name,
args,
("action", "params", "path", "verb"),
{"params", "path", "verb"},
)

if span.is_recording():
region_name = deep_getattr(instance, "meta.region_name")

meta = {
"aws.agent": "botocore",
"aws.operation": operation,
"aws.region": region_name,
}
for key, value in meta.items():
span.set_attribute(key, value)

result = original_func(*args, **kwargs)
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute(
"aws.table_name", api_params["TableName"]
)

try:
result = original_func(*args, **kwargs)
except ClientError as ex:
error = ex

if error:
result = error.response

if span.is_recording():
span.set_attribute(
"http.status_code",
result["ResponseMetadata"]["HTTPStatusCode"],
)
span.set_attribute(
"retry_attempts",
result["ResponseMetadata"]["RetryAttempts"],
)
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
"http.status_code", metadata["HTTPStatusCode"],
)

if error:
raise error

return result


def unwrap(obj, attr):
function = getattr(obj, attr, None)
if (
function
and isinstance(function, ObjectProxy)
and hasattr(function, "__wrapped__")
):
setattr(obj, attr, function.__wrapped__)


def add_span_arg_tags(span, endpoint_name, args, args_names, args_traced):
def truncate_arg_value(value, max_len=1024):
"""Truncate values which are bytes and greater than `max_len`.
Useful for parameters like "Body" in `put_object` operations.
"""
if isinstance(value, bytes) and len(value) > max_len:
return b"..."

return value

def flatten_dict(dict_, sep=".", prefix=""):
"""
Returns a normalized dict of depth 1 with keys in order of embedding
"""
# adapted from https://stackoverflow.com/a/19647596
return (
{
prefix + sep + k if prefix else k: v
for kk, vv in dict_.items()
for k, v in flatten_dict(vv, sep, kk).items()
}
if isinstance(dict_, dict)
else {prefix: dict_}
)

if not span.is_recording():
return

if endpoint_name not in {"kms", "sts"}:
tags = dict(
(name, value)
for (name, value) in zip(args_names, args)
if name in args_traced
)
tags = flatten_dict(tags)
for key, value in {
k: truncate_arg_value(v)
for k, v in tags.items()
if k not in {"s3": ["params.Body"]}.get(endpoint_name, [])
}.items():
span.set_attribute(key, value)


def deep_getattr(obj, attr_string, default=None):
"""
Returns the attribute of ``obj`` at the dotted path given by
``attr_string``, if no such attribute is reachable, returns ``default``.
>>> deep_getattr(cass, "cluster")
<cassandra.cluster.Cluster object at 0xa20c350
>>> deep_getattr(cass, "cluster.metadata.partitioner")
u"org.apache.cassandra.dht.Murmur3Partitioner"
>>> deep_getattr(cass, "i.dont.exist", default="default")
"default"
"""
attrs = attr_string.split(".")
for attr in attrs:
try:
obj = getattr(obj, attr)
except AttributeError:
return default

return obj
Loading

0 comments on commit 9b90b01

Please sign in to comment.