# Upload Title CSV → Query OCLC KB API → kb:collection_name

This notebook:
1. Lets you upload a CSV of Titles.
2. Queries the OCLC Knowledge Base API for each Title.
3. Extracts **kb:collection_name** only (pipe-separated if multiple).
4. Writes results to a CSV.

## Prerequisites
- Your **OCLC WSKey** authorized for the KB API.
- A CSV with a column of titles (default column: `title`; change `TITLE_COLUMN` in the configuration cell if yours differs).


## Setup & Imports

In [None]:

# If needed (Colab usually has these ready), you can install:
# !pip install requests pandas

import getpass  # hidden prompt for the WSKey
import re
import time
import unicodedata  # for diacritic removal

import pandas as pd
import requests
from google.colab import files

# ---- Configuration constants ----
KB_API_BASE = "https://worldcat.org/webservices/kb"  # KB API base (HTTPS)
REQUEST_TIMEOUT = 30  # passed as `timeout=` to every requests.get call
SLEEP_BETWEEN_CALLS = 0.3  # seconds to pause after each title in the batch loop
TITLE_COLUMN = "title"  # name of the CSV column that holds the titles

# Prompt for the WSKey instead of hard-coding it. getpass is used rather than
# input() so the key is not echoed into the cell output, which would otherwise
# be stored in the notebook and leak when the notebook is shared.
WSKEY = getpass.getpass("Enter your OCLC WSKey: ").strip()
if not WSKEY:
    raise ValueError("WSKey is required.")


In [None]:

print("üì§ Please select your TITLE CSV file...")
uploaded = files.upload()

uploaded_df = None
# Only the first uploaded file is considered: the loop always breaks after one pass.
for filename in uploaded.keys():
    print(f"‚úÖ Uploaded: {filename} ({len(uploaded[filename])} bytes)")
    try:
        df = pd.read_csv(filename, dtype=str)
        if TITLE_COLUMN not in df.columns:
            raise ValueError(
                f"Column '{TITLE_COLUMN}' not found. Columns: {df.columns.tolist()}"
            )
        # dtype=str leaves missing cells as NaN. Do not cast with astype(str):
        # that turns NaN into the literal text "nan", which dropna() cannot
        # remove and which would then be queried as if it were a real title.
        df[TITLE_COLUMN] = df[TITLE_COLUMN].str.strip()
        # Keep only rows that actually carry a title (not missing, not blank).
        has_title = df[TITLE_COLUMN].notna() & (df[TITLE_COLUMN] != "")
        df = df[has_title]
        uploaded_df = df
        print(f"‚úÖ Loaded {len(df)} rows. Ready to query.")
    except Exception as e:
        # Report the problem here; the RuntimeError below stops the notebook.
        print(f"‚ùå Could not read CSV: {e}")
    break

if uploaded_df is None:
    raise RuntimeError("No valid CSV uploaded or missing required column.")

In [None]:

# Matches a leading English article ("the", "a", "an") at the start of a title,
# together with any opening quotes/brackets before it and the separator after it.
# Curly quotes are written as \uXXXX escapes (like the dashes below) so the
# pattern does not depend on how this file's bytes were encoded or re-decoded.
_ARTICLE_PATTERN = re.compile(
    r"""^
        \s*
        [\"'\(\[\{\u201c\u201d\u2018\u2019]*   # optional leading quotes/brackets, straight or curly
        \s*
        \b(the|a|an)\b
        [\s:,\-\u2013\u2014]+  # space, colon, comma, dash, en/em dash
    """,
    re.IGNORECASE | re.VERBOSE
)

def strip_initial_article(title: str) -> str:
    """Drop a leading article (as matched by ``_ARTICLE_PATTERN``) from ``title``.

    Non-string or blank input is returned untouched. If removing the article
    would leave nothing, the whitespace-trimmed original is returned instead.
    """
    is_blank = not isinstance(title, str) or not title.strip()
    if is_blank:
        return title
    without_article = _ARTICLE_PATTERN.sub("", title).strip()
    if without_article:
        return without_article
    return title.strip()

def strip_trailing_punct(title: str) -> str:
    """Remove a trailing run of whitespace and ``. , ; : ! ?`` from ``title``.

    The result is also trimmed of surrounding whitespace.
    """
    # Inside a character class these punctuation marks need no escaping.
    trimmed = re.sub(r"[\s.,;:!?]+$", "", title)
    return trimmed.strip()

def remove_diacritics(s: str) -> str:
    """Return ``s`` with combining marks (accents) removed.

    The text is decomposed with Unicode NFKD normalization and every combining
    character is dropped. Falsy input (``""`` or ``None``) is returned as-is.
    """
    if s:
        decomposed = unicodedata.normalize("NFKD", s)
        base_chars = [c for c in decomposed if unicodedata.combining(c) == 0]
        return "".join(base_chars)
    return s

def normalize_title_for_search(title: str) -> str:
    """Build the search form of a title used for the KB API queries.

    Steps, in order: trim, drop a leading article, drop trailing punctuation,
    remove diacritics, collapse runs of whitespace to single spaces.
    Falsy input is returned unchanged. If the diacritic-free form ends up
    empty, the form from before diacritic removal is returned instead.
    """
    if not title:
        return title
    base = strip_trailing_punct(strip_initial_article(title.strip()))
    # ASCII-friendly variant for keyword search, with whitespace collapsed.
    ascii_form = re.sub(r"\s+", " ", remove_diacritics(base)).strip()
    return ascii_form if ascii_form else base


In [None]:

def _kb_get_json(url: str, params: dict) -> dict | list:
    """GET ``url`` with ``params`` and return the decoded JSON body.

    Best-effort: returns ``{}`` when the request fails (connection error,
    timeout), when the status code is not 200, or when the body is not valid
    JSON. Callers therefore cannot tell "no match" apart from "request failed".
    """
    try:
        resp = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
    except requests.RequestException:
        # One flaky request should not abort a whole batch run; treat it the
        # same way as a non-200 response.
        return {}
    if resp.status_code != 200:
        return {}
    try:
        return resp.json()
    except ValueError:
        # JSON decode errors are ValueError subclasses.
        return {}


def kb_entries_search(params: dict) -> dict | list:
    """Call /rest/entries/search and return JSON (dict with 'entries' or list).

    ``params`` is sent as the query string unchanged. Returns ``{}`` on any
    request, status, or decode failure.
    """
    return _kb_get_json(f"{KB_API_BASE}/rest/entries/search", params)


def kb_openurl_resolve(rft_title: str) -> dict | list:
    """Call /openurl/resolve with rft.title and return JSON (dict or list).

    Returns ``{}`` on any request, status, or decode failure.
    """
    params = {"rft.title": rft_title, "alt": "json", "wskey": WSKEY}
    return _kb_get_json(f"{KB_API_BASE}/openurl/resolve", params)

def _extract_collection_names(data) -> list:
    """
    Normalize KB/OpenURL responses into a list of kb:collection_name strings.

    Accepts either a dict with an 'entries' list or a raw list of entries.
    Anything else (including None, or a dict whose 'entries' is missing/null)
    yields an empty list. Entries that are not dicts, and names that are
    missing, empty, or not strings, are skipped rather than raising.

    Returns the unique names in first-seen order.
    """
    if isinstance(data, dict):
        entries = data.get("entries")
    elif isinstance(data, list):
        entries = data
    else:
        entries = None

    # Guard against "entries": null or any other non-sequence payload.
    if not isinstance(entries, (list, tuple)):
        return []

    names = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        # Some responses may nest fields differently; we focus on kb:collection_name
        cname = entry.get("kb:collection_name")
        if isinstance(cname, str) and cname:
            names.append(cname)

    # dict preserves insertion order, so this de-duplicates while keeping order.
    return list(dict.fromkeys(names))

def fetch_kb_collection_names(title: str) -> tuple[list, str]:
    """
    Look up kb:collection_name values for one title.

    The title is normalized, then these strategies are tried in order and the
    first one that yields any names wins:
      1. entries search, exact title
      2. entries search, title starts-with (normalized title + "%")
      3. entries search, keyword (q)
      4. OpenURL resolve with rft.title

    Returns (unique_names, strategy_used). When nothing matches, or when the
    title normalizes to an empty value, returns ([], "none").
    """
    norm = normalize_title_for_search(title)
    if not norm:
        # A title made only of whitespace/punctuation normalizes to "".
        # Searching for "" (and especially the bare wildcard "%") could return
        # unrelated entries, so skip the API entirely.
        return [], "none"

    common = {"alt": "json", "wskey": WSKEY}
    # Each strategy is a zero-argument callable so that a request is only
    # issued when every earlier strategy came back empty.
    strategies = (
        ("entries.title=exact",
         lambda: kb_entries_search({"title": norm, **common})),
        ("entries.title=starts-with%",
         lambda: kb_entries_search({"title": f"{norm}%", **common})),
        ("entries.q=keyword",
         lambda: kb_entries_search({"q": norm, **common})),
        ("openurl.resolve",
         lambda: kb_openurl_resolve(norm)),
    )
    for label, run in strategies:
        names = _extract_collection_names(run())
        if names:
            return names, label

    return [], "none"


In [None]:

OUTPUT_CSV = "/content/kb_collections_output.csv"

# Query the KB API for every uploaded title and collect one record per title.
results = []
for i, title in enumerate(uploaded_df[TITLE_COLUMN].tolist(), start=1):
    # Skip missing cells (NaN is a truthy float) as well as blank titles.
    if not isinstance(title, str) or not title:
        continue
    names, strat = fetch_kb_collection_names(title)
    results.append({
        "title": title,
        "normalized_title": normalize_title_for_search(title),
        "kb:collection_name": "|".join(names) if names else None,
        "strategy_used": strat
    })
    time.sleep(SLEEP_BETWEEN_CALLS)  # throttle between titles
    if i % 25 == 0:
        print(f"Processed {i} titles...")

# Name the columns explicitly so the frame has them even when `results` is
# empty; otherwise the "kb:collection_name" lookup below raises KeyError.
out_df = pd.DataFrame(
    results,
    columns=["title", "normalized_title", "kb:collection_name", "strategy_used"],
)
out_df.to_csv(OUTPUT_CSV, index=False)
print(f"‚úÖ Wrote {len(out_df)} rows to {OUTPUT_CSV}")

# Titles for which no strategy returned a collection name.
missing = out_df[out_df["kb:collection_name"].isna()]
print(f"‚ö†Ô∏è Missing collection names for {len(missing)} titles.")
display(missing.head(20))

files.download(OUTPUT_CSV)


In [None]:

# Manual smoke test: run every lookup strategy for a couple of known titles and
# print how many collection names each one finds (showing at most five).
test_titles = [
    "Aquatic Phycomycetes.",
    "Dental patterns in mice of the genus Peromyscus."
]

for original_title in test_titles:
    search_title = normalize_title_for_search(original_title)
    print("\n---")
    print(f"Original: {original_title}")
    print(f"Normalized: {search_title}")

    auth = {"alt": "json", "wskey": WSKEY}
    # Callables keep the requests lazy and in the same order as the batch lookup.
    probes = [
        ("entries.title=exact",
         lambda: kb_entries_search({"title": search_title, **auth})),
        ("entries.title=starts-with%",
         lambda: kb_entries_search({"title": f"{search_title}%", **auth})),
        ("entries.q=keyword",
         lambda: kb_entries_search({"q": search_title, **auth})),
        ("openurl.resolve",
         lambda: kb_openurl_resolve(search_title)),
    ]
    for label, run_probe in probes:
        found = _extract_collection_names(run_probe())
        print(f"{label}: {len(found)} match(es)")
        for collection in found[:5]:
            print("  ‚Ä¢", collection)


In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): no earlier cell reads from or writes to Drive (the output CSV is
# written under /content and offered via files.download) — confirm this cell is
# still needed, and if so move the import up to the notebook's import cell.
from google.colab import drive
drive.mount('/content/drive')