Skip to content

Commit

Permalink
tests: Create test connections via Airflow ORM
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Apr 29, 2022
1 parent 3beadbf commit b37ea17
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions tests/conftest.py
Expand Up @@ -11,6 +11,8 @@
from moto import mock_s3
from pytest_postgresql.janitor import DatabaseJanitor

from airflow import settings
from airflow.models.connection import Connection
from airflow_dbt_python.hooks.dbt import DbtHook

PROFILES = """
Expand Down Expand Up @@ -176,6 +178,41 @@ def profiles_file(tmp_path_factory, database):
return p


@pytest.fixture(scope="session")
def airflow_conns(database):
"""Craete Airflow connections for testing.
We create them by setting AIRFLOW_CONN_{CONN_ID} env variables. Only postgres
connections are set for now as our testing database is postgres.
"""
uris = (
f"postgres://{database.user}:{database.password}@{database.host}:{database.port}/public?dbname={database.dbname}",
f"postgres://{database.user}:{database.password}@{database.host}:{database.port}/public",
)
ids = (
"dbt_test_postgres_1",
database.dbname,
)
session = settings.Session()

connections = []
for conn_id, uri in zip(ids, uris):
existing = session.query(Connection).filter_by(conn_id=conn_id).first()
if existing is not None:
# Connections may exist from previous test run.
session.delete(existing)
session.commit()
connections.append(Connection(conn_id=conn_id, uri=uri))

session.add_all(connections)

session.commit()

yield ids

session.close()


@pytest.fixture(scope="session")
def dbt_project_dir(tmp_path_factory):
"""A temporary directory to store dbt test files."""
Expand Down Expand Up @@ -484,27 +521,3 @@ def test_files(tmp_path_factory, dbt_project_file):
f1.unlink()
f2.unlink()
f3.unlink()


@pytest.fixture()
def airflow_conns(database):
"""Craete Airflow connections for testing.
We create them by setting AIRFLOW_CONN_{CONN_ID} env variables. Only postgres
connections are set for now as our testing database is postgres.
"""
uris = (
f"postgres://{database.user}:{database.password}@{database.host}:{database.port}/public?dbname={database.dbname}",
f"postgres://{database.user}:{database.password}@{database.host}:{database.port}/public",
)
ids = (
"dbt_test_postgres_1",
database.dbname,
)
for conn_id, uri in zip(ids, uris):
os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"] = uri

yield ids

for conn_id in ids:
os.environ.pop(f"AIRFLOW_CONN_{conn_id.upper()}")

0 comments on commit b37ea17

Please sign in to comment.