From 22b8e1da59736c68846542cf45579ec67b15716f Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Tue, 11 Feb 2020 17:12:22 +0100 Subject: [PATCH 01/12] Add mongodb to the CircleCI build stack --- .circleci/config.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 6221a0b9..3f18b7d6 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -16,6 +16,7 @@ jobs: - image: circleci/redis:5.0.4 - image: rabbitmq:3.5.4 - image: couchbase/server-sandbox:5.5.0 + - image: circleci/mongo:4.2.3-ram working_directory: ~/repo @@ -70,6 +71,7 @@ jobs: - image: circleci/redis:5.0.4 - image: rabbitmq:3.5.4 - image: couchbase/server-sandbox:5.5.0 + - image: circleci/mongo:4.2.3-ram working_directory: ~/repo @@ -122,6 +124,7 @@ jobs: - image: circleci/redis:5.0.4 - image: rabbitmq:3.5.4 - image: couchbase/server-sandbox:5.5.0 + - image: circleci/mongo:4.2.3-ram working_directory: ~/repo From 2e1a65823d9e15693f44ccfd295442d6fbd47e35 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Tue, 11 Feb 2020 17:20:02 +0100 Subject: [PATCH 02/12] Add basic pymongo command events listener implementation --- instana/instrumentation/pymongo.py | 24 ++++++++++++++++++++++++ setup.py | 1 + tests/helpers.py | 8 ++++++++ tests/test_pymongo.py | 22 ++++++++++++++++++++++ 4 files changed, 55 insertions(+) create mode 100644 instana/instrumentation/pymongo.py create mode 100644 tests/test_pymongo.py diff --git a/instana/instrumentation/pymongo.py b/instana/instrumentation/pymongo.py new file mode 100644 index 00000000..b0ae61ec --- /dev/null +++ b/instana/instrumentation/pymongo.py @@ -0,0 +1,24 @@ +from __future__ import absolute_import + +from ..log import logger + +try: + import pymongo + from pymongo import monitoring + + class MongoCommandTracer(monitoring.CommandListener): + + def started(self, event): + logger.debug("Command {0.command_name}({0.command}) with request id {0.request_id} started on server {0.connection_id}".format(event)) + + def succeeded(self, event): + logger.debug("Command {0.command_name} with request id {0.request_id} succeeded on server {0.connection_id}".format(event)) + + def failed(self, event): + logger.debug("Command {0.command_name} with request id {0.request_id} failed on server {0.connection_id}".format(event)) + + logger.debug("Instrumenting pymongo") + +except ImportError: + pass + diff --git a/setup.py b/setup.py index 2800675c..45bbbda7 100644 --- a/setup.py +++ b/setup.py @@ -83,6 +83,7 @@ def check_setuptools(): 'pyOpenSSL>=16.1.0;python_version<="2.7"', 'pytest>=3.0.1', 'psycopg2>=2.7.1', + 'pymongo>=3.7.0', 'redis>3.0.0', 'requests>=2.17.1', 'sqlalchemy>=1.1.15', diff --git a/tests/helpers.py b/tests/helpers.py index 2aba684e..c6e7c418 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -48,6 +48,14 @@ testenv['redis_host'] = os.environ.get('REDIS_HOST', '127.0.0.1') +""" +MongoDB Environment +""" +testenv['mongodb_host'] = os.environ.get('MONGO_HOST', '127.0.0.1') +testenv['mongodb_port'] = os.environ.get('MONGO_PORT', '27017') +testenv['mongodb_user'] = os.environ.get('MONGO_USER', None) +testenv['mongodb_pw'] = os.environ.get('MONGO_PW', None) + def get_first_span_by_name(spans, name): for span in spans: if span.n == name: diff --git a/tests/test_pymongo.py b/tests/test_pymongo.py new file mode 100644 index 00000000..003ec390 --- /dev/null +++ b/tests/test_pymongo.py @@ -0,0 +1,22 @@ +from __future__ import absolute_import + +import logging + +from .helpers import testenv + +import pymongo + +logger = logging.getLogger(__name__) + +class TestPyMongo: + def setUp(self): + logger.warn("Connecting to MongoDB mongo://%s:@%s:%s", + testenv['mongodb_user'], testenv['mongodb_host'], testenv['mongodb_port']) + self.conn = pymongo.MongoClient(host=testenv['mongodb_host'], port=int(testenv['mongodb_port']), + username=testenv['mongodb_user'], password=testenv['mongodb_pw']) + + def tearDown(self): + return None + + def test_basic(self): + pass From 4d02df35a1193e3086c6c0d254709914b28741be Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Tue, 11 Feb 2020 17:20:57 +0100 Subject: [PATCH 03/12] Register MongoCommandTracer as a global event handler on startup --- instana/__init__.py | 1 + instana/instrumentation/pymongo.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/instana/__init__.py b/instana/__init__.py index 5cacd563..ecb4e162 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -82,6 +82,7 @@ def boot_agent(): from .instrumentation import sudsjurko from .instrumentation import urllib3 from .instrumentation.django import middleware + from .instrumentation import pymongo # Hooks from .hooks import hook_uwsgi diff --git a/instana/instrumentation/pymongo.py b/instana/instrumentation/pymongo.py index b0ae61ec..2a71a867 100644 --- a/instana/instrumentation/pymongo.py +++ b/instana/instrumentation/pymongo.py @@ -17,6 +17,8 @@ def succeeded(self, event): def failed(self, event): logger.debug("Command {0.command_name} with request id {0.request_id} failed on server {0.connection_id}".format(event)) + monitoring.register(MongoCommandTracer()) + logger.debug("Instrumenting pymongo") except ImportError: From a103bb63929caf488c92cf8d02d55e210c9c5793 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Wed, 12 Feb 2020 13:14:28 +0100 Subject: [PATCH 04/12] Preserve MongoDB command within tracer until execution is complete --- instana/instrumentation/pymongo.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/instana/instrumentation/pymongo.py b/instana/instrumentation/pymongo.py index 2a71a867..9df1778e 100644 --- a/instana/instrumentation/pymongo.py +++ b/instana/instrumentation/pymongo.py @@ -6,17 +6,33 @@ import pymongo from pymongo import monitoring + class MongoCommand: + def __init__(self, name, query): + self.name = name + self.query = query + class MongoCommandTracer(monitoring.CommandListener): + def __init__(self): + self.__active_commands = {} def started(self, event): + self.__active_commands[event.request_id] = MongoCommand(event.command_name, event.command) logger.debug("Command {0.command_name}({0.command}) with request id {0.request_id} started on server {0.connection_id}".format(event)) def succeeded(self, event): - logger.debug("Command {0.command_name} with request id {0.request_id} succeeded on server {0.connection_id}".format(event)) + try: + logger.debug("Command {0.command_name}({1.query}) with request id {0.request_id} succeeded on server {0.connection_id}".format(event, self.__active_commands[event.request_id])) + del self.__active_commands[event.request_id] + except KeyError: + logger.warn("Request {} was not found in the requests list".format(event.request_id)) def failed(self, event): - logger.debug("Command {0.command_name} with request id {0.request_id} failed on server {0.connection_id}".format(event)) - + try: + logger.debug("Command {0.command_name}({1.query}) with request id {0.request_id} failed on server {0.connection_id}".format(event, self.__active_commands[event.request_id])) + del self.__active_commands[event.request_id] + except KeyError: + logger.warn("Request {} was not found in the requests list".format(event.request_id)) + monitoring.register(MongoCommandTracer()) logger.debug("Instrumenting pymongo") From c2aa55acd13f4026352f87991a54473abadd7f2f Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Wed, 12 Feb 2020 16:57:40 +0100 Subject: [PATCH 05/12] Register mongo as an exit span --- instana/recorder.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/instana/recorder.py b/instana/recorder.py index 61d220d8..bd7d7ba7 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -25,15 +25,16 @@ class InstanaRecorder(SpanRecorder): THREAD_NAME = "Instana Span Reporting" registered_spans = ("aiohttp-client", "aiohttp-server", "cassandra", "couchbase", "django", "log", - "memcache", "mysql", "postgres", "rabbitmq", "redis", "render", "rpc-client", + "memcache", "mongo", "mysql", "postgres", "rabbitmq", "redis", "render", "rpc-client", "rpc-server", "sqlalchemy", "soap", "tornado-client", "tornado-server", "urllib3", "wsgi") http_spans = ("aiohttp-client", "aiohttp-server", "django", "http", "soap", "tornado-client", "tornado-server", "urllib3", "wsgi") - exit_spans = ("aiohttp-client", "cassandra", "couchbase", "log", "memcache", "mysql", "postgres", - "rabbitmq", "redis", "rpc-client", "sqlalchemy", "soap", "tornado-client", "urllib3") + exit_spans = ("aiohttp-client", "cassandra", "couchbase", "log", "memcache", "mongo", "mysql", "postgres", + "rabbitmq", "redis", "rpc-client", "sqlalchemy", "soap", "tornado-client", "urllib3", + "pymongo") entry_spans = ("aiohttp-server", "django", "wsgi", "rabbitmq", "rpc-server", "tornado-server") From 14601500ed545b6ea5e4004de4df1c14c9cf5c2b Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Wed, 12 Feb 2020 17:54:05 +0100 Subject: [PATCH 06/12] Initiate a new span each time a command is sent to MongoDB --- instana/instrumentation/pymongo.py | 61 ++++++--- tests/test_pymongo.py | 199 ++++++++++++++++++++++++++++- 2 files changed, 237 insertions(+), 23 deletions(-) diff --git a/instana/instrumentation/pymongo.py b/instana/instrumentation/pymongo.py index 9df1778e..b91bff0c 100644 --- a/instana/instrumentation/pymongo.py +++ b/instana/instrumentation/pymongo.py @@ -1,42 +1,61 @@ from __future__ import absolute_import from ..log import logger +from ..singletons import tracer try: import pymongo from pymongo import monitoring - class MongoCommand: - def __init__(self, name, query): - self.name = name - self.query = query - class MongoCommandTracer(monitoring.CommandListener): def __init__(self): self.__active_commands = {} def started(self, event): - self.__active_commands[event.request_id] = MongoCommand(event.command_name, event.command) - logger.debug("Command {0.command_name}({0.command}) with request id {0.request_id} started on server {0.connection_id}".format(event)) + parent_span = tracer.active_span + + # return early if we're not tracing + if parent_span is None: + return + + with tracer.start_active_span("mongo", child_of=parent_span) as scope: + self._collect_tags(scope.span, event) + + scope.span.set_tag("db", event.database_name) + scope.span.set_tag("command", event.command_name) + + # include collection name into the namespace if provided + if event.command.has_key(event.command_name): + scope.span.set_tag("collection", event.command.get(event.command_name)) + + self.__active_commands[event.request_id] = scope def succeeded(self, event): - try: - logger.debug("Command {0.command_name}({1.query}) with request id {0.request_id} succeeded on server {0.connection_id}".format(event, self.__active_commands[event.request_id])) - del self.__active_commands[event.request_id] - except KeyError: - logger.warn("Request {} was not found in the requests list".format(event.request_id)) + active_span = self.__active_commands.pop(event.request_id, None) + + # return early if we're not tracing + if active_span is None: + return def failed(self, event): - try: - logger.debug("Command {0.command_name}({1.query}) with request id {0.request_id} failed on server {0.connection_id}".format(event, self.__active_commands[event.request_id])) - del self.__active_commands[event.request_id] - except KeyError: - logger.warn("Request {} was not found in the requests list".format(event.request_id)) - + active_span = self.__active_commands.pop(event.request_id, None) + + # return early if we're not tracing + if active_span is None: + return + + active_span.log_exception(event.failure) + + def _collect_tags(self, span, event): + (host, port) = event.connection_id + + span.set_tag("driver", "pymongo") + span.set_tag("host", host) + span.set_tag("port", str(port)) + monitoring.register(MongoCommandTracer()) - + logger.debug("Instrumenting pymongo") - + except ImportError: pass - diff --git a/tests/test_pymongo.py b/tests/test_pymongo.py index 003ec390..85413f48 100644 --- a/tests/test_pymongo.py +++ b/tests/test_pymongo.py @@ -2,9 +2,15 @@ import logging +from nose.tools import (assert_equals, assert_not_equals, assert_is_none, assert_is_not_none, + assert_false, assert_true, assert_is_instance, assert_greater, assert_list_equal) + from .helpers import testenv +from instana.singletons import tracer +from instana.util import to_json import pymongo +import bson logger = logging.getLogger(__name__) @@ -12,11 +18,200 @@ class TestPyMongo: def setUp(self): logger.warn("Connecting to MongoDB mongo://%s:@%s:%s", testenv['mongodb_user'], testenv['mongodb_host'], testenv['mongodb_port']) + self.conn = pymongo.MongoClient(host=testenv['mongodb_host'], port=int(testenv['mongodb_port']), username=testenv['mongodb_user'], password=testenv['mongodb_pw']) + self.conn.test.records.delete_many(filter={}) + + self.recorder = tracer.recorder + self.recorder.clear_spans() def tearDown(self): return None - def test_basic(self): - pass + def test_successful_find_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.find_one({"type": "string"}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.custom.tags["driver"], "pymongo") + assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) + assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) + assert_equals(db_span.data.custom.tags["db"], "test") + assert_equals(db_span.data.custom.tags["collection"], "records") + assert_equals(db_span.data.custom.tags["command"], "find") + + def test_successful_insert_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.insert_one({"type": "string"}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.custom.tags["driver"], "pymongo") + assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) + assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) + assert_equals(db_span.data.custom.tags["db"], "test") + assert_equals(db_span.data.custom.tags["collection"], "records") + assert_equals(db_span.data.custom.tags["command"], "insert") + + def test_successful_update_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.update_one({"type": "string"}, {"$set": {"type": "int"}}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.custom.tags["driver"], "pymongo") + assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) + assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) + assert_equals(db_span.data.custom.tags["db"], "test") + assert_equals(db_span.data.custom.tags["collection"], "records") + assert_equals(db_span.data.custom.tags["command"], "update") + + def test_successful_delete_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.delete_one(filter={"type": "string"}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.custom.tags["driver"], "pymongo") + assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) + assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) + assert_equals(db_span.data.custom.tags["db"], "test") + assert_equals(db_span.data.custom.tags["collection"], "records") + assert_equals(db_span.data.custom.tags["command"], "delete") + + def test_successful_aggregate_query(self): + with tracer.start_active_span("test"): + self.conn.test.records.count_documents({"type": "string"}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.custom.tags["driver"], "pymongo") + assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) + assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) + assert_equals(db_span.data.custom.tags["db"], "test") + assert_equals(db_span.data.custom.tags["collection"], "records") + assert_equals(db_span.data.custom.tags["command"], "aggregate") + + def test_successful_map_reduce_query(self): + mapper = bson.code.Code("function () { this.tags.forEach(function(z) { emit(z, 1); }); }") + reducer = bson.code.Code("function (key, values) { return len(values); }") + + with tracer.start_active_span("test"): + self.conn.test.records.map_reduce(mapper, reducer, "results", query={"x": {"$lt": 2}}) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 2) + + db_span = spans[0] + test_span = spans[1] + + assert_equals(test_span.t, db_span.t) + assert_equals(db_span.p, test_span.s) + + assert_false(db_span.error) + assert_is_none(db_span.ec) + + assert_equals(db_span.n, "mongo") + assert_equals(db_span.data.custom.tags["driver"], "pymongo") + assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) + assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) + assert_equals(db_span.data.custom.tags["db"], "test") + assert_equals(db_span.data.custom.tags["collection"], "records") + assert_equals(db_span.data.custom.tags["command"], "mapreduce") + + def test_successful_mutiple_queries(self): + with tracer.start_active_span("test"): + self.conn.test.records.bulk_write([pymongo.InsertOne({"type": "string"}), + pymongo.UpdateOne({"type": "string"}, {"$set": {"type": "int"}}), + pymongo.DeleteOne({"type": "string"})]) + + assert_is_none(tracer.active_span) + + spans = self.recorder.queued_spans() + assert_equals(len(spans), 4) + + test_span = spans.pop() + + seen_span_ids = set() + commands = [] + for span in spans: + assert_equals(test_span.t, span.t) + assert_equals(span.p, test_span.s) + + # check if all spans got a unique id + assert_false(span.s in seen_span_ids) + + seen_span_ids.add(span.s) + commands.append(span.data.custom.tags["command"]) + + # ensure spans are ordered the same way as commands + assert_list_equal(commands, ["insert", "update", "delete"]) From 5edf6f8ec6b9bf00364cd794e7da4b24ccd5c6bc Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Thu, 13 Feb 2020 12:21:52 +0100 Subject: [PATCH 07/12] Attach mongo command json to the span --- instana/instrumentation/pymongo.py | 36 +++++++++++++++++++++++++----- tests/test_pymongo.py | 10 +++++++++ 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/instana/instrumentation/pymongo.py b/instana/instrumentation/pymongo.py index b91bff0c..5e0573ac 100644 --- a/instana/instrumentation/pymongo.py +++ b/instana/instrumentation/pymongo.py @@ -19,10 +19,8 @@ def started(self, event): return with tracer.start_active_span("mongo", child_of=parent_span) as scope: - self._collect_tags(scope.span, event) - - scope.span.set_tag("db", event.database_name) - scope.span.set_tag("command", event.command_name) + self._collect_connection_tags(scope.span, event) + self._collect_command_tags(scope.span, event) # include collection name into the namespace if provided if event.command.has_key(event.command_name): @@ -46,12 +44,40 @@ def failed(self, event): active_span.log_exception(event.failure) - def _collect_tags(self, span, event): + def _collect_connection_tags(self, span, event): (host, port) = event.connection_id span.set_tag("driver", "pymongo") span.set_tag("host", host) span.set_tag("port", str(port)) + span.set_tag("db", event.database_name) + + def _collect_command_tags(self, span, event): + """ + Extract MongoDB command name and arguments and attach it to the span + """ + cmd = event.command_name + span.set_tag("command", cmd) + + if cmd == "find": + span.set_tag("filter", event.command.get("filter")) + elif cmd == "insert": + span.set_tag("json", event.command.get("documents")) + elif cmd == "update": + span.set_tag("json", event.command.get("updates")) + elif cmd == "delete": + span.set_tag("json", event.command.get("deletes")) + elif cmd == "aggregate": + span.set_tag("json", event.command.get("pipeline")) + elif cmd == "mapreduce": + data = { + "map": event.command.get("map"), + "reduce": event.command.get("reduce") + } + if event.command.has_key("query"): + data["query"] = event.command.get("query") + + span.set_tag("json", data) monitoring.register(MongoCommandTracer()) diff --git a/tests/test_pymongo.py b/tests/test_pymongo.py index 85413f48..a4ffdf43 100644 --- a/tests/test_pymongo.py +++ b/tests/test_pymongo.py @@ -54,6 +54,8 @@ def test_successful_find_query(self): assert_equals(db_span.data.custom.tags["db"], "test") assert_equals(db_span.data.custom.tags["collection"], "records") assert_equals(db_span.data.custom.tags["command"], "find") + assert_equals(db_span.data.custom.tags["filter"], {"type": "string"}) + assert_false("json" in db_span.data.custom.tags) def test_successful_insert_query(self): with tracer.start_active_span("test"): @@ -80,6 +82,7 @@ def test_successful_insert_query(self): assert_equals(db_span.data.custom.tags["db"], "test") assert_equals(db_span.data.custom.tags["collection"], "records") assert_equals(db_span.data.custom.tags["command"], "insert") + assert_false("filter" in db_span.data.custom.tags) def test_successful_update_query(self): with tracer.start_active_span("test"): @@ -106,6 +109,8 @@ def test_successful_update_query(self): assert_equals(db_span.data.custom.tags["db"], "test") assert_equals(db_span.data.custom.tags["collection"], "records") assert_equals(db_span.data.custom.tags["command"], "update") + assert_false("filter" in db_span.data.custom.tags) + assert_equals(db_span.data.custom.tags["json"], [{"q": {"type": "string"}, "u": {"$set": {"type": "int"}}, "multi": False, "upsert": False}]) def test_successful_delete_query(self): with tracer.start_active_span("test"): @@ -132,6 +137,8 @@ def test_successful_delete_query(self): assert_equals(db_span.data.custom.tags["db"], "test") assert_equals(db_span.data.custom.tags["collection"], "records") assert_equals(db_span.data.custom.tags["command"], "delete") + assert_false("filter" in db_span.data.custom.tags) + assert_equals(db_span.data.custom.tags["json"], [{"q": {"type": "string"}, "limit": 1}]) def test_successful_aggregate_query(self): with tracer.start_active_span("test"): @@ -158,6 +165,8 @@ def test_successful_aggregate_query(self): assert_equals(db_span.data.custom.tags["db"], "test") assert_equals(db_span.data.custom.tags["collection"], "records") assert_equals(db_span.data.custom.tags["command"], "aggregate") + assert_false("filter" in db_span.data.custom.tags) + assert_equals(db_span.data.custom.tags["json"], [{'$match': {'type': 'string'}}, {'$group': {'_id': None, 'n': {'$sum': 1}}}]) def test_successful_map_reduce_query(self): mapper = bson.code.Code("function () { this.tags.forEach(function(z) { emit(z, 1); }); }") @@ -187,6 +196,7 @@ def test_successful_map_reduce_query(self): assert_equals(db_span.data.custom.tags["db"], "test") assert_equals(db_span.data.custom.tags["collection"], "records") assert_equals(db_span.data.custom.tags["command"], "mapreduce") + assert_equals(db_span.data.custom.tags["json"], {"map": mapper, "reduce": reducer, "query": {"x": {"$lt": 2}}}) def test_successful_mutiple_queries(self): with tracer.start_active_span("test"): From d23e51c77b6eecd976082e388574c43b53c9b606 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Thu, 13 Feb 2020 14:24:32 +0100 Subject: [PATCH 08/12] Add MongoDBData span type --- instana/json_span.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/instana/json_span.py b/instana/json_span.py index 43fccdf4..4a569663 100644 --- a/instana/json_span.py +++ b/instana/json_span.py @@ -97,6 +97,15 @@ class MySQLData(BaseSpan): error = None +class MongoDBData(BaseSpan): + service = None + namespace = None + command = None + filter = None + json = None + error = None + + class PostgresData(BaseSpan): db = None host = None From 312c4640e1cd2f999e4a6f0bb28b0a73b3b09b2b Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Thu, 13 Feb 2020 14:43:40 +0100 Subject: [PATCH 09/12] Send mongo spans and MongoDBData --- instana/recorder.py | 12 ++++++- tests/test_pymongo.py | 76 +++++++++++++++++-------------------------- 2 files changed, 40 insertions(+), 48 deletions(-) diff --git a/instana/recorder.py b/instana/recorder.py index bd7d7ba7..eb8c8d70 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -10,7 +10,7 @@ import instana.singletons from .json_span import (CassandraData, CouchbaseData, CustomData, Data, HttpData, JsonSpan, LogData, - MySQLData, PostgresData, RabbitmqData, RedisData, RenderData, + MongoDBData, MySQLData, PostgresData, RabbitmqData, RedisData, RenderData, RPCData, SDKData, SoapData, SQLAlchemyData) from .log import logger @@ -238,6 +238,16 @@ def build_registered_span(self, span): tskey = list(data.custom.logs.keys())[0] data.pg.error = data.custom.logs[tskey]['message'] + elif span.operation_name == "mongo": + service = "%s:%s" % (span.tags.pop('host', None), span.tags.pop('port', None)) + namespace = "%s.%s" % (span.tags.pop('db', "?"), span.tags.pop('collection', "?")) + data.mongo = MongoDBData(service=service, + namespace=namespace, + command=span.tags.pop('command', None), + filter=span.tags.pop('filter', None), + json=span.tags.pop('json', None), + error=span.tags.pop('command', None)) + elif span.operation_name == "log": data.log = {} # use last special key values diff --git a/tests/test_pymongo.py b/tests/test_pymongo.py index a4ffdf43..bf7f1c97 100644 --- a/tests/test_pymongo.py +++ b/tests/test_pymongo.py @@ -48,14 +48,11 @@ def test_successful_find_query(self): assert_is_none(db_span.ec) assert_equals(db_span.n, "mongo") - assert_equals(db_span.data.custom.tags["driver"], "pymongo") - assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) - assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) - assert_equals(db_span.data.custom.tags["db"], "test") - assert_equals(db_span.data.custom.tags["collection"], "records") - assert_equals(db_span.data.custom.tags["command"], "find") - assert_equals(db_span.data.custom.tags["filter"], {"type": "string"}) - assert_false("json" in db_span.data.custom.tags) + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "find") + assert_equals(db_span.data.mongo.filter, {"type": "string"}) + assert_is_none(db_span.data.mongo.json) def test_successful_insert_query(self): with tracer.start_active_span("test"): @@ -76,13 +73,10 @@ def test_successful_insert_query(self): assert_is_none(db_span.ec) assert_equals(db_span.n, "mongo") - assert_equals(db_span.data.custom.tags["driver"], "pymongo") - assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) - assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) - assert_equals(db_span.data.custom.tags["db"], "test") - assert_equals(db_span.data.custom.tags["collection"], "records") - assert_equals(db_span.data.custom.tags["command"], "insert") - assert_false("filter" in db_span.data.custom.tags) + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "insert") + assert_is_none(db_span.data.mongo.filter) def test_successful_update_query(self): with tracer.start_active_span("test"): @@ -103,14 +97,11 @@ def test_successful_update_query(self): assert_is_none(db_span.ec) assert_equals(db_span.n, "mongo") - assert_equals(db_span.data.custom.tags["driver"], "pymongo") - assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) - assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) - assert_equals(db_span.data.custom.tags["db"], "test") - assert_equals(db_span.data.custom.tags["collection"], "records") - assert_equals(db_span.data.custom.tags["command"], "update") - assert_false("filter" in db_span.data.custom.tags) - assert_equals(db_span.data.custom.tags["json"], [{"q": {"type": "string"}, "u": {"$set": {"type": "int"}}, "multi": False, "upsert": False}]) + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "update") + assert_is_none(db_span.data.mongo.filter) + assert_equals(db_span.data.mongo.json, [{"q": {"type": "string"}, "u": {"$set": {"type": "int"}}, "multi": False, "upsert": False}]) def test_successful_delete_query(self): with tracer.start_active_span("test"): @@ -131,14 +122,11 @@ def test_successful_delete_query(self): assert_is_none(db_span.ec) assert_equals(db_span.n, "mongo") - assert_equals(db_span.data.custom.tags["driver"], "pymongo") - assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) - assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) - assert_equals(db_span.data.custom.tags["db"], "test") - assert_equals(db_span.data.custom.tags["collection"], "records") - assert_equals(db_span.data.custom.tags["command"], "delete") - assert_false("filter" in db_span.data.custom.tags) - assert_equals(db_span.data.custom.tags["json"], [{"q": {"type": "string"}, "limit": 1}]) + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "delete") + assert_is_none(db_span.data.mongo.filter) + assert_equals(db_span.data.mongo.json, [{"q": {"type": "string"}, "limit": 1}]) def test_successful_aggregate_query(self): with tracer.start_active_span("test"): @@ -159,14 +147,11 @@ def test_successful_aggregate_query(self): assert_is_none(db_span.ec) assert_equals(db_span.n, "mongo") - assert_equals(db_span.data.custom.tags["driver"], "pymongo") - assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) - assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) - assert_equals(db_span.data.custom.tags["db"], "test") - assert_equals(db_span.data.custom.tags["collection"], "records") - assert_equals(db_span.data.custom.tags["command"], "aggregate") - assert_false("filter" in db_span.data.custom.tags) - assert_equals(db_span.data.custom.tags["json"], [{'$match': {'type': 'string'}}, {'$group': {'_id': None, 'n': {'$sum': 1}}}]) + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "aggregate") + assert_is_none(db_span.data.mongo.filter) + assert_equals(db_span.data.mongo.json, [{'$match': {'type': 'string'}}, {'$group': {'_id': None, 'n': {'$sum': 1}}}]) def test_successful_map_reduce_query(self): mapper = bson.code.Code("function () { this.tags.forEach(function(z) { emit(z, 1); }); }") @@ -190,13 +175,10 @@ def test_successful_map_reduce_query(self): assert_is_none(db_span.ec) assert_equals(db_span.n, "mongo") - assert_equals(db_span.data.custom.tags["driver"], "pymongo") - assert_equals(db_span.data.custom.tags["host"], testenv['mongodb_host']) - assert_equals(db_span.data.custom.tags["port"], testenv['mongodb_port']) - assert_equals(db_span.data.custom.tags["db"], "test") - assert_equals(db_span.data.custom.tags["collection"], "records") - assert_equals(db_span.data.custom.tags["command"], "mapreduce") - assert_equals(db_span.data.custom.tags["json"], {"map": mapper, "reduce": reducer, "query": {"x": {"$lt": 2}}}) + assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) + assert_equals(db_span.data.mongo.namespace, "test.records") + assert_equals(db_span.data.mongo.command, "mapreduce") + assert_equals(db_span.data.mongo.json, {"map": mapper, "reduce": reducer, "query": {"x": {"$lt": 2}}}) def test_successful_mutiple_queries(self): with tracer.start_active_span("test"): @@ -221,7 +203,7 @@ def test_successful_mutiple_queries(self): assert_false(span.s in seen_span_ids) seen_span_ids.add(span.s) - commands.append(span.data.custom.tags["command"]) + commands.append(span.data.mongo.command) # ensure spans are ordered the same way as commands assert_list_equal(commands, ["insert", "update", "delete"]) From 94eb251f2f17b22dfbf702c669d5e2559d8a9586 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Fri, 14 Feb 2020 13:06:07 +0100 Subject: [PATCH 10/12] Send mongo.json and mongo.filter span tags as JSON strings --- instana/instrumentation/pymongo.py | 40 ++++++++++------- tests/test_pymongo.py | 72 ++++++++++++++++++++---------- 2 files changed, 73 insertions(+), 39 deletions(-) diff --git a/instana/instrumentation/pymongo.py b/instana/instrumentation/pymongo.py index 5e0573ac..9a0e8eee 100644 --- a/instana/instrumentation/pymongo.py +++ b/instana/instrumentation/pymongo.py @@ -6,6 +6,7 @@ try: import pymongo from pymongo import monitoring + from bson import json_util class MongoCommandTracer(monitoring.CommandListener): def __init__(self): @@ -58,26 +59,33 @@ def _collect_command_tags(self, span, event): """ cmd = event.command_name span.set_tag("command", cmd) - - if cmd == "find": - span.set_tag("filter", event.command.get("filter")) - elif cmd == "insert": - span.set_tag("json", event.command.get("documents")) - elif cmd == "update": - span.set_tag("json", event.command.get("updates")) - elif cmd == "delete": - span.set_tag("json", event.command.get("deletes")) - elif cmd == "aggregate": - span.set_tag("json", event.command.get("pipeline")) + + for key in ["filter", "query"]: + if event.command.has_key(key): + span.set_tag("filter", json_util.dumps(event.command.get(key))) + break + + # The location of command documents within the command object depends on the name + # of this command. This is the name -> command object key mapping + cmd_doc_locations = { + "insert": "documents", + "update": "updates", + "delete": "deletes", + "aggregate": "pipeline" + } + + cmd_doc = None + if cmd in cmd_doc_locations: + cmd_doc = event.command.get(cmd_doc_locations[cmd]) elif cmd == "mapreduce": - data = { + # mapreduce command consists of two mandatory parts: map and reduce + cmd_doc = { "map": event.command.get("map"), "reduce": event.command.get("reduce") } - if event.command.has_key("query"): - data["query"] = event.command.get("query") - - span.set_tag("json", data) + + if cmd_doc is not None: + span.set_tag("json", json_util.dumps(cmd_doc)) monitoring.register(MongoCommandTracer()) diff --git a/tests/test_pymongo.py b/tests/test_pymongo.py index bf7f1c97..26e8f531 100644 --- a/tests/test_pymongo.py +++ b/tests/test_pymongo.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import logging +import json from nose.tools import (assert_equals, assert_not_equals, assert_is_none, assert_is_not_none, assert_false, assert_true, assert_is_instance, assert_greater, assert_list_equal) @@ -32,7 +33,7 @@ def tearDown(self): def test_successful_find_query(self): with tracer.start_active_span("test"): self.conn.test.records.find_one({"type": "string"}) - + assert_is_none(tracer.active_span) spans = self.recorder.queued_spans() @@ -43,7 +44,7 @@ def test_successful_find_query(self): assert_equals(test_span.t, db_span.t) assert_equals(db_span.p, test_span.s) - + assert_false(db_span.error) assert_is_none(db_span.ec) @@ -51,13 +52,14 @@ def test_successful_find_query(self): assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) assert_equals(db_span.data.mongo.namespace, "test.records") assert_equals(db_span.data.mongo.command, "find") - assert_equals(db_span.data.mongo.filter, {"type": "string"}) + + assert_equals(db_span.data.mongo.filter, '{"type": "string"}') assert_is_none(db_span.data.mongo.json) def test_successful_insert_query(self): with tracer.start_active_span("test"): self.conn.test.records.insert_one({"type": "string"}) - + assert_is_none(tracer.active_span) spans = self.recorder.queued_spans() @@ -68,7 +70,7 @@ def test_successful_insert_query(self): assert_equals(test_span.t, db_span.t) assert_equals(db_span.p, test_span.s) - + assert_false(db_span.error) assert_is_none(db_span.ec) @@ -76,12 +78,13 @@ def test_successful_insert_query(self): assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) assert_equals(db_span.data.mongo.namespace, "test.records") assert_equals(db_span.data.mongo.command, "insert") + assert_is_none(db_span.data.mongo.filter) def test_successful_update_query(self): with tracer.start_active_span("test"): self.conn.test.records.update_one({"type": "string"}, {"$set": {"type": "int"}}) - + assert_is_none(tracer.active_span) spans = self.recorder.queued_spans() @@ -92,7 +95,7 @@ def test_successful_update_query(self): assert_equals(test_span.t, db_span.t) assert_equals(db_span.p, test_span.s) - + assert_false(db_span.error) assert_is_none(db_span.ec) @@ -100,13 +103,22 @@ def test_successful_update_query(self): assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) assert_equals(db_span.data.mongo.namespace, "test.records") assert_equals(db_span.data.mongo.command, "update") + assert_is_none(db_span.data.mongo.filter) - assert_equals(db_span.data.mongo.json, [{"q": {"type": "string"}, "u": {"$set": {"type": "int"}}, "multi": False, "upsert": False}]) + assert_is_not_none(db_span.data.mongo.json) + + payload = json.loads(db_span.data.mongo.json) + assert_true({ + "q": {"type": "string"}, + "u": {"$set": {"type": "int"}}, + "multi": False, + "upsert": False + } in payload, db_span.data.mongo.json) def test_successful_delete_query(self): with tracer.start_active_span("test"): self.conn.test.records.delete_one(filter={"type": "string"}) - + assert_is_none(tracer.active_span) spans = self.recorder.queued_spans() @@ -117,7 +129,7 @@ def test_successful_delete_query(self): assert_equals(test_span.t, db_span.t) assert_equals(db_span.p, test_span.s) - + assert_false(db_span.error) assert_is_none(db_span.ec) @@ -125,13 +137,17 @@ def test_successful_delete_query(self): assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) assert_equals(db_span.data.mongo.namespace, "test.records") assert_equals(db_span.data.mongo.command, "delete") + assert_is_none(db_span.data.mongo.filter) - assert_equals(db_span.data.mongo.json, [{"q": {"type": "string"}, "limit": 1}]) + assert_is_not_none(db_span.data.mongo.json) + + payload = json.loads(db_span.data.mongo.json) + assert_true({"q": {"type": "string"}, "limit": 1} in payload, db_span.data.mongo.json) def test_successful_aggregate_query(self): with tracer.start_active_span("test"): self.conn.test.records.count_documents({"type": "string"}) - + assert_is_none(tracer.active_span) spans = self.recorder.queued_spans() @@ -142,7 +158,7 @@ def test_successful_aggregate_query(self): assert_equals(test_span.t, db_span.t) assert_equals(db_span.p, test_span.s) - + assert_false(db_span.error) assert_is_none(db_span.ec) @@ -150,16 +166,20 @@ def test_successful_aggregate_query(self): assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) assert_equals(db_span.data.mongo.namespace, "test.records") assert_equals(db_span.data.mongo.command, "aggregate") + assert_is_none(db_span.data.mongo.filter) - assert_equals(db_span.data.mongo.json, [{'$match': {'type': 'string'}}, {'$group': {'_id': None, 'n': {'$sum': 1}}}]) + assert_is_not_none(db_span.data.mongo.json) + + payload = json.loads(db_span.data.mongo.json) + assert_true({"$match": {"type": "string"}} in payload, db_span.data.mongo.json) def test_successful_map_reduce_query(self): - mapper = bson.code.Code("function () { this.tags.forEach(function(z) { emit(z, 1); }); }") - reducer = bson.code.Code("function (key, values) { return len(values); }") - + mapper = "function () { this.tags.forEach(function(z) { emit(z, 1); }); }" + reducer = "function (key, values) { return len(values); }" + with tracer.start_active_span("test"): - self.conn.test.records.map_reduce(mapper, reducer, "results", query={"x": {"$lt": 2}}) - + self.conn.test.records.map_reduce(bson.code.Code(mapper), bson.code.Code(reducer), "results", query={"x": {"$lt": 2}}) + assert_is_none(tracer.active_span) spans = self.recorder.queued_spans() @@ -170,7 +190,7 @@ def test_successful_map_reduce_query(self): assert_equals(test_span.t, db_span.t) assert_equals(db_span.p, test_span.s) - + assert_false(db_span.error) assert_is_none(db_span.ec) @@ -178,7 +198,13 @@ def test_successful_map_reduce_query(self): assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) assert_equals(db_span.data.mongo.namespace, "test.records") assert_equals(db_span.data.mongo.command, "mapreduce") - assert_equals(db_span.data.mongo.json, {"map": mapper, "reduce": reducer, "query": {"x": {"$lt": 2}}}) + + assert_equals(db_span.data.mongo.filter, '{"x": {"$lt": 2}}') + assert_is_not_none(db_span.data.mongo.json) + + payload = json.loads(db_span.data.mongo.json) + assert_equals(payload["map"], {"$code": mapper}, db_span.data.mongo.json) + assert_equals(payload["reduce"], {"$code": reducer}, db_span.data.mongo.json) def test_successful_mutiple_queries(self): with tracer.start_active_span("test"): @@ -192,7 +218,7 @@ def test_successful_mutiple_queries(self): assert_equals(len(spans), 4) test_span = spans.pop() - + seen_span_ids = set() commands = [] for span in spans: @@ -201,7 +227,7 @@ def test_successful_mutiple_queries(self): # check if all spans got a unique id assert_false(span.s in seen_span_ids) - + seen_span_ids.add(span.s) commands.append(span.data.mongo.command) From b95b29e4e87720fd476fb612f5e900b9f47b4b75 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Fri, 14 Feb 2020 14:23:09 +0100 Subject: [PATCH 11/12] Address mapreduce command case change in pymongo-3.9.0+ --- instana/instrumentation/pymongo.py | 2 +- tests/test_pymongo.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/instana/instrumentation/pymongo.py b/instana/instrumentation/pymongo.py index 9a0e8eee..6c2ada3f 100644 --- a/instana/instrumentation/pymongo.py +++ b/instana/instrumentation/pymongo.py @@ -77,7 +77,7 @@ def _collect_command_tags(self, span, event): cmd_doc = None if cmd in cmd_doc_locations: cmd_doc = event.command.get(cmd_doc_locations[cmd]) - elif cmd == "mapreduce": + elif cmd.lower() == "mapreduce": # mapreduce command was renamed to mapReduce in pymongo 3.9.0 # mapreduce command consists of two mandatory parts: map and reduce cmd_doc = { "map": event.command.get("map"), diff --git a/tests/test_pymongo.py b/tests/test_pymongo.py index 26e8f531..5324f6f0 100644 --- a/tests/test_pymongo.py +++ b/tests/test_pymongo.py @@ -197,7 +197,7 @@ def test_successful_map_reduce_query(self): assert_equals(db_span.n, "mongo") assert_equals(db_span.data.mongo.service, "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) assert_equals(db_span.data.mongo.namespace, "test.records") - assert_equals(db_span.data.mongo.command, "mapreduce") + assert_equals(db_span.data.mongo.command.lower(), "mapreduce") # mapreduce command was renamed to mapReduce in pymongo 3.9.0 assert_equals(db_span.data.mongo.filter, '{"x": {"$lt": 2}}') assert_is_not_none(db_span.data.mongo.json) From 8e0309a39fccda8aeda7d6946738d52c1ddf9325 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Fri, 14 Feb 2020 14:50:30 +0100 Subject: [PATCH 12/12] Add myself to the contributors list and update the copyright year --- instana/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instana/__init__.py b/instana/__init__.py index ecb4e162..2cb79f56 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -25,8 +25,8 @@ import pkg_resources __author__ = 'Instana Inc.' -__copyright__ = 'Copyright 2019 Instana Inc.' -__credits__ = ['Pavlo Baron', 'Peter Giacomo Lombardo'] +__copyright__ = 'Copyright 2020 Instana Inc.' +__credits__ = ['Pavlo Baron', 'Peter Giacomo Lombardo', 'Andrey Slotin'] __license__ = 'MIT' __maintainer__ = 'Peter Giacomo Lombardo' __email__ = 'peter.lombardo@instana.com'