In [None]:
# Authenticate with Kaggle before anything is downloaded: some of the data
# sources used by this notebook are private, so this cell has to run first.
import kagglehub
kagglehub.login()

In [None]:
# Download the competition files through kagglehub and keep the returned
# location in `map_data_path`; the data-loading cell reads train.csv and
# test.csv from there. The export tool's note says this cell may be deleted
# once it has run.
# NOTE: this notebook environment differs from Kaggle's Python environment,
# so libraries used further down may be missing here.

map_data_path = kagglehub.competition_download('map-charting-student-math-misunderstandings')

print('Data source import complete.')

In [None]:
# gemma_2_9b_it_cv945 = kagglehub.dataset_download("cdeotte/gemma2-9b-it-cv945")
# gemma2_9b_it_bf16 = kagglehub.dataset_download("cdeotte/gemma2-9b-it-bf16")
# print(gemma_2_9b_it_cv945)
# print(gemma2_9b_it_bf16)

In [None]:
import os

# Restrict this process to GPUs 0 and 1.
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

# --- Run configuration -------------------------------------------------------
# Checkpoint to fine-tune (a local Kaggle dataset path can be swapped in here).
model_name = "google/gemma-2-9b-it"
# Number of passes over the training split.
EPOCHS = 3
# Run version; determines the output directory name below.
VER = 1

# All artefacts of this run (checkpoints, final model, tokenizer) go here.
DIR = f"ver_{VER}"
os.makedirs(DIR, exist_ok=True)

In [None]:
import pandas as pd, numpy as np
from sklearn.preprocessing import LabelEncoder

# Read the competition CSVs from the kagglehub download location.
train = pd.read_csv(f'{map_data_path}/train.csv')
test = pd.read_csv(f'{map_data_path}/test.csv')

# Rows without a misconception get the literal 'NA' so that the combined
# "Category:Misconception" target string is defined for every row.
train['Misconception'] = train['Misconception'].fillna('NA')
train['target'] = train['Category'] + ":" + train['Misconception']

# Integer-encode the target strings; `le` is reused at the end of the notebook
# to turn predicted class indices back into strings.
le = LabelEncoder()
train['label'] = le.fit_transform(train['target'])
target_classes = le.classes_
n_classes = len(target_classes)

print(f"Train shape: {train.shape} with {n_classes} target classes")
train.head()

In [None]:
# Class balance: number of training rows per "Category:Misconception" target.
train['target'].value_counts()

In [None]:
# Infer the answer key from the labels: a row whose Category starts with
# "True" was answered correctly, so for every question the most frequent
# (QuestionId, MC_Answer) pair among those rows is taken as the correct answer.
# `correct` is reused later to flag the test rows.
idx = train['Category'].str.split('_').str[0] == 'True'  # vectorised, no row-wise apply
correct = train.loc[idx].copy()
correct['c'] = correct.groupby(['QuestionId', 'MC_Answer'])['MC_Answer'].transform('count')
# A stable sort makes the tie-break between equally frequent answers
# reproducible (the first occurrence in `train` wins).
correct = correct.sort_values('c', ascending=False, kind='stable')
correct = correct.drop_duplicates(['QuestionId'])
correct = correct[['QuestionId', 'MC_Answer']].assign(is_correct=1)

# Drop an `is_correct` column left over from a previous run of this cell;
# otherwise the merge would emit is_correct_x / is_correct_y and the fillna
# below would fail with an AttributeError.
train = train.drop(columns='is_correct', errors='ignore').merge(
    correct, on=['QuestionId', 'MC_Answer'], how='left'
)
# Rows whose answer is not the inferred key get 0.
train['is_correct'] = train['is_correct'].fillna(0)

In [None]:
import torch
from transformers import AutoTokenizer, GemmaTokenizerFast
from sklearn.model_selection import train_test_split
from datasets import Dataset
import numpy as np

# Maximum prompt length in tokens used when tokenising (296 is the other
# value that was tried).
MAX_LEN = 392

# Tokenizer belonging to the checkpoint selected in the config cell.
tokenizer = AutoTokenizer.from_pretrained(model_name)


In [None]:
import random

def seed_everything(seed: int) -> None:
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global RNG and PyTorch (CPU and
    every visible CUDA device), exports ``PYTHONHASHSEED`` and configures
    cuDNN for reproducible kernel selection.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # Read by Python at interpreter start-up, so this affects child processes
    # rather than the already-running kernel.
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Two GPUs are exposed to this notebook; seed all of them, not just the
    # current device. Silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Auto-tuning may select different kernels from run to run, which defeats
    # the deterministic setting above, so it must be off.
    torch.backends.cudnn.benchmark = False


seed_everything(42)

In [None]:
def format_input(row):
    """Render one data row as the classification prompt given to the LLM.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) providing
            ``is_correct``, ``QuestionText``, ``MC_Answer`` and
            ``StudentExplanation``.

    Returns:
        The prompt string: fixed instructions, the math problem with the
        student's answer marked as correct/incorrect, the quoted explanation
        and the closing classification request.
    """
    # A truthy `is_correct` flag means the chosen answer matches the answer key.
    verdict = "correct" if row['is_correct'] else "incorrect"

    instructions = (
        "You are a meticulous AI assistant analyzing a student's mathematical reasoning. Follow these steps to arrive at your classification:\n"
        "1.  **Understand the Question:** First, read the math problem to understand what is being asked.\n"
        "2.  **Evaluate the Answer:** Note the student's selected answer and whether it was correct or incorrect.\n"
        "3.  **Diagnose the Explanation:** This is the most critical step. Carefully read the student's written explanation to diagnose their thought process. Does it justify the answer? Does it reveal a specific mathematical error? Is it irrelevant or a guess?\n\n"
    )
    problem = (
        "### Math Problem\n"
        f"Question: {row['QuestionText']}\n"
        f"Student's Answer: {row['MC_Answer']} (This was the **{verdict}** choice)\n\n"
    )
    reasoning = (
        "### Student's Reasoning\n"
        f"Explanation: \"{row['StudentExplanation']}\"\n\n"
    )
    request = (
        "### Diagnosis\n"
        "Based on the explanation, provide the classification for the student's reasoning."
    )
    return instructions + problem + reasoning + request


# Build the prompt column for every training row, then print the first prompt
# as a visual sanity check.
train['text'] = train.apply(format_input, axis=1)

print("Example prompt for our LLM:")
print()
print(train['text'].iloc[0])


# choices_df = train.groupby('QuestionId')['MC_Answer'].apply(list).reset_index(name='all_choices')
# choices_df['all_choices'] = choices_df['all_choices'].apply(lambda x: sorted(list(set(x))))
# train = train.merge(choices_df, on='QuestionId', how='left')

# def format_input_with_choices(row):
#     x = "Yes"
#     if not row['is_correct']:
#         x = "No"

#     choice_str = "\n".join([f"({chr(65+i)}) {choice}" for i, choice in enumerate(row['all_choices'])])

#     return (
#         f"Analyze the student's reasoning for the following math problem.\n\n"
#         f"Question: {row['QuestionText']}\n"
#         f"Available Choices:\n{choice_str}\n\n"
#         f"Student's Selected Answer: {row['MC_Answer']}\n"
#         f"Was this answer correct? {x}\n"
#         f"Student's Explanation: {row['StudentExplanation']}"
#     )

# train['text'] = train.apply(format_input_with_choices, axis=1)


In [None]:
import matplotlib.pyplot as plt

# Untruncated token count of every training prompt.
lengths = [
    len(tokenizer.encode(prompt, truncation=False))
    for prompt in train["text"]
]

# Histogram of the prompt lengths.
fig, ax = plt.subplots()
ax.hist(lengths, bins=50)
ax.set_title("Token Length Distribution")
ax.set_xlabel("Number of tokens")
ax.set_ylabel("Frequency")
ax.grid(True)
plt.show()


In [None]:
# How many prompts exceed MAX_LEN and would therefore be truncated?
too_long = np.asarray(lengths) > MAX_LEN
L = too_long.sum()
print(f"There are {L} train sample(s) with more than {MAX_LEN} tokens")

# Sorted lengths, displayed as the cell output.
np.sort(lengths)

In [None]:
# Hold out 20% of the rows for validation (fixed seed, no stratification).
train_df, val_df = train_test_split(train, test_size=0.2, random_state=42)

# Only the prompt text and the encoded label are carried into the datasets.
COLS = ['text','label']
train_ds, val_ds = (
    Dataset.from_pandas(frame[COLS]) for frame in (train_df, val_df)
)

In [None]:
# Tokenization function
# def tokenize(batch):
#     return tokenizer(batch["text"], padding="max_length", truncation=True, max_length=256)

def tokenize(batch):
    """Tokenise the ``text`` field of a batch, truncating to ``MAX_LEN`` tokens.

    No padding is requested here, so sequences keep their individual lengths.
    """
    return tokenizer(batch["text"], max_length=MAX_LEN, truncation=True)


# Tokenise both splits in batched mode.
train_ds, val_ds = (
    split.map(tokenize, batched=True) for split in (train_ds, val_ds)
)

# Expose only the model inputs and the label, as torch tensors.
columns = ['input_ids', 'attention_mask', 'label']
train_ds.set_format(columns=columns, type='torch')
val_ds.set_format(columns=columns, type='torch')

In [None]:
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
from peft import get_peft_model, LoraConfig, TaskType
import torch
import json


# LoRA adapter settings for sequence classification: adapters are attached to
# the attention and MLP projection layers listed in `target_modules`.
peft_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,
    r=32,
    lora_alpha=4,
    lora_dropout=0.05,
    bias="none",
    target_modules=[
        "q_proj", "k_proj", "v_proj", "o_proj",   # attention projections
        "gate_proj", "up_proj", "down_proj",      # MLP projections
    ],
    # NOTE(review): "classifier_head1"/"classifier_head2" are not defined
    # anywhere in this notebook - confirm they are needed.
    modules_to_save=["score","classifier_head1", "classifier_head2"],
)

# Load the base checkpoint in bfloat16 with a classification head of
# `n_classes` outputs and wrap it with the LoRA adapters.
# (flash_attention_2 was considered as attn_implementation but is not enabled.)
model = get_peft_model(
    AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=n_classes,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    ),
    peft_config,
)

In [None]:
from transformers import DataCollatorWithPadding

# Pads each batch dynamically using the tokenizer's padding settings.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

training_args = TrainingArguments(
    # --- where things go ---
    output_dir = f"./{DIR}",
    logging_dir="./logs",
    report_to="none",

    # --- what to run ---
    do_train=True,
    do_eval=True,
    num_train_epochs=EPOCHS,

    # --- batching and precision ---
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    gradient_accumulation_steps=2,
    bf16=True,
    fp16=False,

    # --- optimisation schedule ---
    learning_rate=1e-4,
    lr_scheduler_type="cosine",
    warmup_ratio=0.03,
    weight_decay=0.01,

    # --- logging / evaluation / checkpointing cadence (in steps) ---
    logging_steps=50,
    eval_strategy="steps",
    eval_steps=200,
    save_strategy="steps",  # "no" disables saving
    save_steps=200,
    save_total_limit=1,

    # --- model selection on the custom metric ---
    metric_for_best_model="map@3",
    greater_is_better=True,
    load_best_model_at_end=True,
)



# training_args = TrainingArguments(
#     output_dir = f"./{DIR}",
#     do_train=True,
#     do_eval=False,

#     # eval_strategy="steps",
#     # save_strategy="steps", #no for no saving
#     # save_total_limit=1,
#     # metric_for_best_model="map@3",
#     # greater_is_better=True,
#     # eval_steps=200,
#     # load_best_model_at_end=True,

#     save_strategy="epoch",
#     num_train_epochs=EPOCHS,
#     per_device_train_batch_size=8,
#     gradient_accumulation_steps=2,
#     # per_device_eval_batch_size=16,
#     learning_rate=1e-4,
#     logging_dir="./logs",
#     logging_steps=50,
#     save_steps=200,
#     report_to="none",
#     bf16=True,
#     fp16=False,
#     save_total_limit=1,
#     lr_scheduler_type="cosine",
#     weight_decay=0.01,
#     warmup_ratio=0.03,
# )

In [None]:
# CUSTOM MAP@3 METRIC

from sklearn.metrics import average_precision_score

def compute_map3(eval_pred):
    """Compute mean average precision at 3 (MAP@3) for evaluation predictions.

    Args:
        eval_pred: Pair ``(logits, labels)``. ``logits`` is indexed as
            [sample, class]; ``labels`` is an array with one class index per
            sample.

    Returns:
        A dict with the single key ``"map@3"``: the mean over samples of
        1, 1/2 or 1/3 when the true label is ranked first, second or third,
        and 0 when it is not among the top three.
    """
    logits, labels = eval_pred
    probs = torch.nn.functional.softmax(torch.tensor(logits), dim=-1).numpy()

    # Three highest-probability classes per sample, best first.
    ranked = np.argsort(-probs, axis=1)[:, :3]
    hits = ranked == labels[:, None]

    # Credit earned for a hit at rank 1, 2 and 3 respectively.
    rank_credit = (1.0, 1.0 / 2, 1.0 / 3)
    total = 0
    for hit_row in hits:
        for credit, is_hit in zip(rank_credit, hit_row):
            if is_hit:
                total += credit
                break
    return {"map@3": total / len(labels)}

In [None]:
# Wire the LoRA model, both dataset splits, the padding collator and the
# MAP@3 metric into the Trainer, then run fine-tuning.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    tokenizer=tokenizer,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    compute_metrics=compute_map3,
)

trainer.train()


In [None]:
# Experiment log: score recorded for each configuration that was tried.

# Score 0.9448
#   lr_scheduler_type="cosine",
#   weight_decay=0.01,
#   warmup_ratio=0.03,
#   r=32,          # The rank of the update matrices. Lower rank means fewer trainable parameters.
#   lora_alpha=4,  # A scaling factor for the LoRA weights.

# Score 0.941
#   lr_scheduler_type="cosine",
#   weight_decay=0.01,
#   warmup_ratio=0.05,
#   r=32,          # The rank of the update matrices. Lower rank means fewer trainable parameters.
#   lora_alpha=4,  # A scaling factor for the LoRA weights.

# lora_alpha=8:  0.943
# lora_alpha=2:  0.942
# lora_alpha=16: 0.943

# Score 0.945073 with "3.prompt"


In [None]:
# Persist the fine-tuned model and the tokenizer side by side in the
# versioned run directory (the same "ver_<VER>" path the config cell creates).
trainer.save_model(f"ver_{VER}")
tokenizer.save_pretrained(f"ver_{VER}")

In [None]:
# Flag each test row with the answer key inferred from the training data.
# A stale `is_correct` column from a previous run of this cell is dropped
# first; otherwise the merge would emit is_correct_x / is_correct_y and the
# fillna below would fail with an AttributeError.
test = test.drop(columns='is_correct', errors='ignore').merge(
    correct, on=['QuestionId', 'MC_Answer'], how='left'
)
# Answers that are not in the key are treated as incorrect.
test['is_correct'] = test['is_correct'].fillna(0)

# Same prompt template as for training.
test['text'] = test.apply(format_input, axis=1)

test.head()

In [None]:
# Tokenise the test prompts with the same function used for training.
ds_test = Dataset.from_pandas(test[['text']]).map(tokenize, batched=True)

# Run inference and turn the raw outputs into per-class probabilities.
predictions = trainer.predict(ds_test)
test_logits = torch.tensor(predictions.predictions)
probs = torch.nn.functional.softmax(test_logits, dim=1).numpy()

In [None]:
# Get top 3 predicted class indices
top3 = np.argsort(-probs, axis=1)[:, :3]   # shape: [num_samples, 3]

# Decode numeric class indices to original string labels
flat_top3 = top3.flatten()
decoded_labels = le.inverse_transform(flat_top3)
top3_labels = decoded_labels.reshape(top3.shape)

# Join 3 labels per row with space
joined_preds = [" ".join(row) for row in top3_labels]

# Save submission
sub = pd.DataFrame({
    "row_id": test.row_id.values,
    "Category:Misconception": joined_preds
})
sub.to_csv("submission.csv", index=False)
sub.head()