In [1]:
import os
import sys

notebook_dir = os.getcwd()
project_root_path = os.path.dirname(notebook_dir)
sys.path.insert(0, project_root_path)

from config import PROJECT_ROOT
import numpy as np

# Load and Transform Data

In [2]:
# C_train = np.load(os.path.join(PROJECT_ROOT, 'output', 'C_train_instance.npy'))
# C_hat_train = np.load(os.path.join(PROJECT_ROOT, 'output', 'C_hat_sigmoid_train_instance.npy'))
# one_hot_Y_train = np.load(os.path.join(PROJECT_ROOT, 'output', 'Y_train_instance.npy'))

# C_test = np.load(os.path.join(PROJECT_ROOT, 'output', 'C_test_instance.npy'))
# C_hat_test = np.load(os.path.join(PROJECT_ROOT, 'output', 'C_hat_sigmoid_test_instance.npy'))
# one_hot_Y_test = np.load(os.path.join(PROJECT_ROOT, 'output', 'Y_test_instance.npy'))

C_hat_train = np.load(os.path.join(PROJECT_ROOT, 'output', 'C_hat_sigmoid_train.npy'))
one_hot_Y_train = np.load(os.path.join(PROJECT_ROOT, 'output', 'Y_train.npy'))

C_hat_test = np.load(os.path.join(PROJECT_ROOT, 'output', 'C_hat_sigmoid_test.npy'))
one_hot_Y_test = np.load(os.path.join(PROJECT_ROOT, 'output', 'Y_test.npy'))

class_level_concepts = np.load(os.path.join(PROJECT_ROOT, 'output', 'class_level_concepts.npy'))

In [3]:
Y_train = np.argmax(one_hot_Y_train, axis=1)
Y_test = np.argmax(one_hot_Y_test, axis=1)

In [4]:
C_train = []
for y in Y_train:
    C_train.append(class_level_concepts[y])

C_train = np.array(C_train)

In [5]:
# C_hat_train.shape, one_hot_Y_train.shape, C_hat_test.shape, one_hot_Y_test.shape

In [6]:
# C_hat_train[C_hat_train < 0.1] = 0
# C_hat_test[C_hat_test < 0.1] = 0

# Classic Models

## Logistic Regression

In [7]:
from sklearn.linear_model import LogisticRegression

model = LogisticRegression(max_iter=1000)
model.fit(C_hat_train, Y_train)
print(f"Logistic Regression Test accuracy: {model.score(C_hat_test, Y_test)}")

Logistic Regression Test accuracy: 0.664998274076631


## k-NN

In [8]:
from sklearn.neighbors import KNeighborsClassifier

model = KNeighborsClassifier()
model.fit(C_hat_train, Y_train)
print(f"k-NN Test accuracy: {model.score(C_hat_test, Y_test)}")

k-NN Test accuracy: 0.6467034863652054


## Decision Tree

In [9]:
from sklearn.tree import DecisionTreeClassifier

model = DecisionTreeClassifier()
model.fit(C_hat_train, Y_train)
print(f"Decision Tree Test accuracy: {model.score(C_hat_test, Y_test)}")

Decision Tree Test accuracy: 0.5957887469796341


## MLP

In [10]:
from sklearn.neural_network import MLPClassifier

mlp = MLPClassifier(hidden_layer_sizes=(512,256, 128), max_iter=1000)
mlp.fit(C_hat_train, Y_train)
print(f"MLP Test accuracy: {mlp.score(C_hat_test, Y_test)}")

MLP Test accuracy: 0.6487745944080083


# Accuracy Using Class-Level Concepts

In [11]:
# Function to find the closest concept vector and predict the label
def predict_nearest_concept(instance, reference_concepts, reference_labels):
    distances = np.sqrt(np.sum((reference_concepts - instance)**2, axis=1))
    min_idx = np.argmin(distances)
    return reference_labels[min_idx]

# Use C_train as reference concepts and evaluate on C_hat_test
correct_predictions = 0
total_predictions = len(C_hat_test)

for i, test_instance in enumerate(C_hat_test):
    predicted_label = predict_nearest_concept(test_instance, C_train, Y_train)
    true_label = Y_test[i]

    if predicted_label == true_label:
        correct_predictions += 1

# Calculate and print accuracy
accuracy = correct_predictions / total_predictions
print(f"\nOverall accuracy using concept-based nearest neighbor: {accuracy:.4f}")


Overall accuracy using concept-based nearest neighbor: 0.6124


# Prototype-Based Model


In [12]:
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")

Using device: mps


In [14]:
# class PrototypeLearner(nn.Module):
#     def forward(self, C_hat, Y_true=None, lambda_bin=0.1, lambda_spars=0.01):
#         # Get continuous prototypes
#         prototypes = self.prototypes.weight
#         prototypes_sigmoid = torch.sigmoid(prototypes)  # Shape: [num_classes, num_concepts]

#         # Calculate absolute difference between concepts and prototypes
#         concept_distances = torch.abs(C_hat.unsqueeze(1) - prototypes_sigmoid)  # Shape: [batch_size, num_classes, num_concepts]
#         # Sum distances across concept dimension
#         label_distances = concept_distances.sum(dim=2)  # Shape: [batch_size, num_classes]
#         pred_label = label_distances.argmin(dim=1)  # Shape: [batch_size]

#         # Classification loss - using the distances for labeled classes
#         loss_class = torch.mean(torch.sum(label_distances * Y_true, dim=1))

#         # Binarization loss - encourages prototypes to be binary (0 or 1)
#         loss_bin = torch.mean(prototypes_sigmoid * (1 - prototypes_sigmoid))

#         # Sparsity loss - encourages fewer active concepts
#         loss_spars = torch.mean(torch.abs(prototypes_sigmoid))

#         # Combine losses
#         total_loss = loss_class + (lambda_bin * loss_bin) + (lambda_spars * loss_spars)

#         return pred_label, total_loss

#     def get_binary_prototypes(self):
#         with torch.no_grad():
#             binary_prototypes = (torch.sigmoid(self.prototypes.weight) > 0.5).float()
#         return binary_prototypes

#     def get_sigmoid_prototypes(self):
#         return torch.sigmoid(self.prototypes.weight)

In [35]:
class PrototypeClassifier(nn.Module):
    def __init__(self, num_features, num_classes):
        super().__init__()
        self.protoypes = nn.Parameter(torch.rand(num_classes, num_features))  # initialize the protptype matrix P

    def forward(self, x):
        # x: (batch_size, num_features)
        # L1distance：|x_i - M_m|_1
        # (batch_size, num_classes, num_features)
        dist = torch.abs(x.unsqueeze(1) - torch.sigmoid(self.protoypes))
        dist = dist.sum(dim=2)
        return dist  # (batch_size, num_classes)

    def binary_regularization(self):
        sigmoid_protos = torch.sigmoid(self.protoypes)
        return (sigmoid_protos * (1 - sigmoid_protos)).mean()

    def sparsity_regularization(self):
        return torch.sum(torch.sigmoid(self.protoypes))

    def predict(self, x):
        with torch.no_grad():
            Prototypes = torch.sigmoid(self.protoypes)
            Prototypes[Prototypes>=0.5] = 1
            Prototypes[Prototypes<0.5]= 0
            dists = torch.abs(x.unsqueeze(1) - Prototypes)
            dists = dists.sum(dim=2)
            predictions = dists.argmin(dim=1)
        return predictions

    def get_sigmoid_prototypes(self):
        return torch.sigmoid(self.protoypes)

    def concept_wise_dist(self, x):
        with torch.no_grad():
            Prototypes = torch.sigmoid(self.protoypes)
            Prototypes[Prototypes>=0.5] = 1
            Prototypes[Prototypes<0.5]= 0
            dists = x.unsqueeze(1) - Prototypes
            # predictions = self.predict(x)
            # dists = dists[torch.arange(x.shape[0]), predictions,:]
        return dists

    def threshold(self, val_x, val_y):
        pass

    def conformal_predict(self, x):
        pass

    def explanation(self, x):
        pass

### Helper Functions

In [36]:
def calculate_loss(model, distances, y_true, lambda_binary, lambda_L1):
    # label loss
    loss_cls = (distances*y_true).sum(axis=1)
    loss_cls = loss_cls.mean()

    # regularization loss
    loss_binary = model.binary_regularization()
    loss_sparsity = model.sparsity_regularization()

    # total loss
    loss = loss_cls + (lambda_binary * loss_binary) + (lambda_L1 * loss_sparsity)

    return loss

## Epoch Functions

In [37]:
def train_epoch(model, train_dataloader, optimizer, lambda_binary, lambda_L1, device='cpu'):
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0

    for x_batch, y_batch in train_dataloader:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)

        distances = model(x_batch)

        # total loss
        loss = calculate_loss(model, distances, y_batch, lambda_binary, lambda_L1)

        # back propagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # accumulative training loss
        total_loss += loss.item()

        # calculate the prediction accuracy
        predicted = distances.argmin(dim=1)
        real_labels = y_batch.argmax(dim=1)
        correct += (predicted == real_labels).sum().item()
        total += x_batch.size(0)

    avg_loss = total_loss / len(train_dataloader)
    accuracy = correct / total * 100
    return avg_loss, accuracy

In [38]:
# test function
def val_epoch(model, val_dataloader):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for x_batch, y_batch in val_dataloader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)

            dist = model(x_batch)

            predicted = dist.argmin(dim=1)
            real_labels = y_batch.argmax(dim=1)
            correct += (predicted == real_labels).sum().item()
            total += x_batch.size(0)

    accuracy = correct / total * 100
    return accuracy

## Create Dataloaders

In [39]:
# # --- Training and Validation Setup ---
# C_hat_full_tensor = torch.tensor(C_hat_train, dtype=torch.float32)
# Y_full_tensor = torch.tensor(one_hot_Y_train, dtype=torch.long)

# # Split data into training and validation
# val_split_ratio = 0.2
# random_seed = 42

# C_hat_train_tensor, C_hat_val_tensor, Y_train_tensor, Y_val_tensor = train_test_split(
#     C_hat_full_tensor, Y_full_tensor,
#     test_size=val_split_ratio,
#     random_state=random_seed,
#     stratify=Y_full_tensor
# )

# # 2. Create DataLoaders
# batch_size = 64
# train_dataset = TensorDataset(C_hat_train_tensor, Y_train_tensor)
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# val_dataset = TensorDataset(C_hat_val_tensor, Y_val_tensor)
# val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [40]:
C_hat_train, C_hat_val, Y_train, Y_val = train_test_split(C_hat_train, one_hot_Y_train, test_size=0.2, random_state=None)

X_train = torch.tensor(C_hat_train, dtype=torch.float32)
Y_train = torch.tensor(Y_train, dtype=torch.float32)
train_dataset = TensorDataset(X_train, Y_train)
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

X_val = torch.tensor(C_hat_val, dtype=torch.float32)
Y_val = torch.tensor(Y_val, dtype=torch.float32)
val_dataset = TensorDataset(X_val, Y_val)
batch_size = 64
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

X_test = torch.tensor(C_hat_test, dtype=torch.float32, device=device)
Y_test = torch.tensor(one_hot_Y_test, dtype=torch.float32, device=device)
test_dataset = TensorDataset(X_test, Y_test)
batch_size = 64
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

ValueError: Found input variables with inconsistent numbers of samples: [4795, 5994]

## Learn Prototypes

In [41]:
num_concepts = 112
num_classes = 200
model = PrototypeClassifier(num_concepts, num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
lambda_binary = 0.01
lambda_L1 = 0.001

In [42]:
# train and test
num_epochs = 200
for epoch in range(num_epochs):
    train_loss, train_accuracy = train_epoch(model, train_loader, optimizer, lambda_binary, lambda_L1, device=device)
    val_accuracy = val_epoch(model, val_loader)
    if((epoch+1)%10==0):
        print(f"Epoch [{epoch+1}/{num_epochs}]")
        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, Validation Accuracy: {val_accuracy:.2f}%")

Epoch [10/200]
Train Loss: 65.5953, Train Accuracy: 76.12%, Validation Accuracy: 77.90%
Epoch [20/200]
Train Loss: 53.6675, Train Accuracy: 88.86%, Validation Accuracy: 89.66%
Epoch [30/200]
Train Loss: 43.0676, Train Accuracy: 91.51%, Validation Accuracy: 91.66%
Epoch [40/200]
Train Loss: 34.4253, Train Accuracy: 93.28%, Validation Accuracy: 93.33%
Epoch [50/200]
Train Loss: 27.7046, Train Accuracy: 93.91%, Validation Accuracy: 93.83%
Epoch [60/200]
Train Loss: 22.5938, Train Accuracy: 94.08%, Validation Accuracy: 93.83%
Epoch [70/200]
Train Loss: 18.7308, Train Accuracy: 94.10%, Validation Accuracy: 93.99%
Epoch [80/200]
Train Loss: 15.8056, Train Accuracy: 94.12%, Validation Accuracy: 93.99%
Epoch [90/200]
Train Loss: 13.5815, Train Accuracy: 94.10%, Validation Accuracy: 94.16%
Epoch [100/200]
Train Loss: 11.8813, Train Accuracy: 94.10%, Validation Accuracy: 94.08%
Epoch [110/200]
Train Loss: 10.5754, Train Accuracy: 94.04%, Validation Accuracy: 94.16%
Epoch [120/200]
Train Loss: 9.

In [43]:
real_labels = Y_test.argmax(dim=1)
predictions = model.predict(X_test)
(predictions == real_labels).sum().item()/len(predictions)

0.6579219882637211

# MY OLD CODE

In [44]:
close_to_zero = (torch.sum((model.get_sigmoid_prototypes() < 0.1) | (model.get_sigmoid_prototypes() > 0.9)) / (200*112)).cpu().numpy()
print(f"{close_to_zero*100}% of the values are close to 0 or 1")

99.88838958740234% of the values are close to 0 or 1


In [None]:
# # --- Plotting ---
# from matplotlib import pyplot as plt

# plt.figure(figsize=(10, 5))
# epochs_range = range(1, epochs + 1)
# plt.plot(epochs_range, train_losses, label='Training Loss', marker='o', linestyle='-')
# plt.plot(epochs_range, val_losses, label='Validation Loss', marker='x', linestyle='--')
# plt.title('Training and Validation Loss Over Epochs')
# plt.xlabel('Epoch')
# plt.ylabel('Average Loss')
# plt.legend()
# plt.grid(True)
# plt.show()

# # Optional: Plot validation accuracy as well
# plt.figure(figsize=(10, 5))
# plt.plot(epochs_range, val_accuracies, label='Validation Accuracy', marker='s', linestyle='-', color='green')
# plt.title('Validation Accuracy Over Epochs')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy (%)')
# plt.legend()
# plt.grid(True)
# plt.show()

In [27]:
# prototypes = []
# for y in Y_train:
#     prototypes.append(final_binary_prototypes[y])

# prototypes = np.array(prototypes)

In [28]:
# # Function to find the closest concept vector and predict the label
# def predict_nearest_concept(instance, reference_concepts, reference_labels):
#     distances = np.sum(np.abs(reference_concepts - instance), axis=1)
#     min_idx = np.argmin(distances)
#     return reference_labels[min_idx]

# # Use prototypes as reference concepts and evaluate on C_hat_test
# correct_predictions = 0
# total_predictions = len(C_hat_test)

# for i, test_instance in enumerate(C_hat_test):
#     predicted_label = predict_nearest_concept(test_instance, prototypes, Y_train)
#     true_label = Y_test[i]

#     if predicted_label == true_label:
#         correct_predictions += 1

# # Calculate and print accuracy
# accuracy = correct_predictions / total_predictions
# print(f"\nOverall accuracy using prototype-based nearest neighbor: {accuracy:.4f}")