This repository has been archived by the owner on Mar 28, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
client.py
95 lines (76 loc) · 3.1 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import contextlib
import os
import platform
import warnings
from cliquet import logger
from cliquet.storage import exceptions
from cliquet.utils import sqlalchemy
class PostgreSQLClient(object):
    """Hand out connections taken from a single SQLAlchemy engine."""

    def __init__(self, engine):
        """Keep a reference to the engine used by :meth:`connect`.

        :param engine: object whose ``connect()`` method returns a
            connection (a SQLAlchemy engine).
        """
        self._engine = engine

        # Disabled: register ujson globally for all future cursors.
        # with self.connect() as cursor:
        #     psycopg2.extras.register_json(cursor,
        #                                   globally=True,
        #                                   loads=json.loads)

    @contextlib.contextmanager
    def connect(self, readonly=False):
        """Pulls a connection from the pool when context is entered and
        returns it when context is exited.

        A COMMIT is performed on the current transaction if everything went
        well. Otherwise transaction is ROLLBACK, and everything cleaned up,
        whatever the type of the exception that interrupted the context.

        XXX: Committing should not happen here but using a global transaction
        manager like `pyramid-tm`.

        :param bool readonly: when true, no transaction is begun nor
            committed; the connection is only opened and closed.
        :raises BackendError: when a ``SQLAlchemyError`` occurs while
            connecting, inside the context, or while committing. Any other
            exception propagates unchanged, after cleanup.
        """
        connection = None
        trans = None
        try:
            # Pull from pool.
            connection = self._engine.connect()
            if not readonly:
                trans = connection.begin()
            # Start context
            yield connection
            # Success
            if trans is not None:
                trans.commit()
                # Nothing left to roll back during cleanup.
                trans = None
        except sqlalchemy.exc.SQLAlchemyError as e:
            logger.error(e)
            raise exceptions.BackendError(original=e)
        finally:
            # Cleanup runs for *any* failure, not only SQLAlchemy errors,
            # so that a connection is never left checked out of the pool.
            try:
                if trans is not None:
                    trans.rollback()
            finally:
                # Give back to pool, even if the rollback itself failed.
                if connection is not None:
                    connection.close()
# Module-level cache of SQLAlchemy engines, keyed by database URL, so that
# every client created for the same URL shares a single engine.
_ENGINES = {}
def create_from_config(config, prefix=''):
    """Build a PostgreSQLClient from the settings held by ``config``.

    The database URL is read from the ``<prefix>url`` setting. One engine is
    kept per URL: when an engine already exists for that URL it is reused
    (with a warning) and the other ``<prefix>*`` settings are ignored.

    :param config: object providing ``get_settings()`` and
        ``maybe_dotted()``.
    :param str prefix: prefix of the setting names to read.
    :returns: a PostgreSQLClient wrapping the engine for the URL.
    :raises ImportWarning: when the SQLAlchemy dependency is missing.
    """
    if sqlalchemy is None:
        raise ImportWarning("PostgreSQL dependencies missing. "
                            "Refer to installation section in documentation.")

    settings = config.get_settings().copy()
    url = settings[prefix + 'url']

    # Reuse the engine already built for this URL, if any.
    try:
        known_engine = _ENGINES[url]
    except KeyError:
        pass
    else:
        warnings.warn("Reuse existing PostgreSQL connection. "
                      "Parameters %s* will be ignored." % prefix)
        return PostgreSQLClient(known_engine)

    # Resolve the pool class (given as a dotted path) for the new engine.
    pool_key = prefix + 'poolclass'
    pool_path = settings.get(pool_key, 'sqlalchemy.pool.QueuePool')
    settings[pool_key] = config.maybe_dotted(pool_path)
    settings.pop(prefix + 'max_fetch_size', None)

    # There seems to be a problem with the pool implementation using PyPy.
    # XXX: Disable pooling at least during tests to avoid stalled tests.
    running_pypy = platform.python_implementation().lower() == 'pypy'
    if running_pypy and os.getenv('TRAVIS', False):
        warnings.warn('Disable pooling with PyPy on TravisCI')
        # Every other setting is dropped on purpose; only the pool class
        # is handed to the engine.
        settings = {pool_key: sqlalchemy.pool.StaticPool}

    new_engine = sqlalchemy.engine_from_config(settings, prefix=prefix,
                                               url=url)
    # Store one engine per URI.
    _ENGINES[url] = new_engine
    return PostgreSQLClient(new_engine)