In [1]:
import os, re, json, time, math
from difflib import SequenceMatcher
from urllib.parse import quote_plus

import pandas as pd
import requests

# Long-format input read into a DataFrame by the loading cell below.
INPUT_CSV  = "biblio_longform_all_records.csv"
# Destination for the enriched records (presumably written by a later cell — TODO confirm).
OUTPUT_CSV = "biblio_longform_all_records_enriched.csv"
# Checkpoint file read by load_progress() and written by save_progress().
PROGRESS_JSON = "biblio_enrich_progress.json"

# --- Networking ---
# One shared session so every request carries the same identifying User-Agent.
SESSION = requests.Session()
SESSION.headers.update({
    "User-Agent": "biblio-enricher/1.0 (academic research; contact: pietro.terna@unito.it)"
})
# Value passed as `timeout=` to every SESSION.get call.
TIMEOUT = 30
SLEEP_BETWEEN_CALLS = 0.25  # conservative pause (seconds) so we do not hammer the remote services

# --- Matching ---
# Maximum number of hits requested from, and scored for, each source.
TOPK_PER_SOURCE = 5

def _sleep():
    """Block for ``SLEEP_BETWEEN_CALLS`` seconds; used to space out HTTP requests."""
    pause_seconds = SLEEP_BETWEEN_CALLS
    time.sleep(pause_seconds)

def norm(s: str) -> str:
    """Normalise a string for fuzzy comparison.

    ``None`` becomes ``""``. Any other value is converted with ``str()``,
    trimmed and lower-cased, then passed through a fixed sequence of regex
    substitutions: collapse whitespace, delete quote characters, replace every
    character outside the allowed set with a space, collapse whitespace again.
    The result is stripped before being returned.
    """
    if s is None:
        return ""

    text = str(s).strip().lower()
    # Applied strictly in this order; each entry is (pattern, replacement).
    substitutions = (
        (r"\s+", " "),
        (r"[“”\"'’`]", ""),
        (r"[^0-9a-zàèéìòùçäëïöüßñ \-:/]", " "),
        (r"\s+", " "),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()

def sim(a: str, b: str) -> float:
    """Return the ``SequenceMatcher`` ratio of the two strings after ``norm``.

    Yields ``0.0`` when either normalised string is empty.
    """
    left = norm(a)
    right = norm(b)
    if left and right:
        return SequenceMatcher(None, left, right).ratio()
    return 0.0

def extract_year(s: str):
    """Return the first standalone four-digit year (1500-2099) found in ``s``.

    Parameters
    ----------
    s : any
        Value to scan; it is converted with ``str()``. Falsy values
        (``None``, ``""``, ``0``) short-circuit to ``None``.

    Returns
    -------
    int or None
        The year as an integer, or ``None`` when no year is present.
    """
    if not s:
        return None
    # The digit lookarounds require the four digits to stand alone, so a year
    # is never read out of a longer number (e.g. "12000" or an ISBN).
    m = re.search(r"(?<!\d)(1[5-9]\d{2}|20\d{2})(?!\d)", str(s))
    return int(m.group(1)) if m else None

def safe_get(d, *keys, default=None):
    """Walk nested dicts along ``keys`` and return the value reached.

    Returns ``default`` as soon as the current node is not a dict or does not
    contain the next key. With no keys, ``d`` itself is returned.
    """
    node = d
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node

def load_progress():
    """Load the checkpoint stored at ``PROGRESS_JSON``.

    Returns the parsed JSON content when the file exists; otherwise a fresh
    state with an empty ``done_n_scheda`` list and ``last_index`` set to 0.
    """
    if not os.path.exists(PROGRESS_JSON):
        return {"done_n_scheda": [], "last_index": 0}
    with open(PROGRESS_JSON, "r", encoding="utf-8") as handle:
        return json.load(handle)

def save_progress(prog):
    """Persist ``prog`` as UTF-8 JSON at ``PROGRESS_JSON``.

    The data is first written to a sibling ``.tmp`` file and then moved over
    the real path with ``os.replace``. An interruption or serialisation error
    during the write therefore leaves the previous checkpoint intact instead
    of a truncated file that ``json.load`` would reject.

    Parameters
    ----------
    prog : JSON-serialisable object
        The progress state to store.
    """
    tmp_path = f"{PROGRESS_JSON}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(prog, f, ensure_ascii=False, indent=2)
    # Swap the finished file into place only after the write has completed.
    os.replace(tmp_path, PROGRESS_JSON)

# Confirms that the setup cell (imports, constants, helpers) ran to the end.
print("Setup OK.")


Setup OK.


In [2]:
# Load the long-format input: one row per (n_scheda, campo, valore) triple.
df = pd.read_csv(INPUT_CSV)

# Sanity check: the three columns the rest of the notebook relies on must be present.
assert set(df.columns) >= {"n_scheda","campo","valore"}, "CSV non nel formato atteso (n_scheda,campo,valore)."

# Group rows by record id (n_scheda), i.e. one group per book.
# `groups` is a list of (n_scheda, sub-DataFrame) pairs sorted by n_scheda.
groups = list(df.groupby("n_scheda", sort=True))
n_books = len(groups)

# Quick overview: row count, number of distinct books, and up to 20 field names.
print("Righe:", len(df), " | Libri (n_scheda distinti):", n_books)
print("Esempio campi:", sorted(df["campo"].unique())[:20])


Righe: 11477  | Libri (n_scheda distinti): 1000
Esempio campi: ['autore', 'cod_isbn', 'cod_sez', 'collabor', 'collezion', 'data_pub', 'editore', 'edizione', 'formato', 'luogo_pub', 'n_s_sez', 'n_scheda', 'note_sp', 'pagine', 'soggetto', 'titolo']


In [3]:
def book_input_from_group(g: pd.DataFrame) -> dict:
    """Collapse the long-format rows of one book into a flat lookup record.

    For each bibliographic field the first non-empty value is kept (repeated
    occurrences are ignored). The record also gets a ``year`` taken from
    ``data_pub`` or, failing that, from ``titolo``, plus a compact free-text
    query ``q`` made of title, author and year for full-text search engines.
    """

    def first_value(field):
        # First value of `field` that is neither blank nor the literal "nan".
        candidates = g.loc[g["campo"] == field, "valore"].dropna().astype(str)
        for raw in candidates:
            cleaned = raw.strip()
            if cleaned and cleaned.lower() != "nan":
                return cleaned
        return ""

    rec = {"n_scheda": int(g["n_scheda"].iloc[0])}
    for field in ("autore", "titolo", "editore", "luogo_pub", "data_pub", "cod_isbn"):
        rec[field] = first_value(field)

    year = extract_year(rec["data_pub"]) or extract_year(rec["titolo"])
    rec["year"] = year

    # Compact query: only the non-empty pieces, separated by single spaces.
    pieces = (rec["titolo"], rec["autore"], str(year) if year else "")
    rec["q"] = " ".join(filter(None, pieces)).strip()

    return rec

# Preview the record built for the first book (first group in n_scheda order).
sample = book_input_from_group(groups[0][1])
sample


{'n_scheda': 41186,
 'autore': 'CARINGTON, WHATELY',
 'titolo': '',
 'editore': 'ASTROLABIO',
 'luogo_pub': 'ROMA',
 'data_pub': 'COPYRIGHT 1972',
 'cod_isbn': '',
 'year': 1972,
 'q': 'CARINGTON, WHATELY 1972'}

In [4]:
def sbn_search(q: str, rows=TOPK_PER_SOURCE):
    """Run a free-text search against the OPAC SBN mobile-app endpoint (unofficial).

    ``q`` is URL-encoded with ``quote_plus``; ``rows`` is the number of
    results requested. The function pauses via ``_sleep()`` after the request,
    raises on HTTP error statuses, and returns the decoded JSON body.
    """
    encoded_query = quote_plus(q)
    url = f"https://opac.sbn.it/opacmobilegw/search.json?any={encoded_query}&type=0&start=0&rows={rows}"
    response = SESSION.get(url, timeout=TIMEOUT)
    _sleep()
    response.raise_for_status()
    return response.json()

def sbn_full(bid: str):
    """Fetch the full record for ``bid`` from the OPAC SBN mobile-app endpoint.

    The search response shows the bid in the form "IT\\ICCU\\..."; the URL
    needs it with single backslashes.
    NOTE(review): no unescaping happens here — presumably JSON decoding of the
    search response already yields single backslashes; confirm.

    The function pauses via ``_sleep()`` after the request, raises on HTTP
    error statuses, and returns the decoded JSON body.
    """
    encoded_bid = quote_plus(bid)
    url = f"https://opac.sbn.it/opacmobilegw/full.json?bid={encoded_bid}"
    response = SESSION.get(url, timeout=TIMEOUT)
    _sleep()
    response.raise_for_status()
    return response.json()

def sbn_candidates(book: dict):
    """Search OPAC SBN for ``book`` and return scored candidate matches.

    Parameters
    ----------
    book : dict
        Record with at least the keys ``q``, ``titolo`` and ``autore``;
        ``year`` is optional.

    Returns
    -------
    list of dict
        Candidates sorted by descending ``score``, each with the keys
        ``source``, ``score``, ``title``, ``author``, ``pub``, ``bid`` and
        ``raw`` (the untouched hit). The two best candidates that carry a
        ``bid`` also get a ``full`` key holding the full-record JSON, or
        ``None`` if that lookup failed. An empty list is returned when there
        is no query, the search fails, or the response has no usable hits.
    """
    if not book["q"]:
        return []
    try:
        js = sbn_search(book["q"])
    except Exception:
        # Deliberate best effort: a failed lookup for one book must not stop
        # the whole enrichment run.
        return []

    # The endpoint is unofficial, so neither the top-level shape nor the key
    # holding the hit list is guaranteed. Real responses mostly use "records";
    # the other keys are fallbacks. The first key with a truthy value wins.
    hits = None
    if isinstance(js, dict):
        for key in ("records", "record", "results", "docs", "elenco", "items"):
            hits = js.get(key)
            if hits:
                break
    if not isinstance(hits, list):
        hits = []

    cands = []
    for h in hits[:TOPK_PER_SOURCE]:
        if not isinstance(h, dict):
            # Skip malformed entries instead of failing on `.get`.
            continue
        title = h.get("titolo") or ""
        auth  = h.get("autorePrincipale") or ""
        pub   = h.get("pubblicazione") or ""
        bid   = h.get("codiceIdentificativo") or ""

        # Title similarity dominates; author similarity contributes the rest.
        score = 0.70 * sim(book["titolo"], title) + 0.30 * sim(book["autore"], auth)
        by = extract_year(pub)
        # Small bonus when the publication years agree within one year.
        if book.get("year") and by and abs(book["year"] - by) <= 1:
            score += 0.05

        cands.append({"source":"OPAC SBN", "score":score, "title":title, "author":auth, "pub":pub, "bid":bid, "raw":h})

    # Enrich only the two best candidates with the full record, to limit traffic.
    cands = sorted(cands, key=lambda x: x["score"], reverse=True)
    for c in cands[:2]:
        if c.get("bid"):
            try:
                full = sbn_full(c["bid"])
                c["full"] = full
            except Exception:
                # Best effort: keep the candidate even without its full record.
                c["full"] = None
    return cands
