In [11]:
# Reload the IPython autoreload extension and switch it to mode 2 so that
# edits to imported modules are picked up without restarting the kernel.
%reload_ext autoreload
%autoreload 2

In [1]:
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

import numpy as np
import polars as pl
import pytesseract
from dotenv import load_dotenv
from openai import OpenAI
from pdf2image import convert_from_path

# NOTE(review): presumably this loads the OpenAI API key from a local .env
# file into the environment so that OpenAI() can find it — confirm.
load_dotenv()
# Shared API client; fetch_batch_embeddings reads this module-level name.
client = OpenAI()

In [2]:
# Count every entry in the local 'pdfs' directory (all entries, not only .pdf files).
len(os.listdir('pdfs'))

4460

In [3]:
def extract_text_from_pdf(path: str) -> str:
    """
    OCR a PDF file and return everything that was recognised as one string.
    :param path: Local path to PDF file
    :return: OCR output of each image returned by convert_from_path, concatenated
             in order with no separator inserted between them
    """
    page_images = convert_from_path(path)
    # Build the result in a single join instead of growing a string per image.
    return "".join(pytesseract.image_to_string(image) for image in page_images)

def process_pdfs_parallel(
    directory: str, max_workers: int = 4, limit: Optional[int] = 100
) -> List[str]:
    """
    Process multiple PDFs in parallel.

    The directory listing is sorted, filtered down to ``.pdf`` files and only
    then truncated to ``limit``. Sorting makes the selection reproducible
    (``os.listdir`` returns entries in arbitrary order), and filtering before
    truncating means non-PDF entries do not use up part of the quota.

    :param directory: Directory containing PDF files
    :param max_workers: Number of threads to use
    :param limit: Maximum number of PDFs to process (default 100). Pass None
                  to process every PDF in the directory; values below zero
                  are treated as zero.
    :return: List of extracted text for each PDF, in sorted filename order
    """
    pdf_names = sorted(name for name in os.listdir(directory) if name.endswith(".pdf"))
    if limit is not None:
        # Clamp so a negative limit cannot turn into "all but the last N".
        pdf_names = pdf_names[:max(limit, 0)]
    pdf_files = [os.path.join(directory, name) for name in pdf_names]

    # executor.map yields results in input order, so results line up with pdf_files.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(extract_text_from_pdf, pdf_files))
    return results

# OCR a sample of the corpus: process_pdfs_parallel handles at most 100 files
# from 'pdfs' here, not the whole directory.
documents = process_pdfs_parallel('pdfs', max_workers=4)

In [4]:
def create_chunks(documents: list, min_chunk_length: int = 250) -> list:
    """
    Creates chunks from a list of documents with a minimum chunk length.

    Each document is split on blank lines (``'\\n\\n'``). Parts are joined with
    a single space until the accumulated text plus the next part reaches
    ``min_chunk_length`` characters; that part closes the chunk. Parts that are
    empty or whitespace-only are skipped, so consecutive blank lines neither
    insert stray spaces nor produce empty chunks.

    The accumulator is not reset between documents, so a chunk may contain the
    tail of one document and the head of the next.

    :param documents: List of document strings
    :param min_chunk_length: Minimum length for each chunk
    :return: List of non-empty, stripped chunks; only the final chunk may be
             shorter than ``min_chunk_length``
    """
    chunks = []
    current_chunk = ""

    for document in documents:
        for part in document.split('\n\n'):
            if not part.strip():
                # Blank part (e.g. from a run of newlines): carries no content.
                continue

            # Decide before appending; the joining space is not counted.
            reached_minimum = len(current_chunk) + len(part) >= min_chunk_length
            current_chunk = f"{current_chunk} {part}" if current_chunk else part

            if reached_minimum:
                chunks.append(current_chunk.strip())
                current_chunk = ""

    # Flush whatever is left even if it never reached the minimum length.
    if current_chunk.strip():
        chunks.append(current_chunk.strip())

    return chunks


# Merge the OCR output into chunks using the default 250-character minimum.
chunks = create_chunks(documents)

In [5]:
from pathlib import Path

async def fetch_batch_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small"
) -> List[List[float]]:
    """Request embeddings for one batch of texts without blocking the event loop.

    The synchronous ``client.embeddings.create`` call (module-level ``client``)
    is run in a worker thread through ``asyncio.to_thread``.

    :param texts: Texts sent together as the ``input`` of a single request
    :param model: Embedding model name passed to the API
    :return: The ``embedding`` of every item in ``response.data``, in response order
    """
    def _request():
        return client.embeddings.create(input=texts, model=model)

    response = await asyncio.to_thread(_request)

    vectors = []
    for item in response.data:
        vectors.append(item.embedding)
    return vectors

async def get_batch_embeddings_async(
    texts: List[str],
    model: str = "text-embedding-3-small",
    batch_size: int = 16
) -> np.ndarray:
    """Asynchronously fetch embeddings in batches.

    ``texts`` is cut into consecutive slices of at most ``batch_size`` items,
    one ``fetch_batch_embeddings`` call is created per slice, and all calls are
    awaited together with ``asyncio.gather``. ``gather`` returns results in the
    order the calls were passed in, so row ``i`` of the result belongs to
    ``texts[i]``.

    :param texts: Texts to embed
    :param model: Embedding model name forwarded to every batch request
    :param batch_size: Maximum number of texts per request; must be >= 1
    :return: Array with one embedding per text (an empty array when ``texts`` is empty)
    :raises ValueError: If ``batch_size`` is smaller than 1
    """
    # A negative step would make range() empty and silently drop every text;
    # zero would fail inside range() with an unrelated-looking message.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    batches = [texts[start:start + batch_size] for start in range(0, len(texts), batch_size)]

    # Process all batches concurrently
    results = await asyncio.gather(
        *(fetch_batch_embeddings(batch, model) for batch in batches)
    )
    return np.array([embedding for batch in results for embedding in batch])


async def save_embeddings(filepath: Path, texts: List[str], embeddings: np.ndarray):
    """Write texts and their embeddings to a Parquet file via Polars.

    :param filepath: Destination file; missing parent directories are created
    :param texts: Values for the ``text`` column
    :param embeddings: Iterable of arrays; each is converted with ``.tolist()``
                       and stored in the ``embedding`` column
    """
    target = Path(filepath)  # accept plain strings as well as Path objects
    target.parent.mkdir(parents=True, exist_ok=True)

    # Polars gets plain Python lists rather than numpy rows.
    embedding_rows = []
    for vector in embeddings:
        embedding_rows.append(vector.tolist())

    table = pl.DataFrame({"text": texts, "embedding": embedding_rows})

    # The write is blocking, so run it in a worker thread.
    await asyncio.to_thread(table.write_parquet, str(target))

async def load_embeddings(filepath: Path) -> pl.DataFrame:
    """Read a Parquet file with Polars and return the resulting DataFrame.

    :param filepath: Parquet file to read; converted to ``str`` before the call
    :return: Whatever ``pl.read_parquet`` returns for that path
    """
    parquet_path = str(filepath)
    # The read is blocking, so run it in a worker thread.
    frame = await asyncio.to_thread(pl.read_parquet, parquet_path)
    return frame


# Embed every chunk; each request carries up to 16 chunks and all requests
# are issued concurrently by get_batch_embeddings_async.
embeddings = await get_batch_embeddings_async(
    chunks, model="text-embedding-3-small", batch_size=16
)

# Save embeddings to disk next to their source text (creates 'embeddings/' if needed)
output_file = Path("embeddings/embeddings.parquet")
await save_embeddings(embeddings=embeddings, texts=chunks, filepath=output_file)

# Load embeddings later: read the Parquet file back into a Polars DataFrame
df = await load_embeddings(output_file)