# Step 1: Ingestion (The Collector)

In [1]:
import pandas as pd
import os
import re
import requests
from bs4 import BeautifulSoup
import time
import concurrent.futures
import urllib3

# Suppress InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

DATA_PATH = '../data/books_data.csv'
OUTPUT_PATH = '../data/books_raw_enriched.csv'

# Load data
if os.path.exists(DATA_PATH):
    try:
        # Default UTF-8 first
        df = pd.read_csv(DATA_PATH, encoding='utf-8')
    except UnicodeDecodeError:
        # Fallback to latin-1
        print("UTF-8 failed, trying latin-1...")
        df = pd.read_csv(DATA_PATH, encoding='latin-1')
    
    print(f"Loaded {len(df)} rows.")
else:
    print("File not found.")
    df = pd.DataFrame()

df.head()

UTF-8 failed, trying latin-1...
Loaded 36358 rows.


  df = pd.read_csv(DATA_PATH, encoding='latin-1')


Unnamed: 0,Acc. Date,Acc. No.,Title,ISBN,Author/Editor,Ed./Vol.,Place & Publisher,Year,Page(s),Class No./Book No.,...,Unnamed: 11,Unnamed: 12,Unnamed: 13,Unnamed: 14,Unnamed: 15,Unnamed: 16,Unnamed: 17,Unnamed: 18,Unnamed: 19,Unnamed: 20
0,20-01-2017,1,Network design : management and technical pers...,849334047,"Mann-Rubinson, Teresa C.",,"Boca Raton: CRC Press,",1999.0,405 p.;,004.6 MAN,...,,,,,,,,,,
1,08-09-2001,2,Multimedia information analysis and retrieval ...,9783540648260,"Ip, Horace H. S.",,"Berlin: Springer,",1998.0,"viii, 264 p.;",004 IPH,...,,,,,,,,,,
2,08-09-2001,3,"Multimedia systems : delivering, generating, a...",1852332484,"Morris, Tim",,"London: Springer,",2000.0,"xi, 191 p.;",006.7 MOR,...,,,,,,,,,,
3,05-09-2001,4,Principles of Data Mining and Knowledge Discovery,9783540410669,"Zytkov, Jan. M.",,"New York: Springer-Verlag,",1999.0,593 p.;,006.3 ZYT,...,,,,,,,,,,
4,09-08-2001,5,Focusing solutions for data mining : analytica...,3540664297,"Reinartz, Thomas",,"New York: Springer,",1999.0,"xiv, 307 p.;",006.3 REI,...,,,,,,,,,,


In [2]:
df = df.drop(["Unnamed: 10", "Unnamed: 11", "Unnamed: 12", "Unnamed: 13", "Unnamed: 14", "Unnamed: 15", "Unnamed: 16", "Unnamed: 17", "Unnamed: 18", "Unnamed: 19", "Unnamed: 20"], axis=1)
df.columns

Index(['Acc. Date', 'Acc. No.', 'Title', 'ISBN', 'Author/Editor', 'Ed./Vol.',
       'Place & Publisher', 'Year', 'Page(s)', 'Class No./Book No.'],
      dtype='object')

**Normalize ISBNs**

In [3]:
def normalize_isbn(isbn):
    if pd.isna(isbn):
        return None
    isbn = str(isbn)
    # Remove non-alphanumeric
    clean = re.sub(r'[^a-zA-Z0-9]', '', isbn)
    return clean

if 'ISBN' in df.columns:
    df['clean_isbn'] = df['ISBN'].apply(normalize_isbn)
else:
    print("Warning: 'ISBN' column not found. Available columns:", df.columns)

**Data Fetching Functions**

In [4]:
def reconstruct_openalex_abstract(inverted_index):
    if not inverted_index:
        return None
    # Reconstruct abstract from inverted index
    # keys are words, values are lists of positions
    word_index = []
    for word, positions in inverted_index.items():
        for pos in positions:
            word_index.append((pos, word))
    word_index.sort()
    return ' '.join([word for _, word in word_index])

**Concurrent Execution Loop**

In [5]:
# Create new column for description
df['description'] = None

# Global session (Simple)
session = requests.Session()

# Redefine fetchers 
def fetch_openlibrary(isbn):
    if not isbn:
        return None
    url = f"https://openlibrary.org/api/books?bibkeys=ISBN:{isbn}&format=json&jscmd=data"
    try:
        response = session.get(url, timeout=5)
        if response.status_code == 200:
            data = response.json()
            key = f"ISBN:{isbn}"
            if key in data:
                return data[key]
    except Exception:
        pass
    return None

def fetch_google_books(isbn):
    if not isbn:
        return None
    url = f"https://www.googleapis.com/books/v1/volumes?q=isbn:{isbn}"
    try:
        response = session.get(url, timeout=5)
        if response.status_code == 200:
            data = response.json()
            if 'items' in data:
                item = data['items'][0]
                info = item.get('volumeInfo', {})
                return info.get('description')
    except Exception:
        pass
    return None

def fetch_google_books_search(title, author):
    if not title:
        return None
    
    # Simple broad search
    query = title
    if author:
        clean_author = str(author).split(',')[0].split(';')[0].strip()
        query += f" {clean_author}"
        
    url = "https://www.googleapis.com/books/v1/volumes"
    params = {'q': query, 'maxResults': 1}
    
    try:
        response = session.get(url, params=params, timeout=5)
        if response.status_code == 200:
            data = response.json()
            if 'items' in data:
                item = data['items'][0]
                info = item.get('volumeInfo', {})
                return info.get('description')
    except Exception:
        pass
    return None

def fetch_openalex(isbn):
    if not isbn:
        return None
    url = f"https://api.openalex.org/works?filter=ids.isbn:{isbn}"
    try:
        response = session.get(url, timeout=3)
        if response.status_code == 200:
            data = response.json()
            results = data.get('results', [])
            if results:
                work = results[0]
                abstract = reconstruct_openalex_abstract(work.get('abstract_inverted_index'))
                if abstract:
                    return f"Abstract: {abstract}"
                
                concepts = work.get('concepts', [])
                if concepts:
                    concepts.sort(key=lambda x: x.get('score', 0), reverse=True)
                    keywords = [c['display_name'] for c in concepts[:10]]
                    return f"Keywords: {', '.join(keywords)}"
    except Exception:
        pass
    return None


# Helper function 
def process_book(idx, row):
    isbn = row.get('clean_isbn')
    title = row.get('Title')
    author = row.get('Author/Editor')
    
    desc = None
    
    # 1. Google Books (ISBN)
    if isbn:
        desc = fetch_google_books(isbn)
    
    # 2. OpenLibrary (ISBN)
    if not desc and isbn:
        ol_data = fetch_openlibrary(isbn)
        if ol_data and isinstance(ol_data, dict):
             val = ol_data.get('description')
             if isinstance(val, dict):
                 desc = val.get('value')
             else:
                 desc = val
                 
    # 3. OpenAlex (ISBN)
    if not desc and isbn:
        desc = fetch_openalex(isbn)

    # 4. Search Fallback (Title + Author)
    if not desc and title:
        desc = fetch_google_books_search(title, author)
        
    return idx, desc

# Process ALL rows
print(f"Starting concurrent enrichment for {len(df)} books...")

# Use concurrency
MAX_WORKERS = 30 

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    future_to_idx = {
        executor.submit(process_book, idx, row): idx 
        for idx, row in df.iterrows()
    }
    
    completed_count = 0
    for future in concurrent.futures.as_completed(future_to_idx):
        idx, desc = future.result()
        if desc:
            df.loc[idx, 'description'] = desc
        
        completed_count += 1
        # Reporting progress
        if completed_count % 500 == 0:
            print(f"Processed {completed_count}/{len(df)} books...")

print("Enrichment done.")
df.head()

Starting concurrent enrichment for 36358 books...
Processed 500/36358 books...
Processed 1000/36358 books...
Processed 1500/36358 books...
Processed 2000/36358 books...
Processed 2500/36358 books...
Processed 3000/36358 books...
Processed 3500/36358 books...
Processed 4000/36358 books...
Processed 4500/36358 books...
Processed 5000/36358 books...
Processed 5500/36358 books...
Processed 6000/36358 books...
Processed 6500/36358 books...
Processed 7000/36358 books...
Processed 7500/36358 books...
Processed 8000/36358 books...
Processed 8500/36358 books...
Processed 9000/36358 books...
Processed 9500/36358 books...
Processed 10000/36358 books...
Processed 10500/36358 books...
Processed 11000/36358 books...
Processed 11500/36358 books...
Processed 12000/36358 books...
Processed 12500/36358 books...
Processed 13000/36358 books...
Processed 13500/36358 books...
Processed 14000/36358 books...
Processed 14500/36358 books...
Processed 15000/36358 books...
Processed 15500/36358 books...
Processed

Unnamed: 0,Acc. Date,Acc. No.,Title,ISBN,Author/Editor,Ed./Vol.,Place & Publisher,Year,Page(s),Class No./Book No.,clean_isbn,description
0,20-01-2017,1,Network design : management and technical pers...,849334047,"Mann-Rubinson, Teresa C.",,"Boca Raton: CRC Press,",1999.0,405 p.;,004.6 MAN,849334047,Network Design outlines the fundamental princi...
1,08-09-2001,2,Multimedia information analysis and retrieval ...,9783540648260,"Ip, Horace H. S.",,"Berlin: Springer,",1998.0,"viii, 264 p.;",004 IPH,9783540648260,
2,08-09-2001,3,"Multimedia systems : delivering, generating, a...",1852332484,"Morris, Tim",,"London: Springer,",2000.0,"xi, 191 p.;",006.7 MOR,1852332484,
3,05-09-2001,4,Principles of Data Mining and Knowledge Discovery,9783540410669,"Zytkov, Jan. M.",,"New York: Springer-Verlag,",1999.0,593 p.;,006.3 ZYT,9783540410669,
4,09-08-2001,5,Focusing solutions for data mining : analytica...,3540664297,"Reinartz, Thomas",,"New York: Springer,",1999.0,"xiv, 307 p.;",006.3 REI,3540664297,


**Saving Data**

In [6]:
df.to_csv(OUTPUT_PATH, index=False)
print(f"Saved enriched data to {OUTPUT_PATH}")

Saved enriched data to ../data/books_raw_enriched.csv


In [7]:
df["description"].isna().sum()

np.int64(24890)