Skip to content

Commit

Permalink
Merge pull request #84 from zodb/close-memcached-on-release
Browse files Browse the repository at this point in the history
Release memcache connections too. Fixes #80.
  • Loading branch information
jamadden committed Jun 28, 2016
2 parents 657a3ae + 00ee76f commit 65795df
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 39 deletions.
4 changes: 3 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
2.0.0b2 (unreleased)
====================

- Nothing changed yet.
- Memcache connections are explicitly released instead of waiting for
GC to do it for us. This is especially important with PyPy and/or
``python-memcached``. See :issue:`80`.


2.0.0b1 (2016-06-28)
Expand Down
94 changes: 57 additions & 37 deletions relstorage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ def new_instance(self):
else:
return StorageCache(self.adapter, self.options, self.prefix)

def release(self):
"""
Release resources held by this instance.
This is usually memcache connections if they're in use.
"""
for client in self.clients_local_first:
client.disconnect_all()

if not self.options.share_local_cache:
# If we have our own local cache not shared with anyone,
# go ahead and clear it to release any memory it's holding.
self.clients_local_first[0].flush_all()

def clear(self):
"""Remove all data from the cache. Called by speed tests."""
for client in self.clients_local_first:
Expand All @@ -124,7 +138,7 @@ def clear(self):
self.commit_count = object()

def _check_tid_after_load(self, oid_int, actual_tid_int,
expect_tid_int=None):
expect_tid_int=None):
"""Verify the tid of an object loaded from the database is sane."""

if actual_tid_int > self.current_tid:
Expand All @@ -133,16 +147,17 @@ def _check_tid_after_load(self, oid_int, actual_tid_int,
# would be a consistency violation. However, the cause is hard
# to track down, so issue a ReadConflictError and hope that
# the application retries successfully.
raise ReadConflictError("Got data for OID 0x%(oid_int)x from "
"future transaction %(actual_tid_int)d (%(got_ts)s). "
"Current transaction is %(current_tid)d (%(current_ts)s)."
% {
'oid_int': oid_int,
'actual_tid_int': actual_tid_int,
'current_tid': self.current_tid,
'got_ts': str(TimeStamp(p64(actual_tid_int))),
'current_ts': str(TimeStamp(p64(self.current_tid))),
})
msg = ("Got data for OID 0x%(oid_int)x from "
"future transaction %(actual_tid_int)d (%(got_ts)s). "
"Current transaction is %(current_tid)d (%(current_ts)s)."
% {
'oid_int': oid_int,
'actual_tid_int': actual_tid_int,
'current_tid': self.current_tid,
'got_ts': str(TimeStamp(p64(actual_tid_int))),
'current_ts': str(TimeStamp(p64(self.current_tid))),
})
raise ReadConflictError(msg)

if expect_tid_int is not None and actual_tid_int != expect_tid_int:
# Uh-oh, the cache is inconsistent with the database.
Expand All @@ -165,30 +180,31 @@ def _check_tid_after_load(self, oid_int, actual_tid_int,
# wrong tid in delta_after*.
cp0, cp1 = self.checkpoints
import os
import threading
raise AssertionError("Detected an inconsistency "
"between the RelStorage cache and the database "
"while loading an object using the delta_after0 dict. "
"Please verify the database is configured for "
"ACID compliance and that all clients are using "
"the same commit lock. "
"(oid_int=%(oid_int)r, expect_tid_int=%(expect_tid_int)r, "
"actual_tid_int=%(actual_tid_int)r, "
"current_tid=%(current_tid)r, cp0=%(cp0)r, cp1=%(cp1)r, "
"len(delta_after0)=%(lda0)r, len(delta_after1)=%(lda1)r, "
"pid=%(pid)r, thread_ident=%(thread_ident)r)"
% {
'oid_int': oid_int,
'expect_tid_int': expect_tid_int,
'actual_tid_int': actual_tid_int,
'current_tid': self.current_tid,
'cp0': cp0,
'cp1': cp1,
'lda0': len(self.delta_after0),
'lda1': len(self.delta_after1),
'pid': os.getpid(),
'thread_ident': threading.current_thread(),
})

msg = ("Detected an inconsistency "
"between the RelStorage cache and the database "
"while loading an object using the delta_after0 dict. "
"Please verify the database is configured for "
"ACID compliance and that all clients are using "
"the same commit lock. "
"(oid_int=%(oid_int)r, expect_tid_int=%(expect_tid_int)r, "
"actual_tid_int=%(actual_tid_int)r, "
"current_tid=%(current_tid)r, cp0=%(cp0)r, cp1=%(cp1)r, "
"len(delta_after0)=%(lda0)r, len(delta_after1)=%(lda1)r, "
"pid=%(pid)r, thread_ident=%(thread_ident)r)"
% {
'oid_int': oid_int,
'expect_tid_int': expect_tid_int,
'actual_tid_int': actual_tid_int,
'current_tid': self.current_tid,
'cp0': cp0,
'cp1': cp1,
'lda0': len(self.delta_after0),
'lda1': len(self.delta_after1),
'pid': os.getpid(),
'thread_ident': threading.current_thread(),
})
raise AssertionError(msg)

def load(self, cursor, oid_int):
"""Load the given object from cache if possible.
Expand Down Expand Up @@ -495,7 +511,7 @@ def after_poll(self, cursor, prev_tid_int, new_tid_int, changes):
and prev_tid_int
and prev_tid_int <= self.current_tid
and new_tid_int >= self.current_tid
):
):
# All the conditions for keeping the checkpoints were met,
# so just update self.delta_after0 and self.current_tid.
m = self.delta_after0
Expand Down Expand Up @@ -560,7 +576,7 @@ def _suggest_shifted_checkpoints(self, tid_int, oversize):
checkpoint0 shifts to checkpoint1 and the tid just committed
becomes checkpoint0.
"""
cp0, cp1 = self.checkpoints
cp0, _cp1 = self.checkpoints
assert tid_int > cp0
expect = '%d %d' % self.checkpoints
if oversize:
Expand Down Expand Up @@ -776,3 +792,7 @@ def incr(self, key):
res = int(value) + 1
self._set_one(key, res)
return res

def disconnect_all(self):
# Compatibility with memcache.
pass
4 changes: 4 additions & 0 deletions relstorage/pylibmc_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ def incr(self, key):
@_catching
def flush_all(self):
return self._client.flush_all()

@_catching
def disconnect_all(self):
return self._client.disconnect_all()
7 changes: 6 additions & 1 deletion relstorage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,16 @@ def zap_all(self, **kwargs):
self._cache.clear()

def release(self):
"""Release database sessions used by this storage instance.
"""
Release external resources used by this storage instance.
This includes the database sessions (connections) and any memcache
connections.
"""
with self._lock:
self._drop_load_connection()
self._drop_store_connection()
self._cache.release()

def close(self):
"""Close the storage and all instances."""
Expand Down
3 changes: 3 additions & 0 deletions relstorage/tests/fakecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ def incr(self, key):
def flush_all(self):
data.clear()

def disconnect_all(self):
# no-op
pass

0 comments on commit 65795df

Please sign in to comment.