In [1]:
import pandas as pd
import numpy as np
import torch
import pickle
import os
import json
import gc
from torch.distributions import Bernoulli
from torch.optim import LBFGS
from tqdm import tqdm
from scipy.stats import pearsonr
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import multiprocessing as mp

import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from tueplots import bundles
bundles.icml2024()

from torchmetrics import AUROC
auroc = AUROC(task="binary")

import warnings
warnings.filterwarnings("ignore")

torch.manual_seed(0)

device = "cuda:0" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"

def visualize_response_matrix(results, value, filename):
    """Render a response matrix as a three-colour image and save it to disk.

    Args:
        results: DataFrame whose columns carry a "scenario" level and whose
            index supplies the y-axis tick labels.
        value: Matrix-like object handed to ``ax.matshow``; entries are
            coloured white for -1, red for 0 and blue for 1.
        filename: Destination path passed to ``plt.savefig``.
    """
    column_scenarios = list(results.columns.get_level_values("scenario"))
    n_columns = len(column_scenarios)

    # Column indices at which a new run of identical scenario labels begins.
    run_starts = [0] + [
        col for col in range(1, n_columns)
        if column_scenarios[col] != column_scenarios[col - 1]
    ]
    # Last column index of every run (the final run ends at the last column).
    run_ends = [start - 1 for start in run_starts[1:]] + [n_columns - 1]

    # Separator lines sit half a column before the first column of each new run.
    boundaries = [start - 0.5 for start in run_starts[1:]]
    group_names = [column_scenarios[start] for start in run_starts]
    group_midpoints = [(start + end) / 2.0 for start, end in zip(run_starts, run_ends)]

    # Discrete colour scale: white is -1, red is 0 and blue is 1.
    cmap = mcolors.ListedColormap(["white", "red", "blue"])
    norm = mcolors.BoundaryNorm([-1.5, -0.5, 0.5, 1.5], cmap.N)

    # Group labels closer than this many columns to the previously drawn label are skipped.
    min_spacing = 100

    with plt.rc_context(bundles.icml2024(usetex=True, family="serif")):
        fig, ax = plt.subplots(figsize=(20, 10))
        image = ax.matshow(value, aspect="auto", cmap=cmap, norm=norm)

        # Dashed vertical separators between scenario groups.
        for x_pos in boundaries:
            ax.axvline(x=x_pos, color="black", linewidth=0.25, linestyle="--", alpha=0.5)

        # Scenario names above the matrix, thinned out to avoid overlapping text.
        previous_label_x = -float("inf")
        for label, x_mid in zip(group_names, group_midpoints):
            if x_mid - previous_label_x < min_spacing:
                continue
            ax.text(x_mid, -5, label, ha='center', va='bottom', rotation=90, fontsize=3)
            previous_label_x = x_mid

        # One y tick per row of `results`, labelled with its index entry.
        ax.set_yticks(range(len(results.index)))
        ax.set_yticklabels(results.index, fontsize=3)

        colorbar = plt.colorbar(image)
        colorbar.set_ticks([-1, 0, 1])
        colorbar.set_ticklabels(["-1", "0", "1"])
        plt.savefig(filename, dpi=600, bbox_inches="tight")
        plt.close()

def trainer(parameters, optim, closure, n_iter=100, verbose=True, tol=1e-5):
    """Run ``optim.step(closure)`` repeatedly until convergence or ``n_iter`` steps.

    Convergence is declared when, between two consecutive outer steps, the
    absolute change in loss, the summed L2 change of the parameters and the
    summed L2 gradient norm are all below ``tol``.

    Args:
        parameters: List of tensors being optimised (updated in place by ``optim``).
        optim: Optimiser whose ``step(closure)`` returns the loss tensor
            (e.g. ``torch.optim.LBFGS``).
        closure: Callable that zeroes gradients, computes the loss, calls
            ``backward()`` and returns the loss.
        n_iter: Maximum number of outer optimiser steps.
        verbose: If True, show a tqdm progress bar with convergence diagnostics.
        tol: Threshold applied to all three convergence diagnostics.

    Returns:
        The same ``parameters`` list that was passed in.
    """
    pbar = tqdm(range(n_iter)) if verbose else range(n_iter)
    loss = None
    for iteration in pbar:
        if iteration > 0:
            # Detached snapshots: comparing values only, no autograd graph needed.
            previous_parameters = [p.detach().clone() for p in parameters]
            previous_loss = loss

        loss = optim.step(closure).detach()

        if iteration == 0:
            # Nothing to compare against yet.
            continue

        d_loss = (previous_loss - loss).item()
        d_parameters = sum(
            torch.norm(prev - curr.detach(), p=2).item()
            for prev, curr in zip(previous_parameters, parameters)
        )
        grad_norm = sum(torch.norm(p.grad, p=2).item() for p in parameters if p.grad is not None)
        if verbose:
            pbar.set_postfix({"grad_norm": grad_norm, "d_parameter": d_parameters, "d_loss": d_loss})

        # Use the magnitude of the loss change: a loss that went *up* by a large
        # amount has a very negative signed difference and must not count as converged.
        if abs(d_loss) < tol and d_parameters < tol and grad_norm < tol:
            break
    return parameters

def compute_auc(probs, data, train_idtor, test_idtor):
    """Compute and print binary AUROC on the train and test entries.

    Args:
        probs: Tensor of predicted probabilities.
        data: Tensor of 0/1 responses with the same shape as ``probs``.
        train_idtor: Indicator tensor (non-zero = entry belongs to the train split).
        test_idtor: Indicator tensor (non-zero = entry belongs to the test split).

    Returns:
        Tuple ``(train_auc, test_auc)`` as returned by the module-level ``auroc`` metric.
    """
    train_mask = train_idtor.bool()
    test_mask = test_idtor.bool()

    # Scores are only evaluated here, so drop any autograd graph attached to them.
    scores = probs.detach()
    # torchmetrics' binary AUROC expects integer ground-truth labels; the response
    # matrix is stored as a float tensor, so cast the 0/1 values explicitly.
    labels = data.long()

    train_auc = auroc(scores[train_mask], labels[train_mask])
    test_auc = auroc(scores[test_mask], labels[test_mask])
    print(f"train auc: {train_auc}")
    print(f"test auc: {test_auc}")

    return train_auc, test_auc

def compute_cttcorr(probs, data, train_idtor, test_idtor):
    """Correlate per-row mean prediction with per-row mean response, per split.

    For each split, entries outside the split are blanked with NaN, row means
    are taken with ``torch.nanmean`` over ``dim=1``, rows whose mean is NaN are
    dropped, and the Pearson correlation between the two mean vectors is computed.

    Args:
        probs: 2-D float tensor of predicted probabilities.
        data: 2-D float tensor of responses, same shape as ``probs``.
        train_idtor: Indicator tensor (non-zero = train entry).
        test_idtor: Indicator tensor (non-zero = test entry).

    Returns:
        Tuple ``(train_cttcorr, test_cttcorr)`` of Pearson statistics.
    """
    def _masked_row_means(values, idtor):
        # Work on a copy so the caller's tensor is left untouched.
        blanked = values.clone()
        blanked[~idtor.bool()] = float('nan')
        return torch.nanmean(blanked, dim=1).detach().cpu().numpy()

    def _split_correlation(idtor):
        prob_ctt = _masked_row_means(probs, idtor)
        label_ctt = _masked_row_means(data, idtor)
        # Keep only rows that have at least one entry in this split.
        usable = ~(np.isnan(prob_ctt) | np.isnan(label_ctt))
        return pearsonr(prob_ctt[usable], label_ctt[usable]).statistic

    train_cttcorr = _split_correlation(train_idtor)
    test_cttcorr = _split_correlation(test_idtor)

    print(f"train cttcorr: {train_cttcorr}")
    print(f"test cttcorr: {test_cttcorr}")

    return train_cttcorr, test_cttcorr

In [3]:
# Load the pickled response matrix.
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open("../data/resmat.pkl", "rb") as f:
    results = pickle.load(f)

# Double precision on CUDA, single precision on every other backend.
dtype = torch.float32
if device.startswith("cuda"):
    dtype = torch.float64

# Three encodings of the same matrix, differing in how missing entries appear:
#   data_withnan  -> missing = nan
#   data_withneg1 -> missing = -1
#   data_with0    -> missing = 0
data_withnan = torch.tensor(results.values, dtype=dtype, device=device)
# 1 where a response was observed, 0 where it is missing.
data_idtor = torch.isnan(data_withnan).logical_not().to(dtype)
data_withneg1 = torch.nan_to_num(data_withnan, nan=-1.0)
data_with0 = torch.nan_to_num(data_withneg1 * data_idtor, nan=0.0)

n_test_takers, n_items = data_with0.shape
scenarios = results.columns.get_level_values("scenario").unique()

In [4]:
# data_idtor = train_idtor + test_idtor
# Apply a random ~80/20 train/test mask to the observed entries and redraw until
# every row and every column keeps at least one training observation.
valid_condition = False
trial = 0
while not valid_condition:
    train_idtor = torch.bernoulli(data_idtor * 0.8).int()
    test_idtor = data_idtor - train_idtor
    valid_condition = (train_idtor.sum(axis=1) != 0).all() and (train_idtor.sum(axis=0) != 0).all()
    print(f"trial {trial} valid condition: {valid_condition}")
    trial += 1

# fit z (item parameters), one column batch at a time, averaging the likelihood
# over 150 random draws of nuisance abilities.
# NOTE(review): the logits tensor below has shape (150, n_test_takers, current_B);
# with B = 50000 this can be very large -- lower B if memory or runtime is a problem.
B = 50000
optimized_zs = []
# Create parameters in the same dtype as the data so the model is not silently
# fitted in float32 against float64 responses on CUDA.
thetas_nuisance = torch.randn(150, n_test_takers, dtype=dtype, device=device)
for i in tqdm(range(0, n_items, B)):
    data_batch = data_with0[:, i:i+B]
    train_idtor_batch = train_idtor[:, i:i+B]
    current_B = data_batch.shape[1]
    z_i = torch.randn(current_B, dtype=dtype, requires_grad=True, device=device)
    optim_z_i = LBFGS([z_i], lr=0.1, max_iter=20, history_size=10, line_search_fn="strong_wolfe")
    def closure_z_i():
        optim_z_i.zero_grad()
        # Parameterise the Bernoulli by logits directly: same model as
        # sigmoid -> probs, but without the round trip through clamped probabilities.
        logits = thetas_nuisance[:, :, None] + z_i[None, None, :]
        loss = -(Bernoulli(logits=logits).log_prob(data_batch)*train_idtor_batch).mean()
        loss.backward()
        return loss
    z_i_optimized = trainer([z_i], optim_z_i, closure_z_i)[0].detach()
    optimized_zs.append(z_i_optimized)
zs = torch.cat(optimized_zs)

# fit theta (test-taker abilities) with the item parameters held fixed
thetas = torch.randn(n_test_takers, dtype=dtype, requires_grad=True, device=device)
optim_theta = LBFGS([thetas], lr=0.1, max_iter=20, history_size=10, line_search_fn="strong_wolfe")
def closure_theta():
    optim_theta.zero_grad()
    logits = thetas[:, None] + zs[None, :]
    loss = -(Bernoulli(logits=logits).log_prob(data_with0)*train_idtor).mean()
    loss.backward()
    return loss
thetas = trainer([thetas], optim_theta, closure_theta)[0]

# calculate metrics (evaluation only, so no autograd graph is needed)
with torch.no_grad():
    probs = torch.sigmoid(thetas[:, None] + zs[None, :])

# `metric_results` is not created anywhere in the visible notebook; create it
# here if no earlier cell did, so the cell also works after Restart & Run All.
if "metric_results" not in globals():
    metric_results = defaultdict(dict)
combined_metrics = metric_results.setdefault("combined_data", {})

train_auc, test_auc = compute_auc(probs, data_with0, train_idtor, test_idtor)
combined_metrics["train_auc"] = train_auc.item()
combined_metrics["test_auc"] = test_auc.item()

train_cttcorr, test_cttcorr = compute_cttcorr(probs, data_with0, train_idtor, test_idtor)
combined_metrics["train_cttcorr"] = train_cttcorr.item()
combined_metrics["test_cttcorr"] = test_cttcorr.item()

# Release the large fitting tensors before the next experiment.
del optim_theta, thetas, z_i, thetas_nuisance, optim_z_i
gc.collect()
torch.cuda.empty_cache()

trial 0 valid condition: True


  0%|          | 0/100 [00:24<?, ?it/s]
  0%|          | 0/2 [00:25<?, ?it/s]


KeyboardInterrupt: 

In [None]:
import torch
import torch.nn as nn

class NeuralIRT_1PL(nn.Module):
    """Multidimensional 1PL IRT model with a fixed Q-matrix.

    The probability of a correct response is
    ``sigmoid(sum_d(theta[person, d] * q_matrix[item, d]) - b[item])``,
    i.e. every discrimination is fixed to 1 and the Q-matrix selects which
    ability dimensions contribute to each item.

    Args:
        n_test_takers: Number of rows in the person embedding table.
        n_items: Number of rows in the item-difficulty embedding table.
        n_dimensions: Width of each person's ability vector.
        q_matrix: Tensor indexed by item id along its first axis whose rows are
            multiplied element-wise with the ability vectors (so it is expected
            to be broadcastable against ``n_dimensions``).
    """

    def __init__(self, n_test_takers, n_items, n_dimensions, q_matrix):
        super().__init__()

        # Person embedding layer -> learns the multidimensional thetas.
        self.person_embedding = nn.Embedding(n_test_takers, n_dimensions)

        # Item difficulty embedding layer -> learns the 'b' parameters.
        # (There is no discrimination embedding in the 1PL model.)
        self.item_b_embedding = nn.Embedding(n_items, 1)

        # The Q-matrix is fixed data, not a parameter: a buffer follows the
        # module across devices and into the state dict without being trained.
        self.register_buffer('q_matrix', q_matrix)

    def forward(self, person_ids, item_ids):
        """Return response probabilities for paired person / item ids.

        Args:
            person_ids: Integer tensor of person indices (any shape).
            item_ids: Integer tensor of item indices, same shape as ``person_ids``.

        Returns:
            Tensor of probabilities with the same shape as the id tensors.
        """
        theta = self.person_embedding(person_ids)
        # Drop only the trailing singleton embedding axis; a bare squeeze()
        # would also collapse a batch axis of length 1.
        b = self.item_b_embedding(item_ids).squeeze(-1)

        # Q-matrix row for each requested item.
        q_vector = self.q_matrix[item_ids]

        # Keep only the ability dimensions the item measures, add them up
        # (dot product with a = 1) over the last axis and subtract the difficulty.
        logits = torch.sum(theta * q_vector, dim=-1) - b

        return torch.sigmoid(logits)

In [None]:
# --- Build the Q-Matrix for your MASTER TEST ---
# Assume master_test_df is your (120, ...) DataFrame with the 'component' column

# One indicator column per component, ordered by first appearance in the data
# (pd.get_dummies alone would order the columns alphabetically).
component_names = master_test_df['component'].unique()
Q_matrix_df = pd.get_dummies(master_test_df['component'])
Q_matrix_df = Q_matrix_df[component_names]
Q_matrix_tensor = torch.tensor(Q_matrix_df.values, device=device, dtype=torch.float32)

# Derive the number of ability dimensions from the Q-matrix itself so the model
# width can never disagree with the data (6 for the MASTER TEST).
n_dimensions = Q_matrix_tensor.shape[1]

print(f"Shape of Q-Matrix: {Q_matrix_tensor.shape}") # Should be (120, 6)

# --- Prepare the "long format" training data (same as before) ---
# Assume 'final_test_data_matrix' (183, 120) and 'data_idtor' are available
# ... (code to create 'training_data' goes here) ...

In [None]:
from torch.utils.data import DataLoader, TensorDataset

# --- Setup ---
# Instantiate the 1PL model and hand it the fixed Q-matrix.
model = NeuralIRT_1PL(n_test_takers, n_items, n_dimensions, q_matrix=Q_matrix_tensor).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.BCELoss()

# Use a DataLoader for efficient batching.
# Columns of training_data are used as (person id, item id, response).
dataset = TensorDataset(training_data[:, 0], training_data[:, 1], training_data[:, 2].float())
data_loader = DataLoader(dataset, batch_size=1024, shuffle=True)

# --- Training ---
n_epochs = 5
model.train()
for epoch in range(n_epochs):
    # Accumulate a sample-weighted sum so the reported value is the mean loss
    # over the whole epoch rather than whatever the last mini-batch produced.
    epoch_loss_sum = 0.0
    epoch_n_samples = 0
    for person_ids, item_ids, labels in tqdm(data_loader, desc=f"Epoch {epoch+1}"):
        person_ids, item_ids, labels = person_ids.to(device), item_ids.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(person_ids, item_ids)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_n = labels.shape[0]
        # Detached tensor accumulation avoids a host sync on every batch.
        epoch_loss_sum = epoch_loss_sum + loss.detach() * batch_n
        epoch_n_samples += batch_n
    # An empty loader yields nan instead of a NameError on `loss`.
    epoch_loss = float(epoch_loss_sum) / epoch_n_samples if epoch_n_samples else float("nan")
    print(f"Epoch {epoch+1}, Loss: {epoch_loss:.4f}")

# --- Get your final multidimensional thetas ---
model.eval()
final_thetas_1pl = model.person_embedding.weight.detach().cpu().numpy()
print(f"\nShape of final 1PL thetas: {final_thetas_1pl.shape}") # Should be (183, 6)