Skip to content

Commit

Permalink
test: Test for connections without login
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Apr 29, 2022
1 parent be6abc3 commit fa9d991
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 7 deletions.
8 changes: 2 additions & 6 deletions airflow_dbt_python/hooks/dbt.py
Expand Up @@ -818,12 +818,8 @@ def get_target_from_connection(
else:
params["user"] = user

try:
conn_type = params.pop("conn_type")
except KeyError:
pass
else:
params["type"] = conn_type
conn_type = params.pop("conn_type")
params["type"] = conn_type

extra = conn.extra_dejson
if "dbname" not in extra:
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Expand Up @@ -180,7 +180,7 @@ def profiles_file(tmp_path_factory, database):

@pytest.fixture(scope="session")
def airflow_conns(database):
"""Craete Airflow connections for testing.
"""Create Airflow connections for testing.
We create them by setting AIRFLOW_CONN_{CONN_ID} env variables. Only postgres
connections are set for now as our testing database is postgres.
Expand Down
39 changes: 39 additions & 0 deletions tests/hooks/dbt/test_dbt_hook_base.py
@@ -1,5 +1,6 @@
import pytest

from airflow.exceptions import AirflowException
from airflow_dbt_python.hooks.backends import DbtLocalFsBackend
from airflow_dbt_python.hooks.dbt import DbtHook

Expand Down Expand Up @@ -114,3 +115,41 @@ def test_dbt_hook_get_target_from_connection_non_existent(conn_id):
"""Test None is returned when Airflow connections do not exist."""
hook = DbtHook()
assert hook.get_target_from_connection(conn_id) is None


@pytest.fixture
def no_user_airflow_conn(database):
"""Create an Airflow connection without a user."""
from airflow import settings
from airflow.models.connection import Connection

uri = f"postgres://{database.host}:{database.port}/public?dbname={database.dbname}"
conn_id = "dbt_test"

session = settings.Session()
existing = session.query(Connection).filter_by(conn_id=conn_id).first()
if existing is not None:
# Connections may exist from previous test run.
session.delete(existing)
session.commit()

connection = Connection(conn_id=conn_id, uri=uri)
session.add(connection)

session.commit()

yield conn_id

session.close()


def test_dbt_hook_get_target_from_empty_connection(no_user_airflow_conn, database):
"""Test fetching Airflow connections."""
hook = DbtHook()

extra_target = hook.get_target_from_connection(no_user_airflow_conn)

assert no_user_airflow_conn in extra_target
assert extra_target[no_user_airflow_conn].get("type") == "postgres"
assert extra_target[no_user_airflow_conn].get("user") is None
assert extra_target[no_user_airflow_conn]["dbname"] == database.dbname

0 comments on commit fa9d991

Please sign in to comment.