Skip to content

Commit

Permalink
Merge 3e74710 into f8f5ffd
Browse files Browse the repository at this point in the history
  • Loading branch information
jermnelson committed Mar 27, 2024
2 parents f8f5ffd + 3e74710 commit c577657
Show file tree
Hide file tree
Showing 4 changed files with 643 additions and 5 deletions.
18 changes: 13 additions & 5 deletions libsys_airflow/dags/data_exports/oclc_transmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from airflow.operators.empty import EmptyOperator

from libsys_airflow.plugins.data_exports.transmission_tasks import (
gather_files_task,
transmit_data_ftp_task,
archive_transmitted_data_task,
gather_oclc_files_task,
transmit_data_oclc_api_task,
)

logger = logging.getLogger(__name__)
Expand All @@ -35,9 +35,17 @@ def send_oclc_records():

end = EmptyOperator(task_id="end")

gather_files = gather_files_task(vendor="oclc")

transmit_data = transmit_data_ftp_task("google", gather_files)
gather_files = gather_oclc_files_task()

# One Airflow connection id per library. Each id must be its own list
# element: adjacent string literals without a comma are silently
# concatenated by Python into a single (non-existent) connection id.
transmit_data = transmit_data_oclc_api_task(
    [
        "http-web.oclc-Business",
        "http-web.oclc-Hoover",
        "http-web.oclc-Lane",
        "http-web.oclc-Law",
        "http-web.oclc-SUL",
    ],
    gather_files,
)

archive_data = archive_transmitted_data_task(transmit_data['success'])

Expand Down
174 changes: 174 additions & 0 deletions libsys_airflow/plugins/data_exports/oclc_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import json
import logging
import pathlib
import uuid

import httpx
import pymarc

from typing import List, Union

from libsys_airflow.plugins.folio_client import folio_client

logger = logging.getLogger(__name__)


class OCLCAPIWrapper(object):
    """Helper class for transmitting MARC records to the OCLC WorldCat API.

    On construction the wrapper requests an OAuth access token from OCLC
    using the supplied ``user`` and ``password`` keyword arguments and
    creates a FOLIO client.  ``new`` and ``update`` then read MARC files,
    send each record to the WorldCat Metadata API and report the SRS uuids
    that succeeded or failed.
    """

    auth_url = "https://oauth.oclc.org/token?grant_type=client_credentials&scope=WorldCatMetadataAPI"
    worldcat_metadata_url = "https://metadata.api.oclc.org/worldcat"

    def __init__(self, **kwargs):
        """Authenticate against OCLC and create the FOLIO client.

        Keyword Args:
            user: OCLC API key used as the basic-auth user name.
            password: OCLC API secret used as the basic-auth password.

        Raises:
            KeyError: if ``user`` or ``password`` is missing.
            Exception: if the access token cannot be retrieved.
        """
        # Initialise every attribute before anything can raise so that
        # __del__ always finds them, even on a half-constructed instance.
        self.oclc_headers = None
        self.snapshot = None
        self.httpx_client = None
        user = kwargs["user"]
        password = kwargs["password"]
        self.__authenticate__(user, password)
        self.folio_client = folio_client()

    def __del__(self):
        """Close the HTTP client when the wrapper is garbage collected."""
        # getattr guards against instances whose __init__ never ran.
        client = getattr(self, "httpx_client", None)
        if client:
            client.close()
        # TODO: Close snapshot

    def __authenticate__(self, username, passphrase) -> None:
        """Request an access token and store the headers for OCLC calls.

        Raises:
            Exception: wrapping any error raised while creating the client,
                posting the request, or reading the token from the response.
        """
        try:
            self.httpx_client = httpx.Client()

            result = self.httpx_client.post(
                url=OCLCAPIWrapper.auth_url, auth=(username, passphrase)
            )
            # Surface HTTP errors directly instead of as a confusing
            # KeyError on the missing "access_token" key.
            result.raise_for_status()
            token = result.json()["access_token"]
            logger.info("Retrieved API Access Token")
            self.oclc_headers = {
                # OAuth 2 access tokens are sent with the Bearer scheme.
                "Authorization": f"Bearer {token}",
                "Content-type": "application/marc",
            }
        except Exception as e:
            msg = "Unable to Retrieve Access Token"
            logger.error(msg)
            raise Exception(msg, e) from e

    def __generate_snapshot__(self) -> None:
        """Create a new FOLIO SRS snapshot and remember its uuid.

        Raises:
            httpx.HTTPStatusError: if FOLIO rejects the snapshot request.
        """
        snapshot_uuid = str(uuid.uuid4())
        # NOTE(review): the URL is built by plain concatenation, so this
        # assumes okapi_url ends with "/" -- confirm against folio_client.
        post_result = self.httpx_client.post(
            f"{self.folio_client.okapi_url}source-storage/snapshots",
            headers=self.folio_client.okapi_headers,
            json={"jobExecutionId": snapshot_uuid, "status": "NEW"},
        )
        post_result.raise_for_status()
        self.snapshot = snapshot_uuid

    def __read_marc_files__(self, marc_files: list) -> list:
        """Read every MARC record from the given file paths.

        Paths that do not exist are skipped, as are records that the MARC
        reader could not parse (it yields ``None`` for those); both cases
        are logged.
        """
        records = []
        for marc_file in marc_files:
            marc_file_path = pathlib.Path(marc_file)
            if not marc_file_path.exists():
                logger.warning(f"MARC file {marc_file_path} does not exist")
                continue
            with marc_file_path.open('rb') as fo:
                for record in pymarc.MARCReader(fo):
                    if record is None:
                        logger.warning(
                            f"Skipping unreadable record in {marc_file_path}"
                        )
                        continue
                    records.append(record)
        return records

    def __srs_uuid__(self, record) -> Union[str, None]:
        """Return subfield ``s`` of the record's 999 field with ``ff``
        indicators, or ``None`` (and log an error) when there is none.

        When several matching 999 fields exist the last one wins.
        """
        srs_uuid = None
        for field in record.get_fields("999"):
            # Normalise to a list so the comparison works whether the
            # indicators are exposed as a list or as a tuple-like object.
            if list(field.indicators or ()) == ["f", "f"]:
                srs_uuid = field["s"]
        if srs_uuid is None:
            logger.error("Record Missing SRS uuid")
        return srs_uuid

    def __update_035__(self, oclc_put_result: bytes, record: "pymarc.Record") -> None:
        """
        Extracts 035 field with new OCLC number adds to existing MARC21
        record

        Every 035 in the OCLC response that has at least one subfield ``a``
        starting with ``(OCoLC`` is added once to ``record``.
        """
        oclc_record = pymarc.Record(data=oclc_put_result)  # type: ignore
        for field in oclc_record.get_fields('035'):
            if any(
                subfield.startswith("(OCoLC")
                for subfield in field.get_subfields("a")
            ):
                record.add_ordered_field(field)

    def put_folio_record(self, srs_uuid: str, record: "pymarc.Record") -> bool:
        """
        Updates FOLIO SRS with updated MARC record with new OCLC Number
        in the 035 field

        A snapshot is created on first use and reused afterwards.

        Returns:
            True when FOLIO answers 200, otherwise False (after logging).
        """
        # NOTE(review): as_json() already returns a JSON string, so
        # rawRecord below is JSON-encoded twice and parsedRecord receives a
        # string rather than an object -- confirm FOLIO SRS expects this.
        marc_json = record.as_json()
        if self.snapshot is None:
            self.__generate_snapshot__()

        put_result = self.httpx_client.put(
            f"{self.folio_client.okapi_url}source-storage/records/{srs_uuid}",
            headers=self.folio_client.okapi_headers,
            json={
                "snapshotId": self.snapshot,
                "matchedId": srs_uuid,
                "recordType": "MARC_BIB",
                "rawRecord": {"content": json.dumps(marc_json)},
                "parsedRecord": {"content": marc_json},
            },
        )
        if put_result.status_code != 200:
            logger.error(f"Failed to update FOLIO for SRS {srs_uuid}")
            return False
        return True

    def new(self, marc_files: List[str]) -> dict:
        """Create new WorldCat records and write the OCLC number to FOLIO.

        Records without an SRS uuid are skipped and appear in neither list.

        Returns:
            ``{"success": [...], "failures": [...]}`` holding SRS uuids.
        """
        output: dict = {"success": [], "failures": []}
        if not marc_files:
            logger.info("No new marc records")
            return output
        marc_records = self.__read_marc_files__(marc_files)

        for record in marc_records:
            srs_uuid = self.__srs_uuid__(record)
            if srs_uuid is None:
                continue
            new_record_result = self.httpx_client.post(
                f"{OCLCAPIWrapper.worldcat_metadata_url}/manage/bibs",
                headers=self.oclc_headers,
                data=record.as_marc21(),
            )

            # NOTE(review): only 200 is treated as success -- confirm the
            # API never answers with another 2xx status for a created bib.
            if new_record_result.status_code != 200:
                logger.error(
                    f"Failed to create record, error: {new_record_result.text}"
                )
                output['failures'].append(srs_uuid)
                continue
            self.__update_035__(new_record_result.content, record)
            if not self.put_folio_record(srs_uuid, record):
                output['failures'].append(srs_uuid)
                continue
            output['success'].append(srs_uuid)
        return output

    def update(self, marc_files: List[str]) -> dict:
        """Post each record to the WorldCat institution holdings endpoint.

        Records without an SRS uuid are skipped and appear in neither list.

        Returns:
            ``{"success": [...], "failures": [...]}`` holding SRS uuids.
        """
        output: dict = {"success": [], "failures": []}
        if not marc_files:
            logger.info("No updated marc records")
            return output
        marc_records = self.__read_marc_files__(marc_files)

        for record in marc_records:
            srs_uuid = self.__srs_uuid__(record)
            if srs_uuid is None:
                continue
            post_result = self.httpx_client.post(
                f"{OCLCAPIWrapper.worldcat_metadata_url}/manage/institution/holdings/set",
                headers=self.oclc_headers,
                data=record.as_marc21(),
            )
            if post_result.status_code != 200:
                logger.error(f"Failed to update record, error: {post_result.text}")
                output['failures'].append(srs_uuid)
                continue
            # TODO: Need to update OCLC code in 035 field
            output['success'].append(srs_uuid)
        return output
54 changes: 54 additions & 0 deletions libsys_airflow/plugins/data_exports/transmission_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from airflow.models.connection import Connection
from airflow.providers.ftp.hooks.ftp import FTPHook

from libsys_airflow.plugins.data_exports.oclc_api import OCLCAPIWrapper

logger = logging.getLogger(__name__)


Expand All @@ -23,6 +25,31 @@ def gather_files_task(**kwargs) -> list:
return [str(p) for p in marc_filepath.glob("*.mrc")]


@task(multiple_outputs=True)
def gather_oclc_files_task(**kwargs) -> dict:
    """
    Collects the OCLC MARC export files and groups them by library code
    and then by type.

    The Airflow home directory is read from the ``airflow`` keyword
    argument and defaults to ``/opt/airflow``. The library code and the
    type are the second and third dash-separated parts of each file name.
    Returns a dict keyed by OCLC library code whose values map a type to
    the list of matching file paths (as strings).
    """
    airflow = kwargs.get("airflow", "/opt/airflow")
    oclc_codes = (
        "S7Z",  # Business
        "HIN",  # Hoover
        "CASUM",  # Lane
        "RCJ",  # Law
        "STF",  # SUL
    )
    libraries: dict = {code: {} for code in oclc_codes}
    marc_directory = Path(airflow) / "data-export-files/oclc/marc-files/"
    for marc_file_path in marc_directory.glob("*.mrc"):
        name_parts = marc_file_path.name.split("-")
        library = name_parts[1]
        type_of = name_parts[2]
        # setdefault creates the per-type list the first time it is seen
        libraries[library].setdefault(type_of, []).append(str(marc_file_path))
    return libraries


@task(multiple_outputs=True)
def transmit_data_http_task(conn_id, local_files, **kwargs) -> dict:
"""
Expand Down Expand Up @@ -79,6 +106,33 @@ def transmit_data_ftp_task(conn_id, local_files) -> dict:
return {"success": success, "failures": failures}


@task
def transmit_data_oclc_api_task(connection_details, libraries) -> dict:
    """
    Sends each library's new and updated MARC files to OCLC.

    Args:
        connection_details: Airflow connection ids; each connection's
            ``extra`` JSON must contain an ``oclc_code`` key and its
            login/password are used as the OCLC credentials.
        libraries: dict keyed by OCLC library code whose values map
            ``new`` and/or ``update`` to lists of MARC file paths.

    Returns:
        ``{"success": {...}, "failures": {...}}`` where each inner dict is
        keyed by library code and holds the lists returned by the OCLC
        wrapper's ``new`` and ``update`` calls.

    Raises:
        KeyError: if a library with files has no matching connection.
    """
    connection_lookup, success, failures = {}, {}, {}
    for conn_id in connection_details:
        connection = Connection.get_connection_from_secrets(conn_id)
        oclc_code = connection.extra_dejson["oclc_code"]
        connection_lookup[oclc_code] = {
            "username": connection.login,
            "password": connection.password,
        }

    for library, records in libraries.items():
        # A library may have no files of one (or either) type, in which
        # case the corresponding key is absent from its dict.
        new_files = records.get("new", [])
        updated_files = records.get("update", [])
        success[library], failures[library] = [], []
        if not new_files and not updated_files:
            # Nothing to send; avoid a pointless OCLC authentication.
            continue

        oclc_api = OCLCAPIWrapper(
            user=connection_lookup[library]["username"],
            password=connection_lookup[library]["password"],
        )
        new_result = oclc_api.new(new_files)
        success[library].extend(new_result['success'])
        failures[library].extend(new_result['failures'])

        updated_result = oclc_api.update(updated_files)
        success[library].extend(updated_result['success'])
        failures[library].extend(updated_result['failures'])

    return {"success": success, "failures": failures}


@task
def archive_transmitted_data_task(files):
"""
Expand Down
Loading

0 comments on commit c577657

Please sign in to comment.