From 16246d1ecfc444688b8c1d871083da6559f84470 Mon Sep 17 00:00:00 2001 From: Jeremy Nelson Date: Fri, 26 Aug 2022 09:45:37 -0700 Subject: [PATCH] Supports ON-ORDER records with fixed tests --- dags/bib_records.py | 2 +- plugins/folio/helpers.py | 5 +++ plugins/folio/items.py | 88 +++++++++++++++++++++---------------- plugins/tests/test_items.py | 17 ++++--- 4 files changed, 67 insertions(+), 45 deletions(-) diff --git a/dags/bib_records.py b/dags/bib_records.py index 78dd724e..b01b3eaa 100644 --- a/dags/bib_records.py +++ b/dags/bib_records.py @@ -116,7 +116,7 @@ def marc_only(*args, **kwargs): # Adds a prefix to match bib 001 ("CATKEY", lambda x: x if x.startswith("a") else f"a{x}"), # Strips out spaces from barcode - ("BARCODE", lambda x: x.strip()), + ("BARCODE", lambda x: x.strip() if isinstance(x, str) else x), ], "tsv_files": "{{ ti.xcom_pull('bib-file-groups', key='tsv-files') }}", # noqa }, diff --git a/plugins/folio/helpers.py b/plugins/folio/helpers.py index 92414688..b151bb26 100644 --- a/plugins/folio/helpers.py +++ b/plugins/folio/helpers.py @@ -383,6 +383,11 @@ def _processes_tsv(tsv_base: str, tsv_notes: list, airflow, column_transforms): logging.info(f"Merged {len(note_df)} notes into items tsv") tsv_notes_path.unlink() + # Add note columns to tsv_base_df if notes do not exist + if len(tsv_notes) < 1: + for note in ['CIRCNOTE', 'TECHSTAFF', 'PUBLIC']: + tsv_base_df[note] = np.NaN + tsv_notes_name_parts = tsv_base.name.split(".") tsv_notes_name_parts.insert(-1, "notes") diff --git a/plugins/folio/items.py b/plugins/folio/items.py index e508e88b..6b1a14c4 100644 --- a/plugins/folio/items.py +++ b/plugins/folio/items.py @@ -1,5 +1,6 @@ import json import logging +import pathlib from folio_migration_tools.migration_tasks.items_transformer import ItemsTransformer from folio_uuid.folio_uuid import FOLIONamespaces, FolioUUID @@ -9,47 +10,57 @@ logger = logging.getLogger(__name__) -def _add_hrid(okapi_url: str, holdings_path: str, items_path: 
str): - """Adds an HRID based on Holdings formerIds""" - - # Initializes Holdings lookup and counter +def _generate_holdings_keys(results_dir: pathlib.Path, holdings_pattern: str) -> dict: + """Initializes Holdings lookup and counter for hrid generation""" holdings_keys = {} - with open(holdings_path) as fo: - for line in fo.readlines(): - holdings_record = json.loads(line) - holdings_keys[holdings_record["id"]] = { - "formerId": holdings_record["formerIds"][0], - "counter": 0, - } + for holdings_file in results_dir.glob(holdings_pattern): + with holdings_file.open() as fo: + for line in fo.readlines(): + holdings_record = json.loads(line) + holdings_keys[holdings_record["id"]] = { + "formerId": holdings_record["formerIds"][0], + "counter": 0, + } + + return holdings_keys + + +def _add_hrid(okapi_url: str, airflow: str, holdings_pattern: str, items_pattern: str): + """Adds an HRID based on Holdings formerIds""" + results_dir = pathlib.Path(f"{airflow}/migration/results") + + holdings_keys = _generate_holdings_keys(results_dir, holdings_pattern) items = [] - with open(items_path) as fo: - for line in fo.readlines(): - item = json.loads(line) - holding = holdings_keys[item["holdingsRecordId"]] - former_id = holding["formerId"] - holding["counter"] = holding["counter"] + 1 - hrid_prefix = former_id[:1] + "i" + former_id[1:] - item["hrid"] = f"{hrid_prefix}_{holding['counter']}" - if "barcode" in item: - id_seed = item["barcode"] - else: - id_seed = item["hrid"] - item["id"] = str( - FolioUUID( - okapi_url, - FOLIONamespaces.items, - id_seed, + for items_file in results_dir.glob(items_pattern): + + with items_file.open() as fo: + for line in fo.readlines(): + item = json.loads(line) + holding = holdings_keys[item["holdingsRecordId"]] + former_id = holding["formerId"] + holding["counter"] = holding["counter"] + 1 + hrid_prefix = former_id[:1] + "i" + former_id[1:] + item["hrid"] = f"{hrid_prefix}_{holding['counter']}" + if "barcode" in item: + id_seed = item["barcode"] 
+ else: + id_seed = item["hrid"] + item["id"] = str( + FolioUUID( + okapi_url, + FOLIONamespaces.items, + id_seed, + ) ) - ) - # To handle optimistic locking - item["_version"] = 1 - items.append(item) + # To handle optimistic locking + item["_version"] = 1 + items.append(item) - with open(items_path, "w+") as write_output: - for item in items: - write_output.write(f"{json.dumps(item)}\n") + with open(items_file, "w+") as write_output: + for item in items: + write_output.write(f"{json.dumps(item)}\n") def post_folio_items_records(**kwargs): @@ -63,7 +74,7 @@ def post_folio_items_records(**kwargs): items_records = json.load(fo) for i in range(0, len(items_records), batch_size): - items_batch = items_records[i: i + batch_size] + items_batch = items_records[i:i + batch_size] logger.info(f"Posting {len(items_batch)} in batch {i/batch_size}") post_to_okapi( token=kwargs["task_instance"].xcom_pull( @@ -111,6 +122,7 @@ def run_items_transformer(*args, **kwargs) -> bool: _add_hrid( items_transformer.folio_client.okapi_url, - f"{airflow}/migration/results/folio_holdings_{dag.run_id}_holdings-transformer.json", - f"{airflow}/migration/results/folio_items_{dag.run_id}_items-transformer.json", + airflow, + f"folio_holdings_{dag.run_id}_holdings-*transformer.json", + f"folio_items_{dag.run_id}_items-*transformer.json", ) diff --git a/plugins/tests/test_items.py b/plugins/tests/test_items.py index aded7334..c6a266f5 100644 --- a/plugins/tests/test_items.py +++ b/plugins/tests/test_items.py @@ -1,6 +1,8 @@ import json import pytest # noqa +from plugins.tests.mocks import mock_file_system # noqa + from plugins.folio.items import ( post_folio_items_records, @@ -17,17 +19,19 @@ def test_items_transformers(): assert run_items_transformer -def test_add_hrid(tmp_path): # noqa - holdings_path = tmp_path / "holdings_transformer-test_dag.json" +def test_add_hrid(mock_file_system): # noqa + results_dir = mock_file_system[3] + holdings_path = results_dir / 
"holdings_transformer-test_dag.json" holdings_rec = { "id": "8e6e9fb5-f914-4d38-87d2-ccb52f9a44a4", - "formerIds": ["a23456"] + "formerIds": ["a23456"], + "hrid": "ah23456_1" } holdings_path.write_text(f"{json.dumps(holdings_rec)}\n") - items_path = tmp_path / "items_transformer-test_dag.json" + items_path = results_dir / "items_transformer-test_dag.json" items_rec = { "holdingsRecordId": "8e6e9fb5-f914-4d38-87d2-ccb52f9a44a4" @@ -36,8 +40,9 @@ def test_add_hrid(tmp_path): # noqa items_path.write_text(f"{json.dumps(items_rec)}\n") _add_hrid("https://okapi-endpoint.edu", - str(holdings_path), - str(items_path)) + str(mock_file_system[0]), + "holdings_transformer-*.json", + "items_transformer-*.json") with items_path.open() as items_fo: new_items_rec = json.loads(items_fo.readline())