# MAP - Charting Student Math Misunderstandings

## Imports and Data Loading

In [27]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import RobertaTokenizer, RobertaModel, RobertaConfig
from tqdm import tqdm
import joblib
import torch.optim as optim
from transformers import get_linear_schedule_with_warmup
from transformers import AutoModel, AutoTokenizer, AutoConfig
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.util import trigrams
from sklearn.feature_extraction.text import TfidfVectorizer
import string
import re
import os
import zipfile
from IPython.display import FileLink

In [28]:
#  Load & train and test data
data = pd.read_csv('/kaggle/input/map-charting-student-math-misunderstandings/train.csv')
test_df_original = pd.read_csv('/kaggle/input/map-charting-student-math-misunderstandings/test.csv')

In [29]:
data.duplicated().sum()

0

In [30]:
# Impute missing values for the `Misconception` column with NA
data['Misconception'] = data['Misconception'].fillna('NA')

In [31]:
# Create the is_correct feature
idx = data.apply(lambda row: row.Category.split('_')[0], axis=1) == 'True'
tmp = data.loc[idx].copy()
tmp['c'] = tmp.groupby(['QuestionId', 'MC_Answer']).MC_Answer.transform('count')
tmp = tmp.sort_values('c', ascending=False)
tmp = tmp.drop_duplicates(['QuestionId'])
tmp = tmp[['QuestionId', 'MC_Answer']]
tmp['is_correct'] = 1

# Create for train dataset
data = data.merge(tmp, on=['QuestionId', 'MC_Answer'], how='left')
data.is_correct = data.is_correct.fillna(0)

# Create for test dataset
test_df_original = test_df_original.merge(tmp, on=['QuestionId','MC_Answer'], how='left')
test_df_original.is_correct = test_df_original.is_correct.fillna(0)

In [32]:
# Create the train_text feature
data['train_text'] = (
data['QuestionId'].astype(str) + " " +
data['QuestionText'].astype(str) + " " +
data['MC_Answer'].astype(str) + " " +
data['StudentExplanation'].astype(str)
)

In [33]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.util import trigrams
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
nltk.download('omw-1.4')
nltk.download('averaged_perceptron_tagger_eng') 

[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /usr/share/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /usr/share/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package omw-1.4 to /usr/share/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /usr/share/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!


True

In [34]:
lemmatizer = WordNetLemmatizer()

def get_wordnet_pos(treebank_tag):
    if treebank_tag.startswith('J'):
        return 'a'  # adjective
    elif treebank_tag.startswith('V'):
        return 'v'  # verb
    elif treebank_tag.startswith('N'):
        return 'n'  # noun
    elif treebank_tag.startswith('R'):
        return 'r'  # adverb
    else:
        return 'n'  # default to noun

def preprocess_text(row):
    # Extract and preprocess each component
    question_id = str(row['QuestionId']).strip()
    question_text = str(row['QuestionText']).lower().strip()
    mc_answer = str(row['MC_Answer']).lower().strip()
    student_explanation = str(row['StudentExplanation']).lower().strip()

    # Convert frac notation to proper fraction
    question_text = re.sub(r'frac(\d+)/(\d+)', r'\1/\2', question_text)
    mc_answer = re.sub(r'frac(\d+)/(\d+)', r'\1/\2', mc_answer)
    student_explanation = re.sub(r'frac(\d+)/(\d+)', r'\1/\2', student_explanation)

    # Remove excessive noise, preserve numbers and fractions
    question_text = re.sub(r'[^a-z0-9\s/]', '', question_text)
    mc_answer = re.sub(r'[^a-z0-9\s/]', '', mc_answer)
    student_explanation = re.sub(r'[^a-z0-9\s/]', '', student_explanation)

    # Tokenize and lemmatize (optional for explanation, skip for IDs and answers to preserve exact terms)
    tokens = word_tokenize(student_explanation)
    tagged_tokens = pos_tag(tokens)
    lemmatized_explanation = ' '.join([lemmatizer.lemmatize(token, get_wordnet_pos(tag)) if not token.isdigit() and '/' not in token else token for token, tag in tagged_tokens])

    # Construct structured train_text
    train_text = f"[QID] {question_id} [QTXT] {question_text} [ANS] {mc_answer} [EXP] {lemmatized_explanation}"
    return train_text.strip()

# Apply preprocessing to train_text feature
data['train_text'] = data.apply(preprocess_text, axis=1)

In [35]:
data.head(2)

Unnamed: 0,row_id,QuestionId,QuestionText,MC_Answer,StudentExplanation,Category,Misconception,is_correct,train_text
0,0,31772,What fraction of the shape is not shaded? Give...,\( \frac{1}{3} \),0ne third is equal to tree nineth,True_Correct,,1.0,[QID] 31772 [QTXT] what fraction of the shape ...
1,1,31772,What fraction of the shape is not shaded? Give...,\( \frac{1}{3} \),1 / 3 because 6 over 9 is 2 thirds and 1 third...,True_Correct,,1.0,[QID] 31772 [QTXT] what fraction of the shape ...


In [36]:
data.iloc[8, 8]

'[QID] 31772 [QTXT] what fraction of the shape is not shaded give your answer in its simplest form image a triangle split into 9 equal smaller triangles 6 of them are shaded [ANS]  frac13  [EXP] 1/3 because i work out that 3/9 be not shade and because 3 and 9 have a common factor of 3 i divide the numerator and denominator by 3 to give me 1/3'

In [37]:
# Create the combined_label feature (multi-class target)
data['combined_label'] = data['Category'].astype(str) + ':' + data['Misconception'].astype(str)

# Drop labels that have less than 2 instances
vc = data['combined_label'].value_counts()
classes_to_keep = vc[vc >= 2].index
train_filtered = data[data['combined_label'].isin(classes_to_keep)].copy()

# Encode the combined_label to numeric values
label_le = LabelEncoder()
train_filtered['combined_label_encoded'] = label_le.fit_transform(train_filtered['combined_label'])
y_encoded = train_filtered['combined_label_encoded']

In [38]:
# Encode QuestionId with a stable mapping (0 reserved for UNK/PAD)
unique_question_ids = sorted(train_filtered['QuestionId'].unique())
question_id_to_encoded = {q_id: i + 1 for i, q_id in enumerate(unique_question_ids)}
question_id_to_encoded['UNSEEN_ID'] = 0 # reserve 0 for unknown/pad
train_filtered['QuestionId_encoded'] = train_filtered['QuestionId'].map(question_id_to_encoded).fillna(0).astype(int)

In [39]:
# Save label mappings 
joblib.dump(dict(enumerate(label_le.classes_)), 'label_to_original.pkl')
joblib.dump(question_id_to_encoded, 'question_id_to_encoded.pkl')
print(f"Number of misconception classes: {len(label_le.classes_)}")
print(f"Number of unique Question IDs: {len(unique_question_ids)}")

Number of misconception classes: 60
Number of unique Question IDs: 15


In [40]:
# Perform Train-Test split
train_features_df = train_filtered[['train_text', 'QuestionId_encoded', 'is_correct', 'combined_label_encoded']]
train_df, val_df = train_test_split(
train_features_df,
test_size=0.2,
random_state=42,
stratify=y_encoded
)
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)

# define num_classes (indices go from 0..U where 0 is UNK/PAD and max is len(unique_question_ids))
num_classes = len(label_le.classes_)

## Modelling

In [41]:
import torch
from torch.utils.data import Dataset

# Define dataset classes
num_question_ids = len(unique_question_ids) + 1  

class MathMisconceptionDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len):
        """
        Dataset for Math Misconception task, compatible with SciBERT tokenizer
        (allenai/scibert_scivocab_uncased).
        """
        self.texts = dataframe['train_text'].values
        self.question_ids = dataframe['QuestionId_encoded'].values
        self.is_corrects = dataframe['is_correct'].values
        self.labels = dataframe['combined_label_encoded'].values
        self.tokenizer = tokenizer  # Expected to be SciBERT tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        question_id = int(self.question_ids[idx])
        is_correct = float(self.is_corrects[idx])
        label = int(self.labels[idx])

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'question_id': torch.tensor(question_id, dtype=torch.long),
            'is_correct': torch.tensor(is_correct, dtype=torch.float),
            'label': torch.tensor(label, dtype=torch.long)
        }

In [42]:
# Define custom SciBERT model
class CustomSciBERTModel(nn.Module):
    def __init__(self, scibert_model, num_classes, num_question_ids, question_id_embedding_dim=64):
        super().__init__()
        self.scibert = scibert_model
        self.question_id_embedding = nn.Embedding(num_question_ids, question_id_embedding_dim, padding_idx=0)
        self.is_correct_layer = nn.Linear(1, 16)
        hidden = self.scibert.config.hidden_size + question_id_embedding_dim + 16
        self.classifier = nn.Sequential(
            nn.Linear(hidden, 256),
            nn.ReLU(),
            nn.Dropout(self.scibert.config.hidden_dropout_prob),
            nn.Linear(256, num_classes)
        )

    def forward(self, input_ids, attention_mask, question_id, is_correct):
        scibert_out = self.scibert(input_ids=input_ids, attention_mask=attention_mask).pooler_output
        qid_emb = self.question_id_embedding(question_id)
        ic_emb = self.is_correct_layer(is_correct.unsqueeze(1).float()) # ensure float for Linear
        features = torch.cat((scibert_out, qid_emb, ic_emb), dim=1)
        logits = self.classifier(features)
        return logits

In [43]:
# Define function to compute MAP@3 metric
def map_at_3(predictions, true_labels):
    top_3_pred = np.argsort(predictions, axis=1)[:, -3:][:, ::-1]
    aps = []
    for i in range(len(true_labels)):
        true_label = true_labels[i]
        preds = top_3_pred[i]
        hits = (preds == true_label)
        if not np.any(hits):
            aps.append(0.0)
        else:
            rank = np.where(hits)[0][0] + 1
            aps.append(1.0 / rank)
    return float(np.mean(aps))

In [44]:
# Train / Eval loops
def train_epoch(model, dataloader, optimizer, scheduler, device, accumulation_steps=4):
    model.train()
    total_loss = 0.0
    all_preds, all_labels = [], []
    loss_fn = nn.CrossEntropyLoss()

    optimizer.zero_grad(set_to_none=True)

    for step, batch in tqdm(enumerate(dataloader), total=len(dataloader), desc='Training'):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        question_id = batch['question_id'].to(device)
        is_correct = batch['is_correct'].to(device)
        labels = batch['label'].to(device)

        logits = model(input_ids=input_ids, attention_mask=attention_mask,
                       question_id=question_id, is_correct=is_correct)
        loss = loss_fn(logits, labels)
        (loss / accumulation_steps).backward()

        if (step + 1) % accumulation_steps == 0 or (step + 1) == len(dataloader):
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            optimizer.zero_grad(set_to_none=True)
            if scheduler is not None:
                scheduler.step()

        total_loss += loss.item()
        all_preds.append(logits.detach().cpu().numpy())
        all_labels.append(labels.detach().cpu().numpy())

    avg_loss = total_loss / len(dataloader)
    preds_np = np.concatenate(all_preds, axis=0)
    labels_np = np.concatenate(all_labels, axis=0)
    train_map3 = map_at_3(preds_np, labels_np)
    return avg_loss, train_map3


def eval_model(model, dataloader, device):
    model.eval()
    total_loss = 0.0
    all_preds, all_labels = [], []
    loss_fn = nn.CrossEntropyLoss()

    with torch.no_grad():
        for batch in tqdm(dataloader, total=len(dataloader), desc='Evaluating'):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            question_id = batch['question_id'].to(device)
            is_correct = batch['is_correct'].to(device)
            labels = batch['label'].to(device)

            logits = model(input_ids=input_ids, attention_mask=attention_mask,
                           question_id=question_id, is_correct=is_correct)
            loss = loss_fn(logits, labels)
            total_loss += loss.item()

            all_preds.append(logits.detach().cpu().numpy())
            all_labels.append(labels.detach().cpu().numpy())

    avg_loss = total_loss / len(dataloader)
    preds_np = np.concatenate(all_preds, axis=0)
    labels_np = np.concatenate(all_labels, axis=0)
    val_map3 = map_at_3(preds_np, labels_np)
    return avg_loss, val_map3

In [None]:
# Initialize Tokenizer, Pretrained SciBERT transformer and training

# Define hyperparameters
MAX_LEN = 256
BATCH_SIZE = 8
ACCUMULATION_STEPS = 4
LEARNING_RATE = 2e-5
EPOCHS = 3
WARMUP_STEPS = 0

# Tokenizer & base model
tokenizer = AutoTokenizer.from_pretrained('allenai/scibert_scivocab_uncased')
config = AutoConfig.from_pretrained('allenai/scibert_scivocab_uncased')
base_scibert_model = AutoModel.from_pretrained('allenai/scibert_scivocab_uncased', config=config)

# Model
aodel = CustomSciBERTModel(
    scibert_model=base_scibert_model,
    num_classes=num_classes,
    num_question_ids=num_question_ids,
)

# Optimizer, device, loaders
optimizer = optim.AdamW(aodel.parameters(), lr=LEARNING_RATE, eps=1e-8)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
aodel.to(device)
print(f"Using device: {device}")

train_dataset = MathMisconceptionDataset(train_df, tokenizer, MAX_LEN)
val_dataset = MathMisconceptionDataset(val_df, tokenizer, MAX_LEN)

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)

# Scheduler (step per optimizer step)
update_steps_per_epoch = int(np.ceil(len(train_dataloader) / ACCUMULATION_STEPS))
total_steps = update_steps_per_epoch * EPOCHS
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=WARMUP_STEPS,
    num_training_steps=total_steps,
)

best_val_map3 = 0.0
for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch + 1}/{EPOCHS}")
    print('-' * 30)
    train_loss, train_map3 = train_epoch(
        aodel, train_dataloader, optimizer, scheduler, device,
        accumulation_steps=ACCUMULATION_STEPS,
    )
    print(f"Train Loss: {train_loss:.4f} | Train MAP@3: {train_map3:.4f}")

    val_loss, val_map3 = eval_model(aodel, val_dataloader, device)
    print(f"Validation Loss: {val_loss:.4f} | Validation MAP@3: {val_map3:.4f}")

    if val_map3 > best_val_map3:
        best_val_map3 = val_map3
        # Save model and tokenizer
        torch.save(aodel.state_dict(), './SciBERT_math_misconception_custom.pth')
        tokenizer.save_pretrained('./SciBERT_math_misconception_tokenizer')
        
        # Create zip file containing model and tokenizer
        zip_filename = 'scibert-finetuned.zip'
        with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write('./SciBERT_math_misconception_custom.pth')
            for root, _, files in os.walk('./SciBERT_math_misconception_tokenizer'):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, './')
                    zipf.write(file_path, arcname)
        
        print(f"Saved improved model and tokenizer as {zip_filename} with Validation MAP@3: {val_map3:.4f}")
        
        # Prompt automatic download
        display(FileLink(zip_filename))

print("\nFine-tuning completed.")
print(f"Best Validation MAP@3 achieved: {best_val_map3:.4f}")

Using device: cuda

Epoch 1/3
------------------------------


Training:  62%|██████▏   | 2281/3669 [12:57<07:51,  2.94it/s]

In [None]:
# Inference finetuned model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Load mappings
label_to_original = joblib.load('label_to_original.pkl')
question_id_to_encoded = joblib.load('question_id_to_encoded.pkl')
num_classes = len(label_to_original)
print(f"Number of classes: {num_classes}")

# Prepare test dataframe (create is_correct & text, and ENCODE QuestionId)
test_df = test_df_original.copy()

# try:
    test_df = test_df.merge(tmp, on=['QuestionId', 'MC_Answer'], how='left')
    test_df['is_correct'] = test_df['is_correct'].fillna(0.0)
    print("Merged with `tmp` to get is_correct.")
except NameError:
    print("Warning: `tmp` not found. Setting `is_correct` = 0 for all samples.")
    test_df['is_correct'] = 0.0

test_df['text_test'] = (
    test_df['QuestionId'].astype(str) + " " +
    test_df['QuestionText'].astype(str) + ' ' +
    test_df['MC_Answer'].astype(str) + ' ' +
    test_df['StudentExplanation'].astype(str)
)
# Apply preprocessing to text_test feature
test_df['text_test'] = test_df.apply(preprocess_text, axis=1)

# Encode QuestionId for test (unseen -> 0)
test_df['QuestionId_encoded'] = test_df['QuestionId'].map(lambda q: question_id_to_encoded.get(q, 0)).astype(int)

class InferenceDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len=256):
        self.texts = dataframe['text_test'].values
        self.question_ids = dataframe['QuestionId_encoded'].values
        self.is_corrects = dataframe['is_correct'].values
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        qid = int(self.question_ids[idx])
        ic = float(self.is_corrects[idx])
        enc = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        return {
            'input_ids': enc['input_ids'].squeeze(0),
            'attention_mask': enc['attention_mask'].squeeze(0),
            'question_id': torch.tensor(qid, dtype=torch.long),
            'is_correct': torch.tensor(ic, dtype=torch.float)
        }

inference_dataset = InferenceDataset(test_df, tokenizer)
inference_dataloader = DataLoader(inference_dataset, batch_size=8, shuffle=False, num_workers=2, pin_memory=True)

# Reload model for inference with the SAME shapes
loaded_base_scibert_model = AutoModel.from_pretrained('allenai/scibert_scivocab_uncased', config=AutoConfig.from_pretrained('allenai/scibert_scivocab_uncased'))
model_inference = CustomSciBERTModel(
    scibert_model=loaded_base_scibert_model,
    num_classes=num_classes,
    num_question_ids=num_question_ids,
)
model_inference.load_state_dict(torch.load('./SciBERT_math_misconception_custom.pth', map_location=device))
model_inference.to(device)
model_inference.eval()
print('Fine-tuned SciBERT model loaded successfully for inference.')

@torch.no_grad()
def predict_with_scibert(model, dataloader, device):
    model.eval()
    all_probs = []
    for batch in tqdm(dataloader, desc='Predicting'):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        question_id = batch['question_id'].to(device)
        is_correct = batch['is_correct'].to(device)
        logits = model(input_ids=input_ids, attention_mask=attention_mask,
                       question_id=question_id, is_correct=is_correct)
        probs = torch.softmax(logits, dim=1)
        all_probs.append(probs.detach().cpu().numpy())
    return np.concatenate(all_probs, axis=0)

pred_proba = predict_with_scibert(model_inference, inference_dataloader, device)

# Decode predictions to top-3 labels
submission_strings = []
for prob in pred_proba:
    top_3_idx = np.argsort(prob)[-3:][::-1]
    top_3_labels = [label_to_original[i] for i in top_3_idx]
    submission_strings.append(' '.join(top_3_labels))

submission = pd.DataFrame({
    'row_id': test_df['row_id'],
    'Category:Misconception': submission_strings
})

submission.to_csv('submission.csv', index=False)
print('Saved submission.csv')

## Make Predictions on Test Set Using Finetuned Model

In [None]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Define paths 
FINE_TUNED_DIR = "/kaggle/input/scibert_math_misconception/transformers/default/1/scibert_math_misconception"
SCIBERT_BASE_DIR = "/kaggle/input/scibert-base-offline/transformers/default/1/scibert-base-offline"

# Validate paths
assert os.path.exists(FINE_TUNED_DIR), f"Fine-tuned model dir not found: {FINE_TUNED_DIR}"
assert os.path.exists(SCIBERT_BASE_DIR), f"SciBERT-base dir not found: {SCIBERT_BASE_DIR}"

# Define the model architecture
class CustomSciBERTModel(nn.Module):
    def __init__(self, scibert_model, num_classes, num_question_ids, question_id_embedding_dim=64):
        super(CustomSciBERTModel, self).__init__()
        self.scibert = scibert_model
        self.question_id_embedding = nn.Embedding(num_question_ids, question_id_embedding_dim, padding_idx=0)
        self.is_correct_layer = nn.Linear(1, 16)
        self.classifier = nn.Sequential(
            nn.Linear(self.scibert.config.hidden_size + question_id_embedding_dim + 16, 256),
            nn.ReLU(),
            nn.Dropout(self.scibert.config.hidden_dropout_prob),
            nn.Linear(256, num_classes)
        )

    def forward(self, input_ids, attention_mask, question_id, is_correct):
        # SciBERT outputs
        scibert_output = self.scibert(input_ids=input_ids, attention_mask=attention_mask).pooler_output
        
        # Embed question_id
        question_id_emb = self.question_id_embedding(question_id)
        
        # Embed is_correct (scalar)
        is_correct_emb = self.is_correct_layer(is_correct.unsqueeze(1))
        
        # Concatenate all features
        combined_features = torch.cat((scibert_output, question_id_emb, is_correct_emb), dim=1)
        
        # Final classification
        logits = self.classifier(combined_features)
        return logits

In [None]:
# Load mappings
label_to_original = joblib.load(os.path.join(FINE_TUNED_DIR, 'label_to_original.pkl'))
question_id_to_encoded = joblib.load(os.path.join(FINE_TUNED_DIR, 'question_id_to_encoded.pkl'))

num_classes = len(label_to_original)
num_question_ids = len(question_id_to_encoded)  # includes padding index

print(f"Number of classes: {num_classes}")
print(f"Number of unique Question IDs: {num_question_ids}")

# Load tokenizer from fine-tuned model directory
tokenizer = AutoTokenizer.from_pretrained("/kaggle/input/scibert_math_misconception/transformers/default/1/scibert_math_misconception/tokenizer")
print("Tokenizer loaded from fine-tuned SciBERT model directory.")

# Prepare test data
test_df_original = pd.read_csv('/kaggle/input/map-charting-student-math-misunderstandings/test.csv')
test_df = test_df_original.copy()

try:
    test_df = test_df.merge(tmp, on=['QuestionId', 'MC_Answer'], how='left')
    test_df['is_correct'] = test_df['is_correct'].fillna(0.0)
    print("Merged with `tmp` to get is_correct.")
except NameError:
    print("Warning: `tmp` not found. Setting `is_correct` = 0 for all samples.")
    test_df['is_correct'] = 0.0

# Create input text
test_df['text_test'] = (
    test_df['QuestionId'].astype(str) + " " +
    test_df['QuestionText'].astype(str) + ' ' +
    test_df['MC_Answer'].astype(str) + ' ' +
    test_df['StudentExplanation'].astype(str)
)

# Apply preprocessing to text_test feature
test_df['text_test'] = test_df.apply(preprocess_text, axis=1)

# Encode QuestionId using saved mapping (unseen → 0)
test_df['QuestionId_encoded'] = test_df['QuestionId'].map(
    lambda q: question_id_to_encoded.get(q, 0)
).astype(int)

In [None]:
# Inference Dataset
class InferenceDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len=256):
        self.texts = dataframe['text_test'].values
        self.question_ids = dataframe['QuestionId_encoded'].values
        self.is_corrects = dataframe['is_correct'].values
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        qid = int(self.question_ids[idx])
        ic = float(self.is_corrects[idx])
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'question_id': torch.tensor(qid, dtype=torch.long),
            'is_correct': torch.tensor(ic, dtype=torch.float)
        }

# Create DataLoader
inference_dataset = InferenceDataset(test_df, tokenizer)
inference_dataloader = DataLoader(
    inference_dataset,
    batch_size=8,
    shuffle=False,
    num_workers=2,
    pin_memory=True)

In [None]:
# Load base SciBERT model FROM LOCAL OFFLINE FILES
print("Loading SciBERT-base from local offline directory...")
base_scibert_model = AutoModel.from_pretrained(SCIBERT_BASE_DIR)
print("Base SciBERT model loaded successfully (offline).")

# Instantiate and load custom model
model_inference = CustomSciBERTModel(
    scibert_model=base_scibert_model,
    num_classes=num_classes,
    num_question_ids=num_question_ids
)

# Load your fine-tuned weights
model_weights_path = os.path.join(FINE_TUNED_DIR, 'model_weights.pth')
print(f"Loading fine-tuned weights from: {model_weights_path}")

model_inference.load_state_dict(torch.load(model_weights_path, map_location=device))
model_inference.to(device)
model_inference.eval()
print('Fine-tuned SciBERT model loaded successfully for inference.')

In [None]:
# Prediction function
@torch.no_grad()
def predict_with_scibert(model, dataloader, device):
    model.eval()
    all_probs = []
    for batch in tqdm(dataloader, desc="Predicting"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        question_id = batch['question_id'].to(device)
        is_correct = batch['is_correct'].to(device)
        
        logits = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            question_id=question_id,
            is_correct=is_correct
        )
        probs = torch.softmax(logits, dim=1)
        all_probs.append(probs.cpu().numpy())
    return np.concatenate(all_probs, axis=0)

# Run inference
pred_proba = predict_with_scibert(model_inference, inference_dataloader, device)

# Generate submission
submission_strings = []
for prob in pred_proba:
    top_3_idx = np.argsort(prob)[-3:][::-1]
    top_3_labels = [label_to_original[i] for i in top_3_idx]
    submission_strings.append(" ".join(top_3_labels))

submission = pd.DataFrame({
    'row_id': test_df['row_id'],
    'Category:Misconception': submission_strings
})

submission.to_csv('submission.csv', index=False)
print('Saved submission.csv')

In [None]:
submission.head()