/
interfaces.py
1520 lines (1152 loc) · 54.3 KB
/
interfaces.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interfaces provided by RelStorage database adapters"""
from __future__ import absolute_import
from ZODB.POSException import StorageError
from ZODB.POSException import ReadConflictError
from ZODB.POSException import ConflictError
from zope.interface import Attribute
from zope.interface import Interface
# pylint:disable=inherit-non-class,no-method-argument,no-self-argument
# pylint:disable=too-many-ancestors,too-many-lines
from relstorage.interfaces import Tuple
from relstorage.interfaces import Object
from relstorage.interfaces import Bool
from relstorage.interfaces import Factory
from relstorage.interfaces import IException
###
# Abstractions to support multiple databases.
###
class IDBDialect(Interface):
    """
    Handles converting from our internal "standard" SQL queries to
    something database specific.
    """
    # TODO: Fill this in.
    # (No methods or attributes are defined here yet; implementations
    # currently supply the conversion behaviour by convention.)
class IDBDriver(Interface):
    """
    An abstraction over the information needed for RelStorage to work
    with an arbitrary DB-API driver.
    """

    __name__ = Attribute("The name of this driver")

    disconnected_exceptions = Tuple(
        description=(u"A tuple of exceptions this driver can raise on any operation if it is "
                     u"disconnected from the database."),
        value_type=Factory(IException)
    )

    close_exceptions = Tuple(
        description=(u"A tuple of exceptions that we can ignore when we try to "
                     u"close the connection to the database. Often this is the same "
                     u"or an extension of `disconnected_exceptions`."
                     u"These exceptions may also be ignored on rolling back the connection, "
                     u"if we are otherwise completely done with it and prepared to drop it. "),
        value_type=Factory(IException),
    )

    lock_exceptions = Tuple(
        description=u"A tuple of exceptions",
        value_type=Factory(IException),
    ) # XXX: Document

    use_replica_exceptions = Tuple(
        description=(u"A tuple of exceptions raised by connecting "
                     u"that should cause us to try a replica."),
        value_type=Factory(IException)
    )

    # A DB-API "Binary" constructor; wraps raw bytes for use as a
    # query parameter bound to a binary column.
    Binary = Attribute("A callable.")

    dialect = Object(IDBDialect, description=u"The IDBDialect for this driver.")

    cursor_arraysize = Attribute(
        "The value to assign to each new cursor's ``arraysize`` attribute.")

    def connect(*args, **kwargs):
        """
        Create and return a new connection object.

        This connection, and all objects created from it such as cursors,
        should be used within a single thread only.
        """

    def cursor(connection, server_side=False):
        """
        Create and return a new cursor sharing the state of the given
        *connection*.

        The cursor should be closed when it is no longer needed. The
        cursor should be considered forward-only (no backward
        scrolling) and ephemeral (results go away when the attached
        transaction is committed or rolled back).

        For compatibility, previous cursors should not have
        outstanding results pending when this is called and while the
        returned cursor is used (not all drivers permit multiple
        active cursors).

        If *server_side* is true (not the default), request that the
        driver creates a cursor that will **not** buffer the complete
        results of a query on the client. Instead, the results should
        be streamed from the server in batches. This can reduce the
        maximum amount of memory needed to handle results, if done
        carefully.

        For compatibility, server_side cursors can only be used
        to execute a single query.

        Most drivers (``psycopg2``, ``psycopg2cffi``, ``pg8000``,
        ``mysqlclient``) default to buffering the entire results
        client side before returning from the ``execute`` method. This
        can reduce latency and increase overall throughput, but at the
        cost of memory, especially if the results will be copied into
        different data structures.

        Not all drivers support server-side cursors; they will ignore
        that request. At this writing, this includes ``pg8000``. Some
        drivers (at this writing, only ``gevent MySQLdb``) always use
        server-side cursors. The ``cx_Oracle`` driver is unevaluated.

        ``psycopg2`` and ``psycopg2cffi`` both iterate in chunks of
        ``cur.itersize`` by default. PyMySQL seems to iterate one row at a time.
        ``mysqlclient`` defaults to also iterating one row at a time, but
        we patch that to operate in chunks of ``cur.arraysize``.
        """

    def binary_column_as_state_type(db_column_data):
        """
        Turn *db_column_data* into something that's a valid pickle
        state.

        Valid pickle states should be acceptable to
        `io.BytesIO` and `pickle.Unpickler`.

        *db_column_data* came from a column of data declared to be of the
        type that we store state information in (e.g., a BLOB on MySQL
        or Oracle).
        """

    def binary_column_as_bytes(db_column_data):
        """
        Turn *db_column_data* into a `bytes` object.

        Use this when the specific type must be known,
        for example to prefix or suffix additional byte values
        like that produced by `p64`.
        """
class IDBDriverFactory(Interface):
    """
    Information about, and a way to get, an `IDBDriver`
    implementation.
    """

    driver_name = Attribute("The name of this driver produced by this factory.")

    def check_availability():
        """
        Return a boolean indicating whether a call to this factory
        will return a driver (True) or will raise an error (False).
        """

    def __call__(): # pylint:disable=signature-differs
        """
        Return a new `IDBDriver` as represented by this factory.

        If it is not possible to do this, for example because the
        module cannot be imported, raise a `DriverNotAvailableError`.
        """
class DriverNotAvailableError(Exception):
    """
    Raised when a requested driver isn't available.
    """

    #: The name of the requested driver
    driver_name = None

    #: The `IDBDriverOptions` that was asked for the driver.
    driver_options = None

    def __init__(self, driver_name, driver_options=None):
        super(DriverNotAvailableError, self).__init__(driver_name)
        self.driver_name = driver_name
        self.driver_options = driver_options

    def _format_drivers(self):
        # ``driver_options`` may be None (or any object without the
        # ``known_driver_factories`` extension method); in that case
        # there are simply no factories to describe.
        list_factories = getattr(self.driver_options,
                                 'known_driver_factories',
                                 lambda: ())
        descriptions = []
        for factory in list_factories():
            # MODULE_NAME isn't in the IDBDriverFactory interface;
            # it's an extension from AbstractModuleDriver.
            module_name = getattr(factory, 'MODULE_NAME', '<unknown>')
            descriptions.append('%r (Module: %r; Available: %s)' % (
                factory.driver_name,
                module_name,
                factory.check_availability()
            ))
        return ' '.join(descriptions)

    def __str__(self):
        return '%s: Driver %r is not available. Options: %s.' % (
            type(self).__name__, self.driver_name, self._format_drivers()
        )

    __repr__ = __str__
class UnknownDriverError(DriverNotAvailableError):
    """
    Raised when a driver that isn't registered at all is requested.
    """
class NoDriversAvailableError(DriverNotAvailableError):
    """
    Raised when there are no drivers available.
    """

    def __init__(self, driver_name='auto', driver_options=None):
        # Identical to the base class, except the driver name defaults
        # to 'auto' since no specific driver was requested.
        super(NoDriversAvailableError, self).__init__(
            driver_name=driver_name,
            driver_options=driver_options,
        )
class IDBDriverOptions(Interface):
    """
    Implemented by a module to provide alternative drivers.
    """

    database_type = Attribute("A string naming the type of database. Informational only.")

    def select_driver(driver_name=None):
        """
        Choose and return an `IDBDriver`.

        The *driver_name* of "auto" is equivalent to a *driver_name* of
        `None` and means to choose the highest priority available driver.
        """

    def known_driver_factories():
        """
        Return an iterable of the potential `IDBDriverFactory`
        objects that can be used by `select_driver`.

        Each driver factory may or may not be available.

        The driver factories are returned in priority order, with the highest priority
        driver being first.
        """
###
# Creating and managing DB-API 2.0 connections.
# (https://www.python.org/dev/peps/pep-0249/)
###
class IConnectionManager(Interface):
    """
    Open and close database connections.

    This is a low-level interface; most operations should instead
    use a pre-existing :class:`IManagedDBConnection`.
    """

    isolation_load = Attribute("Default load isolation level.")
    isolation_store = Attribute("Default store isolation level.")
    isolation_read_committed = Attribute("Read committed.")
    isolation_serializable = Attribute("Serializable.")

    def open(
            isolation=None,
            deferrable=False,
            read_only=False,
            replica_selector=None,
            application_name=None,
            **kwargs):
        """Open a database connection and return (conn, cursor)."""

    def close(conn=None, cursor=None):
        """
        Close a connection and cursor, ignoring certain errors.

        Return a True value if the connection was closed cleanly;
        return a false value if an error was ignored.
        """

    def rollback_and_close(conn, cursor):
        """
        Rollback the connection and close it, ignoring certain errors.

        Certain database drivers, such as MySQLdb using the SSCursor, require
        all cursors to be closed before rolling back (otherwise it generates a
        ProgrammingError: 2014 "Commands out of sync").

        This method abstracts that.

        :return: A true value if the connection was closed without ignoring any exceptions;
            if an exception was ignored, returns a false value.
        """

    def rollback(conn, cursor):
        """
        Like `rollback_and_close`, but without the close, and letting
        errors pass.

        If an error does happen, then the connection and cursor are closed
        before this method returns.
        """

    def rollback_quietly(conn, cursor):
        """
        Like `rollback_and_close`, but without the close.

        :return: A true value if the connection was rolled back without ignoring any exceptions;
            if an exception was ignored, returns a false value (and the connection and cursor
            are closed before this method returns).
        """

    def open_and_call(callback):
        """Call a function with an open connection and cursor.

        If the function returns, commits the transaction and returns the
        result returned by the function.

        If the function raises an exception, aborts the transaction
        then propagates the exception.
        """

    def open_for_load():
        """
        Open a connection for loading objects.

        This connection is read only, and presents a consistent view
        of the database as of the time the first statement is
        executed. It should be opened in ``REPEATABLE READ`` or higher
        isolation level. It must not be in autocommit.

        :return: ``(conn, cursor)``
        """

    def restart_load(conn, cursor, needs_rollback=True):
        """
        Reinitialize a connection for loading objects.

        This gets called when polling the database, so it needs to be
        quick.

        Raise one of self.disconnected_exceptions if the database has
        disconnected.
        """

    def open_for_store(**open_args):
        """
        Open and initialize a connection for storing objects.

        This connection is read/write, and its view of the database
        needs to be consistent for each statement, but should read a
        fresh snapshot on each statement for purposes of conflict
        resolution and cooperation with other store connections. It
        should be opened in ``READ COMMITTED`` isolation level,
        without autocommit. (Opening in ``REPEATABLE READ`` or higher,
        with a single snapshot, could reduce the use of locks, but
        increases the risk of serialization errors and having
        transactions rollback; we could handle that by raising
        ``ConflictError`` and letting the application retry, but only
        if we did that before ``tpc_finish``, and not all test cases
        can handle that either.)

        This connection will take locks on rows in the state tables,
        and hold them during the commit process.

        A connection opened by this method is the only type of
        connection that can hold the commit lock.

        :return: ``(conn, cursor)``
        """

    def restart_store(conn, cursor, needs_rollback=True):
        """
        Rollback and reuse a store connection.

        Raise one of self.disconnected_exceptions if the database
        has disconnected.

        You can set *needs_rollback* to false if you're certain
        the connection does not need to be rolled back.
        """

    def open_for_pre_pack():
        """
        Open a connection to be used for the pre-pack phase.

        This connection will make many different queries; each one
        must be consistent unto itself, but they do not all have to be
        consistent with each other. This is because the *first* query
        this object makes establishes a base state, and we will
        manually discard later changes seen in future queries.

        It will read from the state tables and write to the pack tables;
        it will not write to the state tables, nor hold the commit lock.
        It may hold share locks on state rows temporarily.

        This connection may be open for a long period of time, and
        will be committed as appropriate between queries. It is
        acceptable for this connection to be in autocommit mode, if
        required, but it is preferred for it not to be. This should be
        opened in ``READ COMMITTED`` isolation level.

        :return: ``(conn, cursor)``
        """

    def cursor_for_connection(conn):
        """
        If the cursor returned by an open method was discarded
        for state management purposes, use this to get a new cursor.
        """

    def add_on_store_opened(f):
        """
        Add a callable(cursor, restart=bool) for when a store connection
        is opened.

        .. versionadded:: 2.1a1
        """

    def add_on_load_opened(f):
        """
        Add a callable (cursor, restart=bool) for when a load connection is opened.

        .. versionadded:: 2.1a1
        """
class IManagedDBConnection(Interface):
    """
    A managed DB connection consists of a DB-API ``connection`` object
    and a single DB-API ``cursor`` from that connection.

    This encapsulates proper use of ``IConnectionManager``, including
    handling disconnections and re-connecting at appropriate times.

    It is not allowed to use multiple cursors from a connection at the
    same time; not all drivers properly support that.

    If the DB-API connection is not open, presumed to be good, and
    previously accessed, this object has a false value.

    "Restarting" a connection means to bring it to a current view of
    the database. Typically this means a rollback so that a new
    transaction can begin with a new MVCC snapshot.
    """

    cursor = Attribute("The DB-API cursor to use. Read-only.")
    connection = Attribute("The DB-API connection to use. Read-only.")

    def __bool__():
        """
        Return true if the database connection is believed to be ready to use.
        """

    def __nonzero__():
        """
        Same as __bool__ for Python 2.
        """

    def drop():
        """
        Unconditionally drop (close) the database connection.
        """

    def rollback_quietly():
        """
        Rollback the connection and return a true value on success.

        When this completes, the connection will be in a neutral state,
        not idle in a transaction.

        If an error occurs during rollback, the connection is dropped
        and a false value is returned.
        """

    def isolated_connection():
        """
        Context manager that opens a new, distinct connection and
        returns its cursor.

        No matter what happens in the ``with`` block, the connection will be
        dropped afterwards.
        """

    def restart_and_call(f, *args, **kw):
        """
        Restart the connection (roll it back) and call a function
        after doing this.

        This may drop and re-connect the connection if necessary.

        :param callable f:
            The function to call: ``f(conn, cursor, *args, **kwargs)``.
            May be called up to twice if it raises a disconnected exception
            on the first try.

        :return: The return value of ``f``.
        """
class IManagedLoadConnection(IManagedDBConnection):
    """
    A managed connection intended for loading.
    """
class IManagedStoreConnection(IManagedDBConnection):
    """
    A managed connection intended for storing data.
    """
class IReplicaSelector(Interface):
    """Selects a database replica"""

    def current():
        """Get the current replica.

        Return a string. For PostgreSQL and MySQL, the string is
        either a host:port specification or host name. For Oracle,
        the string is a DSN.
        """

    def next():
        """Return the next replica to try.

        Return None if there are no more replicas defined.
        """
class IDatabaseIterator(Interface):
    """Iterate over the available data in the database"""

    def iter_objects(cursor, tid):
        """Iterate over object states in a transaction.

        Yields (oid, prev_tid, state) for each object state.
        """

    def iter_transactions(cursor):
        """
        Iterate over the transaction log, newest first.

        Skips packed transactions. Yields (tid, username, description,
        extension) for each transaction.
        """

    def iter_transactions_range(cursor, start=None, stop=None):
        """
        Return an indexable object over the transactions in the given range, oldest
        first.

        Includes packed transactions.

        Has an object with the properties ``tid_int``, ``username``
        (bytes) ``description`` (bytes) ``extension`` (bytes) and
        ``packed`` (boolean) for each transaction.
        """

    def iter_object_history(cursor, oid):
        """
        Iterate over an object's history.

        Yields an object with the properties ``tid_int``, ``username``
        (bytes) ``description`` (bytes) ``extension`` (bytes) and
        ``pickle_size`` (int) for each transaction.

        :raises KeyError: if the object does not exist
        """
class ILocker(Interface):
    """Acquire and release the commit and pack locks."""

    def lock_current_objects(cursor, read_current_oid_ints, shared_locks_block):
        """
        Lock the objects being modified in the current transaction
        exclusively, plus the relevant rows for the objects whose OIDs
        are contained in *read_current_oid_ints* with a read lock.

        The exclusive locks should always be taken in a blocking fashion;
        the shared read locks should be taken without blocking (raising an
        exception if blocking would occur) if possible, unless *shared_locks_block*
        is set to True.

        See :meth:`IRelStorageAdapter.lock_objects_and_detect_conflicts`
        for a description of the expected behaviour.

        This should be done as part of the voting phase of TPC, before
        taking out the final commit lock.

        Returns nothing.

        Typically this will be followed by a call to
        :meth:`detect_conflict`.
        """

    def hold_commit_lock(cursor, ensure_current=True, nowait=False):
        """
        Acquire the commit lock.

        If *ensure_current* is True (the default), other tables may be
        locked as well, to ensure the most current data is available.
        When using row level locks, *ensure_current* is always
        implicit.

        With *nowait* set to True, only try to obtain the lock without
        waiting and return a boolean indicating if the lock was
        successful. **Note:** this parameter is deprecated and will be removed
        in the future; it is not currently used.

        Should raise `UnableToAcquireCommitLockError` if the lock can not
        be acquired before a configured timeout.
        """

    def release_commit_lock(cursor):
        """Release the commit lock"""

    def hold_pack_lock(cursor):
        """Try to acquire the pack lock.

        Raise UnableToAcquirePackUndoLockError if packing or undo is already in progress.
        """

    def release_pack_lock(cursor):
        """Release the pack lock."""
class IObjectMover(Interface):
    """Move object states to/from the database and within the database."""

    def load_current(cursor, oid):
        """
        Returns the current state and integer tid for an object.

        *oid* is an integer. Returns (None, None) if object does not
        exist.
        """

    def load_currents(cursor, oids):
        """
        Returns the oid integer, state, and integer tid for all the specified
        objects.

        *oids* is an iterable of integers. If any objects do not exist,
        they are ignored.
        """

    def load_revision(cursor, oid, tid):
        """Returns the state for an object on a particular transaction.

        Returns None if no such state exists.
        """

    def exists(cursor, oid):
        """Returns a true value if the given object exists."""

    def load_before(cursor, oid, tid):
        """Returns the state and tid of an object before transaction tid.

        Returns (None, None) if no earlier state exists.
        """

    def get_object_tid_after(cursor, oid, tid):
        """Returns the tid of the next change after an object revision.

        Returns None if no later state exists.
        """

    def current_object_tids(cursor, oids):
        """
        Returns the current {oid_int: tid_int} for specified object ids.

        Note that this may be a BTree mapping, not a dictionary.
        """

    def on_store_opened(cursor, restart=False):
        """Create the temporary table for storing objects.

        This method may be None, meaning no store connection
        initialization is required.
        """

    def make_batcher(cursor, row_limit):
        """Return an object to be used for batch store operations.

        *row_limit* is the maximum number of rows to queue before
        calling the database.
        """

    def store_temps(cursor, state_oid_tid_iter):
        """
        Store many objects in the temporary table.

        *state_oid_tid_iter* is an iterable providing tuples
        ``(data, oid_int, prev_tid_int)``. It is guaranteed that the
        ``oid_int`` values will be distinct. It is further guaranteed that
        this method will not be called more than once in a given transaction;
        further updates to the temporary table will be made using
        ``replace_temps``, which is also only called once.
        """

    def restore(cursor, batcher, oid, tid, data):
        """Store an object directly, without conflict detection.

        Used for copying transactions into this database.

        batcher is an object returned by self.make_batcher().
        """

    def detect_conflict(cursor):
        """
        Find all conflicts in the data about to be committed.

        If there is a conflict, returns a sequence of
        ``(oid, committed_tid, attempted_committed_tid, committed_state)``.

        This method should be called during the ``tpc_vote`` phase of a transaction,
        with :meth:`ILocker.lock_current_objects` held.
        """

    def replace_temps(cursor, state_oid_tid_iter):
        """
        Replace all objects in the temporary table with new data from
        *state_oid_tid_iter*.

        This happens after conflict resolution. The param is as for
        ``store_temps``.

        Implementations should try to perform this in as few database operations
        as possible.
        """

    def move_from_temp(cursor, tid, txn_has_blobs):
        """
        Move the temporarily stored objects to permanent storage.

        *tid* is the integer tid of the transaction being committed.

        Returns nothing.

        The steps should be as follows:

            - If we are preserving history, then ``INSERT`` into
              ``object_state`` the values stored in ``temp_store``,
              remembering to coalesce the
              ``LENGTH(temp_store.state)``.

            - Otherwise, when we are not preserving history,
              ``INSERT`` missing rows from ``object_state`` into
              ``temp_store``, and ``UPDATE`` rows that were already
              there. (This is best done with an upsert). If blobs are
              involved, then ``DELETE`` from ``blob_chunk`` where the
              OID is in ``temp_store``.

            - For both types of storage, ``INSERT`` into
              ``blob_chunk`` the values from ``temp_blob_chunk``. In a
              history-free storage, this may be combined with the last
              step in an ``UPSERT``.
        """

    def update_current(cursor, tid):
        """
        Update the current object pointers.

        *tid* is the integer tid of the transaction being committed.

        Returns nothing. This does nothing when the storage is history
        free.

        When the storage preserves history, all the objects in
        ``object_state`` having the given *tid* should have their
        (oid, *tid*) stored into ``current_object``. This can be done
        with a single upsert.

        XXX: Why do we need to look at ``object_state``? Is there a
        reason we can't look at the smaller ``temp_store``? Conflict
        resolution maybe?
        """

    def download_blob(cursor, oid, tid, filename):
        """Download a blob into a file.

        Returns the size of the blob file in bytes.
        """

    def upload_blob(cursor, oid, tid, filename):
        """Upload a blob from a file.

        If tid is None, upload to the temporary table.
        """
class IOIDAllocator(Interface):
    """
    Allocate OIDs and control future allocation.

    The cursor passed here must be from a
    :meth:`store connection <IConnectionManager.open_for_store>`.
    """

    def new_oids(cursor):
        """
        Return a new :class:`list` of new, unused integer OIDs.

        The list should be contiguous and must be in sorted order from
        highest to lowest. It must never contain 0.
        """

    def set_min_oid(cursor, oid_int):
        """
        Ensure the next OID (the rightmost value from
        :meth:`new_oids`) is greater than the given *oid_int*.
        """
class IPackUndo(Interface):
    """Perform pack and undo operations"""

    def verify_undoable(cursor, undo_tid):
        """Raise UndoError if it is not safe to undo the specified txn.
        """

    def undo(cursor, undo_tid, self_tid):
        """Undo a transaction.

        Parameters: "undo_tid", the integer tid of the transaction to undo,
        and "self_tid", the integer tid of the current transaction.

        Returns the states copied forward by the undo operation as a
        list of (oid, old_tid).

        May raise UndoError.
        """

    def fill_object_refs(conn, cursor, get_references):
        """Update the object_refs table by analyzing new transactions.
        """

    def choose_pack_transaction(pack_point):
        """Return the transaction before or at the specified pack time.

        Returns None if there is nothing to pack.
        """

    def pre_pack(pack_tid, get_references):
        """Decide what to pack.

        pack_tid specifies the most recent transaction to pack.

        get_references is a function that accepts a stored object state
        and returns a set of OIDs that state refers to.
        """

    def pack(pack_tid, packed_func=None):
        """Pack. Requires the information provided by pre_pack.

        packed_func, if provided, will be called for every object state
        packed, just after the object is removed. The function must
        accept two parameters, oid and tid (64 bit integers).
        """

    def deleteObject(cursor, oid_int, tid_int):
        """
        Delete the revision of *oid_int* in transaction *tid_int*.

        This method marks an object as deleted via a new object
        revision. Subsequent attempts to load current data for the
        object will fail with a POSKeyError, but loads for
        non-current data will succeed if there are previous
        non-delete records. The object will be removed from the
        storage when all not-delete records are removed.

        The serial argument must match the most recently committed
        serial for the object. This is a seat belt.

        --- Documentation for ``IExternalGC``

        In history-free databases there is no such thing as a delete record, so
        this should remove the single
        revision of *oid_int* (which *should* be checked to verify it
        is at *tid_int*), leading all access to *oid_int* in the
        future to throw ``POSKeyError``.

        In history preserving databases, this means to set the state for the object
        at the transaction to NULL, signifying that it's been deleted. A subsequent
        pack operation is required to actually remove these deleted items.
        """
class IPoller(Interface):
    """Poll for new data"""

    def poll_invalidations(conn, cursor, prev_polled_tid, ignore_tid):
        """Polls for new transactions.

        conn and cursor must have been created previously by
        open_for_load(). prev_polled_tid is the tid returned at the
        last poll, or None if this is the first poll. If ignore_tid is
        not None, changes committed in that transaction will not be
        included in the list of changed OIDs.

        If the database has disconnected, this method should raise one
        of the exceptions listed in the disconnected_exceptions
        attribute of the associated IConnectionManager.

        Returns (changes, new_polled_tid), where changes is either an
        iterable of (oid, tid) that have changed, or None to indicate that
        the changes are too complex to list. new_polled_tid is never
        None. Important: You must consume the changes iterable, and you must
        not make any other queries until you do.

        This method may raise :class:`StaleConnectionError` (a
        ``ReadConflictError``) if the database has reverted to an
        earlier transaction, which can happen in an asynchronously
        replicated database. This exception is one that is transient
        and most transaction middleware will catch it and retry the
        transaction.
        """

    def list_changes(cursor, after_tid, last_tid):
        """
        Return the ``(oid, tid)`` values changed in a range of
        transactions.

        The returned iterable (which is not guaranteed to have a
        defined ``len``) must include the latest changes in the range
        *after_tid* < ``tid`` <= *last_tid*.

        The ``oid`` values returned will be distinct: each ``oid``
        will have been changed in exactly one ``tid``.

        Important: You must consume the returned iterable in its entirety,
        and you must not make any other queries until you do.
        """
class ISchemaInstaller(Interface):
    """Install the schema in the database, clear it, or uninstall it"""

    def prepare():
        """
        Create the database schema if it does not already exist.

        Perform any migration steps needed, and call :meth:`verify`
        before returning.
        """

    def verify():
        """
        Ensure that the schema that's installed can be used by this
        RelStorage.

        If it cannot, for example it's history-preserving and we were configured
        to be history-free, raise an exception.
        """

    def zap_all():
        """Clear all data out of the database."""

    def drop_all():
        """Drop all tables and sequences."""
class IScriptRunner(Interface):
"""Run database-agnostic SQL scripts.
Using an IScriptRunner is appropriate for batch operations and
uncommon operations that can be slow, but is not appropriate
for performance-critical code.
"""
script_vars = Attribute(
"""A mapping providing replacements for parts of scripts.
Used for making scripts compatible with databases using
different parameter styles.
""")
def run_script_stmt(cursor, generic_stmt, generic_params=()):
"""Execute a statement from a script with the given parameters.
generic_params should be either an empty tuple (no parameters) or
a map.
The input statement is generic and will be transformed