In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torch.optim.lr_scheduler import CosineAnnealingLR
# from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F

from itertools import permutations 
from sklearn.metrics import multilabel_confusion_matrix
import seaborn as sns
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, precision_recall_fscore_support, hamming_loss, jaccard_score

import numpy as np
import matplotlib.pyplot as plt
import os
import pandas as pd

import trimesh

from moduler import *

import sys
sys.path.append('/data/users2/yxiao11/mangoDB/wirehead')
from wirehead import WireheadGenerator
from wirehead import MongoTupleheadDataset, MongoheadDataset

torch.set_printoptions(sci_mode=False)


In [2]:
# Which subset of the database this run trains on.
data_type = 'mixed'
# data_type = 'Pristine'
# data_type = 'Irradiated'

# Samples are addressed as <database>/<data_type>/<kind>/<index>.npy
database_root = '/data/users2/yxiao11/model/satellite_project/database/'
blur_dir = database_root + data_type + '/blur_cube/'
# spectral_dir = database_root + data_type + '/spectral_cube/'
label_dir = database_root + data_type + '/label/'

# NOTE(review): the sample count is the raw number of entries in blur_dir, and
# files are assumed to be named 0.npy .. (n-1).npy — confirm there are no stray
# entries (e.g. checkpoint folders) in that directory.
n_samples = len(os.listdir(blur_dir))

# Parallel lists: blur_file[k] and label_file[k] describe the same sample.
blur_file = [blur_dir + f"{sample_id}.npy" for sample_id in range(n_samples)]
label_file = [label_dir + f"{sample_id}.npy" for sample_id in range(n_samples)]
# Spectral cubes are currently disabled, so this list stays empty.
spectral_file = []


In [19]:

class get_dataset(Dataset):
    """Dataset that loads one ``.npy`` input and one ``.npy`` label file per sample.

    Despite their names, ``data_dir`` and ``labels_dir`` are indexable sequences
    of file paths (one entry per sample), not directory names.

    Args:
        data_dir: sequence of paths to the input arrays.
        labels_dir: sequence of paths to the label-index arrays, aligned with
            ``data_dir`` by position.
        num_classes: number of rows in the target built for each sample.
            Defaults to 19, the value that was previously hard-coded.
    """

    def __init__(self, data_dir, labels_dir, num_classes=19):
        self.data_dir = data_dir
        self.labels_dir = labels_dir
        self.num_classes = num_classes

    def __len__(self):
        """Return the number of samples, taken from the input path list."""
        return len(self.data_dir)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            A ``(input, target)`` pair of float tensors. ``target`` has shape
            ``(num_classes, 2)``: every row starts as ``[0, 1]`` and the rows
            selected by the stored label indices are set to ``[1, 0]``.
        """
        ipt = torch.from_numpy(np.load(self.data_dir[idx]))

        # Stored label indices are shifted down by one before being used as row
        # indices (presumably 1-based class IDs — TODO confirm). The cast to
        # int64 makes the tensor a valid index whatever integer width the file
        # was saved with.
        # NOTE(review): a stored value of 0 becomes -1 and would select the last
        # row — confirm the label files never contain 0.
        label_index = torch.from_numpy(np.load(self.labels_dir[idx])).long() - 1

        label = torch.tensor([0, 1]).repeat(self.num_classes, 1)
        label[label_index] = torch.tensor([1, 0])

        return ipt.float(), label.float()
    



In [20]:
# Load dataset
my_dataset = get_dataset(blur_file, label_file)

# Define split ratio
train_size = int(0.8 * len(my_dataset))  # 80% training
test_size = len(my_dataset) - train_size  # 20% testing

# Randomly split dataset.
# The split is driven by a dedicated, seeded generator so that re-running the
# notebook (or restarting the kernel) yields the same train/test membership;
# otherwise reported test metrics are not comparable between runs.
SPLIT_SEED = 42
train_dataset, test_dataset = random_split(
    my_dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Print dataset sizes
print(f"Total samples: {len(my_dataset)}")
print(f"Training samples: {len(train_dataset)}")
print(f"Testing samples: {len(test_dataset)}")

Total samples: 1000
Training samples: 800
Testing samples: 200


In [21]:
# num_epochs = 100
batch_size = 100

# Define DataLoaders for training and testing.
# Training batches are reshuffled every epoch; the test loader keeps a fixed
# order so evaluation passes are deterministic.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  # Shuffle training data
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)  # No shuffle for testing

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = CubeModel(52, 19).to(device)
# model = AlexNet(7).to(device)
# model = unet().to(device)
model = RNNFeatureExtractor().to(device)

# Define Loss Function and Optimizer
# criterion = torch.nn.CrossEntropyLoss()
criterion = torch.nn.MSELoss()
# criterion = torch.nn.BCEWithLogitsLoss()

optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=1e-4)
# Cosine schedule between lr=0.01 and eta_min over T_max scheduler steps.
scheduler = CosineAnnealingLR(optimizer, T_max=30, eta_min=0.000001)




In [22]:
# Pull a single batch from the training loader as a quick sanity check.
ipt, y = next(iter(train_loader))
# Only the input batch is moved to the compute device; `y` is left as loaded.
ipt = ipt.to(device)

In [23]:
# Show the shape of the sampled input batch (leading dimension is the batch).
ipt.shape

torch.Size([100, 52, 32, 32])

In [24]:
# Show the shape of the sampled target batch: one two-element row per class
# index for every sample, as built in get_dataset.__getitem__.
y.shape

torch.Size([100, 19, 2])

In [25]:
# Per-epoch loss histories, consumed later by the plotting cell.
my_train_loss = []
my_test_loss = []
num_epochs=1000
for epoch_number in range(1, num_epochs + 1):
    # ---- Optimisation pass over the training split ----
    model.train()
    train_loss_sum = 0.0

    for batch_inputs, batch_targets in train_loader:
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(batch_inputs), batch_targets)
        batch_loss.backward()
        optimizer.step()
        # NOTE(review): the LR schedule advances once per batch, not once per
        # epoch, so T_max=30 is counted in batches — confirm this is intended.
        scheduler.step()

        train_loss_sum += batch_loss.item()

    # Mean of the per-batch losses (each batch weighted equally).
    avg_train_loss = train_loss_sum / len(train_loader)
    my_train_loss.append(avg_train_loss)

    # ---- Evaluation pass over the held-out split (no gradient tracking) ----
    model.eval()
    test_loss_sum = 0.0

    with torch.no_grad():
        for batch_inputs, batch_targets in test_loader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)
            test_loss_sum += criterion(model(batch_inputs), batch_targets).item()

    avg_test_loss = test_loss_sum / len(test_loader)
    my_test_loss.append(avg_test_loss)

    # One progress line per epoch.
    print(f"Epoch {epoch_number}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}")


Epoch 1/1000, Train Loss: 0.0504, Test Loss: 0.0000
Epoch 2/1000, Train Loss: 0.0000, Test Loss: 0.0000
Epoch 3/1000, Train Loss: 0.0000, Test Loss: 0.0000
Epoch 4/1000, Train Loss: 0.0000, Test Loss: 0.0000
Epoch 5/1000, Train Loss: 0.0000, Test Loss: 0.0000


KeyboardInterrupt: 

In [None]:
# my_train_loss = []
# my_test_loss = []

# for epoch in range(num_epochs):
#     # Training Phase
#     model.train()
#     running_loss = 0.0
#     for batch_idx, (cube, labels) in enumerate(train_loader):
#         cube, labels = cube.to(device), labels.to(device)
#         # Zero gradients
#         optimizer.zero_grad()
#         # Forward pass
#         outputs = model(cube)
#         loss = criterion(outputs, labels)
#         # Backward pass and optimization
#         loss.backward()
#         optimizer.step()

#         running_loss += loss.item()
        
#     avg_train_loss = running_loss / len(train_loader)
#     my_train_loss.append(avg_train_loss)

#     # Testing Phase
#     model.eval()
#     test_loss = 0.0

#     with torch.no_grad():
#         for batch_idx, (cube, labels) in enumerate(test_loader):
#             cube, labels = cube.to(device), labels.to(device)
            
#             # Forward pass
#             outputs = model(cube)
#             loss = criterion(outputs, labels)
#             test_loss += loss.item()

#     avg_test_loss = test_loss / len(test_loader)
#     my_test_loss.append(avg_test_loss)



#     # Print results for this epoch
#     print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}")



In [None]:
# np.save(f"/data/users2/yxiao11/model/satellite_project/resluts_n_model/{data_type}_train_loss.npy",
#         np.array(my_train_loss))
        
# np.save(f"/data/users2/yxiao11/model/satellite_project/resluts_n_model/{data_type}_test_loss.npy",
#         np.array(my_test_loss))

In [None]:
# torch.save(model, f"/data/users2/yxiao11/model/satellite_project/resluts_n_model/{data_type}.pth")
# print("Entire model saved!")

In [None]:
# sample_data, label = next(iter(test_loader))

In [None]:
# torch.onnx.export(
#     model.cpu(), sample_data, "cube_model.onnx",
#     input_names=["input"], output_names=["output"],
#     dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
#     opset_version=11
# )

# print("Model saved as cube_model.onnx")

In [None]:
# Collect one binary indicator vector per test sample, for both ground truth and
# prediction, so that stacking the lists yields (n_samples, n_classes) matrices.
model.eval()
label_list = []
preds_list = []

with torch.no_grad():
    for cube, labels in test_loader:
        # Forward pass; results are brought back to the CPU for NumPy/sklearn.
        outputs = model(cube.to(device)).cpu()
        labels = labels.cpu()

        # Targets hold one two-element pair per class, with [1, 0] marking a
        # selected class and [0, 1] an unselected one. Collapse each pair into a
        # single 0/1 flag: 1 when the first component is the larger one.
        # NOTE(review): assumes the model output has the same
        # (batch, n_classes, 2) layout as the targets — confirm against the model.
        true_flags = (labels[..., 0] > labels[..., 1]).int().numpy()
        pred_flags = (outputs[..., 0] > outputs[..., 1]).int().numpy()

        # Extend row by row, using the real batch size, so a shorter final
        # batch cannot cause an out-of-range index.
        label_list.extend(true_flags)
        preds_list.extend(pred_flags)


In [None]:
# Function to smooth the loss curve using Exponential Moving Average (EMA)
def smooth_curve(data, weight=0.9):
    """Smooth a sequence of values with an exponential moving average.

    The running value starts at ``data[0]`` and is updated for every point as
    ``weight * previous + (1 - weight) * point``.

    Args:
        data: indexable, sized sequence of numbers (e.g. per-epoch losses).
        weight: smoothing factor; larger values follow the history more closely.

    Returns:
        A list with one smoothed value per input point; an empty list when
        ``data`` is empty.
    """
    # An empty history (e.g. training interrupted before the first epoch) has
    # no first element to seed the average with.
    if len(data) == 0:
        return []

    smoothed = []
    last = data[0]  # Initialize with the first value
    for point in data:
        last = weight * last + (1 - weight) * point  # EMA formula
        smoothed.append(last)
    return smoothed

# EMA-smoothed versions of both loss histories
smooth_train_loss = smooth_curve(my_train_loss, weight=0.9)
smooth_test_loss = smooth_curve(my_test_loss, weight=0.9)

# One wide figure holding all four curves
plt.figure(figsize=(20, 5))

# For each split: (raw history, smoothed history, raw label, smoothed label,
# line styling shared by both curves of that split).
loss_series = (
    (my_train_loss, smooth_train_loss,
     "Train Loss (Original)", "Train Loss (Smoothed)",
     {"color": "royalblue"}),
    (my_test_loss, smooth_test_loss,
     "Test Loss (Original)", "Test Loss (Smoothed)",
     {"color": "darkorange", "linestyle": "--"}),
)

for raw_history, smoothed_history, raw_label, smoothed_label, line_style in loss_series:
    # Raw curve: thin and faint, so the smoothed curve stands out
    plt.plot(raw_history, label=raw_label, alpha=0.3, linewidth=1, **line_style)
    # Smoothed curve: thicker, fully opaque
    plt.plot(smoothed_history, label=smoothed_label, linewidth=2, **line_style)

# Axis labels, title (the active data subset), legend and grid
plt.xlabel("Epochs", fontsize=14)
plt.ylabel("Loss", fontsize=14)
plt.title(f"{data_type}", fontsize=16, fontweight="bold")
plt.legend(fontsize=12)
plt.grid(True, linestyle="--", alpha=0.6)

# Render the figure
plt.show()

In [None]:

# Part name shown under each material index in the confusion-matrix titles
# (entry i belongs to "Material i+1").
part_list = ['antenna', 'antenna', 'antenna',
            'body top', 'body top', 'body top',
             'body bottom', 'body bottom', 'body bottom',
             'lateral surface', 'lateral surface','lateral surface',
             'lateral surface','lateral surface','lateral surface',
             'Connectors', 'Panel', 'Panel', 'Panel'
            ]

# Stack predictions and true labels.
# NOTE(review): every entry of label_list / preds_list must be a 1-D indicator
# vector for vstack to produce (n_samples, n_classes) matrices — confirm against
# the cell that fills these lists.
y_true = np.vstack(label_list)  # True labels (multi-label)
y_pred = np.vstack(preds_list)  # Predicted labels

# Compute multi-label confusion matrices (one per class)
conf_matrices = multilabel_confusion_matrix(y_true, y_pred)

# Confusion matrices laid out on a grid instead of a single row
num_classes = len(conf_matrices)  # Should be 19
cols = 5  # Set number of columns for grid
rows = (num_classes // cols) + (num_classes % cols > 0)  # Auto adjust rows

fig, axes = plt.subplots(rows, cols, figsize=(15, rows * 3))  # Dynamic grid size
axes = axes.flatten()  # Flatten grid for easy indexing

for i, cm in enumerate(conf_matrices):
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=axes[i], cbar=False)
    axes[i].set_title(f"Material {i+1}\n{part_list[i]}", fontsize=15)
    axes[i].set_xlabel("Predicted", fontsize=10)
    axes[i].set_ylabel("Actual", fontsize=10)
    axes[i].tick_params(axis='both', labelsize=6)

# Hide unused subplots (the grid usually has more cells than classes).
# Indexed from num_classes rather than the loop variable left over above.
for j in range(num_classes, len(axes)):
    fig.delaxes(axes[j])

plt.tight_layout()
plt.show()

# Compute classification metrics
accuracy_per_class = np.mean(y_true == y_pred, axis=0)  # Per-class accuracy
hamming = hamming_loss(y_true, y_pred)  # Hamming loss
precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average=None)  # Per-class metrics
macro_f1 = np.mean(f1)  # Macro F1-score
micro_f1 = precision_recall_fscore_support(y_true, y_pred, average="micro")[2]  # Micro F1-score
jaccard = jaccard_score(y_true, y_pred, average="samples")  # Jaccard similarity

# Display Metrics
print(f"Hamming Loss: {hamming:.4f}")
print(f"Macro F1-Score: {macro_f1:.4f}")
print(f"Micro F1-Score: {micro_f1:.4f}")
print(f"Jaccard Similarity Score: {jaccard:.4f}")

# Plot accuracy, precision, recall, and F1-score per class
metrics = {"Accuracy": accuracy_per_class, "Precision": precision, "Recall": recall, "F1-Score": f1}
fig, ax = plt.subplots(figsize=(10, 5))

for metric, values in metrics.items():
    ax.plot(range(1, len(values)+1), values, marker="o", label=metric)

# One tick per class actually present in the data, instead of a fixed count of 19.
ax.set_xticks(np.arange(num_classes)+1)
ax.set_xlabel("Material Class")
ax.set_ylabel("Score")
ax.set_title("Performance Metrics per Class",fontsize=16, fontweight="bold")
ax.legend()
ax.grid(True, linestyle="--", alpha=0.6)
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, precision_recall_fscore_support, hamming_loss, jaccard_score

# Part name for each material index (entry i belongs to "Material i+1")
part_list = (
    ['Antenna'] * 3
    + ['Body Top'] * 3
    + ['Body Bottom'] * 3
    + ['Lateral Surface'] * 6
    + ['Connectors']
    + ['Panel'] * 3
)

# Row-stack the collected per-sample arrays into label matrices
y_true = np.vstack(label_list)  # ground truth
y_pred = np.vstack(preds_list)  # model predictions

# One 2x2 confusion matrix per label column
conf_matrices = multilabel_confusion_matrix(y_true, y_pred)

# --- 1. Confusion matrices on a 5-column grid ---
num_classes = len(conf_matrices)
cols = 5
rows = -(-num_classes // cols)  # ceiling division

fig, axes = plt.subplots(rows, cols, figsize=(15, rows * 3))
axes = axes.flatten()

for class_idx, (panel, matrix) in enumerate(zip(axes, conf_matrices)):
    sns.heatmap(matrix, annot=True, fmt="d", cmap="Blues", ax=panel, cbar=False)
    panel.set_title(f"Material {class_idx+1}\n{part_list[class_idx]}", fontsize=15)
    panel.set_xlabel("Predicted", fontsize=10)
    panel.set_ylabel("Actual", fontsize=10)
    panel.tick_params(axis='both', labelsize=6)

# Drop the grid cells that received no matrix
for spare_panel in axes[num_classes:]:
    fig.delaxes(spare_panel)

plt.tight_layout()
plt.show()

# --- 2. Classification metrics ---
accuracy_per_class = (y_true == y_pred).mean(axis=0)  # fraction of matching entries per column
hamming = hamming_loss(y_true, y_pred)
precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average=None)  # per-class
macro_f1 = f1.mean()  # unweighted mean of the per-class F1 values
_, _, micro_f1, _ = precision_recall_fscore_support(y_true, y_pred, average="micro")
jaccard = jaccard_score(y_true, y_pred, average="samples")

# --- 3. Printed summary (only the Jaccard score is currently enabled) ---
# print(f"Average Label Error Rate (Hamming Loss): {hamming:.4f}")
# print(f"Overall Class Balance Score (Macro F1-Score): {macro_f1:.4f}")
# print(f"Weighted Class Balance Score (Micro F1-Score): {micro_f1:.4f}")
print(f"Label Overlap Score (Jaccard Similarity Score): {jaccard:.4f}")

# --- 4. Per-class line plot (only F1 is currently enabled) ---
metrics = {
#     "Accuracy per Material": accuracy_per_class,
#     "Label Correctness Rate (Precision)": precision,
#     "Label Coverage Rate (Recall)": recall,
    "Prediction Quality Score (F1-Score)": f1
}

fig, ax = plt.subplots(figsize=(10, 5))

for series_name, series_values in metrics.items():
    class_positions = range(1, len(series_values)+1)
    ax.plot(class_positions, series_values, marker="o", label=series_name)

# Ticks are placed at 1..len(part_list) and labelled with the part names
ax.set_xticks(np.arange(1, len(part_list) + 1))
ax.set_xticklabels(part_list, rotation=45, ha="right", fontsize=9)
ax.set_xlabel("Material Type", fontsize=12)
ax.set_ylabel("Score", fontsize=12)
ax.set_title("f1 score per Material", fontsize=16, fontweight="bold")
ax.legend(fontsize=10)
ax.grid(True, linestyle="--", alpha=0.6)
plt.show()
