This file is a work in progress.
TODO: Rewrite it to scrape ALL entries in newspapers_all_years_updated.csv, not just the unmatched newspapers. Then write a script that queries the Claude API with a prompt to turn the essays from the Chronicling America API results into structured JSON files (this might be best done in two stages: a first pass to extract the editor/publisher timelines, then a second pass to turn them into JSON). Then write a script to merge all of this new data onto master. I'd have to think through how to make sure that all the papers with data from both sources get merged properly. One issue with the current script is that the JSON gets merged onto master, and then some of these entries don't actually get matched back in because the name is slightly different. So when they get added to master, I'd need to make sure that they carry exactly the same info as the corresponding entries in "newspapers_all_years_updated".

In [None]:
"""
Finds unmatched newspaper entries (missing master_id) in data/matches.csv,
joins with data/newspapers_all_years_updated.csv on ISSN to get the LCCN,
then queries the Library of Congress loc.gov API (current, post-Aug 2025)
for "about this newspaper" information.

NOTE: uses the current loc.gov API exclusively:
  - Item endpoint:      https://www.loc.gov/item/{lccn}/?fo=json
  - Collection search:  https://www.loc.gov/collections/chronicling-america/
                          ?fa=number_lccn:{lccn}&fo=json
  - Directory search:   https://www.loc.gov/collections/
                          directory-of-us-newspapers-in-american-libraries/
                          ?q={lccn}&fo=json
  - General search:     https://www.loc.gov/search/?q={query}&fo=json

"""

import csv
import json
import re
import time
import argparse
import logging
import sys
import requests
import math
from pathlib import Path
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# Configure INFO-level logging with timestamps; every function in this file
# reports progress through the module-level `log` logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger(__name__)

# Default input files (relative paths); main() uses them as parameter defaults.
MATCHES_CSV = "data/matches.csv"
NEWSPAPERS_CSV = "data/newspapers_all_years_updated.csv"

# ── Current loc.gov API endpoints (post-Aug 2025) ─────────────────────────
# Item endpoint: returns full bibliographic record for a newspaper title.
# `{lccn}` is filled in with str.format() by query_loc_item().
LOC_ITEM_URL = "https://www.loc.gov/item/{lccn}/?fo=json"
# Collection search: search within Chronicling America by LCCN
LOC_COLLECTION_URL = "https://www.loc.gov/collections/chronicling-america/"
# Newspaper directory: search the full US newspaper directory
LOC_DIRECTORY_URL = "https://www.loc.gov/collections/directory-of-us-newspapers-in-american-libraries/"
# General search fallback
LOC_SEARCH_URL = "https://www.loc.gov/search/"

# Default headers attached to every request made through the shared session.
HEADERS = {"User-Agent": "NewspaperLookup/1.0 (research script)"}
REQUEST_TIMEOUT = 60  # seconds, passed as `timeout=` to each GET; loc.gov can be slow


def _build_session() -> requests.Session:
    """
    Create the HTTP session used for all loc.gov requests.

    The session sends the module's default HEADERS and retries GET requests
    up to three times (backoff factor 2) when the server answers with
    429, 500, 502, 503 or 504.
    """
    retry_policy = Retry(
        total=3,
        backoff_factor=2,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    transport = HTTPAdapter(max_retries=retry_policy)

    http = requests.Session()
    # The same retrying adapter serves both schemes.
    for scheme in ("https://", "http://"):
        http.mount(scheme, transport)
    http.headers.update(HEADERS)
    return http


# Single shared session, built once at import time and used by every
# query_loc_* function below.
SESSION = _build_session()


# HTML essay scraper 

def _html_to_text(raw_html: str) -> str:
    """Convert an HTML string (like the essay field) to clean plain text."""
    import html as html_mod
    # Replace </p> and <br> with newlines to preserve paragraph breaks
    text = re.sub(r'</p>', '\n\n', raw_html)
    text = re.sub(r'<br\s*/?>', '\n', text)
    # Strip remaining HTML tags
    text = re.sub(r'<[^>]+>', '', text)
    # Decode HTML entities (&#160; &#699; &amp; etc.)
    text = html_mod.unescape(text)
    # Clean up whitespace within lines
    text = re.sub(r'[^\S\n]+', ' ', text)
    # Normalize paragraph breaks
    text = re.sub(r'\n\s*\n', '\n\n', text)
    return text.strip()


# CSV reading

def read_csv(filepath: str) -> list[dict]:
    """
    Read a CSV and return rows as dicts with stripped headers.

    The file is decoded as UTF-8 with an optional BOM ("utf-8-sig"), and
    surrounding whitespace is removed from each column name. Cell values are
    returned as read.

    Args:
        filepath: Path of the CSV file to read.

    Returns:
        One dict per data row, keyed by the stripped header names. A file
        with no header row at all (completely empty) yields an empty list.
    """
    with open(filepath, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        headers = reader.fieldnames
        # DictReader reports fieldnames as None when the file is empty;
        # iterating over that would raise TypeError.
        if headers is None:
            return []
        reader.fieldnames = [h.strip() for h in headers]
        return list(reader)


def build_issn_to_lccn(newspapers_rows: list[dict]) -> dict[str, dict]:
    """
    Build a lookup from ISSN -> {lccn, name, town, state} using
    data/newspapers_all_years_updated.csv.

    Rows missing either the ISSN or the LCCN are skipped. When the same ISSN
    appears more than once, the last row wins. Missing or ``None`` cells
    (csv.DictReader pads short rows with ``None``) are treated as empty
    strings instead of raising.

    Args:
        newspapers_rows: Row dicts with "issn", "lccn", "name", "town" and
            "state" keys.

    Returns:
        Mapping of stripped ISSN to a dict of stripped "lccn", "name",
        "town" and "state" values.
    """
    def _cell(row: dict, key: str) -> str:
        # `or ""` covers both an absent key and an explicit None value.
        return (row.get(key) or "").strip()

    lookup = {}
    for row in newspapers_rows:
        issn = _cell(row, "issn")
        lccn = _cell(row, "lccn")
        if issn and lccn:
            lookup[issn] = {
                "lccn": lccn,
                "name": _cell(row, "name"),
                "town": _cell(row, "town"),
                "state": _cell(row, "state"),
            }
    return lookup


def find_unmatched(rows: list[dict]) -> list[dict]:
    """
    Return rows from matches.csv where master_id is empty.

    A row counts as unmatched when its "master_id" is absent, ``None``
    (csv.DictReader pads short rows with ``None``), empty, or whitespace only.

    Args:
        rows: Row dicts as produced by read_csv().

    Returns:
        The unmatched rows, in their original order.
    """
    return [row for row in rows if not (row.get("master_id") or "").strip()]


# API queries (all using current loc.gov API)

def query_loc_item(lccn: str) -> dict | None:
    """
    PRIMARY METHOD: Fetch newspaper metadata from the loc.gov item endpoint.
    URL: https://www.loc.gov/item/{lccn}/?fo=json

    The essay is read from item.essay (an HTML string) and returned both raw
    ("essay_html") and converted to plain text ("essay").

    Returns a dict of selected item fields, or None on a 404, a timeout, or
    any other failure (failures are logged, not raised).
    """
    endpoint = LOC_ITEM_URL.format(lccn=lccn)
    try:
        response = SESSION.get(endpoint, timeout=REQUEST_TIMEOUT)
        if response.status_code == 404:
            log.info(f"    loc.gov/item: 404 for LCCN {lccn}")
            return None
        response.raise_for_status()

        record = response.json().get("item", {})
        field = record.get

        # Keep the raw HTML and derive a plain-text version when present.
        raw_essay = field("essay", "")
        plain_essay = ""
        if raw_essay:
            plain_essay = _html_to_text(raw_essay)

        return {
            "source": "loc.gov/item",
            "lccn": lccn,
            "url": f"https://www.loc.gov/item/{lccn}/",
            "title": field("title", ""),
            "date": field("date", ""),
            "dates_of_publication": field("dates_of_publication", ""),
            "created_published": field("created_published", []),
            "description": field("description", []),
            "subject_headings": field("subject_headings", []),
            "notes": field("notes", []),
            "contributors": field("contributor_names", []),
            "location": field("location", []),
            "call_number": field("call_number", []),
            "medium": field("medium", ""),
            # Prefer number_issn; fall back to the generic number field.
            "issn": field("number_issn", field("number", [])),
            "oclc": field("number_oclc", []),
            "related_items": field("related_items", []),
            "other_title": field("other_title", []),
            "publication_frequency": field("publication_frequency", []),
            "rights": field("rights_advisory", []),
            "essay_html": raw_essay,
            "essay": plain_essay,
            "essay_contributor": field("essay_contributor", []),
        }
    except requests.exceptions.Timeout:
        log.warning(f"    loc.gov/item timed out for LCCN {lccn} (timeout={REQUEST_TIMEOUT}s)")
        return None
    except Exception as e:
        log.warning(f"    loc.gov/item failed for LCCN {lccn}: {e}")
        return None


def query_loc_collection(lccn: str) -> dict | None:
    """
    FALLBACK #1: Search the Chronicling America collection by LCCN.
    URL: https://www.loc.gov/collections/chronicling-america/
         ?fa=number_lccn:{lccn}&fo=json

    Returns summary info taken from the first search result, plus the
    total hit count from the pagination block. Returns None when there are
    no results, on timeout, or on any other failure (failures are logged).
    """
    search_params = {"fa": f"number_lccn:{lccn}", "fo": "json", "c": 1}
    try:
        response = SESSION.get(
            LOC_COLLECTION_URL, params=search_params, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
        payload = response.json()
        hits = payload.get("results", [])
        if hits:
            top = hits[0]
            return {
                "source": "loc.gov/collections/chronicling-america",
                "lccn": lccn,
                # Some results carry only an "id" instead of a "url".
                "url": top.get("url", top.get("id", "")),
                "title": top.get("title", ""),
                "date": top.get("date", ""),
                "description": top.get("description", []),
                "subjects": top.get("subject", []),
                "location": top.get("location", []),
                "contributors": top.get("contributor", []),
                "total_results": payload.get("pagination", {}).get("of", 0),
            }
        log.info(f"    collection search: no results for LCCN {lccn}")
        return None
    except requests.exceptions.Timeout:
        log.warning(f"    collection search timed out for LCCN {lccn}")
        return None
    except Exception as e:
        log.warning(f"    collection search failed for LCCN {lccn}: {e}")
        return None


def query_loc_directory(lccn: str) -> dict | None:
    """
    FALLBACK #2: Search the US Newspaper Directory collection.
    URL: https://www.loc.gov/collections/
         directory-of-us-newspapers-in-american-libraries/
         ?q={lccn}&fo=json

    Some newspapers are in the directory but not digitized in
    Chronicling America.

    Returns summary info taken from the first search result, or None when
    there are no results, on timeout, or on any other failure (failures are
    logged).
    """
    search_params = {"q": lccn, "fo": "json", "c": 5}
    try:
        response = SESSION.get(
            LOC_DIRECTORY_URL, params=search_params, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
        hits = response.json().get("results", [])
        if hits:
            top = hits[0]
            return {
                "source": "loc.gov/collections/directory",
                "lccn": lccn,
                # Some results carry only an "id" instead of a "url".
                "url": top.get("url", top.get("id", "")),
                "title": top.get("title", ""),
                "date": top.get("date", ""),
                "description": top.get("description", []),
                "subjects": top.get("subject", []),
                "location": top.get("location", []),
            }
        log.info(f"    directory search: no results for LCCN {lccn}")
        return None
    except requests.exceptions.Timeout:
        log.warning(f"    directory search timed out for LCCN {lccn}")
        return None
    except Exception as e:
        log.warning(f"    directory search failed for LCCN {lccn}: {e}")
        return None


def query_loc_search_fallback(name: str, issn: str) -> dict | None:
    """
    LAST RESORT: Search loc.gov generally by newspaper name or ISSN.
    Used when we have no LCCN at all.
    """
    queries = []
    if issn:
        queries.append(issn)
    if name:
        queries.append(name)

    for q in queries:
        params = {
            "q": q,
            "fa": "original_format:newspaper",
            "fo": "json",
            "c": 5,
        }
        try:
            resp = SESSION.get(LOC_SEARCH_URL, params=params, timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            data = resp.json()
            results = data.get("results", [])
            if results:
                r = results[0]
                return {
                    "source": "loc.gov/search (fallback)",
                    "title": r.get("title", ""),
                    "date": r.get("date", ""),
                    "description": r.get("description", []),
                    "subjects": r.get("subject", []),
                    "location": r.get("location", []),
                    "url": r.get("url", r.get("id", "")),
                    "contributors": r.get("contributor", []),
                }
        except Exception as e:
            log.warning(f"    search fallback failed for '{q}': {e}")

    return None


# Main orchestration

def lookup_newspaper(row: dict, issn_to_lccn: dict, delay: float) -> dict:
    """
    Look up a single unmatched newspaper entry using a cascade:
      1. Join on ISSN to get LCCN
      2. Try loc.gov/item/{lccn}/?fo=json          (primary — full record)
      3. Try loc.gov/collections/chronicling-america (fallback — search)
      4. Try loc.gov/collections/directory           (fallback — directory)
      5. Try loc.gov/search by name/ISSN            (last resort)

    Steps 2-4 run only when the ISSN join produced an LCCN; step 5 runs
    whenever none of them returned a result. The function sleeps `delay`
    seconds before every step that queries the API.

    Args:
        row: A row from matches.csv. Reads "newspapers_all_years_name",
            "issn" and "master_name"; missing or None cells are treated as
            empty strings.
        issn_to_lccn: Lookup built by build_issn_to_lccn().
        delay: Seconds to sleep before each API step.

    Returns:
        Dict with "input" (name, issn, master_name), "lccn", "lccn_source"
        and "api_result" (the first successful query result, or None).
    """
    # csv.DictReader pads short rows with None, so `.get(key, "")` alone is
    # not enough to guarantee a string here.
    name = (row.get("newspapers_all_years_name") or "").strip()
    issn = (row.get("issn") or "").strip()
    master_name = (row.get("master_name") or "").strip()

    log.info(f"Looking up: '{name}' (ISSN: {issn})")

    result = {
        "input": {"name": name, "issn": issn, "master_name": master_name},
        "lccn": None,
        "lccn_source": None,
        "api_result": None,
    }

    # Step 1: Try to get LCCN from the newspapers CSV
    lccn_info = issn_to_lccn.get(issn)
    if lccn_info:
        lccn = lccn_info["lccn"]
        result["lccn"] = lccn
        result["lccn_source"] = "newspapers_all_years_updated.csv"
        log.info(f"  LCCN from CSV: {lccn}")

        # Steps 2-4: LCCN-based endpoints, most detailed first.
        lccn_steps = (
            ("loc.gov/item", query_loc_item),
            ("collection search", query_loc_collection),
            ("directory search", query_loc_directory),
        )
        for label, query in lccn_steps:
            time.sleep(delay)
            api = query(lccn)
            if api:
                result["api_result"] = api
                log.info(f"  ✓ {label}: {api['title']}")
                return result

    else:
        result["lccn_source"] = "not_found"
        log.info("  No LCCN found via ISSN join")

    # Step 5: Last resort — general search
    log.info("  Falling back to general search...")
    time.sleep(delay)
    api = query_loc_search_fallback(name, issn)
    if api:
        result["api_result"] = api
        log.info(f"  ✓ search fallback: {api['title']}")
    else:
        log.info("  ✗ No results from any source")

    return result


def main(
    matches: str = MATCHES_CSV,
    newspapers: str = NEWSPAPERS_CSV,
    output: str = "data/unmatched_results.json",
    delay: float = 2.0,
    limit: int | None = None,
):
    """
    Look up every unmatched newspaper and save the results as JSON.

    Can be called directly from a notebook:
        main(limit=3)
    Or from the command line:
        python loc_newspaper_lookup.py --limit 3

    When sys.argv does not look like a Jupyter/ipykernel launch, the
    arguments are parsed from the command line, with the function
    parameters serving as the defaults.

    Args:
        matches: Path to matches.csv.
        newspapers: Path to newspapers_all_years_updated.csv.
        output: Path of the JSON file to write. Its parent directory is
            created if needed.
        delay: Seconds to sleep before each API request.
        limit: If truthy, only the first `limit` unmatched rows are processed.
    """
    # If running from command line (not Jupyter), parse args
    if not any("jupyter" in arg.lower() or "ipykernel" in arg.lower() for arg in sys.argv):
        parser = argparse.ArgumentParser(
            description="Look up unmatched newspaper entries via Library of Congress APIs"
        )
        parser.add_argument("--matches", default=matches)
        parser.add_argument("--newspapers", default=newspapers)
        parser.add_argument("--output", "-o", default=output)
        parser.add_argument("--delay", "-d", type=float, default=delay)
        parser.add_argument("--limit", "-l", type=int, default=limit)
        args = parser.parse_args()
        matches, newspapers, output, delay, limit = (
            args.matches, args.newspapers, args.output, args.delay, args.limit
        )

    # Load data
    log.info(f"Reading {matches} ...")
    matches_rows = read_csv(matches)
    log.info(f"  {len(matches_rows)} total rows")

    log.info(f"Reading {newspapers} ...")
    newspapers_rows = read_csv(newspapers)
    log.info(f"  {len(newspapers_rows)} total rows")

    issn_to_lccn = build_issn_to_lccn(newspapers_rows)
    log.info(f"  Built ISSN→LCCN lookup with {len(issn_to_lccn)} entries")

    # Find unmatched
    unmatched = find_unmatched(matches_rows)
    log.info(f"Found {len(unmatched)} unmatched entries (missing master_id)")

    if not unmatched:
        log.info("Nothing to look up. Exiting.")
        return

    if limit:
        unmatched = unmatched[: limit]
        log.info(f"Processing first {len(unmatched)} entries (--limit)")

    # Preview LCCN join coverage. `or ""` guards against None cells, which
    # csv.DictReader produces for rows shorter than the header.
    have_lccn = sum(1 for r in unmatched if (r.get("issn") or "").strip() in issn_to_lccn)
    log.info(f"Of {len(unmatched)} unmatched, {have_lccn} have LCCNs via ISSN join, "
             f"{len(unmatched) - have_lccn} will need search fallback")

    # Make sure the output location exists BEFORE the slow lookup loop, so a
    # bad --output path fails immediately instead of discarding every result
    # after the scraping has finished.
    output_path = Path(output)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Look up each entry
    results = []
    for i, row in enumerate(unmatched, 1):
        log.info(f"─── [{i}/{len(unmatched)}] ───")
        result = lookup_newspaper(row, issn_to_lccn, delay)
        results.append(result)

    # Write output
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    # Summary
    lccn_found = sum(1 for r in results if r["lccn"])
    api_found = sum(1 for r in results if r["api_result"])
    by_source = {}
    for r in results:
        if r["api_result"]:
            src = r["api_result"]["source"]
            by_source[src] = by_source.get(src, 0) + 1

    log.info("═══ Summary ═══")
    log.info(f"  Total unmatched:          {len(results)}")
    log.info(f"  LCCN found via ISSN:      {lccn_found}")
    log.info(f"  API results found:        {api_found}")
    for src, count in sorted(by_source.items()):
        log.info(f"    {src}: {count}")
    log.info(f"  No results at all:        {len(results) - api_found}")
    log.info(f"  Results saved to: {output_path}")


def split_results(
    input_file: str = "data/unmatched_results.json",
    output_dir: str = "data/batches",
    batch_size: int = 25,
):
    """
    Split a results JSON file into smaller batch files with slimmed entries.

    Reads the list written by main() and writes `batch_1.json`,
    `batch_2.json`, ... into `output_dir`, each holding up to `batch_size`
    entries. Every entry is reduced to: issn, date, dates_of_publication,
    created_published, essay and notes. Entries without an API result get
    empty values for everything except the ISSN.

    Args:
        input_file: Path of the JSON results file to read.
        output_dir: Directory for the batch files; created (including
            missing parents) if it does not exist.
        batch_size: Maximum number of entries per batch file.

    Raises:
        ValueError: If `batch_size` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    with open(input_file, encoding="utf-8") as f:
        data = json.load(f)

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    num_batches = math.ceil(len(data) / batch_size)

    for i in range(num_batches):
        batch = data[i * batch_size : (i + 1) * batch_size]

        slim = []
        for entry in batch:
            # "input" / "api_result" may be absent or null in the results file.
            api = entry.get("api_result") or {}
            slim.append({
                "issn": (entry.get("input") or {}).get("issn", ""),
                "date": api.get("date", ""),
                "dates_of_publication": api.get("dates_of_publication", ""),
                "created_published": api.get("created_published", []),
                "essay": api.get("essay", ""),
                "notes": api.get("notes", []),
            })

        out_path = out_dir / f"batch_{i + 1}.json"
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(slim, f, indent=2, ensure_ascii=False)

        print(f"Wrote {len(slim)} entries to {out_path}")

    print(f"\nDone: {len(data)} entries across {num_batches} files in {output_dir}/")
    # NOTE: the original body ended by calling split_results() again from
    # inside itself, which recursed without end; that call has been removed.


# Entry point: run the lookup, then split the saved results into batches.
# NOTE(review): split_results() is called with its default input path
# ("data/unmatched_results.json"), so it will not pick up a custom --output
# passed to main() — confirm this is intended.
if __name__ == "__main__":
    main()
    split_results()

In [None]:
# This cell adds entries to Master using data from Chronicling America.
#
# It reads data/master.csv, loads every JSON file in data/essay_derived_rows/
# (each file is expected to hold a list of row dicts), keeps only the keys
# that are master columns, tags the rows with data_source="essay_derived",
# appends them, and overwrites data/master.csv.
#
# The cell is safe to re-run: existing data_source values are preserved and
# rows that are already present in master are not appended a second time.

import pandas as pd
import json
import glob

df = pd.read_csv("data/master.csv", dtype=str, keep_default_na=False)

# Add data_source column only if master does not have it yet; assigning it
# unconditionally would blank out the labels written by a previous run.
if "data_source" not in df.columns:
    df["data_source"] = ""

cols = df.columns.tolist()

# Every row currently in master, as a tuple of strings in column order.
# Used to recognise rows that an earlier run of this cell already appended.
existing_rows = set(df.itertuples(index=False, name=None))

# Load all JSON files in the folder
all_new_rows = []
skipped_rows = 0
files = sorted(glob.glob("data/essay_derived_rows/*.json"))

for filepath in files:
    # Explicit UTF-8: the platform default encoding can garble non-ASCII
    # newspaper names.
    with open(filepath, "r", encoding="utf-8") as f:
        new_rows = json.load(f)
    for row in new_rows:
        r = {c: "" for c in cols}
        r.update({k: v for k, v in row.items() if k in cols})
        r["data_source"] = "essay_derived"
        # Compare as text (None -> ""), because master.csv is read back with
        # dtype=str. The stored values themselves are left untouched.
        row_key = tuple("" if r[c] is None else str(r[c]) for c in cols)
        if row_key in existing_rows:
            skipped_rows += 1
            continue
        all_new_rows.append(r)
    print(f"  Loaded {len(new_rows)} rows from {filepath}")

new_df = pd.DataFrame(all_new_rows, columns=cols)
df_out = pd.concat([df, new_df], ignore_index=True)
df_out.to_csv("data/master.csv", index=False)

print(f"\nOriginal rows: {len(df)}")
print(f"Files processed: {len(files)}")
print(f"Rows skipped (already in master): {skipped_rows}")
print(f"New rows appended: {len(all_new_rows)}")
print(f"Total rows: {len(df_out)}")
print("Saved to data/master.csv")