Skip to content

Commit

Permalink
Updates DAGs to use kwargs for email
Browse files Browse the repository at this point in the history
  • Loading branch information
jermnelson committed May 17, 2024
1 parent d474428 commit fe93228
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 35 deletions.
12 changes: 2 additions & 10 deletions libsys_airflow/dags/vendor/data_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from libsys_airflow.plugins.vendor.download import ftp_download_task
from libsys_airflow.plugins.vendor.archive import archive_task
from libsys_airflow.plugins.vendor.paths import download_path
from libsys_airflow.plugins.vendor.emails import email_args, files_fetched_email_task
from libsys_airflow.plugins.vendor.emails import files_fetched_email_task

logger = logging.getLogger(__name__)

Expand All @@ -29,7 +29,6 @@
"retries": 0,
"retry_delay": timedelta(minutes=5),
},
**email_args(),
)

with DAG(
Expand Down Expand Up @@ -94,11 +93,4 @@ def setup():
params["vendor_interface_uuid"],
)

files_fetched_email_task(
params["vendor_uuid"],
params["vendor_interface_name"],
params["vendor_code"],
params["vendor_interface_uuid"],
downloaded_files,
params["environment"],
)
files_fetched_email_task(downloaded_files, **params)
30 changes: 5 additions & 25 deletions libsys_airflow/dags/vendor/default_data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
data_import_branch_task,
)
from libsys_airflow.plugins.vendor.emails import (
email_args,
file_loaded_email_task,
file_not_loaded_email_task,
)
Expand Down Expand Up @@ -41,7 +40,6 @@
"retries": 0,
"retry_delay": timedelta(minutes=5),
},
**email_args(),
)

with DAG(
Expand Down Expand Up @@ -154,34 +152,16 @@ def setup():
)

file_loaded_sensor = file_loaded_sensor_task(
params["vendor_interface_uuid"], filename, data_import["job_execution_id"]
)

job_summary = job_summary_task(data_import["job_execution_id"])

file_loaded_email_task(
params["vendor_uuid"],
params["vendor_code"],
params["vendor_interface_name"],
params["vendor_interface_uuid"],
params["filename"],
data_import["job_execution_id"],
params["download_path"],
filename,
params["start_time"],
processed_params["records_count"],
job_summary["srs_stats"],
job_summary["instance_stats"],
params["environment"],
)

file_not_loaded_email = file_not_loaded_email_task(**params)
params["vendor_interface_name"],
job_summary = job_summary_task(data_import["job_execution_id"])

params["vendor_code"],
params["vendor_interface_uuid"],
filename,
params["environment"],
)
file_loaded_email_task({**processed_params, **params})

file_not_loaded_email = file_not_loaded_email_task({**processed_params, **params})

data_import_branch >> data_import >> file_loaded_sensor >> job_summary
data_import_branch >> file_not_loaded_email

0 comments on commit fe93228

Please sign in to comment.