## Phase 2: arXiv API Integration & PDF Processing

### Core Objectives
- arXiv API Integration: Build a robust client with rate limiting and retry logic
- PDF Processing Pipeline: Download and parse scientific PDFs with structured content extraction
- Database Storage: Persist paper metadata and content in PostgreSQL
- Error Handling: Implement comprehensive error handling and graceful degradation
- Automation Ready: Prepare components for Airflow orchestration

### ðŸ”§ What We'll Test In This Notebook
- arXiv API Client - Fetch CS.AI papers with proper rate limiting
- PDF Download System - Download and cache PDFs with error handling
- Docling PDF Parser - Extract structured content (sections, tables, figures)
- Database Integration - Store and retrieve papers from PostgreSQL
- Complete Pipeline - End-to-end processing from arXiv to database
- Production Readiness - Error handling, logging, and performance metrics
### ðŸ“Š Success Metrics
- arXiv API calls succeed with proper rate limiting
- PDF download and caching works reliably
- Docling extracts structured content from scientific PDFs
- Database stores complete paper metadata
- Pipeline handles errors gracefully and continues processing
- All components ready for Airflow automation (Week 2+)


In [None]:
# Check if Fresh Containers are Built and All Services Healthy
import subprocess
import requests
from pathlib import Path

print("WEEK 2 CONTAINER & SERVICE HEALTH CHECK")
print("=" * 50)

# Find project root
current_dir = Path.cwd()
if current_dir.name == "notebooks" and current_dir.parent.name == "ArxivPaperCurator":
    project_root = current_dir.parent
elif (current_dir / "compose.yml").exists():
    project_root = current_dir
else:
    print("âœ— Could not find project root")
    exit()

print(f"Project root: {project_root}")



WEEK 2 CONTAINER & SERVICE HEALTH CHECK
Project root: /teamspace/studios/this_studio/ArxivPaperCurator


In [2]:
# Environment Check
import sys
from pathlib import Path

print(f"Python Version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print(f"Environment: {sys.executable}")

Python Version: 3.13.7
Environment: /teamspace/studios/this_studio/ArxivPaperCurator/.venv/bin/python


### Service Health Verification

In [3]:
# Step 1: Check if containers are built and running
print("\n1. Checking container status...")
try:
    result = subprocess.run(
        ["docker", "compose", "ps", "--format", "table"],
        cwd=str(project_root),
        capture_output=True,
        text=True,
        timeout=10
    )
    
    if result.returncode == 0 and result.stdout.strip():
        print("âœ“ Containers are running:")
        for line in result.stdout.strip().split('\n'):
            print(f"   {line}")
    else:
        print("âœ— No containers running or docker compose failed")
        print("Please run the build commands from the markdown cell above")
        exit()
        
except Exception as e:
    print(f"âœ— Error checking containers: {e}")
    print("Please run the build commands from the markdown cell above")
    exit()


1. Checking container status...
âœ“ Containers are running:
   NAME                        IMAGE                                            COMMAND                   SERVICE                 CREATED       STATUS                   PORTS
   rag-airflow                 apache/airflow:3.0.3                             "/bin/bash -c '\n  piâ€¦"   airflow                 2 hours ago   Up 2 hours (unhealthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp
   rag-api                     arxivpapercurator-api                            "uvicorn src.main:apâ€¦"    api                     2 hours ago   Up 2 hours (unhealthy)   0.0.0.0:8000->8000/tcp, :::8000->8000/tcp
   rag-ollama                  ollama/ollama:0.11.2                             "/bin/ollama serve"       ollama                  2 hours ago   Up 2 hours (healthy)     0.0.0.0:11434->11434/tcp, :::11434->11434/tcp
   rag-opensearch              opensearchproject/opensearch:2.19.0              "./opensearch-dockerâ€¦"    opensearch     

In [4]:
# Step 2: Check all service health (corrected endpoints)
print("\n2. Checking service health...")
services_to_test = {
    "FastAPI": "https://8000-01khp40rhz7zc0vjm8t9gsh5j1.cloudspaces.litng.ai/api/v1/ping/health",
    "PostgreSQL (via API)": "https://8000-01khp40rhz7zc0vjm8t9gsh5j1.cloudspaces.litng.ai/api/v1/ping/health", 
    "Ollama": "https://11434-01khp40rhz7zc0vjm8t9gsh5j1.cloudspaces.litng.ai/api/version",
    "OpenSearch": "https://9200-01khp40rhz7zc0vjm8t9gsh5j1.cloudspaces.litng.ai/_cluster/health",
    "Airflow": "https://8080-01khp40rhz7zc0vjm8t9gsh5j1.cloudspaces.litng.ai/api/v2/monitor/health"
}

all_healthy = True
for service_name, url in services_to_test.items():
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            print(f"âœ“ {service_name}: Healthy")
        else:
            print(f"âœ— {service_name}: HTTP {response.status_code}")
            all_healthy = False
    except requests.exceptions.ConnectionError:
        print(f"âœ— {service_name}: Not accessible")
        all_healthy = False
    except Exception as e:
        print(f"âœ— {service_name}: {type(e).__name__}")
        all_healthy = False

print("\n" + "=" * 50)
if all_healthy:
    print("âœ“ ALL SERVICES HEALTHY! Ready for Week 2 development.")
else:
    print("âœ— Some services need attention.")
    print("If you just rebuilt containers, wait 1-2 minutes and run this cell again.")
    print("Airflow and OpenSearch take longest to start up.")


2. Checking service health...
âœ“ FastAPI: Healthy
âœ“ PostgreSQL (via API): Healthy
âœ“ Ollama: Healthy
âœ“ OpenSearch: Healthy


âœ“ Airflow: Healthy

âœ“ ALL SERVICES HEALTHY! Ready for Week 2 development.


In [5]:
airflow_password = "FkfGnM9ZPup7YTre"
airflow_usernmae = "admin"

In [6]:
%pwd

'/teamspace/studios/this_studio/ArxivPaperCurator/notebooks'

In [7]:
%cd ..


/teamspace/studios/this_studio/ArxivPaperCurator


In [None]:
import asyncio
from datetime import datetime, timedelta

# Import our arXiv client
from src.services.arxiv.factory import make_arxiv_client

print("TESTING ARXIV API CLIENT")
print("=" * 40)

# Create client
arxiv_client = make_arxiv_client()
print(f"âœ“ Client created: {arxiv_client.base_url}")
print(f"   Rate limit: {arxiv_client.rate_limit_delay}s")
print(f"   Max results: {arxiv_client.max_results_per_query}")
print(f"   Category: {arxiv_client.search_category}")
print()

TESTING ARXIV API CLIENT
âœ“ Client created: https://export.arxiv.org/api/query
   Rate limit: 3.0s
   Max results: 100
   Category: cs.AI



In [None]:
async def test_paper_fetching():
    """Test fetching papers from arXiv with rate limiting."""
    
    print("Test 1: Fetch Recent CS.AI Papers")
    try:
        papers = await arxiv_client.fetch_papers(
            max_results=2, 
            sort_by="submittedDate",
            sort_order="descending"
        )
        
        print(f"âœ“ Fetched {len(papers)} papers")
        
        if papers:
            for i, paper in enumerate(papers[:2], 1):
                print(f"   {i}. [{paper.arxiv_id}] {paper.title[:60]}...")
                print(f"      Authors: {', '.join(paper.authors[:2])}{'...' if len(paper.authors) > 2 else ''}")
                print(f"      Categories: {', '.join(paper.categories)}")
                print(f"      Published: {paper.published_date}")
                print()
        
        return papers
        
    except Exception as e:
        print(f"âœ— Error fetching papers: {e}")
        if "503" in str(e):
            print("   arXiv API temporarily unavailable (normal)")
            print("   Rate limiting and error handling working correctly")
        return []

# Run the test
papers = await test_paper_fetching()

Test 1: Fetch Recent CS.AI Papers
âœ“ Fetched 2 papers
   1. [2602.16708v1] Policy Compiler for Secure Agentic Systems...
      Authors: Nils Palumbo, Sarthak Choudhary...
      Categories: 
      Published: 2026-02-18T18:57:12Z

   2. [2602.16703v1] Measuring Mid-2025 LLM-Assistance on Novice Performance in B...
      Authors: Shen Zhou Hong, Alex Kleinman...
      Categories: 
      Published: 2026-02-18T18:51:28Z



In [10]:
# Test Date Filtering
async def test_date_filtering():
    """Test date range filtering functionality."""
    
    print("Test 2: Date Range Filtering")
    
    # Use specific dates: 
    from_date = "20250808"  
    to_date = "20250809"    
    try:
        date_papers = await arxiv_client.fetch_papers(
            max_results=2,
            from_date=from_date,
            to_date=to_date
        )
        
        print(f"âœ“ Date filtering test: {len(date_papers)} papers from {from_date}-{to_date}")
        
        if date_papers:
            for i, paper in enumerate(date_papers, 1):
                print(f"   {i}. [{paper.arxiv_id}] {paper.title[:60]}...")
                print(f"      Authors: {', '.join(paper.authors[:2])}{'...' if len(paper.authors) > 2 else ''}")
                print(f"      Categories: {', '.join(paper.categories)}")
                print(f"      Published: {paper.published_date}")
                print()
        
        return date_papers
        
    except Exception as e:
        print(f"âœ— Date filtering error: {e}")
        return []

# Run date filtering test
date_papers = await test_date_filtering()

Test 2: Date Range Filtering


âœ“ Date filtering test: 2 papers from 20250808-20250809
   1. [2508.07111v1] Investigating Intersectional Bias in Large Language Models u...
      Authors: Falaah Arif Khan, Nivedha Sivakumar...
      Categories: 
      Published: 2025-08-09T22:24:40Z

   2. [2508.07107v2] Designing a Feedback-Driven Decision Support System for Dyna...
      Authors: Timothy Oluwapelumi Adeyemi, Nadiah Fahad AlOtaibi
      Categories: 
      Published: 2025-08-09T21:24:54Z



In [11]:
date_papers

[ArxivPaper(arxiv_id='2508.07111v1', title='Investigating Intersectional Bias in Large Language Models using Confidence Disparities in Coreference Resolution', authors=['Falaah Arif Khan', 'Nivedha Sivakumar', 'Yinong Oliver Wang', 'Katherine Metcalf', 'Cezanne Camacho', 'Barry-John Theobald', 'Luca Zappella', 'Nicholas Apostoloff'], abstract='Large language models (LLMs) have achieved impressive performance, leading to their widespread adoption as decision-support tools in resource-constrained contexts like hiring and admissions. There is, however, scientific consensus that AI systems can reflect and exacerbate societal biases, raising concerns about identity-based harm when used in critical social contexts. Prior work has laid a solid foundation for assessing bias in LLMs by evaluating demographic disparities in different language reasoning tasks. In this work, we extend single-axis fairness evaluations to examine intersectional bias, recognizing that when multiple axes of discrimina

In [12]:
# Test PDF Download
async def test_pdf_download(test_papers):
    """Test PDF downloading with caching."""

    print("Test 3: PDF Download & Caching")
    
    if not test_papers:
        print("No papers available for PDF download test")
        return None
    
    # Test with first paper
    test_paper = test_papers[0]
    print(f"Testing PDF download for: {test_paper.arxiv_id}")
    print(f"Title: {test_paper.title[:60]}...")
    
    try:
        # Download PDF 
        pdf_path = await arxiv_client.download_pdf(test_paper)
        
        if pdf_path and pdf_path.exists():
            size_mb = pdf_path.stat().st_size / (1024 * 1024)
            print(f"âœ“ PDF downloaded: {pdf_path.name} ({size_mb:.2f} MB)")
            
            return pdf_path
        else:
            print("âœ— PDF download failed")
            return None
            
    except Exception as e:
        print(f"âœ— PDF download error: {e}")
        return None

# Run PDF download test 
pdf_path = await test_pdf_download(date_papers[:1])

Test 3: PDF Download & Caching
Testing PDF download for: 2508.07111v1
Title: Investigating Intersectional Bias in Large Language Models u...
âœ“ PDF downloaded: 2508.07111v1.pdf (6.81 MB)


3. Docling PDF Processing
Test PDF parsing with Docling for structured content extraction:

In [13]:
# Test PDF Parsing with Docling
from src.services.pdf_parser.factory import make_pdf_parser_service
from src.config import get_settings
from pathlib import Path

print("Test 4: PDF Parsing with Docling")
print("=" * 40)

# Create PDF parser
pdf_parser = make_pdf_parser_service()
settings = get_settings()
print("PDF parser service created")
print(f"Config: {settings.pdf_parser.max_pages} pages, {settings.pdf_parser.max_file_size_mb}MB")

# Test parsing with actual PDF files
cache_dir = Path("data/arxiv_pdfs")
if cache_dir.exists():
    pdf_files = list(cache_dir.glob("*.pdf"))
    print(f"\nFound {len(pdf_files)} PDF files to test parsing")
    
    if pdf_files:
        # Test parsing the first PDF
        test_pdf = pdf_files[0]
        print(f"Testing PDF parsing with: {test_pdf.name}")
        
        try:
            pdf_content = await pdf_parser.parse_pdf(test_pdf)
            
            if pdf_content:
                print(f"âœ“ PDF parsing successful!")
                print(f"  Sections: {len(pdf_content.sections)}")
                print(f"  Raw text length: {len(pdf_content.raw_text)} characters")
                print(f"  Parser used: {pdf_content.parser_used}")
                
                # Show first section as example
                if pdf_content.sections:
                    first_section = pdf_content.sections[0]
                    print(f"  First section: '{first_section.title}' ({len(first_section.content)} chars)")
            else:
                print("âœ— PDF parsing failed (Docling compatibility issue)")
                print("This is expected - not all PDFs work with Docling")
                
        except Exception as e:
            print(f"âœ— PDF parsing error: {e}")
            print("This demonstrates the error handling in action")
    else:
        print("No PDF files available for parsing test")
else:
    print("No PDF cache directory found")

Test 4: PDF Parsing with Docling
PDF parser service created
Config: 30 pages, 20MB

Found 1 PDF files to test parsing
Testing PDF parsing with: 2508.07111v1.pdf


Parameter `strict_text` has been deprecated and will be ignored.


âœ“ PDF parsing successful!
  Sections: 23
  Raw text length: 85167 characters
  Parser used: ParserType.DOCLING
  First section: 'content' (84 chars)


In [14]:
pdf_content.sections[3].title

'1 Introduction'

In [None]:
# Test Database Storage
from src.db.factory import make_database
from src.repositories.paper import PaperRepository
from src.schemas.arxiv.paper import PaperCreate
from dateutil import parser as date_parser

print("Test 5: Database Storage")
print("=" * 40)

# Create database connection
database = make_database()
print("âœ“ Database connection created")

if papers:
    test_paper = papers[0]
    print(f"Storing paper: {test_paper.arxiv_id}")
    
    try:
        with database.get_session() as session:
            paper_repo = PaperRepository(session)
            
            # Convert to database format
            published_date = date_parser.parse(test_paper.published_date) if isinstance(test_paper.published_date, str) else test_paper.published_date
            
            paper_create = PaperCreate(
                arxiv_id=test_paper.arxiv_id,
                title=test_paper.title,
                authors=test_paper.authors,
                abstract=test_paper.abstract,
                categories=test_paper.categories,
                published_date=published_date,
                pdf_url=test_paper.pdf_url
            )
            
            # Store paper (upsert to avoid duplicates)
            stored_paper = paper_repo.upsert(paper_create)
            
            if stored_paper:
                print(f"âœ“ Paper stored with ID: {stored_paper.id}")
                print(f"   Database ID: {stored_paper.id}")
                print(f"   arXiv ID: {stored_paper.arxiv_id}")
                print(f"   Title: {stored_paper.title[:50]}...")
                print(f"   Authors: {len(stored_paper.authors)} authors")
                print(f"   Categories: {', '.join(stored_paper.categories)}")
                
                # Test retrieval
                retrieved_paper = paper_repo.get_by_arxiv_id(test_paper.arxiv_id)
                if retrieved_paper:
                    print(f"âœ“ Paper retrieval test passed")
                else:
                    print(f"âœ— Paper retrieval failed")
            else:
                print("âœ— Paper storage failed")
                
    except Exception as e:
        print(f"âœ— Database error: {e}")
else:
    print("No papers available for database storage test")

Session rollback because of exception: (psycopg2.errors.UndefinedColumn) column papers.pdf_processing_date does not exist
LINE 1: ...ed, papers.parser_metadata, papers.pdf_processed, papers.pdf...
                                                             ^
HINT:  Perhaps you meant to reference the column "papers.pdf_processing_data".

[SQL: SELECT papers.id, papers.arxiv_id, papers.title, papers.authors, papers.abstract, papers.categories, papers.published_date, papers.pdf_url, papers.raw_text, papers."references", papers.sections, papers.parser_used, papers.parser_metadata, papers.pdf_processed, papers.pdf_processing_date, papers.created_at, papers.updated_at 
FROM papers 
WHERE papers.arxiv_id = %(arxiv_id_1)s]
[parameters: {'arxiv_id_1': '2602.16708v1'}]
(Background on this error at: https://sqlalche.me/e/20/f405)


Test 5: Database Storage
âœ“ Database connection created
Storing paper: 2602.16708v1


In [None]:
# Test Complete Pipeline
from src.services.metadata_extractor import make_metadata_fetcher

print("Test 6: Complete Metadata Fetcher Pipeline")
print("=" * 50)

# Create metadata fetcher
metadata_fetcher = make_metadata_fetcher(arxiv_client, pdf_parser)
print("âœ“ Metadata fetcher service created")

# Test with small batch
print("Running small batch test (2 papers, no PDF processing for speed)...")

try:
    with database.get_session() as session:
        results = await metadata_fetcher.fetch_and_process_paper(
            max_results=2,  
            process_pdf=False,  
            store_to_db=True,
            db_session=session
        )
    
    print("\nPIPELINE RESULTS:")
    print(f"   Papers fetched: {results.get('papers_fetched', 0)}")
    print(f"   PDFs downloaded: {results.get('pdfs_downloaded', 0)}")
    print(f"   PDFs parsed: {results.get('pdfs_parsed', 0)}")
    print(f"   Papers stored: {results.get('papers_stored', 0)}")
    print(f"   Processing time: {results.get('processing_time', 0):.1f}s")
    print(f"   Errors: {len(results.get('errors', []))}")
    
    if results.get('errors'):
        print("\nErrors encountered:")
        for error in results.get('errors', [])[:3]:  # Show first 3 errors
            print(f"   - {error}")
    
    if results.get('papers_fetched', 0) > 0:
        print("\nâœ“ Pipeline test successful!")
    else:
        print("\nNo papers fetched - may be arXiv API unavailability")
        
except Exception as e:
    print(f"âœ— Pipeline error: {e}")

Critical error in metadata fetching pipeline: 'downloaded'
Pipeline execution: {'papers_fetched': 2, 'papers_downloaded': 0, 'pdf_parsed': 0, 'papers_stored': 0, 'errors': [], 'processing_time': 0.0}.
Session rollback because of exception: Critical error in metadata fetching pipeline: 'downloaded'


Test 6: Complete Metadata Fetcher Pipeline
âœ“ Metadata fetcher service created
Running small batch test (2 papers, no PDF processing for speed)...

PIPELINE RESULTS:
âœ— Pipeline error: name 'results' is not defined
