Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and Improvements to botocore instrumentation #150

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@
from wrapt import wrap_function_wrapper

from opentelemetry.instrumentation.boto.version import __version__
from opentelemetry.instrumentation.botocore import add_span_arg_tags, unwrap
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.sdk.trace import Resource
from opentelemetry.trace import SpanKind, get_tracer

logger = logging.getLogger(__name__)

SERVICE_PARAMS_BLOCK_LIST = {"s3": ["params.Body"]}


def _get_instance_region_name(instance):
region = getattr(instance, "region", None)
Expand Down Expand Up @@ -201,3 +203,50 @@ def _patched_auth_request(self, original_func, instance, args, kwargs):
args,
kwargs,
)


def flatten_dict(dict_, sep=".", prefix=""):
    """Collapse a nested dict into a single-level dict with joined keys.

    Each value that is itself a dict is expanded recursively, and the keys
    along the path are joined with ``sep`` (outermost key first). A
    non-dict input is returned wrapped as ``{prefix: dict_}``.

    >>> flatten_dict({"a": {"b": 1}, "c": 2})
    {'a.b': 1, 'c': 2}
    """
    # NOTE: This should probably be in `opentelemetry.instrumentation.utils`.
    # adapted from https://stackoverflow.com/a/19647596
    if not isinstance(dict_, dict):
        # Leaf value: key it by whatever prefix the caller supplied.
        return {prefix: dict_}

    flattened = {}
    for outer_key, outer_value in dict_.items():
        nested = flatten_dict(outer_value, sep, outer_key)
        for inner_key, inner_value in nested.items():
            # A falsy prefix (e.g. the top-level call) contributes nothing.
            full_key = prefix + sep + inner_key if prefix else inner_key
            flattened[full_key] = inner_value
    return flattened


def add_span_arg_tags(span, aws_service, args, args_names, args_traced):
    """Record selected call arguments as attributes on ``span``.

    ``args`` and ``args_names`` are zipped positionally; only the pairs
    whose name is in ``args_traced`` are kept. The kept values are
    flattened with ``flatten_dict`` so nested dicts become dotted keys, and
    each resulting key/value is set on the span unless the key is listed
    for ``aws_service`` in ``SERVICE_PARAMS_BLOCK_LIST``.

    Nothing is recorded when the span is not recording, or when
    ``aws_service`` is ``"kms"`` or ``"sts"``.
    """

    def truncate_arg_value(value, max_len=1024):
        """Replace ``bytes`` values longer than ``max_len`` with ``b"..."``.

        Useful for parameters like "Body" in `put_object` operations.
        """
        if isinstance(value, bytes) and len(value) > max_len:
            return b"..."

        return value

    if not span.is_recording():
        return

    # Do not trace `Key Management Service` or `Secure Token Service` API calls
    # over concerns of security leaks.
    if aws_service in {"kms", "sts"}:
        return

    tags = flatten_dict(
        {
            name: value
            for name, value in zip(args_names, args)
            if name in args_traced
        }
    )

    # The blocked keys depend only on the service, so look them up once
    # instead of on every loop iteration.
    blocked_keys = SERVICE_PARAMS_BLOCK_LIST.get(aws_service, ())

    for param_key, value in tags.items():
        if param_key in blocked_keys:
            continue

        span.set_attribute(param_key, truncate_arg_value(value))
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
([#181](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/181))
- Make botocore instrumentation check if instrumentation has been suppressed
([#182](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/182))
- Botocore SpanKind as CLIENT and modify existing traced attributes
([#150](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/150))

## Version 0.13b0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@
import logging

from botocore.client import BaseClient
from botocore.exceptions import ClientError, ParamValidationError
from wrapt import ObjectProxy, wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry import propagators
from opentelemetry.instrumentation.botocore.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.sdk.trace import Resource
from opentelemetry.trace import SpanKind, get_tracer

Expand All @@ -70,15 +72,13 @@ def _patched_endpoint_prepare_request(wrapped, instance, args, kwargs):


class BotocoreInstrumentor(BaseInstrumentor):
"""A instrumentor for Botocore
"""An instrumentor for Botocore.

See `BaseInstrumentor`
"""

def _instrument(self, **kwargs):

# FIXME should the tracer provider be accessed via Configuration
# instead?
# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
Expand All @@ -99,137 +99,66 @@ def _instrument(self, **kwargs):
def _uninstrument(self, **kwargs):
unwrap(BaseClient, "_make_api_call")

# pylint: disable=too-many-branches
def _patched_api_call(self, original_func, instance, args, kwargs):
if context_api.get_value("suppress_instrumentation"):
return original_func(*args, **kwargs)

endpoint_name = deep_getattr(instance, "_endpoint._endpoint_prefix")
# pylint: disable=protected-access
service_name = instance._service_model.service_name
NathanielRN marked this conversation as resolved.
Show resolved Hide resolved
operation_name, api_params = args

error = None
result = None

with self._tracer.start_as_current_span(
"{}.command".format(endpoint_name), kind=SpanKind.CONSUMER,
"{}".format(service_name), kind=SpanKind.CLIENT,
) as span:

operation = None
if args and span.is_recording():
operation = args[0]
span.resource = Resource(
attributes={
"endpoint": endpoint_name,
"operation": operation.lower(),
}
)

else:
span.resource = Resource(
attributes={"endpoint": endpoint_name}
)

add_span_arg_tags(
span,
endpoint_name,
args,
("action", "params", "path", "verb"),
{"params", "path", "verb"},
)

if span.is_recording():
region_name = deep_getattr(instance, "meta.region_name")

meta = {
"aws.agent": "botocore",
"aws.operation": operation,
"aws.region": region_name,
}
for key, value in meta.items():
span.set_attribute(key, value)

result = original_func(*args, **kwargs)
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute(
"aws.table_name", api_params["TableName"]
)

try:
result = original_func(*args, **kwargs)
except ClientError as ex:
error = ex

if error:
result = error.response

if span.is_recording():
span.set_attribute(
"http.status_code",
result["ResponseMetadata"]["HTTPStatusCode"],
)
span.set_attribute(
"retry_attempts",
result["ResponseMetadata"]["RetryAttempts"],
)
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
"http.status_code", metadata["HTTPStatusCode"],
)

if error:
raise error

return result


def unwrap(obj, attr):
    """Replace a wrapped attribute on ``obj`` with the object it wraps.

    If ``obj.<attr>`` is a truthy ``ObjectProxy`` exposing ``__wrapped__``,
    the attribute is reassigned to that ``__wrapped__`` value. In every
    other case (attribute missing, falsy, or not a proxy) ``obj`` is left
    untouched.
    """
    candidate = getattr(obj, attr, None)
    if not candidate:
        # Missing or falsy attribute: nothing to restore.
        return

    is_proxy = isinstance(candidate, ObjectProxy)
    if is_proxy and hasattr(candidate, "__wrapped__"):
        setattr(obj, attr, candidate.__wrapped__)


def add_span_arg_tags(span, endpoint_name, args, args_names, args_traced):
    """Record selected call arguments as attributes on ``span``.

    ``args`` and ``args_names`` are zipped positionally; only the pairs
    whose name is in ``args_traced`` are kept. Nested dict values are
    flattened into dotted keys, ``bytes`` values longer than 1024 bytes are
    replaced by ``b"..."``, and ``params.Body`` is skipped for the ``s3``
    endpoint.

    Nothing is recorded when the span is not recording, or when
    ``endpoint_name`` is ``"kms"`` or ``"sts"``.
    """

    def truncate_arg_value(value, max_len=1024):
        """Replace ``bytes`` values longer than ``max_len`` with ``b"..."``.

        Useful for parameters like "Body" in `put_object` operations.
        """
        if isinstance(value, bytes) and len(value) > max_len:
            return b"..."

        return value

    def flatten_dict(dict_, sep=".", prefix=""):
        """
        Returns a normalized dict of depth 1 with keys in order of embedding
        """
        # adapted from https://stackoverflow.com/a/19647596
        if not isinstance(dict_, dict):
            return {prefix: dict_}

        flattened = {}
        for outer_key, outer_value in dict_.items():
            nested = flatten_dict(outer_value, sep, outer_key)
            for inner_key, inner_value in nested.items():
                full_key = prefix + sep + inner_key if prefix else inner_key
                flattened[full_key] = inner_value
        return flattened

    if not span.is_recording():
        return

    # These two endpoints are never tagged with their call arguments.
    if endpoint_name in {"kms", "sts"}:
        return

    tags = flatten_dict(
        {
            name: value
            for name, value in zip(args_names, args)
            if name in args_traced
        }
    )

    # Resolve the per-endpoint blocked keys once rather than rebuilding the
    # literal and repeating the lookup for every tag.
    blocked_keys = {"s3": ["params.Body"]}.get(endpoint_name, [])

    for key, value in tags.items():
        if key in blocked_keys:
            continue
        span.set_attribute(key, truncate_arg_value(value))


def deep_getattr(obj, attr_string, default=None):
    """Follow a dotted attribute path starting at ``obj``.

    ``attr_string`` is split on ``"."`` and each segment is looked up with
    ``getattr`` on the result of the previous lookup. If any lookup raises
    ``AttributeError``, ``default`` is returned instead.

    >>> deep_getattr("abc", "upper.__name__")
    'upper'

    >>> deep_getattr("abc", "i.dont.exist", default="default")
    'default'
    """
    # Split before entering the ``try`` so that a bad ``attr_string`` still
    # raises instead of being reported as a missing attribute.
    segments = attr_string.split(".")

    current = obj
    try:
        for segment in segments:
            current = getattr(current, segment)
    except AttributeError:
        return default

    return current
Loading