From 39b54526ae493b382bab662b3eedc9828f45dfb4 Mon Sep 17 00:00:00 2001 From: Itay Gibel Date: Thu, 9 Sep 2021 15:33:35 +0300 Subject: [PATCH 1/5] adding response_hook to elastic instrumentation --- CHANGELOG.md | 3 ++ .../instrumentation/elasticsearch/__init__.py | 24 +++++---- .../tests/test_elasticsearch.py | 54 ++++++++++++++++++- 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e20fa06661..4211af37c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-sdk-extension-aws` Release AWS Python SDK Extension as 1.0.0 ([#667](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/667)) +### Added +- `opentelemetry-instrumentation-elasticsearch` Added `response_hook` callback + ### Changed - `opentelemetry-instrumentation-botocore` Unpatch botocore Endpoint.prepare_request on uninstrument ([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664)) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index f72a4bffa2..9c0c268a92 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -97,17 +97,20 @@ def _instrument(self, **kwargs): """ tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) + response_hook = kwargs.get("response_hook", _default_response_hook) _wrap( elasticsearch, "Transport.perform_request", - _wrap_perform_request(tracer, self._span_name_prefix), + _wrap_perform_request( + tracer, self._span_name_prefix, response_hook + ), ) def _uninstrument(self, **kwargs): unwrap(elasticsearch.Transport, "perform_request") -def _wrap_perform_request(tracer, span_name_prefix): +def _wrap_perform_request(tracer, span_name_prefix, response_hook): # pylint: disable=R0912 def wrapper(wrapped, _, args, kwargs): method = url = None @@ -143,13 +146,16 @@ def wrapper(wrapped, _, args, kwargs): span.set_attribute(key, value) rv = wrapped(*args, **kwargs) - if isinstance(rv, dict) and span.is_recording(): - for member in _ATTRIBUTES_FROM_RESULT: - if member in rv: - span.set_attribute( - "elasticsearch.{0}".format(member), - str(rv[member]), - ) + response_hook(span, rv) return rv return wrapper + + +def _default_response_hook(span, response): + if isinstance(response, dict) and span.is_recording(): + for member in _ATTRIBUTES_FROM_RESULT: + if member in response: + span.set_attribute( + "elasticsearch.{0}".format(member), str(response[member]), + ) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index f2eaf509a5..aa70d7b73f 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import os import threading from ast import literal_eval @@ -316,3 +316,55 @@ def test_dsl_index(self, request_mock): "title": "About searching", }, ) + + def test_response_hook(self, request_mock): + response_attribute_name = "db.query_result" + + def response_hook(span, response): + if span and span.is_recording(): + span.set_attribute( + response_attribute_name, json.dumps(response) + ) + + ElasticsearchInstrumentor().uninstrument() + ElasticsearchInstrumentor().instrument(response_hook=response_hook) + + response_payload = { + "took": 9, + "timed_out": False, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0, + }, + "hits": { + "total": {"value": 1, "relation": "eq"}, + "max_score": 0.18232156, + "hits": [ + { + "_index": "test-index", + "_type": "tweet", + "_id": "1", + "_score": 0.18232156, + "_source": {"name": "tester"}, + } + ], + }, + } + + request_mock.return_value = ( + 1, + {}, + json.dumps(response_payload), + ) + es = Elasticsearch() + es.get(index="test-index", doc_type="tweet", id=1) + + spans = self.get_finished_spans() + + self.assertEqual(1, len(spans)) + self.assertEqual( + json.dumps(response_payload), + spans[0].attributes[response_attribute_name], + ) From a40c9a9852f7da9382ac62b51c15a3585f000885 Mon Sep 17 00:00:00 2001 From: Itay Gibel Date: Thu, 9 Sep 2021 15:53:09 +0300 Subject: [PATCH 2/5] add PR link to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4211af37c7..3efd7ed4d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `opentelemetry-instrumentation-elasticsearch` Added `response_hook` callback + ([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670)) ### Changed - `opentelemetry-instrumentation-botocore` Unpatch botocore Endpoint.prepare_request on uninstrument From 0eff6fe3662e62eed9695aca46e163b2ecb1db3c Mon Sep 17 00:00:00 2001 From: Itay Gibel Date: Mon, 13 Sep 2021 12:57:21 +0300 Subject: [PATCH 3/5] revert default_response_hook --- .../instrumentation/elasticsearch/__init__.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index 9c0c268a92..be2f196e98 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -97,7 +97,7 @@ def _instrument(self, **kwargs): """ tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) - response_hook = kwargs.get("response_hook", _default_response_hook) + response_hook = kwargs.get("response_hook") _wrap( elasticsearch, "Transport.perform_request", @@ -110,7 +110,7 @@ def _uninstrument(self, **kwargs): unwrap(elasticsearch.Transport, "perform_request") -def _wrap_perform_request(tracer, span_name_prefix, response_hook): +def _wrap_perform_request(tracer, span_name_prefix, response_hook=None): # pylint: disable=R0912 def wrapper(wrapped, _, args, kwargs): method = url = None @@ -146,16 +146,16 @@ def wrapper(wrapped, _, args, kwargs): span.set_attribute(key, value) rv = wrapped(*args, **kwargs) - response_hook(span, rv) + if isinstance(rv, dict) and span.is_recording(): + for member in _ATTRIBUTES_FROM_RESULT: + if member in rv: + span.set_attribute( + "elasticsearch.{0}".format(member), + str(rv[member]), + ) + + if callable(response_hook): + response_hook(span, rv) return rv return wrapper - - -def _default_response_hook(span, response): - if isinstance(response, dict) and span.is_recording(): - for member in _ATTRIBUTES_FROM_RESULT: - if member in response: - span.set_attribute( - "elasticsearch.{0}".format(member), str(response[member]), - ) From 891eda95412bcea1afb39a3dcb6871ebcfa069dd Mon Sep 17 00:00:00 2001 From: Itay Gibel Date: Mon, 13 Sep 2021 13:36:49 +0300 Subject: [PATCH 4/5] adding request hook --- CHANGELOG.md | 2 +- .../instrumentation/elasticsearch/__init__.py | 11 ++++- .../tests/test_elasticsearch.py | 46 +++++++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3efd7ed4d9..f0f2e5c22e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#667](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/667)) ### Added -- `opentelemetry-instrumentation-elasticsearch` Added `response_hook` callback +- `opentelemetry-instrumentation-elasticsearch` Added `response_hook` and `request_hook` callbacks ([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670)) ### Changed diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index be2f196e98..62dacf94d1 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -97,12 +97,13 @@ def _instrument(self, **kwargs): """ tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) + request_hook = kwargs.get("request_hook") response_hook = kwargs.get("response_hook") _wrap( elasticsearch, "Transport.perform_request", _wrap_perform_request( - tracer, self._span_name_prefix, response_hook + tracer, self._span_name_prefix, request_hook, response_hook ), ) @@ -110,7 +111,9 @@ def _uninstrument(self, **kwargs): unwrap(elasticsearch.Transport, "perform_request") -def _wrap_perform_request(tracer, span_name_prefix, response_hook=None): +def _wrap_perform_request( + tracer, span_name_prefix, request_hook=None, response_hook=None +): # pylint: disable=R0912 def wrapper(wrapped, _, args, kwargs): method = url = None @@ -130,6 +133,10 @@ def wrapper(wrapped, _, args, kwargs): with tracer.start_as_current_span( op_name, kind=SpanKind.CLIENT, ) as span: + + if callable(request_hook): + request_hook(span, method, url, kwargs) + if span.is_recording(): attributes = { SpanAttributes.DB_SYSTEM: "elasticsearch", diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index aa70d7b73f..ed6e75e6fc 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -317,6 +317,52 @@ def test_dsl_index(self, request_mock): }, ) + def test_request_hook(self, request_mock): + request_hook_method_attribute = "request_hook.method" + request_hook_url_attribute = "request_hook.url" + request_hook_kwargs_attribute = "request_hook.kwargs" + + def request_hook(span, method, url, kwargs): + + attributes = { + request_hook_method_attribute: method, + request_hook_url_attribute: url, + request_hook_kwargs_attribute: json.dumps(kwargs), + } + + if span and span.is_recording(): + span.set_attributes(attributes) + + ElasticsearchInstrumentor().uninstrument() + ElasticsearchInstrumentor().instrument(request_hook=request_hook) + + request_mock.return_value = ( + 1, + {}, + '{"found": false, "timed_out": true, "took": 7}', + ) + es = Elasticsearch() + index = "test-index" + doc_type = "tweet" + doc_id = 1 + kwargs = {"params": {"test": True}} + es.get(index=index, doc_type=doc_type, id=doc_id, **kwargs) + + spans = self.get_finished_spans() + + self.assertEqual(1, len(spans)) + self.assertEqual( + "GET", spans[0].attributes[request_hook_method_attribute] + ) + self.assertEqual( + f"/{index}/{doc_type}/{doc_id}", + spans[0].attributes[request_hook_url_attribute], + ) + self.assertEqual( + json.dumps(kwargs), + spans[0].attributes[request_hook_kwargs_attribute], + ) + def test_response_hook(self, request_mock): response_attribute_name = "db.query_result" From d0f8bace76bb64e27c107080c1631345d909dbcf Mon Sep 17 00:00:00 2001 From: Itay Gibel Date: Tue, 14 Sep 2021 09:50:30 +0300 Subject: [PATCH 5/5] updating the usage docs --- .../instrumentation/elasticsearch/__init__.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index 62dacf94d1..eb01ba08cc 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -44,6 +44,41 @@ .. code-block:: python ElasticsearchInstrumentor("my-custom-prefix").instrument() + + +The `instrument` method accepts the following keyword args: + +tracer_provider (TracerProvider) - an optional tracer provider +request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request + this function signature is: + def request_hook(span: Span, method: str, url: str, kwargs) +response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request + this function signature is: + def response_hook(span: Span, response: dict) + +for example: + +.. code: python + + from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor + import elasticsearch + + def request_hook(span, method, url, kwargs): + if span and span.is_recording(): + span.set_attribute("custom_user_attribute_from_request_hook", "some-value") + + def response_hook(span, response): + if span and span.is_recording(): + span.set_attribute("custom_user_attribute_from_response_hook", "some-value") + + # instrument elasticsearch with request and response hooks + ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook) + + # Using elasticsearch as normal now will automatically generate spans, + # including user custom attributes added from the hooks + es = elasticsearch.Elasticsearch() + es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()}) + es.get(index='my-index', doc_type='my-type', id=1) """ from logging import getLogger