Skip to content

Commit

Permalink
Merge pull request #993 from sul-dlss/t985-fix-gobi-dag
Browse files Browse the repository at this point in the history
Refactors transmission tasks and test for GOBI selection and transmission DAGs
  • Loading branch information
jgreben committed May 17, 2024
2 parents 376910f + 590978f commit 0e0ced7
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 68 deletions.
15 changes: 9 additions & 6 deletions libsys_airflow/dags/data_exports/gobi_selections.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from libsys_airflow.plugins.data_exports.instance_ids import (
fetch_record_ids,
save_ids_to_fs,
)

from libsys_airflow.plugins.data_exports.marc.gobi import gobi_list_from_marc_files

from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
from libsys_airflow.plugins.data_exports.marc.transforms import (
add_holdings_items_to_marc_files,
Expand Down Expand Up @@ -84,10 +85,12 @@
},
)

send_to_vendor = TriggerDagRunOperator(
task_id="send_ybp_records",
trigger_dag_id="send_ybp_records",
conf={"marc_file_list": "{{ ti.xcom_pull('tbd') }}"},
generate_isbn_list = PythonOperator(
task_id="generate_isbn_lists",
python_callable=gobi_list_from_marc_files,
op_kwargs={
"marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
},
)

finish_processing_marc = EmptyOperator(
Expand All @@ -97,4 +100,4 @@

fetch_folio_record_ids >> save_ids_to_file >> fetch_marc_records
fetch_marc_records >> transform_marc_record >> transform_marc_fields
transform_marc_fields >> send_to_vendor >> finish_processing_marc
transform_marc_fields >> generate_isbn_list >> finish_processing_marc
10 changes: 2 additions & 8 deletions libsys_airflow/dags/data_exports/gobi_transmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
archive_transmitted_data_task,
)

from libsys_airflow.plugins.data_exports.marc.gobi import gobi_list_from_marc_files

logger = logging.getLogger(__name__)

default_args = {
Expand All @@ -39,15 +37,11 @@ def send_gobi_records():

gather_files = gather_files_task(vendor="gobi")

generate_isbn_list = gobi_list_from_marc_files(gather_files)

transmit_data = transmit_data_ftp_task(
"ftp-ftp.ybp.com-stanford", generate_isbn_list
)
transmit_data = transmit_data_ftp_task("ftp-ftp.ybp.com-stanford", gather_files)

archive_data = archive_transmitted_data_task(transmit_data['success'])

start >> gather_files >> generate_isbn_list >> transmit_data >> archive_data >> end
start >> gather_files >> transmit_data >> archive_data >> end


send_gobi_records()
9 changes: 6 additions & 3 deletions libsys_airflow/plugins/data_exports/marc/gobi.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@
def gobi_list_from_marc_files(marc_file_list: str) -> list:
    """Generate GOBI ISBN list files for the 'updates' MARC files.

    marc_file_list is a string (an XCom value) containing a dict literal
    whose 'updates' key maps to a list of MARC file paths. Each path is
    handed to GobiTransformer.generate_list, and the resulting list-file
    paths are returned.
    """
    gobi_lists = []
    transformer = GobiTransformer()
    # XCom values arrive as strings; parse the dict back out safely
    # (literal_eval, never eval, on serialized task data).
    marc_list = ast.literal_eval(marc_file_list)
    for marc_file in marc_list['updates']:
        gobi_lists.append(transformer.generate_list(marc_file=marc_file))

    return gobi_lists


class GobiTransformer(Transformer):
def generate_list(self, marc_file) -> pathlib.Path:
# marc_path is data-export-files/gobi/marc-files/updates/YYYYMMDD.mrc
marc_path = pathlib.Path(marc_file)
gobi_list_name = marc_path.stem
gobi_path = pathlib.Path(marc_path.parent.parent) / f"stf.{gobi_list_name}.txt"
# gobi_path is data-export-files/gobi/marc-files/updates/stf.YYYYMMDD.txt
gobi_path = pathlib.Path(marc_path.parent) / f"stf.{gobi_list_name}.txt"

with marc_path.open('rb') as fo:
marc_records = [record for record in pymarc.MARCReader(fo)]
Expand Down
10 changes: 6 additions & 4 deletions libsys_airflow/plugins/data_exports/transmission_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ def gather_files_task(**kwargs) -> dict:
"""
Gets files to send to vendor:
Looks for all the files in the data-export-files/{vendor}/marc-files folder
File glob patterns include "**/" to get the deletes, new, and updates folders
Regardless of date stamp
"""
logger.info("Gathering files to transmit")
airflow = kwargs.get("airflow", "/opt/airflow")
vendor = kwargs["vendor"]
params = kwargs.get("params", {})
bucket = params.get("bucket", {})
marc_filepath = Path(airflow) / f"data-export-files/{vendor}/marc-files/"
file_glob_pattern = "**/*.mrc"
if vendor == "full-dump":
marc_filepath = S3Path(f"/{bucket}/data-export-files/{vendor}/marc-files/")
else:
marc_filepath = Path(airflow) / f"data-export-files/{vendor}/marc-files/"

if vendor == "gobi":
file_glob_pattern = "**/*.txt"
marc_filelist = []
for f in marc_filepath.glob("**/*.mrc"):
for f in marc_filepath.glob(file_glob_pattern):
if f.stat().st_size == 0:
continue
marc_filelist.append(str(f))
Expand Down
16 changes: 8 additions & 8 deletions tests/data_exports/test_gobi_exports_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def mock_folio_get(*args):
transformer = gobi_transformer.GobiTransformer()
transformer.generate_list(marc_file)

gobi_file = pathlib.Path(marc_file.parent.parent / f"stf.{file_date}.txt")
gobi_file = pathlib.Path(marc_file.parent / f"stf.{file_date}.txt")

with gobi_file.open('r+') as fo:
assert fo.readline() == "1234567890123|print|325099\n"
Expand Down Expand Up @@ -153,7 +153,7 @@ def mock_folio_get(*args):
transformer = gobi_transformer.GobiTransformer()
transformer.generate_list(marc_file)

gobi_file = pathlib.Path(marc_file.parent.parent / f"stf.{file_date}.txt")
gobi_file = pathlib.Path(marc_file.parent / f"stf.{file_date}.txt")

with gobi_file.open('r+') as fo:
assert fo.readline() == "1234567890123|ebook|325099\n"
Expand Down Expand Up @@ -204,7 +204,7 @@ def mock_folio_get(*args):
transformer = gobi_transformer.GobiTransformer()
transformer.generate_list(marc_file)

gobi_file = pathlib.Path(marc_file.parent.parent / f"stf.{file_date}.txt")
gobi_file = pathlib.Path(marc_file.parent / f"stf.{file_date}.txt")

with gobi_file.open('r+') as fo:
assert fo.readline() == ""
Expand Down Expand Up @@ -255,7 +255,7 @@ def mock_folio_get(*args):
transformer = gobi_transformer.GobiTransformer()
transformer.generate_list(marc_file)

gobi_file = pathlib.Path(marc_file.parent.parent / f"stf.{file_date}.txt")
gobi_file = pathlib.Path(marc_file.parent / f"stf.{file_date}.txt")

with gobi_file.open('r+') as fo:
assert fo.readline() == "1234567890|print|325099\n"
Expand Down Expand Up @@ -305,7 +305,7 @@ def mock_folio_get(*args):
transformer = gobi_transformer.GobiTransformer()
transformer.generate_list(marc_file)

gobi_file = pathlib.Path(marc_file.parent.parent / f"stf.{file_date}.txt")
gobi_file = pathlib.Path(marc_file.parent / f"stf.{file_date}.txt")

with gobi_file.open('r+') as fo:
assert fo.readline() == "1234567890123|print|325099\n"
Expand Down Expand Up @@ -359,7 +359,7 @@ def mock_folio_get(*args):
transformer = gobi_transformer.GobiTransformer()
transformer.generate_list(marc_file)

gobi_file = pathlib.Path(marc_file.parent.parent / f"stf.{file_date}.txt")
gobi_file = pathlib.Path(marc_file.parent / f"stf.{file_date}.txt")

with gobi_file.open('r+') as fo:
assert fo.readline() == "1234567890123|print|325099\n"
Expand Down Expand Up @@ -410,7 +410,7 @@ def mock_folio_get(*args):
transformer = gobi_transformer.GobiTransformer()
transformer.generate_list(marc_file)

gobi_file = pathlib.Path(marc_file.parent.parent / f"stf.{file_date}.txt")
gobi_file = pathlib.Path(marc_file.parent / f"stf.{file_date}.txt")

with gobi_file.open('r+') as fo:
assert fo.readline() == "1234567890123|print|325099\n"
Expand Down Expand Up @@ -461,7 +461,7 @@ def mock_folio_get(*args):
transformer = gobi_transformer.GobiTransformer()
transformer.generate_list(marc_file)

gobi_file = pathlib.Path(marc_file.parent.parent / f"stf.{file_date}.txt")
gobi_file = pathlib.Path(marc_file.parent / f"stf.{file_date}.txt")

with gobi_file.open('r+') as fo:
assert fo.readline() == "1234567890123|print|325099\n"
Expand Down
123 changes: 84 additions & 39 deletions tests/data_exports/test_transmission_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import httpx

from http import HTTPStatus
from unittest import mock

from airflow.models import Connection

Expand All @@ -17,10 +16,35 @@
)


@pytest.fixture(params=["pod", "gobi"])
def mock_vendor_marc_files(tmp_path, request):
    """Build a data-export-files/<vendor>/marc-files/updates tree for tests.

    Creates three .mrc files and one .txt file. Only the files at indices
    0 and 3 receive content; the middle two stay zero-length so that
    gather_files_task's empty-file filter has something to skip.
    Returns the same shape as the gather_files XCom: file list + s3 flag.
    """
    airflow = tmp_path / "airflow"
    vendor = request.param
    marc_file_dir = airflow / f"data-export-files/{vendor}" / "marc-files" / "updates"
    marc_file_dir.mkdir(parents=True)
    filenames = [
        "2024022914.mrc",
        "2024030114.mrc",
        "2024030214.mrc",
        "2024030214.txt",
    ]
    files = []
    for i, name in enumerate(filenames):
        # Use pathlib composition rather than string formatting for paths.
        file = marc_file_dir / name
        file.touch()
        # Original check was `i in [0, 3, 4]`; index 4 was unreachable with
        # only four filenames, so it is dropped — behavior is unchanged.
        if i in (0, 3):
            file.write_text("hello world")
        files.append(str(file))
    return {"file_list": files, "s3": False}


@pytest.fixture
def mock_file_system(tmp_path):
airflow = tmp_path / "airflow"
vendor_dir = airflow / "data-export-files/oclc/"
vendor_dir = airflow / "data-export-files/some-vendor/"
marc_file_dir = vendor_dir / "marc-files" / "updates"
marc_file_dir.mkdir(parents=True)
instance_id_dir = vendor_dir / "instanceids" / "updates"
Expand All @@ -31,12 +55,15 @@ def mock_file_system(tmp_path):
return [airflow, marc_file_dir, instance_id_dir, archive_dir]


# Mock xcom messages dict
@pytest.fixture
def mock_marc_files(mock_file_system):
marc_file_dir = mock_file_system[1]
setup_marc_files = {
"marc_files": ["2024022914.mrc", "2024030114.mrc", "2024030214.mrc"]
"marc_files": [
"2024022914.mrc",
"2024030114.mrc",
"2024030214.mrc",
]
}
marc_files = []
for i, x in enumerate(setup_marc_files['marc_files']):
Expand Down Expand Up @@ -89,10 +116,11 @@ def mock_httpx_failure():
)


# The pre-commit definition of this test (using mock_file_system/oclc) was
# superseded; keeping both would leave a stale duplicate `def` that shadows
# the other. Only the parametrized version remains.
@pytest.mark.parametrize("mock_vendor_marc_files", ["pod"], indirect=True)
def test_gather_files_task(tmp_path, mock_vendor_marc_files):
    """gather_files_task returns the non-empty files under the vendor dir."""
    airflow = tmp_path / "airflow"
    marc_files = gather_files_task.function(airflow=airflow, vendor="pod")
    assert marc_files["file_list"][0] == mock_vendor_marc_files["file_list"][0]


def test_gather_full_dump_files(mocker):
Expand All @@ -103,6 +131,14 @@ def test_gather_full_dump_files(mocker):
assert marc_files["s3"]


@pytest.mark.parametrize("mock_vendor_marc_files", ["gobi"], indirect=True)
def test_gather_gobi_files(tmp_path, mock_vendor_marc_files):
    """For the gobi vendor, only the single non-empty .txt file is gathered."""
    airflow = tmp_path / "airflow"
    gathered = gather_files_task.function(airflow=airflow, vendor="gobi")
    # The fixture's last entry is the .txt file with content; the empty
    # .mrc files and the .mrc glob pattern are both excluded for gobi.
    assert gathered["file_list"] == [mock_vendor_marc_files["file_list"][-1]]

def test_transmit_data_ftp_task(
mocker, mock_ftphook_connection, mock_marc_files, caplog
):
Expand All @@ -114,15 +150,17 @@ def test_transmit_data_ftp_task(
return_value=mock_ftphook_connection,
)

with mock.patch.dict("os.environ", AIRFLOW_VAR_OKAPI_URL="http://okapi-prod"):
transmit_data = transmit_data_ftp_task.function(
"ftp-example.com", mock_marc_files
)
assert len(transmit_data["success"]) == 3
assert "Start transmission of file" in caplog.text
assert ftp_hook.store_file.called_with(
"/remote/path/dir/2024022914.mrc", "2024022914.mrc"
)
mocker.patch(
"libsys_airflow.plugins.data_exports.transmission_tasks.is_production",
return_value=True,
)

transmit_data = transmit_data_ftp_task.function("ftp-example.com", mock_marc_files)
assert len(transmit_data["success"]) == 3
assert "Start transmission of file" in caplog.text
assert ftp_hook.store_file.called_with(
"/remote/path/dir/2024022914.mrc", "2024022914.mrc"
)


def test_transmit_data_task(
Expand All @@ -136,13 +174,15 @@ def test_transmit_data_task(
"libsys_airflow.plugins.data_exports.transmission_tasks.Connection.get_connection_from_secrets",
return_value=mock_httpx_connection,
)

with mock.patch.dict("os.environ", AIRFLOW_VAR_OKAPI_URL="http://okapi-prod"):
transmit_data = transmit_data_http_task.function(
mock_marc_files, files_params="upload[files][]", params={"vendor": "pod"}
)
assert len(transmit_data["success"]) == 3
assert "Transmit data to pod" in caplog.text
mocker.patch(
"libsys_airflow.plugins.data_exports.transmission_tasks.is_production",
return_value=True,
)
transmit_data = transmit_data_http_task.function(
mock_marc_files, files_params="upload[files][]", params={"vendor": "pod"}
)
assert len(transmit_data["success"]) == 3
assert "Transmit data to pod" in caplog.text


def test_transmit_data_from_s3_task(
Expand All @@ -156,16 +196,19 @@ def test_transmit_data_from_s3_task(
"libsys_airflow.plugins.data_exports.transmission_tasks.Connection.get_connection_from_secrets",
return_value=mock_httpx_connection,
)
mocker.patch(
"libsys_airflow.plugins.data_exports.transmission_tasks.is_production",
return_value=True,
)
mocker.patch.object(transmission_tasks, "S3Path", pathlib.Path)
mock_marc_files["s3"] = True

with mock.patch.dict("os.environ", AIRFLOW_VAR_OKAPI_URL="http://okapi-prod"):
transmit_data_from_s3 = transmit_data_http_task.function(
mock_marc_files,
files_params="upload[files][]",
params={"vendor": "pod", "bucket": "data-export-test"},
)
assert len(transmit_data_from_s3["success"]) == 3
transmit_data_from_s3 = transmit_data_http_task.function(
mock_marc_files,
files_params="upload[files][]",
params={"vendor": "pod", "bucket": "data-export-test"},
)
assert len(transmit_data_from_s3["success"]) == 3


def test_transmit_data_failed(
Expand All @@ -179,14 +222,16 @@ def test_transmit_data_failed(
"libsys_airflow.plugins.data_exports.transmission_tasks.Connection.get_connection_from_secrets",
return_value=mock_httpx_connection,
)

with mock.patch.dict("os.environ", AIRFLOW_VAR_OKAPI_URL="http://okapi-prod"):
transmit_data = transmit_data_http_task.function(
mock_marc_files,
params={"vendor": "pod"},
)
assert len(transmit_data["failures"]) == 3
assert "Transmit data to pod" in caplog.text
mocker.patch(
"libsys_airflow.plugins.data_exports.transmission_tasks.is_production",
return_value=True,
)
transmit_data = transmit_data_http_task.function(
mock_marc_files,
params={"vendor": "pod"},
)
assert len(transmit_data["failures"]) == 3
assert "Transmit data to pod" in caplog.text


def test_archive_transmitted_data_task(mock_file_system, mock_marc_files):
Expand Down

0 comments on commit 0e0ced7

Please sign in to comment.