/
testpostgresql.py
312 lines (255 loc) · 10.5 KB
/
testpostgresql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
##############################################################################
#
# Copyright (c) 2008 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of relstorage.adapters.postgresql"""
from __future__ import absolute_import
import logging
import time
import unittest
from ZODB.tests import StorageTestBase
from relstorage._util import timestamp_at_unixtime
from relstorage.storage import bytes8_to_int64
from relstorage.adapters.postgresql import PostgreSQLAdapter
from .util import AbstractTestSuiteBuilder
from .util import DEFAULT_DATABASE_SERVER_HOST
from . import StorageCreatingMixin
from . import TestCase
class PostgreSQLAdapterMixin(object):
    """
    Mixin supplying the PostgreSQL-specific pieces the shared test
    machinery needs: adapter construction, the adapter class, and a
    ZConfig snippet plus its verification.
    """

    def make_adapter(self, options, db=None):
        """Build a ``PostgreSQLAdapter`` for *db* using *options*."""
        dsn = self.__get_adapter_zconfig_dsn(db)
        return PostgreSQLAdapter(dsn=dsn, options=options)

    def get_adapter_class(self):
        """Return the adapter class under test."""
        return PostgreSQLAdapter

    def __get_adapter_zconfig_dsn(self, dbname=None):
        # Derive the database name when none is given: history-free
        # configurations use the '_hf'-suffixed database.
        if dbname is None:
            suffix = '' if self.keep_history else '_hf'
            dbname = self.base_dbname + suffix
        return (
            "dbname='%s' user='relstoragetest' password='relstoragetest' host='%s'"
            % (dbname, DEFAULT_DATABASE_SERVER_HOST)
        )

    def get_adapter_zconfig(self):
        """Return a ``<postgresql>`` ZConfig section for this configuration."""
        return u"""
        <postgresql>
            driver %s
            dsn %s
        </postgresql>
        """ % (
            self.driver_name,
            self.__get_adapter_zconfig_dsn()
        )

    def verify_adapter_from_zconfig(self, adapter):
        """Assert that *adapter* got the DSN this mixin would produce."""
        self.assertEqual(adapter._dsn, self.__get_adapter_zconfig_dsn())
class TestBlobFunctionality(
        PostgreSQLAdapterMixin,
        StorageCreatingMixin,
        TestCase,
        StorageTestBase.StorageTestBase):
    """
    Tests of PostgreSQL-specific blob storage behavior, operating
    directly on the ``blob_chunk`` table of a live server.
    """
    # pylint:disable=too-many-ancestors

    def test_merge_blobs_on_open(self):
        """
        Blob data stored as multiple rows in ``blob_chunk`` is read back
        as one contiguous stream, and reopening the storage consolidates
        the chunks into a single row.
        """
        from ZODB.DB import DB
        from ZODB.blob import Blob
        import transaction
        storage = self._closing(self.make_storage(
            blob_dir='blobs', shared_blob_dir=False))
        db = self._closing(DB(storage))
        conn = db.open()
        blob = Blob()
        base_chunk = b"This is my base blob."
        with blob.open('w') as f:
            f.write(base_chunk)
        conn.root().blob = blob
        transaction.commit()

        # Insert some extra chunks. Get them big to be sure we loop
        # properly.
        second_chunk = b'second chunk' * 800
        with conn._storage._store_connection_pool.borrowing(commit=True) as store_connection:
            cursor = store_connection.cursor
            # Duplicate the existing row's (zoid, tid) with chunk_num 1,
            # using a server-side large object for the payload.
            cursor.execute("""
            INSERT INTO blob_chunk (zoid, chunk_num, tid, chunk)
            SELECT zoid, 1, tid, lo_from_bytea(0, %s)
            FROM blob_chunk WHERE chunk_num = 0;
            """, (second_chunk,))
            third_chunk = b'third chunk' * 900
            cursor.execute("""
            INSERT INTO blob_chunk (zoid, chunk_num, tid, chunk)
            SELECT zoid, 2, tid, lo_from_bytea(0, %s)
            FROM blob_chunk WHERE chunk_num = 0;
            """, (third_chunk,))
            cursor.execute('SELECT COUNT(*) FROM blob_chunk')
            self.assertEqual(3, cursor.fetchone()[0])

        # Now open again and find everything put together.
        # But we need to use a new blob dir, because
        # we changed data behind its back.
        conn.close()
        db.close()

        storage = self._closing(self.make_storage(blob_dir='blobs2',
                                                  shared_blob_dir=False,
                                                  zap=False))
        db = self._closing(DB(storage))
        conn = db.open()
        blob = conn.root().blob

        with blob.open('r') as f:
            data = f.read()

        # The three rows should have been merged back into one...
        cursor = conn._storage._load_connection.cursor
        cursor.execute('SELECT COUNT(*) FROM blob_chunk')
        self.assertEqual(1, cursor.fetchone()[0])

        # ...and the data read must be the chunks in order.
        self.assertEqual(data, base_chunk + second_chunk + third_chunk)
        conn.close()
        db.close()

    # When true, the zap test sizes its blob count from the server's
    # actual lock limits instead of a fixed number; much slower.
    REALLY_EXHAUST_SHARED_MEMORY = False

    def test_zapping_with_many_blobs(self):
        # https://github.com/zodb/relstorage/issues/468
        # If a database has many blobs (more than 4600 by default)
        # it couldn't be zapped.
        from ZODB.DB import DB
        from ZODB.blob import Blob
        import transaction
        storage = self._closing(self.make_storage(
            blob_dir='blobs', shared_blob_dir=False))
        db = self._closing(DB(storage))
        conn = db.open()

        if self.REALLY_EXHAUST_SHARED_MEMORY: # pragma: no cover
            # NOTE: When actually testing the shared memory exhaustion,
            # this test is slow; it takes about 45s with default
            # server settings and psycopg2, and 1:50 under pg8000

            # First, figure out how many blobs we need to create to exceed
            # the limit and fail: max_locks_per_transaction * max_connections
            cursor = conn._storage._load_connection.cursor
            cursor.execute("SELECT CURRENT_SETTING('max_locks_per_transaction')")
            max_locks = cursor.fetchall()[0][0]
            cursor.execute("SELECT CURRENT_SETTING('max_connections')")
            max_conn = cursor.fetchall()[0][0]

            # max_locks * max_conn is the documented limit of the locks,
            # but it seems to actually be memory based. For example, with
            # max_locks = 64 (the default) and max_conn = 300 (3x the default)
            # we calculate a max_blobs of 19,200. And the server easily handles that.
            # However, dropping down to max_conn = 100 (the default), the server
            # fails to zap the 19,200 blobs, though it does zap the 6,400 blobs fine.
            # Hence the final * 3
            max_blobs = int(max_locks) * int(max_conn) * 3
        else:
            # Choose a number to let us loop a few times.
            max_blobs = 3523

        blobs = []
        for i in range(max_blobs):
            blob = Blob()
            with blob.open('w') as f:
                data = str(i)
                if not isinstance(data, bytes):
                    # Python 3: encode before writing to the binary blob.
                    data = data.encode('ascii')
                f.write(data)
            blobs.append(blob)

        conn.root().blobs = blobs
        transaction.commit()
        conn.close()

        # Now zap, being sure to use the fast method that originally
        # triggered this bug.
        storage.zap_all(slow=False)

        # Zapping must have removed every blob chunk.
        cursor = storage._load_connection.cursor
        cursor.execute('SELECT COUNT(*) FROM blob_chunk')
        self.assertEqual(0, cursor.fetchone()[0])
class TestGenerateTIDPG(PostgreSQLAdapterMixin,
                        StorageCreatingMixin,
                        TestCase,
                        StorageTestBase.StorageTestBase):
    """
    Tests that the server-side TID generation (``make_tid_for_epoch``)
    agrees with the client-side ``timestamp_at_unixtime`` computation.
    """
    # pylint:disable=too-many-ancestors

    def setUp(self):
        super(TestGenerateTIDPG, self).setUp()
        self._storage = self._closing(self.make_storage())

    def test_extract_parts(self):
        """
        The server's EXTRACT of a known epoch, interpreted in UTC,
        yields the expected date/time components.
        """
        unix_time = 1564063129.1277142

        query = """
        SELECT EXTRACT(year FROM ts) as YEAR,
               EXTRACT(month FROM ts) AS month,
               EXTRACT(day FROM ts) AS day,
               EXTRACT(hour FROM ts) AS hour,
               EXTRACT(minute FROM ts) AS minute,
               EXTRACT(seconds FROM ts) AS seconds
        FROM (
            SELECT timezone('UTC', to_timestamp(%s)) AS ts
        ) t
        """

        cursor = self._storage._load_connection.cursor
        cursor.execute(query, (unix_time,))
        year, month, day, hour, minute, seconds = cursor.fetchone()
        self.assertEqual(year, 2019)
        self.assertEqual(month, 7)
        self.assertEqual(day, 25)
        self.assertEqual(hour, 13) # If this is not 13, the time_zone is incorrect
        self.assertEqual(minute, 58)
        self.assertEqual(
            round(float(seconds), 6),
            49.127714)

    def test_known_time(self):
        """
        A fixed epoch produces the expected ``gmtime`` tuple and raw
        timestamp bytes, then must round-trip through the server too.
        """
        now = 1564054182.277615
        gmtime = (2019, 7, 25, 11, 29, 42, 3, 206, 0)

        self.assertEqual(
            time.gmtime(now),
            gmtime
        )

        ts_now = timestamp_at_unixtime(now)
        self.assertEqual(
            ts_now.raw(),
            b'\x03\xd1Oq\xb4bn\x00'
        )

        # Reuse the generic comparison with the fixed time.
        self.test_current_time(now)

    def test_current_time(self, now=None):
        """
        The TID the server computes for *now* (default: current time)
        matches the client-side computation exactly.
        """
        if now is None:
            now = time.time()
        storage = self._storage
        ts_now = timestamp_at_unixtime(now)

        expected_tid_int = bytes8_to_int64(ts_now.raw())

        cursor = storage._load_connection.cursor

        cursor.execute('SELECT make_tid_for_epoch(%s)', (now,))
        tid, = cursor.fetchall()[0]

        self.assertEqual(
            tid,
            expected_tid_int
        )
# Timing shows that we spend 6.9s opening database connections to a
# local PostgreSQL 11 server when using Python 3.7 and psycopg2 2.8
# during a total test run of 2:27. I had thought that maybe connection
# pooling would speed the test run up, but that doesn't seem to be the
# case.
class PostgreSQLTestSuiteBuilder(AbstractTestSuiteBuilder):
    """Assemble the PostgreSQL variant of the shared RelStorage test suite."""

    __name__ = 'PostgreSQL'

    def __init__(self):
        from relstorage.adapters.postgresql import drivers
        super(PostgreSQLTestSuiteBuilder, self).__init__(
            drivers,
            PostgreSQLAdapterMixin,
            extra_test_classes=(TestBlobFunctionality, TestGenerateTIDPG)
        )

    def _compute_large_blob_size(self, use_small_blobs):
        """Return the blob size, in bytes, used by the "large blob" tests."""
        if not use_small_blobs:
            # Something bigger than 2GB (signed 32 bit int)
            return (1 << 31) + 200 * 1024 * 1024
        # Avoid creating 2GB blobs to be friendly to neighbors
        # and to run fast (2GB blobs take about 4 minutes on Travis
        # CI as-of June 2016).
        # RS 3.0 no longer needs to chunk.
        return 20 * 1024 * 1024
def test_suite():
    """Entry point used by the test runner to build the full suite."""
    builder = PostgreSQLTestSuiteBuilder()
    return builder.test_suite()
if __name__ == '__main__':
    # Configure stderr logging and silence the noisy lock-file logger
    # before handing control to unittest.
    logging.basicConfig()
    logging.getLogger("zc.lockfile").setLevel(logging.CRITICAL)
    unittest.main(defaultTest="test_suite")