diff --git a/CHANGELOG.md b/CHANGELOG.md index 9def2f55eb..5ccc90d0c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,12 +6,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.3.0-0.22b0...HEAD) -- `opentelemetry-sdk-extension-aws` Update AWS entry points to match spec - ([#566](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/566)) -- Include Flask 2.0 as compatible with existing flask instrumentation - ([#545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/545)) -- `openelemetry-sdk-extension-aws` Take a dependency on `opentelemetry-sdk` - ([#558](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/558)) ### Changed - `opentelemetry-instrumentation-tornado` properly instrument work done in tornado on_finish method. @@ -36,6 +30,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#567](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/567)) - `opentelemetry-instrumentation-grpc` Fixed asynchonous unary call traces ([#536](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/536)) +- `opentelemetry-sdk-extension-aws` Update AWS entry points to match spec + ([#566](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/566)) +- Include Flask 2.0 as compatible with existing flask instrumentation + ([#545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/545)) +- `openelemetry-sdk-extension-aws` Take a dependency on `opentelemetry-sdk` + ([#558](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/558)) +- Change `opentelemetry-instrumentation-httpx` to replace `client` classes with instrumented versions. + ([#577](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/577)) ### Added - `opentelemetry-instrumentation-httpx` Add `httpx` instrumentation diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index fa3d29faf2..90d077027c 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import typing import httpx @@ -31,6 +32,8 @@ from opentelemetry.trace.span import Span from opentelemetry.trace.status import Status +_logger = logging.getLogger(__name__) + URL = typing.Tuple[bytes, bytes, typing.Optional[int], bytes] Headers = typing.List[typing.Tuple[bytes, bytes]] RequestHook = typing.Callable[[Span, "RequestInfo"], None] @@ -258,98 +261,48 @@ async def handle_async_request( return status_code, headers, stream, extensions -def _instrument( - tracer_provider: TracerProvider = None, - request_hook: typing.Optional[RequestHook] = None, - response_hook: typing.Optional[ResponseHook] = None, -) -> None: - """Enables tracing of all Client and AsyncClient instances - - When a Client or AsyncClient gets created, a telemetry transport is passed - in to the instance. - """ - # pylint:disable=unused-argument - def instrumented_sync_send(wrapped, instance, args, kwargs): - if context.get_value("suppress_instrumentation"): - return wrapped(*args, **kwargs) +class _InstrumentedClient(httpx.Client): - transport = instance._transport or httpx.HTTPTransport() - telemetry_transport = SyncOpenTelemetryTransport( - transport, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, - ) + _tracer_provider = None + _request_hook = None + _response_hook = None - instance._transport = telemetry_transport - return wrapped(*args, **kwargs) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) - async def instrumented_async_send(wrapped, instance, args, kwargs): - if context.get_value("suppress_instrumentation"): - return await wrapped(*args, **kwargs) + self._original_transport = self._transport + self._is_instrumented_by_opentelemetry = True - transport = instance._transport or httpx.AsyncHTTPTransport() - telemetry_transport = AsyncOpenTelemetryTransport( - transport, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, + self._transport = SyncOpenTelemetryTransport( + self._transport, + tracer_provider=_InstrumentedClient._tracer_provider, + request_hook=_InstrumentedClient._request_hook, + response_hook=_InstrumentedClient._response_hook, ) - instance._transport = telemetry_transport - return await wrapped(*args, **kwargs) - wrapt.wrap_function_wrapper(httpx.Client, "send", instrumented_sync_send) +class _InstrumentedAsyncClient(httpx.AsyncClient): - wrapt.wrap_function_wrapper( - httpx.AsyncClient, "send", instrumented_async_send - ) + _tracer_provider = None + _request_hook = None + _response_hook = None + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) -def _instrument_client( - client: typing.Union[httpx.Client, httpx.AsyncClient], - tracer_provider: TracerProvider = None, - request_hook: typing.Optional[RequestHook] = None, - response_hook: typing.Optional[ResponseHook] = None, -) -> None: - """Enables instrumentation for the given Client or AsyncClient""" - # pylint: disable=protected-access - if isinstance(client, httpx.Client): - transport = client._transport or httpx.HTTPTransport() - telemetry_transport = SyncOpenTelemetryTransport( - transport, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, - ) - elif isinstance(client, httpx.AsyncClient): - transport = client._transport or httpx.AsyncHTTPTransport() - telemetry_transport = AsyncOpenTelemetryTransport( - transport, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, - ) - else: - raise TypeError("Invalid client provided") - client._transport = telemetry_transport + self._original_transport = self._transport + self._is_instrumented_by_opentelemetry = True - -def _uninstrument() -> None: - """Disables instrumenting for all newly created Client and AsyncClient instances""" - unwrap(httpx.Client, "send") - unwrap(httpx.AsyncClient, "send") - - -def _uninstrument_client( - client: typing.Union[httpx.Client, httpx.AsyncClient] -) -> None: - """Disables instrumentation for the given Client or AsyncClient""" - # pylint: disable=protected-access - unwrap(client, "send") + self._transport = AsyncOpenTelemetryTransport( + self._transport, + tracer_provider=_InstrumentedAsyncClient._tracer_provider, + request_hook=_InstrumentedAsyncClient._request_hook, + response_hook=_InstrumentedAsyncClient._response_hook, + ) class HTTPXClientInstrumentor(BaseInstrumentor): + # pylint: disable=protected-access,attribute-defined-outside-init """An instrumentor for httpx Client and AsyncClient See `BaseInstrumentor` @@ -369,14 +322,31 @@ def _instrument(self, **kwargs): ``response_hook``: A hook that receives the span, request, and response that is called right before the span ends """ - _instrument( - tracer_provider=kwargs.get("tracer_provider"), - request_hook=kwargs.get("request_hook"), - response_hook=kwargs.get("response_hook"), - ) + self._original_client = httpx.Client + self._original_async_client = httpx.AsyncClient + request_hook = kwargs.get("request_hook") + response_hook = kwargs.get("response_hook") + if callable(request_hook): + _InstrumentedClient._request_hook = request_hook + _InstrumentedAsyncClient._request_hook = request_hook + if callable(response_hook): + _InstrumentedClient._response_hook = response_hook + _InstrumentedAsyncClient._response_hook = response_hook + tracer_provider = kwargs.get("tracer_provider") + _InstrumentedClient._tracer_provider = tracer_provider + _InstrumentedAsyncClient._tracer_provider = tracer_provider + httpx.Client = _InstrumentedClient + httpx.AsyncClient = _InstrumentedAsyncClient def _uninstrument(self, **kwargs): - _uninstrument() + httpx.Client = self._original_client + httpx.AsyncClient = self._original_async_client + _InstrumentedClient._tracer_provider = None + _InstrumentedClient._request_hook = None + _InstrumentedClient._response_hook = None + _InstrumentedAsyncClient._tracer_provider = None + _InstrumentedAsyncClient._request_hook = None + _InstrumentedAsyncClient._response_hook = None @staticmethod def instrument_client( @@ -395,12 +365,34 @@ def instrument_client( response_hook: A hook that receives the span, request, and response that is called right before the span ends """ - _instrument_client( - client, - tracer_provider=tracer_provider, - request_hook=request_hook, - response_hook=response_hook, - ) + # pylint: disable=protected-access + if not hasattr(client, "_is_instrumented_by_opentelemetry"): + client._is_instrumented_by_opentelemetry = False + + if not client._is_instrumented_by_opentelemetry: + if isinstance(client, httpx.Client): + client._original_transport = client._transport + transport = client._transport or httpx.HTTPTransport() + client._transport = SyncOpenTelemetryTransport( + transport, + tracer_provider=tracer_provider, + request_hook=request_hook, + response_hook=response_hook, + ) + client._is_instrumented_by_opentelemetry = True + if isinstance(client, httpx.AsyncClient): + transport = client._transport or httpx.AsyncHTTPTransport() + client._transport = AsyncOpenTelemetryTransport( + transport, + tracer_provider=tracer_provider, + request_hook=request_hook, + response_hook=response_hook, + ) + client._is_instrumented_by_opentelemetry = True + else: + _logger.warning( + "Attempting to instrument Httpx client while already instrumented" + ) @staticmethod def uninstrument_client( @@ -411,4 +403,12 @@ def uninstrument_client( Args: client: The httpx Client or AsyncClient instance """ - _uninstrument_client(client) + if hasattr(client, "_original_transport"): + client._transport = client._original_transport + del client._original_transport + client._is_instrumented_by_opentelemetry = False + else: + _logger.warning( + "Attempting to uninstrument Httpx " + "client while already uninstrumented" + ) diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index e6d8427341..c092c46626 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -157,6 +157,11 @@ def test_basic(self): span, opentelemetry.instrumentation.httpx ) + def test_basic_multiple(self): + self.perform_request(self.URL) + self.perform_request(self.URL) + self.assert_span(num_spans=2) + def test_not_foundbasic(self): url_404 = "http://httpbin.org/status/404" @@ -375,12 +380,9 @@ def create_client( pass def setUp(self): - self.client = self.create_client() - HTTPXClientInstrumentor().instrument() super().setUp() - - def tearDown(self): - super().tearDown() + HTTPXClientInstrumentor().instrument() + self.client = self.create_client() HTTPXClientInstrumentor().uninstrument() def test_custom_tracer_provider(self): @@ -388,7 +390,6 @@ def test_custom_tracer_provider(self): result = self.create_tracer_provider(resource=resource) tracer_provider, exporter = result - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=tracer_provider ) @@ -398,9 +399,9 @@ def test_custom_tracer_provider(self): self.assertEqual(result.text, "Hello!") span = self.assert_span(exporter=exporter) self.assertIs(span.resource, resource) + HTTPXClientInstrumentor().uninstrument() def test_response_hook(self): - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, response_hook=self.response_hook, @@ -419,9 +420,9 @@ def test_response_hook(self): HTTP_RESPONSE_BODY: "Hello!", }, ) + HTTPXClientInstrumentor().uninstrument() def test_request_hook(self): - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, request_hook=self.request_hook, @@ -432,9 +433,9 @@ def test_request_hook(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual(span.name, "GET" + self.URL) + HTTPXClientInstrumentor().uninstrument() def test_request_hook_no_span_update(self): - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=self.tracer_provider, request_hook=self.no_update_request_hook, @@ -445,10 +446,10 @@ def test_request_hook_no_span_update(self): self.assertEqual(result.text, "Hello!") span = self.assert_span() self.assertEqual(span.name, "HTTP GET") + HTTPXClientInstrumentor().uninstrument() def test_not_recording(self): with mock.patch("opentelemetry.trace.INVALID_SPAN") as mock_span: - HTTPXClientInstrumentor().uninstrument() HTTPXClientInstrumentor().instrument( tracer_provider=trace._DefaultTracerProvider() ) @@ -463,8 +464,10 @@ def test_not_recording(self): self.assertTrue(mock_span.is_recording.called) self.assertFalse(mock_span.set_attribute.called) self.assertFalse(mock_span.set_status.called) + HTTPXClientInstrumentor().uninstrument() def test_suppress_instrumentation_new_client(self): + HTTPXClientInstrumentor().instrument() token = context.attach( context.set_value("suppress_instrumentation", True) ) @@ -476,32 +479,22 @@ def test_suppress_instrumentation_new_client(self): context.detach(token) self.assert_span(num_spans=0) - - def test_existing_client(self): HTTPXClientInstrumentor().uninstrument() - client = self.create_client() - HTTPXClientInstrumentor().instrument() - result = self.perform_request(self.URL, client=client) - self.assertEqual(result.text, "Hello!") - self.assert_span(num_spans=1) def test_instrument_client(self): - HTTPXClientInstrumentor().uninstrument() client = self.create_client() HTTPXClientInstrumentor().instrument_client(client) result = self.perform_request(self.URL, client=client) self.assertEqual(result.text, "Hello!") self.assert_span(num_spans=1) - # instrument again to avoid annoying warning message - HTTPXClientInstrumentor().instrument() def test_uninstrument(self): + HTTPXClientInstrumentor().instrument() HTTPXClientInstrumentor().uninstrument() - result = self.perform_request(self.URL) + client = self.create_client() + result = self.perform_request(self.URL, client=client) self.assertEqual(result.text, "Hello!") self.assert_span(num_spans=0) - # instrument again to avoid annoying warning message - HTTPXClientInstrumentor().instrument() def test_uninstrument_client(self): HTTPXClientInstrumentor().uninstrument_client(self.client) @@ -512,6 +505,7 @@ def test_uninstrument_client(self): self.assert_span(num_spans=0) def test_uninstrument_new_client(self): + HTTPXClientInstrumentor().instrument() client1 = self.create_client() HTTPXClientInstrumentor().uninstrument_client(client1)