-
Notifications
You must be signed in to change notification settings - Fork 44
/
fixtures.py
216 lines (168 loc) · 7.79 KB
/
fixtures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import os
import contextlib
import pytest
import sqlalchemy as sa
from packaging import version
@pytest.fixture(scope='module')
def _db():
    '''
    Placeholder for the user-supplied `_db` fixture.

    The plugin expects the user to define their own `_db` fixture that gives
    it access to the test database. This default definition never returns a
    value: it always raises, with a message pointing to the setup docs.

    Raises:
        NotImplementedError: always.
    '''
    raise NotImplementedError(
        "_db fixture not defined. The pytest-flask-sqlalchemy plugin "
        "requires you to define a _db fixture that returns a SQLAlchemy Session "
        "with access to your test database. For more information, see the plugin "
        "documentation: "
        "https://github.com/jeancochrane/pytest-flask-sqlalchemy#conftest-setup"
    )
@pytest.fixture(scope='function')
def _transaction(request, _db, mocker):
    '''
    Set up the transactional context that a single test runs in.

    A connection is opened from `_db.engine`, an outer transaction is begun
    on it, and a scoped session is bound to that connection. At teardown the
    session is removed, the outer transaction is rolled back and the
    connection is closed.

    Returns:
        A `(connection, transaction, session)` tuple.
    '''
    def _noop():
        # Stand-in for close/rollback so the code under test cannot end the
        # test's connection, transaction or session.
        return None

    # Open the outer transaction
    conn = _db.engine.connect()
    outer_txn = conn.begin()

    # An empty `binds` mapping has to accompany `bind`; otherwise
    # Flask-SQLAlchemy won't scope the connection properly
    scoped = _db.create_scoped_session(options={'bind': conn, 'binds': {}})

    # Keep handles to the real close/rollback for teardown, then neutralise
    # the public ones
    conn.force_close = conn.close
    outer_txn.force_rollback = outer_txn.rollback
    conn.close = _noop
    outer_txn.rollback = _noop
    scoped.close = _noop

    # Work inside a nested transaction: transactions started by the codebase
    # are held until the outer transaction is committed or closed
    scoped.begin_nested()

    def restart_savepoint(session, trans):
        # Only react when a nested transaction whose parent is not itself
        # nested has ended
        if not trans.nested or trans._parent.nested:
            return
        # Expire state the way a top-level session.commit() normally does,
        # then reopen the nested transaction
        session.expire_all()
        session.begin_nested()

    sa.event.listen(scoped, 'after_transaction_end', restart_savepoint)

    # From here on, connection.begin() opens nested transactions
    conn.begin = conn.begin_nested

    def rehydrate_object(session, obj):
        # Objects that become detached are added back to the session, so
        # tests can still see changes made to them elsewhere in the codebase
        session.add(obj)

    sa.event.listen(scoped, 'deleted_to_detached', rehydrate_object)
    sa.event.listen(scoped, 'persistent_to_detached', rehydrate_object)

    def teardown_transaction():
        # Drop the session, then undo everything and release the connection
        scoped.remove()
        outer_txn.force_rollback()
        conn.force_close()

    request.addfinalizer(teardown_transaction)

    return conn, outer_txn, scoped
@pytest.fixture(scope='function')
def _engine(pytestconfig, request, _transaction, mocker):
    '''
    Mock out direct access to the semi-global Engine object.

    Builds a `MagicMock` specced on `sa.engine.Engine` whose `connect`,
    `begin`, `execute` and `raw_connection` entry points all route to the
    connection opened by the `_transaction` fixture. Every target listed in
    `pytestconfig._mocked_engines` is patched with this mock, and the
    transactional session is bound to it.

    Returns:
        The mocked engine.
    '''
    connection, _, session = _transaction

    # Make sure that any attempts to call `connect()` simply return a
    # reference to the open connection
    engine = mocker.MagicMock(spec=sa.engine.Engine)
    engine.connect.return_value = connection

    # Threadlocal engine strategies were deprecated in SQLAlchemy 1.3, which
    # resulted in contextual_connect becoming a private method. See:
    # https://docs.sqlalchemy.org/en/latest/changelog/migration_13.html
    sa_version = version.parse(sa.__version__)
    if sa_version < version.parse('1.3'):
        engine.contextual_connect.return_value = connection
    elif sa_version < version.parse('1.4'):
        engine._contextual_connect.return_value = connection

    # References to `Engine.dialect` should redirect to the Connection (this
    # is primarily useful for the `autoload` flag in SQLAlchemy, which references
    # the Engine dialect to reflect tables)
    engine.dialect = connection.dialect

    @contextlib.contextmanager
    def begin():
        '''
        Open a new nested transaction on the `connection` object.
        '''
        with connection.begin_nested():
            yield connection

    # Force the engine object to use the current connection and transaction
    engine.begin = begin
    engine.execute = connection.execute

    # Enforce nested transactions for raw DBAPI connections
    def raw_connection():
        '''
        Return the underlying raw connection with close/commit disabled and
        rollback redirected to a savepoint.
        '''
        # Start a savepoint. The statement is wrapped in text() because
        # passing a plain string to Connection.execute() is deprecated in
        # SQLAlchemy 1.4 and removed in 2.0.
        connection.execute(sa.text('SAVEPOINT raw_conn'))

        raw = connection.connection

        # Preserve the real close/commit/rollback methods, but only on the
        # first call: on later calls these attributes already point at the
        # stubs installed below, and saving them again would lose the
        # originals that the finalizer needs to restore.
        if not hasattr(raw, 'force_rollback'):
            raw.force_close = raw.close
            raw.force_commit = raw.commit
            raw.force_rollback = raw.rollback

        # Prevent the connection from being closed accidentally
        raw.close = lambda: None
        raw.commit = lambda: None
        raw.set_isolation_level = lambda level: None

        # If a rollback is initiated, return to the original savepoint
        raw.rollback = lambda: connection.execute(
            sa.text('ROLLBACK TO SAVEPOINT raw_conn'))

        return raw

    engine.raw_connection = raw_connection

    for mocked_engine in pytestconfig._mocked_engines:
        mocker.patch(mocked_engine, new=engine)

    session.bind = engine

    @request.addfinalizer
    def reset_raw_connection():
        # Return the underlying connection to its original state if it has
        # changed. Only close/commit/rollback are restored here; the
        # set_isolation_level override is left in place.
        raw = connection.connection
        if hasattr(raw, 'force_rollback'):
            raw.commit = raw.force_commit
            raw.rollback = raw.force_rollback
            raw.close = raw.force_close

    return engine
@pytest.fixture(scope='function')
def _session(pytestconfig, _transaction, mocker):
    '''
    Replace the configured Session objects and sessionmakers with the
    transactional session created by the `_transaction` fixture.

    Returns:
        The transactional session.
    '''
    _connection, _outer_txn, session = _transaction

    # Every configured session target resolves to the transactional session
    for target in pytestconfig._mocked_sessions:
        mocker.patch(target, new=session)

    # Stand-in for sessionmakers. It is written as a class because __call__
    # methods can't be mocked directly.
    class FakeSessionMaker(sa.orm.Session):
        def __call__(self):
            # "Making" a session always hands back the transactional one
            return session

        @classmethod
        def configure(cls, *args, **kwargs):
            # Accept and ignore any configuration
            return None

    # Every configured sessionmaker target becomes a FakeSessionMaker instance
    for target in pytestconfig._mocked_sessionmakers:
        mocker.patch(target, new_callable=FakeSessionMaker)

    return session
@pytest.fixture(scope="function")
def db_session(_engine, _session, _transaction):
    '''
    Public fixture exposing the transactional Session.

    It depends on `_engine`, `_session` and `_transaction`, so all three
    fixtures are set up before the test runs. The value handed to the test
    is the object provided by `_session`.

    Request this fixture in tests that want to use the SQLAlchemy ORM API,
    just as they would use a SQLAlchemy Session object.
    '''
    return _session
@pytest.fixture(scope="function")
def db_engine(_engine, _session, _transaction):
    '''
    Public fixture exposing the transactional Engine.

    It depends on `_engine`, `_session` and `_transaction`, so all three
    fixtures are set up before the test runs. The value handed to the test
    is the object provided by `_engine`.

    Request this fixture in tests that want to run raw SQL queries through
    the SQLAlchemy Engine API.
    '''
    return _engine