In [1]:
%pip install -q docker "elasticsearch>=8,<9" kagglehub tqdm pandas


Note: you may need to restart the kernel to use updated packages.


In [10]:
import json
import os
import re
import time
from email.utils import parsedate_to_datetime

import docker
import pandas as pd
from elasticsearch import Elasticsearch, helpers
from kagglehub import dataset_download
from requests.exceptions import ConnectionError
from tqdm import tqdm

# ---- Clean and Preprocess ----
def clean_text(text):
    """Trim ``text`` and collapse every internal run of whitespace to a single space.

    Falsy input (``None`` or ``""``) yields ``""``.
    """
    return re.sub(r'\s+', ' ', text.strip()) if text else ""

def _format_author(parts):
    """Turn one ``authors_parsed`` entry into "First Last"; return "" when nothing usable is present.

    Index 0 is used as the last name and index 1 as the first name(s), matching the
    ``f"{a[1]} {a[0]}"`` ordering this pipeline has always used; later elements are ignored.
    """
    if not isinstance(parts, list):
        return ""
    last = parts[0] if len(parts) > 0 else ""
    first = parts[1] if len(parts) > 1 else ""
    # Skip empty pieces so a missing first name does not leave a leading space.
    return " ".join(str(piece).strip() for piece in (first, last) if piece and str(piece).strip())


def _to_iso_date(value):
    """Convert an RFC 2822 timestamp to an ISO 8601 string, or ``None`` if empty/unparseable.

    ``versions[].created`` is expected to look like "Mon, 2 Apr 2007 19:18:42 GMT" (verify
    against the snapshot). Elasticsearch ``date`` fields with the default format reject both
    that form and ``""``, so the value is normalised here before indexing.
    """
    if not value:
        return None
    try:
        parsed = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    return parsed.isoformat() if parsed is not None else None


def preprocess_arxiv_json(json_file_path, max_records=None):
    """Load an arXiv metadata JSON-lines file into a flat DataFrame ready for indexing.

    Args:
        json_file_path: Path to a JSON-lines file with one metadata object per line.
        max_records: Maximum number of records to read; ``None`` reads the whole file.
            ``0`` yields an empty DataFrame.

    Returns:
        DataFrame with one row per record and the columns ``id``, ``submitter``, ``title``,
        ``summary``, ``authors``, ``comments``, ``journal_ref``, ``doi``, ``report_no``,
        ``categories``, ``update_date`` and ``published``. ``published`` is ISO 8601 and
        missing dates are ``None``, so both fit Elasticsearch ``date`` fields.

    Raises:
        json.JSONDecodeError: A non-blank line is not valid JSON.
    """
    records = []
    # Read as UTF-8 explicitly so decoding doesn't depend on the platform's locale default.
    with open(json_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if max_records is not None and len(records) >= max_records:
                break
            line = line.strip()
            if not line:  # tolerate blank/trailing lines instead of failing in json.loads
                continue
            entry = json.loads(line)

            # `or []` also covers an explicit null in the JSON.
            authors = [name for name in map(_format_author, entry.get('authors_parsed') or []) if name]
            if not authors:
                # Fall back to the raw author string, normalising embedded newlines/whitespace.
                authors = [clean_text(entry.get('authors', ''))]

            versions = entry.get('versions')
            first_created = ''
            if isinstance(versions, list) and versions and isinstance(versions[0], dict):
                first_created = versions[0].get('created', '')

            records.append({
                'id': entry.get('id'),
                'submitter': entry.get('submitter', ''),
                'title': clean_text(entry.get('title', '')),
                'summary': clean_text(entry.get('abstract', '')),
                'authors': ', '.join(authors),
                'comments': entry.get('comments', ''),
                'journal_ref': entry.get('journal-ref', ''),
                'doi': entry.get('doi', ''),
                'report_no': entry.get('report-no', ''),
                'categories': entry.get('categories', ''),
                # "" is not a valid ES date; send null when the field is absent or empty.
                'update_date': entry.get('update_date') or None,
                'published': _to_iso_date(first_created),
            })
    return pd.DataFrame(records)

# ---- Elasticsearch Setup ----
def create_index(es, index_name):
    """Drop ``index_name`` if it exists, then recreate it with a fixed arXiv mapping.

    The index has one shard and no replicas. Any existing documents in an index of the
    same name are lost. ``update_date`` and ``published`` are mapped as ``date`` with no
    explicit ``format``, so Elasticsearch applies its default date format to incoming values.
    """
    text_fields = ("submitter", "title", "summary", "authors", "comments", "journal_ref")
    keyword_fields = ("doi", "report_no", "categories")
    date_fields = ("update_date", "published")

    properties = {}
    for field_type, names in (("text", text_fields), ("keyword", keyword_fields), ("date", date_fields)):
        for name in names:
            properties[name] = {"type": field_type}

    body = {
        "settings": {"number_of_shards": 1, "number_of_replicas": 0},
        "mappings": {"properties": properties},
    }

    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
    es.indices.create(index=index_name, body=body)

# ---- Index Data ----
def index_data(es, df, index_name, chunk_size=500):
    """Bulk-index every row of ``df`` into ``index_name``, using the ``id`` column as ``_id``.

    Args:
        es: Connected Elasticsearch client.
        df: DataFrame whose rows become documents; it must contain an ``id`` column.
        index_name: Target index.
        chunk_size: Documents per bulk request (same batch size as before by default).

    Returns:
        Number of documents successfully indexed.

    Raises:
        elasticsearch.helpers.BulkIndexError: If any document is rejected (``helpers.bulk`` default).
    """
    # NaN/NaT are not valid JSON; convert every missing value to None so it is sent as null.
    records = df.astype(object).where(df.notna(), None).to_dict(orient="records")

    def _actions():
        for doc in records:
            yield {"_index": index_name, "_id": doc["id"], "_source": doc}

    # helpers.bulk does the chunking itself; wrapping the generator in tqdm keeps the
    # progress bar advancing as actions are consumed.
    success, _ = helpers.bulk(
        es,
        tqdm(_actions(), total=len(records), desc="Indexing"),
        chunk_size=chunk_size,
    )
    return success

# ---- Connect to Elasticsearch via Docker ----
def connect_to_elasticsearch_docker(container_name="elasticsearch-container", max_retries=15, wait_seconds=5):
    """Make sure a local Elasticsearch container is running and return a client connected to it.

    Reuses the container named ``container_name`` if it exists, starting it when it is not
    running. Otherwise it creates one from the 8.11.1 image, with security disabled and
    ports 9200/9300 published. It then polls ``http://localhost:9200`` until it answers a ping.

    Args:
        container_name: Docker container to reuse or create.
        max_retries: Maximum number of readiness checks.
        wait_seconds: Pause between consecutive readiness checks.

    Returns:
        An ``Elasticsearch`` client whose ``ping()`` succeeded.

    Raises:
        docker.errors.DockerException: The Docker daemon could not be reached.
        RuntimeError: Elasticsearch did not answer within ``max_retries`` checks.
    """
    try:
        client = docker.from_env()
    except docker.errors.DockerException as exc:
        # e.g. "Error while fetching server API version ... FileNotFoundError", which usually
        # means the daemon isn't running or its socket isn't where docker-py looks for it.
        raise docker.errors.DockerException(
            "Could not connect to the Docker daemon. Make sure Docker is installed and "
            "running (e.g. start Docker Desktop), or set DOCKER_HOST to its socket. "
            f"Original error: {exc}"
        ) from exc

    try:
        container = client.containers.get(container_name)
        if container.status != "running":
            print(f"Starting container '{container_name}'...")
            container.start()
        else:
            print(f"Container '{container_name}' is already running.")
    except docker.errors.NotFound:
        print(f"Container '{container_name}' not found. Creating and starting a new one...")
        container = client.containers.run(
            "docker.elastic.co/elasticsearch/elasticsearch:8.11.1",
            name=container_name,
            ports={'9200/tcp': 9200, '9300/tcp': 9300},
            environment={"discovery.type": "single-node", "xpack.security.enabled": "false"},
            detach=True
        )

    # Building the client does not open a connection, so one instance serves every attempt.
    es = Elasticsearch("http://localhost:9200")
    for attempt in range(1, max_retries + 1):
        try:
            if es.ping():
                print("✅ Successfully connected to Elasticsearch running in Docker!")
                return es
        except ConnectionError:
            # Some client/transport combinations may raise instead of returning False.
            pass
        # Back off after every failed check: ping() reports an unreachable server by
        # returning False, so waiting only in the except-branch would never wait at all.
        print(f"⌛ Waiting for Elasticsearch to be ready (attempt {attempt})...")
        if attempt < max_retries:
            time.sleep(wait_seconds)

    es.close()
    raise RuntimeError("❌ Could not connect to Elasticsearch after several attempts.")

# ---- Main Pipeline ----
if __name__ == "__main__":
    # Target index; create_index drops and rebuilds it on every run.
    index_name = "arxiv-papers"

    # 1. Fetch the arXiv metadata snapshot from Kaggle and locate its JSON-lines file.
    print("📦 Downloading dataset...")
    dataset_path = dataset_download("Cornell-University/arxiv")
    json_file_path = os.path.join(
        dataset_path,
        "arxiv-metadata-oai-snapshot.json",
    )

    # 2. Parse a capped sample; raise the cap or pass None to process the whole file.
    print("🧼 Preprocessing data...")
    df = preprocess_arxiv_json(json_file_path=json_file_path, max_records=10_000)

    # 3. Bring up (or reuse) the Elasticsearch container and wait for it to answer.
    print("🐳 Connecting to Elasticsearch via Docker...")
    es = connect_to_elasticsearch_docker()

    # 4. Recreate the index with its mapping, then bulk-load the documents.
    print(f"📁 Creating index '{index_name}'...")
    create_index(es=es, index_name=index_name)

    print("📨 Indexing documents into Elasticsearch...")
    index_data(es=es, df=df, index_name=index_name)

    print("🎉 Done!")


📦 Downloading dataset...
🧼 Preprocessing data...
🐳 Connecting to Elasticsearch via Docker...


DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))