diff --git a/CHANGELOG.md b/CHANGELOG.md index 42168a6ce7..b56a453d60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-sdk-extension-aws` Release AWS Python SDK Extension as 1.0.0 ([#667](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/667)) +### Added +- `opentelemetry-instrumentation-elasticsearch` Added `response_hook` and `request_hook` callbacks + ([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670)) + ### Changed - `opentelemetry-instrumentation-botocore` Unpatch botocore Endpoint.prepare_request on uninstrument ([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664)) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index f72a4bffa2..eb01ba08cc 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -44,6 +44,41 @@ .. code-block:: python ElasticsearchInstrumentor("my-custom-prefix").instrument() + + +The `instrument` method accepts the following keyword args: + +tracer_provider (TracerProvider) - an optional tracer provider +request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request + this function signature is: + def request_hook(span: Span, method: str, url: str, kwargs) +response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request + this function signature is: + def response_hook(span: Span, response: dict) + +for example: + +.. code: python + + from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor + import elasticsearch + + def request_hook(span, method, url, kwargs): + if span and span.is_recording(): + span.set_attribute("custom_user_attribute_from_request_hook", "some-value") + + def response_hook(span, response): + if span and span.is_recording(): + span.set_attribute("custom_user_attribute_from_response_hook", "some-value") + + # instrument elasticsearch with request and response hooks + ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook) + + # Using elasticsearch as normal now will automatically generate spans, + # including user custom attributes added from the hooks + es = elasticsearch.Elasticsearch() + es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()}) + es.get(index='my-index', doc_type='my-type', id=1) """ from logging import getLogger @@ -97,17 +132,23 @@ def _instrument(self, **kwargs): """ tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) + request_hook = kwargs.get("request_hook") + response_hook = kwargs.get("response_hook") _wrap( elasticsearch, "Transport.perform_request", - _wrap_perform_request(tracer, self._span_name_prefix), + _wrap_perform_request( + tracer, self._span_name_prefix, request_hook, response_hook + ), ) def _uninstrument(self, **kwargs): unwrap(elasticsearch.Transport, "perform_request") -def _wrap_perform_request(tracer, span_name_prefix): +def _wrap_perform_request( + tracer, span_name_prefix, request_hook=None, response_hook=None +): # pylint: disable=R0912 def wrapper(wrapped, _, args, kwargs): method = url = None @@ -127,6 +168,10 @@ def wrapper(wrapped, _, args, kwargs): with tracer.start_as_current_span( op_name, kind=SpanKind.CLIENT, ) as span: + + if callable(request_hook): + request_hook(span, method, url, kwargs) + if span.is_recording(): attributes = { SpanAttributes.DB_SYSTEM: "elasticsearch", @@ -150,6 +195,9 @@ def wrapper(wrapped, _, args, kwargs): "elasticsearch.{0}".format(member), str(rv[member]), ) + + if callable(response_hook): + response_hook(span, rv) return rv return wrapper diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index f2eaf509a5..ed6e75e6fc 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import os import threading from ast import literal_eval @@ -316,3 +316,101 @@ def test_dsl_index(self, request_mock): "title": "About searching", }, ) + + def test_request_hook(self, request_mock): + request_hook_method_attribute = "request_hook.method" + request_hook_url_attribute = "request_hook.url" + request_hook_kwargs_attribute = "request_hook.kwargs" + + def request_hook(span, method, url, kwargs): + + attributes = { + request_hook_method_attribute: method, + request_hook_url_attribute: url, + request_hook_kwargs_attribute: json.dumps(kwargs), + } + + if span and span.is_recording(): + span.set_attributes(attributes) + + ElasticsearchInstrumentor().uninstrument() + ElasticsearchInstrumentor().instrument(request_hook=request_hook) + + request_mock.return_value = ( + 1, + {}, + '{"found": false, "timed_out": true, "took": 7}', + ) + es = Elasticsearch() + index = "test-index" + doc_type = "tweet" + doc_id = 1 + kwargs = {"params": {"test": True}} + es.get(index=index, doc_type=doc_type, id=doc_id, **kwargs) + + spans = self.get_finished_spans() + + self.assertEqual(1, len(spans)) + self.assertEqual( + "GET", spans[0].attributes[request_hook_method_attribute] + ) + self.assertEqual( + f"/{index}/{doc_type}/{doc_id}", + spans[0].attributes[request_hook_url_attribute], + ) + self.assertEqual( + json.dumps(kwargs), + spans[0].attributes[request_hook_kwargs_attribute], + ) + + def test_response_hook(self, request_mock): + response_attribute_name = "db.query_result" + + def response_hook(span, response): + if span and span.is_recording(): + span.set_attribute( + response_attribute_name, json.dumps(response) + ) + + ElasticsearchInstrumentor().uninstrument() + ElasticsearchInstrumentor().instrument(response_hook=response_hook) + + response_payload = { + "took": 9, + "timed_out": False, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0, + }, + "hits": { + "total": {"value": 1, "relation": "eq"}, + "max_score": 0.18232156, + "hits": [ + { + "_index": "test-index", + "_type": "tweet", + "_id": "1", + "_score": 0.18232156, + "_source": {"name": "tester"}, + } + ], + }, + } + + request_mock.return_value = ( + 1, + {}, + json.dumps(response_payload), + ) + es = Elasticsearch() + es.get(index="test-index", doc_type="tweet", id=1) + + spans = self.get_finished_spans() + + self.assertEqual(1, len(spans)) + self.assertEqual( + json.dumps(response_payload), + spans[0].attributes[response_attribute_name], + )