In [None]:
# Cell 1
# Setup
# Install the packages listed in requirements.txt into the kernel's environment.
# The extra index URL adds the PyTorch cu128 wheel index as an additional
# package source alongside the default index.
%pip install -r "requirements.txt" --extra-index-url https://download.pytorch.org/whl/cu128


In [None]:
# Cell 2
import pandas as pd
from src.analyzer import analyze_profiles, load_config
from src.evaluate import evaluate_all_models
from dotenv import load_dotenv
from datetime import datetime
import shutil as shutil
import os as os

# Pull variables from a .env file (if one exists) into the process environment
load_dotenv()

# Read the run configuration from config.yaml
config = load_config('config.yaml')

# Input and output locations all live under the config's 'paths' section
INPUT_PATH, RESULTS_OUTPUT_PATH, METRICS_OUTPUT_PATH = (
    config['paths'][path_key]
    for path_key in ('input_data', 'results_output', 'metrics_output')
)

# A model entry is enabled unless it carries a falsy 'enabled' value
enabled_models = list(
    filter(lambda spec: spec.get('enabled', True), config['models'])
)
print(f"Enabled models: {len(enabled_models)}")
for spec in enabled_models:
    print(f"  - {spec['name']}: {spec['model_id']}")

# Show the arguments built for each enabled model's pipeline
from src.analyzer import build_pipeline_args, debug_print_pipeline_args

# Access token read from the environment; None when HF_TOKEN is unset
hf_token = os.environ.get("HF_TOKEN")
# Only report whether a token exists, never its value
print("HF token present?", bool(hf_token))

# NOTE(review): hf_token is passed into the args that are printed below;
# confirm debug_print_pipeline_args does not echo the raw token.
for model_spec in enabled_models:
    args = build_pipeline_args(model_spec, hf_token)
    debug_print_pipeline_args(args)
    
# Read the input CSV with Spark (first row is the header, column types are
# inferred) and convert the result straight to a pandas DataFrame
df = spark.read.csv(
    INPUT_PATH,
    header=True,
    inferSchema=True,
).toPandas()

# Quick sanity check on what was loaded
print(f"Loaded {len(df)} profiles")
print(f"Columns: {list(df.columns)}")

# Analysis options; each falls back to a default when absent from the config
analysis_options = {
    'input_col': config.get('input_column', 'about_me'),
    'batch_size': config.get('batch_size', 10),
    'max_new_tokens': config.get('max_new_tokens', 2000),
}
results = analyze_profiles(df, config, **analysis_options)

# Compare every enabled model's output against the human-label column.
# 'human_label_column' has no fallback: a missing key raises KeyError.
model_names = [spec['name'] for spec in enabled_models]
comparison = evaluate_all_models(
    results, model_names, true_col=config['human_label_column']
)

# Save metrics and results
def _strip_csv_suffix(path):
    """Return `path` without a trailing '.csv'.

    Only the final extension is removed; a '.csv' elsewhere in the path
    (for example in a directory name) is left intact.
    """
    suffix = '.csv'
    return path[:-len(suffix)] if path.endswith(suffix) else path


def _write_csv_dir(spark_df, output_dir):
    """Write `spark_df` under `output_dir` as CSV with a header row.

    The frame is coalesced to one partition before writing, and any existing
    output at `output_dir` is overwritten.
    """
    (
        spark_df.coalesce(1)
        .write.mode('overwrite')
        .option('header', 'true')
        .csv(output_dir)
    )


# One timestamp shared by both outputs so they can be matched up later
timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S')

# Convert pandas to Spark DataFrames and save.
# The configured paths are templates with a {timestamp} placeholder; the
# '.csv' extension is dropped because the path is handed to Spark's CSV writer
# as an output location rather than a single file name.
metrics_filename = _strip_csv_suffix(METRICS_OUTPUT_PATH.format(timestamp=timestamp))
spark_comparison = spark.createDataFrame(comparison)
_write_csv_dir(spark_comparison, metrics_filename)
print(f"Model metrics saved to: {metrics_filename}")

# Convert array columns to strings for Spark CSV compatibility
results_for_spark = results.copy()
for col in results_for_spark.columns:
    if col.endswith(('_tags', '_improvement_points')):
        results_for_spark[col] = results_for_spark[col].astype(str)

results_filename = _strip_csv_suffix(RESULTS_OUTPUT_PATH.format(timestamp=timestamp))
spark_results = spark.createDataFrame(results_for_spark)
_write_csv_dir(spark_results, results_filename)
print(f"Results saved to {results_filename}")
display(comparison)

In [None]:
# Cell 2 - version that works.

import os
from dotenv import find_dotenv, load_dotenv
import torch

# Hub flags (optional, matches your working snippet): hf_transfer off, Xet off
# NOTE(review): huggingface_hub appears to read these when it is first
# imported; if an earlier cell already imported it, confirm they take effect.
os.environ.update({
    "HF_HUB_ENABLE_HF_TRANSFER": "0",
    "HF_HUB_DISABLE_XET": "1",
})

# Locate the .env file, searching from the current working directory
dotenv_path = find_dotenv(usecwd=True)
print("Using .env at:", dotenv_path or "(not found)")
# override=True lets values from .env replace variables that are already set
load_dotenv(dotenv_path, override=True)

# Helpers from the project's analyzer module
from src.analyzer import build_pipeline_args, debug_print_pipeline_args, load_model_pipeline

# Minimal single-model configuration (same as your working snippet)
model_config = dict(
    name="Llama3-8b",
    model_id="meta-llama/Meta-Llama-3-8B-Instruct",
    is_instruct=True,
    device_map="auto",
    torch_dtype="auto",  # or "bfloat16"/"float16"/"float32"
)

# Access token from the environment; None when HF_TOKEN is unset
hf_token = os.environ.get("HF_TOKEN")

# Build and print the pipeline arguments for this model
args = build_pipeline_args(model_config, hf_token)
debug_print_pipeline_args(args)

# Load the pipeline. It is built from model_config and hf_token directly,
# not from `args` above.
pipe = load_model_pipeline(model_config, hf_token)
print("Pipeline created:", type(pipe).__name__, "| Model:", pipe.model.__class__.__name__)

# Tiny chat-style generation to confirm the pipeline works end to end
messages = [dict(role="user", content="Say hello in one short sentence.")]
out = pipe(messages, temperature=0.7, max_new_tokens=16)
print("OK ->", out[0]["generated_text"])

In [None]:
# Run this once - copies the tiktoken encoding files from your workspace to
# persistent DBFS
dbfs_target = "dbfs:/FileStore/harmony_encodings"
dbutils.fs.mkdirs(dbfs_target)

# Adjust the workspace path to where you uploaded the files
workspace_path = "/Workspace/Repos/vthedataeng@gmail.com/wfa_profile_analyzer"

# Copy each encoding file, keeping its file name at the destination
for encoding_file in ("o200k_base.tiktoken", "cl100k_base.tiktoken"):
    dbutils.fs.cp(
        f"file:{workspace_path}/{encoding_file}",
        f"{dbfs_target}/{encoding_file}",
    )

# Verify
display(dbutils.fs.ls(dbfs_target))