In [None]:
!pip -q install transformers datasets accelerate
import os
from pathlib import Path

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

from datasets import Dataset
from sklearn.metrics import accuracy_score, f1_score, classification_report

from transformers import AutoTokenizer, AutoModel

from google.colab import drive, files


# Directory on Google Drive holding the fine-tuned CodeBERT checkpoint for Task B.
CODEBERT_DIR = "/content/drive/MyDrive/semeval_task13_models/task_b/codebert_v1"

# Mount Drive (without forcing a remount) so the checkpoint directory above is reachable.
drive.mount("/content/drive", force_remount=False)


In [None]:
# Resolve the data root: use the nested repository folder when it exists,
# otherwise fall back to ROOT itself.
ROOT = Path("/content/semeval_task13")
_repo_dir = ROOT / "SemEval-2026-Task13"
BASE_DIR = _repo_dir if _repo_dir.exists() else ROOT

TASK_B_DIR = BASE_DIR / "task_b"
print("TASK_B_DIR:", TASK_B_DIR)

# Train/validation parquet files are read from TASK_B_DIR; the test file is
# read from the current working directory.
train_path = TASK_B_DIR / "task_b_training_set.parquet"
val_path = TASK_B_DIR / "task_b_validation_set.parquet"
test_path = "test.parquet"

df_train, df_val, df_test = (
    pd.read_parquet(parquet_file)
    for parquet_file in (train_path, val_path, test_path)
)

print("Train:", df_train.shape)
print("Val  :", df_val.shape)
print("Test :", df_test.shape)
print("Train columns:", df_train.columns.tolist())

#Labels 0..10 for Subtask B
# rename() returns new frames, so the frames read from disk are not modified;
# the target column is exposed as "labels" for the model's forward().
df_train = df_train.rename(columns={"label": "labels"})
df_val = df_val.rename(columns={"label": "labels"})
df_test = df_test.copy()

# Integer label arrays for the two labelled splits.
y_train = df_train["labels"].astype(int).values
y_val = df_val["labels"].astype(int).values


#Tokenizer & HF Datasets from the fine-tuned CodeBERT dir
MAX_LENGTH = 256
tokenizer = AutoTokenizer.from_pretrained(CODEBERT_DIR, use_fast=True)

def make_hf_datasets(df_train_local, df_val_local, df_test_local, max_length=256):
    """Tokenize the train/val/test frames and return torch-formatted HF datasets.

    Uses the module-level ``tokenizer``. Every split must have a ``code``
    column; the train and validation frames must also have ``labels``.

    Args:
        df_train_local: training DataFrame (``code``, ``labels``).
        df_val_local: validation DataFrame (``code``, ``labels``).
        df_test_local: test DataFrame (``code`` only is used).
        max_length: token length every example is padded/truncated to.

    Returns:
        ``(train_tok, val_tok, test_tok)``; each exposes ``input_ids`` and
        ``attention_mask`` as torch tensors, plus ``labels`` for train/val.
    """
    def _encode(examples):
        # Pad to a fixed length so the default DataLoader collation can stack rows.
        return tokenizer(
            examples["code"],
            padding="max_length",
            truncation=True,
            max_length=max_length,
        )

    def _prepare(frame, with_labels):
        source_cols = ["code", "labels"] if with_labels else ["code"]
        tensor_cols = ["input_ids", "attention_mask"]
        if with_labels:
            tensor_cols.append("labels")

        encoded = Dataset.from_pandas(frame[source_cols]).map(
            _encode, batched=True, remove_columns=["code"]
        )
        encoded.set_format(type="torch", columns=tensor_cols)
        return encoded

    return (
        _prepare(df_train_local, True),
        _prepare(df_val_local, True),
        _prepare(df_test_local, False),
    )

# Tokenize all three splits once, then report how many examples each one holds.
train_tok, val_tok, test_tok = make_hf_datasets(
    df_train, df_val, df_test, max_length=MAX_LENGTH
)
for _caption, _split in (
    ("Tokenized train size:", train_tok),
    ("Tokenized val size  :", val_tok),
    ("Tokenized test size :", test_tok),
):
    print(_caption, len(_split))


#Model: Fine-tuned CodeBERT encoder (frozen) + BiLSTM + Linear
class CodeBertBiLSTMClassifier(nn.Module):
    """Sequence classifier: CodeBERT encoder -> BiLSTM -> dropout -> linear head.

    Args:
        load_dir: directory (or model id) passed to ``AutoModel.from_pretrained``.
        num_labels: number of output classes.
        lstm_hidden_size: hidden size of each LSTM direction.
        freeze_codebert: when True, encoder parameters get ``requires_grad=False``
            and the encoder is kept in eval mode even while the rest of the
            model is in training mode.
    """

    def __init__(self, load_dir, num_labels, lstm_hidden_size=256, freeze_codebert=True):
        super().__init__()
        self.freeze_codebert = freeze_codebert
        #load encoder weights from the fine-tuned CodeBERT dir
        self.codebert = AutoModel.from_pretrained(load_dir)
        hidden_size = self.codebert.config.hidden_size

        if freeze_codebert:
            for p in self.codebert.parameters():
                p.requires_grad = False
            # A frozen feature extractor should be deterministic (no dropout).
            self.codebert.eval()

        self.bilstm = nn.LSTM(
            input_size=hidden_size,
            hidden_size=lstm_hidden_size,
            num_layers=1,
            batch_first=True,
            bidirectional=True,
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(2 * lstm_hidden_size, num_labels)

    def train(self, mode=True):
        """Switch train/eval mode, but keep a frozen encoder in eval mode.

        Without this, ``model.train()`` would re-enable dropout inside the
        frozen encoder, so the head would be trained on noisy features that
        differ from the ones it sees at evaluation time.
        """
        super().train(mode)
        if getattr(self, "freeze_codebert", False):
            self.codebert.eval()
        return self

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the model.

        Args:
            input_ids: (B, L) token ids.
            attention_mask: (B, L) mask, 1 for real tokens and 0 for padding.
                Padding is assumed to be on the right (as produced by the
                tokenizer's default padding side) -- TODO confirm if the
                tokenizer config overrides ``padding_side``.
            labels: optional (B,) class indices; when given, a cross-entropy
                loss is returned.

        Returns:
            ``(logits, loss)`` where logits is (B, num_labels) and loss is a
            scalar tensor or ``None`` when ``labels`` is ``None``.
        """
        #CodeBERT encoder without classification head
        outputs = self.codebert(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs.last_hidden_state #(B, L, H)

        #BiLSTM over the real tokens only.
        # Inputs are padded to a fixed length, so running the LSTM over the raw
        # tensor would make the forward final state come from padding positions
        # and make the backward pass start on padding. Packing restricts both
        # directions to the first `length` positions of each row.
        lengths = attention_mask.sum(dim=1).clamp(min=1)  # guard against all-zero masks
        lengths = lengths.to(device="cpu", dtype=torch.int64)  # pack_* wants CPU int64 lengths
        packed = nn.utils.rnn.pack_padded_sequence(
            last_hidden_state, lengths, batch_first=True, enforce_sorted=False
        )
        _, (h_n, _) = self.bilstm(packed)
        # h_n: (num_directions=2, B, hidden), returned in the original batch order
        h_cat = torch.cat([h_n[0], h_n[1]], dim=-1) #(B, 2*hidden)

        x = self.dropout(h_cat)
        logits = self.classifier(x) #(B, num_labels)

        loss = None
        if labels is not None:
            loss = nn.functional.cross_entropy(logits, labels)

        return logits, loss

#Reproducibility: seed before the shuffling DataLoader and the randomly
#initialised BiLSTM/linear head are created, so reruns are comparable.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

#Dataloading
batch_size = 8

train_loader = DataLoader(train_tok, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_tok,   batch_size=batch_size, shuffle=False)

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

num_labels = 11

#Fail early with a readable message if a label falls outside [0, num_labels);
#otherwise the problem only surfaces inside the loss computation.
for _split_name, _split_labels in (("train", y_train), ("val", y_val)):
    if _split_labels.size and (
        _split_labels.min() < 0 or _split_labels.max() >= num_labels
    ):
        raise ValueError(
            f"{_split_name} labels must lie in [0, {num_labels - 1}], "
            f"got range [{_split_labels.min()}, {_split_labels.max()}]"
        )

model = CodeBertBiLSTMClassifier(
    load_dir=CODEBERT_DIR,
    num_labels=num_labels,
    lstm_hidden_size=256,
    freeze_codebert=True,
)
model.to(device)

#Only parameters that still require gradients (everything except the frozen
#encoder) are handed to the optimizer.
optimizer = torch.optim.AdamW(
    [p for p in model.parameters() if p.requires_grad],
    lr=2e-4,
)

num_epochs = 2

#evaluation function
def eval_on_val():
    """Evaluate the module-level ``model`` on ``val_loader`` and print a report.

    Puts the model in eval mode (it is not switched back here), collects
    logits and gold labels for every validation batch, and prints accuracy,
    macro-F1 and a per-class classification report.

    Returns:
        ``(accuracy, macro_f1)`` as computed by scikit-learn.
    """
    model.eval()
    logit_chunks, label_chunks = [], []

    with torch.no_grad():
        for val_batch in val_loader:
            gold = val_batch["labels"].to(device)
            batch_logits, _ = model(
                input_ids=val_batch["input_ids"].to(device),
                attention_mask=val_batch["attention_mask"].to(device),
                labels=gold,
            )
            logit_chunks.append(batch_logits.cpu().numpy())
            label_chunks.append(gold.cpu().numpy())

    y_true = np.concatenate(label_chunks, axis=0)
    # Predicted class = index of the largest logit in each row.
    y_pred = np.concatenate(logit_chunks, axis=0).argmax(axis=1)

    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="macro")

    print(f"VAL Accuracy: {acc:.4f} | Macro-F1: {f1:.4f}")
    print("\nClassification report (val):")
    print(classification_report(y_true, y_pred, digits=3))

    return acc, f1

#training loop
for epoch in range(1, num_epochs + 1):
    print(f"\nEpoch {epoch}/{num_epochs}")
    model.train()
    total_loss = 0.0

    # Steps are counted from 1 so the running average divides by the number
    # of batches processed so far.
    for step_no, train_batch in enumerate(train_loader, start=1):
        optimizer.zero_grad()
        _, batch_loss = model(
            input_ids=train_batch["input_ids"].to(device),
            attention_mask=train_batch["attention_mask"].to(device),
            labels=train_batch["labels"].to(device),
        )
        batch_loss.backward()
        optimizer.step()

        total_loss += batch_loss.item()
        # Progress line every 200 steps with the mean loss so far this epoch.
        if step_no % 200 == 0:
            print(f"  step {step_no}, train loss: {total_loss / step_no:.4f}")

    avg_epoch_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch} finished, avg train loss: {avg_epoch_loss:.4f}")
    # Validation metrics are printed after every epoch.
    eval_on_val()




In [None]:
#test prediction
# Unshuffled loader so predictions stay in the row order of the test frame.
test_loader = DataLoader(test_tok, batch_size=batch_size, shuffle=False)

model.eval()
all_test_logits = []
with torch.no_grad():
    for test_batch in test_loader:
        batch_logits, _ = model(
            input_ids=test_batch["input_ids"].to(device),
            attention_mask=test_batch["attention_mask"].to(device),
            labels=None,
        )
        all_test_logits.append(batch_logits.cpu().numpy())

all_test_logits = np.concatenate(all_test_logits, axis=0)
# Predicted class = index of the largest logit per example.
test_preds = all_test_logits.argmax(axis=1).astype(int)

print("Num test preds:", len(test_preds))
print("First 10 labels:", test_preds[:10])

print("\nupload Task B sample_submission.csv")


# The sample submission is read from the working directory and used as the
# template for the output file.
sample_filename = "sample_submission_b.csv"
sample_sub = pd.read_csv(sample_filename)
print("Sample submission shape:", sample_sub.shape)
print(sample_sub.head())

if len(sample_sub) == len(test_preds):
    # Prefer a column named "label"; otherwise fall back to the second column.
    label_col = "label" if "label" in sample_sub.columns else sample_sub.columns[1]

    # Predictions are written positionally.
    # NOTE(review): assumes the template rows are in the same order as test.parquet -- confirm.
    sample_sub[label_col] = test_preds
    print("\nSubmission preview (CodeBERT+BiLSTM):")
    print(sample_sub.head())

    out_name = "subtask_b_codebert_bilstm.csv"
    sample_sub.to_csv(out_name, index=False)
    print(f"\nsaved submission file: {out_name}")
else:
    # No file is written when the template and the predictions disagree in length.
    print("Length mismatch:", len(sample_sub), "vs", len(test_preds))