Skip to content

Commit

Permalink
Run tests for Providers also for Airflow 2.8
Browse files Browse the repository at this point in the history
This is a follow-up on apache#39513 to add support for running Provider
tests against Airlfow 2.8 installed from PyPI.

This change includes:

* simplifying the way how we specify provider exclusions in tests
* update to the unit test documentation describing testing
* updating to latest pytest tooling in case older airflow version
  is installed (pulling the correct versions and correct packages
  for pytest extensions)
* implementing 2.8 compatibility for the conftest/test common code
* implementing 2.8 compatibility for provider tests that relied
  on 2.9+ behaviours
  • Loading branch information
potiuk committed May 23, 2024
1 parent 482c0be commit 7adad37
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 171 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/check-providers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,10 @@ jobs:
Remove incompatible Airflow
${{ matrix.airflow-version }}:Python ${{ matrix.python-version }} provider packages
run: |
rm -vf ${{ matrix.remove-providers }}
working-directory: ./dist
for provider in ${{ matrix.remove-providers }}; do
echo "Removing incompatible provider: ${provider}"
rm -vf dist/apache_airflow_providers_${provider/./_}*
done
if: matrix.remove-providers != ''
- name: "Download airflow package: wheel"
run: |
Expand Down
6 changes: 6 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,12 @@ function determine_airflow_to_use() {
python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py"
# Some packages might leave legacy typing module which causes test issues
pip uninstall -y typing || true
# Upgrade pytest and pytest extensions to latest version if they have been accidentally
# downgraded by constraints
pip install --upgrade pytest pytest aiofiles aioresponses pytest-asyncio pytest-custom-exit-code \
pytest-icdiff pytest-instafail pytest-mock pytest-rerunfailures pytest-timeouts \
pytest-xdist pytest requests_mock time-machine \
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-main/constraints-${PYTHON_MAJOR_MINOR_VERSION}.txt
fi

if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* && "${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then
Expand Down
13 changes: 8 additions & 5 deletions contributing-docs/testing/unit_tests.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ if the providers still work when installed for older airflow versions.

.. note::

For now it's done for 2.9.1 version only.
For now it's done for 2.9.1 and 2.8.4 version only.

Those tests can be used to test compatibility of the providers with past and future releases of airflow.
For example it could be used to run latest provider versions with released or main
Expand Down Expand Up @@ -1148,7 +1148,11 @@ This can be reproduced locally building providers from tag/commit of the airflow
breeze release-management generate-constraints --airflow-constraints-mode constraints-source-providers --answer yes
3. Enter breeze environment, installing selected airflow version and the provider packages prepared from main
4. Remove providers that are not compatible with Airflow version installed by default. You can look up
the incompatible providers in the ``BASE_PROVIDERS_COMPATIBILITY_CHECKS`` constant in the
``./dev/breeze/src/airflow_breeze/global_constants.py`` file.

5. Enter breeze environment, installing selected airflow version and the provider packages prepared from main

.. code-block::bash
Expand All @@ -1158,13 +1162,13 @@ This can be reproduced locally building providers from tag/commit of the airflow
--providers-skip-constraints \
--mount-sources tests
4. You can then run tests as usual:
6. You can then run tests as usual:

.. code-block::bash
pytest tests/providers/<provider>/test.py
5. Iterate with the tests
7. Iterate with the tests

The tests are run using:

Expand All @@ -1182,7 +1186,6 @@ Rebuilding single provider package can be done using this command:
breeze release-management prepare-provider-packages \
--version-suffix-for-pypi dev0 --package-format wheel <provider>
Note that some of the tests if written without taking care about the compatibility, might not work with older
versions of Airflow - this is because of refactorings, renames, and tests relying on internals of Airflow that
are not part of the public API. We deal with it in one of the following ways:
Expand Down
78 changes: 39 additions & 39 deletions dev/breeze/doc/images/output_testing_db-tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion dev/breeze/doc/images/output_testing_db-tests.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
31bee62efc24fa61aa868a0643e0db6b
17d0216889e996fe5fd813e0f1c76af6
72 changes: 36 additions & 36 deletions dev/breeze/doc/images/output_testing_non-db-tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion dev/breeze/doc/images/output_testing_non-db-tests.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
543f9814b475e511749fdebf29d16298
7f335b6d8225b8fb373b698c38bb86cf
92 changes: 46 additions & 46 deletions dev/breeze/doc/images/output_testing_tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion dev/breeze/doc/images/output_testing_tests.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3c3217a7eceaa77718af4876622e1b0f
68d630703517818c928daf2afe847a79
4 changes: 2 additions & 2 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def _verify_parallelism_parameters(
)
option_skip_providers = click.option(
"--skip-providers",
help="Coma separated list of providers to skip when running tests",
help="Space-separated list of provider ids to skip when running tests",
type=str,
default="",
envvar="SKIP_PROVIDERS",
Expand Down Expand Up @@ -749,7 +749,7 @@ def _run_test_command(
if skip_providers:
ignored_path_list = [
f"--ignore=tests/providers/{provider_id.replace('.','/')}"
for provider_id in skip_providers.split(",")
for provider_id in skip_providers.split(" ")
]
extra_pytest_args = (*extra_pytest_args, *ignored_path_list)
if run_in_parallel:
Expand Down
19 changes: 6 additions & 13 deletions dev/breeze/src/airflow_breeze/global_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import Iterable

from airflow_breeze.utils.host_info_utils import Architecture
from airflow_breeze.utils.path_utils import AIRFLOW_SOURCES_ROOT
Expand Down Expand Up @@ -465,29 +464,23 @@ def get_airflow_extras():
CHICKEN_EGG_PROVIDERS = " ".join([])


def _exclusion(providers: Iterable[str]) -> str:
return " ".join(
[f"apache_airflow_providers_{provider.replace('.', '_').replace('-','_')}*" for provider in providers]
)


BASE_PROVIDERS_COMPATIBILITY_CHECKS: list[dict[str, str]] = [
BASE_PROVIDERS_COMPATIBILITY_CHECKS: list[dict[str, str | list[str]]] = [
{
"python-version": "3.8",
"airflow-version": "2.7.1",
"remove-providers": _exclusion(["common.io", "fab"]),
"remove-providers": "common.io fab",
"run-tests": "false",
},
{
"python-version": "3.8",
"airflow-version": "2.8.0",
"remove-providers": _exclusion(["fab"]),
"run-tests": "false",
"airflow-version": "2.8.4",
"remove-providers": "fab",
"run-tests": "true",
},
{
"python-version": "3.8",
"airflow-version": "2.9.1",
"remove-providers": _exclusion([]),
"remove-providers": "",
"run-tests": "true",
},
]
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ combine-as-imports = true
"airflow/api/auth/backend/kerberos_auth.py" = ["E402"]
"airflow/security/kerberos.py" = ["E402"]
"airflow/security/utils.py" = ["E402"]
"tests/providers/common/io/xcom/test_backend.py" = ["E402"]
"tests/providers/elasticsearch/log/elasticmock/__init__.py" = ["E402"]
"tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py" = ["E402"]
"tests/providers/google/cloud/hooks/vertex_ai/test_batch_prediction_job.py" = ["E402"]
Expand Down
6 changes: 6 additions & 0 deletions scripts/docker/entrypoint_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ function determine_airflow_to_use() {
python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py"
# Some packages might leave legacy typing module which causes test issues
pip uninstall -y typing || true
# Upgrade pytest and pytest extensions to latest version if they have been accidentally
# downgraded by constraints
pip install --upgrade pytest pytest aiofiles aioresponses pytest-asyncio pytest-custom-exit-code \
pytest-icdiff pytest-instafail pytest-mock pytest-rerunfailures pytest-timeouts \
pytest-xdist pytest requests_mock time-machine \
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-main/constraints-${PYTHON_MAJOR_MINOR_VERSION}.txt
fi

if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* && "${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then
Expand Down
17 changes: 14 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,10 +1005,14 @@ def create_dag(
with_dagrun_type=DagRunType.SCHEDULED,
**kwargs,
):
op_kwargs = {}
from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS

if AIRFLOW_V_2_9_PLUS:
op_kwargs["task_display_name"] = task_display_name
with dag_maker(dag_id, **kwargs) as dag:
op = EmptyOperator(
task_id=task_id,
task_display_name=task_display_name,
max_active_tis_per_dag=max_active_tis_per_dag,
max_active_tis_per_dagrun=max_active_tis_per_dagrun,
executor_config=executor_config or {},
Expand All @@ -1019,6 +1023,7 @@ def create_dag(
email=email,
pool=pool,
trigger_rule=trigger_rule,
**op_kwargs,
)
if with_dagrun_type is not None:
dag_maker.create_dagrun(run_type=with_dagrun_type)
Expand Down Expand Up @@ -1170,11 +1175,17 @@ def reset_logging_config():
def suppress_info_logs_for_dag_and_fab():
import logging

from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS

dag_logger = logging.getLogger("airflow.models.dag")
dag_logger.setLevel(logging.WARNING)

fab_logger = logging.getLogger("airflow.providers.fab.auth_manager.security_manager.override")
fab_logger.setLevel(logging.WARNING)
if AIRFLOW_V_2_9_PLUS:
fab_logger = logging.getLogger("airflow.providers.fab.auth_manager.security_manager.override")
fab_logger.setLevel(logging.WARNING)
else:
fab_logger = logging.getLogger("airflow.www.fab_security")
fab_logger.setLevel(logging.WARNING)


@pytest.fixture(scope="module", autouse=True)
Expand Down
1 change: 0 additions & 1 deletion tests/providers/common/io/operators/test_file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def test_get_openlineage_facets_on_start():

expected_input = Dataset(namespace=f"s3://{src_bucket}", name=src_key)
expected_output = Dataset(namespace=f"s3://{dst_bucket}", name=dst_key)

op = FileTransferOperator(
task_id="test",
src=f"s3://{src_bucket}/{src_key}",
Expand Down
17 changes: 14 additions & 3 deletions tests/providers/common/io/xcom/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,28 @@

import pytest

from airflow.exceptions import AirflowOptionalProviderFeatureException
from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS

pytestmark = [
pytest.mark.db_test,
pytest.mark.skipif(not AIRFLOW_V_2_9_PLUS, reason="Tests for Airflow 2.9.0+ only"),
]


import airflow.models.xcom
from airflow.models.xcom import BaseXCom, resolve_xcom_backend
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend

try:
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
except AirflowOptionalProviderFeatureException:
pass
from airflow.utils import timezone
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.test_utils import db
from tests.test_utils.config import conf_vars

pytestmark = pytest.mark.db_test


@pytest.fixture(autouse=True)
def reset_db():
Expand Down
38 changes: 23 additions & 15 deletions tests/providers/google/cloud/log/test_stackdriver_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
from __future__ import annotations

import logging
from contextlib import nullcontext
from unittest import mock
from urllib.parse import parse_qs, urlsplit

import pytest
from google.cloud.logging import Resource
from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse, LogEntry

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.providers.google.cloud.log.stackdriver_task_handler import StackdriverTaskHandler
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState
from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs

Expand Down Expand Up @@ -81,21 +84,26 @@ def test_should_use_configured_log_name(mock_client, mock_get_creds_and_project_
mock_get_creds_and_project_id.return_value = ("creds", "project_id")

try:
with conf_vars(
{
("logging", "remote_logging"): "True",
("logging", "remote_base_log_folder"): "stackdriver://host/path",
}
):
importlib.reload(airflow_local_settings)
settings.configure_logging()

logger = logging.getLogger("airflow.task")
handler = logger.handlers[0]
assert isinstance(handler, StackdriverTaskHandler)
with mock.patch.object(handler, "transport_type") as transport_type_mock:
logger.error("foo")
transport_type_mock.assert_called_once_with(mock_client.return_value, "path")
# this is needed for Airflow 2.8 and below where default settings are triggering warning on
# extra "name" in the configuration of stackdriver handler. As of Airflow 2.9 this warning is not
# emitted.
context_manager = nullcontext() if AIRFLOW_V_2_9_PLUS else pytest.warns(RemovedInAirflow3Warning)
with context_manager:
with conf_vars(
{
("logging", "remote_logging"): "True",
("logging", "remote_base_log_folder"): "stackdriver://host/path",
}
):
importlib.reload(airflow_local_settings)
settings.configure_logging()

logger = logging.getLogger("airflow.task")
handler = logger.handlers[0]
assert isinstance(handler, StackdriverTaskHandler)
with mock.patch.object(handler, "transport_type") as transport_type_mock:
logger.error("foo")
transport_type_mock.assert_called_once_with(mock_client.return_value, "path")
finally:
importlib.reload(airflow_local_settings)
settings.configure_logging()
Expand Down
8 changes: 6 additions & 2 deletions tests/providers/smtp/notifications/test_smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance):
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
subject="DAG dag - Task op - Run ID test in State None",
html_content=f"""<!DOCTYPE html>\n<html>\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta name="viewport" content="width=device-width">\n </head>\n<body>\n <table role="presentation">\n \n <tr>\n <td>Run ID:</td>\n <td>test</td>\n </tr>\n <tr>\n <td>Try:</td>\n <td>{NUM_TRY} of 1</td>\n </tr>\n <tr>\n <td>Task State:</td>\n <td>None</td>\n </tr>\n <tr>\n <td>Host:</td>\n <td></td>\n </tr>\n <tr>\n <td>Log Link:</td>\n <td><a href="http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs" style="text-decoration:underline;">http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs</a></td>\n </tr>\n <tr>\n <td>Mark Success Link:</td>\n <td><a href="http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success" style="text-decoration:underline;">http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success</a></td>\n </tr>\n \n </table>\n</body>\n</html>""",
html_content=mock.ANY,
smtp_conn_id="smtp_default",
files=None,
cc=None,
Expand All @@ -142,6 +142,8 @@ def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance):
mime_charset="utf-8",
custom_headers=None,
)
content = mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"]
assert f"{NUM_TRY} of 1" in content

@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker):
Expand All @@ -163,7 +165,7 @@ def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker):
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
subject="SLA Missed for DAG test_notifier - Task op",
html_content="""<!DOCTYPE html>\n<html>\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta name="viewport" content="width=device-width">\n </head>\n<body>\n <table role="presentation">\n \n <tr>\n <td>Dag:</td>\n <td>test_notifier</td>\n </tr>\n <tr>\n <td>Task List:</td>\n <td>[]</td>\n </tr>\n <tr>\n <td>Blocking Task List:</td>\n <td>[]</td>\n </tr>\n <tr>\n <td>SLAs:</td>\n <td>[(\'test_notifier\', \'op\', \'2018-01-01T00:00:00+00:00\')]</td>\n </tr>\n <tr>\n <td>Blocking TI\'s</td>\n <td>[]</td>\n </tr>\n \n </table>\n</body>\n</html>""",
html_content=mock.ANY,
smtp_conn_id="smtp_default",
files=None,
cc=None,
Expand All @@ -172,6 +174,8 @@ def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker):
mime_charset="utf-8",
custom_headers=None,
)
content = mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"]
assert "Task List:" in content

@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_task_instance):
Expand Down
11 changes: 10 additions & 1 deletion tests/test_utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,16 @@
TaskOutletDatasetReference,
)
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role

try:
from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role
except ImportError:
# Handle Pre-airflow 2.9 case where FAB was part of the core airflow
from airflow.auth.managers.fab.models import ( # type: ignore[no-redef]
Permission,
Resource,
assoc_permission_role,
)
from airflow.security.permissions import RESOURCE_DAG_PREFIX
from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections, reflect_tables
from airflow.utils.session import create_session
Expand Down

0 comments on commit 7adad37

Please sign in to comment.