Resolve PT012 in google provider tests (apache#38471)
shahar1 authored and Mathia Haure-Touze committed Apr 4, 2024
1 parent bb7892d commit 9477c16
Showing 24 changed files with 635 additions and 883 deletions.
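PT012 is the flake8-pytest-style rule, enforced here through Ruff, that flags a pytest.raises() block containing more than a single simple statement: setup and follow-up assertions should live outside the block so the test cannot pass because the wrong statement raised. The pyproject.toml hunk below deletes the per-file ignores for the Google provider tests, and the remaining files move their setup and assertions out of the raises blocks accordingly. A minimal before/after sketch, using hypothetical names that are not part of this commit:

import pytest


def parse_port(value: str) -> int:
    # Hypothetical helper used only for illustration.
    port = int(value)
    if not 0 < port < 65536:
        raise ValueError(f"{port} is not a valid port")
    return port


def test_parse_port_before():
    # Before: the assignment inside the block trips PT012.
    with pytest.raises(ValueError, match="not a valid port"):
        raw = "0"
        parse_port(raw)


def test_parse_port_after():
    # After: only the call expected to raise stays inside the block.
    raw = "0"
    with pytest.raises(ValueError, match="not a valid port"):
        parse_port(raw)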
23 changes: 0 additions & 23 deletions pyproject.toml
@@ -385,29 +385,6 @@ combine-as-imports = true
"tests/providers/databricks/operators/test_databricks.py" = ["PT012"]
"tests/providers/databricks/operators/test_databricks_repos.py" = ["PT012"]
"tests/providers/databricks/sensors/test_databricks_partition.py" = ["PT012"]
"tests/providers/google/cloud/hooks/test_bigquery.py" = ["PT012"]
"tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py" = ["PT012"]
"tests/providers/google/cloud/hooks/test_dataflow.py" = ["PT012"]
"tests/providers/google/cloud/hooks/test_dataprep.py" = ["PT012"]
"tests/providers/google/cloud/hooks/test_kubernetes_engine.py" = ["PT012"]
"tests/providers/google/cloud/hooks/test_pubsub.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_bigtable.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_cloud_sql.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_compute.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_datafusion.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_dataproc.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_functions.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_kubernetes_engine.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_mlengine.py" = ["PT012"]
"tests/providers/google/cloud/operators/test_spanner.py" = ["PT012"]
"tests/providers/google/cloud/sensors/test_datafusion.py" = ["PT012"]
"tests/providers/google/cloud/sensors/test_dataproc.py" = ["PT012"]
"tests/providers/google/cloud/sensors/test_gcs.py" = ["PT012"]
"tests/providers/google/cloud/sensors/test_pubsub.py" = ["PT012"]
"tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py" = ["PT012"]
"tests/providers/google/cloud/utils/test_credentials_provider.py" = ["PT012"]
"tests/providers/google/common/hooks/test_base_google.py" = ["PT012"]
"tests/providers/sftp/hooks/test_sftp.py" = ["PT012"]
"tests/providers/sftp/operators/test_sftp.py" = ["PT012"]
"tests/providers/sftp/sensors/test_sftp.py" = ["PT012"]
4 changes: 2 additions & 2 deletions tests/providers/google/cloud/hooks/test_bigquery.py
@@ -352,13 +352,13 @@ def test_validate_value(self):
assert _validate_value("case_2", 0, int) is None

def test_duplication_check(self):
key_one = True
with pytest.raises(
ValueError,
match=r"Values of key_one param are duplicated. api_resource_configs contained key_one param in"
r" `query` config and key_one was also provided with arg to run_query\(\) method. "
r"Please remove duplicates.",
):
key_one = True
_api_resource_configs_duplication_check("key_one", key_one, {"key_one": False})
assert _api_resource_configs_duplication_check("key_one", key_one, {"key_one": True}) is None

@@ -367,11 +367,11 @@ def test_validate_src_fmt_configs(self):
valid_configs = ["test_config_known", "compatibility_val"]
backward_compatibility_configs = {"compatibility_val": "val"}

src_fmt_configs = {"test_config_unknown": "val"}
with pytest.raises(
ValueError, match="test_config_unknown is not a valid src_fmt_configs for type test_format."
):
# This config should raise a value error.
src_fmt_configs = {"test_config_unknown": "val"}
_validate_src_fmt_configs(
source_format, src_fmt_configs, valid_configs, backward_compatibility_configs
)
tests/providers/google/cloud/hooks/test_cloud_storage_transfer_service.py
@@ -143,9 +143,9 @@ def test_pass_name_on_create_job(
httplib2.Response({"status": TEST_HTTP_ERR_CODE}), TEST_HTTP_ERR_CONTENT
)

get_transfer_job.return_value = TEST_RESULT_STATUS_DELETED
with pytest.raises(HttpError):
# check status DELETED generates new job name
get_transfer_job.return_value = TEST_RESULT_STATUS_DELETED
self.gct_hook.create_transfer_job(body=body)

# check status DISABLED changes to status ENABLED
@@ -436,7 +436,7 @@ def test_wait_for_transfer_job_failed(self, mock_get_conn, mock_sleep, mock_proj

with pytest.raises(AirflowException):
self.gct_hook.wait_for_transfer_job({PROJECT_ID: TEST_PROJECT_ID, NAME: "transferJobs/test-job"})
assert list_method.called
assert list_method.called

@mock.patch(
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id",
63 changes: 42 additions & 21 deletions tests/providers/google/cloud/hooks/test_dataflow.py
@@ -18,8 +18,10 @@
from __future__ import annotations

import copy
import logging
import re
import shlex
import subprocess
from asyncio import Future
from typing import Any
from unittest import mock
@@ -1904,28 +1906,47 @@ def callback(job_id):

@mock.patch("subprocess.Popen")
@mock.patch("select.select")
def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen):
mock_logging = MagicMock()
mock_logging.info = MagicMock()
mock_logging.warning = MagicMock()
mock_proc = MagicMock()
mock_proc.stderr = MagicMock()
mock_proc.stderr.readlines = MagicMock(return_value=["test\n", "error\n"])
mock_stderr_fd = MagicMock()
mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd)
mock_proc_poll = MagicMock()
mock_select.return_value = [[mock_stderr_fd]]

def poll_resp_error():
mock_proc.return_code = 1
return True

mock_proc_poll.side_effect = [None, poll_resp_error]
mock_proc.poll = mock_proc_poll
def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, caplog):
fake_logger = logging.getLogger("fake-logger")

cmd = ["test", "cmd"]
mock_proc = MagicMock(name="FakeProc")
fake_stderr_fd = MagicMock(name="FakeStderr")
fake_stdout_fd = MagicMock(name="FakeStdout")

mock_proc.stderr = fake_stderr_fd
mock_proc.stdout = fake_stdout_fd
fake_stderr_fd.readline.side_effect = [
b"test-stderr",
StopIteration,
b"error-stderr",
StopIteration,
b"other-stderr",
]
fake_stdout_fd.readline.side_effect = [b"test-stdout", StopIteration]
mock_select.side_effect = [
([fake_stderr_fd], None, None),
(None, None, None),
([fake_stderr_fd], None, None),
]
mock_proc.poll.side_effect = [None, True]
mock_proc.returncode = 1
mock_popen.return_value = mock_proc
with pytest.raises(Exception):
run_beam_command(cmd=["test", "cmd"], log=mock_logging)
mock_logging.info.assert_called_once_with("Running command: %s", "test cmd")

with pytest.raises(AirflowException, match="Apache Beam process failed with return code 1"):
run_beam_command(cmd=["test", "cmd"], log=fake_logger)

mock_popen.assert_called_once_with(
cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, cwd=None
)
info_messages = [rt[2] for rt in caplog.record_tuples if rt[0] == "fake-logger" and rt[1] == 20]
assert "Running command: test cmd" in info_messages
assert "test-stdout" in info_messages

warn_messages = [rt[2] for rt in caplog.record_tuples if rt[0] == "fake-logger" and rt[1] == 30]
assert "test-stderr" in warn_messages
assert "error-stderr" in warn_messages
assert "other-stderr" in warn_messages


@pytest.fixture
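The rewritten Dataflow test above asserts on caplog.record_tuples, where each tuple is (logger name, numeric level, formatted message); level 20 is logging.INFO and 30 is logging.WARNING. A minimal, self-contained sketch of that pytest fixture with a hypothetical logger, separate from the Airflow code:

import logging


def emit_messages():
    # Hypothetical stand-in for code that logs through a named logger.
    log = logging.getLogger("fake-logger")
    log.info("Running command: %s", "test cmd")
    log.warning("test-stderr")


def test_emit_messages(caplog):
    with caplog.at_level(logging.INFO, logger="fake-logger"):
        emit_messages()
    infos = [msg for name, level, msg in caplog.record_tuples if level == logging.INFO]
    warnings = [msg for name, level, msg in caplog.record_tuples if level == logging.WARNING]
    assert "Running command: test cmd" in infos
    assert "test-stderr" in warnings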
26 changes: 13 additions & 13 deletions tests/providers/google/cloud/hooks/test_dataprep.py
@@ -121,8 +121,8 @@ def test_get_jobs_for_job_group_should_retry_after_four_errors(self, mock_get_re
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_get_jobs_for_job_group_raise_error_after_five_calls(self, mock_get_request):
self.hook.get_jobs_for_job_group.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.get_jobs_for_job_group.retry.sleep = mock.Mock()
self.hook.get_jobs_for_job_group(JOB_ID)
assert "HTTPError" in str(ctx.value)
assert mock_get_request.call_count == 5
@@ -176,8 +176,8 @@ def test_get_job_group_should_retry_after_four_errors(self, mock_get_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_get_job_group_raise_error_after_five_calls(self, mock_get_request):
self.hook.get_job_group.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.get_job_group.retry.sleep = mock.Mock()
self.hook.get_job_group(JOB_ID, EMBED, INCLUDE_DELETED)
assert "HTTPError" in str(ctx.value)
assert mock_get_request.call_count == 5
@@ -231,8 +231,8 @@ def test_run_job_group_should_retry_after_four_errors(self, mock_get_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_run_job_group_raise_error_after_five_calls(self, mock_get_request):
self.hook.run_job_group.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.run_job_group.retry.sleep = mock.Mock()
self.hook.run_job_group(body_request=DATA)
assert "HTTPError" in str(ctx.value)
assert mock_get_request.call_count == 5
@@ -285,8 +285,8 @@ def test_get_job_group_status_four_errors(self, mock_get_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_get_job_group_status_five_calls(self, mock_get_request):
self.hook.get_job_group_status.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.get_job_group_status.retry.sleep = mock.Mock()
self.hook.get_job_group_status(job_group_id=JOB_ID)
assert "HTTPError" in str(ctx.value)
assert mock_get_request.call_count == 5
@@ -353,8 +353,8 @@ def test_create_imported_dataset_four_errors(self, mock_post_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_create_imported_dataset_five_calls(self, mock_post_request):
self.hook.create_imported_dataset.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.create_imported_dataset.retry.sleep = mock.Mock()
self.hook.create_imported_dataset(body_request=self._create_imported_dataset_body_request)
assert "HTTPError" in str(ctx.value)
assert mock_post_request.call_count == 5
@@ -408,8 +408,8 @@ def test_create_wrangled_dataset_four_errors(self, mock_post_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_create_wrangled_dataset_five_calls(self, mock_post_request):
self.hook.create_wrangled_dataset.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.create_wrangled_dataset.retry.sleep = mock.Mock()
self.hook.create_wrangled_dataset(body_request=self._create_wrangled_dataset_body_request)
assert "HTTPError" in str(ctx.value)
assert mock_post_request.call_count == 5
@@ -463,8 +463,8 @@ def test_create_output_objects_four_errors(self, mock_post_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_create_output_objects_five_calls(self, mock_post_request):
self.hook.create_output_object.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.create_output_object.retry.sleep = mock.Mock()
self.hook.create_output_object(body_request=self._create_output_object_body_request)
assert "HTTPError" in str(ctx.value)
assert mock_post_request.call_count == 5
@@ -518,8 +518,8 @@ def test_create_write_settings_four_errors(self, mock_post_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_create_write_settings_five_calls(self, mock_post_request):
self.hook.create_write_settings.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.create_write_settings.retry.sleep = mock.Mock()
self.hook.create_write_settings(body_request=self._create_write_settings_body_request)
assert "HTTPError" in str(ctx.value)
assert mock_post_request.call_count == 5
@@ -572,8 +572,8 @@ def test_delete_imported_dataset_four_errors(self, mock_delete_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_delete_imported_dataset_five_calls(self, mock_delete_request):
self.hook.delete_imported_dataset.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.delete_imported_dataset.retry.sleep = mock.Mock()
self.hook.delete_imported_dataset(dataset_id=self._imported_dataset_id)
assert "HTTPError" in str(ctx.value)
assert mock_delete_request.call_count == 5
@@ -655,8 +655,8 @@ def test_create_flow_should_retry_after_four_errors(self, mock_post_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_create_flow_raise_error_after_five_calls(self, mock_post_request):
self.hook.create_flow.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.create_flow.retry.sleep = mock.Mock()
self.hook.create_flow(body_request=self._create_flow_body_request)
assert "HTTPError" in str(ctx.value)
assert mock_post_request.call_count == 5
@@ -712,8 +712,8 @@ def test_copy_flow_should_retry_after_four_errors(self, mock_get_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_copy_flow_raise_error_after_five_calls(self, mock_get_request):
self.hook.copy_flow.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.copy_flow.retry.sleep = mock.Mock()
self.hook.copy_flow(flow_id=self._flow_id)
assert "HTTPError" in str(ctx.value)
assert mock_get_request.call_count == 5
@@ -768,8 +768,8 @@ def test_delete_flow_should_retry_after_four_errors(self, mock_get_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_delete_flow_raise_error_after_five_calls(self, mock_get_request):
self.hook.delete_flow.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.delete_flow.retry.sleep = mock.Mock()
self.hook.delete_flow(flow_id=self._flow_id)
assert "HTTPError" in str(ctx.value)
assert mock_get_request.call_count == 5
@@ -835,8 +835,8 @@ def test_run_flow_should_retry_after_four_errors(self, mock_get_request):
side_effect=[HTTPError(), HTTPError(), HTTPError(), HTTPError(), HTTPError()],
)
def test_run_flow_raise_error_after_five_calls(self, mock_get_request):
self.hook.run_flow.retry.sleep = mock.Mock()
with pytest.raises(RetryError) as ctx:
self.hook.run_flow.retry.sleep = mock.Mock()
self.hook.run_flow(
flow_id=self._flow_id,
body_request={},
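Throughout the Dataprep tests above, the retry.sleep assignment is setup (it silences tenacity's real back-off between attempts), so it moves in front of the raises block and only the hook call stays inside. A standalone sketch of the same layout against a hypothetical tenacity-decorated function, mirroring how these tests patch sleep:

from unittest import mock

import pytest
from tenacity import RetryError, retry, stop_after_attempt, wait_fixed


@retry(stop=stop_after_attempt(5), wait=wait_fixed(10))
def flaky_call():
    # Hypothetical stand-in for a hook method that always fails.
    raise ConnectionError("boom")


def test_flaky_call_raises_retry_error():
    # Setup outside the block: skip the real 10-second waits between attempts.
    flaky_call.retry.sleep = mock.Mock()
    # Only the statement expected to raise stays inside the block (PT012).
    with pytest.raises(RetryError) as ctx:
        flaky_call()
    assert "ConnectionError" in str(ctx.value)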
9 changes: 4 additions & 5 deletions tests/providers/google/cloud/hooks/test_kubernetes_engine.py
@@ -229,10 +229,9 @@ def test_delete_cluster_not_found(self, wait_mock, log_mock):
def test_delete_cluster_error(self, wait_mock, mock_project_id):
# To force an error
self.gke_hook._client.delete_cluster.side_effect = AirflowException("400")

with pytest.raises(AirflowException):
self.gke_hook.delete_cluster(name="a-cluster")
wait_mock.assert_not_called()
wait_mock.assert_not_called()


class TestGKEHookCreate:
@@ -292,7 +291,7 @@ def test_create_cluster_error(self, wait_mock):

with pytest.raises(AirflowException):
self.gke_hook.create_cluster(mock_cluster_proto)
wait_mock.assert_not_called()
wait_mock.assert_not_called()

@mock.patch(GKE_STRING.format("GKEHook.log"))
@mock.patch(GKE_STRING.format("GKEHook.wait_for_operation"))
@@ -305,7 +304,7 @@ def test_create_cluster_already_exists(self, wait_mock, log_mock):

with pytest.raises(AlreadyExists):
self.gke_hook.create_cluster(cluster={}, project_id=TEST_GCP_PROJECT_ID)
wait_mock.assert_not_called()
wait_mock.assert_not_called()


class TestGKEHookGet:
@@ -390,7 +389,7 @@ def test_wait_for_response_exception(self, time_mock):

with pytest.raises(GoogleCloudError):
self.gke_hook.wait_for_operation(mock_op)
assert time_mock.call_count == 1
assert time_mock.call_count == 1

@mock.patch(GKE_STRING.format("GKEHook.get_operation"))
@mock.patch(GKE_STRING.format("time.sleep"))
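In the GKE tests above, and in the PubSub assertions below, the mock assertions are dedented to sit after the raises block rather than merely restyled: any statement placed after the raising call inside the block is dead code, because control leaves the block as soon as the exception propagates. A short illustration with hypothetical names:

from unittest import mock

import pytest


def delete_cluster(client):
    # Hypothetical stand-in for a hook method delegating to a client.
    client.delete_cluster()


def test_delete_cluster_error():
    client = mock.MagicMock()
    client.delete_cluster.side_effect = RuntimeError("400")
    with pytest.raises(RuntimeError):
        delete_cluster(client)
    # Outside the block this assertion actually runs; inside, after the raising
    # call, it would never be reached.
    client.delete_cluster.assert_called_once()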
38 changes: 19 additions & 19 deletions tests/providers/google/cloud/hooks/test_pubsub.py
@@ -453,16 +453,16 @@ def test_pull_fails_on_exception(self, mock_service, exception):

with pytest.raises(PubSubException):
self.pubsub_hook.pull(project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, max_messages=10)
pull_method.assert_called_once_with(
request=dict(
subscription=EXPANDED_SUBSCRIPTION,
max_messages=10,
return_immediately=False,
),
retry=DEFAULT,
timeout=None,
metadata=(),
)
pull_method.assert_called_once_with(
request=dict(
subscription=EXPANDED_SUBSCRIPTION,
max_messages=10,
return_immediately=False,
),
retry=DEFAULT,
timeout=None,
metadata=(),
)

@mock.patch(PUBSUB_STRING.format("PubSubHook.subscriber_client"))
def test_acknowledge_by_ack_ids(self, mock_service):
@@ -539,15 +539,15 @@ def test_acknowledge_fails_on_exception(self, mock_service, exception):
self.pubsub_hook.acknowledge(
project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, ack_ids=["1", "2", "3"]
)
ack_method.assert_called_once_with(
request=dict(
subscription=EXPANDED_SUBSCRIPTION,
ack_ids=["1", "2", "3"],
),
retry=DEFAULT,
timeout=None,
metadata=(),
)
ack_method.assert_called_once_with(
request=dict(
subscription=EXPANDED_SUBSCRIPTION,
ack_ids=["1", "2", "3"],
),
retry=DEFAULT,
timeout=None,
metadata=(),
)

@pytest.mark.parametrize(
"messages",