/
Connection.py
1458 lines (1200 loc) · 54.3 KB
/
Connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Database connection support
$Id$"""
import logging
import sys
import tempfile
import threading
import warnings
import os
import time
from persistent import PickleCache
# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer
from zope.interface import implementer
import transaction
import ZODB
from ZODB.blob import SAVEPOINT_SUFFIX
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.POSException import Unsupported, ReadOnlyHistoryError
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils
import six
# Module-wide counter bumped by resetCaches(); a Connection stores the value
# that was current when it was created in its ``_reset_counter`` attribute.
global_reset_counter = 0


def resetCaches():
    """Request that all connection caches be reset as connections reopen.

    Zope's refresh feature relies on this.  After Python modules have been
    reloaded, existing instances keep using the old class definitions.
    To pick up the new code right away, the refresh feature asks ZODB to
    clear its caches by calling resetCaches(); objects loaded by
    subsequent connections then use the new class definitions.
    """
    global global_reset_counter
    global_reset_counter = global_reset_counter + 1
def className(obj):
    """Return the dotted ``module.ClassName`` string for the type of obj."""
    klass = type(obj)
    module_name, class_name = klass.__module__, klass.__name__
    return "%s.%s" % (module_name, class_name)
@implementer(IConnection,
ISavepointDataManager,
IPersistentDataManager,
ISynchronizer)
class Connection(ExportImport, object):
"""Connection to ZODB for loading and storing objects."""
_code_timestamp = 0
##########################################################################
# Connection methods, ZODB.IConnection
def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
    """Create a new Connection.

    db -- the database this connection belongs to; its storage,
        new_oid, large_record_size, database_name and classFactory
        are read here.
    cache_size, cache_size_bytes -- handed to the PickleCache.
    before -- when not None, this is a historical connection.
    """
    self._log = logging.getLogger('ZODB.Connection')
    self._debug_info = ()

    self._db = db
    self.large_record_size = db.large_record_size

    # historical connection
    self.before = before

    # Multi-database support: database name -> Connection
    self.connections = {self._db.database_name: self}

    # An MVCC storage gives every connection its own storage instance.
    storage = db.storage
    uses_mvcc = bool(IMVCCStorage.providedBy(storage))
    if uses_mvcc:
        storage = storage.new_instance()
    self._mvcc_storage = uses_mvcc
    self._normal_storage = self._storage = storage
    self.new_oid = db.new_oid
    self._savepoint_storage = None

    # True while we still have to join a transaction manager.
    self._needs_to_join = True
    self.transaction_manager = None
    self.opened = None  # time.time() when DB.open() opened us

    self._reset_counter = global_reset_counter
    self._load_count = 0   # Number of objects unghosted
    self._store_count = 0  # Number of objects stored

    # Cache which can ghostify (forget the state of) objects not
    # recently used.  Its API is roughly that of a dict, with
    # additional gc-related and invalidation-related methods.
    self._cache = PickleCache(self, cache_size, cache_size_bytes)

    # The pre-cache is used by get() to avoid infinite loops when
    # objects immediately load their state when they get their
    # persistent data set.
    self._pre_cache = {}

    # List of all objects (not oids) registered as modified by the
    # persistence machinery, or by add(), or whose access caused a
    # ReadConflictError (just to be able to clean them up from the
    # cache on abort with the other modified objects).  All objects
    # of this list are either in _cache or in _added.
    self._registered_objects = []

    # ids and serials of objects for which readCurrent was called
    # in a transaction.
    self._readCurrent = {}

    # Dict of oid->obj added explicitly through add().  Used as a
    # preliminary cache until commit time when objects are all moved
    # to the real _cache.  The objects are moved to _creating at
    # commit time.
    self._added = {}

    # During commit this is turned into a list, which receives
    # objects added as a side-effect of storing a modified object.
    self._added_during_commit = None

    # During commit, all objects go to either _modified or _creating:

    # Dict of oid->flag of new objects (without serial), either
    # added by add() or implicitly added (discovered by the
    # serializer during commit).  The flag is True for implicit
    # adding.  Used during abort to remove created objects from the
    # _cache, and by persistent_id to check that a new object isn't
    # reachable from multiple databases.
    self._creating = {}

    # List of oids of modified objects, which have to be invalidated
    # in the cache on abort and in other connections on finish.
    self._modified = []

    # _invalidated queues invalidate messages delivered from the DB.
    # _inv_lock prevents one thread from modifying the set while
    # another is processing invalidations.  All the invalidations
    # from a single transaction should be applied atomically, so
    # the lock must be held when reading _invalidated.

    # It is unfortunate that the lock has to be held just to read
    # _invalidated.  Normally, _invalidated is written by calling
    # update(), which executes atomically by virtue of the GIL.  But
    # some storage might generate oids where hash or compare invokes
    # Python code.  In that case, the GIL can't save us.
    # Note:  since that was written, it was officially declared that the
    # type of an oid is str.  TODO:  remove the related now-unnecessary
    # critical sections (if any -- this needs careful thought).
    self._inv_lock = threading.Lock()
    self._invalidated = set()

    # Flag indicating whether the cache has been invalidated:
    self._invalidatedCache = False

    # We intend to prevent committing a transaction in which
    # ReadConflictError occurs.  _conflicts is the set of oids that
    # experienced ReadConflictError.  Any time we raise ReadConflictError,
    # the oid should be added to this set, and we should be sure that the
    # object is registered.  Because it's registered, Connection.commit()
    # will raise ReadConflictError again (because the oid is in
    # _conflicts).
    self._conflicts = {}

    # _txn_time stores the upper bound on transactions visible to
    # this connection.  That is, all object revisions must be
    # written before _txn_time.  If it is None, then the current
    # revisions are acceptable.
    self._txn_time = None

    # To support importFile(), implemented in the ExportImport base
    # class, we need to run _importDuringCommit() from our commit()
    # method.  If _import is not None, it is a two-tuple of arguments
    # to pass to _importDuringCommit().
    self._import = None

    self._reader = ObjectReader(self, self._cache, self._db.classFactory)
def add(self, obj):
    """Add a new object 'obj' to the database and assign it an oid.

    Adding an object that already belongs to this connection is a
    no-op.

    Raises ConnectionStateError if the connection is closed, TypeError
    if obj has no ``_p_oid`` attribute, and InvalidObjectReference if
    obj already belongs to a different connection.
    """
    if self.opened is None:
        raise ConnectionStateError("The database connection is closed")

    missing = object()
    if getattr(obj, "_p_oid", missing) is missing:
        raise TypeError("Only first-class persistent objects may be"
                        " added to a Connection.", obj)

    jar = obj._p_jar
    if jar is self:
        # Already ours; nothing to do.
        return
    if jar is not None:
        raise InvalidObjectReference(obj, obj._p_jar)

    assert obj._p_oid is None
    oid = obj._p_oid = self.new_oid()
    obj._p_jar = self
    if self._added_during_commit is not None:
        self._added_during_commit.append(obj)
    self._register(obj)
    # Record in _added only after _register(), so that _added can be
    # used as a test for whether the object has been registered with
    # the transaction.
    self._added[oid] = obj
def get(self, oid):
    """Return the persistent object with oid 'oid'.

    The object cache, the objects added in the current transaction and
    the pre-cache are consulted in that order.  On a miss the record is
    loaded from the storage, a ghost is built from it and registered in
    the object cache.

    Raises ConnectionStateError if the connection is closed.
    """
    if self.opened is None:
        raise ConnectionStateError("The database connection is closed")

    obj = self._cache.get(oid, None)
    if obj is not None:
        return obj
    obj = self._added.get(oid, None)
    if obj is not None:
        return obj
    obj = self._pre_cache.get(oid, None)
    if obj is not None:
        return obj

    p, serial = self._storage.load(oid, '')
    obj = self._reader.getGhost(p)

    # Avoid an infinite loop if obj tries to load its state before
    # it is added to the cache and its state refers to it.
    # (This will typically be the case for non-ghostifiable objects,
    # like persistent caches.)
    self._pre_cache[oid] = obj
    try:
        self._cache.new_ghost(oid, obj)
    finally:
        # Always drop the temporary entry, even when new_ghost() fails;
        # otherwise later get() calls would keep returning an object
        # that never made it into the real cache.
        self._pre_cache.pop(oid, None)
    return obj
def cacheMinimize(self):
    """Deactivate all unmodified objects in the cache.

    Applies to the cache of every connection in ``self.connections``.
    """
    for conn in self.connections.values():
        conn._cache.minimize()
# TODO: we should test what happens when cacheGC is called mid-transaction.
def cacheGC(self):
    """Reduce cache size to target size.

    Runs an incremental GC on the cache of every connection in
    ``self.connections``.
    """
    for conn in self.connections.values():
        conn._cache.incrgc()
__onCloseCallbacks = None
def onCloseCallback(self, f):
    """Register a callable, f, to be called by close()."""
    callbacks = self.__onCloseCallbacks
    if callbacks is None:
        # The list is created lazily on first registration.
        callbacks = self.__onCloseCallbacks = []
    callbacks.append(f)
def close(self, primary=True):
    """Close the Connection.

    primary -- True (the default) for the connection being closed
        directly: the other connections in ``self.connections`` are
        then closed with ``primary=False`` and, if still open, this
        connection is returned to the pool.  With ``primary=False``
        the connection is only marked as closed.

    Raises ConnectionStateError if the connection is still joined to a
    transaction.
    """
    if not self._needs_to_join:
        # We're currently joined to a transaction.
        raise ConnectionStateError("Cannot close a connection joined to "
                                   "a transaction")

    cache = self._cache
    if cache is not None:
        cache.incrgc()  # This is a good time to do some GC

    # Run the registered close callbacks; a failing callback is logged
    # and does not prevent the remaining ones from running.
    callbacks = self.__onCloseCallbacks
    if callbacks is not None:
        for callback in callbacks:
            try:
                callback()
            except:  # NOTE(review): deliberately broad; kept as found
                owner = getattr(callback, 'im_self', callback)
                self._log.exception("Close callback failed for %s", owner)
        self.__onCloseCallbacks = None

    self._debug_info = ()

    manager = self.transaction_manager
    if self.opened and manager is not None:
        # transaction_manager could be None if one of the close callbacks
        # closed the DB already, e.g., ZODB.connection() does this.
        manager.unregisterSynch(self)

    if self._mvcc_storage:
        self._storage.sync(force=False)

    if primary:
        for other in self.connections.values():
            if other is not self:
                other.close(False)

        # Return the connection to the pool.
        if self.opened is not None:
            self._db._returnToPool(self)
            # _returnToPool() set self.opened to None.
            # However, we can't assert that here, because self may
            # have been reused (by another thread) by the time we
            # get back here.
    else:
        self.opened = None

    monitor = self._db._activity_monitor
    if monitor is not None:
        monitor.closedConnection(self)

    # Drop transaction manager to release resources and help prevent errors
    self.transaction_manager = None
def db(self):
    """Return the database object this connection was created for."""
    database = self._db
    return database
def isReadOnly(self):
    """Returns True if this connection is read only.

    A historical connection (``before`` is not None) is always read
    only; otherwise the storage decides.

    Raises ConnectionStateError if the connection is closed.
    """
    if self.opened is None:
        raise ConnectionStateError("The database connection is closed")
    if self.before is not None:
        return True
    return self._storage.isReadOnly()
def invalidate(self, tid, oids):
    """Notify the Connection that transaction 'tid' invalidated oids.

    The oids are queued in ``_invalidated`` under ``_inv_lock``.  The
    first tid seen is remembered in ``_txn_time``; a later, smaller tid
    raises AssertionError.  Historical connections ignore the call.
    """
    if self.before is not None:
        # This is a historical connection.  Invalidations are irrelevant.
        return
    with self._inv_lock:
        known = self._txn_time
        if known is None:
            self._txn_time = tid
        elif tid is not None and tid < known:
            raise AssertionError("invalidations out of order, %r < %r"
                                 % (tid, known))
        self._invalidated.update(oids)
def invalidateCache(self):
    """Flag the whole cache as invalidated (done under ``_inv_lock``)."""
    with self._inv_lock:
        self._invalidatedCache = True
@property
def root(self):
    """Return the database root object.

    The object stored under oid ``z64`` is fetched with get() and
    wrapped in a RootConvenience.
    """
    root_object = self.get(z64)
    return RootConvenience(root_object)
def get_connection(self, database_name):
    """Return a Connection for the named database.

    A connection already known in ``self.connections`` is reused.
    Otherwise one is opened on the named database with this
    connection's transaction manager and ``before`` value, and the two
    connection groups are merged so that both share one mapping.
    """
    known = self.connections.get(database_name)
    if known is not None:
        return known

    database = self._db.databases[database_name]
    opened = database.open(
        transaction_manager=self.transaction_manager,
        before=self.before,
    )
    self.connections.update(opened.connections)
    opened.connections = self.connections
    return opened
def _implicitlyAdding(self, oid):
"""Are we implicitly adding an object within the current transaction
This is used in a check to avoid implicitly adding an object
to a database in a multi-database situation.
See serialize.ObjectWriter.persistent_id.
"""
return (self._creating.get(oid, 0)
or
((self._savepoint_storage is not None)
and
self._savepoint_storage.creating.get(oid, 0)
)
)
def sync(self):
    """Manually update the view on the database.

    The current transaction is aborted first, then the storage is
    synchronized and pending invalidations are processed.
    """
    manager = self.transaction_manager
    manager.abort()
    self._storage_sync()
def getDebugInfo(self):
    """Return the tuple of debug items collected for this connection."""
    info = self._debug_info
    return info
def setDebugInfo(self, *args):
    """Append the given items to the debug information of this connection."""
    current = self._debug_info
    self._debug_info = current + args
def getTransferCounts(self, clear=False):
    """Return the ``(loaded, stored)`` object counts.

    clear -- when true, both counters are reset to zero after the
        current values have been captured.
    """
    counts = (self._load_count, self._store_count)
    if clear:
        self._load_count = self._store_count = 0
    return counts
# Connection methods
##########################################################################
##########################################################################
# Data manager (ISavepointDataManager) methods
def abort(self, transaction):
    """Abort a transaction and forget all changes."""
    # Ordering matters: registered objects are aborted before the
    # cache is processed.  Otherwise objects added in savepoints could
    # be un-added first; if they were modified since the savepoint
    # they would then lack _p_oid and _p_jar, which would confuse
    # _abort().
    self._abort()

    has_savepoint = self._savepoint_storage is not None
    if has_savepoint:
        self._abort_savepoint()

    self._invalidate_creating()
    self._tpc_cleanup()
def _abort(self):
"""Abort a transaction and forget all changes."""
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
if oid in self._added:
del self._added[oid]
if self._cache.get(oid) is not None:
del self._cache[oid]
del obj._p_jar
del obj._p_oid
if obj._p_changed:
obj._p_changed = False
else:
# Note: If we invalidate a non-ghostifiable object
# (i.e. a persistent class), the object will
# immediately reread its state. That means that the
# following call could result in a call to
# self.setstate, which, of course, must succeed.
# In general, it would be better if the read could be
# delayed until the start of the next transaction. If
# we read at the end of a transaction and if the
# object was invalidated during this transaction, then
# we'll read non-current data, which we'll discard
# later in transaction finalization. Unfortnately, we
# can only delay the read if this abort corresponds to
# a top-level-transaction abort. We can't tell if
# this is a top-level-transaction abort, so we have to
# go ahead and invalidate now. Fortunately, it's
# pretty unlikely that the object we are invalidating
# was invalidated by another thread, so the risk of a
# reread is pretty low.
self._cache.invalidate(oid)
def _tpc_cleanup(self):
"""Performs cleanup operations to support tpc_finish and tpc_abort."""
self._conflicts.clear()
self._needs_to_join = True
self._registered_objects = []
self._creating.clear()
# Process pending invalidations.
def _flush_invalidations(self):
if self._mvcc_storage:
# Poll the storage for invalidations.
invalidated = self._storage.poll_invalidations()
if invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
self._cache.invalidate(list(self._cache.cache_data.keys()))
elif invalidated:
self._cache.invalidate(invalidated)
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after
# resetting the time. This means that invalidations will
# happen after the start of the transactions. They are
# subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could
# be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix
# this is to, in fact, make classes ghostifiable. Then
# we'd have to reimplement attribute lookup to check the
# class state and, if necessary, activate the class. It's
# much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that
# reloaded the object when called. It's hard to say which
# is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems
# small enough to be acceptable.
invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set()
self._txn_time = None
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
finally:
self._inv_lock.release()
self._cache.invalidate(invalidated)
# Now is a good time to collect some garbage.
self._cache.incrgc()
def tpc_begin(self, transaction):
    """Begin commit of a transaction, starting the two-phase commit."""
    # Start from a clean slate: no modified oids recorded yet.
    self._modified = []

    # _creating maps the oids of new objects to a flag; it is used to
    # remove them from the cache if a transaction aborts.
    self._creating.clear()

    storage = self._normal_storage
    storage.tpc_begin(transaction)
def commit(self, transaction):
    """Commit changes to an object

    When a savepoint storage is active, the current changes are first
    checkpointed into it and then all savepoint changes are committed
    at once; otherwise the registered objects are committed directly.
    Afterwards every oid recorded by readCurrent is checked against the
    storage; on a ConflictError the object is invalidated in the cache
    and the error is re-raised.
    """
    if self._savepoint_storage is None:
        self._commit(transaction)
    else:
        # We first checkpoint the current changes to the savepoint
        self.savepoint()
        # then commit all of the savepoint changes at once
        self._commit_savepoint(transaction)
        # No need to call _commit since savepoint did.

    check_current = self._storage.checkCurrentSerialInTransaction
    for oid, serial in self._readCurrent.items():
        try:
            check_current(oid, serial, transaction)
        except ConflictError:
            self._cache.invalidate(oid)
            raise
def _commit(self, transaction):
"""Commit changes to an object"""
if self.before is not None:
raise ReadOnlyHistoryError()
if self._import:
# We are importing an export file. We alsways do this
# while making a savepoint so we can copy export data
# directly to our storage, typically a TmpStore.
self._importDuringCommit(transaction, *self._import)
self._import = None
# Just in case an object is added as a side-effect of storing
# a modified object. If, for example, a __getstate__() method
# calls add(), the newly added objects will show up in
# _added_during_commit. This sounds insane, but has actually
# happened.
self._added_during_commit = []
if self._invalidatedCache:
raise ConflictError()
for obj in self._registered_objects:
oid = obj._p_oid
assert oid
if oid in self._conflicts:
raise ReadConflictError(object=obj)
if obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
elif oid in self._added:
assert obj._p_serial == z64
elif obj._p_changed and oid not in self._creating:
if oid in self._invalidated:
resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None:
raise ConflictError(object=obj)
else:
# Nothing to do. It's been said that it's legal, e.g., for
# an object to set _p_changed to false after it's been
# changed and registered.
# And new objects that are registered after any referrer are
# already processed.
continue
self._store_objects(ObjectWriter(obj), transaction)
for obj in self._added_during_commit:
self._store_objects(ObjectWriter(obj), transaction)
self._added_during_commit = None
def _store_objects(self, writer, transaction):
    """Serialize and store every object yielded by 'writer'.

    writer -- iterated to obtain the objects to store; its
        ``serialize(obj)`` method produces the record for each one.
    transaction -- passed through to the storage's store calls.

    New objects are recorded in ``_creating`` and all others in
    ``_modified``.  Each stored object is placed in the cache and the
    storage's return value is handed to _handle_serial().

    Raises ConflictError for an invalidated object without
    ``_p_resolveConflict``, Unsupported when a Blob is stored in a
    storage that does not provide IBlobStorage, and ValueError when a
    Blob is still opened.
    """
    for obj in writer:
        oid = obj._p_oid
        serial = getattr(obj, "_p_serial", z64)

        if ((serial == z64)
            and
            ((self._savepoint_storage is None)
             or (oid not in self._savepoint_storage.creating)
             or self._savepoint_storage.creating[oid]
             )
            ):

            # obj is a new object

            # Because obj was added, it is now in _creating, so it
            # can be removed from _added.  If oid wasn't in
            # _added, then we are adding it implicitly.

            implicitly_adding = self._added.pop(oid, None) is None

            self._creating[oid] = implicitly_adding

        else:
            if (oid in self._invalidated
                and not hasattr(obj, '_p_resolveConflict')):
                raise ConflictError(object=obj)
            self._modified.append(oid)

        p = writer.serialize(obj)  # This calls __getstate__ of obj
        if len(p) >= self.large_record_size:
            # large_object_message is a module-level format string
            # defined outside this method.
            warnings.warn(large_object_message % (obj.__class__, len(p)))

        if isinstance(obj, Blob):
            if not IBlobStorage.providedBy(self._storage):
                raise Unsupported(
                    "Storing Blobs in %s is not supported." %
                    repr(self._storage))
            if obj.opened():
                raise ValueError("Can't commit with opened blobs.")
            blobfilename = obj._uncommitted()
            if blobfilename is None:
                assert serial is not None # See _uncommitted
                self._modified.pop() # not modified
                continue
            s = self._storage.storeBlob(oid, serial, p, blobfilename,
                                        '', transaction)
            # we invalidate the object here in order to ensure
            # that the next attribute access of its name
            # unghostifies it, which will cause its blob data
            # to be reattached "cleanly"
            obj._p_invalidate()
        else:
            s = self._storage.store(oid, serial, p, '', transaction)

        self._store_count += 1
        # Put the object in the cache before handling the
        # response, just in case the response contains the
        # serial number for a newly created object
        try:
            self._cache[oid] = obj
        except:
            # Dang, I bet it's wrapped:
            # TODO:  Deprecate, then remove, this.
            if hasattr(obj, 'aq_base'):
                self._cache[oid] = obj.aq_base
            else:
                raise

        # Record the record size both in the cache and on the object.
        self._cache.update_object_size_estimation(oid, len(p))
        obj._p_estimated_size = len(p)

        self._handle_serial(oid, s)
def _handle_serial(self, oid, serial=ResolvedSerial, change=True):
    """Apply the serial returned by the storage for 'oid'.

    serial -- a false value means nothing to apply; ResolvedSerial
        turns the cached object into a ghost; any other bytes value is
        stored as the object's ``_p_serial``.
    change -- when true, the object is also marked as up to date.
    """
    # if we write an object, we don't want to check if it was read
    # while current.  This is a convenient choke point to do this.
    self._readCurrent.pop(oid, None)

    if not serial:
        return
    assert isinstance(serial, bytes), serial

    obj = self._cache.get(oid, None)
    if obj is None:
        return

    if serial == ResolvedSerial:
        del obj._p_changed  # transition from changed to ghost
        return

    self._warn_about_returned_serial()
    if change:
        obj._p_changed = 0  # transition from changed to up-to-date
    obj._p_serial = serial
def tpc_abort(self, transaction):
    """Abort the two-phase commit and undo its effects on this connection.

    A pending import is dropped, an active savepoint is aborted, the
    storage is told to abort, modified objects are invalidated in the
    cache and objects created or added in the transaction are disowned.
    """
    if self._import:
        self._import = None

    if self._savepoint_storage is not None:
        self._abort_savepoint()

    self._storage.tpc_abort(transaction)

    # Note: If we invalidate a non-ghostifiable object (i.e. a
    # persistent class), the object will immediately reread its
    # state.  That means that the following call could result in a
    # call to self.setstate, which, of course, must succeed.  In
    # general, it would be better if the read could be delayed
    # until the start of the next transaction.  If we read at the
    # end of a transaction and if the object was invalidated
    # during this transaction, then we'll read non-current data,
    # which we'll discard later in transaction finalization.  We
    # could, theoretically queue this invalidation by calling
    # self.invalidate.  Unfortunately, attempts to make that
    # change resulted in mysterious test failures.  It's pretty
    # unlikely that the object we are invalidating was invalidated
    # by another thread, so the risk of a reread is pretty low.
    # It's really not worth the effort to pursue this.
    self._cache.invalidate(self._modified)
    self._invalidate_creating()

    # Disown everything that was added but never stored.
    added = self._added
    while added:
        oid, obj = added.popitem()
        if obj._p_changed:
            obj._p_changed = False
        del obj._p_oid
        del obj._p_jar

    self._tpc_cleanup()
def _invalidate_creating(self, creating=None):
"""Disown any objects newly saved in an uncommitted transaction."""
if creating is None:
creating = self._creating
self._creating = {}
for oid in creating:
o = self._cache.get(oid)
if o is not None:
del self._cache[oid]
if o._p_changed:
o._p_changed = False
del o._p_jar
del o._p_oid
def tpc_vote(self, transaction):
    """Verify that a data manager can commit the transaction.

    Storages without a ``tpc_vote`` attribute are skipped.  The vote
    result is either an iterable of oids (bytes) or an iterable of
    ``(oid, serial)`` pairs; each entry is passed to _handle_serial().
    A ReadConflictError from the storage invalidates the affected
    object in the cache before being re-raised.
    """
    try:
        vote = self._storage.tpc_vote
    except AttributeError:
        return

    try:
        result = vote(transaction)
    except ReadConflictError as conflict:
        if conflict.oid:
            self._cache.invalidate(conflict.oid)
        raise

    if not result:
        return

    if type(next(iter(result))) is bytes:
        # Plain oids: nothing but the oid to hand on.
        for oid in result:
            self._handle_serial(oid)
        return

    self._warn_about_returned_serial()
    for oid, serial in result:
        self._handle_serial(oid, serial)
def tpc_finish(self, transaction):
    """Indicate confirmation that the transaction is done.

    The storage's tpc_finish() is given a callback that forwards the
    modified oids to the database for invalidation (skipped for MVCC
    storages).  When the storage returns a serial, every non-ghost
    object that was modified or created is marked up to date with that
    serial.
    """
    def notify_db(tid):
        if self._mvcc_storage:
            # Inter-connection invalidation is not needed when the
            # storage provides MVCC.
            return
        self._db.invalidate(tid, dict.fromkeys(self._modified), self)

    # It's important that the storage calls the passed function
    # while it still has its lock.  We don't want another thread
    # to be able to read any updated data until we've had a chance
    # to send an invalidation message to all of the other
    # connections!
    serial = self._storage.tpc_finish(transaction, notify_db)

    if serial is None:
        self._warn_about_returned_serial()
    else:
        assert type(serial) is bytes, repr(serial)
        lookup = self._cache.get
        for oids in (self._modified, self._creating):
            for oid in oids:
                obj = lookup(oid)
                # Ignore missing objects and don't update ghosts.
                if obj is None or obj._p_changed is None:
                    continue
                obj._p_changed = 0
                obj._p_serial = serial

    self._tpc_cleanup()
def _warn_about_returned_serial(self):
# Do not warn about own implementations of ZODB.
# We're aware and the user can't do anything about it.
if self._normal_storage.__module__.startswith("_ZODB."):
self._warn_about_returned_serial = lambda: None
else:
warnings.warn(
"In ZODB 5+, the new API for the returned value of"
" store/tpc_vote/tpc_finish will be mandatory."
" See IStorage for more information.",
DeprecationWarning, 2)
Connection._warn_about_returned_serial = lambda self: None
def sortKey(self):
    """Return a consistent sort key for this connection.

    The key combines the storage's sort key with ``id(self)``.
    """
    storage_key = self._storage.sortKey()
    return "%s:%s" % (storage_key, id(self))
# Data manager (ISavepointDataManager) methods
##########################################################################
##########################################################################
# Transaction-manager synchronization -- ISynchronizer
def beforeCompletion(self, txn):
    """Synchronizer hook; nothing needs doing before a commit starts."""
    return None
# Call the underlying storage's sync() method (if any), and process
# pending invalidations regardless. Of course this should only be
# called at transaction boundaries.
def _storage_sync(self, *ignored):
self._readCurrent.clear()
sync = getattr(self._storage, 'sync', 0)
if sync:
sync()
self._flush_invalidations()
afterCompletion = _storage_sync
newTransaction = _storage_sync
# Transaction-manager synchronization -- ISynchronizer
##########################################################################
##########################################################################
# persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid):
    """Return copy of 'obj' that was written by transaction 'tid'.

    obj must belong to this connection.
    """
    assert obj._p_jar is self
    record = self._storage.loadSerial(obj._p_oid, tid)
    return self._reader.getState(record)
def setstate(self, obj):
    """Turns the ghost 'obj' into a real object by loading its state from
    the database.

    Failures are logged before being re-raised, except for
    ConflictError, which is passed through without logging.  Loading
    through a closed connection raises ConnectionStateError.
    """
    oid = obj._p_oid

    if self.opened is None:
        msg = ("Shouldn't load state for %s %s "
               "when the connection is closed"
               % (className(obj), oid_repr(oid)))
        # Raise inside a try block so that the log entry carries the
        # traceback of the error.
        try:
            raise ConnectionStateError(msg)
        except:
            self._log.exception(msg)
            raise

    try:
        self._setstate(obj, oid)
    except ConflictError:
        # Conflicts are not logged here.
        raise
    except:
        self._log.exception("Couldn't load state for %s %s",
                            className(obj), oid_repr(oid))
        raise
def _setstate(self, obj, oid):
    """Load the state of 'obj' from the storage and apply it.

    Helper for setstate(), which provides logging of failures.

    obj -- the ghost to fill in.
    oid -- must be the same as ``obj._p_oid``.

    Raises POSKeyError on a historical connection when no revision is
    available, and ReadConflictError when the whole cache has been
    invalidated.  For an individually invalidated oid the work is
    delegated to _load_before_or_conflict().
    """
    # We accept the oid param, which must be the same as obj._p_oid,
    # as a performance optimization for the pure-Python persistent implementation
    # where accessing an attribute involves __getattribute__ calls

    # The control flow is complicated here to avoid loading an
    # object revision that we are sure we aren't going to use.  As
    # a result, invalidation tests occur before and after the
    # load.  We can only be sure about invalidations after the
    # load.

    # If an object has been invalidated, among the cases to consider:
    # - Try MVCC
    # - Raise ConflictError.

    if self.before is not None:
        # Load data that was current before the time we have.
        before = self.before
        t = self._storage.loadBefore(oid, before)
        if t is None:
            raise POSKeyError() # historical connection!
        p, serial, end = t

    else:
        # There is a harmless data race with self._invalidated.  An
        # update could go on in another thread, but we don't care
        # because we have to check again after the load anyway.

        if self._invalidatedCache:
            raise ReadConflictError()

        if (oid in self._invalidated):
            self._load_before_or_conflict(obj)
            return

        p, serial = self._storage.load(oid, '')
        self._load_count += 1

        # Re-check under the lock: the oid may have been invalidated
        # while the load was in progress.
        self._inv_lock.acquire()
        try:
            invalid = oid in self._invalidated
        finally:
            self._inv_lock.release()

        if invalid:
            self._load_before_or_conflict(obj)
            return

    self._reader.setGhostState(obj, p)
    obj._p_serial = serial
    # Record the record size both in the cache and on the object.
    self._cache.update_object_size_estimation(oid, len(p))
    obj._p_estimated_size = len(p)

    # Blob support
    if isinstance(obj, Blob):
        obj._p_blob_uncommitted = None
        obj._p_blob_committed = self._storage.loadBlob(oid, serial)
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not self._setstate_noncurrent(obj):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start