# Semantic Scholar Abstracts Collection Pipeline

This notebook downloads and processes paper abstracts from the [Semantic Scholar Academic Graph](https://www.semanticscholar.org/product/api) dataset. Only abstracts of papers that already exist in our papers database (collected via `get_semantic_scholar_papers.ipynb`) are kept.

**Prerequisite:** Run `get_semantic_scholar_papers.ipynb` first to populate the papers database (`ss_aws.db`) which contains the target `corpusid` values.

**Output:** Abstracts with `corpusid`, `externalids`, and `abstract` text stored in `ss_abstracts.db`.

In [1]:
import pandas as pd
from typing import List, Dict, Optional
import requests
import duckdb
import time
from tqdm.notebook import tqdm
import os

## 1. Database Setup

Initialize DuckDB connections:
- **Primary connection:** `ss_abstracts.db` for storing abstracts
- **Attached database:** `ss_aws.db` (papers database) for filtering by valid `corpusid`

In [2]:
DB_DIR = 'db'
if not os.path.exists(DB_DIR):
    os.makedirs(DB_DIR)

# Connect to abstracts database
DB_FILENAME = 'ss_abstracts.db'
DB_PATH = os.path.join(DB_DIR, DB_FILENAME)
conn = duckdb.connect(DB_PATH)
print(f"Connected to abstracts database at {DB_PATH}")

# Attach the papers database to access corpusids
PAPERS_DB_PATH = os.path.join(DB_DIR, 'ss_aws.db')
if os.path.exists(PAPERS_DB_PATH):
    conn.execute(f"ATTACH '{PAPERS_DB_PATH}' AS papers_db")
    print(f"Attached papers database at {PAPERS_DB_PATH}")
    
    # Get count of papers to filter against
    papers_count = conn.execute("SELECT COUNT(*) FROM papers_db.papers").fetchone()[0]
    print(f"Found {papers_count:,} papers to filter against")
else:
    print(f"WARNING: Papers database not found at {PAPERS_DB_PATH}")
    print("Will process all abstracts without filtering")

Connected to abstracts database at db/ss_abstracts.db
Attached papers database at db/ss_aws.db
Found 43,337,660 papers to filter against


In [3]:
# Create table for abstracts
conn.execute("""
    CREATE TABLE IF NOT EXISTS abstracts (
        corpusid VARCHAR PRIMARY KEY,
        externalids VARCHAR,
        abstract VARCHAR
    )
""")

# Create metadata table to track file processing status
conn.execute("""
    CREATE TABLE IF NOT EXISTS file_metadata (
        file_index INTEGER PRIMARY KEY,
        file_url VARCHAR,
        status VARCHAR,  -- 'pending', 'processing', 'success', 'failed'
        records_processed INTEGER DEFAULT 0,
        records_inserted INTEGER DEFAULT 0,
        chunks_processed INTEGER DEFAULT 0,
        size_mb FLOAT DEFAULT 0,
        error_message VARCHAR,
        started_at TIMESTAMP,
        completed_at TIMESTAMP,
        last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

print("Tables 'abstracts' and 'file_metadata' created/verified")

# Show table schemas
print("\nAbstracts table schema:")
display(conn.execute("DESCRIBE abstracts").df())
print("\nFile metadata table schema:")
display(conn.execute("DESCRIBE file_metadata").df())

Tables 'abstracts' and 'file_metadata' created/verified

Abstracts table schema:


| | column_name | column_type | null | key | default | extra |
|---|---|---|---|---|---|---|
| 0 | corpusid | VARCHAR | NO | PRI | | |
| 1 | externalids | VARCHAR | YES | | | |
| 2 | abstract | VARCHAR | YES | | | |



File metadata table schema:


| | column_name | column_type | null | key | default | extra |
|---|---|---|---|---|---|---|
| 0 | file_index | INTEGER | NO | PRI | | |
| 1 | file_url | VARCHAR | YES | | | |
| 2 | status | VARCHAR | YES | | | |
| 3 | records_processed | INTEGER | YES | | 0 | |
| 4 | records_inserted | INTEGER | YES | | 0 | |
| 5 | chunks_processed | INTEGER | YES | | 0 | |
| 6 | size_mb | FLOAT | YES | | 0 | |
| 7 | error_message | VARCHAR | YES | | | |
| 8 | started_at | TIMESTAMP | YES | | | |
| 9 | completed_at | TIMESTAMP | YES | | | |


## 2. Semantic Scholar API Configuration

Connect to the Semantic Scholar Datasets API to retrieve the abstracts dataset manifest.
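The configuration cell hard-codes the key as an empty string. One alternative is to read it from an environment variable so it never ends up in the saved notebook. A minimal sketch; the variable name `S2_API_KEY` and the helper `build_api_headers` are hypothetical conventions, and only the `x-api-key` header name comes from this notebook.

```python
import os


def build_api_headers(env_var: str = "S2_API_KEY") -> dict:
    """Return request headers with the API key read from the environment.

    S2_API_KEY is a hypothetical variable name; the notebook sends the key
    in the `x-api-key` header, which is what this helper builds.
    """
    api_key = os.environ.get(env_var, "").strip()
    if not api_key:
        # Fail early rather than silently sending an empty key.
        raise RuntimeError(f"Set the {env_var} environment variable to your API key")
    return {"x-api-key": api_key}
```

With the variable exported in the shell that launches Jupyter, `API_HEADERS = build_api_headers()` replaces the two configuration lines.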

In [None]:
API_KEY = "" # Semantic Scholar API key
API_HEADERS = {"x-api-key": API_KEY}


In [5]:
release_info = requests.get('https://api.semanticscholar.org/datasets/v1/release/latest').json()
release_info

{'release_id': '2025-12-02',
 'README': 'Semantic Scholar Academic Graph Datasets\n\nThese datasets provide a variety of information about research papers taken from a snapshot in time of the Semantic Scholar corpus.\n\nThis site is provided by The Allen Institute for Artificial Intelligence (“AI2”) as a service to the\nresearch community. The site is covered by AI2 Terms of Use and Privacy Policy. AI2 does not claim\nownership of any materials on this site unless specifically identified. AI2 does not exercise editorial\ncontrol over the contents of this site. AI2 respects the intellectual property rights of others. If\nyou believe your copyright or trademark is being infringed by something on this site, please follow\nthe "DMCA Notice" process set out in the Terms of Use (https://allenai.org/terms).\n\nSAMPLE DATA ACCESS\nSample data files can be downloaded with the following UNIX command:\n\nfor f in $(curl https://s3-us-west-2.amazonaws.com/ai2-s2ag/samples/MANIFEST.txt)\n  do curl 

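The `release/latest` response identifies the current release (`release_id`) and includes a README. The [dataset release endpoint](https://api.semanticscholar.org/api-docs/datasets) returns a single dataset's description, README, and list of download URLs (`files`); here we request the `abstracts` dataset.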
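Later in the notebook, `update_expired_urls()` re-requests `release/latest`. If a new release is published mid-run, the refreshed URLs could point at a different snapshot than the files already marked `success`. A hedged alternative is to pin the `release_id` returned above (e.g. `'2025-12-02'`) and build the dataset URL from it. `dataset_url` is a hypothetical helper; the path follows the same `/release/.../dataset/...` pattern the notebook already calls.

```python
from urllib.parse import quote

S2_DATASETS_BASE = "https://api.semanticscholar.org/datasets/v1"


def dataset_url(release_id: str, dataset_name: str) -> str:
    """Build the dataset-listing URL for a specific (pinned) release."""
    return f"{S2_DATASETS_BASE}/release/{quote(release_id)}/dataset/{quote(dataset_name)}"


if __name__ == "__main__":
    print(dataset_url("2025-12-02", "abstracts"))
```

In the notebook this would be used as `requests.get(dataset_url(release_info['release_id'], 'abstracts'), headers=API_HEADERS)`, both for the initial request and when refreshing expired URLs.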

In [6]:
abstracts_dataset = requests.get('https://api.semanticscholar.org/datasets/v1/release/latest/dataset/abstracts', headers=API_HEADERS).json()
abstracts_dataset


{'name': 'abstracts',
 'description': 'Paper abstract text, where available.\n100M records in 30 1.8GB files.',
 'README': 'Semantic Scholar Academic Graph Datasets\n\nThe "abstracts" dataset provides abstract text for selected papers.\n\nSCHEMA\n - openAccessInfo\n   - externalIds: IDs of this paper in different catalogs\n   - license/url/status: open-access information provided by Unpaywall, linked by DOI or PubMed Central ID\n\nLICENSE\nThis collection is licensed under ODC-BY. (https://opendatacommons.org/licenses/by/1.0/)\n\nBy downloading this data you acknowledge that you have read and agreed to all the terms in this license.\n\nATTRIBUTION\nWhen using this data in a product or service, or including data in a redistribution, please cite the following paper:\n\nBibTex format:\n@misc{https://doi.org/10.48550/arxiv.2301.10140,\n  title = {The Semantic Scholar Open Data Platform},\n  author = {Kinney, Rodney and Anastasiades, Chloe and Authur, Russell and Beltagy, Iz and Bragg, Jona

Here, we download the `abstracts` dataset. Unlike the `papers` dataset used in `get_semantic_scholar_papers.ipynb`, abstracts are distributed separately and must be joined to papers via `corpusid`. We also store `externalids` (DOI, MAG, etc.) so the abstracts can later be merged with external databases such as OpenAlex.
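Because `externalids` is stored as a JSON string, merging with OpenAlex means parsing it and normalizing the identifier on both sides. A minimal sketch, assuming the dict carries the DOI under a `DOI` key (an assumption about the payload; check a few rows) and that the other side may store DOIs as `https://doi.org/...` URLs, as OpenAlex does. The helper names and the identifier values in the demo are made up for illustration.

```python
import json
from typing import Optional

# Prefixes that commonly wrap a bare DOI.
DOI_PREFIXES = ("https://doi.org/", "http://doi.org/", "https://dx.doi.org/", "http://dx.doi.org/", "doi:")


def normalize_doi(doi: Optional[str]) -> Optional[str]:
    """Lower-case a DOI (DOIs are case-insensitive) and strip URL/scheme prefixes."""
    if not doi:
        return None
    doi = doi.strip().lower()
    for prefix in DOI_PREFIXES:
        if doi.startswith(prefix):
            doi = doi[len(prefix):]
            break
    return doi or None


def doi_from_externalids(externalids_json: Optional[str]) -> Optional[str]:
    """Return the normalized DOI from the JSON string stored in abstracts.externalids."""
    if not externalids_json:
        return None
    ids = json.loads(externalids_json)
    if not isinstance(ids, dict):
        return None
    # Assumption: the DOI sits under the key "DOI".
    return normalize_doi(ids.get("DOI"))


if __name__ == "__main__":
    stored = '{"DOI": "10.1000/ABC.123", "MAG": "2123456789"}'
    print(doi_from_externalids(stored))                      # 10.1000/abc.123
    print(normalize_doi("https://doi.org/10.1000/abc.123"))  # 10.1000/abc.123
```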

In [7]:
len(abstracts_dataset['files'])

30

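The abstracts dataset is split into multiple gzip-compressed JSONL files (30 in this release) hosted in an AWS S3 bucket. We record a processing status for each file in `file_metadata` so that interrupted runs can resume. The download URLs carry access tokens that expire, so `update_expired_urls()` re-fetches them from the API before a run.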
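The resume rule can be stated on its own: process every file whose recorded status is not `success`. A file interrupted mid-download stays in `processing`, so a filter on `('pending', 'failed')` alone would never pick it up again. A stdlib sketch of the rule, using a hypothetical `files_to_process` helper over `(file_index, status)` pairs like those stored in `file_metadata`:

```python
from typing import Iterable, List, Optional, Tuple

# Statuses meaning "not finished"; 'processing' covers runs that died mid-file.
RETRYABLE_STATUSES = {"pending", "failed", "processing"}


def files_to_process(rows: Iterable[Tuple[int, str]], limit: Optional[int] = None) -> List[int]:
    """Return the file indices that still need work, in index order."""
    todo = sorted(idx for idx, status in rows if status in RETRYABLE_STATUSES)
    # Mirrors TEST_LIMIT: a falsy limit means "no limit".
    return todo[:limit] if limit else todo


if __name__ == "__main__":
    rows = [(0, "success"), (1, "failed"), (2, "processing"), (3, "pending")]
    print(files_to_process(rows))  # [1, 2, 3]
```

Re-processing a partially loaded file is safe in this pipeline because `abstracts.corpusid` is the primary key and inserts use `INSERT OR IGNORE`, so rows already written are skipped.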

In [None]:
# Initialize file metadata for all files in the dataset
def initialize_file_metadata():
    """Insert a 'pending' metadata row for every file that is not already tracked"""
    for idx, file_url in enumerate(abstracts_dataset['files']):
        conn.execute("""
            INSERT OR IGNORE INTO file_metadata (file_index, file_url, status)
            VALUES (?, ?, 'pending')
        """, [idx, file_url])
    
    total_files = len(abstracts_dataset['files'])
    print(f"Initialized metadata for {total_files} files")
    
    # Show status summary
    status_summary = conn.execute("""
        SELECT status, COUNT(*) as count 
        FROM file_metadata 
        GROUP BY status 
        ORDER BY count DESC
    """).df()
    print("\nCurrent status summary:")
    display(status_summary)

def update_expired_urls():
    """Replace stored file URLs with freshly signed ones from the API.

    Files are matched by position (file_index), so this is only safe when
    the file list has not changed between requests.
    """
    global abstracts_dataset
    print("Updating URLs with fresh tokens...")
    # Get fresh dataset with new URLs
    response = requests.get(
        'https://api.semanticscholar.org/datasets/v1/release/latest/dataset/abstracts', 
        headers=API_HEADERS
    )
    response.raise_for_status()
    fresh_dataset = response.json()
    
    if len(fresh_dataset['files']) != len(abstracts_dataset['files']):
        print("WARNING: File count mismatch!")
        print(f"   Old: {len(abstracts_dataset['files'])}, New: {len(fresh_dataset['files'])}")
        print("   URLs may not align correctly!")
    
    # Update URLs in the database (only rows that already exist are touched)
    for idx, new_url in enumerate(fresh_dataset['files']):
        conn.execute("""
            UPDATE file_metadata 
            SET file_url = ?,
                last_updated = CURRENT_TIMESTAMP
            WHERE file_index = ?
        """, [new_url, idx])
    
    # Count the rows that actually matched a fresh URL
    updated_count = conn.execute(
        "SELECT COUNT(*) FROM file_metadata WHERE file_index < ?",
        [len(fresh_dataset['files'])]
    ).fetchone()[0]
    print(f"Updated {updated_count} URLs in metadata table")
    
    # Keep the in-memory manifest in sync with the database
    abstracts_dataset = fresh_dataset
    return fresh_dataset

initialize_file_metadata()

Initialized metadata for 30 files

Current status summary:


| | status | count |
|---|---|---|
| 0 | pending | 30 |


## 3. Streaming Download & Processing

Download compressed JSONL files, decompress on-the-fly, **filter by corpusid** (to keep only abstracts for papers in our database), and insert matching records into DuckDB.
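The on-the-fly decompression can be exercised without any network I/O. A minimal sketch with a hypothetical `iter_jsonl_records` generator: it feeds byte chunks of arbitrary size through `zlib.decompressobj(16 + zlib.MAX_WBITS)` (the `16 +` selects the gzip container), keeps an incomplete trailing line buffered until the next chunk arrives, and starts a fresh decompressor on `unused_data` when a gzip member ends. This notebook does not establish whether the Semantic Scholar files contain several gzip members; handling that case costs only a few lines.

```python
import gzip
import json
import zlib
from typing import Iterable, Iterator, List


def iter_jsonl_records(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one dict per JSONL line from a stream of gzip-compressed byte chunks."""
    decomp = zlib.decompressobj(16 + zlib.MAX_WBITS)  # 16 + MAX_WBITS: expect gzip header/trailer
    buffer = b""

    def complete_lines(data: bytes) -> List[bytes]:
        """Append decompressed bytes and return only the lines that are complete."""
        nonlocal buffer
        buffer += data
        *lines, buffer = buffer.split(b"\n")  # last piece may be a partial line
        return [line for line in lines if line.strip()]

    for chunk in chunks:
        data = chunk
        while data:
            for line in complete_lines(decomp.decompress(data)):
                yield json.loads(line)
            if decomp.eof:
                # One gzip member ended; any leftover bytes belong to the next member.
                data = decomp.unused_data
                decomp = zlib.decompressobj(16 + zlib.MAX_WBITS)
            else:
                data = b""

    # Drain the decompressor, then parse a final line that lacks a trailing newline.
    for line in complete_lines(decomp.flush()):
        yield json.loads(line)
    if buffer.strip():
        yield json.loads(buffer)


if __name__ == "__main__":
    # Two gzip members fed in 8-byte chunks, so lines and headers straddle chunk boundaries.
    blob = gzip.compress(b'{"corpusid": 1}\n{"corpusid": 2}\n') + gzip.compress(b'{"corpusid": 3}\n')
    chunks = [blob[i:i + 8] for i in range(0, len(blob), 8)]
    print([rec["corpusid"] for rec in iter_jsonl_records(chunks)])  # [1, 2, 3]
```

Splitting the buffer once per chunk, rather than peeling off one line at a time with `split(b'\n', 1)`, avoids re-copying the remaining buffer for every line.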

**Key difference from papers pipeline:** Instead of filtering by field of study, we filter by `corpusid` to only store abstracts for papers we've already collected. This is done via an efficient `INNER JOIN` with a temporary table of valid corpusids.
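The join in `process_records_batch` compares `CAST(df_batch.corpusid AS VARCHAR)` against the string-typed `valid_corpusids` table. A plain-Python sketch of the same filtering rule, with a hypothetical `filter_by_corpusid` helper, makes the cast explicit: corpusids parsed from the JSONL are presumably integers, and in Python `101 == "101"` is false, so the helper compares `str(corpusid)`. Duplicate handling is left out; in the notebook, `INSERT OR IGNORE` against the `corpusid` primary key covers that.

```python
from typing import Dict, Iterable, List, Set


def filter_by_corpusid(records: Iterable[Dict], valid_ids: Set[str]) -> List[Dict]:
    """Keep records that have a non-empty abstract and a corpusid present in valid_ids.

    valid_ids holds strings, like the VARCHAR valid_corpusids table, so each
    corpusid is converted with str() to mirror CAST(corpusid AS VARCHAR).
    """
    kept = []
    for rec in records:
        corpusid = rec.get("corpusid")
        if not corpusid or not rec.get("abstract"):
            continue  # same skips as process_records_batch
        if str(corpusid) in valid_ids:
            kept.append(rec)
    return kept


if __name__ == "__main__":
    batch = [
        {"corpusid": 101, "abstract": "kept"},
        {"corpusid": 102, "abstract": ""},                   # dropped: empty abstract
        {"corpusid": 103, "abstract": "not in papers DB"},   # dropped: unknown corpusid
    ]
    print(filter_by_corpusid(batch, {"101", "102"}))  # [{'corpusid': 101, 'abstract': 'kept'}]
    print(101 in {"101"})  # False: without str(), no integer id would ever match
```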

In [None]:
import asyncio
import aiohttp
import time as time_module
import gzip
import json
from io import BytesIO

# Configuration
BATCH_SIZE = 2  # Number of parallel file downloads
DELAY_BETWEEN_BATCHES = 2  # Seconds to wait between batches
DELAY_BETWEEN_FILES = 0.5  # Seconds to wait between individual files
CHUNK_SIZE = 256 * 1024  # 256KB chunks for streaming
RECORDS_PER_BATCH = 5000  # Process records in batches

# Create a temporary table with valid corpusids for efficient filtering
print("Setting up corpusid filtering...")
try:
    # Create temp table with corpusids from papers database
    conn.execute("""
        CREATE TEMP TABLE IF NOT EXISTS valid_corpusids AS 
        SELECT DISTINCT CAST(corpusid AS VARCHAR) as corpusid 
        FROM papers_db.papers
    """)
    
    # Create index for fast lookups
    conn.execute("CREATE INDEX IF NOT EXISTS idx_valid_corpusids ON valid_corpusids(corpusid)")
    
    valid_count = conn.execute("SELECT COUNT(*) FROM valid_corpusids").fetchone()[0]
    print(f"Created temp table with {valid_count:,} valid corpusids")
    FILTER_BY_CORPUSID = True
except Exception as e:
    print(f"Could not setup filtering: {e}")
    print("Will process all abstracts without filtering")
    FILTER_BY_CORPUSID = False

print(f"\nTotal files to download: {len(abstracts_dataset['files'])}")
print(f"Batch size: {BATCH_SIZE}")
print(f"Chunk size: {CHUNK_SIZE/1024:.0f} KB")
print(f"Records processed per batch: {RECORDS_PER_BATCH}")
print(f"Filtering by corpusid: {FILTER_BY_CORPUSID}")

Setting up corpusid filtering...



Created temp table with 43,337,660 valid corpusids

Total files to download: 30
Batch size: 2
Chunk size: 256 KB
Records processed per batch: 5000
Filtering by corpusid: True


In [11]:
async def process_records_batch(records, file_index):
    """Process a batch of records and write abstracts to DuckDB (filtered by corpusid)"""
    records_to_insert = []
    
    for record in records:
        # Extract corpusid
        corpusid = record.get('corpusid')
        if not corpusid:
            continue
        
        # Extract abstract
        abstract = record.get('abstract', '')
        if not abstract:
            continue
        
        # Extract externalids from openaccessinfo
        externalids_dict = {}
        openaccessinfo = record.get('openaccessinfo', {})
        if openaccessinfo and isinstance(openaccessinfo, dict):
            externalids_dict = openaccessinfo.get('externalids', {})
        
        # Convert externalids dict to JSON string for storage
        externalids_str = json.dumps(externalids_dict) if externalids_dict else None
        
        abstract_record = {
            'corpusid': corpusid,
            'externalids': externalids_str,
            'abstract': abstract
        }
        records_to_insert.append(abstract_record)
    
    # Write to DuckDB with filtering
    inserted_count = 0
    if records_to_insert:
        df_batch = pd.DataFrame(records_to_insert)
        
        # Get count before insertion
        count_before = conn.execute("SELECT COUNT(*) FROM abstracts").fetchone()[0]
        
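        if FILTER_BY_CORPUSID:
            # Use DuckDB JOIN to filter - only insert records with valid corpusids.
            # Columns are listed explicitly so the insert does not depend on DataFrame column order.
            conn.execute("""
                INSERT OR IGNORE INTO abstracts (corpusid, externalids, abstract)
                SELECT CAST(b.corpusid AS VARCHAR), b.externalids, b.abstract
                FROM df_batch AS b
                INNER JOIN valid_corpusids AS v
                ON CAST(b.corpusid AS VARCHAR) = v.corpusid
            """)
        else:
            # Insert all without filtering
            conn.execute("""
                INSERT OR IGNORE INTO abstracts (corpusid, externalids, abstract)
                SELECT CAST(corpusid AS VARCHAR), externalids, abstract FROM df_batch
            """)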
        
        # Get count after insertion to determine actual rows inserted
        count_after = conn.execute("SELECT COUNT(*) FROM abstracts").fetchone()[0]
        inserted_count = count_after - count_before
    
    return inserted_count

### Processing Functions

Define async functions to:
1. Process batches of records and filter by valid corpusid
2. Stream and decompress files on-the-fly
3. Orchestrate parallel downloads with rate limiting
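Step 3 (parallel downloads with rate limiting) boils down to fixed-size batches, `asyncio.gather(..., return_exceptions=True)`, and a pause between batches. The sketch below reproduces that pattern with a hypothetical `run_in_batches` helper and a stand-in `fake_download` coroutine, so it runs without network access. Inside the notebook, where an event loop is already running, you would `await run_in_batches(...)` instead of calling `asyncio.run`.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence


async def run_in_batches(
    items: Sequence[int],
    worker: Callable[[int], Awaitable[dict]],
    batch_size: int = 2,
    delay_between_batches: float = 0.0,
) -> List[dict]:
    """Run worker over items in fixed-size batches, pausing between batches."""
    results: List[dict] = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # return_exceptions=True returns exceptions as results instead of raising the first one.
        outcomes = await asyncio.gather(*(worker(i) for i in batch), return_exceptions=True)
        for item, outcome in zip(batch, outcomes):
            if isinstance(outcome, Exception):
                outcome = {"index": item, "status": "failed", "error": str(outcome)[:200]}
            results.append(outcome)
        if start + batch_size < len(items):
            await asyncio.sleep(delay_between_batches)  # rate limiting between batches
    return results


async def fake_download(index: int) -> dict:
    """Hypothetical stand-in for stream_and_process_file."""
    await asyncio.sleep(0)
    return {"index": index, "status": "success"}


if __name__ == "__main__":
    out = asyncio.run(run_in_batches(list(range(5)), fake_download, batch_size=2))
    print([r["index"] for r in out])  # [0, 1, 2, 3, 4]
```

Because each `gather` call only ever receives `batch_size` coroutines, the batch loop alone caps concurrency; in `download_all_files` the `asyncio.Semaphore(BATCH_SIZE)` is a second safeguard rather than the main limiter. The trade-off of batching is that one slow file holds up its whole batch, whereas a semaphore over all files at once would keep `BATCH_SIZE` downloads busy continuously.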

In [None]:
async def stream_and_process_file(session, url, index, semaphore, pbar_position):
    """Stream file, decompress on-the-fly, and process JSONL records as they arrive"""
    async with semaphore:
        pbar = None
        
        # Update status to 'processing' and set started_at
        conn.execute("""
            UPDATE file_metadata 
            SET status = 'processing', 
                started_at = CURRENT_TIMESTAMP,
                last_updated = CURRENT_TIMESTAMP
            WHERE file_index = ?
        """, [index])
        
        try:
            timeout = aiohttp.ClientTimeout(total=30000, connect=10, sock_read=60)
            async with session.get(url, timeout=timeout, headers = API_HEADERS) as response:
                if response.status != 200:
                    # Update metadata with failure
                    conn.execute("""
                        UPDATE file_metadata 
                        SET status = 'failed',
                            error_message = ?,
                            completed_at = CURRENT_TIMESTAMP,
                            last_updated = CURRENT_TIMESTAMP
                        WHERE file_index = ?
                    """, [f"HTTP {response.status}", index])
                    
                    return {
                        'index': index,
                        'status': 'failed',
                        'error': f"HTTP {response.status}",
                        'records_processed': 0,
                        'records_inserted': 0
                    }
                
                # Create progress bar for this file
                pbar = tqdm(
                    total=0,  # We don't know total size initially
                    desc=f"File {index}",
                    position=pbar_position,
                    leave=True,
                    unit='rec',
                    unit_scale=True,
                    ncols=100,
                    mininterval=1.0  # Update at most once per second
                )
                
                # Stream and decompress in chunks.
                # 16 + zlib.MAX_WBITS tells zlib to expect a gzip header and trailer.
                import zlib
                decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
                buffer = b''
                records_processed = 0
                records_inserted = 0
                chunk_num = 0
                total_bytes = 0
                records_batch = []
                
                def split_records(data):
                    """Add decompressed bytes to the buffer and return the complete records."""
                    nonlocal buffer
                    buffer += data
                    # One split per chunk; repeated split(b'\n', 1) re-copies the buffer
                    # for every line. The last piece may be incomplete, so it stays buffered.
                    *lines, buffer = buffer.split(b'\n')
                    parsed = []
                    for line in lines:
                        if line.strip():
                            try:
                                parsed.append(json.loads(line))
                            except ValueError:  # JSONDecodeError or invalid UTF-8
                                continue
                    return parsed
                
                async for compressed_chunk in response.content.iter_chunked(CHUNK_SIZE):
                    chunk_num += 1
                    total_bytes += len(compressed_chunk)
                    
                    # Decompression errors propagate to the outer handler, so the file is
                    # marked 'failed' instead of 'success' with silently dropped records.
                    data = compressed_chunk
                    while data:
                        new_records = split_records(decompressor.decompress(data))
                        records_processed += len(new_records)
                        records_batch.extend(new_records)
                        if decompressor.eof:
                            # End of one gzip member; leftover bytes begin the next member.
                            data = decompressor.unused_data
                            decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
                        else:
                            data = b''
                    
                    # Process in batches
                    if len(records_batch) >= RECORDS_PER_BATCH:
                        records_inserted += await process_records_batch(records_batch, index)
                        records_batch = []
                        
                        # Update metadata periodically
                        conn.execute("""
                            UPDATE file_metadata 
                            SET records_processed = ?,
                                records_inserted = ?,
                                chunks_processed = ?,
                                size_mb = ?,
                                last_updated = CURRENT_TIMESTAMP
                            WHERE file_index = ?
                        """, [records_processed, records_inserted, chunk_num, 
                              total_bytes/1024/1024, index])
                        
                        # Update progress bar
                        pbar.total = records_processed
                        pbar.n = records_processed
                        pbar.set_postfix({
                            'inserted': f'{records_inserted:,}',
                            'chunks': chunk_num,
                            'MB': f'{total_bytes/1024/1024:.1f}'
                        })
                        pbar.refresh()
                
                # Drain the decompressor, then parse a final line without a trailing newline
                new_records = split_records(decompressor.flush())
                if buffer.strip():
                    try:
                        new_records.append(json.loads(buffer))
                    except ValueError:
                        pass
                records_processed += len(new_records)
                records_batch.extend(new_records)
                
                if records_batch:
                    inserted = await process_records_batch(records_batch, index)
                    records_inserted += inserted
                
                # Final metadata update with success status
                conn.execute("""
                    UPDATE file_metadata 
                    SET status = 'success',
                        records_processed = ?,
                        records_inserted = ?,
                        chunks_processed = ?,
                        size_mb = ?,
                        completed_at = CURRENT_TIMESTAMP,
                        last_updated = CURRENT_TIMESTAMP,
                        error_message = NULL
                    WHERE file_index = ?
                """, [records_processed, records_inserted, chunk_num, 
                      total_bytes/1024/1024, index])
                
                # Final update
                pbar.total = records_processed
                pbar.n = records_processed
                pbar.set_postfix({
                    'inserted': f'{records_inserted:,}',
                    'chunks': chunk_num,
                    'MB': f'{total_bytes/1024/1024:.1f}',
                    'status': 'done'
                })
                pbar.refresh()
                pbar.close()
                
                return {
                    'index': index,
                    'status': 'success',
                    'records_processed': records_processed,
                    'records_inserted': records_inserted,
                    'chunks': chunk_num,
                    'size_mb': total_bytes / 1024 / 1024
                }
                
        except asyncio.TimeoutError:
            error_msg = 'Timeout'
            conn.execute("""
                UPDATE file_metadata 
                SET status = 'failed',
                    error_message = ?,
                    completed_at = CURRENT_TIMESTAMP,
                    last_updated = CURRENT_TIMESTAMP
                WHERE file_index = ?
            """, [error_msg, index])
            
            if pbar:
                pbar.set_postfix({'status': 'Timeout'})
                pbar.close()
            return {
                'index': index,
                'status': 'failed',
                'error': 'Timeout',
                'records_processed': 0,
                'records_inserted': 0
            }
        except Exception as e:
            error_msg = str(e)[:200]
            conn.execute("""
                UPDATE file_metadata 
                SET status = 'failed',
                    error_message = ?,
                    completed_at = CURRENT_TIMESTAMP,
                    last_updated = CURRENT_TIMESTAMP
                WHERE file_index = ?
            """, [error_msg, index])
            
            if pbar:
                pbar.set_postfix({'status': f'{str(e)[:20]}'})
                pbar.close()
            return {
                'index': index,
                'status': 'failed',
                'error': str(e)[:200],
                'records_processed': 0,
                'records_inserted': 0
            }
        finally:
            await asyncio.sleep(DELAY_BETWEEN_FILES)

In [None]:
async def download_all_files(urls, file_indices):
    """Download and process all files in batches with delays"""
    semaphore = asyncio.Semaphore(BATCH_SIZE)
    results = []
    
    # Create session with connector settings
    connector = aiohttp.TCPConnector(limit=BATCH_SIZE, limit_per_host=BATCH_SIZE)
    timeout = aiohttp.ClientTimeout(total=300000)
    
    async with aiohttp.ClientSession(connector=connector, timeout=timeout, headers=API_HEADERS) as session:
        total_batches = (len(urls) + BATCH_SIZE - 1) // BATCH_SIZE
        
        for batch_num in range(total_batches):
            start_idx = batch_num * BATCH_SIZE
            end_idx = min((batch_num + 1) * BATCH_SIZE, len(urls))
            batch_urls = urls[start_idx:end_idx]
            batch_indices = file_indices[start_idx:end_idx]
            
            print(f"\n{'='*70}")
            print(f"Batch {batch_num + 1}/{total_batches} (files {batch_indices[0]}-{batch_indices[-1]})")
            print(f"{'='*70}\n")
            
            # Create tasks for this batch with proper pbar positioning
            tasks = [
                stream_and_process_file(session, url, file_idx, semaphore, i)
                for i, (url, file_idx) in enumerate(zip(batch_urls, batch_indices))
            ]
            
            # Download this batch
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Handle any exceptions
            for i, result in enumerate(batch_results):
                if isinstance(result, Exception):
                    batch_results[i] = {
                        'index': batch_indices[i],
                        'status': 'failed',
                        'error': str(result)[:200],
                        'records_processed': 0,
                        'records_inserted': 0
                    }
            
            results.extend(batch_results)
            
            # Count records from this batch
            batch_processed = sum(r.get('records_processed', 0) for r in batch_results if isinstance(r, dict))
            batch_inserted = sum(r.get('records_inserted', 0) for r in batch_results if isinstance(r, dict))
            
            # Get total count from database
            total_in_db = conn.execute("SELECT COUNT(*) FROM abstracts").fetchone()[0]
            
            # Get overall processing status
            status_counts = conn.execute("""
                SELECT status, COUNT(*) as count 
                FROM file_metadata 
                GROUP BY status
            """).df()
            
            # Show progress for this batch
            successful = sum(1 for r in batch_results if isinstance(r, dict) and r['status'] == 'success')
            failed = len(batch_results) - successful
            
            print(f"\n{'='*70}")
            print(f"Batch {batch_num + 1} Summary:")
            print(f"   Success: {successful}, Failed: {failed}")
            print(f"   Processed: {batch_processed:,}")
            print(f"   Inserted: {batch_inserted:,}")
            print(f"   Total in DB: {total_in_db:,}")
            print(f"\n   Overall Status:")
            for _, row in status_counts.iterrows():
                print(f"   {row['status']:>12}: {row['count']:>5}")
            print(f"{'='*70}")
            
            # Delay before next batch (except after the last batch)
            if batch_num < total_batches - 1:
                print(f"\nWaiting {DELAY_BETWEEN_BATCHES}s before next batch...\n")
                await asyncio.sleep(DELAY_BETWEEN_BATCHES)
    
    return results

### Execute Pipeline

In [None]:
# Start the download and processing
    
TEST_LIMIT = None  # Set to None to process all files
RESUME_MODE = True  # Set to True to skip already processed files
REFRESH_URLS = True  # Set to True to get fresh URLs before processing

print("Starting streaming download and processing...")
print(f"   Writing directly to DuckDB at: {DB_PATH}")
print(f"   Processing abstracts with corpusid, externalids, and abstract")
print(f"   Resume mode: {RESUME_MODE}\n")

# Refresh URLs if needed (to handle expired tokens)
if REFRESH_URLS:
    print("Refreshing URLs with new tokens...")
    update_expired_urls()
    print()

# Get list of files to process based on their status
if RESUME_MODE:
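    # Only process files that are pending or failed, plus files left in
    # 'processing' by an interrupted run (re-inserts are skipped by INSERT OR IGNORE)
    pending_files = conn.execute("""
        SELECT file_index, file_url 
        FROM file_metadata 
        WHERE status IN ('pending', 'failed', 'processing')
        ORDER BY file_index
    """).df()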
    
    if TEST_LIMIT:
        pending_files = pending_files.head(TEST_LIMIT)
    
    file_indices = pending_files['file_index'].tolist()
    urls_to_process = pending_files['file_url'].tolist()
    
    print(f"Resume mode: Found {len(urls_to_process)} files to process")
    print(f"   (Skipping already successful files)\n")
else:
    # Process all files (or limited set for testing)
    if TEST_LIMIT:
        urls_to_process = abstracts_dataset['files'][:TEST_LIMIT]
        file_indices = list(range(TEST_LIMIT))
    else:
        urls_to_process = abstracts_dataset['files']
        file_indices = list(range(len(abstracts_dataset['files'])))
    
    print(f"Processing {len(urls_to_process)} files from scratch\n")

if len(urls_to_process) == 0:
    print("All files already processed!")
else:
    start_time = time_module.time()
    
    results = await download_all_files(urls_to_process, file_indices)
    
    # Summary
    end_time = time_module.time()
    elapsed = end_time - start_time
    successful = sum(1 for r in results if r['status'] == 'success')
    failed = len(results) - successful
    total_size = sum(r.get('size_mb', 0) for r in results if r['status'] == 'success')
    total_processed = sum(r.get('records_processed', 0) for r in results if r['status'] == 'success')
    total_inserted = sum(r.get('records_inserted', 0) for r in results if r['status'] == 'success')
    
    # Get final count from database
    final_count = conn.execute("SELECT COUNT(*) FROM abstracts").fetchone()[0]
    
    # Get overall status summary
    overall_status = conn.execute("""
        SELECT 
            status,
            COUNT(*) as count,
            SUM(records_processed) as total_processed,
            SUM(records_inserted) as total_inserted,
            SUM(size_mb) as total_mb
        FROM file_metadata
        GROUP BY status
        ORDER BY count DESC
    """).df()
    
    print(f"\n{'='*70}")
    print(f"Final Summary")
    print(f"{'='*70}")
    print(f"Files in this run: {len(results)}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"Records processed: {total_processed:,}")
    print(f"Records inserted: {total_inserted:,}")
    print(f"Total abstracts in DB: {final_count:,}")
    print(f"Data processed: {total_size:.2f} MB")
    print(f"Time elapsed: {elapsed:.2f}s")
    if elapsed > 0 and total_processed > 0:
        print(f"Processing rate: {total_processed/elapsed:.1f} records/sec")
    if total_processed > 0:
        print(f"Insertion rate: {total_inserted/total_processed*100:.1f}% (new rows: non-empty abstract with a corpusid in the papers DB)")
    
    print(f"\n{'='*70}")
    print(f"Overall Status (All Files)")
    print(f"{'='*70}")
    display(overall_status)
    
    # Show failed downloads if any
    if failed > 0:
        print(f"\nFailed downloads in this run:")
        for r in results:
            if r['status'] == 'failed':
                print(f"   - File {r['index']}: {r.get('error', 'Unknown error')}")

 Starting streaming download and processing...
   Writing directly to DuckDB at: db/ss_abstracts.db
   Processing abstracts with corpusid, externalids, and abstract
   Resume mode: True

 Refreshing URLs with new tokens...
 Updating URLs with fresh tokens...
 Updated 30 URLs in metadata table

 Resume mode: Found 30 files to process
   (Skipping already successful files)


 Batch 1/15 (files 0-1)

 Updated 30 URLs in metadata table

 Resume mode: Found 30 files to process
   (Skipping already successful files)


 Batch 1/15 (files 0-1)



File 0: 0.00rec [00:00, ?rec/s]

File 1: 0.00rec [00:00, ?rec/s]


 Batch 1 Summary:
   done Success: 2, failed Failed: 0
    Processed: 2,477,221
    Inserted: 541,954
    Total in DB: 541,954

   Overall Status:
        pending:    28
        success:     2

 Waiting 2s before next batch...


 Batch 2/15 (files 2-3)


 Batch 2/15 (files 2-3)



File 2: 0.00rec [00:00, ?rec/s]

File 3: 0.00rec [00:00, ?rec/s]


 Batch 2 Summary:
   done Success: 2, failed Failed: 0
    Processed: 2,439,680
    Inserted: 533,694
    Total in DB: 1,075,648

   Overall Status:
        pending:    26
        success:     4

 Waiting 2s before next batch...


 Batch 3/15 (files 4-5)


 Batch 3/15 (files 4-5)



File 4: 0.00rec [00:00, ?rec/s]

File 5: 0.00rec [00:00, ?rec/s]


 Batch 3 Summary:
   done Success: 2, failed Failed: 0
    Processed: 2,409,028
    Inserted: 527,137
    Total in DB: 1,602,785

   Overall Status:
        success:     6
        pending:    24

 Waiting 2s before next batch...


 Batch 4/15 (files 6-7)


 Batch 4 Summary:
   Success: 2, Failed: 0
    Processed: 2,462,040
    Inserted: 540,239
    Total in DB: 2,143,024

   Overall Status:
        pending:    22
        success:     8

 Waiting 2s before next batch...


 Batch 5/15 (files 8-9)


 Batch 5 Summary:
   Success: 2, Failed: 0
    Processed: 2,433,038
    Inserted: 532,626
    Total in DB: 2,675,650

   Overall Status:
        pending:    20
        success:    10

 Waiting 2s before next batch...


 Batch 6/15 (files 10-11)


 Batch 6 Summary:
   Success: 2, Failed: 0
    Processed: 2,508,339
    Inserted: 550,155
    Total in DB: 3,225,805

   Overall Status:
        pending:    18
        success:    12

 Waiting 2s before next batch...


 Batch 7/15 (files 12-13)


 Batch 7 Summary:
   Success: 2, Failed: 0
    Processed: 2,408,611
    Inserted: 527,967
    Total in DB: 3,753,772

   Overall Status:
        success:    14
        pending:    16

 Waiting 2s before next batch...


 Batch 8/15 (files 14-15)


 Batch 8 Summary:
   Success: 2, Failed: 0
    Processed: 2,429,232
    Inserted: 532,091
    Total in DB: 4,285,863

   Overall Status:
        success:    16
        pending:    14

 Waiting 2s before next batch...


 Batch 9/15 (files 16-17)


 Batch 9 Summary:
   Success: 2, Failed: 0
    Processed: 2,474,544
    Inserted: 542,042
    Total in DB: 4,827,905

   Overall Status:
        pending:    12
        success:    18

 Waiting 2s before next batch...


 Batch 10/15 (files 18-19)


 Batch 10 Summary:
   Success: 2, Failed: 0
    Processed: 2,437,431
    Inserted: 534,916
    Total in DB: 5,362,821

   Overall Status:
        success:    20
        pending:    10

 Waiting 2s before next batch...


 Batch 11/15 (files 20-21)


 Batch 11 Summary:
   Success: 2, Failed: 0
    Processed: 2,415,036
    Inserted: 528,273
    Total in DB: 5,891,094

   Overall Status:
        pending:     8
        success:    22

 Waiting 2s before next batch...


 Batch 12/15 (files 22-23)


 Batch 12 Summary:
   Success: 2, Failed: 0
    Processed: 2,456,991
    Inserted: 538,690
    Total in DB: 6,429,784

   Overall Status:
        pending:     6
        success:    24

 Waiting 2s before next batch...


 Batch 13/15 (files 24-25)


 Batch 13 Summary:
   Success: 2, Failed: 0
    Processed: 2,473,658
    Inserted: 541,882
    Total in DB: 6,971,666

   Overall Status:
        success:    26
        pending:     4

 Waiting 2s before next batch...


 Batch 14/15 (files 26-27)


 Batch 14 Summary:
   Success: 2, Failed: 0
    Processed: 2,465,858
    Inserted: 539,764
    Total in DB: 7,511,430

   Overall Status:
        success:    28
        pending:     2

 Waiting 2s before next batch...


 Batch 15/15 (files 28-29)


 Batch 15 Summary:
   Success: 2, Failed: 0
    Processed: 2,429,936
    Inserted: 531,964
    Total in DB: 8,043,394

   Overall Status:
        success:    30

 Final Summary
Files in this run: 30
Successful: 30
Failed: 0
 Records processed: 36,720,643
 Records inserted: 8,043,394
  Total abstracts in DB: 8,043,394
 Data processed: 20008.64 MB
  Time elapsed: 4876.88s
 Processing rate: 7529.5 records/sec
 Insertion rate: 21.9% (records with valid abstracts)

 Overall Status (All Files)


| | status | count | total_processed | total_inserted | total_mb |
|---|---|---|---|---|---|
| 0 | success | 30 | 36720643.0 | 8043394.0 | 20008.640259 |
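
The final-summary figures can be re-derived from the fifteen batch summaries as a consistency check. The sketch below uses a hypothetical `summarize` helper (not part of this notebook) with the per-batch `Processed`/`Inserted` counts copied from the log, and assumes `Time elapsed` is the wall-clock time of the whole run, including the 2 s pauses between batches. Processing rate is records processed divided by elapsed seconds; insertion rate is records inserted divided by records processed.

```python
# Per-batch (processed, inserted) counts copied from the batch summaries in the log.
batch_counts = [
    (2_477_221, 541_954), (2_439_680, 533_694), (2_409_028, 527_137),
    (2_462_040, 540_239), (2_433_038, 532_626), (2_508_339, 550_155),
    (2_408_611, 527_967), (2_429_232, 532_091), (2_474_544, 542_042),
    (2_437_431, 534_916), (2_415_036, 528_273), (2_456_991, 538_690),
    (2_473_658, 541_882), (2_465_858, 539_764), (2_429_936, 531_964),
]
ELAPSED_S = 4876.88  # "Time elapsed" from the final summary (assumed wall-clock seconds)


def summarize(counts, elapsed_s):
    """Hypothetical helper: recompute final-summary totals and rates from batch counts."""
    processed = sum(p for p, _ in counts)
    inserted = sum(i for _, i in counts)
    return {
        "processed": processed,
        "inserted": inserted,
        "records_per_sec": processed / elapsed_s,
        "insertion_pct": inserted / processed * 100,
    }


summary = summarize(batch_counts, ELAPSED_S)
print(f"Records processed: {summary['processed']:,}")              # 36,720,643
print(f"Records inserted: {summary['inserted']:,}")                # 8,043,394
print(f"Processing rate: {summary['records_per_sec']:.1f} records/sec")  # 7529.5
print(f"Insertion rate: {summary['insertion_pct']:.1f}%")          # 21.9%
```

The totals match the `Records processed` and `Records inserted` lines exactly, so the cumulative `Total in DB` figures neither skipped nor double-counted a batch.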


## 4. Data Validation & Inspection

Verify collected data and review processing status.

In [15]:
# Check the data in the database
print("Sample of inserted records:")
sample_abstracts = conn.execute("SELECT * FROM abstracts LIMIT 100 OFFSET 1190").df()

Sample of inserted records:


In [16]:
sample_abstracts

| | corpusid | externalids | abstract |
|---|---|---|---|
| 0 | 279391632 | {"Medline": null, "MAG": null, "ACL": null, "D... | While particles cannot travel faster than the ... |
| 1 | 235434103 | {"Medline": null, "MAG": null, "ACL": null, "D... | In information and communication technologies ... |
| 2 | 246996905 | {"Medline": null, "MAG": "3202314026", "ACL": ... | The current state-of-the-art thermoelectric ma... |
| 3 | 139237486 | {"Medline": null, "MAG": "2764288934", "ACL": ... | Optofluidics has recently become a new active ... |
| 4 | 218797241 | {"Medline": null, "MAG": "3017021220", "ACL": ... | The purpose of this work is to present the rig... |
| ... | ... | ... | ... |
| 95 | 273810435 | {"Medline": null, "MAG": null, "ACL": null, "D... | Language models (LMs), which are neural sequen... |
| 96 | 119210083 | {"Medline": null, "MAG": "2020327212", "ACL": ... | In considering alternative higher-order gravit... |
| 97 | 230571262 | {"Medline": null, "MAG": "3110972490", "ACL": ... | Diabetic retinopathy (DR) is a disease with an... |
| 98 | 36033579 | {"Medline": "18958046v1", "MAG": "2011248755",... | We analytically and numerically analyze the fl... |
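
The `externalids` column appears to hold JSON text (the sample shows JSON `null` values), so individual identifiers such as the MAG ID can be pulled out after loading rows into Python. A minimal sketch under that assumption: `external_id` is a hypothetical helper, and the abbreviated input reproduces only the keys visible in row 2 above. Inside DuckDB, the JSON extension's `json_extract_string` function is an alternative for doing the same in SQL.

```python
import json
from typing import Optional


def external_id(externalids_json: Optional[str], key: str) -> Optional[str]:
    """Hypothetical helper: return one identifier (e.g. "MAG") from externalids JSON, or None."""
    if not externalids_json:
        return None
    ids = json.loads(externalids_json)
    value = ids.get(key)
    # JSON null becomes None; any other value is returned as a string.
    return None if value is None else str(value)


# Abbreviated value modelled on row 2 of the sample (only the visible keys are kept).
row = '{"Medline": null, "MAG": "3202314026", "ACL": null}'
print(external_id(row, "MAG"))      # 3202314026
print(external_id(row, "Medline"))  # None
```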


In [None]:
# View file processing metadata
print("File processing status:")
conn.execute("""
    SELECT 
        file_index,
        status,
        records_processed,
        records_inserted,
        chunks_processed,
        ROUND(size_mb, 2) as size_mb,
        error_message,
        started_at,
        completed_at
    FROM file_metadata
    ORDER BY file_index
""").df()

File processing status:


| | file_index | status | records_processed | records_inserted | chunks_processed | size_mb | error_message | started_at | completed_at |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | success | 1223804 | 267655 | 10256 | 666.609985 | | 2025-12-09 23:44:56.418780 | 2025-12-09 23:49:07.931758 |
| 1 | 1 | success | 1253417 | 274299 | 9478 | 683.099976 | | 2025-12-09 23:44:56.419633 | 2025-12-09 23:48:50.683057 |
| 2 | 2 | success | 1219913 | 266612 | 8180 | 665.380005 | | 2025-12-09 23:49:10.444947 | 2025-12-09 23:53:21.438248 |
| 3 | 3 | success | 1219767 | 267082 | 7526 | 664.599976 | | 2025-12-09 23:49:10.446687 | 2025-12-09 23:52:56.463451 |
| 4 | 4 | success | 1202302 | 263141 | 7736 | 655.01001 | | 2025-12-09 23:53:23.956461 | 2025-12-09 23:57:26.369938 |
| 5 | 5 | success | 1206726 | 263996 | 8759 | 657.640015 | | 2025-12-09 23:53:23.959889 | 2025-12-09 23:57:59.353440 |
| 6 | 6 | success | 1196345 | 262185 | 8953 | 651.97998 | | 2025-12-09 23:58:01.864413 | 2025-12-10 00:02:51.439880 |
| 7 | 7 | success | 1265695 | 278054 | 8283 | 689.830017 | | 2025-12-09 23:58:01.869943 | 2025-12-10 00:02:28.828099 |
| 8 | 8 | success | 1212638 | 265517 | 6427 | 660.450012 | | 2025-12-10 00:02:53.952433 | 2025-12-10 00:07:22.542946 |
| 9 | 9 | success | 1220400 | 267109 | 6678 | 664.710022 | | 2025-12-10 00:02:53.957800 | 2025-12-10 00:07:26.137784 |
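
The `started_at`/`completed_at` columns make it possible to estimate per-file duration and throughput. A sketch using only the standard library, with timestamps copied from the `file_index` 0 row and a hypothetical helper name, `file_throughput`. Both files of a batch start within milliseconds of each other, so they appear to be processed concurrently; a single file's rate therefore understates the combined throughput of its batch.

```python
from datetime import datetime
from typing import Tuple

TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"  # format of the timestamps shown in file_metadata


def file_throughput(started_at: str, completed_at: str, records: int) -> Tuple[float, float]:
    """Hypothetical helper: return (duration in seconds, records per second) for one file."""
    start = datetime.strptime(started_at, TS_FORMAT)
    end = datetime.strptime(completed_at, TS_FORMAT)
    seconds = (end - start).total_seconds()
    return seconds, records / seconds


# Values copied from the file_index 0 row of the table above.
duration, rate = file_throughput(
    "2025-12-09 23:44:56.418780", "2025-12-09 23:49:07.931758", 1_223_804
)
print(f"{duration:.1f} s, {rate:,.0f} records/sec")  # 251.5 s, 4,866 records/sec
```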


In [50]:
# View failed files with details
print("Failed files (if any):")
conn.execute("""
    SELECT 
        file_index,
        error_message,
        records_processed,
        chunks_processed,
        started_at,
        completed_at
    FROM file_metadata
    WHERE status = 'failed'
    ORDER BY file_index
""").df()

Failed files (if any):


| | file_index | error_message | records_processed | chunks_processed | started_at | completed_at |
|---|---|---|---|---|---|---|
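
Resume mode, as logged at the start of the run, skips files already marked `success` and processes the rest in batches of two (`Batch 1/15 (files 0-1)`). A hypothetical `plan_batches` helper sketches that selection in plain Python. It assumes that any status other than `success` (for example `pending` or `failed`) is eligible for reprocessing, which is how the log message "Skipping already successful files" reads; the notebook's actual selection query is not shown in this section.

```python
from typing import Dict, List


def plan_batches(status_by_file: Dict[int, str], batch_size: int = 2) -> List[List[int]]:
    """Hypothetical helper: group files not marked 'success' into batches, in index order."""
    todo = sorted(idx for idx, status in status_by_file.items() if status != "success")
    # Slice the ordered list into consecutive chunks of batch_size files.
    return [todo[i:i + batch_size] for i in range(0, len(todo), batch_size)]


# Fresh run: 30 pending files give 15 batches of two, as in "Batch 1/15 (files 0-1)".
fresh = {idx: "pending" for idx in range(30)}
batches = plan_batches(fresh)
print(len(batches), batches[0], batches[-1])  # 15 [0, 1] [28, 29]

# Resumed run: successful files are skipped, a failed file is picked up again.
partial = {**fresh, 0: "success", 1: "success", 2: "failed"}
print(plan_batches(partial)[:2])  # [[2, 3], [4, 5]]
```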


In [18]:
# Check statistics on abstracts
print("Abstracts statistics:")
conn.execute("""
    SELECT 
        COUNT(*) as total_abstracts,
        COUNT(DISTINCT corpusid) as unique_corpusids,
        AVG(LENGTH(abstract)) as avg_abstract_length,
        MAX(LENGTH(abstract)) as max_abstract_length,
        MIN(LENGTH(abstract)) as min_abstract_length
    FROM abstracts
""").df()

Abstracts statistics:


| | total_abstracts | unique_corpusids | avg_abstract_length | max_abstract_length | min_abstract_length |
|---|---|---|---|---|---|
| 0 | 8043394 | 8043394 | 1178.508859 | 10000 | 1 |
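
The statistics query doubles as an integrity check: `total_abstracts` equal to `unique_corpusids` means no `corpusid` was stored twice, and both match `Total abstracts in DB` from the run log. A sketch of that comparison as a hypothetical `reconcile` function, fed with the figures above. The log-based comparison assumes a single run processed every file, as happened here; after a resumed run, the `Records inserted` line covers only that run.

```python
from typing import Dict, List


def reconcile(stats: Dict[str, int], inserted_per_log: int) -> List[str]:
    """Hypothetical helper: compare abstracts-table statistics with the run log."""
    problems: List[str] = []
    # COUNT(*) == COUNT(DISTINCT corpusid) means no corpusid was stored twice.
    if stats["total_abstracts"] != stats["unique_corpusids"]:
        problems.append("duplicate corpusid values in abstracts")
    # Every row in the table should correspond to an insert reported in the log.
    if stats["total_abstracts"] != inserted_per_log:
        problems.append("row count differs from the inserted total in the log")
    return problems


# Figures copied from the statistics query and the "Final Summary" above.
stats = {"total_abstracts": 8_043_394, "unique_corpusids": 8_043_394}
print(reconcile(stats, inserted_per_log=8_043_394))  # []
```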


### Abstracts Statistics & Search

In [21]:
# Search for abstracts containing specific keywords
conn.execute("""
    SELECT corpusid, externalids, SUBSTRING(abstract, 1, 200) as abstract_preview
    FROM abstracts 
    WHERE LOWER(abstract) LIKE '%neural network%'
    LIMIT 10
""").df()

| | corpusid | externalids | abstract_preview |
|---|---|---|---|
| 0 | 182227807 | {"Medline": null, "MAG": "2944881968", "ACL": ... | AbstractIn a time span of over 3,000 years, th... |
| 1 | 225097123 | {"Medline": null, "MAG": "3047590649", "ACL": ... | In order to echo with fractional-order smoothn... |
| 2 | 125274045 | {"Medline": null, "MAG": "2799571132", "ACL": ... | In the present Paper, a boundary element solve... |
| 3 | 219324182 | {"Medline": null, "MAG": "3025440078", "ACL": ... | After stroke rehabilitation is a long-term rel... |
| 4 | 188992481 | {"Medline": null, "MAG": "2891483086", "ACL": ... | The Hundred and One Dalmatians: Or the Great D... |
| ... | ... | ... | ... |
| 62953 | 102509345 | {"Medline": null, "MAG": "2754184649", "ACL": ... | We have found the ferroelectric-like character... |
| 62954 | 118577492 | {"Medline": null, "MAG": "2952527563", "ACL": ... | The most direct definition of a patterning pro... |
| 62955 | 216249897 | {"Medline": null, "MAG": "3015226030", "ACL": ... | Purpose Admissions committees rely heavily on ... |
| 62956 | 209917350 | {"Medline": null, "MAG": "2986133968", "ACL": ... | Monolayer silicon phosphide (SiP) and germaniu... |
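
`LOWER(abstract) LIKE '%neural network%'` is a case-insensitive substring test: each `%` matches any run of characters, so plural and compound forms such as "neural networks" or "convolutional neural network" also match. The sketch below mirrors that behaviour in Python for a lowercase phrase with no other SQL wildcards (`_`, which matches a single character in SQL, is not handled); the sample sentences are made up for illustration. DuckDB's `ILIKE` operator expresses the same case-insensitive match without the explicit `LOWER`.

```python
def like_contains(text: str, phrase: str) -> bool:
    """Python analogue of SQL `LOWER(text) LIKE '%<phrase>%'` for a lowercase, wildcard-free phrase."""
    return phrase in text.lower()


# Hypothetical sentences, not taken from the dataset.
samples = [
    "We train a convolutional Neural Network on ...",
    "Recurrent neural networks model sequences ...",
    "A network of neural recordings ...",
]
print([like_contains(s, "neural network") for s in samples])  # [True, True, False]
```

Because the query returns only the first 200 characters as `abstract_preview`, the matched phrase can lie beyond the visible preview.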


### Cleanup

In [25]:
conn.close()