### **Overview**
This script is a robust data harvesting tool designed to fetch academic works from the OpenAlex API and store them in a local DuckDB database. It is specifically optimized for high-volume data retrieval by using multi-threading, automatic retries for network failures, and efficient bulk database writes.

The script targets specific "**Topics**" listed in a topic list file (an Excel sheet with a `topic_id` column) and retrieves all associated articles within a specified date range (2000–2026).

In [None]:
import requests
import duckdb
import time
import json
from datetime import datetime, timedelta, date
from typing import List, Dict, Any, Optional
import pandas as pd
import os
from tqdm import tqdm
import threading
import concurrent.futures



### **Configuration**

In [None]:

# ---- OpenAlex API --------------------------------------------------------
OPENALEX_BASE = "https://api.openalex.org/works"   # works endpoint (counts and paged results)
TOPIC_BASE = "https://api.openalex.org/topics"     # topic lookup endpoint (topic names)
PAGE_SIZE = 200                                    # sent as the "per-page" parameter when paging
COUNT_LIMIT = 5000                                 # a date range with more works than this is split further

# ---- DuckDB storage -------------------------------------------------------
DB_DIR = 'db'
DB_FILENAME = 'openAlex.db'
DB_PATH = os.path.join(DB_DIR, DB_FILENAME)

# ---- Threading ------------------------------------------------------------
MAX_WORKERS = 5                  # size of the thread pool that downloads intervals
db_lock = threading.Lock()       # serializes use of the shared DuckDB connection across threads

# ---- Failure handling -----------------------------------------------------
MAX_RETRIES_PER_INTERVAL = 3     # attempts per interval before it counts as a failure
MAX_CONSECUTIVE_FAILURES = 10    # circuit breaker: stop after this many failed intervals without a success
consecutive_failure_count = 0    # runtime state shared by worker threads; do not edit manually
thread_lock = threading.Lock()   # guards updates to consecutive_failure_count

### **Setup and Database Connections**

In [2]:
# Database location (kept here as well so this cell can run on its own).
DB_DIR = 'db'
DB_FILENAME = 'openAlex.db'
DB_PATH = os.path.join(DB_DIR, DB_FILENAME)

# exist_ok avoids the check-then-create race of os.path.exists() + os.makedirs().
os.makedirs(DB_DIR, exist_ok=True)

# Opens the DuckDB file at DB_PATH; the connection is shared by every later cell.
conn = duckdb.connect(DB_PATH)
print(f"Connected to database at {DB_PATH}")

Connected to database at db\openAlex.db


In [3]:
def init_schema(conn):
    """
    Create the harvester's tables when they are missing.

    Tables created:
      - works:             one row per OpenAlex work (id, title, doi, dates, metrics, mag_id)
      - topics:            topic_id -> topic_name
      - work_topics:       work/topic link with a score
      - harvest_intervals: per-topic date windows and their harvest status

    Every statement uses CREATE TABLE IF NOT EXISTS, so calling this on an
    existing database leaves the data untouched.

    Args:
        conn: An open DuckDB connection (anything with an ``execute(sql)`` method).
    """
    # To rebuild the works table from scratch, run: conn.execute("DROP TABLE IF EXISTS works")
    schema_ddl = (
        """
        CREATE TABLE IF NOT EXISTS works (
            id VARCHAR PRIMARY KEY,
            title VARCHAR,
            doi VARCHAR,
            publication_date DATE,
            primary_topic VARCHAR,
            version VARCHAR,
            fwci DOUBLE,
            citation_count INTEGER,
            mag_id BIGINT             -- New Column for MAG ID
        )
    """,
        """
        CREATE TABLE IF NOT EXISTS topics (
            topic_id VARCHAR PRIMARY KEY,
            topic_name VARCHAR UNIQUE
        )
    """,
        """
        CREATE TABLE IF NOT EXISTS work_topics (
            work_id VARCHAR,
            topic_id VARCHAR,
            score DOUBLE,
            PRIMARY KEY (work_id, topic_id),
            FOREIGN KEY (work_id) REFERENCES works(id),
            FOREIGN KEY (topic_id) REFERENCES topics(topic_id)
        )
    """,
        """
        CREATE TABLE IF NOT EXISTS harvest_intervals (
            topic_id VARCHAR,
            start_date DATE,
            end_date DATE,
            last_token VARCHAR,
            fetched_count INTEGER DEFAULT 0,
            status VARCHAR,
            last_updated TIMESTAMP,
            PRIMARY KEY (topic_id, start_date, end_date)
        )
    """,
    )
    # Order matters: work_topics references works and topics.
    for ddl in schema_ddl:
        conn.execute(ddl)
    print("Schema initialized with MAG ID.")

# Create any missing tables on the open connection; every statement in
# init_schema uses CREATE TABLE IF NOT EXISTS, so re-running this is safe.
init_schema(conn)

Schema initialized with MAG ID.


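### **Request Wrapper**
Makes a GET request. If it fails due to network issues, server errors (500, 502, 503, 504), or rate limiting (429), it waits with exponential backoff (starting at 2 s, capped at 60 s) and retries indefinitely. Any other non-200 response, such as a client error (400, 401, 404, etc.), raises an exception immediately.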

In [None]:
def request_with_retry(url, params=None):
    """
    Perform a GET request, retrying transient failures with exponential backoff.

    Network errors (connection failures, timeouts, ...) and the retryable
    HTTP statuses 429, 500, 502, 503 and 504 are retried indefinitely. The
    wait starts at 2 s and doubles after each failure, capped at 60 s. Any
    other non-200 response raises immediately instead of being retried.

    Args:
        url: Endpoint to request.
        params: Optional query-string parameters passed to ``requests.get``.

    Returns:
        The ``requests.Response`` of the first 200 reply.

    Raises:
        requests.exceptions.HTTPError: for a non-retryable, non-200 status
            (e.g. 400, 401, 404, or an unexpected 2xx/3xx code).
    """
    retryable_statuses = {429, 500, 502, 503, 504}
    delay = 2  # seconds; doubled after every failed attempt, capped at 60

    while True:
        # Only the network call sits inside the try. HTTPError subclasses
        # RequestException, so raising it inside this try would be caught
        # below and retried forever instead of reaching the caller.
        try:
            r = requests.get(url, params=params, timeout=30)
        except requests.exceptions.RequestException as e:
            print(f"Network error: {e}. Retrying in {delay}s...")
            time.sleep(delay)
            delay = min(delay * 2, 60)
            continue

        if r.status_code == 200:
            return r

        if r.status_code in retryable_statuses:
            print(f"Status {r.status_code} received. Retrying in {delay}s...")
            time.sleep(delay)
            delay = min(delay * 2, 60)  # Exponential backoff up to 60s
            continue

        # Client errors (4xx) and non-retryable 5xx: raise_for_status() raises HTTPError.
        r.raise_for_status()
        # Codes below 400 other than 200 are not errors to raise_for_status();
        # without this the loop would re-request immediately, forever.
        raise requests.exceptions.HTTPError(
            f"Unexpected status {r.status_code} for url: {r.url}", response=r
        )

### **Creating Manageable Intervals for each Topic**

In [None]:
def get_topic_name(topic_id):
    """
    Look up the human-readable name of an OpenAlex topic.

    Requests ``TOPIC_BASE/<topic_id>`` through ``request_with_retry`` and
    returns its ``display_name``. When that is missing or empty, it falls
    back to ``name``, which may be None.
    """
    payload = request_with_retry(TOPIC_BASE + "/" + str(topic_id)).json()
    display = payload.get("display_name")
    return display if display else payload.get("name")

# Count the works of one topic published within a date range.
def get_count(topic_id, start_date, end_date):
    """
    Return OpenAlex's ``meta.count`` for articles whose primary topic is
    ``topic_id``, filtered by from_publication_date=start_date and
    to_publication_date=end_date.

    Only one result is requested (``per-page`` = 1) because just the count is read.
    """
    filter_expr = ",".join([
        f"primary_topic.id:{topic_id}",
        f"from_publication_date:{start_date}",
        f"to_publication_date:{end_date}",
        "type:article",
    ])
    response = request_with_retry(OPENALEX_BASE, params={"filter": filter_expr, "per-page": 1})
    meta = response.json()["meta"]
    return int(meta["count"])

# Split a topic's date range into intervals whose paper counts stay at or below
# the limit, so a failed interval only has to be retried for a small number of papers.
def split_range_safely(topic_id, start_date, end_date, limit=COUNT_LIMIT):
    """
    Recursively bisect [start_date, end_date] until each piece holds at most `limit` works.

    Args:
        topic_id: OpenAlex topic id passed to get_count.
        start_date: First day of the range (datetime.date).
        end_date: Last day of the range (datetime.date).
        limit: Maximum count allowed per returned interval.

    Returns:
        A list of (start, end) date tuples in ascending order, covering the
        original range without gaps or overlaps. A single day whose count
        still exceeds `limit` is returned as-is, because it cannot be split further.
    """
    cnt = get_count(topic_id, start_date, end_date)
    print(f"   Checking range {start_date} -> {end_date}: count = {cnt}")

    if cnt <= limit:
        return [(start_date, end_date)]

    days = (end_date - start_date).days
    # Only a single-day range (days == 0) is indivisible. A two-day range
    # (days == 1) can still be split into [start, start] and [end, end].
    if days < 1:
        return [(start_date, end_date)]

    mid = start_date + timedelta(days=days // 2)
    left = split_range_safely(topic_id, start_date, mid, limit)
    right = split_range_safely(topic_id, mid + timedelta(days=1), end_date, limit)
    return left + right

# Record each computed interval of a topic in harvest_intervals as 'pending'.
def register_intervals(conn, topic_id, intervals):
    """
    Insert one 'pending' harvest_intervals row per (start, end) pair.

    Each row gets fetched_count 0 and last_updated NOW(). Pairs whose
    (topic_id, start_date, end_date) key already exists are left unchanged
    (INSERT OR IGNORE). This function does not take db_lock itself.
    """
    insert_sql = """
            INSERT OR IGNORE INTO harvest_intervals
            (topic_id, start_date, end_date, status, fetched_count, last_updated)
            VALUES (?, ?, ?, 'pending', 0, NOW())
        """
    for interval_start, interval_end in intervals:
        conn.execute(insert_sql, [topic_id, interval_start, interval_end])


### **Inserting Data into Database**

In [None]:

def insert_works_bulk(conn, works_list):
    """
    Insert a batch of work rows into the ``works`` table with one statement.

    Each item is a dict keyed by ``works`` column names (id, title, doi,
    publication_date, primary_topic, version, fwci, citation_count, mag_id).
    Keys that are absent become missing values in the batch. Rows whose id
    already exists are skipped (INSERT OR IGNORE). The caller's dicts are not
    modified.

    Args:
        conn: Open DuckDB connection; all access here happens under db_lock.
        works_list: List of work dicts. An empty list or None is a no-op.
    """
    if not works_list:
        return

    columns = ["id", "title", "doi", "publication_date", "primary_topic",
               "version", "fwci", "citation_count", "mag_id"]

    rows = []
    for w in works_list:
        pub = w.get("publication_date")
        if isinstance(pub, (date, datetime)):
            # datetime subclasses date. Reduce it to its date so str() yields
            # 'YYYY-MM-DD' rather than a timestamp string for the DATE column.
            if isinstance(pub, datetime):
                pub = pub.date()
            # Copy instead of mutating: the caller may still use its dicts.
            w = {**w, "publication_date": str(pub)}
        rows.append(w)

    # Fixing the DataFrame columns and naming them in the SQL keeps the mapping
    # independent of each dict's key order; SELECT * would insert by position.
    df = pd.DataFrame(rows, columns=columns)
    column_sql = ", ".join(columns)

    with db_lock:
        conn.register("batch_works", df)
        try:
            conn.execute(
                f"INSERT OR IGNORE INTO works ({column_sql}) "
                f"SELECT {column_sql} FROM batch_works"
            )
        finally:
            # Always drop the temporary view, even if the insert fails.
            conn.unregister("batch_works")

def insert_topics_bulk(conn, results_list):
    """
    Bulk-insert topics and work/topic links taken from raw OpenAlex work records.

    For every work in ``results_list``, each entry of its ``topics`` list
    yields a ``topics`` row (topic_id, topic_name) and a ``work_topics`` row
    (work_id, topic_id, score). IDs are the last '/'-separated segment of the
    ``id`` fields. Entries without an id or a name are skipped.

    Topics and links are de-duplicated in memory first, so one batch never
    carries the same primary key twice. Rows already in the database are
    skipped by INSERT OR IGNORE.

    Args:
        conn: Open DuckDB connection; writes happen under db_lock.
        results_list: Raw work dicts from the API "results" list. An empty
            list or None is a no-op.
    """
    if not results_list:
        return

    # 1. Prepare data in memory (no lock needed yet).
    unique_topics = {}   # topic_id -> topic_name
    relations = {}       # (work_id, topic_id) -> score; the dict drops duplicate pairs

    for w in results_list:
        raw_work_id = w.get("id")
        if not raw_work_id:
            continue
        work_id = raw_work_id.split('/')[-1]

        for t in w.get("topics") or []:
            raw_topic_id = t.get("id")
            topic_name = t.get("display_name") or t.get("name")
            if not raw_topic_id or not topic_name:
                continue
            topic_id = raw_topic_id.split('/')[-1]
            unique_topics[topic_id] = topic_name
            relations[(work_id, topic_id)] = float(t.get("score") or 0.0)

    if not unique_topics:
        return

    topics_df = pd.DataFrame(list(unique_topics.items()), columns=["topic_id", "topic_name"])
    relations_df = pd.DataFrame(
        [(wid, tid, score) for (wid, tid), score in relations.items()],
        columns=["work_id", "topic_id", "score"],
    )

    # 2. Bulk write (lock needed now). Topics go first because work_topics
    #    has a foreign key on topics(topic_id).
    with db_lock:
        conn.register("batch_topics", topics_df)
        try:
            conn.execute(
                "INSERT OR IGNORE INTO topics (topic_id, topic_name) "
                "SELECT topic_id, topic_name FROM batch_topics"
            )
        finally:
            conn.unregister("batch_topics")

        if not relations_df.empty:
            conn.register("batch_relations", relations_df)
            try:
                conn.execute(
                    "INSERT OR IGNORE INTO work_topics (work_id, topic_id, score) "
                    "SELECT work_id, topic_id, score FROM batch_relations"
                )
            finally:
                conn.unregister("batch_relations")

def mark_interval_complete(conn, topic_id, start_date, end_date, total_count):
    """
    Flag one harvest interval as finished.

    For the harvest_intervals row keyed by (topic_id, start_date, end_date),
    sets status='done', stores total_count in fetched_count and refreshes
    last_updated. The update runs while holding db_lock.
    """
    update_sql = """
            UPDATE harvest_intervals
            SET status='done', fetched_count=?, last_updated=now()
            WHERE topic_id=? AND start_date=? AND end_date=?
        """
    bind_values = [total_count, topic_id, start_date, end_date]
    with db_lock:
        conn.execute(update_sql, bind_values)

### **Data Fetching**

In [None]:
# Fetch a single page of works for a topic within a date range.
def fetch_page(topic_id, start_date, end_date, cursor="*"):
    """
    Request one cursor-paginated page of articles for a topic/date window.

    Args:
        topic_id: OpenAlex topic id used in the primary_topic.id filter.
        start_date: Value for the from_publication_date filter.
        end_date: Value for the to_publication_date filter.
        cursor: Pagination cursor; "*" is the default starting cursor.

    Returns:
        (results, next_cursor): the page's work dicts (an empty list when the
        response has no "results" key) and ``meta.next_cursor``, which is
        None when the response's meta does not carry one.
    """
    filter_expr = (
        f"primary_topic.id:{topic_id},"
        f"from_publication_date:{start_date},"
        f"to_publication_date:{end_date},"
        "type:article"
    )
    payload = request_with_retry(
        OPENALEX_BASE,
        params={"filter": filter_expr, "per-page": PAGE_SIZE, "cursor": cursor},
    ).json()
    return payload.get("results", []), payload["meta"].get("next_cursor")

In [7]:
def _work_row(w):
    """Map one raw OpenAlex work dict to a dict keyed by ``works`` column names."""
    # `or {}` treats a JSON null the same as an absent key, so a work with
    # "primary_location": null does not raise AttributeError.
    primary_topic = w.get("primary_topic") or {}
    primary_location = w.get("primary_location") or {}
    ids = w.get("ids") or {}
    return {
        "id": w.get("id").split('/')[-1],
        "title": w.get("title"),
        "doi": w.get("doi"),
        "publication_date": w.get("publication_date"),
        "primary_topic": primary_topic.get("display_name"),
        # None (SQL NULL) when absent; never a dict in the VARCHAR column.
        "version": primary_location.get("version"),
        "fwci": w.get("fwci"),
        "citation_count": w.get("cited_by_count"),
        "mag_id": ids.get("mag"),
    }


def fetch_interval_atomic(conn, topic_id, start_date, end_date):
    """
    Download every article of one topic/date interval, then write it in one go.

    All pages are collected in memory before anything touches the database,
    so db_lock is held only for the final bulk writes, not during the download.
    The interval is marked 'done' even when it contains no articles;
    otherwise an empty interval would stay pending and be re-requested on
    every run. Exceptions from fetching or writing propagate to the caller,
    which leaves the interval not marked done.
    """
    all_works = []
    all_raw_results = []
    cursor = "*"

    while True:
        results, next_cursor = fetch_page(topic_id, start_date, end_date, cursor)
        all_raw_results.extend(results)
        all_works.extend(_work_row(w) for w in results)

        # Stop at the last page. Also stop on an empty page, so a cursor
        # returned alongside no results cannot loop forever.
        if next_cursor is None or not results:
            break
        cursor = next_cursor

    if all_works:
        insert_works_bulk(conn, all_works)
        insert_topics_bulk(conn, all_raw_results)
    mark_interval_complete(conn, topic_id, start_date, end_date, len(all_works))

In [None]:
def process_interval_wrapper(args):
    """
    Run fetch_interval_atomic for one interval, with retries and a global circuit breaker.

    Args:
        args: A (conn, topic_id, start_date, end_date) tuple, as submitted to
            the thread pool.

    Returns:
        (start_date, end_date, success)

    The interval is attempted up to MAX_RETRIES_PER_INTERVAL times, waiting
    2 s, 4 s, ... between attempts (no wait after the last one). A success
    resets the shared consecutive_failure_count; giving up increments it.
    Once the count reaches MAX_CONSECUTIVE_FAILURES, the function returns
    failure without fetching, and so does any thread that is still retrying.
    """
    global consecutive_failure_count
    conn, topic_id, start_date, end_date = args

    for attempt in range(1, MAX_RETRIES_PER_INTERVAL + 1):
        # Checked before every attempt, not just the first, so threads that
        # are mid-retry also stop once the breaker trips elsewhere.
        if consecutive_failure_count >= MAX_CONSECUTIVE_FAILURES:
            return (start_date, end_date, False)

        try:
            fetch_interval_atomic(conn, topic_id, start_date, end_date)
        except Exception as e:
            print(f"Error in interval {start_date}->{end_date} (Attempt {attempt}/{MAX_RETRIES_PER_INTERVAL}): {e}")
            if attempt < MAX_RETRIES_PER_INTERVAL:
                time.sleep(2 * attempt)  # back off before the next attempt only
            continue

        with thread_lock:
            consecutive_failure_count = 0  # any success resets the breaker
        return (start_date, end_date, True)

    # Every attempt failed: count it toward the circuit breaker.
    print(f"GIVING UP on interval {start_date}->{end_date} after {MAX_RETRIES_PER_INTERVAL} attempts.")
    with thread_lock:
        consecutive_failure_count += 1
        if consecutive_failure_count >= MAX_CONSECUTIVE_FAILURES:
            print(f"\nCRITICAL: {consecutive_failure_count} consecutive failures detected. Stopping script to prevent skipping all intervals.\n")

    return (start_date, end_date, False)

In [None]:
def _normalize_topic_id(topic_id):
    """Return ``topic_id`` in the 't<digits>' form used as the DB key, or None if blank."""
    # Blank cells (None/NaN) cannot be harvested.
    if pd.isna(topic_id):
        return None
    # An id column with empty cells may be read as floats (10466.0); drop the '.0'.
    if isinstance(topic_id, float) and topic_id.is_integer():
        topic_id = int(topic_id)
    # Accept URL-style ids too: keep only the last '/'-separated segment.
    text = str(topic_id).strip().rsplit('/', 1)[-1]
    if not text:
        return None
    return text if text.lower().startswith('t') else 't' + text


def _as_date(value):
    """Coerce a DB value (date, datetime/Timestamp, or 'YYYY-MM-DD' string) to a datetime.date."""
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, str):
        return datetime.strptime(value, '%Y-%m-%d').date()
    return value


def _pending_intervals(conn, topic_id_fmt, start_date, end_date):
    """Load (or compute and register) a topic's intervals; return those not yet 'done'."""
    with db_lock:
        existing = conn.execute(
            "SELECT COUNT(*) FROM harvest_intervals WHERE topic_id = ?",
            [topic_id_fmt],
        ).fetchone()[0]

    if existing:
        print(f"Found {existing} existing intervals.")
    else:
        print("Calculating splits via API...")
        intervals = split_range_safely(topic_id_fmt, start_date, end_date)
        with db_lock:
            register_intervals(conn, topic_id_fmt, intervals)

    # One query for all unfinished intervals instead of one status lookup per interval.
    with db_lock:
        rows = conn.execute("""
            SELECT start_date, end_date
            FROM harvest_intervals
            WHERE topic_id = ? AND (status IS NULL OR status <> 'done')
            ORDER BY start_date
        """, [topic_id_fmt]).fetchall()
    return [(_as_date(s), _as_date(e)) for s, e in rows]


def _run_interval_tasks(tasks):
    """Run interval tasks on the thread pool; return False if the circuit breaker tripped mid-run."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = []
        for task in tasks:
            if consecutive_failure_count >= MAX_CONSECUTIVE_FAILURES:
                print("Circuit breaker active. Cancelling remaining tasks.")
                break
            futures.append(executor.submit(process_interval_wrapper, task))

        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            _, _, success = future.result()
            if not success and consecutive_failure_count >= MAX_CONSECUTIVE_FAILURES:
                print("Stopping execution loop due to critical failures.")
                # Leaving the `with` block waits for queued work, so cancel
                # everything that has not started yet before returning.
                for pending in futures:
                    pending.cancel()
                return False
    return True


def harvest_topics(conn, topic_ids, start_date, end_date):
    """
    Harvest every topic in ``topic_ids`` over the given date window into DuckDB.

    For each topic, this:
      - normalizes its id to the 't<digits>' form (blank ids are skipped);
      - resolves its name (topics whose name cannot be fetched are skipped);
      - loads or creates its harvest intervals;
      - downloads every interval not yet marked 'done' on MAX_WORKERS threads.

    Harvesting stops entirely once the global circuit breaker
    (MAX_CONSECUTIVE_FAILURES) trips.
    """
    for raw_id in topic_ids:
        topic_id_fmt = _normalize_topic_id(raw_id)
        if topic_id_fmt is None:
            print(f"Skipping blank topic id {raw_id!r}.")
            continue

        try:
            topic_name = get_topic_name(topic_id_fmt)
        except Exception as e:
            print(f"Could not fetch name for {topic_id_fmt}, skipping. Error: {e}")
            continue

        print(f"\n=== Harvesting topic {topic_id_fmt} ({topic_name}) ===")

        pending = _pending_intervals(conn, topic_id_fmt, start_date, end_date)
        tasks = [(conn, topic_id_fmt, s, e) for s, e in pending]
        print(f"Starting {len(tasks)} pending intervals with {MAX_WORKERS} threads...")

        if not _run_interval_tasks(tasks):
            return  # breaker tripped while running; the reason was already printed

        if consecutive_failure_count >= MAX_CONSECUTIVE_FAILURES:
            print("Execution halted.")
            break

        print(f"=== Completed harvesting topic {topic_id_fmt} ===\n")

### **Start of Fetching**

In [None]:
# Read the OpenAlex topic ids to harvest from the "topic_id" column of Physics.xlsx,
# then harvest every topic over the 2000-01-01 .. 2026-01-01 publication window.
topic_ids = pd.read_excel('Physics.xlsx')['topic_id']
start_date, end_date = date(2000, 1, 1), date(2026, 1, 1)

harvest_topics(conn, topic_ids, start_date, end_date)


=== Harvesting topic t10466 (Meteorological Phenomena and Simulations) ===
Found 57 existing intervals.
Starting 35 pending intervals with 5 threads...


  0%|          | 0/35 [00:09<?, ?it/s]


In [None]:
# Release the DuckDB connection opened in the setup cell once harvesting is finished.
conn.close()