/
IteratorStorage.py
251 lines (224 loc) · 9.81 KB
/
IteratorStorage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run tests against the iterator() interface for storages.
Any storage that supports the iterator() method should be able to pass
all these tests.
"""
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.utils import U64, p64, load_current
from transaction import Transaction
import sys
import ZODB.blob
try:
from itertools import izip as zip
except ImportError:
# Py3: zip() already returns an iterable.
pass
class IteratorCompare:
    """Mixin holding the shared verification helper for iterator tests.

    The host test case must provide ``assertEqual`` and set ``self._oid``
    to the oid of the single object whose revisions are being iterated.
    """

    def iter_verify(self, txniter, revids, val0):
        # Walk `txniter` and check it against the expected history:
        #
        # * the n-th transaction's tid equals revids[n];
        # * every data record in it has oid ``self._oid``, the same tid,
        #   and unpickles to MinPO(val0 + n);
        # * exactly len(revids) transactions were seen.
        check = self.assertEqual
        expected_oid = self._oid
        # The trailing None is a sentinel: a surplus transaction coming out
        # of the iterator gets paired with it, so its tid is compared
        # against None instead of being silently ignored by zip().
        expected_tids = revids + [None]
        txn_count = 0
        paired = zip(txniter, expected_tids)
        for txn_count, (txn, expected_tid) in enumerate(paired, 1):
            check(txn.tid, expected_tid)
            for record in txn:
                check(record.oid, expected_oid)
                check(record.tid, expected_tid)
                check(zodb_unpickle(record.data),
                      MinPO(val0 + txn_count - 1))
        # One value step per transaction: this catches an iterator that
        # stopped early (or ran long).
        check(val0 + txn_count, val0 + len(revids))
class IteratorStorage(IteratorCompare):
    """Tests for the basic, argument-less ``iterator()`` storage API.

    This is a mixin: the host test case is expected to provide
    ``self._storage``, ``self._dostore()`` and the unittest assertion
    methods used below.
    """

    def checkSimpleIteration(self):
        # Store a bunch of revisions of a single object
        self._oid = oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        # Now iterate over all the transactions and compare carefully
        txniter = self._storage.iterator()
        self.iter_verify(txniter, [revid1, revid2, revid3], 11)

    def checkUndoZombie(self):
        oid = self._storage.new_oid()
        self._dostore(oid, data=MinPO(94))
        # Get the undo information
        info = self._storage.undoInfo()
        tid = info[0]['id']
        # Undo the creation of the object, rendering it a zombie
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.undo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # Now attempt to iterate over the storage, remembering only the
        # very last data record seen.
        rec = None
        for txn in self._storage.iterator():
            for rec in txn:
                pass
        # Fail with a readable message (rather than an error on an unbound
        # or None `rec`) if the iterator produced no records at all.
        self.assertTrue(rec is not None,
                        "storage iterator yielded no data records")
        # The last transaction performed an undo of the transaction that
        # created object oid.  (As Barry points out, the object is now in the
        # George Bailey state.)  Assert that the final data record contains
        # None in the data attribute.
        self.assertEqual(rec.oid, oid)
        self.assertEqual(rec.data, None)

    def checkTransactionExtensionFromIterator(self):
        # A transaction stored without extension data must expose an empty
        # dict as its `extension` when read back through the iterator.
        oid = self._storage.new_oid()
        self._dostore(oid, data=MinPO(1))
        count = 0
        for txn in self._storage.iterator():
            self.assertEqual(txn.extension, {})
            count += 1
        self.assertEqual(count, 1)

    def checkIterationIntraTransaction(self):
        # TODO:  Try this test with logging enabled.  If you see something
        # like
        #
        # ZODB FS FS21 warn: FileStorageTests.fs truncated, possibly due to
        # damaged records at 4
        #
        # Then the code in FileIterator.next() hasn't yet been fixed.
        # Should automate that check.
        oid = self._storage.new_oid()
        t = Transaction()
        data = zodb_pickle(MinPO(0))
        try:
            self._storage.tpc_begin(t)
            self._storage.store(oid, '\0'*8, data, '', t)
            self._storage.tpc_vote(t)
            # Don't do tpc_finish yet: iterate while the transaction is
            # voted but still uncommitted.
            for _txn in self._storage.iterator():
                pass
        finally:
            self._storage.tpc_finish(t)

    def checkLoad_was_checkLoadEx(self):
        # The tid reported by load_current() must identify a transaction
        # in the iterator that contains a record for the same oid and tid.
        oid = self._storage.new_oid()
        self._dostore(oid, data=42)
        data, tid = load_current(self._storage, oid)
        self.assertEqual(zodb_unpickle(data), MinPO(42))
        match = False
        for txn in self._storage.iterator():
            for rec in txn:
                if rec.oid == oid and rec.tid == tid:
                    self.assertEqual(txn.tid, tid)
                    match = True
        if not match:
            self.fail("Could not find transaction with matching id")

    def checkIterateRepeatedly(self):
        self._dostore()
        transactions = self._storage.iterator()
        self.assertEqual(1, len(list(transactions)))
        # The iterator can only be consumed once:
        self.assertEqual(0, len(list(transactions)))

    def checkIterateRecordsRepeatedly(self):
        # Unlike the storage iterator itself, the records of a single
        # transaction can be listed more than once.
        self._dostore()
        it = self._storage.iterator()
        try:
            tinfo = next(it)
            self.assertEqual(1, len(list(tinfo)))
            self.assertEqual(1, len(list(tinfo)))
        finally:
            # Release the iterator even when an assertion above fails.
            if hasattr(it, 'close'):
                it.close()

    def checkIterateWhileWriting(self):
        self._dostore()
        iterator = self._storage.iterator()
        # We have one transaction with 1 modified object.
        txn_1 = next(iterator)
        self.assertEqual(1, len(list(txn_1)))
        # We store another transaction with 1 object, the already running
        # iterator does not pick this up.
        self._dostore()
        # The builtin next() works on both Python 2 and 3, so no version
        # switch between `.next` and `.__next__` is needed.
        self.assertRaises(StopIteration, next, iterator)
class ExtendedIteratorStorage(IteratorCompare):
    """Tests for ``iterator(start, stop)`` with explicit transaction bounds.

    This is a mixin: the host test case is expected to provide
    ``self._storage``, ``self._dostore()`` and the unittest assertion
    methods used below.
    """

    def checkExtendedIteration(self):
        # Store a bunch of revisions of a single object
        self._oid = oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        revid4 = self._dostore(oid, revid=revid3, data=MinPO(14))
        # Note that the end points are included
        # Iterate over all of the transactions with explicit start/stop
        txniter = self._storage.iterator(revid1, revid4)
        self.iter_verify(txniter, [revid1, revid2, revid3, revid4], 11)
        # Iterate over some of the transactions with explicit start
        txniter = self._storage.iterator(revid3)
        self.iter_verify(txniter, [revid3, revid4], 13)
        # Iterate over some of the transactions with explicit stop
        txniter = self._storage.iterator(None, revid2)
        self.iter_verify(txniter, [revid1, revid2], 11)
        # Iterate over some of the transactions with explicit start+stop
        txniter = self._storage.iterator(revid2, revid3)
        self.iter_verify(txniter, [revid2, revid3], 12)
        # Specify an upper bound somewhere in between values
        revid3a = p64((U64(revid3) + U64(revid4)) // 2)
        txniter = self._storage.iterator(revid2, revid3a)
        self.iter_verify(txniter, [revid2, revid3], 12)
        # Specify a lower bound somewhere in between values.
        # revid2 == revid1+1 is very likely on Windows.  Adding 1 before
        # dividing ensures that "the midpoint" we compute is strictly larger
        # than revid1.
        revid1a = p64((U64(revid1) + 1 + U64(revid2)) // 2)
        # Use a TestCase assertion instead of a bare `assert`, which would
        # be stripped (and the precondition left unchecked) under -O.
        self.assertTrue(revid1 < revid1a)
        txniter = self._storage.iterator(revid1a, revid3a)
        self.iter_verify(txniter, [revid2, revid3], 12)
        # Specify an empty range
        txniter = self._storage.iterator(revid3, revid2)
        self.iter_verify(txniter, [], 13)
        # Specify a singleton range
        txniter = self._storage.iterator(revid3, revid3)
        self.iter_verify(txniter, [revid3], 13)
class IteratorDeepCompare:
    """Mixin asserting that two storages hold identical histories.

    The host test case must provide the unittest assertion methods used
    here (``assertEqual``, ``assertNotEqual``, ``assertRaises``, ``fail``).
    """

    def compare(self, storage1, storage2):
        # Walk both storages' iterators in lockstep and require that:
        #
        # * transactions agree on tid, status, user, description and
        #   extension;
        # * data records agree on oid, tid and data;
        # * for blob records, the blob is either missing from both
        #   storages or loads from two distinct files with equal contents;
        # * both storages have the same number of transactions, and each
        #   transaction pair the same number of records.
        eq = self.assertEqual
        iter1 = storage1.iterator()
        iter2 = storage2.iterator()
        for txn1, txn2 in self._lockstep(iter1, iter2, 'transactions'):
            eq(txn1.tid, txn2.tid)
            eq(txn1.status, txn2.status)
            eq(txn1.user, txn2.user)
            eq(txn1.description, txn2.description)
            eq(txn1.extension, txn2.extension)
            itxn1 = iter(txn1)
            itxn2 = iter(txn2)
            for rec1, rec2 in self._lockstep(itxn1, itxn2, 'data records'):
                eq(rec1.oid, rec2.oid)
                eq(rec1.tid, rec2.tid)
                eq(rec1.data, rec2.data)
                if ZODB.blob.is_blob_record(rec1.data):
                    try:
                        fn1 = storage1.loadBlob(rec1.oid, rec1.tid)
                    except ZODB.POSException.POSKeyError:
                        # Blob missing from storage1: it must be missing
                        # from storage2 as well.
                        self.assertRaises(
                            ZODB.POSException.POSKeyError,
                            storage2.loadBlob, rec1.oid, rec1.tid)
                    else:
                        fn2 = storage2.loadBlob(rec1.oid, rec1.tid)
                        # Each storage must serve its own copy of the blob.
                        self.assertNotEqual(fn1, fn2)
                        with open(fn1, 'rb') as fp1:
                            with open(fn2, 'rb') as fp2:
                                eq(fp1.read(), fp2.read())

    def _lockstep(self, iter1, iter2, what):
        # Yield (item1, item2) pairs from the two iterators and fail the
        # test if one of them runs out before the other.
        #
        # A plain zip() is not enough here: when the second iterator is
        # exhausted, zip() has already pulled (and then discards) one more
        # item from the first, so a first iterator that is exactly one item
        # longer would afterwards look exhausted too and the mismatch
        # would go unnoticed.
        #
        # Only StopIteration ends an iterator; any other exception (e.g.
        # the IndexError that used to signal the end) propagates as an
        # error.
        exhausted = object()
        while True:
            item1 = next(iter1, exhausted)
            item2 = next(iter2, exhausted)
            if item1 is exhausted or item2 is exhausted:
                if item1 is not exhausted:
                    self.fail("first iterator has more %s than the second"
                              % what)
                elif item2 is not exhausted:
                    self.fail("second iterator has more %s than the first"
                              % what)
                return
            yield item1, item2