In [None]:
# !pip install pandas
!pip install replicate

# STEP 1: Imports
import os
import pandas as pd
from tqdm import tqdm
import replicate
from getpass import getpass
from google.colab import drive
import re

# STEP 2: Set up Replicate
# Prompt for the token without echoing it and publish it through the
# REPLICATE_API_TOKEN environment variable. This overwrites any value that
# was already set. NOTE(review): presumably the replicate client reads the
# token from this variable — confirm against the client documentation.
os.environ["REPLICATE_API_TOKEN"] = getpass("🔐 Enter your Replicate API Token: ")

# STEP 3: Google Drive setup
# Mount Drive (forcing a remount), then define where exam CSVs are read from
# and where per-model result CSVs are written. The output folder is created
# if it does not exist yet; the input folder is expected to exist already.
drive.mount('/content/drive', force_remount=True)
input_folder = "/content/drive/MyDrive/research/practice_exams"
output_folder = "/content/drive/MyDrive/research/replicate_responses"
os.makedirs(output_folder, exist_ok=True)

# STEP 4: Models
# Maps the model identifier passed to replicate.run() to the short name that
# process_exam() embeds in the output file name. Uncomment an entry to include
# that model in the run; every enabled model is run against every enabled exam.

models = {
    # "openai/o4-mini": "o4-mini",
    # "anthropic/claude-3.5-haiku": "claude-3.5-haiku",
    # "openai/o1": "o1",
    # "anthropic/claude-3.7-sonnet": "claude-3.7-sonnet",
    # "meta/meta-llama-3-70b-instruct": "meta-llama-3-70b-instruct",
    "meta/meta-llama-3-8b-instruct": "meta-llama-3-8b-instruct",
    # "deepseek-ai/deepseek-r1": "deepseek-r1",
    # "google-deepmind/gemma-7b-it": "gemma-7b-it",
    # "google-deepmind/gemma-2b-it": "gemma-2b-it",
    # "lucataco/qwen1.5-72b": "qwen1.5-72b",
    # "lucataco/qwen2.5-omni-7b": "qwen2.5-omni-7b",


}

# STEP 5: Exams
# File names (relative to input_folder) of the exam CSVs to process.
# Uncomment an entry to include that exam in the run.
exam_files = [
    # "CIPPUS Practice Exam.csv"

    #  "AIGP Practice Exam.csv",
    # "CIPM Practice Exam.csv",
    "CIPT Practice Exam.csv"
]


# STEP 6: Helpers
def build_prompt(row, answer_columns):
    """Assemble the multiple-choice prompt sent to the model for one exam row.

    The prompt is made of, in order: an optional "Context" section (only when
    ``row`` has a non-null ``scenario``), the question, one lettered line per
    entry of ``answer_columns`` (A, B, C, ... in the order given), and a fixed
    instruction block asking for a "Final Answer" / "Explanation" reply.

    Args:
        row: Mapping-like exam record (e.g. a pandas Series) with a
            ``question`` entry, the answer columns, and optionally
            ``scenario``.
        answer_columns: Names of the entries in ``row`` holding the choices.

    Returns:
        The complete prompt as a single newline-joined string.
    """
    instructions = (
        "\nYou are a certified U.S. privacy professional taking a high-stakes multiple-choice exam (such as the AIGP, CIPP/US, or CIPT).\n"
        "Read the question and choices carefully. Use your knowledge of U.S. privacy laws (e.g., GDPR, CCPA, HIPAA, etc.), data governance best practices, and legal reasoning.\n\n"
        "Eliminate clearly incorrect choices if possible. Choose the BEST answer, even if more than one seems partially correct.\n\n"
        "Respond only in the following exact format:\n"
        "Final Answer: <A/B/C/D>\n"
        "Explanation: <A concise justification, under 150 words. Reference relevant laws or best practices.>\n\n"
        "Do not explain all four choices—just support your final choice."
    )

    # The scenario is optional: skip it when the key is absent or the cell is empty.
    has_scenario = 'scenario' in row and pd.notna(row['scenario'])
    parts = [f"Context:\n{row['scenario']}\n"] if has_scenario else []

    parts += [f"Question:\n{row['question']}\n", "Choices:"]
    parts.extend(
        f"{chr(ord('A') + position)}. {row[column]}"
        for position, column in enumerate(answer_columns)
    )
    parts.append(instructions)
    return "\n".join(parts)

def query_model(model_id, prompt):
    """Send ``prompt`` to a Replicate-hosted model and return its reply as text.

    The result of ``replicate.run`` is normalised so that callers always get a
    ``str``: a string is returned as-is, ``None`` becomes "", any other
    iterable (list, generator, iterator of chunks) is concatenated, and
    anything else is converted with ``str()``.
    NOTE(review): which of these shapes ``replicate.run`` actually produces
    depends on the client version and model — confirm against the client docs.

    Args:
        model_id: Model identifier passed to ``replicate.run``.
        prompt: Prompt text sent as the ``prompt`` input.

    Returns:
        The model's reply as a string, or "" when the query fails
        (the failure is printed, not raised, so a run can continue).
    """
    try:
        output = replicate.run(model_id, input={"prompt": prompt})
        if output is None:
            return ""
        if isinstance(output, str):
            return output
        try:
            chunks = iter(output)
        except TypeError:
            # Not iterable: fall back to its string form.
            return str(output)
        # Consumed inside the outer try because a lazy iterator may still
        # raise (e.g. network errors) while being drained.
        return "".join(str(chunk) for chunk in chunks)
    except Exception as e:
        # Deliberate best-effort: one failed question must not abort the exam.
        print(f"⚠️ Model query failed: {e}")
        return ""

def extract_by_content_match(response, answer_choices):
    """Guess the chosen answer by word overlap between response and choices.

    Both sides are lower-cased and split on whitespace; the choice sharing the
    most distinct words with ``response`` wins. Ties go to the choice that
    appears first in ``answer_choices``. Choices whose text is not a string
    are ignored.

    Args:
        response: The model's reply text.
        answer_choices: Mapping of answer letter to choice text.

    Returns:
        The upper-cased letter of the best-matching choice, or "" when no
        choice shares any word with the response.
    """
    response_words = set(response.lower().split())

    scored = [
        (len(response_words & set(text.lower().split())), letter)
        for letter, text in answer_choices.items()
        if isinstance(text, str)
    ]
    if not scored:
        return ""

    # max() returns the first maximal element, which keeps the
    # "earliest choice wins a tie" behaviour.
    top_score, top_letter = max(scored, key=lambda pair: pair[0])
    return top_letter.upper() if top_score > 0 else ""

def extract_answer_letter(response, answer_choices=None):
    """Extract the answer letter (A-D) a model chose from its free-text reply.

    Heuristics are tried in order and the first hit wins:

    1. An explicit "Final Answer: X" (case-insensitive, ":" or "-" optional).
    2. Any line consisting solely of one letter A-D (either case).
    3. "correct" / "best" / "answer", optionally followed by "is" or ":",
       then an UPPERCASE letter. The keywords are matched case-insensitively
       but the letter is not, so the article "a" in prose such as
       "the answer is a privacy impact assessment" is not read as choice A.
    4. The first line starting with an uppercase letter followed by
       optional "." / ":" and whitespace (e.g. "C. Data minimization").
    5. If ``answer_choices`` is given, word-overlap matching via
       ``extract_by_content_match``.

    Args:
        response: The model's reply; non-string values yield "".
        answer_choices: Optional mapping of letter to choice text, used only
            by the last-resort content match.

    Returns:
        The upper-cased answer letter, or "" if none could be determined.
    """
    if not isinstance(response, str):
        return ""

    # 1. The format requested in the prompt: "Final Answer: A"
    match = re.search(r"final answer\s*[:\-]?\s*([A-D])\b", response, re.IGNORECASE)
    if match:
        return match.group(1).upper()

    lines = response.strip().splitlines()
    if not lines:
        return ""

    # 2. A line that is nothing but the letter.
    for line in lines:
        candidate = line.strip()
        if re.fullmatch(r"[A-Da-d]", candidate):
            return candidate.upper()

    # 3. Loose phrasing. Scoped (?i:...) keeps the keywords case-insensitive
    # while the captured letter must be uppercase.
    match = re.search(r"\b(?i:correct|best|answer)\s*(?i:is|:)?\s*([A-D])\b", response)
    if match:
        return match.group(1).upper()

    # 4. Reply that opens directly with the choice, e.g. "B. ..."
    match = re.match(r"^\s*([A-D])[\.:]?\s", lines[0])
    if match:
        return match.group(1).upper()

    # 5. Last resort: compare the wording against the choices.
    if answer_choices:
        return extract_by_content_match(response, answer_choices)

    return ""


def _clean_text(value):
    """Return ``value`` as stripped text; None/NaN (blank CSV cells) become ""."""
    if isinstance(value, str):
        return value.strip()
    if value is None or pd.isna(value):
        return ""
    return str(value).strip()


def evaluate_response(model_letter, correct_letter, choices, response):
    """Score one model answer: 1 if it counts as correct, otherwise 0.

    An answer counts as correct when either
    * the extracted letter equals the answer key (case-insensitive), or
    * the full text of the correct choice appears verbatim
      (case-insensitive) anywhere in the model's response.

    Missing or non-string inputs (None, NaN from blank CSV cells, numeric
    choice values) are tolerated instead of raising. A blank answer key never
    matches, so a question without a key is not scored as correct just
    because no letter could be extracted either.

    Args:
        model_letter: Letter extracted from the model's response.
        correct_letter: The answer key for the question.
        choices: Mapping of answer letter to choice text.
        response: The model's raw response text.

    Returns:
        1 for a correct answer, 0 otherwise.
    """
    model_letter = _clean_text(model_letter).upper()
    correct_letter = _clean_text(correct_letter).upper()

    if correct_letter and model_letter == correct_letter:
        return 1

    # Fallback: credit a response that quotes the correct choice verbatim.
    correct_text = _clean_text(choices.get(correct_letter, "")).lower()
    if correct_text and correct_text in _clean_text(response).lower():
        return 1

    return 0

# STEP 7: Main process
def process_exam(model_id, model_name, file_name):
    """Run one model over one exam CSV and save the graded results.

    Reads ``file_name`` from the module-level ``input_folder``, asks the model
    every question, extracts and scores its answer, and writes a CSV named
    ``<exam>-<model_name>-output.csv`` to the module-level ``output_folder``.
    The CSV holds one row per question followed by a "TOTAL SCORE" row.

    Answer columns are all columns whose name starts with "answer"
    (case-insensitive), taken in sorted order and lettered A, B, C, ...

    Args:
        model_id: Model identifier passed on to ``query_model``.
        model_name: Short model name used in the output file name and logs.
        file_name: Exam CSV file name inside ``input_folder``.

    Raises:
        ValueError: If the CSV lacks a ``question`` or ``correct answer``
            column.
    """
    print(f"📄 Processing {file_name} with {model_name}")

    file_path = os.path.join(input_folder, file_name)
    df = pd.read_csv(file_path)

    answer_cols = sorted(col for col in df.columns if col.lower().startswith("answer"))
    required = ['question', 'correct answer'] + answer_cols
    if not all(col in df.columns for col in required):
        raise ValueError(f"Missing required columns in {file_name}")

    optional_cols = ['scenario'] if 'scenario' in df.columns else []
    df = df[required + optional_cols]
    df = df.dropna(subset=['question'])

    results = []
    for _, row in tqdm(df.iterrows(), total=len(df), desc="💬 Asking model"):
        prompt = build_prompt(row, answer_cols)
        response = query_model(model_id, prompt)
        answer_choices = {chr(65 + i): row[col] for i, col in enumerate(answer_cols)}
        model_letter = extract_answer_letter(response, answer_choices)
        score = evaluate_response(model_letter, row['correct answer'], answer_choices, response)

        results.append({
            "question": row['question'],
            "scenario": row.get('scenario', ""),
            "correct answer": row['correct answer'],
            "model answer": model_letter,
            "score": score,
            "response": response,
            **{col: row[col] for col in answer_cols},
        })

    # Fix the column order up front; passing it to the constructor also keeps
    # the frame well-formed (all columns present) when there are no rows.
    columns_order = (
        ['question', 'scenario']
        + answer_cols
        + ['correct answer', 'model answer', 'score', 'response']
    )
    df_out = pd.DataFrame(results, columns=columns_order)

    # Totals are computed BEFORE the summary row is appended, so the row
    # count here is exactly the number of questions (no "- 1" adjustment).
    total_questions = len(df_out)
    total_score = int(df_out['score'].sum())
    percentage = (total_score / total_questions) * 100 if total_questions else 0.0

    score_row = {col: "" for col in df_out.columns}
    score_row['question'] = "TOTAL SCORE"
    score_row['score'] = f"{total_score} / {total_questions} ({percentage:.1f}%)"

    df_out = pd.concat([df_out, pd.DataFrame([score_row])], ignore_index=True)

    # Save. splitext drops only the final extension, so exam names that
    # themselves contain dots are not truncated.
    output_file = f"{os.path.splitext(file_name)[0]}-{model_name}-output.csv"
    df_out.to_csv(os.path.join(output_folder, output_file), index=False)

    print(f"✅ Saved to: {output_file}")
    print(f"🎯 Score: {total_score} / {total_questions} ({percentage:.2f}%)")

# STEP 8: Run
# Evaluate every enabled model against every enabled exam file; each
# (model, exam) pair is handled by one process_exam() call, which writes its
# own output CSV. Exceptions are not caught here, so a failure in one run
# stops the remaining runs.
for model_id, model_name in models.items():
    for exam_file in exam_files:
        process_exam(model_id, model_name, exam_file)

🔐 Enter your Replicate API Token: ··········
Mounted at /content/drive
📄 Processing CIPT Practice Exam.csv with meta-llama-3-8b-instruct


💬 Asking model: 100%|██████████| 90/90 [02:34<00:00,  1.71s/it]


✅ Saved to: CIPT Practice Exam-meta-llama-3-8b-instruct-output.csv
🎯 Score: 68 / 90 (75.56%)
