Skip to content

Commit

Permalink
Merge 3b48f9f into befe709
Browse files Browse the repository at this point in the history
  • Loading branch information
jermnelson committed May 6, 2022
2 parents befe709 + 3b48f9f commit e07bcd6
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 25 deletions.
1 change: 1 addition & 0 deletions dags/bib_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def marc_only(*args, **kwargs):
# Strips out spaces from barcode
("BARCODE", lambda x: x.strip()),
],
"tsv_stem": "{{ ti.xcom_pull('move-transform.move-marc-files') }}", # noqa
"source": "symphony",
},
)
Expand Down
77 changes: 59 additions & 18 deletions plugins/folio/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pathlib
import shutil

import numpy as np
import pandas as pd
import pymarc
import requests
Expand Down Expand Up @@ -219,40 +220,80 @@ def transformer_data_issues(transformer, message, *args, **kwargs):
logging.getLogger().addHandler(data_issue_file_handler)


def _apply_processing_tsv(tsv_path, airflow, column_transforms):
df = pd.read_csv(tsv_path, sep="\t", dtype=object)
# Performs any transformations to values
def _apply_transforms(df, column_transforms):
for transform in column_transforms:
column = transform[0]
if column in df:
function = transform[1]
df[column] = df[column].apply(function)
new_tsv_path = pathlib.Path(f"{airflow}/migration/data/items/{tsv_path.name}")
df.to_csv(new_tsv_path, sep="\t", index=False)
tsv_path.unlink()
return df


def _get_tsv(**kwargs):
airflow = kwargs.get("airflow", "/opt/airflow")
source_directory = kwargs["source"]
def _merge_notes_into_base(base_df: pd.DataFrame, note_df: pd.DataFrame):
def _populate_column(barcode):
matched_series = note_df.loc[note_df["BARCODE"] == barcode]
if len(matched_series) > 0:
return matched_series[note_type].values[0]
else:
return np.nan

# Extract name of non-barcode column
note_columns = list(note_df.columns)
note_columns.pop(note_columns.index("BARCODE"))
note_type = note_columns[0]
base_df[note_type] = base_df["BARCODE"].apply(_populate_column)
return base_df


def _processes_tsv(tsv_base, tsv_notes, airflow, column_transforms):
    """Transforms the base items tsv, merges note tsvs into it, and writes
    both results into the migration items directory.

    Args:
        tsv_base: pathlib.Path to the base items tsv.
        tsv_notes: iterable of pathlib.Paths to note tsvs (one note column
            each, keyed on BARCODE).
        airflow: Airflow home directory path.
        column_transforms: list of (column, callable) transform pairs.

    Returns:
        The file name (not path) of the combined notes tsv.
    """
    items_dir = pathlib.Path(f"{airflow}/migration/data/items/")

    tsv_base_df = pd.read_csv(tsv_base, sep="\t", dtype=object)
    tsv_base_df = _apply_transforms(tsv_base_df, column_transforms)
    new_tsv_base_path = items_dir / tsv_base.name

    tsv_base_df.to_csv(new_tsv_base_path, sep="\t", index=False)
    # Source tsv is consumed once its processed copy is written
    tsv_base.unlink()

    # Iterate on tsv notes and merge into the tsv base DF based on barcode
    for tsv_notes_path in tsv_notes:
        note_df = pd.read_csv(tsv_notes_path, sep="\t", dtype=object)
        note_df = _apply_transforms(note_df, column_transforms)
        tsv_base_df = _merge_notes_into_base(tsv_base_df, note_df)
        logging.info(f"Merged {len(note_df)} notes into items tsv")
        tsv_notes_path.unlink()

    # Derive "<stem>.notes.tsv" from "<stem>.tsv"
    tsv_notes_name_parts = tsv_base.name.split(".")
    tsv_notes_name_parts.insert(-1, "notes")
    tsv_notes_name = ".".join(tsv_notes_name_parts)

    new_tsv_notes_path = items_dir / tsv_notes_name
    tsv_base_df.to_csv(new_tsv_notes_path, sep="\t", index=False)

    return new_tsv_notes_path.name


def _get_tsv_notes(tsv_stem, airflow, source_directory):

return [path for path in pathlib.Path(f"{airflow}/{source_directory}/").glob(f"{tsv_stem}.*.tsv")]


def transform_move_tsvs(*args, **kwargs):
    """Transforms the base items tsv plus any note tsvs and moves the
    results into the migration items directory.

    Kwargs:
        airflow: Airflow home directory, defaults to "/opt/airflow".
        column_transforms: list of (column, callable) transform pairs,
            defaults to no transforms.
        tsv_stem: file stem of the base tsv (pulled from the
            move-marc-files task via XCom in the DAG).
        source: source directory name, e.g. "symphony".

    Returns:
        list with the base tsv file name and the combined notes tsv name.

    Raises:
        ValueError: when the base tsv does not exist.
    """
    airflow = kwargs.get("airflow", "/opt/airflow")
    column_transforms = kwargs.get("column_transforms", [])
    tsv_stem = kwargs["tsv_stem"]
    source_directory = kwargs["source"]

    tsv_base = pathlib.Path(f"{airflow}/{source_directory}/{tsv_stem}.tsv")

    if not tsv_base.exists():
        raise ValueError(
            f"{tsv_base} does not exist for workflow"
        )

    tsv_notes = _get_tsv_notes(tsv_stem, airflow, source_directory)

    notes_path_name = _processes_tsv(tsv_base, tsv_notes, airflow, column_transforms)

    return [tsv_base.name, notes_path_name]
17 changes: 15 additions & 2 deletions plugins/folio/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import logging

from folio_migration_tools.migration_tasks.items_transformer import ItemsTransformer
from folio_uuid.folio_uuid import FOLIONamespaces, FolioUUID

from plugins.folio.helpers import post_to_okapi, setup_data_logging

logger = logging.getLogger(__name__)


def _add_hrid(holdings_path: str, items_path: str):
def _add_hrid(okapi_url: str, holdings_path: str, items_path: str):
"""Adds an HRID based on Holdings formerIds"""

# Initializes Holdings lookup and counter
Expand All @@ -31,6 +32,17 @@ def _add_hrid(holdings_path: str, items_path: str):
holding["counter"] = holding["counter"] + 1
hrid_prefix = former_id[:1] + "i" + former_id[1:]
item["hrid"] = f"{hrid_prefix}_{holding['counter']}"
if "barcode" in item:
id_seed = item["barcode"]
else:
id_seed = item["hrid"]
item["id"] = str(
FolioUUID(
okapi_url,
FOLIONamespaces.items,
id_seed,
)
)
items.append(item)

with open(items_path, "w+") as write_output:
Expand Down Expand Up @@ -76,7 +88,7 @@ def run_items_transformer(*args, **kwargs) -> bool:
name="items-transformer",
migration_task_type="ItemsTransformer",
hrid_handling="preserve001",
files=[{"file_name": f"{items_stem}.tsv", "suppress": False}],
files=[{"file_name": f"{items_stem}.notes.tsv", "suppress": False}],
items_mapping_file_name="item_mapping.json",
location_map_file_name="locations.tsv",
default_call_number_type_name="Library of Congress classification",
Expand All @@ -96,6 +108,7 @@ def run_items_transformer(*args, **kwargs) -> bool:
items_transformer.wrap_up()

_add_hrid(
items_transformer.folio_client.okapi_url,
f"{airflow}/migration/results/folio_holdings_{dag.run_id}_holdings-transformer.json",
f"{airflow}/migration/results/folio_items_{dag.run_id}_items-transformer.json",
)
4 changes: 2 additions & 2 deletions plugins/folio/marc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ def post_marc_to_srs(*args, **kwargs):
name="marc-to-srs-batch-poster",
migration_task_type="BatchPoster",
object_type="SRS",
file={
files=[{
"file_name": f"folio_srs_instances_{dag.run_id}_bibs-transformer.json" # noqa
},
}],
batch_size=kwargs.get("MAX_ENTITIES", 1000),
)

Expand Down
48 changes: 47 additions & 1 deletion plugins/tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

import numpy as np
import pandas as pd
import pytest
import pydantic
import requests
Expand All @@ -13,6 +15,7 @@
move_marc_files_check_tsv,
post_to_okapi,
process_marc,
_merge_notes_into_base,
_move_001_to_035,
transform_move_tsvs,
process_records,
Expand Down Expand Up @@ -252,13 +255,20 @@ def test_transform_move_tsvs(mock_file_system):
airflow_path = mock_file_system[0]
source_dir = mock_file_system[1]

# mock sample csv and tsv
# mock sample tsv
symphony_tsv = source_dir / "sample.tsv"
symphony_tsv.write_text(
"CATKEY\tCALL_NUMBER_TYPE\tBARCODE\n123456\tLC 12345\t45677 ")
tsv_directory = airflow_path / "migration/data/items"
tsv_directory.mkdir(parents=True)
sample_tsv = tsv_directory / "sample.tsv"
sample_notes_tsv = tsv_directory / "sample.notes.tsv"

# mock sample CIRCNOTE tsv
symphony_circnotes_tsv = source_dir / "sample.circnote.tsv"
symphony_circnotes_tsv.write_text(
"BARCODE\tCIRCNOTE\n45677 \tpencil marks 7/28/18cc"
)

column_transforms = [("CATKEY", lambda x: f"a{x}"),
("BARCODE", lambda x: x.strip())]
Expand All @@ -267,11 +277,47 @@ def test_transform_move_tsvs(mock_file_system):
airflow=airflow_path,
column_transforms=column_transforms,
source="symphony",
tsv_stem="sample"
)

f = open(sample_tsv, "r")
assert f.readlines()[1] == "a123456\tLC 12345\t45677\n"
f.close()

f_notes = open(sample_notes_tsv, "r")
assert f_notes.readlines()[1] == "a123456\tLC 12345\t45677\tpencil marks 7/28/18cc\n"
f_notes.close()


def test_transform_move_tsvs_doesnt_exit(mock_file_system):
    # No sample.tsv exists under the mocked source directory, so the
    # workflow should refuse to run
    airflow_path = mock_file_system[0]

    with pytest.raises(ValueError, match="sample.tsv does not exist for workflow"):
        transform_move_tsvs(
            source="symphony",
            tsv_stem="sample",
            airflow=airflow_path,
        )


def test_merge_notes_into_base():
    """CIRCNOTE values are joined on barcode; unmatched rows get NaN."""
    base_records = [
        {
            "CATKEY": "a1442278",
            "BARCODE": "36105033974929",
            "BASE_CALL_NUMBER": "PQ6407 .A1 1980B",
        },
        {
            "CATKEY": "a13776856",
            "BARCODE": "36105231406765",
            "BASE_CALL_NUMBER": "KGF3055 .M67 2019",
        },
    ]
    note_records = [
        {"BARCODE": "36105033974929", "CIRCNOTE": "pen marks 6/5/19cc"}
    ]

    merged_df = _merge_notes_into_base(
        pd.DataFrame(base_records), pd.DataFrame(note_records)
    )

    assert "CIRCNOTE" in merged_df.columns

    note_row = merged_df.loc[merged_df["BARCODE"] == "36105033974929"]
    assert note_row["CIRCNOTE"].item() == "pen marks 6/5/19cc"

    no_note_row = merged_df.loc[merged_df["BARCODE"] == "36105231406765"]
    assert no_note_row["CIRCNOTE"].item() is np.nan


def test_process_records(mock_dag_run, mock_file_system):
airflow_path = mock_file_system[0]
Expand Down
5 changes: 4 additions & 1 deletion plugins/tests/test_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ def test_add_hrid(tmp_path): # noqa

items_path.write_text(f"{json.dumps(items_rec)}\n")

_add_hrid(str(holdings_path), str(items_path))
_add_hrid("https://okapi-endpoint.edu",
str(holdings_path),
str(items_path))

with items_path.open() as items_fo:
new_items_rec = json.loads(items_fo.readline())

assert(new_items_rec['hrid']) == "ai23456_1"
assert(new_items_rec['id']) == "f40ad979-32e8-5f54-bb3d-698c0f611a54"

0 comments on commit e07bcd6

Please sign in to comment.