diff --git a/.pylintrc b/.pylintrc index 9f767af293..2a2ad87040 100644 --- a/.pylintrc +++ b/.pylintrc @@ -78,7 +78,7 @@ disable=missing-docstring, protected-access, # temp-pylint-upgrade super-init-not-called, # temp-pylint-upgrade invalid-overridden-method, # temp-pylint-upgrade - missing-module-docstring, # temp-pylint-upgrad, # temp-pylint-upgradee + missing-module-docstring, # temp-pylint-upgrade # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ef56d7034..86fc70995c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,9 +56,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-aiopg` Fix AttributeError `__aexit__` when `aiopg.connect` and `aio[g].create_pool` used with async context manager ([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235)) - `opentelemetry-exporter-datadog` `opentelemetry-sdk-extension-aws` Fix reference to ids_generator in sdk - ([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235)) + ([#283](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/283)) - `opentelemetry-instrumentation-sqlalchemy` Use SQL operation and DB name as span name. ([#254](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/254)) +- `opentelemetry-instrumentation-dbapi`, `TracedCursor` replaced by `CursorTracer` + ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) +- `opentelemetry-instrumentation-psycopg2`, Added support for psycopg2 registered types. + ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) +- `opentelemetry-instrumentation-dbapi`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-aiopg` Use SQL command name as the span operation name instead of the entire query. + ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) ## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26 diff --git a/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py b/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py index 9824237565..b130ef2b51 100644 --- a/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py @@ -4,8 +4,8 @@ from aiopg.utils import _ContextManager, _PoolAcquireContextManager from opentelemetry.instrumentation.dbapi import ( + CursorTracer, DatabaseApiIntegration, - TracedCursor, ) from opentelemetry.trace import SpanKind from opentelemetry.trace.status import Status, StatusCode @@ -94,25 +94,29 @@ async def _acquire(self): return TracedPoolProxy(pool, *args, **kwargs) -class AsyncTracedCursor(TracedCursor): +class AsyncCursorTracer(CursorTracer): async def traced_execution( self, + cursor, query_method: typing.Callable[..., typing.Any], *args: typing.Tuple[typing.Any, typing.Any], **kwargs: typing.Dict[typing.Any, typing.Any] ): name = "" - if len(args) > 0 and args[0]: - name = args[0] - elif self._db_api_integration.database: - name = self._db_api_integration.database - else: - name = self._db_api_integration.name + if args: + name = self.get_operation_name(cursor, args) + + if not name: + name = ( + self._db_api_integration.database + if self._db_api_integration.database + else self._db_api_integration.name + ) with self._db_api_integration.get_tracer().start_as_current_span( name, kind=SpanKind.CLIENT ) as span: - self._populate_span(span, *args) + self._populate_span(span, cursor, *args) try: result = await query_method(*args, **kwargs) return result @@ -123,10 +127,10 @@ async def traced_execution( def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs): - _traced_cursor = AsyncTracedCursor(db_api_integration) + _traced_cursor = AsyncCursorTracer(db_api_integration) # pylint: disable=abstract-method - class AsyncTracedCursorProxy(AsyncProxyObject): + class AsyncCursorTracerProxy(AsyncProxyObject): # pylint: disable=unused-argument def __init__(self, cursor, *args, **kwargs): @@ -134,20 +138,20 @@ def __init__(self, cursor, *args, **kwargs): async def execute(self, *args, **kwargs): result = await _traced_cursor.traced_execution( - self.__wrapped__.execute, *args, **kwargs + self, self.__wrapped__.execute, *args, **kwargs ) return result async def executemany(self, *args, **kwargs): result = await _traced_cursor.traced_execution( - self.__wrapped__.executemany, *args, **kwargs + self, self.__wrapped__.executemany, *args, **kwargs ) return result async def callproc(self, *args, **kwargs): result = await _traced_cursor.traced_execution( - self.__wrapped__.callproc, *args, **kwargs + self, self.__wrapped__.callproc, *args, **kwargs ) return result - return AsyncTracedCursorProxy(cursor, *args, **kwargs) + return AsyncCursorTracerProxy(cursor, *args, **kwargs) diff --git a/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py b/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py index ad935cfdfd..78c342df9b 100644 --- a/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py @@ -256,7 +256,7 @@ def test_span_succeeded(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) span = spans_list[0] - self.assertEqual(span.name, "Test query") + self.assertEqual(span.name, "Test") self.assertIs(span.kind, trace_api.SpanKind.CLIENT) self.assertEqual(span.attributes["component"], "testcomponent") diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py index 197f4ade44..67c25f9881 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py @@ -60,6 +60,7 @@ def trace_integration( connection_attributes: typing.Dict = None, tracer_provider: typing.Optional[TracerProvider] = None, capture_parameters: bool = False, + db_api_integration_factory=None, ): """Integrate with DB API library. https://www.python.org/dev/peps/pep-0249/ @@ -86,6 +87,7 @@ def trace_integration( version=__version__, tracer_provider=tracer_provider, capture_parameters=capture_parameters, + db_api_integration_factory=db_api_integration_factory, ) @@ -99,6 +101,7 @@ def wrap_connect( version: str = "", tracer_provider: typing.Optional[TracerProvider] = None, capture_parameters: bool = False, + db_api_integration_factory=None, ): """Integrate with DB API library. https://www.python.org/dev/peps/pep-0249/ @@ -115,6 +118,9 @@ def wrap_connect( capture_parameters: Configure if db.statement.parameters should be captured. """ + db_api_integration_factory = ( + db_api_integration_factory or DatabaseApiIntegration + ) # pylint: disable=unused-argument def wrap_connect_( @@ -123,7 +129,7 @@ def wrap_connect_( args: typing.Tuple[typing.Any, typing.Any], kwargs: typing.Dict[typing.Any, typing.Any], ): - db_integration = DatabaseApiIntegration( + db_integration = db_api_integration_factory( name, database_component, database_type=database_type, @@ -314,16 +320,19 @@ def __exit__(self, *args, **kwargs): return TracedConnectionProxy(connection, *args, **kwargs) -class TracedCursor: +class CursorTracer: def __init__(self, db_api_integration: DatabaseApiIntegration): self._db_api_integration = db_api_integration def _populate_span( - self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any] + self, + span: trace_api.Span, + cursor, + *args: typing.Tuple[typing.Any, typing.Any] ): if not span.is_recording(): return - statement = args[0] if args else "" + statement = self.get_statement(cursor, args) span.set_attribute( "component", self._db_api_integration.database_component ) @@ -342,24 +351,38 @@ def _populate_span( if self._db_api_integration.capture_parameters and len(args) > 1: span.set_attribute("db.statement.parameters", str(args[1])) + def get_operation_name(self, cursor, args): # pylint: disable=no-self-use + if args and isinstance(args[0], str): + return args[0].split()[0] + return "" + + def get_statement(self, cursor, args): # pylint: disable=no-self-use + if not args: + return "" + statement = args[0] + if isinstance(statement, bytes): + return statement.decode("utf8", "replace") + return statement + def traced_execution( self, + cursor, query_method: typing.Callable[..., typing.Any], *args: typing.Tuple[typing.Any, typing.Any], **kwargs: typing.Dict[typing.Any, typing.Any] ): - name = "" - if args: - name = args[0] - elif self._db_api_integration.database: - name = self._db_api_integration.database - else: - name = self._db_api_integration.name + name = self.get_operation_name(cursor, args) + if not name: + name = ( + self._db_api_integration.database + if self._db_api_integration.database + else self._db_api_integration.name + ) with self._db_api_integration.get_tracer().start_as_current_span( name, kind=SpanKind.CLIENT ) as span: - self._populate_span(span, *args) + self._populate_span(span, cursor, *args) try: result = query_method(*args, **kwargs) return result @@ -370,7 +393,7 @@ def traced_execution( def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs): - _traced_cursor = TracedCursor(db_api_integration) + _cursor_tracer = CursorTracer(db_api_integration) # pylint: disable=abstract-method class TracedCursorProxy(wrapt.ObjectProxy): @@ -380,18 +403,18 @@ def __init__(self, cursor, *args, **kwargs): wrapt.ObjectProxy.__init__(self, cursor) def execute(self, *args, **kwargs): - return _traced_cursor.traced_execution( - self.__wrapped__.execute, *args, **kwargs + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.execute, *args, **kwargs ) def executemany(self, *args, **kwargs): - return _traced_cursor.traced_execution( - self.__wrapped__.executemany, *args, **kwargs + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs ) def callproc(self, *args, **kwargs): - return _traced_cursor.traced_execution( - self.__wrapped__.callproc, *args, **kwargs + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs ) def __enter__(self): diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py b/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py index e69bf60c9d..c07f44c2b4 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py @@ -50,7 +50,7 @@ def test_span_succeeded(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) span = spans_list[0] - self.assertEqual(span.name, "Test query") + self.assertEqual(span.name, "Test") self.assertIs(span.kind, trace_api.SpanKind.CLIENT) self.assertEqual(span.attributes["component"], "testcomponent") @@ -65,6 +65,27 @@ def test_span_succeeded(self): span.status.status_code, trace_api.status.StatusCode.UNSET ) + def test_span_name(self): + db_integration = dbapi.DatabaseApiIntegration( + self.tracer, "testcomponent", "testtype", {} + ) + mock_connection = db_integration.wrapped_connection( + mock_connect, {}, {} + ) + cursor = mock_connection.cursor() + cursor.execute("Test query", ("param1Value", False)) + cursor.execute( + """multi + line + query""" + ) + cursor.execute("tab\tseparated query") + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 3) + self.assertEqual(spans_list[0].name, "Test") + self.assertEqual(spans_list[1].name, "multi") + self.assertEqual(spans_list[2].name, "tab") + def test_span_succeeded_with_capture_of_statement_parameters(self): connection_props = { "database": "testdatabase", @@ -93,7 +114,7 @@ def test_span_succeeded_with_capture_of_statement_parameters(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) span = spans_list[0] - self.assertEqual(span.name, "Test query") + self.assertEqual(span.name, "Test") self.assertIs(span.kind, trace_api.SpanKind.CLIENT) self.assertEqual(span.attributes["component"], "testcomponent") diff --git a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py index 4b8799402e..ef807963aa 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py @@ -39,12 +39,19 @@ --- """ +import typing + import psycopg2 +from psycopg2.extensions import ( + cursor as pg_cursor, # pylint: disable=no-name-in-module +) +from psycopg2.sql import Composed # pylint: disable=no-name-in-module from opentelemetry.instrumentation import dbapi from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.psycopg2.version import __version__ -from opentelemetry.trace import get_tracer + +_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory" class Psycopg2Instrumentor(BaseInstrumentor): @@ -62,7 +69,6 @@ def _instrument(self, **kwargs): """Integrate with PostgreSQL Psycopg library. Psycopg: http://initd.org/psycopg/ """ - tracer_provider = kwargs.get("tracer_provider") dbapi.wrap_connect( @@ -74,39 +80,101 @@ def _instrument(self, **kwargs): self._CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + db_api_integration_factory=DatabaseApiIntegration, ) def _uninstrument(self, **kwargs): """"Disable Psycopg2 instrumentation""" dbapi.unwrap_connect(psycopg2, "connect") - # pylint:disable=no-self-use - def instrument_connection(self, connection): - """Enable instrumentation in a Psycopg2 connection. - - Args: - connection: The connection to instrument. + # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql + def instrument_connection(self, connection): # pylint: disable=no-self-use + setattr( + connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory + ) + connection.cursor_factory = _new_cursor_factory() + return connection + + # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql + def uninstrument_connection( + self, connection + ): # pylint: disable=no-self-use + connection.cursor_factory = getattr( + connection, _OTEL_CURSOR_FACTORY_KEY, None + ) + return connection + + +# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql +class DatabaseApiIntegration(dbapi.DatabaseApiIntegration): + def wrapped_connection( + self, + connect_method: typing.Callable[..., typing.Any], + args: typing.Tuple[typing.Any, typing.Any], + kwargs: typing.Dict[typing.Any, typing.Any], + ): + """Add object proxy to connection object.""" + base_cursor_factory = kwargs.pop("cursor_factory", None) + new_factory_kwargs = {"db_api": self} + if base_cursor_factory: + new_factory_kwargs["base_factory"] = base_cursor_factory + kwargs["cursor_factory"] = _new_cursor_factory(**new_factory_kwargs) + connection = connect_method(*args, **kwargs) + self.get_connection_attributes(connection) + return connection + + +class CursorTracer(dbapi.CursorTracer): + def get_operation_name(self, cursor, args): + if not args: + return "" + + statement = args[0] + if isinstance(statement, Composed): + statement = statement.as_string(cursor) + + if isinstance(statement, str): + return statement.split()[0] + + return "" + + def get_statement(self, cursor, args): + if not args: + return "" + + statement = args[0] + if isinstance(statement, Composed): + statement = statement.as_string(cursor) + return statement + + +def _new_cursor_factory(db_api=None, base_factory=None): + if not db_api: + db_api = DatabaseApiIntegration( + __name__, + Psycopg2Instrumentor._DATABASE_COMPONENT, + database_type=Psycopg2Instrumentor._DATABASE_TYPE, + connection_attributes=Psycopg2Instrumentor._CONNECTION_ATTRIBUTES, + version=__version__, + ) - Returns: - An instrumented connection. - """ - tracer = get_tracer(__name__, __version__) + base_factory = base_factory or pg_cursor + _cursor_tracer = CursorTracer(db_api) - return dbapi.instrument_connection( - tracer, - connection, - self._DATABASE_COMPONENT, - self._DATABASE_TYPE, - self._CONNECTION_ATTRIBUTES, - ) + class TracedCursorFactory(base_factory): + def execute(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().execute, *args, **kwargs + ) - def uninstrument_connection(self, connection): - """Disable instrumentation in a Psycopg2 connection. + def executemany(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().executemany, *args, **kwargs + ) - Args: - connection: The connection to uninstrument. + def callproc(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().callproc, *args, **kwargs + ) - Returns: - An uninstrumented connection. - """ - return dbapi.uninstrument_connection(connection) + return TracedCursorFactory diff --git a/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py index cb127c7a5e..e25fd7a934 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import types from unittest import mock import psycopg2 @@ -22,15 +23,69 @@ from opentelemetry.test.test_base import TestBase +class MockCursor: + + execute = mock.MagicMock(spec=types.MethodType) + execute.__name__ = "execute" + + executemany = mock.MagicMock(spec=types.MethodType) + executemany.__name__ = "executemany" + + callproc = mock.MagicMock(spec=types.MethodType) + callproc.__name__ = "callproc" + + rowcount = "SomeRowCount" + + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + return self + + +class MockConnection: + + commit = mock.MagicMock(spec=types.MethodType) + commit.__name__ = "commit" + + rollback = mock.MagicMock(spec=types.MethodType) + rollback.__name__ = "rollback" + + def __init__(self, *args, **kwargs): + self.cursor_factory = kwargs.pop("cursor_factory", None) + + def cursor(self): + if self.cursor_factory: + return self.cursor_factory(self) + return MockCursor() + + def get_dsn_parameters(self): # pylint: disable=no-self-use + return dict(dbname="test") + + class TestPostgresqlIntegration(TestBase): + def setUp(self): + self.cursor_mock = mock.patch( + "opentelemetry.instrumentation.psycopg2.pg_cursor", MockCursor + ) + self.connection_mock = mock.patch("psycopg2.connect", MockConnection) + + self.cursor_mock.start() + self.connection_mock.start() + def tearDown(self): super().tearDown() + self.memory_exporter.clear() + self.cursor_mock.stop() + self.connection_mock.stop() with self.disable_logging(): Psycopg2Instrumentor().uninstrument() - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_instrumentor(self, mock_connect): + def test_instrumentor(self): Psycopg2Instrumentor().instrument() cnx = psycopg2.connect(database="test") @@ -60,9 +115,8 @@ def test_instrumentor(self, mock_connect): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_not_recording(self, mock_connect): + def test_not_recording(self): mock_tracer = mock.Mock() mock_span = mock.Mock() mock_span.is_recording.return_value = False @@ -83,9 +137,8 @@ def test_not_recording(self, mock_connect): Psycopg2Instrumentor().uninstrument() - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_custom_tracer_provider(self, mock_connect): + def test_custom_tracer_provider(self): resource = resources.Resource.create({}) result = self.create_tracer_provider(resource=resource) tracer_provider, exporter = result @@ -103,9 +156,8 @@ def test_custom_tracer_provider(self, mock_connect): self.assertIs(span.resource, resource) - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_instrument_connection(self, mock_connect): + def test_instrument_connection(self): cnx = psycopg2.connect(database="test") query = "SELECT * FROM test" cursor = cnx.cursor() @@ -121,9 +173,8 @@ def test_instrument_connection(self, mock_connect): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_uninstrument_connection(self, mock_connect): + def test_uninstrument_connection(self): Psycopg2Instrumentor().instrument() cnx = psycopg2.connect(database="test") query = "SELECT * FROM test" diff --git a/instrumentation/opentelemetry-instrumentation-sqlite3/tests/test_sqlite3.py b/instrumentation/opentelemetry-instrumentation-sqlite3/tests/test_sqlite3.py index a4fc887061..6b8b0cb696 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlite3/tests/test_sqlite3.py +++ b/instrumentation/opentelemetry-instrumentation-sqlite3/tests/test_sqlite3.py @@ -60,7 +60,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id integer)" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -68,7 +68,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = [("1",), ("2",), ("3",)] self._cursor.executemany(stmt, data) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" diff --git a/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py b/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py index ec6eed3132..df818ab137 100644 --- a/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py +++ b/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py @@ -81,7 +81,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id INT)" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_connection_context_manager(self): """Should create a child span for execute with connection context""" @@ -90,7 +90,7 @@ def test_execute_with_connection_context_manager(self): with self._connection as conn: cursor = conn.cursor() cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_cursor_context_manager(self): """Should create a child span for execute with cursor context""" @@ -98,7 +98,7 @@ def test_execute_with_cursor_context_manager(self): with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -106,7 +106,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) self._cursor.executemany(stmt, data) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" diff --git a/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py b/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py index 030aecc66d..423151316a 100644 --- a/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py @@ -89,7 +89,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id integer)" with self._tracer.start_as_current_span("rootSpan"): async_call(self._cursor.execute(stmt)) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -98,7 +98,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) async_call(self._cursor.executemany(stmt, data)) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" @@ -167,7 +167,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id integer)" with self._tracer.start_as_current_span("rootSpan"): async_call(self._cursor.execute(stmt)) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -176,7 +176,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) async_call(self._cursor.executemany(stmt, data)) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" diff --git a/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py b/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py index 76116dfd28..53fe3cacfb 100644 --- a/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py @@ -15,6 +15,7 @@ import os import psycopg2 +from psycopg2 import sql from opentelemetry import trace as trace_api from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor @@ -81,7 +82,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id integer)" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_connection_context_manager(self): """Should create a child span for execute with connection context""" @@ -90,7 +91,7 @@ def test_execute_with_connection_context_manager(self): with self._connection as conn: cursor = conn.cursor() cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_cursor_context_manager(self): """Should create a child span for execute with cursor context""" @@ -98,7 +99,7 @@ def test_execute_with_cursor_context_manager(self): with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") self.assertTrue(cursor.closed) def test_executemany(self): @@ -107,7 +108,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) self._cursor.executemany(stmt, data) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" @@ -116,3 +117,30 @@ def test_callproc(self): ): self._cursor.callproc("test", ()) self.validate_spans("test") + + def test_register_types(self): + psycopg2.extras.register_default_jsonb( + conn_or_curs=self._cursor, loads=lambda x: x + ) + + def test_composed_queries(self): + stmt = "CREATE TABLE IF NOT EXISTS users (id integer, name varchar)" + with self._tracer.start_as_current_span("rootSpan"): + self._cursor.execute(stmt) + self.validate_spans("CREATE") + + self._cursor.execute( + sql.SQL("SELECT FROM {table} where {field}='{value}'").format( + table=sql.Identifier("users"), + field=sql.Identifier("name"), + value=sql.Identifier("abc"), + ) + ) + + spans = self.memory_exporter.get_finished_spans() + span = spans[2] + self.assertEqual(span.name, "SELECT") + self.assertEqual( + span.attributes["db.statement"], + 'SELECT FROM "users" where "name"=\'"abc"\'', + ) diff --git a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py index b8e4404805..278bfc4d42 100644 --- a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py +++ b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py @@ -78,7 +78,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id INT)" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_cursor_context_manager(self): """Should create a child span for execute with cursor context""" @@ -86,7 +86,7 @@ def test_execute_with_cursor_context_manager(self): with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -94,7 +94,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) self._cursor.executemany(stmt, data) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc"""