/
_manager.py
153 lines (124 loc) · 5.01 KB
/
_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""A TransactionManager controls transaction boundaries.
It coordinates application code and resource managers, so that they
are associated with the right transaction.
"""
import thread
import weakref
from transaction._transaction import Transaction
# We have to remember sets of synch objects, especially Connections.
# But we don't want mere registration with a transaction manager to
# keep a synch object alive forever; in particular, it's common
# practice not to explicitly close Connection objects, and keeping
# a Connection alive keeps a potentially huge number of other objects
# alive (e.g., the cache, and everything reachable from it too).
#
# Therefore we use "weak sets" internally. The implementation here
# implements just enough of Python's sets.Set interface for our needs.
class WeakSet(object):
    """A set of objects that doesn't keep its elements alive.

    The objects in the set must be weakly referenceable.
    The objects need not be hashable, and need not support comparison.
    Two objects are considered to be the same iff their id()s are equal.

    When the only references to an object are weak references (including
    those from WeakSets), the object can be garbage-collected, and
    will vanish from any WeakSets it may be a member of at that time.
    """

    def __init__(self):
        # Map id(obj) to obj.  By using ids as keys, we avoid requiring
        # that the elements be hashable or comparable.
        self.data = weakref.WeakValueDictionary()

    def add(self, obj):
        """Add obj to the collection, as with a Set.

        Adding an object that is already a member just rebinds the same
        id() key, so the collection is unchanged.
        """
        self.data[id(obj)] = obj

    def remove(self, obj):
        """Remove obj from the collection, as with a Set.

        Raises KeyError if obj is not in the collection.
        """
        del self.data[id(obj)]

    def as_list(self):
        """Return a new list of all the objects in the collection.

        Because a weak dict is used internally, iteration is dicey (the
        underlying dict may change size during iteration, due to gc or
        activity from other threads).  The values are therefore copied
        into a real list before being handed out: depending on the Python
        version, values() may be a lazy iterator rather than a list, and
        callers of as_list() expect a stable snapshot they can index,
        measure and iterate more than once.
        """
        return list(self.data.values())
class TransactionManager(object):
    """Hold at most one current transaction plus its registered synchs.

    The current transaction lives in ``_txn`` (None when there is none);
    the synchronizers live in ``_synchs`` and are handed to each new
    Transaction as a list.
    """

    def __init__(self):
        self._synchs = WeakSet()
        self._txn = None

    def begin(self):
        """Start and return a new transaction.

        If a transaction is already current, its abort() is called first.
        """
        current = self._txn
        if current is not None:
            current.abort()
        fresh = Transaction(self._synchs.as_list(), self)
        self._txn = fresh
        return fresh

    def get(self):
        """Return the current transaction, creating one if none exists."""
        current = self._txn
        if current is None:
            current = self._txn = Transaction(self._synchs.as_list(), self)
        return current

    def free(self, txn):
        """Forget txn, which must be the current transaction."""
        assert txn is self._txn
        self._txn = None

    def registerSynch(self, synch):
        """Add synch to the set passed to transactions created later."""
        self._synchs.add(synch)

    def unregisterSynch(self, synch):
        """Remove synch from the registered set."""
        self._synchs.remove(synch)

    def commit(self, sub=False):
        """Call commit(sub) on the current transaction (see get())."""
        txn = self.get()
        txn.commit(sub)

    def abort(self, sub=False):
        """Call abort(sub) on the current transaction (see get())."""
        txn = self.get()
        txn.abort(sub)
class ThreadTransactionManager(TransactionManager):
    """Thread-aware transaction manager.

    Each thread is associated with a unique transaction.  All state is
    keyed by the calling thread's id, as reported by thread.get_ident().
    """

    def __init__(self):
        # _txns maps thread ids to transactions.
        self._txns = {}
        # _synchs maps a thread id to a WeakSet of registered synchronizers.
        # The set elements are passed to the Transaction constructor,
        # because the latter needs to call the synchronizers when it commits.
        self._synchs = {}

    def begin(self):
        """Start and return a new transaction for the calling thread.

        If the thread already has a transaction, its abort() is called
        first.
        """
        ident = thread.get_ident()
        current = self._txns.get(ident)
        if current is not None:
            current.abort()
        registered = self._synchs.get(ident)
        # A thread that never registered a synch passes None, not [].
        listeners = None if registered is None else registered.as_list()
        fresh = Transaction(listeners, self)
        self._txns[ident] = fresh
        return fresh

    def get(self):
        """Return the calling thread's transaction, creating it if needed."""
        ident = thread.get_ident()
        existing = self._txns.get(ident)
        if existing is not None:
            return existing
        registered = self._synchs.get(ident)
        # A thread that never registered a synch passes None, not [].
        listeners = None if registered is None else registered.as_list()
        created = Transaction(listeners, self)
        self._txns[ident] = created
        return created

    def free(self, txn):
        """Forget txn, which must be the calling thread's transaction."""
        ident = thread.get_ident()
        assert txn is self._txns.get(ident)
        del self._txns[ident]

    def registerSynch(self, synch):
        """Add synch to the calling thread's WeakSet, creating the set
        on first use.
        """
        ident = thread.get_ident()
        members = self._synchs.get(ident)
        if members is None:
            members = WeakSet()
            self._synchs[ident] = members
        members.add(synch)

    def unregisterSynch(self, synch):
        """Remove synch from the calling thread's WeakSet.

        Raises KeyError if the thread has no WeakSet yet.
        """
        self._synchs[thread.get_ident()].remove(synch)