In [None]:
# Install the correct FAISS library for GPU or CPU
# The first command attempts to install the GPU version, which is preferred for speed
# If that fails, it will fall back to the CPU version.
!pip install -q faiss-gpu || pip install -q faiss-cpu

# Install other required libraries
!pip install -q transformers sentence-transformers torchmetrics pytorch-lightning

# Fix for the 'AdamW' import error
# In newer versions of the Hugging Face library, AdamW is part of PyTorch
# and should be imported from there directly.
from transformers import (
    AutoModel,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
)
from torch.optim import AdamW

# Log in to Weights & Biases for experiment tracking
!pip install -q wandb
import wandb
wandb.login()

# Import other necessary libraries
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sentence_transformers import SentenceTransformer
import numpy as np
import os
import faiss
import random
import re
import pytorch_lightning as pl
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.metrics import f1_score, precision_recall_curve, auc

# Prefer the GPU when CUDA is usable; otherwise run everything on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

[31mERROR: Could not find a version that satisfies the requirement faiss-gpu (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for faiss-gpu[0m[31m
[0m



Using device: cuda


In [None]:


# Set the path to your data files in Google Drive
data_path = "/content/" # CHANGE THIS TO YOUR FOLDER PATH

# Load the datasets
# NOTE(review): `val_df` is read from test.csv, so despite its name it holds the
# test split -- confirm it carries labels before using it for validation.
train_df = pd.read_csv(os.path.join(data_path, 'train.csv'))
val_df = pd.read_csv(os.path.join(data_path, 'test.csv'))

# Display the first few rows to confirm it's loaded correctly
print("Training Data:")
print(train_df.head())
print("\nValidation Data:")
print(val_df.head())

# Combine text fields for easier processing later.
# Missing values are filled on BOTH sides: in pandas, str + NaN is NaN, so a
# missing QuestionText would otherwise turn the whole `text` cell into NaN and
# break the string processing that follows.
train_df['text'] = train_df['QuestionText'].fillna('') + " " + train_df['StudentExplanation'].fillna('')
val_df['text'] = val_df['QuestionText'].fillna('') + " " + val_df['StudentExplanation'].fillna('')

# Handle the 'Category' column which contains our labels
# We need to create a canonical knowledge store of misconceptions
# We'll treat 'Category' and 'Misconception' as the ground truth labels
# For now, let's create a list of all unique misconception labels
# (NaN is dropped from both columns so it can never become a catalogue key).
all_labels = pd.concat([train_df['Category'].dropna(), train_df['Misconception'].dropna()]).unique()
misconception_catalog = {label: {"id": i, "description": "", "examples": ""} for i, label in enumerate(all_labels)}

# Example of a simple text cleaning function
def clean_text(text):
    """Apply light-touch normalisation to a piece of free text.

    The text is lower-cased, every run of whitespace is collapsed to a single
    space, and leading/trailing whitespace is removed. Math tokens and
    numerals are deliberately left untouched.

    Args:
        text: The raw text. ``None`` and float NaN (pandas' missing-value
            marker) are treated as empty text; any other non-string value is
            converted with ``str()`` first.

    Returns:
        str: The normalised text ("" for missing input).
    """
    # NaN is the only float that is not equal to itself, so `text != text`
    # detects it without needing pandas/numpy here.
    if text is None or (isinstance(text, float) and text != text):
        return ""
    text = str(text).lower() # Lowercasing
    return re.sub(r'\s+', ' ', text).strip() # Normalize whitespace

# Add the normalised text column to both splits.
for split_frame in (train_df, val_df):
    split_frame['cleaned_text'] = split_frame['text'].apply(clean_text)

print("\nSample of cleaned text:")
print(train_df['cleaned_text'].head())

Training Data:
   row_id  QuestionId                                       QuestionText  \
0       0       31772  What fraction of the shape is not shaded? Give...   
1       1       31772  What fraction of the shape is not shaded? Give...   
2       2       31772  What fraction of the shape is not shaded? Give...   
3       3       31772  What fraction of the shape is not shaded? Give...   
4       4       31772  What fraction of the shape is not shaded? Give...   

           MC_Answer                                 StudentExplanation  \
0  \( \frac{1}{3} \)                  0ne third is equal to tree nineth   
1  \( \frac{1}{3} \)  1 / 3 because 6 over 9 is 2 thirds and 1 third...   
2  \( \frac{1}{3} \)  1 3rd is half of 3 6th, so it is simplee to un...   
3  \( \frac{1}{3} \)        1 goes into everything and 3 goes into nine   
4  \( \frac{1}{3} \)                    1 out of every 3 isn't coloured   

       Category Misconception  
0  True_Correct           NaN  
1  True_Corre

In [None]:
# Every Category / Misconception value that occurs in the training data.
training_labels = set(train_df['Category'].dropna()) | set(train_df['Misconception'].dropna())

# A manually curated, richer set of descriptions for a few key misconceptions
rich_misconception_descriptions = {
    "Decimal_Comparison_Whole_Numbers": "The student is incorrectly comparing decimals by ignoring the decimal point and treating the numbers as if they were whole numbers. They may also think that a longer decimal is always a larger number.",
    "Fraction_Visual_Incorrect": "The student has a misconception based on a visual representation of a fraction, perhaps by miscounting the total number of parts or shaded parts from an image.",
    "Incorrect_Denominator": "The student uses the wrong denominator in a fraction, perhaps by not simplifying correctly or miscounting the total number of parts.",
    "Incorrect_Numerator": "The student is using the wrong numerator in a fraction, perhaps by miscounting the number of parts being described.",
    "Additive": "The student is using an additive strategy where a multiplicative one is required. For example, they might add or subtract numbers when they should be multiplying or dividing.",
    "Procedural": "The student follows a faulty procedure or sequence of steps to solve the problem, rather than a correct conceptual understanding.",
    "Lexical": "This is a misconception related to the meaning of a word, not a mathematical concept itself. The student misunderstands the problem's vocabulary.",
    "True_Correct": "The student's response is correct and does not contain a misconception.",
    "True_Neither": "The student's response is neither correct nor does it contain a clear, identifiable misconception.",
    "True_Partial": "The student's response is partially correct, but contains errors or omissions that prevent it from being a fully correct answer.",
}

# The canonical, sorted label list: training labels plus every curated label,
# so curated entries are present even when absent from the training data.
all_unique_labels = sorted(training_labels.union(rich_misconception_descriptions))

# Knowledge store keyed by label. Ids follow the sorted label order, and each
# entry gets the curated description when one exists, else a generic fallback.
# The embedding slot stays None until the embeddings are computed.
misconception_knowledge_store = {
    label: {
        "id": position,
        "label": label,
        "description": rich_misconception_descriptions.get(
            label,
            f"This response demonstrates a mathematical issue related to the concept: {label}.",
        ),
        "embedding": None
    }
    for position, label in enumerate(all_unique_labels)
}

# Print a sample to verify
if "Decimal_Comparison_Whole_Numbers" not in misconception_knowledge_store:
    print("The 'Decimal_Comparison_Whole_Numbers' key does not exist in the training data and was not added.")
else:
    print("--- Updated Misconception Knowledge Store (Sample) ---")
    print(f"Label: Decimal_Comparison_Whole_Numbers")
    print(f"Description: {misconception_knowledge_store['Decimal_Comparison_Whole_Numbers']['description']}\n")

--- Updated Misconception Knowledge Store (Sample) ---
Label: Decimal_Comparison_Whole_Numbers
Description: The student is incorrectly comparing decimals by ignoring the decimal point and treating the numbers as if they were whole numbers. They may also think that a longer decimal is always a larger number.



In [None]:
# Bi-encoder used for dense retrieval ('all-MiniLM-L6-v2'), moved to the active device.
print("Loading bi-encoder model...")
bi_encoder = SentenceTransformer('all-MiniLM-L6-v2').to(device)
print("Bi-encoder loaded.")

# Embed every description, in knowledge-store order.
print("Generating embeddings for misconceptions...")
misconception_texts = [entry['description'] for entry in misconception_knowledge_store.values()]
misconception_embeddings = bi_encoder.encode(
    misconception_texts,
    convert_to_tensor=True,
    show_progress_bar=True,
)

# Attach each embedding to its knowledge-store entry (same iteration order as above).
for store_entry, vector in zip(misconception_knowledge_store.values(), misconception_embeddings):
    store_entry['embedding'] = vector

# Sanity check: verify the shape of the embeddings
print(f"Shape of misconception embeddings: {misconception_embeddings.shape}")

# Flat L2 FAISS index over the description embeddings; row i of the index
# corresponds to the i-th knowledge-store entry.
dimension = misconception_embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(misconception_embeddings.cpu().numpy())

print(f"FAISS index created with {index.ntotal} embeddings.")

# Smoke test: retrieve the nearest descriptions for the first training response.
K = 5
sample_response = train_df['cleaned_text'].iloc[0]

print(f"\n--- Testing Retrieval for a Sample Response ---")
print(f"Student Response: {sample_response}")

query_embedding = bi_encoder.encode(sample_response, convert_to_tensor=True).reshape(1, -1)
distances, indices = index.search(query_embedding.cpu().numpy(), K)

print(f"\nTop {K} retrieved misconceptions:")
retrieved_misconceptions = [all_unique_labels[hit] for hit in indices[0]]
for rank, (label, distance) in enumerate(zip(retrieved_misconceptions, distances[0]), start=1):
    print(f"{rank}. Label: {label}, Distance: {distance:.4f}")

Loading bi-encoder model...
Bi-encoder loaded.
Generating embeddings for misconceptions...


Batches:   0%|          | 0/2 [00:00<?, ?it/s]

Shape of misconception embeddings: torch.Size([48, 384])
FAISS index created with 48 embeddings.

--- Testing Retrieval for a Sample Response ---
Student Response: what fraction of the shape is not shaded? give your answer in its simplest form. [image: a triangle split into 9 equal smaller triangles. 6 of them are shaded.] 0ne third is equal to tree nineth

Top 5 retrieved misconceptions:
1. Label: Fraction_Visual_Incorrect, Distance: 1.1473
2. Label: Wrong_Fraction, Distance: 1.4066
3. Label: Wrong_fraction, Distance: 1.4066
4. Label: Division, Distance: 1.4131
5. Label: Incorrect_Numerator, Distance: 1.4326


In [None]:
!pip install rank-bm25 --quiet
from rank_bm25 import BM25Okapi

# Prepare corpus
def _bm25_tokenize(text):
    """Split `text` into lower-cased word and number tokens for BM25.

    Corpus documents and queries must be tokenised the same way. A plain
    ``split(" ")`` keeps capitalisation and trailing punctuation ("The",
    "numbers."), so lower-cased queries would never match those tokens.
    Decimals such as "0.75" stay a single token, and underscores separate
    words ("Shorter_is_bigger" -> shorter, is, bigger).
    """
    return re.findall(r"\d+(?:\.\d+)?|[^\W\d_]+", text.lower())


corpus = [data['description'] for data in misconception_knowledge_store.values()]
tokenized_corpus = [_bm25_tokenize(doc) for doc in corpus]

# Initialize BM25
bm25 = BM25Okapi(tokenized_corpus)


def hybrid_search(query_text, k=10, alpha=0.7):
    """Retrieve candidate misconceptions by blending dense and lexical scores.

    Args:
        query_text (str): The (cleaned) student response.
        k (int): Number of candidates taken from each retriever and the
            maximum number of results returned. Clamped to the index size.
        alpha (float): Weight of the dense score; BM25 gets ``1 - alpha``.

    Returns:
        list[dict]: Up to `k` dicts with keys 'label', 'score' and
        'description', sorted by descending combined score. Empty when the
        index holds no vectors or `k` is not positive.
    """
    # FAISS pads the result with id -1 (and a huge sentinel distance) when
    # asked for more neighbours than it stores. -1 would silently index the
    # LAST label and the sentinel would wreck the min-max scaling below.
    k = min(k, index.ntotal)
    if k <= 0:
        return []

    # Dense (FAISS)
    query_embedding = bi_encoder.encode(query_text, convert_to_tensor=True).reshape(1, -1)
    faiss_distances, faiss_indices = index.search(query_embedding.cpu().numpy(), k)
    dense_hits = [
        (int(doc_id), float(dist))
        for doc_id, dist in zip(faiss_indices[0], faiss_distances[0])
        if doc_id >= 0
    ]

    dense_scores_dict = {}
    if dense_hits:
        hit_distances = [dist for _, dist in dense_hits]
        min_dist = min(hit_distances)
        # The epsilon keeps the division defined when all distances are equal.
        dist_span = max(hit_distances) - min_dist + 1e-6
        # Min-max scale so the closest hit scores 1 and the farthest ~0.
        dense_scores_dict = {
            doc_id: 1 - (dist - min_dist) / dist_span for doc_id, dist in dense_hits
        }

    # BM25 (lexical)
    bm25_scores = bm25.get_scores(_bm25_tokenize(query_text))
    top_bm25_indices = [int(idx) for idx in np.argsort(bm25_scores)[::-1][:k]]

    max_bm25_score = float(np.max(bm25_scores)) if len(bm25_scores) else 0.0
    if max_bm25_score <= 0:
        # No lexical overlap at all: BM25 contributes nothing.
        bm25_scores_dict = {idx: 0.0 for idx in top_bm25_indices}
    else:
        bm25_scores_dict = {
            idx: float(bm25_scores[idx]) / max_bm25_score for idx in top_bm25_indices
        }

    # Combine: a document missing from one retriever's top-k scores 0 there.
    final_scores = {
        doc_id: alpha * dense_scores_dict.get(doc_id, 0.0)
        + (1 - alpha) * bm25_scores_dict.get(doc_id, 0.0)
        for doc_id in set(dense_scores_dict) | set(bm25_scores_dict)
    }
    sorted_scores = sorted(final_scores.items(), key=lambda x: x[1], reverse=True)

    # Document ids are positions in `all_unique_labels`.
    retrieved_results = []
    for doc_id, score in sorted_scores[:k]:
        label = all_unique_labels[doc_id]
        description = misconception_knowledge_store[label]['description']
        retrieved_results.append({'label': label, 'score': score, 'description': description})

    return retrieved_results

# Smoke-test the hybrid retriever on a hand-written decimal-comparison response.
# Make sure to run the notebook from the very top before executing this.
test_response = "I think 0.5 is bigger than 0.75 because 5 is bigger than 75."
cleaned_test_response = "i think 0.5 is bigger than 0.75 because 5 is bigger than 75."

print(f"--- Debugging Hybrid Retrieval for Sample Response ---")
print(f"Student Response: {test_response}")
retrieved_candidates = hybrid_search(cleaned_test_response, k=10, alpha=0.7)

print("\nTop 10 retrieved candidates from Stage A:")
for rank, candidate in enumerate(retrieved_candidates, start=1):
    print(f"  {rank}. Label: {candidate['label']} (Score: {candidate['score']:.4f})")
    print(f"     Description: {candidate['description']}\n")


--- Debugging Hybrid Retrieval for Sample Response ---
Student Response: I think 0.5 is bigger than 0.75 because 5 is bigger than 75.

Top 10 retrieved candidates from Stage A:
  1. Label: Decimal_Comparison_Whole_Numbers (Score: 0.9816)
     Description: The student is incorrectly comparing decimals by ignoring the decimal point and treating the numbers as if they were whole numbers. They may also think that a longer decimal is always a larger number.

  2. Label: Scale (Score: 0.5578)
     Description: This response demonstrates a mathematical issue related to the concept: Scale.

  3. Label: Incorrect_Numerator (Score: 0.3592)
     Description: The student is using the wrong numerator in a fraction, perhaps by miscounting the number of parts being described.

  4. Label: Shorter_is_bigger (Score: 0.3466)
     Description: This response demonstrates a mathematical issue related to the concept: Shorter_is_bigger.

  5. Label: Longer_is_bigger (Score: 0.3120)
     Description: This res

In [None]:
# --- Step 1: Binary Classification for Correct vs. Incorrect ---
# Create a new dataset for the binary classifier:
# 'True_Correct' responses get label 1.0, everything else 0.0.
#
# Built with a comprehension on purpose. The previous
# `for index, row in train_df.iterrows()` loop rebound the global name `index`
# (the FAISS index used by hybrid_search) to an integer row number, which made
# every later retrieval fail with
# "AttributeError: 'int' object has no attribute 'search'".
binary_examples = [
    {'text': text, 'label': 1.0 if category == 'True_Correct' else 0.0}
    for text, category in zip(train_df['cleaned_text'], train_df['Category'])
]

# Create a custom PyTorch Dataset for binary classification
class BinaryDataset(Dataset):
    """Dataset of single-text examples for the correct/incorrect classifier.

    Each example is a dict with a 'text' string and a numeric 'label'.
    Tokenisation is done lazily, one item at a time, in ``__getitem__``.
    """

    def __init__(self, examples, tokenizer):
        self.tokenizer = tokenizer
        self.examples = examples

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        record = self.examples[idx]
        encoded = self.tokenizer(
            record['text'],
            padding='max_length',
            truncation=True,
            return_tensors="pt",
        )
        # squeeze() drops size-1 axes; presumably this removes the batch axis
        # the tokenizer adds for a single text -- TODO confirm.
        item = {
            field: encoded[field].squeeze()
            for field in ('input_ids', 'attention_mask')
        }
        item['labels'] = torch.tensor(record['label'], dtype=torch.float)
        return item

# Load a new binary classifier model.
# Moved to `device` rather than a hard-coded "cuda", so the cell also runs on
# CPU-only runtimes and matches how the other models are placed.
# NOTE: with num_labels=1 and no explicit problem_type, Hugging Face sequence
# classification heads are trained as a REGRESSION (MSE loss). The model output
# is therefore a score fitted to the 0/1 targets, not a logit to pass through
# a sigmoid.
binary_model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=1).to(device)
binary_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Create the binary datasets
train_binary_dataset = BinaryDataset(binary_examples, binary_tokenizer)
# Placeholder for validation: these 100 examples are ALSO in the training set,
# so any metric computed on them is optimistic.
val_binary_dataset = BinaryDataset(binary_examples[:100], binary_tokenizer)

# Define training arguments
binary_training_args = TrainingArguments(
    output_dir="./binary_classifier_results",
    num_train_epochs=3,
    per_device_train_batch_size=8,
    logging_steps=100,
    report_to="wandb",
)

# Train the binary classifier
binary_trainer = Trainer(
    model=binary_model,
    args=binary_training_args,
    train_dataset=train_binary_dataset,
    eval_dataset=val_binary_dataset
)
print("Starting binary classification training...")
binary_trainer.train()
binary_trainer.save_model("./binary_classifier")

In [None]:
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from torch.utils.data import Dataset
import numpy as np

# Cross-encoder used for re-ranking: a small pre-trained BERT model, for efficiency.
cross_encoder_name = "cross-encoder/ms-marco-TinyBERT-L-2-v2"
cross_encoder_tokenizer = AutoTokenizer.from_pretrained(cross_encoder_name)
cross_encoder_model = AutoModelForSequenceClassification.from_pretrained(cross_encoder_name, num_labels=1).to(device)

# (response, description, label) pairs for training the cross-encoder.
train_examples = []
# Upper bound on sampled negatives per training response.
num_hard_negatives = 3

# label -> description lookup
label_to_desc = {label: entry['description'] for label, entry in misconception_knowledge_store.items()}

for student_response, category, misconception in zip(
    train_df['cleaned_text'], train_df['Category'], train_df['Misconception']
):
    # Ground-truth labels of this row: Category and, when present, Misconception.
    positive_labels = [value for value in (category, misconception) if pd.notna(value)]

    # Positives: the response paired with each true label's description (target 1).
    for pos_label in positive_labels:
        if pos_label not in label_to_desc:
            continue
        train_examples.append(
            {'text_a': student_response, 'text_b': label_to_desc[pos_label], 'label': 1.0}
        )

    # Hard negatives: labels the hybrid retriever ranks close to the response
    # that are not true for it. Labels containing "True" are never negatives.
    retrieved_candidates = hybrid_search(student_response, k=10, alpha=0.7)
    negative_labels = []
    for candidate in retrieved_candidates:
        candidate_label = candidate['label']
        if candidate_label in positive_labels or "True" in candidate_label:
            continue
        negative_labels.append(candidate_label)

    sample_size = min(len(negative_labels), num_hard_negatives)
    for neg_label in random.sample(negative_labels, sample_size):
        # Negatives: the response paired with a wrong description (target 0).
        train_examples.append(
            {'text_a': student_response, 'text_b': label_to_desc[neg_label], 'label': 0.0}
        )

# Create a custom PyTorch Dataset
class MisconceptionDataset(Dataset):
    """Dataset of text pairs for the cross-encoder.

    Each example is a dict with 'text_a', 'text_b' and a numeric 'label'.
    The two texts are tokenised together as one pair, lazily per item.
    """

    def __init__(self, examples, tokenizer):
        self.tokenizer = tokenizer
        self.examples = examples

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        record = self.examples[idx]

        # Encode both texts in a single tokenizer call.
        encoded = self.tokenizer(
            record['text_a'],
            record['text_b'],
            padding='max_length',
            truncation=True,
            return_tensors="pt",
        )

        # squeeze() drops size-1 axes; presumably this removes the batch axis
        # the tokenizer adds for a single pair -- TODO confirm.
        item = {
            field: encoded[field].squeeze()
            for field in ('input_ids', 'attention_mask')
        }
        item['labels'] = torch.tensor(record['label'], dtype=torch.float)
        return item

# Wrap the example pairs in datasets for the trainer.
train_dataset = MisconceptionDataset(train_examples, cross_encoder_tokenizer)

# Note: For validation, you would do a similar process on val_df
# For simplicity, a small slice of the training examples stands in as a
# placeholder. It overlaps the training set, so validation loss on it says
# nothing about generalisation.
placeholder_val_examples = train_examples[:100]
val_dataset = MisconceptionDataset(placeholder_val_examples, cross_encoder_tokenizer)

print(f"Total training examples for cross-encoder: {len(train_dataset)}")

Total training examples for cross-encoder: 156644


In [None]:
# --- Step 1: Binary Classification for Correct vs. Incorrect ---
# NOTE(review): this cell repeats an earlier binary-classifier cell verbatim;
# keep only one of them so the model is not trained twice.
#
# Create a new dataset for the binary classifier:
# 'True_Correct' responses get label 1.0, everything else 0.0.
# Built with a comprehension on purpose. The previous
# `for index, row in train_df.iterrows()` loop rebound the global name `index`
# (the FAISS index used by hybrid_search) to an integer row number, which made
# every later retrieval fail with
# "AttributeError: 'int' object has no attribute 'search'".
binary_examples = [
    {'text': text, 'label': 1.0 if category == 'True_Correct' else 0.0}
    for text, category in zip(train_df['cleaned_text'], train_df['Category'])
]

# Create a custom PyTorch Dataset for binary classification
class BinaryDataset(Dataset):
    """Dataset of single-text examples ({'text', 'label'}) tokenised lazily per item."""

    def __init__(self, examples, tokenizer):
        self.examples = examples
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        example = self.examples[idx]
        tokenized_output = self.tokenizer(example['text'], padding='max_length', truncation=True, return_tensors="pt")
        return {
            'input_ids': tokenized_output['input_ids'].squeeze(),
            'attention_mask': tokenized_output['attention_mask'].squeeze(),
            'labels': torch.tensor(example['label'], dtype=torch.float)
        }

# Load a new binary classifier model.
# Moved to `device` rather than a hard-coded "cuda", so the cell also runs on
# CPU-only runtimes and matches how the other models are placed.
# NOTE: with num_labels=1 and no explicit problem_type, Hugging Face sequence
# classification heads are trained as a REGRESSION (MSE loss). The model output
# is therefore a score fitted to the 0/1 targets, not a logit to pass through
# a sigmoid.
binary_model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=1).to(device)
binary_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Create the binary datasets
train_binary_dataset = BinaryDataset(binary_examples, binary_tokenizer)
# Placeholder for validation: these 100 examples are ALSO in the training set,
# so any metric computed on them is optimistic.
val_binary_dataset = BinaryDataset(binary_examples[:100], binary_tokenizer)

# Define training arguments
binary_training_args = TrainingArguments(
    output_dir="./binary_classifier_results",
    num_train_epochs=3,
    per_device_train_batch_size=8,
    logging_steps=100,
    report_to="wandb",
)

# Train the binary classifier
binary_trainer = Trainer(
    model=binary_model,
    args=binary_training_args,
    train_dataset=train_binary_dataset,
    eval_dataset=val_binary_dataset
)
print("Starting binary classification training...")
binary_trainer.train()
binary_trainer.save_model("./binary_classifier")

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Starting binary classification training...


Step,Training Loss
100,0.2444
200,0.2026
300,0.1943
400,0.1918
500,0.1828
600,0.1646
700,0.1606
800,0.1558
900,0.1517
1000,0.128


In [None]:
# --- REVISED STEP 7: FINE-TUNING THE CROSS-ENCODER ---

# 1. Class balance of the cross-encoder examples -> positive-class weight.
example_labels = [ex['label'] for ex in train_examples]
positive_count = example_labels.count(1.0)
negative_count = example_labels.count(0.0)

# Ratio of negatives to positives; falls back to 1.0 when there are no positives.
if positive_count > 0:
    pos_weight_value = negative_count / positive_count
else:
    pos_weight_value = 1.0

print(f"Positive count: {positive_count}, Negative count: {negative_count}")
print(f"Calculated `pos_weight_value`: {pos_weight_value:.2f}")

# 2. Create a custom Trainer that uses a weighted loss function
from torch.nn import BCEWithLogitsLoss

class WeightedTrainer(Trainer):
    """Trainer whose loss is BCE-with-logits, weighted by the module-level `pos_weight_value`."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Return the weighted BCE loss (and the model outputs when `return_outputs` is true)."""
        # Take the targets out of the batch so they are not passed to the
        # model; squeeze them the same way as the logits below.
        targets = inputs.pop("labels").squeeze()

        outputs = model(**inputs)
        scores = outputs.get("logits").squeeze()

        # Positive-class weight, created on the model's device.
        positive_weight = torch.tensor(pos_weight_value, device=model.device)
        criterion = BCEWithLogitsLoss(pos_weight=positive_weight)
        loss = criterion(scores, targets)

        if return_outputs:
            return (loss, outputs)
        return loss

# 3. Configure and build the weighted trainer for the cross-encoder.
cross_encoder_training_config = dict(
    output_dir="./cross_encoder_results",
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    learning_rate=2e-5,
    logging_dir='./logs',
    logging_steps=100,
    # Evaluate and checkpoint once per epoch, then reload the best checkpoint.
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    report_to="wandb",
    fp16=True,
)
training_args = TrainingArguments(**cross_encoder_training_config)

trainer = WeightedTrainer(
    model=cross_encoder_model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

# Start training
print("Starting cross-encoder fine-tuning with weighted loss...")
trainer.train()


Positive count: 46556, Negative count: 110088
Calculated `pos_weight_value`: 2.36
Starting cross-encoder fine-tuning with weighted loss...


Epoch,Training Loss,Validation Loss
1,0.114,3e-06
2,0.1034,1e-06
3,0.1155,2e-06


TrainOutput(global_step=58743, training_loss=0.22637566067055792, metrics={'train_runtime': 1303.3047, 'train_samples_per_second': 360.57, 'train_steps_per_second': 45.072, 'total_flos': 596856287268864.0, 'train_loss': 0.22637566067055792, 'epoch': 3.0})

In [None]:
# A simple, rule-based augmentation function
def augment_response(text, label):
    """Generate rule-based synthetic variants of a student response.

    Only the "Decimal_Comparison_Whole_Numbers" label has a rule: when `text`
    contains at least two numbers, one sentence is produced that swaps the
    first two numbers and justifies the comparison with their values times 100.

    Args:
        text (str): The student response to augment.
        label (str): The misconception LABEL (not its description) of `text`.

    Returns:
        list[str]: The augmented texts; empty when no rule applies.
    """
    augmented_examples = []
    # Augmentation for Decimal_Comparison_Whole_Numbers
    if label == "Decimal_Comparison_Whole_Numbers":
        original_numbers = re.findall(r'\d+\.\d+|\d+', text)
        if len(original_numbers) >= 2:
            num1 = float(original_numbers[0])
            num2 = float(original_numbers[1])
            # round(), not int(): in binary floating point 0.29 * 100 is
            # 28.999999999999996, which int() would truncate to 28.
            new_text = f"I think {num2} is bigger than {num1} because {round(num2 * 100)} is bigger than {round(num1 * 100)}."
            augmented_examples.append(new_text)

    # You can add more rules for other labels as needed
    return augmented_examples

# Apply augmentation to your training data and add to train_examples.
# Three fixes over the previous loop:
#   * `text_b` holds a DESCRIPTION, but augment_response matches on the LABEL,
#     so the description is mapped back to its label first (otherwise no rule
#     can ever fire).
#   * Only positive pairs are augmented: the synthetic sentence is written to
#     exhibit the misconception, so copying label 0.0 onto it would create
#     mislabelled negatives.
#   * New examples are collected separately and added afterwards. Appending to
#     `train_examples` while iterating over it would re-process the freshly
#     added items and may never terminate.
# NOTE: re-running this cell appends the augmented examples again.
description_to_label = {
    entry['description']: label for label, entry in misconception_knowledge_store.items()
}

augmented_examples = []
for ex in train_examples:
    if ex['label'] != 1.0:
        continue
    source_label = description_to_label.get(ex['text_b'])
    for aug_text in augment_response(ex['text_a'], source_label):
        # Create a new example with the augmented text and the original label
        augmented_examples.append({'text_a': aug_text, 'text_b': ex['text_b'], 'label': ex['label']})

train_examples.extend(augmented_examples)
augmented_count = len(augmented_examples)

print(f"Added {augmented_count} augmented examples.")

NameError: name 'train_examples' is not defined

In [None]:
# --- Step 3: The New End-to-End Pipeline ---
def new_predict_misconceptions(student_response, k=10, alpha=0.7, default_threshold=0.5, correct_threshold=0.8):
    """
    Two-stage pipeline: first classifies as correct/incorrect, then identifies misconception if needed.

    Args:
        student_response (str): The raw text of the student's explanation.
        k (int): Number of candidates retrieved by the hybrid search.
        alpha (float): Dense-vs-BM25 weighting passed to the hybrid search.
        default_threshold (float): Minimum cross-encoder probability for a
            misconception to be reported.
        correct_threshold (float): Minimum binary-classifier score for the
            response to be reported as correct without running Stage 2.

    Returns:
        list[dict]: Dicts with 'label', 'score' and 'description', sorted by
        descending score. A single 'True_Correct' entry when Stage 1 is
        confident; possibly empty otherwise.
    """
    # Both models were trained on cleaned text, so clean once and reuse it.
    cleaned_response = clean_text(student_response)

    # Stage 1: Check if the response is correct using the binary classifier.
    # eval() switches off dropout, which is still active after training.
    binary_model.eval()
    # truncation=True keeps over-long responses within the model's input limit.
    binary_input = binary_tokenizer(cleaned_response, truncation=True, return_tensors="pt").to(binary_model.device)
    with torch.no_grad():
        binary_score = binary_model(**binary_input).logits.reshape(-1)[0].item()
    # The binary model has num_labels=1 and is trained by the default Trainer,
    # i.e. as an MSE regression onto 0/1 targets. Its output is already the
    # score, so it is clamped to [0, 1] instead of squashed with a sigmoid
    # (sigmoid(1.0) is only 0.73 and could never pass the 0.8 gate).
    binary_prob = min(max(binary_score, 0.0), 1.0)

    # If the model is confident it's correct, stop here.
    if binary_prob > correct_threshold: # Use a high threshold to be cautious
        return [{'label': 'True_Correct', 'score': binary_prob, 'description': 'The student\'s response is correct and does not contain a misconception.'}]

    # Stage 2: If classified as incorrect, run the RAG pipeline for misconceptions

    # Hybrid Candidate Retrieval (Bi-encoder + BM25)
    retrieved_candidates = hybrid_search(cleaned_response, k=k, alpha=alpha)
    if not retrieved_candidates:
        # Nothing to re-rank; the tokenizer cannot encode an empty batch.
        return []

    # Cross-Encoder Re-ranking
    retrieved_labels = [candidate['label'] for candidate in retrieved_candidates]
    reranking_pairs = [
        [cleaned_response, misconception_knowledge_store[label]['description']]
        for label in retrieved_labels
    ]

    cross_encoder_model.eval()
    with torch.no_grad():
        tokenized_output = cross_encoder_tokenizer(
            reranking_pairs, padding=True, truncation=True, return_tensors="pt"
        ).to(cross_encoder_model.device)
        # reshape(-1) instead of squeeze(): squeeze() collapses a single
        # candidate to a 0-d value that cannot be indexed.
        reranking_logits = cross_encoder_model(**tokenized_output).logits.reshape(-1)
    probabilities = torch.sigmoid(reranking_logits.float()).cpu().tolist()

    # Final Thresholding and Output
    final_predictions = [
        {
            'label': label,
            'score': score,
            'description': misconception_knowledge_store[label]['description']
        }
        for label, score in zip(retrieved_labels, probabilities)
        if score > default_threshold
    ]
    final_predictions.sort(key=lambda x: x['score'], reverse=True)

    return final_predictions

# --- Run the two-stage pipeline on one hand-written response ---
test_response = "I think 0.5 is bigger than 0.75 because 5 is bigger than 75."

print(f"Student Response: {test_response}")
predictions = new_predict_misconceptions(test_response)

if not predictions:
    print("No misconceptions detected for this response.")
else:
    print("\nPredicted Misconceptions:")
    for pred in predictions:
        print(f"  - Label: {pred['label']}")
        print(f"    Score: {pred['score']:.4f}")
        print(f"    Description: {pred['description']}\n")

In [None]:
import torch
import torch.nn.functional as F

def predict_misconceptions(student_response, k=5, alpha=0.7, threshold=0.5):
    """
    End-to-end pipeline to predict mathematical misconceptions.

    Args:
        student_response (str): The text of the student's explanation.
        k (int): The number of candidates to retrieve from Stage A.
        alpha (float): The weighting for the hybrid retrieval score.
        threshold (float): The final score threshold for classifying a misconception as present.

    Returns:
        A list of dictionaries with the predicted misconceptions, their scores, and descriptions,
        sorted by descending score. Empty when nothing is retrieved or nothing passes the threshold.
    """
    # 1. Preprocess the student response
    cleaned_response = clean_text(student_response)

    # 2. Stage A: Hybrid Candidate Retrieval (Bi-encoder + BM25)
    retrieved_candidates = hybrid_search(cleaned_response, k=k, alpha=alpha)
    if not retrieved_candidates:
        # Nothing to re-rank; the tokenizer cannot encode an empty batch.
        return []

    # 3. Stage B: Cross-Encoder Re-ranking
    # Prepare (response, description) pairs for the cross-encoder
    retrieved_labels = [candidate['label'] for candidate in retrieved_candidates]
    reranking_pairs = [
        [cleaned_response, misconception_knowledge_store[label]['description']]
        for label in retrieved_labels
    ]

    # Tokenize and run through the cross-encoder.
    # eval() switches off dropout, which is still active after training.
    cross_encoder_model.eval()
    with torch.no_grad():
        tokenized_output = cross_encoder_tokenizer(
            reranking_pairs, padding=True, truncation=True, return_tensors="pt"
        ).to(cross_encoder_model.device)
        # reshape(-1) instead of squeeze(): squeeze() collapses a single
        # candidate to a 0-d value that cannot be indexed.
        reranking_logits = cross_encoder_model(**tokenized_output).logits.reshape(-1)

    # 4. Final Thresholding and Output
    # Use a sigmoid function to convert logits to probabilities
    probabilities = torch.sigmoid(reranking_logits.float()).cpu().tolist()

    final_predictions = [
        {
            'label': label,
            'score': score,
            'description': misconception_knowledge_store[label]['description']
        }
        for label, score in zip(retrieved_labels, probabilities)
        if score > threshold
    ]

    # Sort the final predictions by score in descending order
    final_predictions.sort(key=lambda x: x['score'], reverse=True)

    return final_predictions

# --- Run the retrieval + re-ranking pipeline on one hand-written response ---
test_response = "I think 0.5 is bigger than 0.75 because 5 is bigger than 75."

print(f"Student Response: {test_response}")
predictions = predict_misconceptions(test_response, k=10, threshold=0.5)

if not predictions:
    print("No misconceptions detected for this response.")
else:
    print("\nPredicted Misconceptions:")
    for pred in predictions:
        print(f"  - Label: {pred['label']}")
        print(f"    Score: {pred['score']:.4f}")
        print(f"    Description: {pred['description']}\n")

Student Response: I think 0.5 is bigger than 0.75 because 5 is bigger than 75.


AttributeError: 'int' object has no attribute 'search'