diff --git a/.travis.yml b/.travis.yml index c328f52f..7a533aba 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,18 @@ language: python + python: - "2.7" - "3.3" - "3.4" - "3.5" - "3.6" + +before_install: + - "pip install --upgrade pip" + - "pip install --upgrade setuptools" + - 'mysql -e 'CREATE DATABASE travis_ci_test;' + install: "pip install -r requirements-test.txt" + + script: nosetests -v diff --git a/instana/__init__.py b/instana/__init__.py index 3e26c361..e66fd40c 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -56,4 +56,5 @@ def load(module): # noqa: ignore=W0611 from .instrumentation import urllib3 # noqa from .instrumentation import sudsjurko # noqa + from .instrumentation import mysqlpython # noqa from .instrumentation.django import middleware # noqa diff --git a/instana/instrumentation/mysqlpython.py b/instana/instrumentation/mysqlpython.py new file mode 100644 index 00000000..01057631 --- /dev/null +++ b/instana/instrumentation/mysqlpython.py @@ -0,0 +1,17 @@ +from __future__ import absolute_import + +from ..log import logger +from .pep0249 import ConnectionFactory + +try: + import MySQLdb # noqa + + cf = ConnectionFactory(connect_func=MySQLdb.connect, module_name='MySQLdb') + + setattr(MySQLdb, 'connect', cf) + if hasattr(MySQLdb, 'Connect'): + setattr(MySQLdb, 'Connect', cf) + + logger.debug("Instrumenting mysql-python") +except ImportError: + pass diff --git a/instana/instrumentation/pep0249.py b/instana/instrumentation/pep0249.py new file mode 100644 index 00000000..d01424a5 --- /dev/null +++ b/instana/instrumentation/pep0249.py @@ -0,0 +1,147 @@ +# This is a wrapper for PEP-0249: Python Database API Specification v2.0 +import opentracing.ext.tags as ext +import wrapt + +from ..tracer import internal_tracer +from ..log import logger + + +class CursorWrapper(wrapt.ObjectProxy): + __slots__ = ('_module_name', '_connect_params', '_cursor_params') + + def __init__(self, cursor, module_name, + connect_params=None, cursor_params=None): + super(CursorWrapper, self).__init__(wrapped=cursor) + self._module_name = module_name + self._connect_params = connect_params + self._cursor_params = cursor_params + + def _collect_kvs(self, span, sql): + try: + span.set_tag(ext.SPAN_KIND, 'exit') + span.set_tag(ext.DATABASE_INSTANCE, self._connect_params[1]['db']) + span.set_tag(ext.DATABASE_STATEMENT, sql) + span.set_tag(ext.DATABASE_TYPE, 'mysql') + span.set_tag(ext.DATABASE_USER, self._connect_params[1]['user']) + span.set_tag(ext.PEER_ADDRESS, "mysql://%s:%s" % + (self._connect_params[1]['host'], + self._connect_params[1]['port'])) + except Exception as e: + logger.debug(e) + finally: + return span + + + def execute(self, sql, params=None): + try: + span = None + context = internal_tracer.current_context() + + # If we're not tracing, just return + if context is None: + return self.__wrapped__.execute(sql, params) + + span = internal_tracer.start_span(self._module_name, child_of=context) + span = self._collect_kvs(span, sql) + span.set_tag('op', 'execute') + + result = self.__wrapped__.execute(sql, params) + except Exception as e: + if span: + span.log_exception(e) + raise + else: + return result + finally: + if span: + span.finish() + + def executemany(self, sql, seq_of_parameters): + try: + span = None + context = internal_tracer.current_context() + + # If we're not tracing, just return + if context is None: + return self.__wrapped__.execute(sql, params) + + span = internal_tracer.start_span(self._module_name, child_of=context) + span = self._collect_kvs(span, sql) + span.set_tag('op', 'executemany') + span.set_tag('count', len(seq_of_parameters)) + + result = self.__wrapped__.executemany(sql, seq_of_parameters) + except Exception as e: + if span: + span.log_exception(e) + raise + else: + return result + finally: + if span: + span.finish() + + + def callproc(self, proc_name, params): + try: + span = None + context = internal_tracer.current_context() + + # If we're not tracing, just return + if context is None: + return self.__wrapped__.execute(sql, params) + + span = internal_tracer.start_span(self._module_name, child_of=context) + span = self._collect_kvs(span, proc_name) + span.set_tag('op', 'callproc') + + result = self.__wrapped__.callproc(proc_name, params) + except Exception as e: + if span: + span.log_exception(e) + raise + else: + return result + finally: + if span: + span.finish() + + +class ConnectionWrapper(wrapt.ObjectProxy): + __slots__ = ('_module_name', '_connect_params') + + def __init__(self, connection, module_name, connect_params): + super(ConnectionWrapper, self).__init__(wrapped=connection) + self._module_name = module_name + self._connect_params = connect_params + + def cursor(self, *args, **kwargs): + return CursorWrapper( + cursor=self.__wrapped__.cursor(*args, **kwargs), + module_name=self._module_name, + connect_params=self._connect_params, + cursor_params=(args, kwargs) if args or kwargs else None) + + def begin(self): + return self.__wrapped__.begin() + + def commit(self): + return self.__wrapped__.commit() + + def rollback(self): + return self.__wrapped__.rollback() + + +class ConnectionFactory(object): + def __init__(self, connect_func, module_name): + self._connect_func = connect_func + self._module_name = module_name + self._wrapper_ctor = ConnectionWrapper + + def __call__(self, *args, **kwargs): + connect_params = (args, kwargs) if args or kwargs else None + + return self._wrapper_ctor( + connection=self._connect_func(*args, **kwargs), + module_name=self._module_name, + connect_params=connect_params) diff --git a/instana/span.py b/instana/span.py index 6d25e4ff..175e8e3f 100644 --- a/instana/span.py +++ b/instana/span.py @@ -16,7 +16,7 @@ def finish(self, finish_time=None): super(InstanaSpan, self).finish(finish_time) def log_exception(self, e): - if hasattr(e, 'message'): + if hasattr(e, 'message') and len(e.message): self.log_kv({'message': e.message}) elif hasattr(e, '__str__'): self.log_kv({'message': e.__str__()}) diff --git a/setup.py b/setup.py index 6f1662d7..7cb72522 100644 --- a/setup.py +++ b/setup.py @@ -26,10 +26,11 @@ 'test': [ 'nose>=1.0', 'flask>=0.12.2', + 'lxml>=3.4', + 'MySQL-python>=1.2.5;python_version<="2.7"', 'requests>=2.17.1', 'urllib3[secure]>=1.15', 'spyne>=2.9', - 'lxml>=3.4', 'suds-jurko>=0.6' ], }, diff --git a/tests/__init__.py b/tests/__init__.py index 5b1af350..2d16c57b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -13,11 +13,11 @@ # Spawn our background Flask app that the tests will throw # requests at. Don't continue until the test app is fully # up and running. -timer = threading.Thread(target=flaskalino.run) -timer.daemon = True -timer.name = "Background Flask app" +flask = threading.Thread(target=flaskalino.run) +flask.daemon = True +flask.name = "Background Flask app" print("Starting background Flask app...") -timer.start() +flask.start() # Background Soap Server @@ -25,11 +25,11 @@ # Spawn our background Flask app that the tests will throw # requests at. Don't continue until the test app is fully # up and running. -timer = threading.Thread(target=soapserver.serve_forever) -timer.daemon = True -timer.name = "Background Soap server" +soap = threading.Thread(target=soapserver.serve_forever) +soap.daemon = True +soap.name = "Background Soap server" print("Starting background Soap server...") -timer.start() +soap.start() time.sleep(1) diff --git a/tests/apps/soapserver4132.py b/tests/apps/soapserver4132.py index 088b3d54..b4e681bb 100644 --- a/tests/apps/soapserver4132.py +++ b/tests/apps/soapserver4132.py @@ -41,6 +41,15 @@ def client_fault(ctx): +# logging.basicConfig(level=logging.WARN) +logging.getLogger('suds').setLevel(logging.WARN) +logging.getLogger('suds.resolver').setLevel(logging.WARN) +logging.getLogger('spyne.protocol.xml').setLevel(logging.WARN) +logging.getLogger('spyne.model.complex').setLevel(logging.WARN) +logging.getLogger('spyne.interface._base').setLevel(logging.WARN) +logging.getLogger('spyne.interface.xml').setLevel(logging.WARN) +logging.getLogger('spyne.util.appreg').setLevel(logging.WARN) + app = Application([StanSoapService], 'instana.tests.app.ask_question', in_protocol=Soap11(validator='lxml'), out_protocol=Soap11()) @@ -48,11 +57,5 @@ def client_fault(ctx): wsgi_app = iWSGIMiddleware(WsgiApplication(app)) soapserver = make_server('127.0.0.1', 4132, wsgi_app) -logging.basicConfig(level=logging.WARN) -logging.getLogger('suds').setLevel(logging.WARN) -logging.getLogger('suds.resolver').setLevel(logging.WARN) -logging.getLogger('spyne.protocol.xml').setLevel(logging.WARN) -logging.getLogger('spyne.model.complex').setLevel(logging.WARN) - if __name__ == '__main__': soapserver.serve_forever() diff --git a/tests/test_mysql-python.py b/tests/test_mysql-python.py new file mode 100644 index 00000000..9041a3f3 --- /dev/null +++ b/tests/test_mysql-python.py @@ -0,0 +1,256 @@ +from __future__ import absolute_import + +import logging +import os +import sys +from unittest import SkipTest + +from nose.tools import assert_equals + +from instana.tracer import internal_tracer as tracer +from instana.util import to_json + +if sys.version_info < (3, 0): + import MySQLdb +else: + raise SkipTest("MySQL-python supported on Python 2.7 only") + + +logger = logging.getLogger(__name__) + + +if 'MYSQL_HOST' in os.environ: + mysql_host = os.environ['MYSQL_HOST'] +elif 'TRAVIS_MYSQL_HOST' in os.environ: + mysql_host = os.environ['TRAVIS_MYSQL_HOST'] +else: + mysql_host = '127.0.0.1' + +if 'MYSQL_PORT' in os.environ: + mysql_port = int(os.environ['MYSQL_PORT']) +else: + mysql_port = 3306 + +if 'MYSQL_DB' in os.environ: + mysql_db = os.environ['MYSQL_DB'] +else: + mysql_db = "travis_ci_test" + +if 'MYSQL_USER' in os.environ: + mysql_user = os.environ['MYSQL_USER'] +else: + mysql_user = "root" + +if 'MYSQL_PW' in os.environ: + mysql_pw = os.environ['MYSQL_PW'] +elif 'TRAVIS_MYSQL_PASS' in os.environ: + mysql_pw = os.environ['TRAVIS_MYSQL_PASS'] +else: + mysql_pw = '' + +create_table_query = 'CREATE TABLE IF NOT EXISTS users(id serial primary key, \ + name varchar(40) NOT NULL, email varchar(40) NOT NULL)' + +create_proc_query = """ +DROP PROCEDURE IF EXISTS test_proc; +CREATE PROCEDURE test_proc(IN t VARCHAR(255)) +BEGIN + SELECT name FROM users WHERE name = t; +END +""" + +db = MySQLdb.connect(host=mysql_host, port=mysql_port, + user=mysql_user, passwd=mysql_pw, + db=mysql_db) +db.cursor().execute(create_table_query) +db.cursor().execute(create_proc_query) +db.commit() +db.close() + + +class TestMySQLPython: + def setUp(self): + logger.warn("MySQL connecting: %s:@%s:3306/%s", mysql_user, mysql_host, mysql_db) + self.db = MySQLdb.connect(host=mysql_host, port=mysql_port, + user=mysql_user, passwd=mysql_pw, + db=mysql_db) + self.cursor = self.db.cursor() + self.recorder = tracer.recorder + self.recorder.clear_spans() + tracer.cur_ctx = None + + def tearDown(self): + """ Do nothing for now """ + # after each test, tracer context should be None (not tracing) + # assert_equals(None, tracer.current_context()) + return None + + def test_vanilla_query(self): + self.cursor.execute("""SELECT 1""") + result = self.cursor.fetchone() + assert_equals((1,), result) + + def test_basic_query(self): + span = tracer.start_span('test') + result = self.cursor.execute("""SELECT * from users""") + rows = self.cursor.fetchone() + span.finish() + + # import ipdb; ipdb.set_trace() + + assert(result >= 0) + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(None, db_span.error) + assert_equals(0, db_span.ec) + + assert_equals(db_span.data.sdk.name, "MySQLdb") + assert_equals(db_span.data.sdk.custom.tags['db.instance'], 'nodedb') + assert_equals(db_span.data.sdk.custom.tags['db.type'], 'mysql') + assert_equals(db_span.data.sdk.custom.tags['db.user'], 'root') + assert_equals(db_span.data.sdk.custom.tags['db.statement'], 'SELECT * from users') + assert_equals(db_span.data.sdk.custom.tags['peer.address'], 'mysql://mazzo:3306') + assert_equals(db_span.data.sdk.custom.tags['span.kind'], 'exit') + + def test_basic_insert(self): + span = tracer.start_span('test') + result = self.cursor.execute( + """INSERT INTO users(name, email) VALUES(%s, %s)""", + ('beaker', 'beaker@muppets.com')) + + span.finish() + + assert_equals(1, result) + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(None, db_span.error) + assert_equals(0, db_span.ec) + + assert_equals(db_span.data.sdk.name, "MySQLdb") + assert_equals(db_span.data.sdk.custom.tags['db.instance'], 'nodedb') + assert_equals(db_span.data.sdk.custom.tags['db.type'], 'mysql') + assert_equals(db_span.data.sdk.custom.tags['db.user'], 'root') + assert_equals(db_span.data.sdk.custom.tags['db.statement'], 'INSERT INTO users(name, email) VALUES(%s, %s)') + assert_equals(db_span.data.sdk.custom.tags['peer.address'], 'mysql://mazzo:3306') + assert_equals(db_span.data.sdk.custom.tags['span.kind'], 'exit') + assert_equals(db_span.data.sdk.custom.tags['op'], 'execute') + + def test_executemany(self): + span = tracer.start_span('test') + result = self.cursor.executemany("INSERT INTO users(name, email) VALUES(%s, %s)", + [('beaker', 'beaker@muppets.com'), ('beaker', 'beaker@muppets.com')]) + self.db.commit() + span.finish() + + assert_equals(2, result) + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(None, db_span.error) + assert_equals(0, db_span.ec) + + assert_equals(db_span.data.sdk.name, "MySQLdb") + assert_equals(db_span.data.sdk.custom.tags['db.instance'], 'nodedb') + assert_equals(db_span.data.sdk.custom.tags['db.type'], 'mysql') + assert_equals(db_span.data.sdk.custom.tags['db.user'], 'root') + assert_equals(db_span.data.sdk.custom.tags['db.statement'], 'INSERT INTO users(name, email) VALUES(%s, %s)') + assert_equals(db_span.data.sdk.custom.tags['peer.address'], 'mysql://mazzo:3306') + assert_equals(db_span.data.sdk.custom.tags['span.kind'], 'exit') + assert_equals(db_span.data.sdk.custom.tags['op'], 'executemany') + assert_equals(db_span.data.sdk.custom.tags['count'], 2) + + def test_call_proc(self): + span = tracer.start_span('test') + result = self.cursor.callproc('test_proc', ('beaker',)) + span.finish() + + assert(result) + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(None, db_span.error) + assert_equals(0, db_span.ec) + + assert_equals(db_span.data.sdk.name, "MySQLdb") + assert_equals(db_span.data.sdk.custom.tags['db.instance'], 'nodedb') + assert_equals(db_span.data.sdk.custom.tags['db.type'], 'mysql') + assert_equals(db_span.data.sdk.custom.tags['db.user'], 'root') + assert_equals(db_span.data.sdk.custom.tags['db.statement'], 'test_proc') + assert_equals(db_span.data.sdk.custom.tags['peer.address'], 'mysql://mazzo:3306') + assert_equals(db_span.data.sdk.custom.tags['span.kind'], 'exit') + assert_equals(db_span.data.sdk.custom.tags['op'], 'callproc') + + def test_error_capture(self): + result = None + span = None + try: + span = tracer.start_span('test') + result = self.cursor.execute("""SELECT * from blah""") + rows = self.cursor.fetchone() + except Exception: + pass + finally: + if span: + span.finish() + + assert(result is None) + + spans = self.recorder.queued_spans() + assert_equals(2, len(spans)) + + db_span = spans[0] + test_span = spans[1] + + assert_equals("test", test_span.data.sdk.name) + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_equals(True, db_span.data.sdk.custom.tags['error']) + assert_equals(1, db_span.data.sdk.custom.tags['ec']) + + assert_equals(1, len(db_span.data.sdk.custom.logs.keys())) + key = db_span.data.sdk.custom.logs.keys()[0] + log = db_span.data.sdk.custom.logs[key]['message'] + assert_equals('(1146, "Table \'nodedb.blah\' doesn\'t exist")', log) + + assert_equals(db_span.data.sdk.name, "MySQLdb") + assert_equals(db_span.data.sdk.custom.tags['db.instance'], 'nodedb') + assert_equals(db_span.data.sdk.custom.tags['db.type'], 'mysql') + assert_equals(db_span.data.sdk.custom.tags['db.user'], 'root') + assert_equals(db_span.data.sdk.custom.tags['db.statement'], 'SELECT * from blah') + assert_equals(db_span.data.sdk.custom.tags['peer.address'], 'mysql://mazzo:3306') + assert_equals(db_span.data.sdk.custom.tags['span.kind'], 'exit')