Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

psycopg2 and dbapi instrumentaiton fixes #246

Merged
merged 2 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ disable=missing-docstring,
protected-access, # temp-pylint-upgrade
super-init-not-called, # temp-pylint-upgrade
invalid-overridden-method, # temp-pylint-upgrade
missing-module-docstring, # temp-pylint-upgrad, # temp-pylint-upgradee
missing-module-docstring, # temp-pylint-upgrade

# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
Expand Down
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-aiopg` Fix AttributeError `__aexit__` when `aiopg.connect` and `aio[g].create_pool` used with async context manager
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
- `opentelemetry-exporter-datadog` `opentelemetry-sdk-extension-aws` Fix reference to ids_generator in sdk
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
([#283](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/283))
- `opentelemetry-instrumentation-sqlalchemy` Use SQL operation and DB name as span name.
([#254](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/254))
- `opentelemetry-instrumentation-dbapi`, `TracedCursor` replaced by `CursorTracer`
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- `opentelemetry-instrumentation-psycopg2`, Added support for psycopg2 registered types.
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- `opentelemetry-instrumentation-dbapi`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-aiopg` Use SQL command name as the span operation name instead of the entire query.
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))

## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from aiopg.utils import _ContextManager, _PoolAcquireContextManager

from opentelemetry.instrumentation.dbapi import (
CursorTracer,
DatabaseApiIntegration,
TracedCursor,
)
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode
Expand Down Expand Up @@ -94,25 +94,29 @@ async def _acquire(self):
return TracedPoolProxy(pool, *args, **kwargs)


class AsyncTracedCursor(TracedCursor):
class AsyncCursorTracer(CursorTracer):
async def traced_execution(
self,
cursor,
query_method: typing.Callable[..., typing.Any],
*args: typing.Tuple[typing.Any, typing.Any],
**kwargs: typing.Dict[typing.Any, typing.Any]
):
name = ""
if len(args) > 0 and args[0]:
name = args[0]
elif self._db_api_integration.database:
name = self._db_api_integration.database
else:
name = self._db_api_integration.name
if args:
name = self.get_operation_name(cursor, args)

if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)

with self._db_api_integration.get_tracer().start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
self._populate_span(span, *args)
self._populate_span(span, cursor, *args)
try:
result = await query_method(*args, **kwargs)
return result
Expand All @@ -123,31 +127,31 @@ async def traced_execution(


def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
_traced_cursor = AsyncTracedCursor(db_api_integration)
_traced_cursor = AsyncCursorTracer(db_api_integration)

# pylint: disable=abstract-method
class AsyncTracedCursorProxy(AsyncProxyObject):
class AsyncCursorTracerProxy(AsyncProxyObject):

# pylint: disable=unused-argument
def __init__(self, cursor, *args, **kwargs):
super().__init__(cursor)

async def execute(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
self, self.__wrapped__.execute, *args, **kwargs
)
return result

async def executemany(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
self, self.__wrapped__.executemany, *args, **kwargs
)
return result

async def callproc(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
self, self.__wrapped__.callproc, *args, **kwargs
)
return result

return AsyncTracedCursorProxy(cursor, *args, **kwargs)
return AsyncCursorTracerProxy(cursor, *args, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def test_span_succeeded(self):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)

self.assertEqual(span.attributes["component"], "testcomponent")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def trace_integration(
connection_attributes: typing.Dict = None,
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
db_api_integration_factory=None,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
Expand All @@ -86,6 +87,7 @@ def trace_integration(
version=__version__,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
db_api_integration_factory=db_api_integration_factory,
)


Expand All @@ -99,6 +101,7 @@ def wrap_connect(
version: str = "",
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
db_api_integration_factory=None,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
Expand All @@ -115,6 +118,9 @@ def wrap_connect(
capture_parameters: Configure if db.statement.parameters should be captured.

"""
db_api_integration_factory = (
db_api_integration_factory or DatabaseApiIntegration
)

# pylint: disable=unused-argument
def wrap_connect_(
Expand All @@ -123,7 +129,7 @@ def wrap_connect_(
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
db_integration = DatabaseApiIntegration(
db_integration = db_api_integration_factory(
name,
database_component,
database_type=database_type,
Expand Down Expand Up @@ -314,16 +320,19 @@ def __exit__(self, *args, **kwargs):
return TracedConnectionProxy(connection, *args, **kwargs)


class TracedCursor:
class CursorTracer:
def __init__(self, db_api_integration: DatabaseApiIntegration):
self._db_api_integration = db_api_integration

def _populate_span(
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
self,
span: trace_api.Span,
cursor,
*args: typing.Tuple[typing.Any, typing.Any]
):
if not span.is_recording():
return
statement = args[0] if args else ""
statement = self.get_statement(cursor, args)
span.set_attribute(
"component", self._db_api_integration.database_component
)
Expand All @@ -342,24 +351,38 @@ def _populate_span(
if self._db_api_integration.capture_parameters and len(args) > 1:
span.set_attribute("db.statement.parameters", str(args[1]))

def get_operation_name(self, cursor, args): # pylint: disable=no-self-use
if args and isinstance(args[0], str):
return args[0].split()[0]
return ""
owais marked this conversation as resolved.
Show resolved Hide resolved

def get_statement(self, cursor, args): # pylint: disable=no-self-use
if not args:
return ""
statement = args[0]
if isinstance(statement, bytes):
return statement.decode("utf8", "replace")
return statement

def traced_execution(
self,
cursor,
query_method: typing.Callable[..., typing.Any],
*args: typing.Tuple[typing.Any, typing.Any],
**kwargs: typing.Dict[typing.Any, typing.Any]
):
name = ""
if args:
name = args[0]
elif self._db_api_integration.database:
name = self._db_api_integration.database
else:
name = self._db_api_integration.name
name = self.get_operation_name(cursor, args)
if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)

with self._db_api_integration.get_tracer().start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
self._populate_span(span, *args)
self._populate_span(span, cursor, *args)
try:
result = query_method(*args, **kwargs)
return result
Expand All @@ -370,7 +393,7 @@ def traced_execution(


def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
_traced_cursor = TracedCursor(db_api_integration)
_cursor_tracer = CursorTracer(db_api_integration)

# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):
Expand All @@ -380,18 +403,18 @@ def __init__(self, cursor, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, cursor)

def execute(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
)

def executemany(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
)

def callproc(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
)

def __enter__(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_span_succeeded(self):
spans_list = self.memory_exporter.get_finished_spans()
owais marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)

self.assertEqual(span.attributes["component"], "testcomponent")
Expand All @@ -65,6 +65,27 @@ def test_span_succeeded(self):
span.status.status_code, trace_api.status.StatusCode.UNSET
)

def test_span_name(self):
db_integration = dbapi.DatabaseApiIntegration(
self.tracer, "testcomponent", "testtype", {}
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, {}
)
cursor = mock_connection.cursor()
cursor.execute("Test query", ("param1Value", False))
cursor.execute(
"""multi
line
query"""
)
cursor.execute("tab\tseparated query")
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 3)
self.assertEqual(spans_list[0].name, "Test")
self.assertEqual(spans_list[1].name, "multi")
self.assertEqual(spans_list[2].name, "tab")

def test_span_succeeded_with_capture_of_statement_parameters(self):
connection_props = {
"database": "testdatabase",
Expand Down Expand Up @@ -93,7 +114,7 @@ def test_span_succeeded_with_capture_of_statement_parameters(self):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)

self.assertEqual(span.attributes["component"], "testcomponent")
Expand Down
Loading