diff --git a/.circleci/config.yml b/.circleci/config.yml index 6221a0b9..3f18b7d6 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -16,6 +16,7 @@ jobs: - image: circleci/redis:5.0.4 - image: rabbitmq:3.5.4 - image: couchbase/server-sandbox:5.5.0 + - image: circleci/mongo:4.2.3-ram working_directory: ~/repo @@ -70,6 +71,7 @@ jobs: - image: circleci/redis:5.0.4 - image: rabbitmq:3.5.4 - image: couchbase/server-sandbox:5.5.0 + - image: circleci/mongo:4.2.3-ram working_directory: ~/repo @@ -122,6 +124,7 @@ jobs: - image: circleci/redis:5.0.4 - image: rabbitmq:3.5.4 - image: couchbase/server-sandbox:5.5.0 + - image: circleci/mongo:4.2.3-ram working_directory: ~/repo diff --git a/instana/__init__.py b/instana/__init__.py index 5cacd563..2cb79f56 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -25,8 +25,8 @@ import pkg_resources __author__ = 'Instana Inc.' -__copyright__ = 'Copyright 2019 Instana Inc.' -__credits__ = ['Pavlo Baron', 'Peter Giacomo Lombardo'] +__copyright__ = 'Copyright 2020 Instana Inc.' 
+__credits__ = ['Pavlo Baron', 'Peter Giacomo Lombardo', 'Andrey Slotin'] __license__ = 'MIT' __maintainer__ = 'Peter Giacomo Lombardo' __email__ = 'peter.lombardo@instana.com' @@ -82,6 +82,7 @@ def boot_agent(): from .instrumentation import sudsjurko from .instrumentation import urllib3 from .instrumentation.django import middleware + from .instrumentation import pymongo # Hooks from .hooks import hook_uwsgi diff --git a/instana/instrumentation/pymongo.py b/instana/instrumentation/pymongo.py new file mode 100644 index 00000000..6c2ada3f --- /dev/null +++ b/instana/instrumentation/pymongo.py @@ -0,0 +1,95 @@ +from __future__ import absolute_import + +from ..log import logger +from ..singletons import tracer + +try: + import pymongo + from pymongo import monitoring + from bson import json_util + + class MongoCommandTracer(monitoring.CommandListener): + def __init__(self): + self.__active_commands = {} + + def started(self, event): + parent_span = tracer.active_span + + # return early if we're not tracing + if parent_span is None: + return + + with tracer.start_active_span("mongo", child_of=parent_span) as scope: + self._collect_connection_tags(scope.span, event) + self._collect_command_tags(scope.span, event) + + # include collection name into the namespace if provided + if event.command_name in event.command: + scope.span.set_tag("collection", event.command.get(event.command_name)) + + self.__active_commands[event.request_id] = scope + + def succeeded(self, event): + active_span = self.__active_commands.pop(event.request_id, None) + + # return early if we're not tracing + if active_span is None: + return + + def failed(self, event): + active_span = self.__active_commands.pop(event.request_id, None) + + # return early if we're not tracing + if active_span is None: + return + + active_span.span.log_exception(event.failure) + + def _collect_connection_tags(self, span, event): + (host, port) = event.connection_id + + span.set_tag("driver", "pymongo") + 
span.set_tag("host", host) + span.set_tag("port", str(port)) + span.set_tag("db", event.database_name) + + def _collect_command_tags(self, span, event): + """ + Extract MongoDB command name and arguments and attach it to the span + """ + cmd = event.command_name + span.set_tag("command", cmd) + + for key in ["filter", "query"]: + if key in event.command: + span.set_tag("filter", json_util.dumps(event.command.get(key))) + break + + # The location of command documents within the command object depends on the name + # of this command. This is the name -> command object key mapping + cmd_doc_locations = { + "insert": "documents", + "update": "updates", + "delete": "deletes", + "aggregate": "pipeline" + } + + cmd_doc = None + if cmd in cmd_doc_locations: + cmd_doc = event.command.get(cmd_doc_locations[cmd]) + elif cmd.lower() == "mapreduce": # mapreduce command was renamed to mapReduce in pymongo 3.9.0 + # mapreduce command consists of two mandatory parts: map and reduce + cmd_doc = { + "map": event.command.get("map"), + "reduce": event.command.get("reduce") + } + + if cmd_doc is not None: + span.set_tag("json", json_util.dumps(cmd_doc)) + + monitoring.register(MongoCommandTracer()) + + logger.debug("Instrumenting pymongo") + +except ImportError: + pass diff --git a/instana/json_span.py b/instana/json_span.py index 43fccdf4..4a569663 100644 --- a/instana/json_span.py +++ b/instana/json_span.py @@ -97,6 +97,15 @@ class MySQLData(BaseSpan): error = None +class MongoDBData(BaseSpan): + service = None + namespace = None + command = None + filter = None + json = None + error = None + + class PostgresData(BaseSpan): db = None host = None diff --git a/instana/recorder.py b/instana/recorder.py index 61d220d8..eb8c8d70 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -10,7 +10,7 @@ import instana.singletons from .json_span import (CassandraData, CouchbaseData, CustomData, Data, HttpData, JsonSpan, LogData, - MySQLData, PostgresData, RabbitmqData, RedisData, 
RenderData, + MongoDBData, MySQLData, PostgresData, RabbitmqData, RedisData, RenderData, RPCData, SDKData, SoapData, SQLAlchemyData) from .log import logger @@ -25,15 +25,16 @@ class InstanaRecorder(SpanRecorder): THREAD_NAME = "Instana Span Reporting" registered_spans = ("aiohttp-client", "aiohttp-server", "cassandra", "couchbase", "django", "log", - "memcache", "mysql", "postgres", "rabbitmq", "redis", "render", "rpc-client", + "memcache", "mongo", "mysql", "postgres", "rabbitmq", "redis", "render", "rpc-client", "rpc-server", "sqlalchemy", "soap", "tornado-client", "tornado-server", "urllib3", "wsgi") http_spans = ("aiohttp-client", "aiohttp-server", "django", "http", "soap", "tornado-client", "tornado-server", "urllib3", "wsgi") - exit_spans = ("aiohttp-client", "cassandra", "couchbase", "log", "memcache", "mysql", "postgres", - "rabbitmq", "redis", "rpc-client", "sqlalchemy", "soap", "tornado-client", "urllib3") + exit_spans = ("aiohttp-client", "cassandra", "couchbase", "log", "memcache", "mongo", "mysql", "postgres", + "rabbitmq", "redis", "rpc-client", "sqlalchemy", "soap", "tornado-client", "urllib3", + "pymongo") entry_spans = ("aiohttp-server", "django", "wsgi", "rabbitmq", "rpc-server", "tornado-server") @@ -237,6 +238,16 @@ def build_registered_span(self, span): tskey = list(data.custom.logs.keys())[0] data.pg.error = data.custom.logs[tskey]['message'] + elif span.operation_name == "mongo": + service = "%s:%s" % (span.tags.pop('host', None), span.tags.pop('port', None)) + namespace = "%s.%s" % (span.tags.pop('db', "?"), span.tags.pop('collection', "?")) + data.mongo = MongoDBData(service=service, + namespace=namespace, + command=span.tags.pop('command', None), + filter=span.tags.pop('filter', None), + json=span.tags.pop('json', None), + error=span.tags.pop('error', None)) + elif span.operation_name == "log": data.log = {} # use last special key values diff --git a/setup.py b/setup.py index 2800675c..45bbbda7 100644 --- a/setup.py +++ b/setup.py @@ 
-83,6 +83,7 @@ def check_setuptools(): 'pyOpenSSL>=16.1.0;python_version<="2.7"', 'pytest>=3.0.1', 'psycopg2>=2.7.1', + 'pymongo>=3.7.0', 'redis>3.0.0', 'requests>=2.17.1', 'sqlalchemy>=1.1.15', diff --git a/tests/helpers.py b/tests/helpers.py index 2aba684e..c6e7c418 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -48,6 +48,14 @@ testenv['redis_host'] = os.environ.get('REDIS_HOST', '127.0.0.1') +""" +MongoDB Environment +""" +testenv['mongodb_host'] = os.environ.get('MONGO_HOST', '127.0.0.1') +testenv['mongodb_port'] = os.environ.get('MONGO_PORT', '27017') +testenv['mongodb_user'] = os.environ.get('MONGO_USER', None) +testenv['mongodb_pw'] = os.environ.get('MONGO_PW', None) + def get_first_span_by_name(spans, name): for span in spans: if span.n == name: diff --git a/tests/test_pymongo.py b/tests/test_pymongo.py new file mode 100644 index 00000000..5324f6f0 --- /dev/null +++ b/tests/test_pymongo.py @@ -0,0 +1,235 @@ +from __future__ import absolute_import + +import logging +import json + +from nose.tools import (assert_equals, assert_not_equals, assert_is_none, assert_is_not_none, + assert_false, assert_true, assert_is_instance, assert_greater, assert_list_equal) + +from .helpers import testenv +from instana.singletons import tracer +from instana.util import to_json + +import pymongo +import bson + +logger = logging.getLogger(__name__) + +class TestPyMongo: + def setUp(self): + logger.warn("Connecting to MongoDB mongo://%s:@%s:%s", + testenv['mongodb_user'], testenv['mongodb_host'], testenv['mongodb_port']) + + self.conn = pymongo.MongoClient(host=testenv['mongodb_host'], port=int(testenv['mongodb_port']), + username=testenv['mongodb_user'], password=testenv['mongodb_pw']) + self.conn.test.records.delete_many(filter={}) + + self.recorder = tracer.recorder + self.recorder.clear_spans() + + def tearDown(self): + return None + + def test_successful_find_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.find_one({"type": "string"}) 
+ + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "find") + + assert_equals(db_span.data.mongo.filter, '{"type": "string"}') + assert_is_none(db_span.data.mongo.json) + + def test_successful_insert_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.insert_one({"type": "string"}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "insert") + + assert_is_none(db_span.data.mongo.filter) + + def test_successful_update_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.update_one({"type": "string"}, {"$set": {"type": "int"}}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], 
testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "update") + + assert_is_none(db_span.data.mongo.filter) + assert_is_not_none(db_span.data.mongo.json) + + payload = json.loads(db_span.data.mongo.json) + assert_true({ + "q": {"type": "string"}, + "u": {"$set": {"type": "int"}}, + "multi": False, + "upsert": False + } in payload, db_span.data.mongo.json) + + def test_successful_delete_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.delete_one(filter={"type": "string"}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "delete") + + assert_is_none(db_span.data.mongo.filter) + assert_is_not_none(db_span.data.mongo.json) + + payload = json.loads(db_span.data.mongo.json) + assert_true({"q": {"type": "string"}, "limit": 1} in payload, db_span.data.mongo.json) + + def test_successful_aggregate_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.count_documents({"type": "string"}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + 
assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "aggregate") + + assert_is_none(db_span.data.mongo.filter) + assert_is_not_none(db_span.data.mongo.json) + + payload = json.loads(db_span.data.mongo.json) + assert_true({"$match": {"type": "string"}} in payload, db_span.data.mongo.json) + + def test_successful_map_reduce_query(self): + mapper = "function () { this.tags.forEach(function(z) { emit(z, 1); }); }" + reducer = "function (key, values) { return len(values); }" + + with tracer.start_active_span("test"): + self.conn.test.records.map_reduce(bson.code.Code(mapper), bson.code.Code(reducer), "results", query={"x": {"$lt": 2}}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command.lower(), "mapreduce") # mapreduce command was renamed to mapReduce in pymongo 3.9.0 + + assert_equals(db_span.data.mongo.filter, '{"x": {"$lt": 2}}') + assert_is_not_none(db_span.data.mongo.json) + + payload = json.loads(db_span.data.mongo.json) + assert_equals(payload["map"], {"$code": mapper}, db_span.data.mongo.json) + assert_equals(payload["reduce"], {"$code": reducer}, db_span.data.mongo.json) + + def test_successful_mutiple_queries(self): + with tracer.start_active_span("test"): + self.conn.test.records.bulk_write([pymongo.InsertOne({"type": "string"}), + pymongo.UpdateOne({"type": "string"}, {"$set": {"type": "int"}}), + pymongo.DeleteOne({"type": "string"})]) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + 
assert_equals(len(spans), 4) + + test_span = spans.pop() + + seen_span_ids = set() + commands = [] + for span in spans: + assert_equals(test_span.t, span.t) + assert_equals(span.p, test_span.s) + + # check if all spans got a unique id + assert_false(span.s in seen_span_ids) + + seen_span_ids.add(span.s) + commands.append(span.data.mongo.command) + + # ensure spans are ordered the same way as commands + assert_list_equal(commands, ["insert", "update", "delete"])