# DNB MARC Record Fetching Workflow

This workflow automates the retrieval, parsing, and saving of bibliographic records from the German National Library (DNB) for a given list of titles and authors. It includes functions to query the DNB SRU interface, handle pagination, parse MARCXML data into structured fields (titles, authors, subjects, languages, publication info, etc.), recursively fetch related records, and incrementally save progress to a CSV file while avoiding duplicates.

In [14]:
import os
import re
import time
import requests
import pandas as pd
import unicodedata
from bs4 import BeautifulSoup
from lxml import etree
import csv

In [15]:
titles_df = pd.read_csv('meta-titles_openRefine.csv', sep='\t')

OUTPUT_FILE = "dnb_results.csv"

In [16]:
titles_df

Unnamed: 0,ID,wikiTitle,wikiID,title_original,author_firstname,author_lastname
0,1,…liner Roma…,noWikiID,…liner Roma…,Joachim,Ringelnatz
1,2,Abendstimmung in Scheveningen,noWikiID,Abendstimmung in Scheveningen,Irmgard,Keun
2,3,Abschied von den Eltern,Q178091,Abschied von den Eltern,Peter,Weiss
3,4,Ach Liebste / laß uns eilen,noWikiID,Ach Liebste / laß uns eilen,Martin,Opitz
4,5,Agnes,Q394442,Agnes,Peter,Stamm
...,...,...,...,...,...,...
189,190,"Weh dem, der aus der Reihe tanzt",Q116160841,"Weh dem, der aus der Reihe tanzt",Ludwig,Harig
190,191,Woyzeck,Q657215,Woyzeck,Georg,Büchner
191,192,Wünschelrute,Q2596645,Wünschelrute,Joseph,Eichendorff
192,193,Zehn,noWikiID,Zehn,Franka,Potente


## Functions

### SAVE & LOAD PROGRESS

#### `save_progress(df, output_file=OUTPUT_FILE)`
Saves progress to a CSV file, merging with existing data and avoiding duplicate records.

#### `load_done_ids(output_file=OUTPUT_FILE)`
Loads a list of already processed IDs from the saved CSV file to skip completed titles.

In [None]:
def save_progress(df, output_file=OUTPUT_FILE):
    """
    Append or update progress in the output CSV file.
    Ensures records are unique by 'idn' (or full row if not available).
    """
    if "ID" not in df.columns:
        print("Warning: no 'ID' column in dataframe before saving.")
    
    # Combine with existing file if present
    if os.path.exists(output_file):
        existing = pd.read_csv(output_file, sep=";", quotechar='"', dtype=str)
        df = pd.concat([existing, df], ignore_index=True)
        if "idn" in df.columns:
            df = df.drop_duplicates(subset=["idn"], keep="last")
        else:
            df = df.drop_duplicates()

    # Ensure all values are stored as strings to avoid quoting issues
    df = df.astype(str)

    # Save file with all fields quoted and UTF-8 BOM for Excel compatibility
    df.to_csv(
        output_file,
        index=False,
        sep=";",
        quotechar='"',
        quoting=csv.QUOTE_ALL,
        encoding="utf-8-sig",
    )
    print(f"Progress saved: {len(df)} total records in {output_file}")


def load_done_ids(output_file=OUTPUT_FILE):
    """
    Load already processed IDs from the saved CSV file.
    Returns a set of IDs to skip in the next run.
    """
    if not os.path.exists(output_file):
        return set()
    df = pd.read_csv(output_file, sep=";", quotechar='"', dtype=str)
    if "ID" in df.columns:
        return set(df["ID"].dropna().unique())
    elif "idn" in df.columns:
        print("Warning: no 'ID' column in saved file; using 'idn' instead.")
        return set(df["idn"].dropna().unique())
    else:
        print("Warning: neither 'ID' nor 'idn' found in saved file.")
        return set()


### DNB SRU FUNCTIONS

#### `dnb_sru_all(title, author=None, batch_size=50, sleep_sec=2, verbose=True)`
Queries the DNB SRU interface for bibliographic records by title and optional author, handling pagination and retries.

#### `dnb_sru_all_by_idn(idn, sleep_sec=0.2)`
Fetches a single bibliographic record from the DNB using its unique IDN identifier.

In [None]:
def dnb_sru_all(title, author=None, batch_size=50, sleep_sec=2, verbose=True):
    """
    Query the DNB SRU interface for a given title (and optional author).
    Tries both (title + author) and title-only queries.
    Fetches results in batches and follows SRU pagination.
    """
    base_url = "https://services.dnb.de/sru/dnb"
    title_clean = title.replace("'", "\\'")
    all_records = []

    # Build queries: title+author (preferred) and title-only (fallback)
    queries = []
    if author:
        queries.append(f'(dc.title all "{title_clean}") and (dc.creator all "{author}")')    
    queries.append(f'(dc.title all "{title_clean}")')  # fallback

    # Loop through both queries until results found
    for q in queries:
        start = 1
        total_found = 0
        if verbose:
            print(f"Trying query: {q}")

        while True:
            params = {
                "version": "1.1",
                "operation": "searchRetrieve",
                "recordSchema": "MARC21-xml",
                "maximumRecords": str(batch_size),
                "startRecord": str(start),
                "query": q
            }

            # Query DNB API and handle potential errors
            try:
                r = requests.get(base_url, params=params, timeout=30)
                r.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"Request failed at start={start}: {e}")
                break

            xml = BeautifulSoup(r.content, "xml")
            records = xml.find_all("record", {"type": "Bibliographic"})
            num = len(records)

            if verbose:
                print(f" → Retrieved {num} records (start={start})")

            if num == 0:
                break  # no more results

            all_records.extend(records)
            total_found += num

            # Handle pagination if more results available
            next_pos_tag = xml.find("nextRecordPosition")
            if next_pos_tag and next_pos_tag.text:
                start = int(next_pos_tag.text)
                time.sleep(sleep_sec)
            else:
                break

        # Stop after first query that returns results
        if total_found > 0:
            if verbose:
                print(f"Found {total_found} records for query: {q}")
            break
        else:
            if verbose:
                print(f"No results for query: {q}")

    return all_records


def dnb_sru_all_by_idn(idn, sleep_sec=0.2):
    """
    Fetch a specific record (and possible variants) from DNB by its IDN.
    Used to retrieve related records recursively.
    """
    base_url = "https://services.dnb.de/sru/dnb"
    query = f"idn={idn}"
    params = {
        "version": "1.1",
        "operation": "searchRetrieve",
        "recordSchema": "MARC21-xml",
        "maximumRecords": "10",
        "query": query
    }
    try:
        r = requests.get(base_url, params=params, timeout=30)
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"IDN fetch failed for {idn}: {e}")
        return []
    xml = BeautifulSoup(r.content, "xml")
    records = xml.find_all("record", {"type": "Bibliographic"})
    time.sleep(sleep_sec)
    return records

### PARSE MARC XML

#### `parse_record(record)`
Parses MARCXML data into a structured dictionary, extracting fields like title, author, subjects, languages, and publication info.

In [None]:
def parse_record(record):
    """
    Parse one MARCXML record and extract key bibliographic fields.
    Includes both standard and additional (subject, genre, etc.) fields.
    """
    record_str = unicodedata.normalize("NFC", str(record))
    ns = {"marc": "http://www.loc.gov/MARC21/slim"}
    try:
        xml = etree.fromstring(record_str.encode("utf-8"))
    except Exception:
        return {}

    # --- Helper functions for MARC structure ---
    def get_control(tag):
        nodes = xml.xpath(f"marc:controlfield[@tag='{tag}']", namespaces=ns)
        return nodes[0].text if nodes else None

    def get_datafield(tag, code=None):
        nodes = xml.xpath(f"marc:datafield[@tag='{tag}']", namespaces=ns)
        texts = []
        for node in nodes:
            if code:
                subs = node.xpath(f"marc:subfield[@code='{code}']", namespaces=ns)
                texts.extend([s.text.strip() for s in subs if s.text])
            else:
                texts.append(" || ".join([s.text.strip() for s in node.xpath("marc:subfield", namespaces=ns) if s.text]))
        return " || ".join(texts) if texts else None

    def get_fields(tag_list):
        """Combine multiple MARC tags into one string."""
        combined = []
        for t in tag_list:
            val = get_datafield(t)
            if val:
                combined.append(val)
        return " || ".join(combined) if combined else None

    # --- Control fields ---
    idn = get_control("001")
    field_008 = get_control("008")

    # --- Title and variant titles ---
    title = get_datafield("245", "a") or get_datafield("240", "a") or get_datafield("130", "a")
    standardised_title = get_datafield("240", "a") or get_datafield("130", "a") or title
    alternative_titles = get_datafield("246")

    # --- Author and identifiers ---
    author = get_datafield("100", "a") or get_datafield("700", "a")
    rela = get_datafield("100", "e")
    gnd = get_datafield("100", "0") or get_datafield("700", "0")

    # --- Publication info ---
    pub_year = get_datafield("264", "c") or (field_008[7:11] if field_008 else None)

    # --- Links, notes, and series ---
    links = get_datafield("856", "u")
    series = get_fields(["490", "830"])
    notes = get_fields([f"{i:03}" for i in range(500, 600)])
    related_items = get_fields(["776", "787"])

    # --- Additional MARC fields ---
    language_codes = get_datafield("041")
    subject_chron = get_datafield("648")    # chronological subject
    subject_topical = get_datafield("650")  # topical subject
    genre_form = get_datafield("655")       # genre/form
    publication_info = get_fields(["264"])  # full publication statement

    return {
        "idn": idn,
        "title": title,
        "standardised_title": standardised_title,
        "alternative_titles": alternative_titles,
        "author": author,
        "rela": rela,
        "gnd": gnd,
        "pub_year": pub_year,
        "links": links,
        "series": series,
        "notes": notes,
        "related_items": related_items,
        "language_codes": language_codes,
        "subject_chron": subject_chron,
        "subject_topical": subject_topical,
        "genre_form": genre_form,
        "publication_info": publication_info,
        "control_008": field_008
    }


### RECURSIVE FETCHING

#### `extract_idns_from_related(text)`
Extracts IDN identifiers from related-item fields in MARCXML using regular expressions.

#### `fetch_related_records(record_ids, seen=None)`
Retrieves and parses related DNB records recursively, avoiding duplicates through a `seen` set.

In [None]:
def extract_idns_from_related(text):
    """Extract numeric DNB IDNs from a text string using regex."""
    if not text:
        return []
    return re.findall(r'\b\d{8,}\b', text)


def fetch_related_records(record_ids, seen=None):
    """
    Recursively fetch and parse records linked to the given IDNs.
    Uses a 'seen' set to avoid infinite loops.
    """
    if seen is None:
        seen = set()
    all_parsed = []
    for rid in record_ids:
        if rid in seen:
            continue
        seen.add(rid)
        recs = dnb_sru_all_by_idn(rid)
        for r in recs:
            parsed = parse_record(r)
            if parsed:
                all_parsed.append(parsed)
                related_ids = extract_idns_from_related(parsed.get("related_items"))
                # Recursively fetch related records
                all_parsed.extend(fetch_related_records(related_ids, seen))
    return all_parsed


### MAIN SCRIPT

#### `main(titles_df)`
Iterates through the input dataframe of titles, fetches main and related DNB records for each, and saves results incrementally.

In [17]:
def main(titles_df):
    """
    Main driver function:
      - Loads already completed IDs
      - Iterates over all titles in the dataframe
      - Fetches main and related DNB records
      - Saves progress incrementally
    """
    done_ids = load_done_ids()
    print(f"Already done: {len(done_ids)} titles → {sorted(done_ids)}")

    try:
        for _, row in titles_df.iterrows():
            ID = row["ID"]
            if ID in done_ids:
                print(f"Skipping already done: {row['title_original']}")
                continue

            title = row["title_original"]

            # --- Clean and combine author name ---
            first = str(row.get("author_firstname", "") or "").strip()
            last = str(row.get("author_lastname", "") or "").strip()
            author = f"{first} {last}".strip() or None

            print(f"\nFetching main records for: {title}" + (f" ({author})" if author else ""))

            # --- Fetch main records ---
            main_records = dnb_sru_all(title, author=author, batch_size=25) if author else dnb_sru_all(title, batch_size=25)
            parsed = [parse_record(r) for r in main_records if r]
            print(f"Retrieved {len(parsed)} main records for {title}")

            for p in parsed:
                p.update(row.to_dict())  # attach metadata from input CSV

            # --- Fetch related records ---
            related_idns = list(set(idn for p in parsed for idn in extract_idns_from_related(p.get("related_items"))))
            print(f"Fetching {len(related_idns)} related records for {title}")

            related_parsed = fetch_related_records(related_idns)
            for rp in related_parsed:
                rp.update(row.to_dict())
            print(f"Retrieved {len(related_parsed)} related records for {title}")

            # --- Save progress after each title ---
            df_title = pd.DataFrame(parsed + related_parsed)
            if not df_title.empty:
                save_progress(df_title)
            else:
                print(f"No records found for {title}")

    except KeyboardInterrupt:
        print("\nManual stop detected — progress saved.")
    except Exception as e:
        print(f"\nError occurred: {e}")
    finally:
        print("Script finished safely.")

### Execution

In [18]:
main(titles_df)

Already done: 0 titles → []

Fetching main records for: …liner Roma… (Joachim Ringelnatz)
Trying query: (dc.title all "…liner Roma…") and (dc.creator all "Joachim Ringelnatz")
 → Retrieved 13 records (start=1)
Found 13 records for query: (dc.title all "…liner Roma…") and (dc.creator all "Joachim Ringelnatz")
Retrieved 13 main records for …liner Roma…
Fetching 5 related records for …liner Roma…
Retrieved 2 related records for …liner Roma…
Progress saved: 15 total records in 251014_dnb_results.csv

Fetching main records for: Abendstimmung in Scheveningen (Irmgard Keun)
Trying query: (dc.title all "Abendstimmung in Scheveningen") and (dc.creator all "Irmgard Keun")
 → Retrieved 0 records (start=1)
No results for query: (dc.title all "Abendstimmung in Scheveningen") and (dc.creator all "Irmgard Keun")
Trying query: (dc.title all "Abendstimmung in Scheveningen")
 → Retrieved 0 records (start=1)
No results for query: (dc.title all "Abendstimmung in Scheveningen")
Retrieved 0 main records for