In [26]:


# Cell 1: Imports & Config
import os
import pickle
import time
from typing import Any, Callable, Dict, List, Optional

import faiss
import google.generativeai as genai
import numpy as np
import pandas as pd
from google.generativeai import embed_content





In [34]:
# --- Configuration ---
# The API key is read from the GOOGLE_API_KEY environment variable so that the
# secret never has to be typed into (or committed with) the notebook.
# The placeholder is only a fallback: export GOOGLE_API_KEY before starting
# Jupyter, otherwise the embedding calls will be made with an invalid key.
API_KEY = os.environ.get("GOOGLE_API_KEY", 'your-api-key-here')
MODEL_NAME = "models/text-embedding-004"

# File Paths (relative to the notebook's working directory)
PATH_TRADES = "../data/raw/trades.csv"
PATH_HOLDINGS = "../data/raw/holdings.csv"
DIR_PROCESSED = "../data/processed"
DIR_VECTORS = "../vectors"

In [28]:
# Setup Environment
# Register the API key with the SDK, then make sure every output folder the
# later cells write into already exists.
genai.configure(api_key=API_KEY)

os.makedirs(DIR_PROCESSED, exist_ok=True)
for _vector_subdir in ("float", "trades_index", "holdings_index"):
    os.makedirs(f"{DIR_VECTORS}/{_vector_subdir}", exist_ok=True)

In [29]:
# Schema Definitions
# Each schema maps a required column name to a rules dict whose "type" entry
# names the kind of data the column holds. Column order is preserved.

SCHEMA_TRADES: Dict[str, Dict[str, Any]] = {
    column: {"type": kind}
    for column, kind in (
        ("TradeDate", "date"),
        ("PortfolioName", "categorical"),
        ("TradeTypeName", "categorical"),
        ("SecurityId", "identifier"),
        ("Quantity", "numeric"),
        ("Price", "numeric"),
        ("StrategyName", "categorical"),
    )
}

SCHEMA_HOLDINGS: Dict[str, Dict[str, Any]] = {
    column: {"type": kind}
    for column, kind in (
        ("AsOfDate", "date"),
        ("PortfolioName", "categorical"),
        ("SecurityId", "identifier"),
        ("Qty", "numeric"),
        ("MV_Base", "numeric"),
        ("PL_YTD", "numeric"),
    )
}

In [30]:
# Validation Logic

def validate_and_clean(df: pd.DataFrame, schema: Dict[str, Any], date_col: str) -> pd.DataFrame:
    """Check ``df`` against ``schema`` and parse its date column.

    The input frame is never modified: validation and date parsing happen on
    a copy, which is what gets returned. This keeps the cell that calls this
    function safe to re-run.

    Args:
        df: Frame to validate.
        schema: Mapping of required column name -> rules dict. Only the rule
            ``{"type": "numeric"}`` is enforced here (the column must already
            have a numeric dtype); other types are checked for presence only.
        date_col: Name of the column to convert with ``pd.to_datetime``.

    Returns:
        A new DataFrame equal to ``df`` except that ``date_col`` holds
        datetimes.

    Raises:
        ValueError: If a schema column or ``date_col`` is absent from ``df``,
            or if ``date_col`` contains nulls after parsing. Unparseable
            dates also raise from ``pd.to_datetime(errors='raise')``.
        TypeError: If a column declared ``numeric`` has a non-numeric dtype.
    """
    # The date column is required even when the schema does not list it;
    # without this check a typo would surface as an opaque KeyError below.
    required = list(schema)
    if date_col not in schema:
        required.append(date_col)

    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required column: {', '.join(missing)}")

    # Work on a copy so the caller's frame keeps its original dtypes.
    cleaned = df.copy()
    cleaned[date_col] = pd.to_datetime(cleaned[date_col], errors='raise')

    for col, rules in schema.items():
        if rules['type'] == 'numeric' and not pd.api.types.is_numeric_dtype(cleaned[col]):
            raise TypeError(f"{col} must be numeric")

    # Missing values survive parsing as NaT, so they have to be caught explicitly.
    if cleaned[date_col].isna().any():
        raise ValueError(f"Nulls found in {date_col}")

    return cleaned

In [31]:
# Execution - Load & Validate
# Reads both raw CSVs, validates them against their schemas, moves trades
# with a non-positive price into a quarantine file, and writes the validated
# frames to the processed-data folder.

print("Loading & Validating")

# --- Trades ---
trades_df = validate_and_clean(pd.read_csv(PATH_TRADES), SCHEMA_TRADES, "TradeDate")

# Quarantine Invalid Prices: rows whose price is zero or negative
mask_invalid_price = trades_df["Price"].le(0)
if mask_invalid_price.any():
    print(f"Quarantining {mask_invalid_price.sum()} invalid trade rows.")
    quarantined_rows = trades_df.loc[mask_invalid_price]
    quarantined_rows.to_csv(f"{DIR_PROCESSED}/trades_quarantine.csv", index=False)
    trades_df = trades_df.loc[~mask_invalid_price].copy()

# --- Holdings ---
holdings_df = validate_and_clean(pd.read_csv(PATH_HOLDINGS), SCHEMA_HOLDINGS, "AsOfDate")

# --- Persist validated data ---
trades_df.to_csv(f"{DIR_PROCESSED}/trades_validated.csv", index=False)
holdings_df.to_csv(f"{DIR_PROCESSED}/holdings_validated.csv", index=False)

print("Validation Complete. Data saved.")

Loading & Validating
Quarantining 15 invalid trade rows.
Validation Complete. Data saved.


In [32]:
# Serialization (Semantic Core)
# Turns every row into one English sentence; these sentences are the texts
# that get embedded in the next cell.


def _trade_sentence(row) -> str:
    """Render one trades row as a sentence."""
    return (
        f"Trade event on {row['TradeDate']} for {row['PortfolioName']}: "
        f"{row['TradeTypeName']} {row['Quantity']} units of {row['SecurityId']} "
        f"at price {row['Price']} strategy {row['StrategyName']}"
    )


def _holding_sentence(row) -> str:
    """Render one holdings row as a sentence."""
    return (
        f"Holding position as of {row['AsOfDate']} for {row['PortfolioName']}: "
        f"Held {row['Qty']} units of {row['SecurityId']} "
        f"valued at {row['MV_Base']} with YTD P&L {row['PL_YTD']}"
    )


# Row-wise apply is kept so each value is formatted exactly as its scalar prints.
trades_df["semantic_core"] = trades_df.apply(_trade_sentence, axis=1)
holdings_df["semantic_core"] = holdings_df.apply(_holding_sentence, axis=1)

trades_texts: List[str] = trades_df["semantic_core"].tolist()
holdings_texts: List[str] = holdings_df["semantic_core"].tolist()

In [None]:
# Vector Generation (SUPER FAST BATCH VERSION)

def generate_embeddings(
    texts: List[str],
    batch_size: int = 20,
    max_retries: int = 3,
    pause_s: float = 1.0,
    retry_wait_s: float = 5.0,
    fallback_dim: int = 768,
    embed_fn: Optional[Callable[..., Any]] = None,
    model: Optional[str] = None,
) -> np.ndarray:
    """Embed ``texts`` in batches and return one float32 row per text.

    Each batch is sent to the embedding API as a list. A batch that still
    fails after ``max_retries`` attempts is NOT dropped: its rows are filled
    with zeros so that row ``i`` of the result always corresponds to
    ``texts[i]``. Zero rows carry no semantic information; the function
    prints a warning listing the affected batches so they can be re-run.

    Args:
        texts: Strings to embed.
        batch_size: Number of texts per API call (20 is the original
            conservative setting for the free tier).
        max_retries: Attempts per batch before it is zero-filled.
        pause_s: Seconds to sleep after every successful batch, to stay
            under rate limits.
        retry_wait_s: Seconds to sleep between failed attempts.
        fallback_dim: Row width used only when no batch succeeded at all
            (including empty input); otherwise the width is taken from the
            vectors the API actually returned.
        embed_fn: Callable invoked as ``embed_fn(model=..., content=batch)``
            returning a mapping with an ``'embedding'`` list. Defaults to the
            notebook's ``embed_content``.
        model: Model identifier; defaults to the notebook's ``MODEL_NAME``.

    Returns:
        ``float32`` array of shape ``(len(texts), dim)``.

    Raises:
        ValueError: If ``batch_size`` or ``max_retries`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    # Resolve the notebook-level defaults at call time, not definition time.
    embed = embed_content if embed_fn is None else embed_fn
    model_name = MODEL_NAME if model is None else model

    total = len(texts)
    print(f"Starting FAST embedding generation for {total} items..")

    # One entry per batch: (number of texts, vectors or None when the batch failed).
    batch_results = []
    failed_starts = []

    for start in range(0, total, batch_size):
        batch = texts[start : start + batch_size]
        batch_vectors = None

        for attempt in range(1, max_retries + 1):
            try:
                # API Call with LIST of strings (Batching)
                res = embed(model=model_name, content=batch)
                candidate = res['embedding']

                # A short or long response would shift every later row.
                if len(candidate) != len(batch):
                    raise ValueError(
                        f"expected {len(batch)} embeddings, got {len(candidate)}"
                    )

                batch_vectors = candidate
                break

            except Exception as e:  # deliberate best-effort: API errors vary
                print(f"Batch Error (Attempt {attempt}): {e}")
                # No point waiting once we have given up on this batch.
                if attempt < max_retries:
                    time.sleep(retry_wait_s)

        if batch_vectors is None:
            print(f"Skipping batch starting at index {start}")
            failed_starts.append(start)
        else:
            print(f"Processed {min(start + batch_size, total)}/{total}...")
            # Short pause to prevent rate limits
            time.sleep(pause_s)

        batch_results.append((len(batch), batch_vectors))

    # Zero rows must match the real embedding width, so take it from the
    # first successful batch and only fall back when nothing succeeded.
    dim = next(
        (len(vecs[0]) for _, vecs in batch_results if vecs is not None),
        fallback_dim,
    )

    rows = []
    for n_items, vecs in batch_results:
        if vecs is None:
            rows.extend([[0.0] * dim] * n_items)
        else:
            rows.extend(vecs)

    if failed_starts:
        print(
            f"WARNING: {len(failed_starts)} batch(es) failed and were zero-filled "
            f"(start indices: {failed_starts})"
        )

    if not rows:
        # Keep the result 2-D so callers can still read .shape[1].
        return np.empty((0, dim), dtype="float32")

    return np.asarray(rows, dtype="float32")

print("Generating Embeddings")

# Trades are embedded first, then holdings (each call prints its own progress).
trades_vectors = generate_embeddings(trades_texts)
holdings_vectors = generate_embeddings(holdings_texts)

# Pickle both raw float matrices into the "float" folder, creating it if needed.
save_dir = f"{DIR_VECTORS}/float"
os.makedirs(save_dir, exist_ok=True)

for out_path, matrix in (
    (f"{save_dir}/trades_embeddings.pkl", trades_vectors),
    (f"{save_dir}/holdings_embeddings.pkl", holdings_vectors),
):
    with open(out_path, "wb") as f:
        pickle.dump(matrix, f)

print("Embeddings Generated & Saved.")

Embeddings Generated & Saved.


In [None]:
# Indexing (PCA + PQ + FAISS)
# Reduces the embeddings with PCA, then builds one IVF+PQ index per dataset
# and writes the PCA transform and both indexes to DIR_VECTORS.

print("Building FAISS Indexes")

TARGET_DIM = 256        # dimensionality after PCA
N_LIST = 64             # number of IVF (coarse) clusters
M_SUBQUANTIZERS = 32    # PQ sub-vectors; TARGET_DIM must be divisible by this
N_BITS = 8              # bits per PQ code

# 1. Train PCA
# The transform is fitted on the trades vectors only and then reused for
# holdings, so a single pca_transform.faiss serves both indexes.

pca = faiss.PCAMatrix(trades_vectors.shape[1], TARGET_DIM)
pca.train(trades_vectors)
faiss.write_VectorTransform(pca, f"{DIR_VECTORS}/pca_transform.faiss")

# 2. Apply PCA

trades_pca = pca.apply_py(trades_vectors)
holdings_pca = pca.apply_py(holdings_vectors)

# 3. Train & Build PQ Index
# Each IVF index gets its OWN coarse quantizer. With a shared quantizer the
# first train() fills it with centroids learned from trades, and the second
# index then reuses those centroids instead of learning its own from holdings.

trades_quantizer = faiss.IndexFlatL2(TARGET_DIM)
holdings_quantizer = faiss.IndexFlatL2(TARGET_DIM)

trades_index = faiss.IndexIVFPQ(trades_quantizer, TARGET_DIM, N_LIST, M_SUBQUANTIZERS, N_BITS)
holdings_index = faiss.IndexIVFPQ(holdings_quantizer, TARGET_DIM, N_LIST, M_SUBQUANTIZERS, N_BITS)

trades_index.train(trades_pca)
holdings_index.train(holdings_pca)

trades_index.add(trades_pca)
holdings_index.add(holdings_pca)

# 4. Save Indexes

faiss.write_index(trades_index, f"{DIR_VECTORS}/trades_index/index_pq.faiss")
faiss.write_index(holdings_index, f"{DIR_VECTORS}/holdings_index/index_pq.faiss")

print("Pipeline Complete. Vectors Ready.")

Building FAISS Indexes
Pipeline Complete. Vectors Ready.
