Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add elasticsearch db.statement sanitization #1598

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572))
- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization.
([#1598](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1598))
- `opentelemetry-instrumentation-celery` Record exceptions as events on the span.
([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573))
- Add metric instrumentation for urllib
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

The instrument() method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
sanitize_query (bool) - an optional query sanitization flag
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
this function signature is:
def request_hook(span: Span, method: str, url: str, kwargs)
Expand Down Expand Up @@ -96,6 +97,8 @@ def response_hook(span, response):
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer

from .utils import sanitize_body

logger = getLogger(__name__)


Expand Down Expand Up @@ -135,11 +138,16 @@ def _instrument(self, **kwargs):
tracer = get_tracer(__name__, __version__, tracer_provider)
request_hook = kwargs.get("request_hook")
response_hook = kwargs.get("response_hook")
sanitize_query = kwargs.get("sanitize_query", False)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the default be True? See also open-telemetry/opentelemetry-specification#3104

_wrap(
elasticsearch,
"Transport.perform_request",
_wrap_perform_request(
tracer, self._span_name_prefix, request_hook, response_hook
tracer,
sanitize_query,
self._span_name_prefix,
request_hook,
response_hook,
),
)

Expand All @@ -154,7 +162,11 @@ def _uninstrument(self, **kwargs):


def _wrap_perform_request(
tracer, span_name_prefix, request_hook=None, response_hook=None
tracer,
sanitize_query,
span_name_prefix,
request_hook=None,
response_hook=None,
):
# pylint: disable=R0912,R0914
def wrapper(wrapped, _, args, kwargs):
Expand Down Expand Up @@ -213,7 +225,10 @@ def wrapper(wrapped, _, args, kwargs):
if method:
attributes["elasticsearch.method"] = method
if body:
attributes[SpanAttributes.DB_STATEMENT] = str(body)
statement = str(body)
if sanitize_query:
statement = sanitize_body(body)
attributes[SpanAttributes.DB_STATEMENT] = statement
if params:
attributes["elasticsearch.params"] = str(params)
if doc_id:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

sanitized_keys = (
"message",
"should",
"filter",
"query",
"queries",
"intervals",
"match",
)
sanitized_value = "?"


# pylint: disable=C0103
def _flatten_dict(d, parent_key=""):
items = []
for k, v in d.items():
new_key = parent_key + "." + k if parent_key else k
if isinstance(v, dict):
items.extend(_flatten_dict(v, new_key).items())
else:
items.append((new_key, v))
return dict(items)


def _unflatten_dict(d):
res = {}
for k, v in d.items():
keys = k.split(".")
d = res
for key in keys[:-1]:
if key not in d:
d[key] = {}
d = d[key]
d[keys[-1]] = v
return res


def sanitize_body(body) -> str:
flatten_body = _flatten_dict(body)

for key in flatten_body:
if key.endswith(sanitized_keys):
flatten_body[key] = sanitized_value

return str(_unflatten_dict(flatten_body))
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
interval_query = {
"query": {
"intervals": {
"my_text": {
"all_of": {
"ordered": True,
"intervals": [
{
"match": {
"query": "my favorite food",
"max_gaps": 0,
"ordered": True,
}
},
{
"any_of": {
"intervals": [
{"match": {"query": "hot water"}},
{"match": {"query": "cold porridge"}},
]
}
},
],
}
}
}
}
}

match_query = {"query": {"match": {"message": {"query": "this is a test"}}}}

filter_query = {
"query": {
"bool": {
"must": [
{"match": {"title": "Search"}},
{"match": {"content": "Elasticsearch"}},
],
"filter": [
{"term": {"status": "published"}},
{"range": {"publish_date": {"gte": "2015-01-01"}}},
],
}
}
}

interval_query_sanitized = {
"query": {
"intervals": {
"my_text": {"all_of": {"ordered": True, "intervals": "?"}}
}
}
}
match_query_sanitized = {"query": {"match": {"message": {"query": "?"}}}}
filter_query_sanitized = {
"query": {
"bool": {
"must": [
{"match": {"title": "Search"}},
{"match": {"content": "Elasticsearch"}},
],
"filter": "?",
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import threading
Expand All @@ -27,10 +28,13 @@
from opentelemetry.instrumentation.elasticsearch import (
ElasticsearchInstrumentor,
)
from opentelemetry.instrumentation.elasticsearch.utils import sanitize_body
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import StatusCode

from . import sanitization_queries # pylint: disable=no-name-in-module

major_version = elasticsearch.VERSION[0]

if major_version == 7:
Expand All @@ -42,14 +46,29 @@
else:
from . import helpers_es2 as helpers # pylint: disable=no-name-in-module


Article = helpers.Article


@mock.patch(
"elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request"
)
class TestElasticsearchIntegration(TestBase):
search_attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index/_search",
"elasticsearch.method": helpers.dsl_search_method,
"elasticsearch.target": "test-index",
SpanAttributes.DB_STATEMENT: str(
{"query": {"bool": {"filter": [{"term": {"author": "testing"}}]}}}
),
}

create_attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index",
"elasticsearch.method": "HEAD",
}

def setUp(self):
super().setUp()
self.tracer = self.tracer_provider.get_tracer(__name__)
Expand Down Expand Up @@ -241,21 +260,36 @@ def test_dsl_search(self, request_mock):
self.assertIsNotNone(span.end_time)
self.assertEqual(
span.attributes,
self.search_attributes,
)

def test_dsl_search_sanitized(self, request_mock):
# Reset instrumentation to use sanitized query (default)
ElasticsearchInstrumentor().uninstrument()
ElasticsearchInstrumentor().instrument(sanitize_query=True)

# update expected attributes to match sanitized query
sanitized_search_attributes = self.search_attributes.copy()
sanitized_search_attributes.update(
{
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index/_search",
"elasticsearch.method": helpers.dsl_search_method,
"elasticsearch.target": "test-index",
SpanAttributes.DB_STATEMENT: str(
{
"query": {
"bool": {
"filter": [{"term": {"author": "testing"}}]
}
}
}
),
},
SpanAttributes.DB_STATEMENT: "{'query': {'bool': {'filter': '?'}}}"
}
)

request_mock.return_value = (1, {}, '{"hits": {"hits": []}}')
client = Elasticsearch()
search = Search(using=client, index="test-index").filter(
"term", author="testing"
)
search.execute()
spans = self.get_finished_spans()
span = spans[0]
self.assertEqual(1, len(spans))
self.assertEqual(span.name, "Elasticsearch/<target>/_search")
self.assertIsNotNone(span.end_time)
self.assertEqual(
span.attributes,
sanitized_search_attributes,
)

def test_dsl_create(self, request_mock):
Expand All @@ -264,17 +298,14 @@ def test_dsl_create(self, request_mock):
Article.init(using=client)

spans = self.get_finished_spans()
assert spans
self.assertEqual(2, len(spans))
span1 = spans.by_attr(key="elasticsearch.method", value="HEAD")
span2 = spans.by_attr(key="elasticsearch.method", value="PUT")

self.assertEqual(
span1.attributes,
{
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index",
"elasticsearch.method": "HEAD",
},
self.create_attributes,
)

attributes = {
Expand All @@ -288,6 +319,25 @@ def test_dsl_create(self, request_mock):
helpers.dsl_create_statement,
)

def test_dsl_create_sanitized(self, request_mock):
# Reset instrumentation to explicitly use sanitized query
ElasticsearchInstrumentor().uninstrument()
ElasticsearchInstrumentor().instrument(sanitize_query=True)
request_mock.return_value = (1, {}, {})
client = Elasticsearch()
Article.init(using=client)

spans = self.get_finished_spans()
assert spans

self.assertEqual(2, len(spans))
span = spans.by_attr(key="elasticsearch.method", value="HEAD")

self.assertEqual(
span.attributes,
self.create_attributes,
)

def test_dsl_index(self, request_mock):
request_mock.return_value = helpers.dsl_index_result

Expand Down Expand Up @@ -412,3 +462,17 @@ def response_hook(span, response):
json.dumps(response_payload),
spans[0].attributes[response_attribute_name],
)

def test_body_sanitization(self, _):
self.assertEqual(
sanitize_body(sanitization_queries.interval_query),
str(sanitization_queries.interval_query_sanitized),
)
self.assertEqual(
sanitize_body(sanitization_queries.match_query),
str(sanitization_queries.match_query_sanitized),
)
self.assertEqual(
sanitize_body(sanitization_queries.filter_query),
str(sanitization_queries.filter_query_sanitized),
)