##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Storage implementation using a log written to a single file.
"""
from __future__ import print_function
import binascii
import contextlib
import errno
import logging
import os
import threading
import time
from struct import pack
from struct import unpack
from persistent.TimeStamp import TimeStamp
from six import string_types as STRING_TYPES
from zc.lockfile import LockFile
from zope.interface import alsoProvides
from zope.interface import implementer
from ZODB.blob import BlobStorageMixin
from ZODB.blob import link_or_copy
from ZODB.blob import remove_committed
from ZODB.blob import remove_committed_dir
from ZODB.BaseStorage import BaseStorage
from ZODB.BaseStorage import DataRecord as _DataRecord
from ZODB.BaseStorage import TransactionRecord as _TransactionRecord
from ZODB.ConflictResolution import ConflictResolvingStorage
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.FileStorage.format import CorruptedDataError
from ZODB.FileStorage.format import CorruptedError
from ZODB.FileStorage.format import DATA_HDR
from ZODB.FileStorage.format import DATA_HDR_LEN
from ZODB.FileStorage.format import DataHeader
from ZODB.FileStorage.format import FileStorageFormatter
from ZODB.FileStorage.format import TRANS_HDR
from ZODB.FileStorage.format import TRANS_HDR_LEN
from ZODB.FileStorage.format import TxnHeader
from ZODB.FileStorage.fspack import FileStoragePacker
from ZODB.interfaces import IBlobStorageRestoreable
from ZODB.interfaces import IExternalGC
from ZODB.interfaces import IStorage
from ZODB.interfaces import IStorageCurrentRecordIteration
from ZODB.interfaces import IStorageIteration
from ZODB.interfaces import IStorageRestoreable
from ZODB.interfaces import IStorageUndoable
from ZODB.POSException import ConflictError
from ZODB.POSException import MultipleUndoErrors
from ZODB.POSException import POSKeyError
from ZODB.POSException import ReadOnlyError
from ZODB.POSException import StorageError
from ZODB.POSException import StorageSystemError
from ZODB.POSException import StorageTransactionError
from ZODB.POSException import UndoError
from ZODB.fsIndex import fsIndex
from ZODB.utils import as_bytes
from ZODB.utils import as_text
from ZODB.utils import cp
from ZODB.utils import mktemp
from ZODB.utils import p64
from ZODB.utils import u64
from ZODB.utils import z64
from ZODB._compat import Pickler
from ZODB._compat import loads
from ZODB._compat import decodebytes
from ZODB._compat import encodebytes
from ZODB._compat import _protocol
from ZODB._compat import FILESTORAGE_MAGIC
# Not all platforms have fsync
fsync = getattr(os, "fsync", None)
packed_version = FILESTORAGE_MAGIC
logger = logging.getLogger('ZODB.FileStorage')
def panic(message, *data):
logger.critical(message, *data)
raise CorruptedTransactionError(message % data)
class FileStorageError(StorageError):
pass
class PackError(FileStorageError):
pass
class FileStorageFormatError(FileStorageError):
"""Invalid file format
The format of the given file is not valid.
"""
class CorruptedFileStorageError(FileStorageError,
StorageSystemError):
"""Corrupted file storage."""
class CorruptedTransactionError(CorruptedFileStorageError):
pass
class FileStorageQuotaError(FileStorageError,
StorageSystemError):
"""File storage quota exceeded."""
# Intended to be raised only in fspack.py, and ignored here.
class RedundantPackWarning(FileStorageError):
pass
class TempFormatter(FileStorageFormatter):
"""Helper class used to read formatted FileStorage data."""
def __init__(self, afile):
self._file = afile
@implementer(
IStorage,
IStorageRestoreable,
IStorageIteration,
IStorageUndoable,
IStorageCurrentRecordIteration,
IExternalGC,
)
class FileStorage(
FileStorageFormatter,
BlobStorageMixin,
ConflictResolvingStorage,
BaseStorage,
):
# Set True while a pack is in progress; undo is blocked for the duration.
_pack_is_in_progress = False
def __init__(self, file_name, create=False, read_only=False, stop=None,
quota=None, pack_gc=True, pack_keep_old=True, packer=None,
blob_dir=None):
if read_only:
self._is_read_only = True
if create:
raise ValueError("can't create a read-only file")
elif stop is not None:
raise ValueError("time-travel only supported in read-only mode")
if stop is None:
stop = b'\377'*8
# Lock the database and set up the temp file.
if not read_only:
# Create the lock file
self._lock_file = LockFile(file_name + '.lock')
self._tfile = open(file_name + '.tmp', 'w+b')
self._tfmt = TempFormatter(self._tfile)
else:
self._tfile = None
self._file_name = os.path.abspath(file_name)
self._pack_gc = pack_gc
self.pack_keep_old = pack_keep_old
if packer is not None:
self.packer = packer
BaseStorage.__init__(self, file_name)
index, tindex = self._newIndexes()
self._initIndex(index, tindex)
# Now open the file
self._file = None
if not create:
try:
self._file = open(file_name, read_only and 'rb' or 'r+b')
except IOError as exc:
if exc.errno == errno.EFBIG:
# The file is too big to open. Fail visibly.
raise
if exc.errno == errno.ENOENT:
# The file doesn't exist. Create it.
create = 1
# If something else went wrong, it's hard to guess
# what the problem was. If the file does not exist,
# create it. Otherwise, fail.
if os.path.exists(file_name):
raise
else:
create = 1
if self._file is None and create:
if os.path.exists(file_name):
os.remove(file_name)
self._file = open(file_name, 'w+b')
self._file.write(packed_version)
self._files = FilePool(self._file_name)
r = self._restore_index()
if r is not None:
self._used_index = 1 # Marker for testing
index, start, ltid = r
self._initIndex(index, tindex)
self._pos, self._oid, tid = read_index(
self._file, file_name, index, tindex, stop,
ltid=ltid, start=start, read_only=read_only,
)
else:
self._used_index = 0 # Marker for testing
self._pos, self._oid, tid = read_index(
self._file, file_name, index, tindex, stop,
read_only=read_only,
)
self._save_index()
self._ltid = tid
# self._pos should always point just past the last
# transaction. During 2PC, data is written after _pos.
# invariant is restored at tpc_abort() or tpc_finish().
self._ts = tid = TimeStamp(tid)
t = time.time()
t = TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
if tid > t:
seconds = tid.timeTime() - t.timeTime()
complainer = logger.warning
if seconds > 30 * 60: # 30 minutes -- way screwed up
complainer = logger.critical
complainer("%s Database records %d seconds in the future",
file_name, seconds)
self._quota = quota
if blob_dir:
self.blob_dir = os.path.abspath(blob_dir)
if create and os.path.exists(self.blob_dir):
remove_committed_dir(self.blob_dir)
self._blob_init(blob_dir)
alsoProvides(self, IBlobStorageRestoreable)
else:
self.blob_dir = None
self._blob_init_no_blobs()
def copyTransactionsFrom(self, other):
if self.blob_dir:
return BlobStorageMixin.copyTransactionsFrom(self, other)
else:
return BaseStorage.copyTransactionsFrom(self, other)
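    # Example (a sketch): copying an existing storage's transactions
    # into a fresh one, e.g. to migrate or defragment a database.
    #
    #   src = FileStorage('old.fs', read_only=True)
    #   dst = FileStorage('new.fs', create=True)
    #   dst.copyTransactionsFrom(src)
    #   dst.close()
    #   src.close()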
def _initIndex(self, index, tindex):
self._index=index
self._tindex=tindex
self._index_get=index.get
def __len__(self):
return len(self._index)
def _newIndexes(self):
# hook to use something other than builtin dict
return fsIndex(), {}
_saved = 0
def _save_index(self):
"""Write the database index to a file to support quick startup."""
if self._is_read_only:
return
index_name = self.__name__ + '.index'
tmp_name = index_name + '.index_tmp'
self._index.save(self._pos, tmp_name)
try:
try:
os.remove(index_name)
except OSError:
pass
os.rename(tmp_name, index_name)
        except:
            pass
self._saved += 1
def _clear_index(self):
index_name = self.__name__ + '.index'
if os.path.exists(index_name):
try:
os.remove(index_name)
except OSError:
pass
def _sane(self, index, pos):
"""Sanity check saved index data by reading the last undone trans
Basically, we read the last not undone transaction and
check to see that the included records are consistent
        with the index. Any invalid records or inconsistent
object positions cause zero to be returned.
"""
r = self._check_sanity(index, pos)
if not r:
logger.warning("Ignoring index for %s", self._file_name)
return r
def _check_sanity(self, index, pos):
if pos < 100:
return 0 # insane
self._file.seek(0, 2)
if self._file.tell() < pos:
return 0 # insane
ltid = None
max_checked = 5
checked = 0
while checked < max_checked:
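            # Each transaction record ends with a redundant 8-byte
            # copy of its length (see tpc_vote), which lets us walk
            # the file backwards from 'pos'.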
self._file.seek(pos - 8)
rstl = self._file.read(8)
tl = u64(rstl)
pos = pos - tl - 8
if pos < 4:
return 0 # insane
h = self._read_txn_header(pos)
if not ltid:
ltid = h.tid
if h.tlen != tl:
return 0 # inconsistent lengths
if h.status == 'u':
continue # undone trans, search back
if h.status not in ' p':
return 0 # insane
if tl < h.headerlen():
return 0 # insane
tend = pos + tl
opos = pos + h.headerlen()
if opos == tend:
continue # empty trans
while opos < tend and checked < max_checked:
# Read the data records for this transaction
h = self._read_data_header(opos)
if opos + h.recordlen() > tend or h.tloc != pos:
return 0
if index.get(h.oid, 0) != opos:
return 0 # insane
checked += 1
opos = opos + h.recordlen()
return ltid
def _restore_index(self):
"""Load database index to support quick startup."""
# Returns (index, pos, tid), or None in case of error.
# The index returned is always an instance of fsIndex. If the
# index cached in the file is a Python dict, it's converted to
# fsIndex here, and, if we're not in read-only mode, the .index
# file is rewritten with the converted fsIndex so we don't need to
# convert it again the next time.
file_name=self.__name__
index_name=file_name+'.index'
if os.path.exists(index_name):
try:
info = fsIndex.load(index_name)
except:
logger.exception('loading index')
return None
else:
return None
index = info.get('index')
pos = info.get('pos')
if index is None or pos is None:
return None
pos = int(pos)
if (isinstance(index, dict) or
(isinstance(index, fsIndex) and
isinstance(index._data, dict))):
# Convert dictionary indexes to fsIndexes *or* convert fsIndexes
# which have a dict `_data` attribute to a new fsIndex (newer
# fsIndexes have an OOBTree as `_data`).
newindex = fsIndex()
newindex.update(index)
index = newindex
if not self._is_read_only:
# Save the converted index.
f = open(index_name, 'wb')
p = Pickler(f, _protocol)
info['index'] = index
p.dump(info)
f.close()
# Now call this method again to get the new data.
return self._restore_index()
tid = self._sane(index, pos)
if not tid:
return None
return index, pos, tid
def close(self):
self._file.close()
self._files.close()
if hasattr(self,'_lock_file'):
self._lock_file.close()
if self._tfile:
self._tfile.close()
try:
self._save_index()
except:
# Log the error and continue
logger.exception("Error saving index on close()")
def getSize(self):
return self._pos
def _lookup_pos(self, oid):
try:
return self._index[oid]
except KeyError:
raise POSKeyError(oid)
except TypeError:
raise TypeError("invalid oid %r" % (oid,))
def load(self, oid, version=''):
"""Return pickle data and serial number."""
assert not version
with self._files.get() as _file:
pos = self._lookup_pos(oid)
h = self._read_data_header(pos, oid, _file)
if h.plen:
data = _file.read(h.plen)
return data, h.tid
elif h.back:
# Get the data from the backpointer, but tid from
# current txn.
data = self._loadBack_impl(oid, h.back, _file=_file)[0]
return data, h.tid
else:
raise POSKeyError(oid)
def loadSerial(self, oid, serial):
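        """Return the pickle data for the revision of oid written by
        the transaction with id 'serial'; raise POSKeyError if there
        is no such revision.
        """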
with self._lock:
pos = self._lookup_pos(oid)
while 1:
h = self._read_data_header(pos, oid)
if h.tid == serial:
break
pos = h.prev
if h.tid < serial or not pos:
raise POSKeyError(oid)
if h.plen:
return self._file.read(h.plen)
else:
return self._loadBack_impl(oid, h.back)[0]
def loadBefore(self, oid, tid):
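        """Return (data, tid, end_tid) for the newest revision of oid
        older than 'tid', or None if no such revision exists.

        end_tid is the id of the transaction that superseded the
        returned revision, or None if it is still the current one.
        """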
with self._files.get() as _file:
pos = self._lookup_pos(oid)
end_tid = None
while True:
h = self._read_data_header(pos, oid, _file)
if h.tid < tid:
break
pos = h.prev
end_tid = h.tid
if not pos:
return None
if h.back:
data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
return data, h.tid, end_tid
else:
return _file.read(h.plen), h.tid, end_tid
def store(self, oid, oldserial, data, version, transaction):
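        """Store data for oid; return the new tid, or ResolvedSerial.

        If oldserial doesn't match the tid of the committed record,
        conflict resolution is attempted via tryToResolveConflict(),
        which raises ConflictError if it fails; on success,
        ResolvedSerial is returned instead of the new tid.
        """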
if self._is_read_only:
raise ReadOnlyError()
if transaction is not self._transaction:
raise StorageTransactionError(self, transaction)
assert not version
with self._lock:
if oid > self._oid:
self.set_max_oid(oid)
old = self._index_get(oid, 0)
committed_tid = None
pnv = None
if old:
h = self._read_data_header(old, oid)
committed_tid = h.tid
if oldserial != committed_tid:
data = self.tryToResolveConflict(oid, committed_tid,
oldserial, data)
pos = self._pos
here = pos + self._tfile.tell() + self._thl
self._tindex[oid] = here
new = DataHeader(oid, self._tid, old, pos, 0, len(data))
self._tfile.write(new.asString())
self._tfile.write(data)
# Check quota
if self._quota is not None and here > self._quota:
raise FileStorageQuotaError(
"The storage quota has been exceeded.")
if old and oldserial != committed_tid:
return ResolvedSerial
else:
return self._tid
def deleteObject(self, oid, oldserial, transaction):
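        """Write a delete record for oid (IExternalGC support).

        The record has no data and a zero backpointer, so a later pack
        can reclaim the object.  Raises ConflictError if oldserial
        doesn't match the committed tid, POSKeyError if oid is unknown.
        """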
if self._is_read_only:
raise ReadOnlyError()
if transaction is not self._transaction:
raise StorageTransactionError(self, transaction)
with self._lock:
old = self._index_get(oid, 0)
if not old:
raise POSKeyError(oid)
h = self._read_data_header(old, oid)
committed_tid = h.tid
if oldserial != committed_tid:
raise ConflictError(
oid=oid, serials=(committed_tid, oldserial))
pos = self._pos
here = pos + self._tfile.tell() + self._thl
self._tindex[oid] = here
new = DataHeader(oid, self._tid, old, pos, 0, 0)
self._tfile.write(new.asString())
self._tfile.write(z64)
# Check quota
if self._quota is not None and here > self._quota:
raise FileStorageQuotaError(
"The storage quota has been exceeded.")
def _data_find(self, tpos, oid, data):
# Return backpointer for oid. Must call with the lock held.
# This is a file offset to oid's data record if found, else 0.
# The data records in the transaction at tpos are searched for oid.
# If a data record for oid isn't found, returns 0.
# Else if oid's data record contains a backpointer, that
# backpointer is returned.
# Else oid's data record contains the data, and the file offset of
# oid's data record is returned. This data record should contain
# a pickle identical to the 'data' argument.
        # If the length of the stored data doesn't match len(data), the
        # mismatch is logged as an error and 0 is returned; if the
        # lengths match but the data differs, 0 is returned silently.
self._file.seek(tpos)
h = self._file.read(TRANS_HDR_LEN)
tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
status = as_text(status)
self._file.read(ul + dl + el)
tend = tpos + tl + 8
pos = self._file.tell()
while pos < tend:
h = self._read_data_header(pos)
if h.oid == oid:
# Make sure this looks like the right data record
if h.plen == 0:
# This is also a backpointer. Gotta trust it.
return pos
if h.plen != len(data):
# The expected data doesn't match what's in the
# backpointer. Something is wrong.
logger.error("Mismatch between data and"
" backpointer at %d", pos)
return 0
_data = self._file.read(h.plen)
if data != _data:
return 0
return pos
pos += h.recordlen()
self._file.seek(pos)
return 0
def restore(self, oid, serial, data, version, prev_txn, transaction):
# A lot like store() but without all the consistency checks. This
# should only be used when we /know/ the data is good, hence the
# method name. While the signature looks like store() there are some
# differences:
#
# - serial is the serial number of /this/ revision, not of the
# previous revision. It is used instead of self._tid, which is
# ignored.
#
# - Nothing is returned
#
# - data can be None, which indicates a George Bailey object
        # (i.e. one whose creation has been transactionally undone).
#
# prev_txn is a backpointer. In the original database, it's possible
# that the data was actually living in a previous transaction. This
# can happen for transactional undo and other operations, and is used
# as a space saving optimization. Under some circumstances the
# prev_txn may not actually exist in the target database (i.e. self)
# for example, if it's been packed away. In that case, the prev_txn
# should be considered just a hint, and is ignored if the transaction
# doesn't exist.
if self._is_read_only:
raise ReadOnlyError()
if transaction is not self._transaction:
raise StorageTransactionError(self, transaction)
if version:
raise TypeError("Versions are no-longer supported")
with self._lock:
if oid > self._oid:
self.set_max_oid(oid)
prev_pos = 0
if prev_txn is not None:
prev_txn_pos = self._txn_find(prev_txn, 0)
if prev_txn_pos:
prev_pos = self._data_find(prev_txn_pos, oid, data)
old = self._index_get(oid, 0)
# Calculate the file position in the temporary file
here = self._pos + self._tfile.tell() + self._thl
# And update the temp file index
self._tindex[oid] = here
if prev_pos:
# If there is a valid prev_pos, don't write data.
data = None
if data is None:
dlen = 0
else:
dlen = len(data)
# Write the recovery data record
new = DataHeader(oid, serial, old, self._pos, 0, dlen)
self._tfile.write(new.asString())
# Finally, write the data or a backpointer.
if data is None:
if prev_pos:
self._tfile.write(p64(prev_pos))
else:
# Write a zero backpointer, which indicates an
# un-creation transaction.
self._tfile.write(z64)
else:
self._tfile.write(data)
def supportsUndo(self):
return 1
def _clear_temp(self):
self._tindex.clear()
if self._tfile is not None:
self._tfile.seek(0)
def _begin(self, tid, u, d, e):
self._nextpos = 0
self._thl = TRANS_HDR_LEN + len(u) + len(d) + len(e)
if self._thl > 65535:
# one of u, d, or e may be > 65535
# We have to check lengths here because struct.pack
# doesn't raise an exception on overflow!
if len(u) > 65535:
raise FileStorageError('user name too long')
if len(d) > 65535:
raise FileStorageError('description too long')
if len(e) > 65535:
raise FileStorageError('too much extension data')
def tpc_vote(self, transaction):
with self._lock:
if transaction is not self._transaction:
raise StorageTransactionError(
"tpc_vote called with wrong transaction")
dlen = self._tfile.tell()
if not dlen:
return # No data in this trans
self._tfile.seek(0)
user, descr, ext = self._ude
self._file.seek(self._pos)
tl = self._thl + dlen
try:
h = TxnHeader(self._tid, tl, "c", len(user),
len(descr), len(ext))
h.user = user
h.descr = descr
h.ext = ext
self._file.write(h.asString())
cp(self._tfile, self._file, dlen)
self._file.write(p64(tl))
self._file.flush()
except:
# Hm, an error occurred writing out the data. Maybe the
# disk is full. We don't want any turd at the end.
self._file.truncate(self._pos)
self._files.flush()
raise
self._nextpos = self._pos + (tl + 8)
def tpc_finish(self, transaction, f=None):
with self._files.write_lock():
with self._lock:
if transaction is not self._transaction:
raise StorageTransactionError(
"tpc_finish called with wrong transaction")
try:
if f is not None:
f(self._tid)
u, d, e = self._ude
self._finish(self._tid, u, d, e)
self._clear_temp()
finally:
self._ude = None
self._transaction = None
self._commit_lock_release()
def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
# data, so we don't bother writing anything to the file.
if self._nextpos:
# Clear the checkpoint flag
self._file.seek(self._pos+16)
self._file.write(as_bytes(self._tstatus))
try:
# At this point, we may have committed the data to disk.
# If we fail from here, we're in bad shape.
self._finish_finish(tid)
except:
# Ouch. This is bad. Let's try to get back to where we were
# and then roll over and die
logger.critical("Failure in _finish. Closing.", exc_info=True)
self.close()
raise
def _finish_finish(self, tid):
# This is a separate method to allow tests to replace it with
# something broken. :)
self._file.flush()
if fsync is not None:
fsync(self._file.fileno())
self._pos = self._nextpos
self._index.update(self._tindex)
self._ltid = tid
self._blob_tpc_finish()
def _abort(self):
if self._nextpos:
self._file.truncate(self._pos)
self._files.flush()
self._nextpos=0
self._blob_tpc_abort()
def _undoDataInfo(self, oid, pos, tpos):
"""Return the tid, data pointer, and data for the oid record at pos
"""
if tpos:
pos = tpos - self._pos - self._thl
tpos = self._tfile.tell()
h = self._tfmt._read_data_header(pos, oid)
afile = self._tfile
else:
h = self._read_data_header(pos, oid)
afile = self._file
if h.oid != oid:
raise UndoError("Invalid undo transaction id", oid)
if h.plen:
data = afile.read(h.plen)
else:
data = ''
pos = h.back
if tpos:
self._tfile.seek(tpos) # Restore temp file to end
return h.tid, pos, data
def getTid(self, oid):
with self._lock:
pos = self._lookup_pos(oid)
h = self._read_data_header(pos, oid)
if h.plen == 0 and h.back == 0:
# Undone creation
raise POSKeyError(oid)
return h.tid
def _transactionalUndoRecord(self, oid, pos, tid, pre):
"""Get the undo information for a data record
'pos' points to the data header for 'oid' in the transaction
being undone. 'tid' refers to the transaction being undone.
'pre' is the 'prev' field of the same data header.
Return a 3-tuple consisting of a pickle, data pointer, and
current position. If the pickle is true, then the data
pointer must be 0, but the pickle can be empty *and* the
pointer 0.
"""
copy = 1 # Can we just copy a data pointer
# First check if it is possible to undo this record.
tpos = self._tindex.get(oid, 0)
ipos = self._index.get(oid, 0)
tipos = tpos or ipos
if tipos != pos:
# Eek, a later transaction modified the data, but,
# maybe it is pointing at the same data we are.
ctid, cdataptr, cdata = self._undoDataInfo(oid, ipos, tpos)
if cdataptr != pos:
# We aren't sure if we are talking about the same data
try:
if (
# The current record wrote a new pickle
cdataptr == tipos
or
# Backpointers are different
self._loadBackPOS(oid, pos) !=
self._loadBackPOS(oid, cdataptr)
):
if pre and not tpos:
copy = 0 # we'll try to do conflict resolution
else:
# We bail if:
# - We don't have a previous record, which should
# be impossible.
raise UndoError("no previous record", oid)
except KeyError:
# LoadBack gave us a key error. Bail.
raise UndoError("_loadBack() failed", oid)
# Return the data that should be written in the undo record.
if not pre:
# There is no previous revision, because the object creation
# is being undone.
return "", 0, ipos
if copy:
# we can just copy our previous-record pointer forward
return "", pre, ipos
try:
bdata = self._loadBack_impl(oid, pre)[0]
except KeyError:
# couldn't find oid; what's the real explanation for this?
raise UndoError("_loadBack() failed for %s", oid)
try:
data = self.tryToResolveConflict(oid, ctid, tid, bdata, cdata)
return data, 0, ipos
except ConflictError:
pass
raise UndoError("Some data were modified by a later transaction", oid)
# undoLog() returns a description dict that includes an id entry.
# The id is opaque to the client, but contains the transaction id.
# The transactionalUndo() implementation does a simple linear
# search through the file (from the end) to find the transaction.
def undoLog(self, first=0, last=-20, filter=None):
if last < 0:
# -last is supposed to be the max # of transactions. Convert to
# a positive index. Should have x - first = -last, which
# means x = first - last. This is spelled out here because
# the normalization code was incorrect for years (used +1
# instead -- off by 1), until ZODB 3.4.
last = first - last
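            # Example, using the defaults: first=0, last=-20 gives
            # last = 0 - (-20) = 20, i.e. "at most the 20 most recent
            # undoable transactions".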
with self._lock:
if self._pack_is_in_progress:
raise UndoError(
'Undo is currently disabled for database maintenance.<p>')
us = UndoSearch(self._file, self._pos, first, last, filter)
while not us.finished():
# Hold lock for batches of 20 searches, so default search
# parameters will finish without letting another thread run.
for i in range(20):
if us.finished():
break
us.search()
# Give another thread a chance, so that a long undoLog()
# operation doesn't block all other activity.
self._lock_release()
self._lock_acquire()
return us.results
def undo(self, transaction_id, transaction):
"""Undo a transaction, given by transaction_id.
Do so by writing new data that reverses the action taken by
the transaction.
Usually, we can get by with just copying a data pointer, by
writing a file position rather than a pickle. Sometimes, we
may do conflict resolution, in which case we actually copy
new data that results from resolution.
"""
if self._is_read_only:
raise ReadOnlyError()
if transaction is not self._transaction:
raise StorageTransactionError(self, transaction)
with self._lock:
# Find the right transaction to undo and call _txn_undo_write().
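            # transaction_id is the base64 text handed out by undoLog()
            # (the encoded 8-byte tid minus its trailing newline), so
            # appending b'\n' makes it decodable again.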
tid = decodebytes(transaction_id + b'\n')
assert len(tid) == 8
tpos = self._txn_find(tid, 1)
tindex = self._txn_undo_write(tpos)
self._tindex.update(tindex)
return self._tid, tindex.keys()
def _txn_find(self, tid, stop_at_pack):
pos = self._pos
while pos > 39:
self._file.seek(pos - 8)
pos = pos - u64(self._file.read(8)) - 8
self._file.seek(pos)
h = self._file.read(TRANS_HDR_LEN)
_tid = h[:8]
if _tid == tid:
return pos
if stop_at_pack:
# check the status field of the transaction header
                # Slice rather than index so the comparison also works
                # on Python 3, where h[16] would be an int.
                if h[16:17] == b'p':
break
raise UndoError("Invalid transaction id")
def _txn_undo_write(self, tpos):
# a helper function to write the data records for transactional undo
otloc = self._pos
here = self._pos + self._tfile.tell() + self._thl
base = here - self._tfile.tell()
# Let's move the file pointer back to the start of the txn record.
th = self._read_txn_header(tpos)
if th.status != " ":
raise UndoError('non-undoable transaction')
tend = tpos + th.tlen
pos = tpos + th.headerlen()
tindex = {}
        # keep track of failures, because we may succeed later
failures = {}
# Read the data records for this transaction
while pos < tend:
h = self._read_data_header(pos)
if h.oid in failures:
del failures[h.oid] # second chance!
assert base + self._tfile.tell() == here, (here, base,
self._tfile.tell())
try:
p, prev, ipos = self._transactionalUndoRecord(
h.oid, pos, h.tid, h.prev)
except UndoError as v:
# Don't fail right away. We may be redeemed later!
failures[h.oid] = v
else:
if self.blob_dir and not p and prev:
try:
up, userial = self._loadBackTxn(h.oid, prev)
except POSKeyError:
pass # It was removed, so no need to copy data
else:
if self.is_blob_record(up):
# We're undoing a blob modification operation.