diff --git a/instana/__init__.py b/instana/__init__.py index 5445cd51..867be0e8 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -73,6 +73,7 @@ def boot_agent(): from .instrumentation.tornado import server from .instrumentation import logging from .instrumentation import pymysql + from .instrumentation import psycopg2 from .instrumentation import redis from .instrumentation import sqlalchemy from .instrumentation import sudsjurko diff --git a/instana/instrumentation/pep0249.py b/instana/instrumentation/pep0249.py index b58cac91..207e3af2 100644 --- a/instana/instrumentation/pep0249.py +++ b/instana/instrumentation/pep0249.py @@ -20,9 +20,14 @@ def __init__(self, cursor, module_name, def _collect_kvs(self, span, sql): try: span.set_tag(ext.SPAN_KIND, 'exit') - span.set_tag(ext.DATABASE_INSTANCE, self._connect_params[1]['db']) + + if 'db' in self._connect_params[1]: + span.set_tag(ext.DATABASE_INSTANCE, self._connect_params[1]['db']) + elif 'database' in self._connect_params[1]: + span.set_tag(ext.DATABASE_INSTANCE, self._connect_params[1]['database']) + span.set_tag(ext.DATABASE_STATEMENT, sql_sanitizer(sql)) - span.set_tag(ext.DATABASE_TYPE, 'mysql') + # span.set_tag(ext.DATABASE_TYPE, 'mysql') span.set_tag(ext.DATABASE_USER, self._connect_params[1]['user']) span.set_tag('host', "%s:%s" % (self._connect_params[1]['host'], @@ -35,8 +40,8 @@ def _collect_kvs(self, span, sql): def execute(self, sql, params=None): parent_span = tracer.active_span - # If we're not tracing, just return - if parent_span is None: + # If not tracing or we're being called from sqlalchemy, just pass through + if (parent_span is None) or (parent_span.operation_name == "sqlalchemy"): return self.__wrapped__.execute(sql, params) with tracer.start_active_span(self._module_name, child_of=parent_span) as scope: @@ -54,8 +59,8 @@ def execute(self, sql, params=None): def executemany(self, sql, seq_of_parameters): parent_span = tracer.active_span - # If we're not tracing, just return - if parent_span is None: + # If not tracing or we're being called from sqlalchemy, just pass through + if (parent_span is None) or (parent_span.operation_name == "sqlalchemy"): return self.__wrapped__.executemany(sql, seq_of_parameters) with tracer.start_active_span(self._module_name, child_of=parent_span) as scope: @@ -73,8 +78,8 @@ def executemany(self, sql, seq_of_parameters): def callproc(self, proc_name, params): parent_span = tracer.active_span - # If we're not tracing, just return - if parent_span is None: + # If not tracing or we're being called from sqlalchemy, just pass through + if (parent_span is None) or (parent_span.operation_name == "sqlalchemy"): return self.__wrapped__.execute(proc_name, params) with tracer.start_active_span(self._module_name, child_of=parent_span) as scope: diff --git a/instana/instrumentation/psycopg2.py b/instana/instrumentation/psycopg2.py new file mode 100644 index 00000000..d2fb7a13 --- /dev/null +++ b/instana/instrumentation/psycopg2.py @@ -0,0 +1,30 @@ +from __future__ import absolute_import + +import copy +import wrapt + +from ..log import logger +from .pep0249 import ConnectionFactory + +try: + import psycopg2 + import psycopg2.extras + + cf = ConnectionFactory(connect_func=psycopg2.connect, module_name='postgres') + + setattr(psycopg2, 'connect', cf) + if hasattr(psycopg2, 'Connect'): + setattr(psycopg2, 'Connect', cf) + + @wrapt.patch_function_wrapper('psycopg2.extensions', 'register_type') + def register_type_with_instana(wrapped, instance, args, kwargs): + args_clone = list(copy.copy(args)) + + if hasattr(args_clone[1], '__wrapped__'): + args_clone[1] = args_clone[1].__wrapped__ + + return wrapped(*args_clone, **kwargs) + + logger.debug("Instrumenting psycopg2") +except ImportError: + pass diff --git a/instana/json_span.py b/instana/json_span.py index e6cc3eb8..628c1986 100644 --- a/instana/json_span.py +++ b/instana/json_span.py @@ -36,6 +36,7 @@ class Data(BaseSpan): custom = None http = None log = None + pg = None rabbitmq = None redis = None rpc = None @@ -74,6 +75,15 @@ class MySQLData(BaseSpan): error = None +class PostgresData(BaseSpan): + db = None + host = None + port = None + user = None + stmt = None + error = None + + class RabbitmqData(BaseSpan): exchange = None queue = None diff --git a/instana/recorder.py b/instana/recorder.py index 97f5dcf6..d532532a 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -9,7 +9,7 @@ import instana.singletons -from .json_span import (CustomData, Data, HttpData, JsonSpan, LogData, MySQLData, +from .json_span import (CustomData, Data, HttpData, JsonSpan, LogData, MySQLData, PostgresData, RabbitmqData, RedisData, RenderData, RPCData, SDKData, SoapData, SQLAlchemyData) from .log import logger @@ -24,12 +24,12 @@ class InstanaRecorder(SpanRecorder): THREAD_NAME = "Instana Span Reporting" registered_spans = ("aiohttp-client", "aiohttp-server", "django", "log", "memcache", "mysql", - "rabbitmq", "redis", "render", "rpc-client", "rpc-server", "sqlalchemy", "soap", + "postgres", "rabbitmq", "redis", "render", "rpc-client", "rpc-server", "sqlalchemy", "soap", "tornado-client", "tornado-server", "urllib3", "wsgi") http_spans = ("aiohttp-client", "aiohttp-server", "django", "http", "soap", "tornado-client", "tornado-server", "urllib3", "wsgi") - exit_spans = ("aiohttp-client", "log", "memcache", "mysql", "rabbitmq", "redis", "rpc-client", + exit_spans = ("aiohttp-client", "log", "memcache", "mysql", "postgres", "rabbitmq", "redis", "rpc-client", "sqlalchemy", "soap", "tornado-client", "urllib3") entry_spans = ("aiohttp-server", "django", "wsgi", "rabbitmq", "rpc-server", "tornado-server") local_spans = ("log", "render") @@ -202,6 +202,16 @@ def build_registered_span(self, span): tskey = list(data.custom.logs.keys())[0] data.mysql.error = data.custom.logs[tskey]['message'] + if span.operation_name == "postgres": + data.pg = PostgresData(host=span.tags.pop('host', None), + db=span.tags.pop(ext.DATABASE_INSTANCE, None), + user=span.tags.pop(ext.DATABASE_USER, None), + stmt=span.tags.pop(ext.DATABASE_STATEMENT, None), + error=span.tags.pop('pg.error', None)) + if (data.custom is not None) and (data.custom.logs is not None) and len(data.custom.logs): + tskey = list(data.custom.logs.keys())[0] + data.pg.error = data.custom.logs[tskey]['message'] + if span.operation_name == "log": data.log = {} # use last special key values diff --git a/setup.py b/setup.py index f2a3cebc..5ebe83d2 100644 --- a/setup.py +++ b/setup.py @@ -76,10 +76,10 @@ def check_setuptools(): 'mock>=2.0.0', 'mysqlclient>=1.3.14;python_version>="3.5"', 'MySQL-python>=1.2.5;python_version<="2.7"', - 'psycopg2>=2.7.1', 'PyMySQL[rsa]>=0.9.1', 'pyOpenSSL>=16.1.0;python_version<="2.7"', 'pytest>=3.0.1', + 'psycopg2>=2.7.1', 'redis<3.0.0', 'requests>=2.17.1', 'sqlalchemy>=1.1.15', diff --git a/tests/helpers.py b/tests/helpers.py index 96aec224..5d64a58e 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -21,23 +21,12 @@ """ PostgreSQL Environment """ -if 'POSTGRESQL_HOST' in os.environ: - testenv['postgresql_host']= os.environ['POSTGRESQL_HOST'] -elif 'TRAVIS_POSTGRESQL_HOST' in os.environ: - testenv['postgresql_host'] = os.environ['TRAVIS_POSTGRESQL_HOST'] -else: - testenv['postgresql_host'] = '127.0.0.1' - -testenv['postgresql_port'] = int(os.environ.get('POSTGRESQL_PORT', '3306')) +testenv['postgresql_host'] = os.environ.get('POSTGRESQL_HOST', '127.0.0.1') +testenv['postgresql_port'] = int(os.environ.get('POSTGRESQL_PORT', '5432')) testenv['postgresql_db'] = os.environ.get('POSTGRESQL_DB', 'circle_test') testenv['postgresql_user'] = os.environ.get('POSTGRESQL_USER', 'root') +testenv['postgresql_pw'] = os.environ.get('POSTGRESQL_PW', '') -if 'POSTGRESQL_PW' in os.environ: - testenv['postgresql_pw'] = os.environ['POSTGRESQL_PW'] -elif 'TRAVIS_POSTGRESQL_PASS' in os.environ: - testenv['postgresql_pw'] = os.environ['TRAVIS_POSTGRESQL_PASS'] -else: - testenv['postgresql_pw'] = '' """ Redis Environment diff --git a/tests/test_psycopg2.py b/tests/test_psycopg2.py new file mode 100644 index 00000000..c6a43e43 --- /dev/null +++ b/tests/test_psycopg2.py @@ -0,0 +1,211 @@ +from __future__ import absolute_import + +import logging + +from nose.tools import assert_equals + +from instana.singletons import tracer + +from .helpers import testenv + +import psycopg2 +import psycopg2.extras + +logger = logging.getLogger(__name__) + +create_table_query = """ +CREATE TABLE IF NOT EXISTS users( + id serial PRIMARY KEY, + name VARCHAR (50), + password VARCHAR (50), + email VARCHAR (355), + created_on TIMESTAMP, + last_login TIMESTAMP +); +""" + +create_proc_query = """\ +CREATE OR REPLACE FUNCTION test_proc(candidate VARCHAR(70)) +RETURNS text AS $$ +BEGIN + RETURN(SELECT name FROM users where email = candidate); +END; +$$ LANGUAGE plpgsql; +""" + +drop_proc_query = "DROP FUNCTION IF EXISTS test_proc(VARCHAR(70));" + +db = psycopg2.connect(host=testenv['postgresql_host'], port=testenv['postgresql_port'], + user=testenv['postgresql_user'], password=testenv['postgresql_pw'], + database=testenv['postgresql_db']) + +cursor = db.cursor() +cursor.execute(create_table_query) +cursor.execute(drop_proc_query) +cursor.execute(create_proc_query) +db.commit() +cursor.close() +db.close() + + +class TestPsycoPG2: + def setUp(self): + logger.warning("Postgresql connecting: %s:@%s:5432/%s", testenv['postgresql_user'], testenv['postgresql_host'], testenv['postgresql_db']) + self.db = psycopg2.connect(host=testenv['postgresql_host'], port=testenv['postgresql_port'], + user=testenv['postgresql_user'], password=testenv['postgresql_pw'], + database=testenv['postgresql_db']) + self.cursor = self.db.cursor() + self.recorder = tracer.recorder + self.recorder.clear_spans() + tracer.cur_ctx = None + + def tearDown(self): + """ Do nothing for now """ + return None + + def test_vanilla_query(self): + assert psycopg2.extras.register_uuid(None, self.db) + assert psycopg2.extras.register_uuid(None, self.db.cursor()) + + self.cursor.execute("""SELECT * from users""") + result = self.cursor.fetchone() + + assert_equals(6, len(result)) + + spans = self.recorder.queued_spans() + assert_equals(0, len(spans)) + + def test_basic_query(self): + with tracer.start_active_span('test'): + self.cursor.execute("""SELECT * from users""") + self.cursor.fetchone() + self.db.commit() + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(None, db_span.error) + assert_equals(None, db_span.ec) + + assert_equals(db_span.n, "postgres") + assert_equals(db_span.data.pg.db, testenv['postgresql_db']) + assert_equals(db_span.data.pg.user, testenv['postgresql_user']) + assert_equals(db_span.data.pg.stmt, 'SELECT * from users') + assert_equals(db_span.data.pg.host, "%s:5432" % testenv['postgresql_host']) + + def test_basic_insert(self): + with tracer.start_active_span('test'): + self.cursor.execute("""INSERT INTO users(name, email) VALUES(%s, %s)""", ('beaker', 'beaker@muppets.com')) + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(None, db_span.error) + assert_equals(None, db_span.ec) + + assert_equals(db_span.n, "postgres") + assert_equals(db_span.data.pg.db, testenv['postgresql_db']) + assert_equals(db_span.data.pg.user, testenv['postgresql_user']) + assert_equals(db_span.data.pg.stmt, 'INSERT INTO users(name, email) VALUES(%s, %s)') + assert_equals(db_span.data.pg.host, "%s:5432" % testenv['postgresql_host']) + + def test_executemany(self): + result = None + with tracer.start_active_span('test'): + result = self.cursor.executemany("INSERT INTO users(name, email) VALUES(%s, %s)", + [('beaker', 'beaker@muppets.com'), ('beaker', 'beaker@muppets.com')]) + self.db.commit() + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(None, db_span.error) + assert_equals(None, db_span.ec) + + assert_equals(db_span.n, "postgres") + assert_equals(db_span.data.pg.db, testenv['postgresql_db']) + assert_equals(db_span.data.pg.user, testenv['postgresql_user']) + assert_equals(db_span.data.pg.stmt, 'INSERT INTO users(name, email) VALUES(%s, %s)') + assert_equals(db_span.data.pg.host, "%s:5432" % testenv['postgresql_host']) + + def test_call_proc(self): + result = None + with tracer.start_active_span('test'): + result = self.cursor.callproc('test_proc', ('beaker',)) + + assert(type(result) is tuple) + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(None, db_span.error) + assert_equals(None, db_span.ec) + + assert_equals(db_span.n, "postgres") + assert_equals(db_span.data.pg.db, testenv['postgresql_db']) + assert_equals(db_span.data.pg.user, testenv['postgresql_user']) + assert_equals(db_span.data.pg.stmt, 'test_proc') + assert_equals(db_span.data.pg.host, "%s:5432" % testenv['postgresql_host']) + + def test_error_capture(self): + result = None + span = None + try: + with tracer.start_active_span('test'): + result = self.cursor.execute("""SELECT * from blah""") + self.cursor.fetchone() + except Exception: + pass + finally: + if span: + span.finish() + + assert(result is None) + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(True, db_span.error) + assert_equals(1, db_span.ec) + assert_equals(db_span.data.pg.error, 'relation "blah" does not exist\nLINE 1: SELECT * from blah\n ^\n') + + assert_equals(db_span.n, "postgres") + assert_equals(db_span.data.pg.db, testenv['postgresql_db']) + assert_equals(db_span.data.pg.user, testenv['postgresql_user']) + assert_equals(db_span.data.pg.stmt, 'SELECT * from blah') + assert_equals(db_span.data.pg.host, "%s:5432" % testenv['postgresql_host'])