Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix gRPC URI Detection #1137

Merged
merged 3 commits into from
May 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 45 additions & 78 deletions newrelic/hooks/framework_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,30 @@
import time

from newrelic.api.external_trace import ExternalTrace
from newrelic.api.web_transaction import WebTransactionWrapper
from newrelic.api.transaction import current_transaction
from newrelic.api.time_trace import notice_error
from newrelic.common.object_wrapper import wrap_function_wrapper
from newrelic.api.transaction import current_transaction
from newrelic.api.web_transaction import WebTransactionWrapper
from newrelic.common.object_names import callable_name
from newrelic.common.object_wrapper import wrap_function_wrapper


def _get_uri_method(instance, *args, **kwargs):
target = instance._channel.target().decode('utf-8')
method = instance._method.decode('utf-8').lstrip('/')
uri = 'grpc://%s/%s' % (target, method)
target = instance._channel.target().decode("utf-8").lstrip("dns:///")
method = instance._method.decode("utf-8").lstrip("/")
uri = "grpc://%s/%s" % (target, method)
return (uri, method)


def _prepare_request(
transaction, guid, request,
timeout=None, metadata=None, *args, **kwargs):
def _prepare_request(transaction, guid, request, timeout=None, metadata=None, *args, **kwargs):
metadata = metadata and list(metadata) or []
dt_metadata = transaction._create_distributed_trace_data_with_guid(guid)
metadata.extend(
transaction._generate_distributed_trace_headers(dt_metadata)
)
metadata.extend(transaction._generate_distributed_trace_headers(dt_metadata))
args = (request, timeout, metadata) + args
return args, kwargs


def _prepare_request_stream(
transaction, guid, request_iterator, *args, **kwargs):
return _prepare_request(
transaction, guid, request_iterator, *args, **kwargs)
def _prepare_request_stream(transaction, guid, request_iterator, *args, **kwargs):
return _prepare_request(transaction, guid, request_iterator, *args, **kwargs)


def wrap_call(module, object_path, prepare):
Expand All @@ -56,7 +50,7 @@ def _call_wrapper(wrapped, instance, args, kwargs):
return wrapped(*args, **kwargs)

uri, method = _get_uri_method(instance)
with ExternalTrace('gRPC', uri, method, source=wrapped):
with ExternalTrace("gRPC", uri, method, source=wrapped):
args, kwargs = prepare(transaction, None, *args, **kwargs)
return wrapped(*args, **kwargs)

Expand All @@ -70,13 +64,13 @@ def _future_wrapper(wrapped, instance, args, kwargs):
if transaction is None:
return wrapped(*args, **kwargs)

guid = '%016x' % random.getrandbits(64)
guid = "%016x" % random.getrandbits(64)
uri, method = _get_uri_method(instance)

args, kwargs = prepare(transaction, guid, *args, **kwargs)
future = wrapped(*args, **kwargs)
future._nr_guid = guid
future._nr_args = {"library": 'gRPC', "url": uri, "method": method, "source": wrapped}
future._nr_args = {"library": "gRPC", "url": uri, "method": method, "source": wrapped}
future._nr_start_time = time.time()

# In non-streaming responses, result is typically called instead of
Expand All @@ -89,16 +83,16 @@ def _future_wrapper(wrapped, instance, args, kwargs):


def wrap_next(_wrapped, _instance, _args, _kwargs):
_nr_args = getattr(_instance, '_nr_args', None)
_nr_args = getattr(_instance, "_nr_args", None)
if not _nr_args:
return _wrapped(*_args, **_kwargs)

try:
return _wrapped(*_args, **_kwargs)
except Exception:
delattr(_instance, '_nr_args')
_nr_start_time = getattr(_instance, '_nr_start_time', 0.0)
_nr_guid = getattr(_instance, '_nr_guid', None)
delattr(_instance, "_nr_args")
_nr_start_time = getattr(_instance, "_nr_start_time", 0.0)
_nr_guid = getattr(_instance, "_nr_guid", None)

with ExternalTrace(**_nr_args) as t:
t.start_time = _nr_start_time or t.start_time
Expand All @@ -107,12 +101,12 @@ def wrap_next(_wrapped, _instance, _args, _kwargs):


def wrap_result(_wrapped, _instance, _args, _kwargs):
_nr_args = getattr(_instance, '_nr_args', None)
_nr_args = getattr(_instance, "_nr_args", None)
if not _nr_args:
return _wrapped(*_args, **_kwargs)
delattr(_instance, '_nr_args')
_nr_start_time = getattr(_instance, '_nr_start_time', 0.0)
_nr_guid = getattr(_instance, '_nr_guid', None)
delattr(_instance, "_nr_args")
_nr_start_time = getattr(_instance, "_nr_start_time", 0.0)
_nr_guid = getattr(_instance, "_nr_guid", None)

try:
result = _wrapped(*_args, **_kwargs)
Expand All @@ -136,31 +130,22 @@ def grpc_web_transaction(wrapped, instance, args, kwargs):
rpc_event, behavior = _bind_transaction_args(*args, **kwargs)
behavior_name = callable_name(behavior)

call_details = (
getattr(rpc_event, 'call_details', None) or
getattr(rpc_event, 'request_call_details', None))
call_details = getattr(rpc_event, "call_details", None) or getattr(rpc_event, "request_call_details", None)

metadata = (
getattr(rpc_event, 'invocation_metadata', None) or
getattr(rpc_event, 'request_metadata', None))
metadata = getattr(rpc_event, "invocation_metadata", None) or getattr(rpc_event, "request_metadata", None)

host = port = None
if call_details:
try:
host, port = call_details.host.split(b':', 1)
host, port = call_details.host.split(b":", 1)
except Exception:
pass

request_path = call_details.method

return WebTransactionWrapper(
wrapped,
name=behavior_name,
request_path=request_path,
host=host,
port=port,
headers=metadata,
source=behavior)(*args, **kwargs)
wrapped, name=behavior_name, request_path=request_path, host=host, port=port, headers=metadata, source=behavior
)(*args, **kwargs)


def _trailing_metadata(state, *args, **kwargs):
Expand All @@ -185,44 +170,26 @@ def _nr_wrap_abort(wrapped, instance, args, kwargs):


def instrument_grpc__channel(module):
wrap_call(module, '_UnaryUnaryMultiCallable.__call__',
_prepare_request)
wrap_call(module, '_UnaryUnaryMultiCallable.with_call',
_prepare_request)
wrap_future(module, '_UnaryUnaryMultiCallable.future',
_prepare_request)
wrap_future(module, '_UnaryStreamMultiCallable.__call__',
_prepare_request)
wrap_call(module, '_StreamUnaryMultiCallable.__call__',
_prepare_request_stream)
wrap_call(module, '_StreamUnaryMultiCallable.with_call',
_prepare_request_stream)
wrap_future(module, '_StreamUnaryMultiCallable.future',
_prepare_request_stream)
wrap_future(module, '_StreamStreamMultiCallable.__call__',
_prepare_request_stream)
if hasattr(module, '_MultiThreadedRendezvous'):
wrap_function_wrapper(module, '_MultiThreadedRendezvous.result',
wrap_result)
wrap_function_wrapper(module, '_MultiThreadedRendezvous._next',
wrap_next)
wrap_call(module, "_UnaryUnaryMultiCallable.__call__", _prepare_request)
wrap_call(module, "_UnaryUnaryMultiCallable.with_call", _prepare_request)
wrap_future(module, "_UnaryUnaryMultiCallable.future", _prepare_request)
wrap_future(module, "_UnaryStreamMultiCallable.__call__", _prepare_request)
wrap_call(module, "_StreamUnaryMultiCallable.__call__", _prepare_request_stream)
wrap_call(module, "_StreamUnaryMultiCallable.with_call", _prepare_request_stream)
wrap_future(module, "_StreamUnaryMultiCallable.future", _prepare_request_stream)
wrap_future(module, "_StreamStreamMultiCallable.__call__", _prepare_request_stream)
if hasattr(module, "_MultiThreadedRendezvous"):
wrap_function_wrapper(module, "_MultiThreadedRendezvous.result", wrap_result)
wrap_function_wrapper(module, "_MultiThreadedRendezvous._next", wrap_next)
else:
wrap_function_wrapper(module, '_Rendezvous.result',
wrap_result)
wrap_function_wrapper(module, '_Rendezvous._next',
wrap_next)
wrap_function_wrapper(module, '_Rendezvous.cancel',
wrap_result)
wrap_function_wrapper(module, "_Rendezvous.result", wrap_result)
wrap_function_wrapper(module, "_Rendezvous._next", wrap_next)
wrap_function_wrapper(module, "_Rendezvous.cancel", wrap_result)


def instrument_grpc_server(module):
wrap_function_wrapper(module, '_unary_response_in_pool',
grpc_web_transaction)
wrap_function_wrapper(module, '_stream_response_in_pool',
grpc_web_transaction)
wrap_function_wrapper(module, '_completion_code',
_nr_wrap_status_code)
wrap_function_wrapper(module, '_abortion_code',
_nr_wrap_status_code)
wrap_function_wrapper(module, '_abort',
_nr_wrap_abort)
wrap_function_wrapper(module, "_unary_response_in_pool", grpc_web_transaction)
wrap_function_wrapper(module, "_stream_response_in_pool", grpc_web_transaction)
wrap_function_wrapper(module, "_completion_code", _nr_wrap_status_code)
wrap_function_wrapper(module, "_abortion_code", _nr_wrap_status_code)
wrap_function_wrapper(module, "_abort", _nr_wrap_abort)
Loading