In [10]:
%pip install arxiv requests psutil memory_profiler python-dotenv
# Optional: the `file` utility helps arxiv_crawler identify archive types; without it a built-in fallback check is used.
# !apt-get install -y file



In [11]:
%%writefile arxiv_crawler.py
import arxiv
import os
import re
import json
import time
import tarfile
import shutil
import subprocess
import gzip

# Default output root; each paper gets a "<prefix>-<suffix>" subfolder (see crawl_single_paper).
SAVE_DIR = "./ArXivPapers"


def _is_tar_archive(path):
    """Return True if `path` is a tar archive (plain or compressed) that tarfile can open."""
    try:
        return tarfile.is_tarfile(path)
    except Exception:
        # Corrupt/truncated compressed data may surface as an error instead of False.
        return False


def _gzip_original_name(path):
    """Return the basename stored in a gzip header's FNAME field (RFC 1952), or None."""
    try:
        with open(path, 'rb') as fh:
            header = fh.read(10)
            if len(header) < 10 or header[:2] != b'\x1f\x8b':
                return None
            flags = header[3]
            if flags & 0x04:  # FEXTRA: 2-byte little-endian length, then that many bytes
                extra_len = int.from_bytes(fh.read(2), 'little')
                fh.read(extra_len)
            if not flags & 0x08:  # FNAME flag not set: no original name recorded
                return None
            raw = bytearray()
            while True:
                byte = fh.read(1)
                if not byte or byte == b'\x00':
                    break
                raw += byte
    except OSError:
        return None
    # RFC 1952 specifies ISO-8859-1 for FNAME.
    return os.path.basename(raw.decode('latin-1')) or None


def _sniff_filetype(path):
    """Pure-Python stand-in for the `file` command, returning the same (filetype, orig_name) pairs."""
    try:
        with open(path, 'rb') as fh:
            magic = fh.read(4)
    except OSError:
        return "unknown", None
    if magic.startswith(b'%PDF'):
        return "pdf", None
    if _is_tar_archive(path):
        return "tar.gz", None
    if magic.startswith(b'\x1f\x8b'):
        orig_name = _gzip_original_name(path)
        # Same convention as the `file` branch: a gzip without a recorded name is treated as a tarball.
        return ("gz", orig_name) if orig_name else ("tar.gz", None)
    return "unknown", None


def detect_and_fix_filetype(tar_path):
    """Classify a downloaded arXiv source file.

    Uses the `file` command when it is installed, otherwise inspects the file's
    magic bytes. The file extension is deliberately ignored: downloads are
    always saved as `*.tar.gz` regardless of their real content.

    Returns:
        (filetype, orig_name): filetype is "pdf", "tar.gz" (any tar archive,
        compressed or not), "gz" (a single gzip-compressed file) or "unknown";
        orig_name is the file name recorded in the gzip header for "gz",
        otherwise None.
    """
    try:
        result = subprocess.run(["file", tar_path], capture_output=True, text=True, errors='ignore')
        output = result.stdout.strip()
    except FileNotFoundError:
        # `file` is not installed: fall back to content sniffing.
        return _sniff_filetype(tar_path)
    except Exception:
        return "unknown", None

    if "PDF document" in output:
        return "pdf", None
    if "gzip compressed data" in output:
        match = re.search(r', was "([^"]+)"', output)
        # A gzip header can record a name even for a tarball (e.g. "x.tar"),
        # so confirm the contents before treating it as a single file.
        if match and not _is_tar_archive(tar_path):
            return "gz", os.path.basename(match.group(1))
        return "tar.gz", None
    if "tar archive" in output:
        return "tar.gz", None
    return "unknown", None


def _safe_extractall(tar, dest):
    """Extract `tar` into `dest`, refusing members that would be written outside it."""
    if hasattr(tarfile, 'data_filter'):
        # Python 3.12+ (and security backports): the 'data' filter rejects absolute
        # paths, '..' traversal and links that point outside `dest`.
        tar.extractall(path=dest, filter='data')
        return
    root = os.path.realpath(dest)
    safe_members = []
    for member in tar.getmembers():
        if not (member.isfile() or member.isdir()):
            continue  # skip links/devices; only regular .tex/.bib files are kept anyway
        target = os.path.realpath(os.path.join(root, member.name))
        if os.path.commonpath([root, target]) != root:
            raise tarfile.TarError(f"unsafe member path: {member.name}")
        safe_members.append(member)
    tar.extractall(path=dest, members=safe_members)


def extract_and_clean(tar_path, dest_folder, base_name):
    """Unpack one downloaded source file into dest_folder/base_name, keeping only .tex/.bib files.

    Archives come from arbitrary arXiv submitters, so extraction refuses
    members that would escape the destination folder.

    Returns:
        (success, deleted_count):
        * PDF-only submission: (True, 0); an empty version folder is still created.
        * Unrecognised file: (False, 0); no folder is left behind.
        * Extraction error (corrupt or unsafe archive): (False, 0); the partial folder is removed.
        * Otherwise: (True, n), where n is the number of non-.tex/.bib files deleted.
    """
    filetype, orig_name = detect_and_fix_filetype(tar_path)
    if filetype == "unknown":
        return (False, 0)

    extract_path = os.path.join(dest_folder, base_name)
    os.makedirs(extract_path, exist_ok=True)
    if filetype == "pdf":
        return (True, 0)

    try:
        if filetype == "tar.gz":
            with tarfile.open(tar_path, 'r:*') as tar:
                _safe_extractall(tar, extract_path)
        elif filetype == "gz":
            # basename() guards against a header-supplied name containing directories.
            out_name = os.path.basename(orig_name or "") or f"{base_name}.file"
            out_path = os.path.join(extract_path, out_name)
            with gzip.open(tar_path, 'rb') as fin, open(out_path, 'wb') as fout:
                shutil.copyfileobj(fin, fout)
    except Exception:
        shutil.rmtree(extract_path, ignore_errors=True)
        return (False, 0)

    deleted = 0
    for root, _, files in os.walk(extract_path):
        for name in files:
            if name.lower().endswith(('.tex', '.bib')):
                continue
            try:
                os.remove(os.path.join(root, name))
                deleted += 1
            except OSError:
                pass  # best-effort cleanup; a leftover file does not fail the version
    return (True, deleted)


def _fetch_result(client, query_id):
    """Return the first arxiv result for `query_id` (e.g. "2304.14828" or "2304.14828v2"), or None.

    Any lookup failure (no match, network or API error) is reported as None.
    """
    try:
        return next(client.results(arxiv.Search(id_list=[query_id])))
    except Exception:
        return None


def crawl_single_paper(arxiv_id, save_dir=SAVE_DIR):
    """Download metadata and LaTeX sources for every version of one arXiv paper.

    Files written under `save_dir`:
        <prefix>-<suffix>/metadata.json
        <prefix>-<suffix>/tex/<prefix>-<suffix>v<k>/...   (only .tex/.bib kept)

    Folders are created only after the paper has been found, so unknown IDs
    leave nothing behind. Each version is looked up at most once (successful
    lookups are reused for both revision dates and source download). The
    temporary archive is always removed.

    Returns:
        True if at least one version's sources were processed successfully;
        False for IDs without a '.', failed lookups, or metadata write errors.
    """
    if '.' not in arxiv_id:
        return False

    client = arxiv.Client()
    base_paper = _fetch_result(client, arxiv_id)
    if base_paper is None:
        return False
    match = re.search(r'v(\d+)$', base_paper.entry_id or "")
    latest_version = int(match.group(1)) if match else 1

    # Split once so an unexpected extra '.' cannot raise ValueError.
    prefix, suffix = arxiv_id.split('.', 1)
    folder_key = f"{prefix}-{suffix}"
    paper_folder = os.path.join(save_dir, folder_key)
    tex_folder = os.path.join(paper_folder, "tex")
    os.makedirs(tex_folder, exist_ok=True)

    # An unversioned query returns the latest version, so reuse it for that version.
    found_versions = {latest_version: base_paper} if match else {}

    def lookup_version(v):
        """Fetch version v once; failures are not cached so a later call retries them."""
        if v not in found_versions:
            result = _fetch_result(client, f"{arxiv_id}v{v}")
            if result is None:
                return None
            found_versions[v] = result
        return found_versions[v]

    revised_dates = []
    for v in range(2, latest_version + 1):
        paper_v = lookup_version(v)
        updated = paper_v.updated if paper_v is not None else None
        revised_dates.append(updated.strftime("%Y-%m-%d") if updated else None)

    metadata = {
        "arxiv_id": arxiv_id.replace('.', '-'),
        "paper_title": base_paper.title,
        "abstract": base_paper.summary,
        "authors": [a.name for a in base_paper.authors],
        "submission_date": base_paper.published.strftime("%Y-%m-%d") if base_paper.published else None,
        "revised_dates": revised_dates,
        "publication_venue": base_paper.journal_ref if base_paper.journal_ref else None,
        "latest_version": latest_version,
        "categories": base_paper.categories,
        "pdf_url": base_paper.pdf_url,
    }

    metadata_path = os.path.join(paper_folder, "metadata.json")
    try:
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=4, ensure_ascii=False)
    except Exception:
        return False

    versions_processed = 0
    for v in range(1, latest_version + 1):
        archive_name = f"{arxiv_id}v{v}.tar.gz"
        temp_tar = os.path.join(paper_folder, archive_name)
        try:
            paper_v = lookup_version(v)
            if paper_v is None:
                continue
            paper_v.download_source(dirpath=paper_folder, filename=archive_name)
            success, _ = extract_and_clean(temp_tar, tex_folder, f"{folder_key}v{v}")
            if success:
                versions_processed += 1
        except Exception:
            pass  # one bad version must not abort the remaining ones
        finally:
            # Always drop the archive (even after a failed download/extract) and pace requests.
            if os.path.exists(temp_tar):
                try:
                    os.remove(temp_tar)
                except OSError:
                    pass
            time.sleep(0.3)

    return versions_processed > 0


Writing arxiv_crawler.py


In [12]:
%%writefile reference_extractor.py
import requests
import json
import os
import time
import re
try:
    from dotenv import load_dotenv
except ImportError:
    def load_dotenv(*args, **kwargs):
        """No-op stand-in used when python-dotenv is not installed."""
        return False

# Load settings such as SEMANTIC_SCHOLAR_API_KEY from a local .env file when python-dotenv is available.
load_dotenv()

def format_arxiv_id_for_key(arxiv_id):
    """Return the folder-style key for an arXiv ID.

    Any trailing version suffix ("v<digits>") is dropped and every '.' becomes '-'.
    Examples:
        "2305.04793"   -> "2305-04793"
        "2305.04793v1" -> "2305-04793"
    """
    unversioned = re.sub(r'v\d+$', '', arxiv_id)
    return '-'.join(unversioned.split('.'))


def _retry_after_seconds(response):
    """Return a numeric Retry-After header value in seconds, or 0 if absent/unparsable.

    HTTP-date forms of Retry-After are not parsed and count as 0.
    """
    value = (getattr(response, "headers", None) or {}).get("Retry-After")
    try:
        return max(0.0, float(value))
    except (TypeError, ValueError):
        return 0


def get_paper_references(arxiv_id, delay=3, max_retries=5, max_delay=60):
    """Fetch a paper's reference list from the Semantic Scholar Graph API.

    A trailing version suffix ("v2") is stripped before the lookup. If the
    SEMANTIC_SCHOLAR_API_KEY environment variable is set, it is sent as the
    x-api-key header.

    Transient failures are retried, up to `max_retries` attempts in total.
    These are network errors, HTTP 408/429/5xx, and an unparsable 200 body.
    Between attempts it waits `delay * 2**attempt` seconds, capped at
    `max_delay`. A numeric Retry-After header can raise the wait, still up
    to `max_delay`. Other client errors (400, 401, 403, ...) are not retried.

    Returns:
        (references, count): the API's reference list and its length on
        success; ([], 0) when the paper has no references; (None, 0) on 404,
        a non-retryable error, or when all attempts are used up.
    """
    clean_id = re.sub(r'v\d+$', '', arxiv_id)
    url = f"https://api.semanticscholar.org/graph/v1/paper/arXiv:{clean_id}"
    params = {
        "fields": "references,references.title,references.authors,references.year,references.venue,references.externalIds,references.publicationDate"
    }

    api_key = os.getenv("SEMANTIC_SCHOLAR_API_KEY")
    headers = {"x-api-key": api_key} if api_key else {}

    for attempt in range(max_retries):
        wait = min(delay * (2 ** attempt), max_delay)
        try:
            response = requests.get(url, params=params, headers=headers, timeout=10)
        except requests.exceptions.RequestException:
            response = None  # network problem: retry

        if response is not None:
            status = response.status_code
            if status == 200:
                try:
                    data = response.json()
                except ValueError:
                    data = None  # garbled or truncated body: retry
                if isinstance(data, dict):
                    references = data.get("references", [])
                    return references, len(references) if references else 0
            elif status == 404:
                return None, 0
            elif status in (408, 429) or status >= 500:
                # Rate limited or server-side trouble: honour Retry-After, bounded by max_delay.
                wait = max(wait, min(_retry_after_seconds(response), max_delay))
            else:
                return None, 0  # other client errors will not improve on retry

        if attempt + 1 < max_retries:
            time.sleep(wait)

    return None, 0


def convert_to_references_dict(references):
    """Index Semantic Scholar references that have an arXiv ID by folder-style key.

    Entries that are empty/None or have no "ArXiv" external ID are skipped.
    Keys come from format_arxiv_id_for_key (e.g. "2305.04793v1" -> "2305-04793").
    A later duplicate key overwrites an earlier one.

    Missing or null fields are normalised:
    - title becomes "";
    - authors becomes [] (nameless or malformed author entries are dropped);
    - submission_date falls back to "<year>-01-01" when only the year is known, else "".

    A None `references` is treated as empty.
    """
    result = {}

    for ref in references or []:
        if not ref:
            continue

        external_ids = ref.get("externalIds") or {}
        arxiv_id = external_ids.get("ArXiv") or ""
        if not arxiv_id:
            continue

        # The API may send "authors": null or null author entries; don't let one bad
        # reference abort the whole conversion.
        authors = [
            author["name"]
            for author in (ref.get("authors") or [])
            if isinstance(author, dict) and author.get("name")
        ]

        publication_date = ref.get("publicationDate") or ""
        year = ref.get("year")
        if not publication_date and year:
            publication_date = f"{year}-01-01"

        result[format_arxiv_id_for_key(arxiv_id)] = {
            "paper_title": ref.get("title") or "",
            "authors": authors,
            "submission_date": publication_date,
            "semantic_scholar_id": ref.get("paperId"),
        }

    return result


def extract_references_for_paper(arxiv_id, save_dir="./ArXivPapers"):
    """Fetch a paper's arXiv-linked references and store them in references.json.

    The paper folder is save_dir/<key>, where <key> comes from
    format_arxiv_id_for_key (version suffix ignored). If that folder does not
    exist, nothing is written and False is returned.

    Otherwise references.json receives the converted references, or `{}` when
    Semantic Scholar returned nothing usable. Returns True only when at least
    one reference was written. Any exception yields False, and in that case
    the file may not have been written.
    """
    paper_folder = os.path.join(save_dir, format_arxiv_id_for_key(arxiv_id))
    if not os.path.exists(paper_folder):
        return False

    def write_json(payload):
        target = os.path.join(paper_folder, "references.json")
        with open(target, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=4, ensure_ascii=False)

    try:
        references, total_found = get_paper_references(arxiv_id)
        references_dict = {}
        if references is not None and total_found != 0:
            references_dict = convert_to_references_dict(references)
        write_json(references_dict)
        return bool(references_dict)
    except Exception:
        return False


Writing reference_extractor.py


In [13]:
%%writefile main.py
import time
import os
import shutil
import psutil
import threading
import sys
import requests
import signal
import pathlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from arxiv_crawler import crawl_single_paper
from reference_extractor import extract_references_for_paper


# Run-wide outcome counters; worker threads update them while holding `stats_lock`.
stats_lock = Lock()
stats = dict.fromkeys(
    ("total_processed", "both_success", "only_crawler_fail", "no_references", "both_failed"),
    0,
)

# State shared with the background resource-monitor thread.
monitor_running = True
ram_samples_bytes = []
peak_disk_usage_bytes = 0

def _monitor_resources(baseline_ram, baseline_disk, sleep_interval=2):
    """Monitor RAM and disk usage in background."""
    global ram_samples_bytes, peak_disk_usage_bytes, monitor_running

    ram_samples_bytes = []
    peak_disk_usage_bytes = 0

    while monitor_running:
        try:
            current_ram = psutil.virtual_memory().used
            ram_above_baseline = current_ram - baseline_ram
            ram_samples_bytes.append(ram_above_baseline)

            try:
                current_disk = shutil.disk_usage('/').used
            except OSError:
                current_disk = shutil.disk_usage(pathlib.Path.cwd().anchor).used
            disk_above_baseline = current_disk - baseline_disk

            if disk_above_baseline > peak_disk_usage_bytes:
                peak_disk_usage_bytes = disk_above_baseline

        except Exception:
            pass

        time.sleep(sleep_interval)

def _print_custom_resource_report(disk_start, disk_end):
    """Print resource usage report."""
    global ram_samples_bytes, peak_disk_usage_bytes

    avg_ram_bytes = sum(ram_samples_bytes) / len(ram_samples_bytes) if ram_samples_bytes else 0
    peak_ram_bytes = max(ram_samples_bytes) if ram_samples_bytes else 0
    final_disk_bytes = disk_end - disk_start

    print("\n" + "="*80)
    print("RESOURCE REPORT")
    print(f"  Average RAM: {avg_ram_bytes / (1024**2):.2f} MB")
    print(f"  Peak RAM: {peak_ram_bytes / (1024**2):.2f} MB")
    print(f"  Peak Disk: {peak_disk_usage_bytes / (1024**2):.2f} MB")
    print(f"  Final Disk: {final_disk_bytes / (1024**2):.2f} MB")
    print("="*80)

def process_paper(arxiv_id, save_dir="./ArXivPapers"):
    """Crawl one paper, then (only if crawling succeeded) extract its references.

    Every call adds 1 to `total_processed` and to exactly one outcome counter,
    so the outcome counters always sum to `total_processed`:
      * both_success       - crawl and reference extraction both succeeded
      * only_crawler_fail  - crawling failed (references were not attempted)
      * no_references      - crawl succeeded but reference extraction did not
      * both_failed        - an unexpected exception escaped the pipeline

    Returns:
        (arxiv_id, crawler_success, references_success); both flags are False
        when an exception occurred.
    """
    try:
        crawler_success = crawl_single_paper(arxiv_id, save_dir)
        references_success = False
        if crawler_success:
            references_success = extract_references_for_paper(arxiv_id, save_dir)

        if crawler_success and references_success:
            outcome = "both_success"
        elif not crawler_success:
            outcome = "only_crawler_fail"
        else:
            outcome = "no_references"
    except Exception:
        crawler_success = references_success = False
        outcome = "both_failed"

    with stats_lock:
        stats["total_processed"] += 1
        stats[outcome] += 1

    return arxiv_id, crawler_success, references_success


def check_paper_exists(arxiv_id):
    """Return True if https://arxiv.org/abs/<arxiv_id> answers a HEAD request with 200.

    Redirects are followed and the request times out after 5 s. A requests
    error counts as "does not exist". The function pauses 0.3 s after every
    probe, including failed ones, to throttle successive requests.
    """
    try:
        status_code = requests.head(
            f"https://arxiv.org/abs/{arxiv_id}", timeout=5, allow_redirects=True
        ).status_code
    except requests.RequestException:
        status_code = None
    time.sleep(0.3)
    return status_code == 200


def find_last_valid_id(prefix, start_id, jump1=50, back1=10, jump2=5, back2=1):
    """Return the highest existing arXiv number in month `prefix`, searching upward from `start_id`.

    This is a stepped (galloping) search, not a binary search:
    1. Jump forward by `jump1` while IDs exist.
    2. After a miss, step back by `back1` until an ID exists.
    3. Refine forward by `jump2`, and after a miss step back by `back2` to the last hit.

    With back2=1 the result is exact, assuming IDs exist contiguously up to
    the month's last paper. Each probe is one check_paper_exists request.
    IDs at or below the last confirmed hit are never re-probed. Stepping back
    past the last hit therefore cannot regress the answer, and cannot loop
    forever into negative IDs when probes fail.

    Returns:
        The last existing number. Returns start_id - 1 if start_id itself does
        not exist, and 0 if an argument is not an integer or a step size is
        not positive.
    """
    try:
        start_id = int(start_id)
        jump1 = int(jump1)
        back1 = int(back1)
        jump2 = int(jump2)
        back2 = int(back2)
    except (TypeError, ValueError):
        return 0
    if min(jump1, back1, jump2, back2) <= 0:
        return 0  # non-positive steps would never terminate

    if not check_paper_exists(f"{prefix}.{start_id:05d}"):
        return start_id - 1

    last_known_good_id = start_id
    current_id = start_id + jump1
    state = "JUMP1"

    while True:
        if current_id <= last_known_good_id:
            # Stepped back onto (or past) an ID already confirmed to exist:
            # count it as a hit without another request.
            current_id = last_known_good_id
            exists = True
        else:
            exists = check_paper_exists(f"{prefix}.{current_id:05d}")

        if exists:
            last_known_good_id = current_id
            if state == "JUMP1":
                current_id += jump1
            elif state in ("BACK1", "JUMP2"):
                state = "JUMP2"
                current_id += jump2
            else:  # BACK2: first hit after the fine-grained step back
                break
        else:
            if state in ("JUMP1", "BACK1"):
                state = "BACK1"
                current_id -= back1
            else:  # JUMP2 or BACK2
                state = "BACK2"
                current_id -= back2

    return last_known_good_id


def _processed_ids(save_dir):
    """Return IDs ("yymm.nnnnn") whose folder in save_dir already holds references.json."""
    if not os.path.isdir(save_dir):
        return set()  # first run: nothing processed yet
    return {
        name.replace('-', '.')
        for name in os.listdir(save_dir)
        if os.path.isfile(os.path.join(save_dir, name, "references.json"))
    }


def _month_prefixes(start_month, end_month):
    """Return the "yymm" arXiv prefixes for every month from start_month to end_month ("YYYY-MM") inclusive."""
    year, month = (int(part) for part in start_month.split('-'))
    end_year, end_mon = (int(part) for part in end_month.split('-'))
    prefixes = []
    while (year, month) <= (end_year, end_mon):
        prefixes.append(f"{year % 100:02d}{month:02d}")
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return prefixes


def generate_paper_ids(start_month, start_id, end_month, end_id, save_dir="./ArXivPapers", resume_file=None):
    """Build the ordered list of arXiv IDs to process, skipping papers already finished.

    IDs run from `start_id` in `start_month` to `end_id` in `end_month`
    (both "YYYY-MM"). Each month strictly before the end month is covered up
    to the last ID found by find_last_valid_id. That includes every month in
    between, not just the two endpoints. A paper counts as processed when
    save_dir/<yymm>-<number>/references.json exists. A missing save_dir means
    nothing has been processed yet.

    `resume_file` is accepted for backward compatibility and is unused.

    Raises:
        ValueError: if end_month is earlier than start_month.
    """
    processed_ids = _processed_ids(save_dir)
    print(f"Resuming: skipping {len(processed_ids)} already processed IDs")

    months = _month_prefixes(start_month, end_month)
    if not months:
        raise ValueError(f"end_month {end_month!r} is before start_month {start_month!r}")

    paper_ids = []
    for index, prefix in enumerate(months):
        first = start_id if index == 0 else 1
        if index == len(months) - 1:
            last = end_id
        else:
            last = find_last_valid_id(prefix, first)
        for number in range(first, last + 1):
            paper_id = f"{prefix}.{number:05d}"
            if paper_id not in processed_ids:
                paper_ids.append(paper_id)

    return paper_ids


def print_progress_report():
    """Print a one-line snapshot of the shared counters, read while holding `stats_lock`."""
    with stats_lock:
        counters = stats
        message = (
            f"\nProgress: {counters['total_processed']} processed"
            f" | Success: {counters['both_success']}"
            f" | Crawler fail: {counters['only_crawler_fail']}"
            f" | No refs: {counters['no_references']}"
            f" | Errors: {counters['both_failed']}"
        )
        print(message)


def print_final_report():
    """Print the end-of-run summary: each outcome counter with its share of all processed papers.

    The counters are copied under `stats_lock` so the report is internally
    consistent. Nothing is printed when no papers were processed.
    """
    with stats_lock:
        snapshot = dict(stats)

    total = snapshot['total_processed']
    if total == 0:
        return

    def pct(key):
        return snapshot[key] / total * 100

    rule = '=' * 80
    print(f"\n{rule}")
    print("FINAL REPORT")
    print(rule)
    print(f"Total processed: {total}")
    print(f"Both success: {snapshot['both_success']} ({pct('both_success'):.2f}%)")
    print(f"Crawler fail: {snapshot['only_crawler_fail']} ({pct('only_crawler_fail'):.2f}%)")
    print(f"No references: {snapshot['no_references']} ({pct('no_references'):.2f}%)")
    print(f"Errors: {snapshot['both_failed']} ({pct('both_failed'):.2f}%)")
    print(rule)


def run_parallel_processing(start_month, start_id, end_month, end_id,
                            max_parallels=2, save_dir="./ArXivPapers"):
    """Process every paper ID in the range on a thread pool, printing progress as papers finish.

    Steps:
    - Reset the shared counters.
    - Build the ID list with generate_paper_ids; already-processed papers are skipped.
    - Print one status line per finished paper and a progress summary every
      10 completions.

    The timing line and final report are printed even if the run is
    interrupted. On KeyboardInterrupt, papers that have not started yet are
    cancelled. Leaving the pool then waits only for the papers already in
    flight (at most `max_parallels`) before the interrupt propagates.
    """
    with stats_lock:
        for key in stats:
            stats[key] = 0

    paper_ids = generate_paper_ids(start_month, start_id, end_month, end_id, save_dir)
    total_papers = len(paper_ids)

    print(f"Processing {total_papers} papers with {max_parallels} threads")

    start_time = time.time()
    completed = 0
    try:
        with ThreadPoolExecutor(max_workers=max_parallels) as executor:
            futures = {
                executor.submit(process_paper, arxiv_id, save_dir): arxiv_id
                for arxiv_id in paper_ids
            }
            try:
                for future in as_completed(futures):
                    arxiv_id = futures[future]
                    completed += 1

                    try:
                        paper_id, crawler_ok, refs_ok = future.result()
                        status = "OK" if (crawler_ok and refs_ok) else "PARTIAL" if crawler_ok else "FAILED"
                        print(f"[{completed}/{total_papers}] {status} {paper_id}")

                        if completed % 10 == 0:
                            print_progress_report()

                    except Exception as e:
                        print(f"[{completed}/{total_papers}] ERROR {arxiv_id}: {e}")
            except KeyboardInterrupt:
                # Without this, leaving the `with` block would block until every queued paper ran.
                for pending in futures:
                    pending.cancel()
                print("\nInterrupted: cancelled papers that had not started yet")
                raise
    finally:
        elapsed_time = time.time() - start_time
        # `completed` equals total_papers on a full run; it is smaller after an interrupt.
        print(f"\nCompleted in {elapsed_time:.2f}s ({elapsed_time/completed:.2f}s per paper)" if completed > 0 else "")
        print_final_report()


def main(start_month="2023-04", start_id=14607, end_month="2023-05", end_id=14596,
         max_parallels=2, save_dir="./ArXivPapers"):
    """Run one crawl over an ID range while a background thread samples RAM and disk usage.

    The defaults reproduce the original hard-coded run: 2304.14607 through
    2305.14596 with 2 worker threads, writing to ./ArXivPapers. Pass other
    values to crawl a different range without editing this file. The
    resource report is printed even if the run raises or is interrupted.
    """
    global monitor_running

    def disk_used():
        """Bytes used on '/', falling back to the CWD's drive root if '/' cannot be queried."""
        try:
            return shutil.disk_usage('/').used
        except OSError:
            return shutil.disk_usage(pathlib.Path.cwd().anchor).used

    disk_usage_start = disk_used()
    ram_usage_start = psutil.virtual_memory().used

    monitor_running = True
    monitor_thread = threading.Thread(
        target=_monitor_resources,
        args=(ram_usage_start, disk_usage_start, 2),  # sample every 2 s
        daemon=True,
    )
    monitor_thread.start()

    try:
        run_parallel_processing(
            start_month=start_month,
            start_id=start_id,
            end_month=end_month,
            end_id=end_id,
            max_parallels=max_parallels,
            save_dir=save_dir,
        )
    finally:
        monitor_running = False
        monitor_thread.join()
        _print_custom_resource_report(disk_usage_start, disk_used())


Writing main.py


In [None]:
%reload_ext memory_profiler

import importlib

import arxiv_crawler
import reference_extractor
import main

# `importlib.reload(main)` alone keeps the stale arxiv_crawler / reference_extractor
# code that main imported earlier. Reload the helper modules first, then main,
# so the functions written by the %%writefile cells above are the ones that run.
for module in (arxiv_crawler, reference_extractor, main):
    importlib.reload(module)

print("--- START MEASURING PEAK RAM OF MAIN ---")
%memit main.main()
print("--- PEAK RAM OF MAIN MEASUREMENT END ---")

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler
--- START MEASURING PEAK RAM OF MAIN ---
Resuming: skipping 220 already processed IDs
Processing 14780 papers with 2 threads
[1/14780] OK 2304.14828
[2/14780] OK 2304.14824
[3/14780] OK 2304.14829
[4/14780] OK 2304.14830


In [None]:
print("--- COLAB DRIVE OVERVIEW ---")
# `df -h /` reports the VM's root filesystem (a mounted Google Drive would be a separate mount).
!df -h /

print("\n--- REQUIRED OUTPUT CAPACITY ---")
# Total on-disk size of the crawler output folder.
!du -sh ./ArXivPapers

# Mount Google Drive and save outputs

If you are running this notebook on Google Colab and want to keep the generated `./ArXivPapers` folder after the runtime disconnects, back it up to your Google Drive with the cell below.

Steps:
1. Uncomment the code in the cell below, run it, and follow the authorization prompt to mount your Drive.
2. The cell zips `/content/ArXivPapers` into `ArXivPapers.zip` and copies that archive to `MyDrive/ArXivPapers.zip`.
3. Check your Drive (https://drive.google.com/drive/my-drive) to confirm the archive was copied.

Note: this does not delete any of your Drive files, but an existing `MyDrive/ArXivPapers.zip` will be overwritten.

In [None]:
# On Google Colab: back up the results to Google Drive so they survive a runtime
# disconnect. Uncomment the lines below to zip ./ArXivPapers and copy the archive to MyDrive.

# from google.colab import drive

# drive.mount('/content/drive')

# !zip -r ArXivPapers.zip /content/ArXivPapers

# !cp ArXivPapers.zip /content/drive/MyDrive/