diff --git a/dags/bib_records.py b/dags/bib_records.py index 5e4908c64..fbadfbde9 100644 --- a/dags/bib_records.py +++ b/dags/bib_records.py @@ -135,6 +135,7 @@ def marc_only(*args, **kwargs): # Strips out spaces from barcode ("BARCODE", lambda x: x.strip()), ], + "tsv_stem": "{{ ti.xcom_pull('move-transform.move-marc-files') }}", # noqa "source": "symphony", }, ) diff --git a/migration b/migration index cd255f3c0..8e253dae7 160000 --- a/migration +++ b/migration @@ -1 +1 @@ -Subproject commit cd255f3c04b8acaa590c25fd62cf6a7694b49965 +Subproject commit 8e253dae7508dcdc6192689f7a19e84284921a42 diff --git a/plugins/folio/helpers.py b/plugins/folio/helpers.py index f6d8decdf..f2c948bca 100644 --- a/plugins/folio/helpers.py +++ b/plugins/folio/helpers.py @@ -3,6 +3,7 @@ import pathlib import shutil +import numpy as np import pandas as pd import pymarc import requests @@ -219,40 +220,80 @@ def transformer_data_issues(transformer, message, *args, **kwargs): logging.getLogger().addHandler(data_issue_file_handler) -def _apply_processing_tsv(tsv_path, airflow, column_transforms): - df = pd.read_csv(tsv_path, sep="\t", dtype=object) - # Performs any transformations to values +def _apply_transforms(df, column_transforms): for transform in column_transforms: column = transform[0] if column in df: function = transform[1] df[column] = df[column].apply(function) - new_tsv_path = pathlib.Path(f"{airflow}/migration/data/items/{tsv_path.name}") - df.to_csv(new_tsv_path, sep="\t", index=False) - tsv_path.unlink() + return df -def _get_tsv(**kwargs): - airflow = kwargs.get("airflow", "/opt/airflow") - source_directory = kwargs["source"] +def _merge_notes_into_base(base_df: pd.DataFrame, note_df: pd.DataFrame): + def _populate_column(barcode): + matched_series = note_df.loc[note_df["BARCODE"] == barcode] + if len(matched_series) > 0: + return matched_series[note_type].values[0] + else: + return np.nan + + # Extract name of non-barcode column + note_columns = list(note_df.columns) + note_columns.pop(note_columns.index("BARCODE")) + note_type = note_columns[0] + base_df[note_type] = base_df["BARCODE"].apply(_populate_column) + return base_df + + +def _processes_tsv(tsv_base, tsv_notes, airflow, column_transforms): + items_dir = pathlib.Path(f"{airflow}/migration/data/items/") + + tsv_base_df = pd.read_csv(tsv_base, sep="\t", dtype=object) + tsv_base_df = _apply_transforms(tsv_base_df, column_transforms) + new_tsv_base_path = items_dir / tsv_base.name + + tsv_base_df.to_csv(new_tsv_base_path, sep="\t", index=False) + tsv_base.unlink() + + # Iterate on tsv notes and merge into the tsv base DF based on barcode + for tsv_notes_path in tsv_notes: + note_df = pd.read_csv(tsv_notes_path, sep="\t", dtype=object) + note_df = _apply_transforms(note_df, column_transforms) + tsv_base_df = _merge_notes_into_base(tsv_base_df, note_df) + logging.info(f"Merged {len(note_df)} notes into items tsv") + tsv_notes_path.unlink() + + tsv_notes_name_parts = tsv_base.name.split(".") + tsv_notes_name_parts.insert(-1, "notes") - return [path for path in pathlib.Path(f"{airflow}/{source_directory}/").glob("*.tsv")] + tsv_notes_name = ".".join(tsv_notes_name_parts) + + new_tsv_notes_path = pathlib.Path(f"{airflow}/migration/data/items/{tsv_notes_name}") + tsv_base_df.to_csv(new_tsv_notes_path, sep="\t", index=False) + + return new_tsv_notes_path.name + + +def _get_tsv_notes(tsv_stem, airflow, source_directory): + + return [path for path in pathlib.Path(f"{airflow}/{source_directory}/").glob(f"{tsv_stem}.*.tsv")] def transform_move_tsvs(*args, **kwargs): airflow = kwargs.get("airflow", "/opt/airflow") column_transforms = kwargs.get("column_transforms", []) + tsv_stem = kwargs["tsv_stem"] + source_directory = kwargs["source"] - tsv_paths = _get_tsv(**kwargs) - - path_names = [f"{path.name}.tsv" for path in tsv_paths] + tsv_base = pathlib.Path(f"{airflow}/{source_directory}/{tsv_stem}.tsv") - if len(tsv_paths) < 1: + if not tsv_base.exists(): raise ValueError( - "No csv files exist for workflow" + f"{tsv_base} does not exist for workflow" ) - for path in tsv_paths: - _apply_processing_tsv(path, airflow, column_transforms) + tsv_notes = _get_tsv_notes(tsv_stem, airflow, source_directory) + + notes_path_name = _processes_tsv(tsv_base, tsv_notes, airflow, column_transforms) - return path_names + return [tsv_base.name, notes_path_name] diff --git a/plugins/folio/items.py b/plugins/folio/items.py index 1e3c2832a..43f801de1 100644 --- a/plugins/folio/items.py +++ b/plugins/folio/items.py @@ -2,13 +2,14 @@ import logging from folio_migration_tools.migration_tasks.items_transformer import ItemsTransformer +from folio_uuid.folio_uuid import FOLIONamespaces, FolioUUID from plugins.folio.helpers import post_to_okapi, setup_data_logging logger = logging.getLogger(__name__) -def _add_hrid(holdings_path: str, items_path: str): +def _add_hrid(okapi_url: str, holdings_path: str, items_path: str): """Adds an HRID based on Holdings formerIds""" # Initializes Holdings lookup and counter @@ -31,6 +32,17 @@ def _add_hrid(holdings_path: str, items_path: str): holding["counter"] = holding["counter"] + 1 hrid_prefix = former_id[:1] + "i" + former_id[1:] item["hrid"] = f"{hrid_prefix}_{holding['counter']}" + if "barcode" in item: + id_seed = item["barcode"] + else: + id_seed = item["hrid"] + item["id"] = str( + FolioUUID( + okapi_url, + FOLIONamespaces.items, + id_seed, + ) + ) items.append(item) with open(items_path, "w+") as write_output: @@ -76,7 +88,7 @@ def run_items_transformer(*args, **kwargs) -> bool: name="items-transformer", migration_task_type="ItemsTransformer", hrid_handling="preserve001", - files=[{"file_name": f"{items_stem}.tsv", "suppress": False}], + files=[{"file_name": f"{items_stem}.notes.tsv", "suppress": False}], items_mapping_file_name="item_mapping.json", location_map_file_name="locations.tsv", default_call_number_type_name="Library of Congress classification", @@ -96,6 +108,7 @@ def run_items_transformer(*args, **kwargs) -> bool: items_transformer.wrap_up() _add_hrid( + items_transformer.folio_client.okapi_url, f"{airflow}/migration/results/folio_holdings_{dag.run_id}_holdings-transformer.json", f"{airflow}/migration/results/folio_items_{dag.run_id}_items-transformer.json", ) diff --git a/plugins/folio/marc.py b/plugins/folio/marc.py index bc8344815..9f5ff1a49 100644 --- a/plugins/folio/marc.py +++ b/plugins/folio/marc.py @@ -12,9 +12,9 @@ def post_marc_to_srs(*args, **kwargs): name="marc-to-srs-batch-poster", migration_task_type="BatchPoster", object_type="SRS", - file={ + files=[{ "file_name": f"folio_srs_instances_{dag.run_id}_bibs-transformer.json" # noqa - }, + }], batch_size=kwargs.get("MAX_ENTITIES", 1000), ) diff --git a/plugins/tests/test_helpers.py b/plugins/tests/test_helpers.py index 7904e327b..5b02a50a1 100644 --- a/plugins/tests/test_helpers.py +++ b/plugins/tests/test_helpers.py @@ -1,5 +1,7 @@ import logging +import numpy as np +import pandas as pd import pytest import pydantic import requests @@ -13,6 +15,7 @@ move_marc_files_check_tsv, post_to_okapi, process_marc, + _merge_notes_into_base, _move_001_to_035, transform_move_tsvs, process_records, @@ -252,13 +255,20 @@ def test_transform_move_tsvs(mock_file_system): airflow_path = mock_file_system[0] source_dir = mock_file_system[1] - # mock sample csv and tsv + # mock sample tsv symphony_tsv = source_dir / "sample.tsv" symphony_tsv.write_text( "CATKEY\tCALL_NUMBER_TYPE\tBARCODE\n123456\tLC 12345\t45677 ") tsv_directory = airflow_path / "migration/data/items" tsv_directory.mkdir(parents=True) sample_tsv = tsv_directory / "sample.tsv" + sample_notes_tsv = tsv_directory / "sample.notes.tsv" + + # mock sample CIRCNOTE tsv + symphony_circnotes_tsv = source_dir / "sample.circnote.tsv" + symphony_circnotes_tsv.write_text( + "BARCODE\tCIRCNOTE\n45677 \tpencil marks 7/28/18cc" + ) column_transforms = [("CATKEY", lambda x: f"a{x}"), ("BARCODE", lambda x: x.strip())] @@ -267,11 +277,47 @@ def test_transform_move_tsvs(mock_file_system): airflow=airflow_path, column_transforms=column_transforms, source="symphony", + tsv_stem="sample" ) + f = open(sample_tsv, "r") assert f.readlines()[1] == "a123456\tLC 12345\t45677\n" f.close() + f_notes = open(sample_notes_tsv, "r") + assert f_notes.readlines()[1] == "a123456\tLC 12345\t45677\tpencil marks 7/28/18cc\n" + f_notes.close() + + +def test_transform_move_tsvs_doesnt_exit(mock_file_system): + airflow_path = mock_file_system[0] + + with pytest.raises(ValueError, match="sample.tsv does not exist for workflow"): + transform_move_tsvs( + airflow=airflow_path, + source="symphony", + tsv_stem="sample" + ) + + +def test_merge_notes_into_base(): + base_df = pd.DataFrame([{"CATKEY": "a1442278", + "BARCODE": "36105033974929", + "BASE_CALL_NUMBER": "PQ6407 .A1 1980B"}, + {"CATKEY": "a13776856", + "BARCODE": "36105231406765", + "BASE_CALL_NUMBER": "KGF3055 .M67 2019"}]) + notes_df = pd.DataFrame([{"BARCODE": "36105033974929", + "CIRCNOTE": "pen marks 6/5/19cc"}]) + base_df = _merge_notes_into_base(base_df, notes_df) + assert "CIRCNOTE" in base_df.columns + + note_row = base_df.loc[base_df["BARCODE"] == "36105033974929"] + assert note_row["CIRCNOTE"].item() == "pen marks 6/5/19cc" + + no_note_row = base_df.loc[base_df["BARCODE"] == "36105231406765"] + assert no_note_row["CIRCNOTE"].item() is np.nan + def test_process_records(mock_dag_run, mock_file_system): airflow_path = mock_file_system[0] diff --git a/plugins/tests/test_items.py b/plugins/tests/test_items.py index 33cfad5d5..e2c5a111e 100644 --- a/plugins/tests/test_items.py +++ b/plugins/tests/test_items.py @@ -35,9 +35,12 @@ def test_add_hrid(tmp_path): # noqa items_path.write_text(f"{json.dumps(items_rec)}\n") - _add_hrid(str(holdings_path), str(items_path)) + _add_hrid("https://okapi-endpoint.edu", + str(holdings_path), + str(items_path)) with items_path.open() as items_fo: new_items_rec = json.loads(items_fo.readline()) assert(new_items_rec['hrid']) == "ai23456_1" + assert(new_items_rec['id']) == "f40ad979-32e8-5f54-bb3d-698c0f611a54"