Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce weak references #165

Merged
merged 6 commits into from Dec 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
107 changes: 90 additions & 17 deletions sqlitedict.py
Expand Up @@ -34,6 +34,7 @@
import time
import traceback
from base64 import b64decode, b64encode
import weakref

from threading import Thread

Expand Down Expand Up @@ -67,6 +68,51 @@ def reraise(tp, value, tb=None):

logger = logging.getLogger(__name__)

#
# There's a thread that holds the actual SQL connection (SqliteMultithread).
# We communicate with this thread via queues (request and responses).
# The requests can either be SQL commands or one of the "special" commands
# below:
#
# _REQUEST_CLOSE: request that the SQL connection be closed
# _REQUEST_COMMIT: request that any changes be committed to the DB
#
# Responses are either SQL records (e.g. results of a SELECT) or the magic
# _RESPONSE_NO_MORE command, which indicates nothing else will ever be written
# to the response queue.
#
_REQUEST_CLOSE = '--close--'
_REQUEST_COMMIT = '--commit--'
_RESPONSE_NO_MORE = '--no more--'

#
# We work with weak references for better memory efficiency.
# Dereferencing, checking the referent queue still exists, and putting to it
# is boring and repetitive, so we have a _put function to handle it for us.
#
_PUT_OK, _PUT_REFERENT_DESTROYED, _PUT_NOOP = 0, 1, 2


def _put(queue_reference, item):
if queue_reference is not None:
queue = queue_reference()
if queue is None:
#
# We got a reference to a queue, but that queue no longer exists
#
retval = _PUT_REFERENT_DESTROYED
else:
queue.put(item)
retval = _PUT_OK

del queue
return retval

#
# We didn't get a reference to a queue, so do nothing (no-op).
#
return _PUT_NOOP


def open(*args, **kwargs):
"""See documentation of the SqliteDict class."""
Expand Down Expand Up @@ -429,16 +475,22 @@ def run(self):

self._sqlitedict_thread_initialized = True

res = None
res_ref = None
while True:
req, arg, res, outer_stack = self.reqs.get()
if req == '--close--':
assert res, ('--close-- without return queue', res)
#
# req: an SQL command or one of the --magic-- commands we use internally
# arg: arguments for the command
# res_ref: a weak reference to the queue into which responses must be placed
# outer_stack: the outer stack, for producing more informative traces in case of error
#
req, arg, res_ref, outer_stack = self.reqs.get()

if req == _REQUEST_CLOSE:
assert res_ref, ('--close-- without return queue', res_ref)
break
elif req == '--commit--':
elif req == _REQUEST_COMMIT:
conn.commit()
if res:
res.put('--no more--')
_put(res_ref, _RESPONSE_NO_MORE)
else:
try:
cursor.execute(req, arg)
Expand Down Expand Up @@ -477,17 +529,25 @@ def run(self):
'SqliteDict instance to show the outer stack.'
)

if res:
if res_ref:
for rec in cursor:
res.put(rec)
res.put('--no more--')
if _put(res_ref, rec) == _PUT_REFERENT_DESTROYED:
#
# The queue we are sending responses to got garbage
# collected. Nobody is listening anymore, so we
# stop sending responses.
#
break

_put(res_ref, _RESPONSE_NO_MORE)

if self.autocommit:
conn.commit()

self.log.debug('received: %s, send: --no more--', req)
conn.close()
res.put('--no more--')

_put(res_ref, _RESPONSE_NO_MORE)

def check_raise_error(self):
"""
Expand Down Expand Up @@ -520,6 +580,10 @@ def check_raise_error(self):
def execute(self, req, arg=None, res=None):
"""
`execute` calls are non-blocking: just queue up the request and return immediately.

:param req: The request (an SQL command)
:param arg: Arguments to the SQL command
:param res: A queue in which to place responses as they become available
"""
self._wait_for_initialization()
self.check_raise_error()
Expand All @@ -532,7 +596,16 @@ def execute(self, req, arg=None, res=None):
# so often.
stack = traceback.extract_stack()[:-1]

self.reqs.put((req, arg or tuple(), res, stack))
#
# We pass a weak reference to the response queue instead of a regular
# reference, because we want the queues to be garbage-collected
# more aggressively.
#
res_ref = None
if res:
res_ref = weakref.ref(res)

self.reqs.put((req, arg or tuple(), res_ref, stack))

def executemany(self, req, items):
for item in items:
Expand All @@ -552,7 +625,7 @@ def select(self, req, arg=None):
while True:
rec = res.get()
self.check_raise_error()
if rec == '--no more--':
if rec == _RESPONSE_NO_MORE:
break
yield rec

Expand All @@ -569,10 +642,10 @@ def commit(self, blocking=True):
# blocking=False. This ensures any available exceptions for any
# previous statement are thrown before returning, and that the
# data has actually persisted to disk!
self.select_one('--commit--')
self.select_one(_REQUEST_COMMIT)
else:
# otherwise, we fire and forget as usual.
self.execute('--commit--')
self.execute(_REQUEST_COMMIT)

def close(self, force=False):
if force:
Expand All @@ -581,12 +654,12 @@ def close(self, force=False):
# can't process the request. Instead, push the close command to the requests
# queue directly. If run() is still alive, it will exit gracefully. If not,
# then there's nothing we can do anyway.
self.reqs.put(('--close--', None, Queue(), None))
self.reqs.put((_REQUEST_CLOSE, None, weakref.ref(Queue()), None))
else:
# we abuse 'select' to "iter" over a "--close--" statement so that we
# can confirm the completion of close before joining the thread and
# returning (by semaphore '--no more--'
self.select_one('--close--')
self.select_one(_REQUEST_CLOSE)
self.join()

def _wait_for_initialization(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_autocommit.py
Expand Up @@ -6,7 +6,7 @@

def test():
"Verify autocommit just before program exits."
assert os.system('PYTHONPATH=. %s tests/autocommit.py' % sys.executable) == 0
assert os.system('env PYTHONPATH=. %s tests/autocommit.py' % sys.executable) == 0
# The above script relies on the autocommit feature working correctly.
# Now, let's check if it actually worked.
d = sqlitedict.SqliteDict('tests/db/autocommit.sqlite')
Expand Down
35 changes: 35 additions & 0 deletions tests/test_core.py
Expand Up @@ -3,6 +3,7 @@
import unittest
import tempfile
import os
from unittest.mock import patch

# local
import sqlitedict
Expand Down Expand Up @@ -66,6 +67,40 @@ def test_commit_nonblocking(self):
d['key'] = 'value'
d.commit(blocking=False)

def test_cancel_iterate(self):
import time

class EndlessKeysIterator:
def __init__(self) -> None:
self.value = 0

def __iter__(self):
return self

def __next__(self):
self.value += 1
return [self.value]

with patch('sqlitedict.sqlite3') as mock_sqlite3:
ki = EndlessKeysIterator()
cursor = mock_sqlite3.connect().cursor()
cursor.__iter__.return_value = ki

with SqliteDict(autocommit=True) as d:
for i, k in enumerate(d.keys()):
assert i + 1 == k
if k > 100:
break
assert ki.value > 101

# Release GIL, let background threads run.
# Don't use gc.collect because this is simulate user code.
time.sleep(0.01)

current = ki.value
time.sleep(1)
assert current == ki.value, 'Will not read more after iterate stop'


class NamedSqliteDictCreateOrReuseTest(TempSqliteDictTest):
"""Verify default flag='c', and flag='n' of SqliteDict()."""
Expand Down