Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update asyncpg instrumentation to follow semantic conventions #188

Merged
merged 13 commits into from
Nov 24, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Update asyncpg instrumentation to follow semantic conventions
([#188](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/188))

## Version 0.12b0

Released 2020-08-14
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,24 @@


def _hydrate_span_from_args(connection, query, parameters) -> dict:
span_attributes = {"db.type": "sql"}
span_attributes = {"db.system": "postgresql"}

params = getattr(connection, "_params", None)
span_attributes["db.instance"] = getattr(params, "database", None)
span_attributes["db.user"] = getattr(params, "user", None)
dbname = getattr(params, "database", None)
if dbname:
span_attributes["db.name"] = dbname
user = getattr(params, "user", None)
if user:
span_attributes["db.user"] = user

addr = getattr(connection, "_addr", None)
if isinstance(addr, tuple):
span_attributes["net.peer.name"] = addr[0]
span_attributes["net.peer.ip"] = addr[1]
span_attributes["net.transport"] = "IP.TCP"
elif isinstance(addr, str):
span_attributes["net.peer.name"] = addr
span_attributes["net.transport"] = "Unix"
Comment on lines +69 to +75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Are there any docs explaining how to parse these out? I think they could be useful to add to the function doc string for _hydrate_span_from_args

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connection contains _addr attribute which is either a host/port tuple, or Unix socket string. May be something like this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in a467449


if query is not None:
span_attributes["db.statement"] = query
Expand Down Expand Up @@ -105,16 +118,19 @@ async def _do_execute(self, func, instance, args, kwargs):
tracer = getattr(asyncpg, _APPLIED)

exception = None

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:
span_attributes = _hydrate_span_from_args(
instance, args[0], args[1:] if self.capture_parameters else None
)
name = ""
if args[0]:
name = args[0]
elif span_attributes.get("db.name"):
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
name = span_attributes["db.name"]
else:
name = "postgresql" # Does it ever happen?
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved

with tracer.start_as_current_span(name, kind=SpanKind.CLIENT) as span:
if span.is_recording():
span_attributes = _hydrate_span_from_args(
instance,
args[0],
args[1:] if self.capture_parameters else None,
)
for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.status import StatusCode

POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432"))
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests")
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword")
POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser")
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT", "5432"))
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "opentelemetry-tests")
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_PASSWORD", "testpassword")
POSTGRES_USER = os.getenv("POSTGRESQL_USER", "testuser")


def async_call(coro):
Expand Down Expand Up @@ -41,34 +41,28 @@ def setUpClass(cls):
def tearDownClass(cls):
AsyncPGInstrumentor().uninstrument()

def check_span(self, span):
self.assertEqual(span.attributes["db.system"], "postgresql")
self.assertEqual(span.attributes["db.name"], POSTGRES_DB_NAME)
self.assertEqual(span.attributes["db.user"], POSTGRES_USER)
self.assertEqual(span.attributes["net.peer.name"], POSTGRES_HOST)
self.assertEqual(span.attributes["net.peer.ip"], POSTGRES_PORT)

def test_instrumented_execute_method_without_arguments(self, *_, **__):
async_call(self._connection.execute("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT 42;",
},
)
self.check_span(spans[0])
self.assertEqual(spans[0].name, "SELECT 42;")
self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;")

def test_instrumented_fetch_method_without_arguments(self, *_, **__):
async_call(self._connection.fetch("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT 42;",
},
)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;")

def test_instrumented_transaction_method(self, *_, **__):
async def _transaction_execute():
Expand All @@ -79,35 +73,16 @@ async def _transaction_execute():

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(3, len(spans))
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "BEGIN;",
},
spans[0].attributes,
)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "BEGIN;")
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "SELECT 42;",
},
spans[1].attributes,
)

self.check_span(spans[1])
self.assertEqual(spans[1].attributes["db.statement"], "SELECT 42;")
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "COMMIT;",
},
spans[2].attributes,
)

self.check_span(spans[2])
self.assertEqual(spans[2].attributes["db.statement"], "COMMIT;")
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)

def test_instrumented_failed_transaction_method(self, *_, **__):
Expand All @@ -120,57 +95,28 @@ async def _transaction_execute():

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(3, len(spans))
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "BEGIN;",
},
spans[0].attributes,
)

self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "BEGIN;")
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)

self.check_span(spans[1])
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "SELECT 42::uuid;",
},
spans[1].attributes,
)
self.assertEqual(
StatusCode.ERROR, spans[1].status.status_code,
)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "ROLLBACK;",
},
spans[2].attributes,
spans[1].attributes["db.statement"], "SELECT 42::uuid;"
)
self.assertEqual(StatusCode.ERROR, spans[1].status.status_code)

self.check_span(spans[2])
self.assertEqual(spans[2].attributes["db.statement"], "ROLLBACK;")
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)

def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
async_call(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
# This shouldn't be set because we don't capture parameters by
# default
#
# "db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;")


class TestFunctionalAsyncPG_CaptureParameters(TestBase):
Expand All @@ -197,64 +143,56 @@ def setUpClass(cls):
def tearDownClass(cls):
AsyncPGInstrumentor().uninstrument()

def check_span(self, span):
self.assertEqual(span.attributes["db.system"], "postgresql")
self.assertEqual(span.attributes["db.name"], POSTGRES_DB_NAME)
self.assertEqual(span.attributes["db.user"], POSTGRES_USER)
self.assertEqual(span.attributes["net.peer.name"], POSTGRES_HOST)
self.assertEqual(span.attributes["net.peer.ip"], POSTGRES_PORT)

def test_instrumented_execute_method_with_arguments(self, *_, **__):
async_call(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)

self.check_span(spans[0])
self.assertEqual(spans[0].name, "SELECT $1;")
self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;")
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
spans[0].attributes["db.statement.parameters"], "('1',)"
)

def test_instrumented_fetch_method_with_arguments(self, *_, **__):
async_call(self._connection.fetch("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;")
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
spans[0].attributes["db.statement.parameters"], "('1',)"
)

def test_instrumented_executemany_method_with_arguments(self, *_, **__):
async_call(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;")
self.assertEqual(
{
"db.type": "sql",
"db.statement": "SELECT $1;",
"db.statement.parameters": "([['1'], ['2']],)",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
},
spans[0].attributes,
spans[0].attributes["db.statement.parameters"], "([['1'], ['2']],)"
)

def test_instrumented_execute_interface_error_method(self, *_, **__):
with self.assertRaises(asyncpg.InterfaceError):
async_call(self._connection.execute("SELECT 42;", 1, 2, 3))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;")
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.statement.parameters": "(1, 2, 3)",
"db.statement": "SELECT 42;",
},
spans[0].attributes["db.statement.parameters"], "(1, 2, 3)"
)