In [None]:
#imports and load interaction

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, log_loss

# Read the raw interaction log, then order the rows by student and time step
# so that each student's attempts are contiguous and in sequence order.
df = (
    pd.read_csv("../data/interactions.csv")
    .sort_values(["student_id", "time_step"])
    .reset_index(drop=True)
)
df.head()

DKT commonly uses "item" as the skill/question unit. Since the dataset already maps questions to concepts, we'll model questions as items.

In [None]:
# Encode Items

# Map every distinct question_id to a dense 0-based index. Sorting first keeps
# the mapping deterministic across runs.
question_ids = sorted(df["question_id"].unique())
num_questions = len(question_ids)
q2i = dict(zip(question_ids, range(num_questions)))
# Two input slots per question: [0, Q) for incorrect, [Q, 2Q) for correct.
num_inputs = 2 * num_questions

# Helper to build the DKT input index

def encode_interaction(q_idx, correct):
    """Return the DKT input index for a (question, correctness) pair.

    An incorrect answer maps to ``q_idx`` itself; a correct answer is shifted
    by ``num_questions`` so the two outcomes occupy disjoint index ranges.
    """
    if not correct:
        return q_idx
    return q_idx + num_questions

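We'll predict correctness at time $t$ using only the interaction history up to $t-1$: the model input at step $t$ is the (question, correctness) pair observed at step $t-1$.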

In [None]:
# Build sequences per student

# One (student_id, question_indices, correct_flags) tuple per student, with the
# two lists aligned position-by-position and ordered by time_step.
student_groups = []
for student_id, rows in df.groupby("student_id"):
    ordered = rows.sort_values("time_step")
    question_seq = [q2i[raw_q] for raw_q in ordered["question_id"].tolist()]
    correct_seq = ordered["correct"].astype(int).tolist()
    student_groups.append((student_id, question_seq, correct_seq))

# Quick sanity check: number of students, first student id, first sequence length.
len(student_groups), student_groups[0][0], len(student_groups[0][1])


Train/test split by student

In [None]:
# Split at the student level so that no student's interactions appear in both
# the train and the test set (this is what prevents leakage).

rng = np.random.RandomState(42)
idx = np.arange(len(student_groups))
rng.shuffle(idx)  # in-place shuffle of student positions

# The first 80% of the shuffled positions go to train, the rest to test.
split = int(0.8 * len(idx))
train_idx = idx[:split]
test_idx = idx[split:]

train_students = [student_groups[pos] for pos in train_idx]
test_students = [student_groups[pos] for pos in test_idx]

len(train_students), len(test_students)

In [None]:
# Dataset and Padding collate
# sequences are padded in a batch and mask is used.

class DKTDataset(Dataset):
    """Per-student next-step prediction sequences for DKT.

    Built from ``(student_id, question_indices, correct_flags)`` tuples.
    Students with fewer than two interactions are dropped: they provide no
    (history, target) pair, and the resulting zero-length sequence would be
    rejected by ``pack_padded_sequence`` in the model.
    """

    def __init__(self, students):
        # Keep only students that yield at least one prediction step.
        self.students = [s for s in students if len(s[1]) >= 2]

    def __len__(self):
        return len(self.students)

    def __getitem__(self, i):
        """Return ``(x, target_q, y)`` tensors, each of length ``len(qs) - 1``.

        x        -- encoded interaction at step t-1 (long)
        target_q -- question index asked at step t (long)
        y        -- correctness at step t (float32)
        """
        _, qs, cs = self.students[i]
        # Inputs are every interaction except the last; targets are every
        # interaction except the first, so position k of x predicts position k of y.
        x = [encode_interaction(q, c) for q, c in zip(qs[:-1], cs[:-1])]
        target_q = qs[1:]
        y = cs[1:]
        return (
            torch.tensor(x, dtype=torch.long),
            torch.tensor(target_q, dtype=torch.long),
            torch.tensor(y, dtype=torch.float32),
        )

def collate_fn(batch):
    """Zero-pad a list of ``(x, target_q, y)`` samples to a common length.

    Returns ``(x_pad, tq_pad, y_pad, mask, lengths)`` where the padded tensors
    have shape (batch, max_len), ``mask`` is True on real timesteps and
    ``lengths`` holds each sample's unpadded length.
    """
    inputs, targets, labels = zip(*batch)
    lengths = torch.tensor([len(seq) for seq in inputs], dtype=torch.long)
    batch_size = len(batch)
    max_len = max(lengths).item()
    seq_lens = lengths.tolist()

    def _pad(seqs, dtype):
        # Copy each sequence into the left part of its zero-initialised row.
        padded = torch.zeros(batch_size, max_len, dtype=dtype)
        for row, seq, n in zip(padded, seqs, seq_lens):
            row[:n] = seq
        return padded

    x_pad = _pad(inputs, torch.long)
    tq_pad = _pad(targets, torch.long)
    y_pad = _pad(labels, torch.float32)

    # Position j of row i is valid exactly when j < lengths[i].
    mask = torch.arange(max_len).unsqueeze(0) < lengths.unsqueeze(1)

    return x_pad, tq_pad, y_pad, mask, lengths

# DataLoaders: only the training set is shuffled; both pad batches via collate_fn.

train_loader = DataLoader(
    DKTDataset(train_students),
    batch_size=8,
    shuffle=True,
    collate_fn=collate_fn,
)
test_loader = DataLoader(
    DKTDataset(test_students),
    batch_size=8,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
# DKT model

class DKT(nn.Module):
    """Deep Knowledge Tracing model: embedding -> LSTM -> per-question logits.

    Args:
        num_inputs: size of the interaction vocabulary (embedding rows).
        num_questions: number of output logits per timestep.
        emb_dim: embedding width.
        hidden_dim: LSTM hidden size.
    """

    def __init__(self, num_inputs, num_questions, emb_dim=64, hidden_dim=128):
        super().__init__()
        self.embedding = nn.Embedding(num_inputs, emb_dim)
        self.lstm = nn.LSTM(input_size=emb_dim, hidden_size=hidden_dim, batch_first=True)
        self.out = nn.Linear(hidden_dim, num_questions)

    def forward(self, x, lengths):
        """Return logits of shape (B, T, num_questions), with T == x.size(1).

        Args:
            x: (B, T) long tensor of encoded interactions, zero-padded.
            lengths: (B,) tensor with the unpadded length of each row.
        """
        emb = self.embedding(x)
        # pack_padded_sequence requires the lengths on the CPU.
        packed = nn.utils.rnn.pack_padded_sequence(
            emb, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        packed_out, _ = self.lstm(packed)
        # total_length pins the time dimension to the input's, so the output
        # always lines up with per-timestep targets even when x is padded
        # beyond the longest sequence in the batch.
        out, _ = nn.utils.rnn.pad_packed_sequence(
            packed_out, batch_first=True, total_length=x.size(1)
        )
        logits = self.out(out)  # (B, T, num_questions)
        return logits

# initialize:

# Seed torch so weight initialisation and DataLoader shuffling are reproducible
# under Restart & Run All (same seed value as the train/test split).
torch.manual_seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DKT(num_inputs=num_inputs, num_questions=num_questions).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# reduction="none" keeps the per-timestep losses so padding can be masked out.
criterion = nn.BCEWithLogitsLoss(reduction="none")


In [None]:
# train loop
# loss is computed only on valid (non-padded) timesteps.

def run_epoch(loader, train=True):
    """Run one pass over ``loader`` and return ``(avg_loss, acc, ll)``.

    Args:
        loader: yields ``(x, tq, y, mask, lengths)`` batches.
        train: when True the model is put in train mode and the optimizer is
            stepped on every batch; when False the model is put in eval mode
            and no gradients are tracked.

    Returns:
        avg_loss: masked BCE loss averaged over all valid timesteps.
        acc: accuracy at a 0.5 probability threshold.
        ll: sklearn log loss over all valid timesteps.
    """
    model.train() if train else model.eval()

    all_probs = []
    all_labels = []

    total_loss = 0.0
    total_count = 0

    for x, tq, y, mask, lengths in loader:
        # lengths is left on the CPU: packing inside the model needs it there.
        x, tq, y, mask = x.to(device), tq.to(device), y.to(device), mask.to(device)

        # Skip autograd bookkeeping entirely during evaluation.
        with torch.set_grad_enabled(train):
            logits = model(x, lengths)  # (B,T,Q)
            # pick logits of the target question at each timestep
            target_logits = logits.gather(2, tq.unsqueeze(-1)).squeeze(-1)  # (B,T)

            loss_mat = criterion(target_logits, y)  # (B,T)
            valid = mask.float()
            loss = (loss_mat * valid).sum() / valid.sum()

        if train:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        n_valid = valid.sum().item()
        total_loss += loss.item() * n_valid
        total_count += n_valid

        probs = torch.sigmoid(target_logits)[mask].detach().cpu().numpy()
        labels = y[mask].detach().cpu().numpy().astype(int)

        all_probs.extend(probs.tolist())
        all_labels.extend(labels.tolist())

    avg_loss = total_loss / total_count
    acc = accuracy_score(all_labels, [1 if p >= 0.5 else 0 for p in all_probs])
    # Passing labels explicitly keeps log_loss defined even if only one class
    # happens to be present in this pass.
    ll = log_loss(all_labels, all_probs, labels=[0, 1])

    return avg_loss, acc, ll

# train for a few epochs:

for epoch in range(1, 11):
    tr_loss, tr_acc, tr_ll = run_epoch(train_loader, train=True)
    te_loss, te_acc, te_ll = run_epoch(test_loader, train=False)
    print(f"Epoch {epoch:02d} | train acc={tr_acc:.3f} ll={tr_ll:.3f} | test acc={te_acc:.3f} ll={te_ll:.3f}")

