Skip to content

Commit

Permalink
Add metrics instrumentation sqlalchemy (#1645)
Browse files Browse the repository at this point in the history
  • Loading branch information
shalevr committed Feb 26, 2023
1 parent 0417141 commit 7ffbfc3
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 37 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add metrics instrumentation for sqlalchemy
([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645))

- Fix exception in Urllib3 when dealing with filelike body.
([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399))

Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
| [opentelemetry-instrumentation-remoulade](./opentelemetry-instrumentation-remoulade) | remoulade >= 0.50 | No
| [opentelemetry-instrumentation-requests](./opentelemetry-instrumentation-requests) | requests ~= 2.0 | Yes
| [opentelemetry-instrumentation-sklearn](./opentelemetry-instrumentation-sklearn) | scikit-learn ~= 0.24.0 | No
| [opentelemetry-instrumentation-sqlalchemy](./opentelemetry-instrumentation-sqlalchemy) | sqlalchemy | No
| [opentelemetry-instrumentation-sqlalchemy](./opentelemetry-instrumentation-sqlalchemy) | sqlalchemy | Yes
| [opentelemetry-instrumentation-sqlite3](./opentelemetry-instrumentation-sqlite3) | sqlite3 | No
| [opentelemetry-instrumentation-starlette](./opentelemetry-instrumentation-starlette) | starlette ~= 0.13.0 | Yes
| [opentelemetry-instrumentation-system-metrics](./opentelemetry-instrumentation-system-metrics) | psutil >= 5 | No
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,16 @@
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.sqlalchemy.engine import (
EngineTracer,
_get_tracer,
_wrap_connect,
_wrap_create_async_engine,
_wrap_create_engine,
)
from opentelemetry.instrumentation.sqlalchemy.package import _instruments
from opentelemetry.instrumentation.sqlalchemy.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.trace import get_tracer


class SQLAlchemyInstrumentor(BaseInstrumentor):
Expand All @@ -136,32 +139,47 @@ def _instrument(self, **kwargs):
An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise.
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)

meter_provider = kwargs.get("meter_provider")
meter = get_meter(__name__, __version__, meter_provider)

connections_usage = meter.create_up_down_counter(
name=MetricInstruments.DB_CLIENT_CONNECTIONS_USAGE,
unit="connections",
description="The number of connections that are currently in state described by the state attribute.",
)

enable_commenter = kwargs.get("enable_commenter", False)

_w(
"sqlalchemy",
"create_engine",
_wrap_create_engine(tracer_provider, enable_commenter),
_wrap_create_engine(tracer, connections_usage, enable_commenter),
)
_w(
"sqlalchemy.engine",
"create_engine",
_wrap_create_engine(tracer_provider, enable_commenter),
_wrap_create_engine(tracer, connections_usage, enable_commenter),
)
_w(
"sqlalchemy.engine.base",
"Engine.connect",
_wrap_connect(tracer_provider),
_wrap_connect(tracer),
)
if parse_version(sqlalchemy.__version__).release >= (1, 4):
_w(
"sqlalchemy.ext.asyncio",
"create_async_engine",
_wrap_create_async_engine(tracer_provider, enable_commenter),
_wrap_create_async_engine(
tracer, connections_usage, enable_commenter
),
)
if kwargs.get("engine") is not None:
return EngineTracer(
_get_tracer(tracer_provider),
tracer,
kwargs.get("engine"),
connections_usage,
kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}),
)
Expand All @@ -170,8 +188,9 @@ def _instrument(self, **kwargs):
):
return [
EngineTracer(
_get_tracer(tracer_provider),
tracer,
engine,
connections_usage,
kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
)

from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy.package import (
_instrumenting_module_name,
)
from opentelemetry.instrumentation.sqlalchemy.version import __version__
from opentelemetry.instrumentation.sqlcommenter_utils import _add_sql_comment
from opentelemetry.instrumentation.utils import _get_opentelemetry_values
Expand All @@ -44,49 +41,36 @@ def _normalize_vendor(vendor):
return vendor


def _get_tracer(tracer_provider=None):
return trace.get_tracer(
_instrumenting_module_name,
__version__,
tracer_provider=tracer_provider,
)


def _wrap_create_async_engine(tracer_provider=None, enable_commenter=False):
def _wrap_create_async_engine(
tracer, connections_usage, enable_commenter=False
):
# pylint: disable=unused-argument
def _wrap_create_async_engine_internal(func, module, args, kwargs):
"""Trace the SQLAlchemy engine, creating an `EngineTracer`
object that will listen to SQLAlchemy events.
"""
engine = func(*args, **kwargs)
EngineTracer(
_get_tracer(tracer_provider), engine.sync_engine, enable_commenter
tracer, engine.sync_engine, connections_usage, enable_commenter
)
return engine

return _wrap_create_async_engine_internal


def _wrap_create_engine(tracer_provider=None, enable_commenter=False):
# pylint: disable=unused-argument
def _wrap_create_engine_internal(func, module, args, kwargs):
def _wrap_create_engine(tracer, connections_usage, enable_commenter=False):
def _wrap_create_engine_internal(func, _module, args, kwargs):
"""Trace the SQLAlchemy engine, creating an `EngineTracer`
object that will listen to SQLAlchemy events.
"""
engine = func(*args, **kwargs)
EngineTracer(_get_tracer(tracer_provider), engine, enable_commenter)
EngineTracer(tracer, engine, connections_usage, enable_commenter)
return engine

return _wrap_create_engine_internal


def _wrap_connect(tracer_provider=None):
tracer = trace.get_tracer(
_instrumenting_module_name,
__version__,
tracer_provider=tracer_provider,
)

def _wrap_connect(tracer):
# pylint: disable=unused-argument
def _wrap_connect_internal(func, module, args, kwargs):
with tracer.start_as_current_span(
Expand All @@ -107,10 +91,16 @@ class EngineTracer:
_remove_event_listener_params = []

def __init__(
self, tracer, engine, enable_commenter=False, commenter_options=None
self,
tracer,
engine,
connections_usage,
enable_commenter=False,
commenter_options=None,
):
self.tracer = tracer
self.engine = engine
self.connections_usage = connections_usage
self.vendor = _normalize_vendor(engine.name)
self.enable_commenter = enable_commenter
self.commenter_options = commenter_options if commenter_options else {}
Expand All @@ -123,6 +113,49 @@ def __init__(
engine, "after_cursor_execute", _after_cur_exec
)
self._register_event_listener(engine, "handle_error", _handle_error)
self._register_event_listener(engine, "connect", self._pool_connect)
self._register_event_listener(engine, "close", self._pool_close)
self._register_event_listener(engine, "checkin", self._pool_checkin)
self._register_event_listener(engine, "checkout", self._pool_checkout)

def _get_pool_name(self):
return self.engine.pool.logging_name or ""

def _add_idle_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
"state": "idle",
},
)

def _add_used_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
"state": "used",
},
)

def _pool_connect(self, _dbapi_connection, _connection_record):
self._add_idle_to_connection_usage(1)

def _pool_close(self, _dbapi_connection, _connection_record):
self._add_idle_to_connection_usage(-1)

# Called when a connection returns to the pool.
def _pool_checkin(self, _dbapi_connection, _connection_record):
self._add_used_to_connection_usage(-1)
self._add_idle_to_connection_usage(1)

# Called when a connection is retrieved from the Pool.
def _pool_checkout(
self, _dbapi_connection, _connection_record, _connection_proxy
):
self._add_idle_to_connection_usage(-1)
self._add_used_to_connection_usage(1)

@classmethod
def _register_event_listener(cls, target, identifier, func, *args, **kw):
Expand Down Expand Up @@ -153,9 +186,8 @@ def _operation_name(self, db_name, statement):
return self.vendor
return " ".join(parts)

# pylint: disable=unused-argument
def _before_cur_exec(
self, conn, cursor, statement, params, context, executemany
self, conn, cursor, statement, params, context, _executemany
):
attrs, found = _get_attributes_from_url(conn.engine.url)
if not found:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

_instrumenting_module_name = "opentelemetry.instrumentation.sqlalchemy"

_instruments = ("sqlalchemy",)

_supports_metrics = True
Loading

0 comments on commit 7ffbfc3

Please sign in to comment.