Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a weak reference to sqlalchemy Engine to avoid memory leak #1771

Merged
merged 23 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
49d7c53
Use a weak reference to sqlalchemy Engine to avoid memory leak
rbagd Apr 20, 2023
edf8d11
Made a mistake in resolving the weak reference
rbagd Apr 20, 2023
ed83653
Fixed formatting issues
rbagd Apr 22, 2023
6b02c5d
Updated changelog
rbagd Apr 22, 2023
0421847
Added unit test to check that engine was garbage collected
rbagd Apr 22, 2023
be48013
Do not save engine in EngineTracer to avoid memory leak
rbagd Apr 22, 2023
bd32d9d
Add an empty line to satisfy black formatter
rbagd Apr 22, 2023
75d31d7
Fix isort complaints
rbagd Apr 22, 2023
65afc9d
Merge branch 'main' into fix/memory-leak-event-listener
shalevr Apr 25, 2023
441afc7
Fixed the issue when pool name is not set and =None
rbagd Apr 25, 2023
b50a258
Fix formatting issue
rbagd Apr 25, 2023
005037c
Merge branch 'main' into fix/memory-leak-event-listener
rbagd Apr 29, 2023
6022804
Merge branch 'open-telemetry:main' into fix/memory-leak-event-listener
rbagd May 4, 2023
324e67b
Merge branch 'main' into fix/memory-leak-event-listener
rbagd May 7, 2023
afcf121
Rebased after changes in a recent commit
rbagd May 7, 2023
776d5e5
Merge branch 'main' into fix/memory-leak-event-listener
shalevr May 16, 2023
e650ff6
Merge branch 'main' into fix/memory-leak-event-listener
rbagd May 19, 2023
ede2d71
Merge branch 'main' into fix/memory-leak-event-listener
shalevr May 30, 2023
514491b
Updated PR number in changelog
rbagd Jun 4, 2023
8282a11
Merge branch 'main' into fix/memory-leak-event-listener
shalevr Jun 13, 2023
a947471
Merge branch 'main' into fix/memory-leak-event-listener
shalevr Jun 13, 2023
6519c3a
Merge branch 'main' into fix/memory-leak-event-listener
shalevr Jun 21, 2023
0156424
Merge branch 'main' into fix/memory-leak-event-listener
shalevr Jun 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-system-metrics` Add `process.` prefix to `runtime.memory`, `runtime.cpu.time`, and `runtime.gc_count`. Change `runtime.memory` from count to UpDownCounter. ([#1735](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1735))
- Add request and response hooks for GRPC instrumentation (client only)
([#1706](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1706))
- Fix memory leak in SQLAlchemy instrumentation where disposed `Engine` does not get garbage collected
([#1771](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1771)
- `opentelemetry-instrumentation-pymemcache` Update instrumentation to support pymemcache >4
([#1764](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1764))
- `opentelemetry-instrumentation-confluent-kafka` Add support for higher versions of confluent_kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import os
import re
import weakref

from sqlalchemy.event import ( # pylint: disable=no-name-in-module
listen,
Expand Down Expand Up @@ -99,11 +100,11 @@ def __init__(
commenter_options=None,
):
self.tracer = tracer
self.engine = engine
self.connections_usage = connections_usage
self.vendor = _normalize_vendor(engine.name)
self.enable_commenter = enable_commenter
self.commenter_options = commenter_options if commenter_options else {}
self._engine_attrs = _get_attributes_from_engine(engine)
self._leading_comment_remover = re.compile(r"^/\*.*?\*/")

self._register_event_listener(
Expand All @@ -118,23 +119,11 @@ def __init__(
self._register_event_listener(engine, "checkin", self._pool_checkin)
self._register_event_listener(engine, "checkout", self._pool_checkout)

def _get_connection_string(self):
drivername = self.engine.url.drivername or ""
host = self.engine.url.host or ""
port = self.engine.url.port or ""
database = self.engine.url.database or ""
return f"{drivername}://{host}:{port}/{database}"

def _get_pool_name(self):
if self.engine.pool.logging_name is not None:
return self.engine.pool.logging_name
return self._get_connection_string()

def _add_idle_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
**self._engine_attrs,
shalevr marked this conversation as resolved.
Show resolved Hide resolved
"state": "idle",
},
)
Expand All @@ -143,7 +132,7 @@ def _add_used_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
**self._engine_attrs,
"state": "used",
},
)
Expand All @@ -169,12 +158,21 @@ def _pool_checkout(
@classmethod
def _register_event_listener(cls, target, identifier, func, *args, **kw):
listen(target, identifier, func, *args, **kw)
cls._remove_event_listener_params.append((target, identifier, func))
cls._remove_event_listener_params.append(
(weakref.ref(target), identifier, func)
)

@classmethod
def remove_all_event_listeners(cls):
for remove_params in cls._remove_event_listener_params:
remove(*remove_params)
for (
weak_ref_target,
identifier,
func,
) in cls._remove_event_listener_params:
# Remove an event listener only if saved weak reference points to an object
# which has not been garbage collected
if weak_ref_target() is not None:
remove(weak_ref_target(), identifier, func)
cls._remove_event_listener_params.clear()

def _operation_name(self, db_name, statement):
Expand Down Expand Up @@ -300,3 +298,22 @@ def _get_attributes_from_cursor(vendor, cursor, attrs):
if info.port:
attrs[SpanAttributes.NET_PEER_PORT] = int(info.port)
return attrs


def _get_connection_string(engine):
drivername = engine.url.drivername or ""
host = engine.url.host or ""
port = engine.url.port or ""
database = engine.url.database or ""
return f"{drivername}://{host}:{port}/{database}"


def _get_attributes_from_engine(engine):
"""Set metadata attributes of the database engine"""
attrs = {}

attrs["pool.name"] = getattr(
shalevr marked this conversation as resolved.
Show resolved Hide resolved
getattr(engine, "pool", None), "logging_name", None
) or _get_connection_string(engine)

return attrs
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,26 @@ def test_no_op_tracer_provider(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)

def test_no_memory_leakage_if_engine_diposed(self):
SQLAlchemyInstrumentor().instrument()
import gc
import weakref

from sqlalchemy import create_engine

callback = mock.Mock()

def make_shortlived_engine():
engine = create_engine("sqlite:///:memory:")
# Callback will be called if engine is deallocated during garbage
# collection
weakref.finalize(engine, callback)
with engine.connect() as conn:
conn.execute("SELECT 1 + 1;").fetchall()

for _ in range(0, 5):
make_shortlived_engine()

gc.collect()
assert callback.call_count == 5