Skip to content

Commit

Permalink
fix: async engine bug related to engine reuse in a different async lo…
Browse files Browse the repository at this point in the history
…op. (#166)
  • Loading branch information
DanCardin committed Sep 16, 2022
1 parent 3292520 commit 3254bd8
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 157 deletions.
223 changes: 125 additions & 98 deletions CHANGELOG.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions docs/source/relational/ordered-actions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ library know that the included SQL statements are safe to "cache" in order to
reduce database creation costs. For that reason, you should prefer
a ``StaticStatements`` over a ``Statements`` where possible.

For example, the creation of temp tables or other transaction-specific operations,
are places where a static statement might be inappropriate.
For example, the execution of DDL for which there is not a supported SQLALchemy abstraction, or
other transaction-specific operations, are places where a static statement might be inappropriate.

.. code-block:: python
:caption: tests/test_something.py
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pytest-mock-resources"
version = "2.5.0"
version = "2.5.1"
description = "A pytest plugin for easily instantiating reproducible mock resources."
authors = [
"Omar Khan <oakhan3@gmail.com>",
Expand Down
20 changes: 18 additions & 2 deletions src/pytest_mock_resources/fixture/database/relational/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class EngineManager:
session: Union[bool, Session] = False
default_schema: Optional[str] = None
static_actions: Iterable[StaticAction] = ()
actions_share_transaction: Optional[bool] = None

_ddl_created: Dict[MetaData, bool] = field(default_factory=dict)

Expand Down Expand Up @@ -131,13 +132,21 @@ def manage_sync(self):
try:
self.run_actions(session)
commit(session)

if self.actions_share_transaction is False:
self.engine.dispose()

yield session
finally:
session.close()
else:
with self.engine.begin() as conn:
self.run_actions(conn)
commit(conn)

if self.actions_share_transaction is False:
self.engine.dispose()

yield self.engine

finally:
Expand All @@ -160,12 +169,19 @@ async def manage_async(self, session=None):

async with session_factory(bind=engine) as session:
await session.run_sync(self.run_actions)
await session.commit()
if not self.actions_share_transaction:
await session.commit()
await session.close()

yield session
else:
async with engine.begin() as conn:
await conn.run_sync(self.run_actions)
await conn.execute(text("COMMIT"))
await conn.commit()

if not self.actions_share_transaction:
await engine.dispose()

yield engine
finally:
await engine.dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def create_postgres_fixture(
createdb_template="template1",
engine_kwargs=None,
template_database=True,
actions_share_transaction=None,
):
"""Produce a Postgres fixture.
Expand All @@ -73,6 +74,13 @@ def create_postgres_fixture(
template_database: Defaults to True. When True, amortizes the cost of performing database
setup through `ordered_actions`, by performing them once into a postgres "template"
database, then creating all subsequent per-test databases from that template.
actions_share_transaction: When True, the transaction used by `ordered_actions` context
will be the same as the one handed to the test function. This is required in order
to support certain usages of `ordered_actions, such as the creation of temp tables
through a `Statements` object. By default, this behavior is enabled for synchronous
fixtures for backwards compatibility; and disabled by default for
asynchronous fixtures (the way v2-style/async features work in SQLAlchemy can lead
to bad default behavior).
"""
fixture_id = generate_fixture_id(enabled=template_database, name="pg")

Expand All @@ -90,6 +98,7 @@ def _create_engine_manager(config):
engine_kwargs=engine_kwargs or {},
session=session,
fixture_id=fixture_id,
actions_share_transaction=actions_share_transaction,
)

@pytest.fixture(scope=scope)
Expand Down Expand Up @@ -125,6 +134,7 @@ def create_engine_manager(
engine_kwargs,
createdb_template="template1",
fixture_id=None,
actions_share_transaction=None,
):
normalized_actions = normalize_actions(ordered_actions)
static_actions, dynamic_actions = bifurcate_actions(normalized_actions)
Expand Down Expand Up @@ -180,6 +190,7 @@ def create_engine_manager(
session=session,
tables=tables,
default_schema="public",
actions_share_transaction=actions_share_transaction,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def create_redshift_fixture(
createdb_template="template1",
engine_kwargs=None,
template_database=True,
actions_share_transaction=None,
):
"""Produce a Redshift fixture.
Expand All @@ -59,6 +60,13 @@ def create_redshift_fixture(
template_database: Defaults to True. When True, amortizes the cost of performing database
setup through `ordered_actions`, by performing them once into a postgres "template"
database, then creating all subsequent per-test databases from that template.
actions_share_transaction: When True, the transaction used by `ordered_actions` context
will be the same as the one handed to the test function. This is required in order
to support certain usages of `ordered_actions, such as the creation of temp tables
through a `Statements` object. By default, this behavior is enabled for synchronous
fixtures for backwards compatibility; and disabled by default for
asynchronous fixtures (the way v2-style/async features work in SQLAlchemy can lead
to bad default behavior).
"""

from pytest_mock_resources.fixture.database.relational.redshift.udf import REDSHIFT_UDFS
Expand All @@ -81,6 +89,7 @@ def _create_engine_manager(config):
engine_kwargs=engine_kwargs or {},
session=session,
fixture_id=fixture_id,
actions_share_transaction=actions_share_transaction,
)

@pytest.fixture(scope=scope)
Expand Down
9 changes: 9 additions & 0 deletions tests/fixture/database/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,12 @@ def test_bad_actions(postgres):
async def test_basic_postgres_fixture_async(postgres_async):
async with postgres_async.connect() as conn:
await conn.execute(text("select 1"))


@skip_if_not_sqlalchemy2
def test_engine_reuse(postgres_async, event_loop):
async def execute(async_engine):
async with async_engine.connect() as conn:
await conn.execute(text("select 1"))

event_loop.run_until_complete(execute(postgres_async))
167 changes: 113 additions & 54 deletions tests/fixture/database/test_ordered_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest
from sqlalchemy import Column, ForeignKey, Integer, String, text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import relationship

from pytest_mock_resources import create_postgres_fixture, Rows, Statements
Expand Down Expand Up @@ -33,43 +34,87 @@ class Object(Base):


rows = Rows(User(name="Harold"), User(name="Gump"))
additional_rows = Rows(User(name="Perrier"), User(name="Mug"))

row_dependant_statements = Statements(
"CREATE TEMP TABLE user1 as SELECT DISTINCT CONCAT(name, 1) as name FROM stuffs.user"
)

additional_rows = Rows(User(name="Perrier"), User(name="Mug"))
class Test_non_shared_transaction_session:
"""Assert the temp table is not accessible when not using the shared transaction option."""

temp_table = Statements(
"CREATE TEMP TABLE user1 as SELECT DISTINCT CONCAT(name, 1) as name FROM stuffs.user"
)

non_shared_transaction_session = create_postgres_fixture(
rows, temp_table, additional_rows, session=True, actions_share_transaction=False
)

def session_function(session):
session.add(User(name="Fake Name", objects=[Object(name="Boots")]))
@pytest.mark.postgres
def test_session_is_not_shared(self, non_shared_transaction_session):
with pytest.raises(ProgrammingError):
non_shared_transaction_session.execute(text("SELECT * FROM user1"))


postgres_ordered_actions = create_postgres_fixture(
rows, row_dependant_statements, additional_rows, session=True
)
class Test_non_shared_transaction_engine:
"""Assert the temp table is not accessible when not using the shared transaction option."""

postgres_session_function = create_postgres_fixture(Base, session_function, session=True)
temp_table = Statements(
"CREATE TEMP TABLE user1 as SELECT DISTINCT CONCAT(name, 1) as name FROM stuffs.user"
)

postgres = create_postgres_fixture(
rows, temp_table, additional_rows, actions_share_transaction=False
)

# Run the test 5 times to ensure fixture is stateless
@pytest.mark.parametrize("run", range(5))
def test_ordered_actions(postgres_ordered_actions, run):
execute = postgres_ordered_actions.execute(text("SELECT * FROM user1"))
result = sorted([row[0] for row in execute])
assert ["Gump1", "Harold1"] == result
@pytest.mark.postgres
def test_engine_is_not_shared(self, postgres):
with pytest.raises(ProgrammingError):
with postgres.begin() as conn:
conn.execute(text("SELECT * FROM user1"))


# Run the test 5 times to ensure fixture is stateless
@pytest.mark.parametrize("run", range(5))
def test_session_function(postgres_session_function, run):
execute = postgres_session_function.execute(text("SELECT * FROM stuffs.object"))
owner_id = sorted([row[2] for row in execute])[0]
execute = postgres_session_function.execute(
text("SELECT * FROM stuffs.user where id = {id}".format(id=owner_id))
class Test_ordered_actions:
"""Assert a temp table created in a Statements is accessible to a session based fixture.
Run the test 5 times to ensure fixture is stateless
"""

temp_table = Statements(
"CREATE TEMP TABLE user1 as SELECT DISTINCT CONCAT(name, 1) as name FROM stuffs.user"
)

postgres_ordered_actions = create_postgres_fixture(
rows, temp_table, additional_rows, session=True
)
result = [row[1] for row in execute]
assert result == ["Fake Name"]

@pytest.mark.parametrize("run", range(5))
@pytest.mark.postgres
def test_ordered_actions(self, postgres_ordered_actions, run):
execute = postgres_ordered_actions.execute(text("SELECT * FROM user1"))
result = sorted([row[0] for row in execute])
assert ["Gump1", "Harold1"] == result


class Test_postgres_session_function:
"""Assert a "function" action which operates on a session is handed a session.
Run the test 5 times to ensure fixture is stateless
"""

def session_function(session):
session.add(User(name="Fake Name", objects=[Object(name="Boots")]))

postgres_session_function = create_postgres_fixture(Base, session_function, session=True)

@pytest.mark.parametrize("run", range(5))
@pytest.mark.postgres
def test_session_function(self, postgres_session_function, run):
execute = postgres_session_function.execute(text("SELECT * FROM stuffs.object"))
owner_id = sorted([row[2] for row in execute])[0]
execute = postgres_session_function.execute(
text("SELECT * FROM stuffs.user where id = {id}".format(id=owner_id))
)
result = [row[1] for row in execute]
assert result == ["Fake Name"]


postgres_metadata_only = create_postgres_fixture(Base.metadata, session=True)
Expand All @@ -82,44 +127,58 @@ def test_metadata_only(postgres_metadata_only):
assert [] == result


postgres_ordered_actions_async = create_postgres_fixture(
rows, row_dependant_statements, additional_rows, async_=True
)

class Test_postgres_ordered_actions_async:
temp_table = Statements(
"CREATE TEMP TABLE user1 as SELECT DISTINCT CONCAT(name, 1) as name FROM stuffs.user"
)

def async_session_function(session):
session.add(User(name="Fake Name", objects=[Object(name="Boots")]))
postgres_ordered_actions_async = create_postgres_fixture(
rows,
temp_table,
additional_rows,
async_=True,
actions_share_transaction=True,
)

@pytest.mark.postgres
@pytest.mark.asyncio
@pytest.mark.parametrize("run", range(5))
@skip_if_not_sqlalchemy2
async def test_ordered_actions_aysnc_shares_transaction(
self, postgres_ordered_actions_async, run
):
async with postgres_ordered_actions_async.begin() as conn:
execute = await conn.execute(text("SELECT * FROM user1"))

postgres_session_function_async = create_postgres_fixture(
Base, async_session_function, async_=True, session=True
)
result = sorted([row[0] for row in execute])
assert ["Gump1", "Harold1"] == result


# Run the test 5 times to ensure fixture is stateless
@pytest.mark.asyncio
@pytest.mark.parametrize("run", range(5))
@skip_if_not_sqlalchemy2
async def test_ordered_actions_aysnc_shares_transaction(postgres_ordered_actions_async, run):
async with postgres_ordered_actions_async.begin() as conn:
execute = await conn.execute(text("SELECT * FROM user1"))
class Test_session_function_async:
"""
result = sorted([row[0] for row in execute])
assert ["Gump1", "Harold1"] == result
Run the test more than once (i.e. 5 times) to ensure fixture is stateless.
"""

def async_session_function(session):
session.add(User(name="Fake Name", objects=[Object(name="Boots")]))

# Run the test 5 times to ensure fixture is stateless
@pytest.mark.asyncio
@pytest.mark.parametrize("run", range(5))
@skip_if_not_sqlalchemy2
async def test_session_function_async(postgres_session_function_async, run):
execute = await postgres_session_function_async.execute(text("SELECT * FROM stuffs.object"))
owner_id = sorted([row[2] for row in execute])[0]
execute = await postgres_session_function_async.execute(
text("SELECT * FROM stuffs.user where id = {id}".format(id=owner_id))
postgres_session_function_async = create_postgres_fixture(
Base, async_session_function, async_=True, session=True
)
result = [row[1] for row in execute]
assert result == ["Fake Name"]

@pytest.mark.asyncio
@pytest.mark.postgres
@pytest.mark.parametrize("run", range(5))
@skip_if_not_sqlalchemy2
async def test_session_function_async(self, postgres_session_function_async, run):
execute = await postgres_session_function_async.execute(text("SELECT * FROM stuffs.object"))
owner_id = sorted([row[2] for row in execute])[0]
execute = await postgres_session_function_async.execute(
text("SELECT * FROM stuffs.user where id = {id}".format(id=owner_id))
)
result = [row[1] for row in execute]
assert result == ["Fake Name"]


engine_function = create_postgres_fixture(
Expand Down

0 comments on commit 3254bd8

Please sign in to comment.