# PIPELINE v0.1

## Libraries

In [None]:
import csv
import torch
import os
from typing import List, Type
from tqdm import tqdm
import json

# Database Imports
from sqlmodel import SQLModel, Field, Session, create_engine, select
from sqlalchemy import Column, text
from pgvector.sqlalchemy import Vector

# ML Imports
from transformers import AutoTokenizer, AutoModel

  from .autonotebook import tqdm as notebook_tqdm


## Configuration

In [4]:
# Database Connection
# Overridable via the DATABASE_URL environment variable so the notebook can
# run against another user/host without editing credentials in the source.
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql+psycopg://bulacio@localhost:5432/vectordb"
)

# Dataset Path (relative paths resolve against the current working directory);
# overridable via the DATASET_PATH environment variable.
DATASET_PATH = os.environ.get(
    "DATASET_PATH", "../data_filtered/corpus_filtered.jsonl"
)

In [None]:
# Experiment A: sentence-transformers all-MiniLM-L6-v2
# VECTOR_DIMENSION sizes the pgvector column, so it must equal the length of
# the vectors this model produces.
VECTOR_DIMENSION = 384
CURRENT_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
CURRENT_TABLE_NAME = "bert_all_minilm_l6_v2"

In [6]:
# Experiment B (template): fill in the three values below and uncomment them to switch models
# CURRENT_MODEL_ID = 
# CURRENT_TABLE_NAME = 
# VECTOR_DIMENSION = 

## BERT Embedder

In [7]:
class BertEmbedder:
    """
    Encapsulates the Transformer model logic.

    Initialized with a specific Hugging Face model ID so experiments can swap
    models by changing one config value. A text is embedded by mean-pooling
    the model's last hidden state over real (non-padding) tokens and then
    L2-normalizing, so the dot product of two embeddings is their cosine
    similarity.
    """

    def __init__(self, model_id: str):
        print(f"--- Loading Model: {model_id} ---")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModel.from_pretrained(model_id).to(self.device)
        # Inference only: make eval mode (dropout disabled) explicit.
        self.model.eval()

    def _mean_pooling(self, model_output, attention_mask):
        """
        Collapse per-token embeddings into one vector per input sequence.

        Args:
            model_output: Model output exposing ``last_hidden_state`` of
                shape (batch, seq_len, hidden).
            attention_mask: (batch, seq_len) mask, 1 for real tokens and 0
                for padding.

        Returns:
            (batch, hidden) tensor: the average of the unmasked token vectors.
        """
        token_embeddings = model_output.last_hidden_state
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()

        sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
        # Clamp guards against division by zero for an all-padding row.
        sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)

        return sum_embeddings / sum_mask

    def generate_embeddings(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
        """
        Embed several texts, running the model on ``batch_size`` of them at a time.

        Padding within a batch does not change the result, because padded
        positions are masked out by ``_mean_pooling``.

        Args:
            texts: Texts to embed.
            batch_size: Maximum number of texts per forward pass (>= 1).

        Returns:
            One L2-normalized embedding (list of floats) per text, in input order.
            An empty input returns an empty list.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        embeddings: List[List[float]] = []
        for start in range(0, len(texts), batch_size):
            batch = list(texts[start:start + batch_size])

            # A. Tokenize (pad to the longest text in this batch)
            inputs = self.tokenizer(batch, padding=True, truncation=True, return_tensors='pt').to(self.device)

            # B. Inference
            with torch.no_grad():
                outputs = self.model(**inputs)

            # C. Pooling
            pooled = self._mean_pooling(outputs, inputs['attention_mask'])

            # D. Normalize each row to unit length
            normalized = torch.nn.functional.normalize(pooled, p=2, dim=1)

            # Move to CPU before converting to Python lists
            embeddings.extend(normalized.cpu().tolist())
        return embeddings

    def generate_embedding(self, text: str) -> List[float]:
        """Embed a single text; returns one L2-normalized vector as a list of floats."""
        return self.generate_embeddings([text])[0]

## Dynamic Database Table Creation

In [8]:
def create_table_class(table_name: str, dim: int) -> Type[SQLModel]:
    """
    Build and return a SQLModel table class mapped to ``table_name``.

    Each experiment keeps its vectors in its own table (e.g. 'bert_v1',
    'bert_v2'); generating the class at runtime avoids hand-writing one
    model class per table.

    Args:
        table_name: Name of the database table the class maps to.
        dim: Dimension of the pgvector ``embedding`` column.

    Returns:
        A SQLModel subclass declared with ``table=True`` and the columns
        id (primary key), title, text and embedding.
    """
    # Build the pgvector column first; the class body reads it from the
    # enclosing function scope, just like table_name.
    vector_column = Column(Vector(dim))

    class DynamicTable(SQLModel, table=True):
        __tablename__ = table_name
        # Re-running the cell redefines a class for the same table name;
        # extend the Table already registered in the metadata rather than
        # having SQLAlchemy reject the duplicate definition.
        __table_args__ = {'extend_existing': True}

        # The source document's '_id' is stored as the primary key.
        id: str = Field(primary_key=True)
        title: str
        text: str

        # Fixed-size vector column backed by pgvector.
        embedding: List[float] = Field(sa_column=vector_column)

    return DynamicTable

## Pipeline

In [9]:
def _ensure_schema(engine, table_class) -> None:
    """Enable the pgvector extension and create ``table_class``'s table if it is missing."""
    with engine.connect() as conn:
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
        conn.commit()
    # Restrict DDL to this experiment's table: SQLModel.metadata also holds any
    # tables registered by earlier create_table_class calls in this kernel.
    SQLModel.metadata.create_all(engine, tables=[table_class.__table__])


def _commit_batch(session, records) -> int:
    """Insert ``records`` in one transaction; return how many were committed (0 on a DB error)."""
    from sqlalchemy.exc import SQLAlchemyError

    if not records:
        return 0
    try:
        session.add_all(records)
        session.commit()
    except SQLAlchemyError as e:
        # Without a rollback the session stays in a failed transaction and
        # every later commit raises too.
        session.rollback()
        tqdm.write(f"Error committing batch of {len(records)} records: {e}")
        return 0
    return len(records)


def _embed_and_insert(session, table_class, embedder, seen_ids, batch_size):
    """Stream DATASET_PATH, embed docs whose id is not in ``seen_ids``, insert in batches; return (inserted, skipped, failed)."""
    inserted = skipped = failed = 0
    data_buffer = []

    def flush():
        # Commit the buffered records and update the counters.
        nonlocal inserted, failed
        committed = _commit_batch(session, data_buffer)
        inserted += committed
        failed += len(data_buffer) - committed
        data_buffer.clear()

    with open(DATASET_PATH, mode='r', encoding='utf-8') as f:
        # Wrap 'f' with tqdm to show progress (lines processed); line numbers
        # are 1-based for error messages.
        for line_no, line in enumerate(tqdm(f, desc="Embedding Docs"), start=1):
            if not line.strip():
                continue  # Skip empty lines

            doc_id = None  # reset each line so errors never report a stale id
            try:
                # 1. Parse JSON
                row = json.loads(line)

                # 2. Extract Data (keys: "_id", "title", "text")
                raw_id = row.get('_id')
                # Skip if ID is missing; compare explicitly so an id of 0 is kept
                if raw_id is None or raw_id == "":
                    skipped += 1
                    continue
                doc_id = str(raw_id)  # the primary-key column is declared as str
                # Already stored (earlier run) or repeated earlier in this file
                if doc_id in seen_ids:
                    skipped += 1
                    continue
                # `or ''` also covers keys that are present but JSON null
                title = row.get('title') or ''
                doc_text = row.get('text') or ''

                # 3. Generate Vector: combine title and text for richer context
                vector = embedder.generate_embedding(f"{title}: {doc_text}")
            except json.JSONDecodeError as e:
                tqdm.write(f"Skipping invalid JSON on line {line_no}: {e}")
                failed += 1
                continue
            except Exception as e:
                # Per-document boundary: log the failure and move on.
                tqdm.write(f"Error processing doc {doc_id or 'unknown'} (line {line_no}): {e}")
                failed += 1
                continue

            # 4. Create Record
            seen_ids.add(doc_id)
            data_buffer.append(
                table_class(id=doc_id, title=title, text=doc_text, embedding=vector)
            )

            # 5. Batch Commit
            if len(data_buffer) >= batch_size:
                flush()

    # 6. Commit remaining records after loop finishes
    if data_buffer:
        flush()
    return inserted, skipped, failed


def run_pipeline():
    """
    Embed every document in DATASET_PATH and store it in CURRENT_TABLE_NAME.

    Steps:
    1. Enable pgvector and create this experiment's table.
    2. Load the model.
    3. Stream the JSONL file line by line, embedding "title: text".
    4. Insert the records in batches.

    Documents whose '_id' is already in the table, or that repeat an earlier
    line, are skipped. Re-running the cell therefore resumes instead of
    failing on duplicate primary keys. A batch that fails to commit is rolled
    back and reported, and processing continues.
    """
    BATCH_SIZE = 100

    # A. Setup Database
    engine = create_engine(DATABASE_URL)
    try:
        # B. Define the Table Model based on configuration
        TableClass = create_table_class(CURRENT_TABLE_NAME, VECTOR_DIMENSION)
        _ensure_schema(engine, TableClass)

        # Check the input before paying for the (slow) model load
        if not os.path.exists(DATASET_PATH):
            print(f"Error: Dataset not found at {DATASET_PATH}")
            return

        # C. Initialize ML Model
        embedder = BertEmbedder(CURRENT_MODEL_ID)

        # D. Process JSONL and Insert
        print(f"--- Processing JSONL: {DATASET_PATH} ---")
        print(f"--- Target Table: {CURRENT_TABLE_NAME} ---")

        with Session(engine) as session:
            seen_ids = set(session.exec(select(TableClass.id)).all())
            if seen_ids:
                print(f"--- {len(seen_ids)} docs already stored; they will be skipped ---")
            inserted, skipped, failed = _embed_and_insert(
                session, TableClass, embedder, seen_ids, BATCH_SIZE
            )
    finally:
        # Release pooled connections; each notebook run creates a new engine.
        engine.dispose()

    print(f"\n--- Pipeline Finished: {inserted} inserted, {skipped} skipped, {failed} failed ---")

In [None]:
# Run the full process.
# The guard is true inside a Jupyter kernel (where __name__ is "__main__").
# It keeps the pipeline from running if this code is imported as a module,
# e.g. after exporting the notebook to a .py file.
if __name__ == "__main__":
    run_pipeline()

--- Loading Model: sentence-transformers/all-MiniLM-L6-v2 ---
Using device: cpu
--- Processing JSONL: ../data_filtered/corpus_filtered.jsonl ---
--- Target Table: embeddings_minilm ---


Embedding Docs: 379it [00:14, 27.05it/s]