-
Notifications
You must be signed in to change notification settings - Fork 112
/
database.py
395 lines (303 loc) · 13.2 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
"""
database.py
Copyright (C) 2008 Nikolaus Rath <Nikolaus@rath.org>
This program can be distributed under the terms of the GNU LGPL.
"""
from __future__ import unicode_literals
import logging
from contextlib import contextmanager
import apsw
import time
import thread
from random import randrange
# Names exported by ``from <this module> import *``.  ResultSet is defined
# in this module as well but is not part of this list.
__all__ = [ "ConnectionManager", 'WrappedConnection', 'NoUniqueValueError' ]
# Module-level logger, registered under the name "database".
log = logging.getLogger("database")
class ConnectionManager(object):
    """Hand out pooled database connections, one per thread at a time.

    Every thread that asks for a connection gets its own one, so that
    SQLite can take care of the locking and so that the last inserted
    rowid and the number of affected rows can be attributed
    unambiguously to the statement that was just run.

    threading.local() does not work for threads that were started by a
    C library (like fuse) rather than by threading.Thread(), and for
    the pure python implementation in _threading_local it is not clear
    if and when local objects are destroyed.  Therefore the connections
    are kept in a pool that is shared between all threads.

    The pool does not hold the connections themselves but one cursor
    per connection, so that no new cursor has to be allocated each time
    a connection is handed out.  The connection can always be recovered
    from the cursor with getconnection().

    Attributes:
    -----------

    :retrytime: Maximum time (in milliseconds) to wait for a lock held
                by another thread to be released.
    :pool:      List of currently unused cursors (one per connection).
    :provided:  Dict mapping thread identifiers to the WrappedConnection
                instance currently handed out to that thread.
    :dbfile:    Filename of the database.
    :initsql:   SQL that is executed on every newly created connection.
    """

    def __init__(self, dbfile, initsql=None, retrytime=10000):
        '''Set up an empty pool for the database stored in `dbfile`.

        If `initsql` is given, it is executed as an SQL command on
        every connection that gets created (useful e.g. for setting
        pragmas on all connections).
        '''

        self.pool = []
        self.provided = {}
        self.retrytime = retrytime
        self.initsql = initsql
        self.dbfile = dbfile

        # http://code.google.com/p/apsw/issues/detail?id=59
        apsw.enablesharedcache(False)

    @contextmanager
    def __call__(self):
        '''Context manager providing a WrappedConnection instance.

        A connection is taken from the pool for the duration of the
        managed block.  If the same thread calls this again inside the
        managed block, it is given the very same WrappedConnection
        instance.
        '''

        try:
            active = self.provided[thread.get_ident()]
        except KeyError:
            active = None

        if active is not None:
            # Nested use in the same thread: the outermost block owns
            # the connection and is responsible for giving it back.
            yield active
            return

        cursor = self._pop_conn()
        try:
            wrapped = WrappedConnection(cursor, self.retrytime)
            self.provided[thread.get_ident()] = wrapped
            try:
                yield wrapped
            finally:
                del self.provided[thread.get_ident()]
        finally:
            self._push_conn(cursor)

    @contextmanager
    def transaction(self):
        '''Context manager providing a WrappedConnection in a transaction.

        A connection is acquired and a savepoint is set right away.  If
        the managed block finishes without an exception, the savepoint
        is committed at the end, otherwise it is rolled back.

        If the same thread calls this again inside the managed block,
        it is given the same WrappedConnection instance, but a new,
        inner transaction is started nevertheless.
        '''

        with self() as wrapped:
            with wrapped.transaction():
                yield wrapped

    def _pop_conn(self):
        '''Take a cursor out of the pool, creating a connection if needed
        '''

        try:
            return self.pool.pop()
        except IndexError:
            # Pool is exhausted, fall through and open a new connection
            pass

        log.debug("Creating new db connection (active conns: %d)...",
                  len(self.provided))
        connection = apsw.Connection(self.dbfile)
        connection.setbusytimeout(self.retrytime)

        # The pool stores a cursor rather than the connection itself
        cursor = connection.cursor()
        if self.initsql:
            cursor.execute(self.initsql)

        return cursor

    def _push_conn(self, conn):
        '''Put a cursor (and thereby its connection) back into the pool
        '''

        self.pool.append(conn)

    def get_val(self, *args, **kwargs):
        """Acquire a WrappedConnection and call its get_val method.
        """

        with self() as wrapped:
            return wrapped.get_val(*args, **kwargs)

    def get_row(self, *args, **kwargs):
        """Acquire a WrappedConnection and call its get_row method.
        """

        with self() as wrapped:
            return wrapped.get_row(*args, **kwargs)

    def execute(self, *args, **kwargs):
        """Acquire a WrappedConnection and call its execute method.
        """

        with self() as wrapped:
            return wrapped.execute(*args, **kwargs)
class WrappedConnection(object):
    '''This class wraps an APSW connection object. It should be
    used instead of any native APSW cursors.

    It provides methods to directly execute SQL commands and
    creates apsw cursors dynamically.

    WrappedConnections are not thread safe. They can be passed between
    threads, but must not be called concurrently.

    WrappedConnection also takes care of converting bytes objects into
    buffer objects and back, so that they are stored as BLOBS
    in the database. If you want to store TEXT, you need to
    supply unicode objects instead. (This functionality is
    only needed under Python 2.x, under Python 3.x the apsw
    module already behaves in the correct way).

    Attributes
    ----------

    :conn:     apsw connection object (obtained from the cursor that
               was passed to the constructor)
    :cur:      default cursor, to be used for all queries
               that do not return a ResultSet (i.e., that finalize
               the cursor when they return)
    :retrytime: Maximum time (in milliseconds) to wait for other
               threads to release a database lock.
    :savepoint_cnt: Keeps track of the current number of encapsulated
               savepoints. We use a running number instead of e.g.
               the address of a local object so that the apsw statement
               cache does not overflow.
    '''

    def __init__(self, conn, retrytime):
        '''Wrap the connection that the cursor `conn` belongs to.

        `conn` is an apsw cursor; it becomes the default cursor and the
        connection is retrieved from it with getconnection().
        '''

        self.conn = conn.getconnection()
        self.cur = conn
        self.retrytime = retrytime
        self.savepoint_cnt = 0

    @contextmanager
    def transaction(self):
        '''Initiate a transaction

        This context manager creates a savepoint. If the managed block
        evaluates without exceptions, the savepoint is released at the
        end. Otherwise it is rolled back (and then released) and the
        exception is re-raised.

        If there is no enclosing transaction, a BEGIN IMMEDIATE
        transaction is started before the savepoint is set and a COMMIT
        is issued once the outermost savepoint has been released.

        If the transaction or the savepoint cannot be set up, the
        savepoint counter is restored (and a transaction that has just
        been started is rolled back) before the error is propagated, so
        that the connection can still be used afterwards.
        '''

        self.savepoint_cnt += 1
        name = 's3ql-%d' % self.savepoint_cnt

        # NOTE: If you ever add a version of this function that starts a
        # DEFERRED transaction instead, you have to make sure that the two
        # different kinds of transactions cannot be nested. Once a DEFERRED
        # (== read only) transaction is started, the thread holds a SHARED
        # lock and must not try to obtain a RESERVED lock or deadlocks
        # will occur. In other words, once the caller has asked for a
        # DEFERRED transaction, any further attempts to set SAVEPOINTS
        # have to produce errors.
        began = False
        try:
            if self.savepoint_cnt == 1:
                self._execute(self.cur, 'BEGIN IMMEDIATE')
                began = True
            self._execute(self.cur, "SAVEPOINT '%s'" % name)
        except:
            # The managed block is never entered, so the cleanup below
            # does not run: undo the bookkeeping here.
            self.savepoint_cnt -= 1
            if began:
                self._execute(self.cur, 'ROLLBACK')
            raise

        # pylint bug
        #pylint: disable-msg=C0321
        try:
            yield
        except:
            # Bare except is intentional: whatever was raised in the
            # managed block, undo its changes and pass the error on.
            self._execute(self.cur, "ROLLBACK TO '%s'" % name)
            raise
        finally:
            self._execute(self.cur, "RELEASE '%s'" % name)
            self.savepoint_cnt -= 1
            if self.savepoint_cnt == 0:
                self._execute(self.cur, 'COMMIT')

    def query(self, *a, **kw):
        '''Execute the given SQL statement. Return ResultSet.

        The statement is run on a newly created cursor, not on the
        default cursor. Transforms buffer() to bytes() and vice versa.
        '''

        return ResultSet(self._execute(self.conn.cursor(), *a, **kw))

    def execute(self, *a, **kw):
        '''Execute the given SQL statement. Return number of affected rows.
        '''

        self._execute(self.cur, *a, **kw)
        return self.changes()

    def rowid(self, *a, **kw):
        """Execute SQL statement and return last inserted rowid.
        """

        self._execute(self.cur, *a, **kw)
        return self.conn.last_insert_rowid()

    @staticmethod
    def _to_blob(val):
        '''Return `val` wrapped in a buffer if it is a bytes object
        '''

        if isinstance(val, bytes):
            return buffer(val)
        return val

    def _execute(self, cur, statement, bindings=None):
        '''Execute the given SQL statement with the given cursor

        `bindings` may be None, a dict, a list or a tuple. Any bytes
        object among the bound values is converted to a buffer first.
        Returns whatever `cur.execute` returns.

        Note that in shared cache mode we may get an SQLITE_LOCKED
        error, which is not handled by the busy handler. Therefore
        we have to emulate this behavior: on apsw.LockedError the
        statement is retried after a randomly growing delay until
        more than `retrytime` milliseconds have been waited, then the
        error is re-raised. apsw.BusyError is always re-raised; if it
        arrives sooner than `retrytime` milliseconds after the attempt
        was started, a deadlock warning is logged first.
        '''

        # Convert bytes to buffer
        if isinstance(bindings, dict):
            newbindings = dict((key, self._to_blob(val))
                               for (key, val) in bindings.items())
        elif isinstance(bindings, (list, tuple)):
            newbindings = [ self._to_blob(val) for val in bindings ]
        else:
            newbindings = bindings

        waited = 0  # total time slept so far, in milliseconds
        step = 1    # next delay, in milliseconds
        #log.debug(statement)
        while True:
            curtime = time.time()
            try:
                if bindings is not None:
                    return cur.execute(statement, newbindings)
                else:
                    return cur.execute(statement)
            except apsw.LockedError:
                if waited > self.retrytime:
                    raise # We don't wait any longer
                # Float division: with integer operands Python 2 would
                # truncate the delay to zero and turn this into a busy loop.
                time.sleep(step / 1000.0)
                waited += step
                step = randrange(step + 1, 2 * (step + 1), 1)
            except apsw.BusyError:
                # The busy handler gives up early only if SQLite has
                # detected that waiting cannot succeed.
                if time.time() - curtime < self.retrytime / 1000.0:
                    log.warning('SQLite detected deadlock condition!')
                raise

    def get_val(self, *a, **kw):
        """Executes a select statement and returns first element of first row.

        If there is no result row, raises StopIteration. If there is more
        than one row, raises NoUniqueValueError.
        """

        return self.get_row(*a, **kw)[0]

    def get_list(self, *a, **kw):
        """Executes a select statement and returns result list.
        """

        return list(self.query(*a, **kw))

    def get_row(self, *a, **kw):
        """Executes a select statement and returns first row.

        If there are no result rows, raises StopIteration. If there is more
        than one result row, raises NoUniqueValueError.
        """

        res = ResultSet(self._execute(self.cur, *a, **kw))
        row = res.next()
        try:
            res.next()
        except StopIteration:
            # Fine, we only wanted one row
            pass
        else:
            raise NoUniqueValueError()

        return row

    def last_rowid(self):
        """Return rowid most recently inserted in the current thread.
        """

        return self.conn.last_insert_rowid()

    def changes(self):
        """Return number of rows affected by most recent sql statement in current thread.
        """

        return self.conn.changes()
class NoUniqueValueError(Exception):
    '''Signals that a query which was expected to yield a single
    row (get_val or get_row) produced more than one result row.
    '''

    def __str__(self):
        # Fixed text, independent of any constructor arguments
        message = 'Query generated more than 1 result row'
        return message
class ResultSet(object):
'''Iterator over the result of an SQL query
This class automatically converts back from buffer() to bytes(). When
all results have been retrieved, the connection is returned back to
the pool.
'''
def __init__(self, cur):
self.cur = cur
def __iter__(self):
return self
def next(self):
return [ ( col if not isinstance(col, buffer) else bytes(col) )
for col in self.cur.next() ]