<a href="https://colab.research.google.com/github/ysugiyama3/google_colab/blob/master/cataloging_copy_lookup_wcapi_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Cataloging copy lookup program - WorldCat Search API V2**

Last updated: 3/13/2024

Please contact yukari.sugiyama@yale.edu if you have any issues, questions, or suggestions.

---
#### **About the program**
Using Python 3 and OCLC WorldCat Search API ***v2***, this program automates the search for cataloging copy. It automatically searches WorldCat for good cataloging copy by LCCN, ISBN, and OCLC number. The process considers OCLC records with English as the language of cataloging, encoding levels (blank, 1, 4, I, M), and LC Subject Headings (except for fiction) as good cataloging copy.

#### **What you need**
*   **Include your Basic Auth Header info in line 29 in the first step's code**
*   Your own OCLC <a href='https://www.oclc.org/developer/develop/authentication/how-to-request-a-wskey.en.html'>WorldCat Search API key.</a>
*   Excel spreadsheet in which **the 1st, 2nd, and 3rd columns must contain the LCCN, ISBN, and OCLC number, respectively.** The spreadsheet can have as many columns as necessary and must have column headers.

#### **How to run the program**
* Execute each step by simply clicking the play button (watch a video demo <a href='https://youtu.be/qAaI75ZTPoA'>here</a>)
* To start over, please go to the menu, go to "Runtime" and then select "Disconnect and delete runtime."
* To clear output, please go to "Edit" and then select "Clear all outputs."
---

In [None]:
#@title <--- Upload Excel file

from google.colab import files
import pandas as pd
import requests
import json
from IPython.display import HTML, display
import time
import re
from getpass import getpass
import matplotlib.pyplot as plt


def progress(value, max=50000):
    """Build the HTML snippet for a progress bar with a ``value/max`` caption.

    Args:
        value: Current position of the bar (number of rows processed so far).
        max: Value at which the bar is full. The name shadows the builtin but
            is kept because callers may pass it by keyword.

    Returns:
        An ``IPython.display.HTML`` object wrapping a ``<progress>`` element.
    """
    # HTML attributes are separated by whitespace only; a comma between them
    # is a markup error, so none is emitted after the `max` attribute.
    return HTML("""
        <progress
            value='{value}'
            max='{max}'
            style='width: 40%'
        >
            {value}
        </progress>
        <br>{value}/{max}</br>
    """.format(value=value, max=max))

def get_token():
    """Request an OAuth access token for the WorldCat Search API.

    Sends a client-credentials POST to the OCLC token endpoint using the
    Basic auth header configured below and returns the ``access_token``
    field of the JSON reply.

    Returns:
        The access token string.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error
            status (for example when the Basic auth header is wrong).
        requests.Timeout: If the endpoint does not answer within 30 seconds.
        KeyError: If the reply contains no ``access_token`` field.
    """
    url = "https://oauth.oclc.org/token?grant_type=client_credentials&scope=wcapi"
    headers = {'Authorization': 'Basic [PUT YOUR BASIC AUTH HEADER HERE]'}
    # A timeout keeps the notebook from hanging forever on a stalled connection.
    auth = requests.request('POST', url, headers=headers, data={}, timeout=30)
    # Fail with the real HTTP error instead of an opaque KeyError further down.
    auth.raise_for_status()
    return json.loads(auth.text)["access_token"]

def clean_lccn(lccn):
    """Normalize a spreadsheet cell holding an LCCN.

    Args:
        lccn: Raw cell value (string, number, None or NaN).

    Returns:
        The value as a stripped string with everything from the first ``.``
        onwards removed, or None when the cell is empty or nothing is left
        after cleaning.
    """
    if lccn is None or pd.isna(lccn):
        return None
    # Numeric Excel cells arrive as floats such as 2001012345.0; cutting at
    # the first "." drops the spurious decimal part.
    cleaned = re.sub(r'\..*', '', str(lccn)).strip()
    # An empty string would produce an empty search query, so report "no LCCN".
    return cleaned or None

def clean_isbn(isbn):
    """Normalize a spreadsheet cell holding an ISBN.

    Args:
        isbn: Raw cell value (string, number, None or NaN).

    Returns:
        A string containing only the characters 0-9, ``X`` and ``x``, or None
        when the cell is empty or nothing is left after cleaning.
    """
    if isbn is None or pd.isna(isbn):
        return None
    # Cut at the first "(", "|", ":" or "." to drop qualifiers such as
    # "(pbk.)" and the ".0" of numeric cells read as floats.
    text = re.sub(r'[\(|\:|\.].*', '', str(isbn))
    # Remove hyphens, spaces and any other non-ISBN characters.
    digits = re.sub(r'[^0-9Xx]', '', text)
    # An empty string would produce an empty search query, so report "no ISBN".
    return digits or None

def clean_oclc(oclc):
    """Normalize a spreadsheet cell holding an OCLC number.

    Only values that start with one of the prefixes ``(OCoLC)``, ``ocn``,
    ``ocm`` or ``on`` (after trimming surrounding whitespace) are accepted;
    anything else is treated as not being an OCLC number.

    Args:
        oclc: Raw cell value (string, number, None or NaN).

    Returns:
        The digits of the OCLC number as a string, or None when the cell is
        empty, lacks a recognized prefix, or contains no digits.
    """
    if oclc is None or pd.isna(oclc):
        return None
    # Convert before calling string methods: numeric Excel cells arrive as
    # int/float and have no .strip().
    text = str(oclc).strip()
    if not text.startswith(('(OCoLC)', 'ocn', 'ocm', 'on')):
        return None
    digits = re.sub(r'[^0-9]', '', text)
    # A bare prefix leaves nothing to search for.
    return digits or None

def search_by_lccn(lccn):
    """Search WorldCat for a record using the ``dn:`` index and an LCCN.

    The query is limited to English-language cataloging and the
    ``book-printbook`` item subtype, ordered by best match, one result.

    Args:
        lccn: Cleaned LCCN string, or None when the row has no LCCN.

    Returns:
        False when ``lccn`` is None; otherwise the result of
        ``check_results`` for the query URL.
    """
    if lccn is None:
        return False
    query_url = (
        'https://americas.discovery.api.oclc.org/worldcat/search/v2/bibs?q=dn:'
        + lccn
        + '&inCatalogLanguage=eng&itemSubType=book-printbook&orderBy=bestMatch&limit=1'
    )
    return check_results(query_url)

def search_by_isbn(isbn):
    """Search WorldCat for a record using the ``bn:`` index and an ISBN.

    The query is limited to English-language cataloging and the
    ``book-printbook`` item subtype, ordered by best match, one result.

    Args:
        isbn: Cleaned ISBN string, or None when the row has no ISBN.

    Returns:
        False when ``isbn`` is None; otherwise the result of
        ``check_results`` for the query URL.
    """
    if isbn is None:
        return False
    query_url = (
        'https://americas.discovery.api.oclc.org/worldcat/search/v2/bibs?q=bn:'
        + isbn
        + '&inCatalogLanguage=eng&itemSubType=book-printbook&orderBy=bestMatch&limit=1'
    )
    return check_results(query_url)

def search_by_oclc(oclc):
    """Search WorldCat for a record using the ``no:`` index and an OCLC number.

    The query is limited to English-language cataloging and the
    ``book-printbook`` item subtype, ordered by best match, one result.

    Args:
        oclc: Cleaned OCLC number string, or None when the row has none.

    Returns:
        False when ``oclc`` is None; otherwise the result of
        ``check_results`` for the query URL.
    """
    if oclc is None:
        return False
    query_url = (
        'https://americas.discovery.api.oclc.org/worldcat/search/v2/bibs?q=no:'
        + oclc
        + '&inCatalogLanguage=eng&itemSubType=book-printbook&orderBy=bestMatch&limit=1'
    )
    return check_results(query_url)

def get_oclc(j):
    """Return the OCLC number of the first bib record in a search response.

    Args:
        j: Parsed JSON search response.

    Returns:
        The value at ``j['bibRecords'][0]['identifier']['oclcNumber']``, or
        ``''`` when any part of that path is missing.
    """
    try:
        return j['bibRecords'][0]['identifier']['oclcNumber']
    except (KeyError, IndexError, TypeError):
        # Missing key, empty record list, or a null/non-container level.
        return ''

def get_subjects(j):
    """Return the subjects of the first bib record in a search response.

    Args:
        j: Parsed JSON search response.

    Returns:
        The value at ``j['bibRecords'][0]['subjects']``, or an empty list
        when any part of that path is missing.
    """
    try:
        return j['bibRecords'][0]['subjects']
    except (KeyError, IndexError, TypeError):
        # Missing key, empty record list, or a null/non-container level.
        return []

def get_material_type(j):
    """Return the material types of the first bib record in a search response.

    Args:
        j: Parsed JSON search response.

    Returns:
        The value at ``j['bibRecords'][0]['format']['materialTypes']``, or an
        empty list when any part of that path is missing.
    """
    try:
        return j['bibRecords'][0]['format']['materialTypes']
    except (KeyError, IndexError, TypeError):
        # Missing key, empty record list, or a null/non-container level.
        return []

def get_call_no(j):
    """Return the LC classification of the first bib record in a response.

    Args:
        j: Parsed JSON search response.

    Returns:
        The value at ``j['bibRecords'][0]['classification']['lc']``, or
        ``''`` when any part of that path is missing.
    """
    try:
        return j['bibRecords'][0]['classification']['lc']
    except (KeyError, IndexError, TypeError):
        # Missing key, empty record list, or a null/non-container level.
        return ''

def check_lcsh(s, m):
    """Report whether a record carries LC Subject Headings.

    Args:
        s: Iterable of subject entries; entries are expected to be dicts
            that may have a ``vocabulary`` key. None is treated as empty.
        m: Collection of material-type codes. None is treated as empty.

    Returns:
        ``'Yes'`` if any subject's vocabulary is
        'Library of Congress Subject Headings'; otherwise
        ``'Yes (Fiction)'`` if ``'fic'`` is among the material types;
        otherwise ``'No'``.
    """
    try:
        for subject in s or []:
            # A subject without a vocabulary (or a non-dict entry) is skipped
            # rather than aborting the check, so later LCSH entries and the
            # fiction test are still considered.
            if (isinstance(subject, dict)
                    and subject.get('vocabulary') == 'Library of Congress Subject Headings'):
                return 'Yes'
        if 'fic' in (m or []):
            return 'Yes (Fiction)'
        return 'No'
    except TypeError:
        # s is not iterable or m does not support membership tests.
        return 'No'

def check_elvl(oclc):
    """Fetch the cataloging level recorded for an OCLC number.

    Requests the ``brief-bibs`` resource for ``oclc`` with the module-level
    bearer ``token`` and reads ``catalogingInfo.levelOfCataloging`` from the
    JSON reply.

    Args:
        oclc: OCLC number as a string.

    Returns:
        The ``levelOfCataloging`` value, or ``'?'`` when the reply is not
        valid JSON or does not contain that field. Network errors raised by
        ``requests`` are not caught here.
    """
    url = 'https://americas.discovery.api.oclc.org/worldcat/search/v2/brief-bibs/' + oclc
    headers = {
      'Authorization': 'Bearer ' + token
    }
    response = requests.request("GET", url, headers=headers, data={})
    try:
        # Parsing is inside the try so a non-JSON reply also yields '?'
        # (json.JSONDecodeError is a ValueError).
        r = json.loads(response.text)
        return r['catalogingInfo']['levelOfCataloging']
    except (ValueError, KeyError, TypeError):
        return '?'

def check_results(url):
    """Run one WorldCat search and record the first hit in ``output_df``.

    Relies on module-level state: ``count`` (rows processed so far),
    ``token`` (bearer token, refreshed here whenever ``count`` is a multiple
    of 1000), ``hascopy`` (incremented on a hit), ``output_df`` and the
    current loop variable ``index`` (the row that receives the result).

    Args:
        url: Fully built search URL.

    Returns:
        True when a record with a non-empty OCLC number was found and
        written to ``output_df``; False when nothing was found or any step
        of the lookup failed.
    """
    global output_df, hascopy, count, token
    if count % 1000 == 0:
      token = get_token()
    headers = {'Authorization': 'Bearer ' + token}
    try:
        result = requests.request("GET", url, headers=headers, data={})
        r = json.loads(result.text)
        oclc = get_oclc(r)
        if not oclc:
            # No record: return before check_elvl so no second API request
            # is spent on an empty OCLC number.
            return False

        lcsh = check_lcsh(get_subjects(r), get_material_type(r))
        call_no = get_call_no(r)
        elvl = check_elvl(oclc)

        output_df.loc[index, '[MATCH_OCLC_NO]'] = oclc
        output_df.loc[index, '[SUBJECT]'] = lcsh
        output_df.loc[index, '[LC_CALL_NUMBER]'] = call_no
        output_df.loc[index, '[ELVL]'] = elvl
        # Count the hit only once the row has actually been written.
        hascopy += 1
        return True
    except Exception:
        # Best effort: a failed lookup counts as "no copy" so the caller can
        # try the next identifier. KeyboardInterrupt is not swallowed, so the
        # run can still be stopped by the user.
        return False

# Upload an input Excel file; the name of the first uploaded file is used
uploaded = files.upload()
input_name = str(list(uploaded.keys())[0])

# Read an input Excel file into a pandas DataFrame
input_df = pd.read_excel(input_name)

# Name the output file after the input file: "<input name>_output.xlsx"
output_name = input_name.rsplit( ".", 1 )[0] + "_output.xlsx"

# Create an output DataFrame: the input rows plus four empty result columns
output_df = pd.concat([input_df, pd.DataFrame(columns = [ '[MATCH_OCLC_NO]', '[SUBJECT]', '[LC_CALL_NUMBER]', '[ELVL]'])], sort=False)

# number of rows
total = len(input_df.index)

# count of rows processed so far, and of rows for which a copy was found
count = 0
hascopy = 0

# OCLC Search API access token
token = get_token()

# Progress bar handle; updated in place while the rows are processed
out = display(progress(0, total), display_id=True)

# Process every row: try LCCN, then ISBN, then OCLC number, stopping at the
# first search that finds a record. The search functions write the match into
# output_df themselves (via check_results, which reads the loop variable
# `index`), so only the "nothing found" case is handled here.
for index, row in output_df.iterrows():
    count += 1
    time.sleep(0.02)
    out.update(progress(count, total))

    # Use .iloc for the 1st/2nd/3rd column: plain row[0] is a label lookup
    # on a Series indexed by column headers, and its positional fallback is
    # deprecated in pandas 2.x.
    lccn = clean_lccn(row.iloc[0])
    if search_by_lccn(lccn):
        continue

    isbn = clean_isbn(row.iloc[1])
    if search_by_isbn(isbn):
        continue

    oclc = clean_oclc(row.iloc[2])
    if search_by_oclc(oclc):
        continue

    # No copy found by any identifier: leave the result columns blank.
    output_df.loc[index, '[MATCH_OCLC_NO]'] = ''
    output_df.loc[index, '[SUBJECT]'] = ''
    output_df.loc[index, '[LC_CALL_NUMBER]'] = ''
    output_df.loc[index, '[ELVL]'] = ''

# Save the results as Excel; if that fails for any reason, fall back to a
# UTF-8 CSV file with the same base name.
try:
    output_df.to_excel(output_name, index=False)
except Exception:
    # `except Exception` (not a bare except) so that interrupting the
    # notebook is not mistaken for a failed Excel export.
    output_name = output_name.rsplit( ".", 1 )[0] + '.csv'
    output_df.to_csv(output_name, index=False, encoding='utf-8')

# Visualize the result as a two-slice pie chart: rows with copy vs. without
sizes = [hascopy, total-hascopy]
labels = ('Has copy', 'No copy')
colors = ['c', 'y']
explode = (0.1, 0.0)

total = sum(sizes)


def _slice_count(pct):
    """Turn a slice's percentage back into a row count for its label."""
    return '{:.0f}'.format(pct * total / 100)


plt.pie(
    sizes,
    explode=explode,
    labels=labels,
    colors=colors,
    autopct=_slice_count,
    shadow=True,
    startangle=90,
)

plt.axis('equal')
plt.title(input_name + ' Results')
plt.show()

# Automatically download the output file
files.download(output_name)
print('\nDone! \U0001F60E\n')


In [None]:
#@title <--- Preview results (Optional) { vertical-output: true }
# Leaving the DataFrame as the cell's last expression makes the notebook
# render it as a table.
output_df