In [4]:
import torch
import torch.nn as nn
import torch.utils.data as data
import torchvision
from torchvision import datasets, models, transforms

import os
import numpy as np
import matplotlib.pyplot as plt

#needed for evaluation metrics
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
import pandas as pd

#needed for calibration metrics
from sklearn.calibration import calibration_curve
from torchmetrics.classification import MulticlassCalibrationError
from sklearn.metrics import brier_score_loss

#new loss
from calibratedclassification.DominoLoss import DOMINO_Loss
from calibratedclassification.DominoLossW import DOMINO_Loss_W

from calibratedclassification.reliability_diagrams import *

import random
random.seed(1)
random_seed = random.seed(1)

from calibratedclassification.RCR import RCRMetric#, RCRMMetric

from PIL import Image
from sklearn.model_selection import train_test_split

In [5]:
BATCH_SIZE_train = 100#500#
BATCH_SIZE_test = 100

object_categories = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "]
object_dict = {}
for o in range(len(object_categories)):
    object_dict[object_categories[o]] = o

def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.1307,])
    std = np.array([0.3081,])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    fig = plt.figure(figsize=(200., 200.))
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are update

In [7]:
# Define transformations to be applied to each image
data_transform = transforms.Compose([
    transforms.Resize((64,64)),
    transforms.RandomApply([transforms.RandomHorizontalFlip()], p=0.25),  # Apply with 50% probability
    transforms.RandomApply([transforms.RandomRotation(15)], p=0.25),  # Apply with 50% probability
    transforms.RandomApply([transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1)], p=0.25),  # Apply with 50% probability
    transforms.ToTensor()
])

# Define the root directory of your dataset
train_dir = '/home/kyle/Desktop/Misc/Datasets/StanfordCarsClassFolders/car_data/train/'
valid_dir = '/home/kyle/Desktop/Misc/Datasets/StanfordCarsClassFolders/car_data/test/'
#test_dir = '/blue/ruogu.fang/skylastolte4444/Airplanes/Diffusion/Data/cars/test/'

# Extract class names from folder names
#class_names = [folder_name for folder_name in os.listdir(data_dir) 
#               if os.path.isdir(os.path.join(data_dir, folder_name)) 
#               and not folder_name.endswith('.txt')]

#class_names = []

# Create a dataset from the CustomDataset class
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        return self.dataset[idx]

def ball_data(data_dir):
    class_names = []
    folder_names = os.listdir(data_dir)
    folder_names.sort()

    for folder_name in folder_names:
        full_path = os.path.join(data_dir, folder_name)
        if os.path.isdir(full_path) and not os.path.isfile(full_path) and 'ipynb' not in folder_name:
            class_names.append(folder_name)

    # Filter out images containing "inverted" in their name
    dataset_filtered = []
    for class_name in class_names:
        class_dir = os.path.join(data_dir, class_name)
        image_files = [file for file in os.listdir(class_dir) if file.endswith(('.png', '.jpg')) and "inverted" not in file]
        class_images = [(os.path.join(class_dir, image_file), class_names.index(class_name)) for image_file in image_files]
        dataset_filtered.extend(class_images)

    # Define the number of augmented samples to be generated for each original image
    num_augmented_samples = 1

    # Apply augmentation to each original image and add to the dataset
    dataset_augmented = []
    for image_path, label in dataset_filtered:
        image = Image.open(image_path).convert("RGB")
        for _ in range(num_augmented_samples):
            augmented_image = data_transform(image)
            dataset_augmented.append((augmented_image, label))

    # Create a dataset from the augmented list
    dataset = CustomDataset(dataset_augmented)
    return dataset, class_names

print(f"Number of images training: {len(ball_data(train_dir)[0])}")

# Define batch size
BATCH_SIZE = 30

train = ball_data(train_dir)
val = ball_data(valid_dir)

# Create a DataLoader to load the data in batches
train_loader = torch.utils.data.DataLoader(train[0], batch_size=BATCH_SIZE, shuffle=True)
val_loader = torch.utils.data.DataLoader(val[0], batch_size=BATCH_SIZE, shuffle=True)
test_loader = val_loader
#test_loader = torch.utils.data.DataLoader(ball_data(test_dir)[0], batch_size=BATCH_SIZE, shuffle=True)

# Iterate through the DataLoader only once
#for batch in train_loader:
#    images, labels = batch
#    # Display a few sample images
#    num_samples = 5
#    fig, axes = plt.subplots(1, num_samples, figsize=(20, 5))
#    for i in range(num_samples):
#        image = images[i].permute(1, 2, 0).numpy()  # Convert tensor to numpy array and rearrange dimensions
#        label = ball_data(train_dir)[1][labels[i]]  # Get the class label
#        axes[i].imshow(image)
#        axes[i].set_title(f'Label: {label}')
#        axes[i].axis('off')
#    plt.show()
#    break  # Break after displaying one batch
    
#N_CLASSES = 18 #len(torch.unique(labels))
IMG_SIZE = train[0].shape[2]
IMG_CH = train[0].shape[1]

print(f"Image size: {IMG_SIZE}")
print(f"Number of channels: {IMG_CH}")

class_names = train[1]

#class_names = os.listdir(data_dir)
N_CLASSES = len(np.unique(class_names))

print(f"Number of classes: {N_CLASSES}")

print("Class names:", class_names)

Number of images training: 8144


AttributeError: 'CustomDataset' object has no attribute 'shape'

class_names = ball_data(train_dir)[1]

#class_names = os.listdir(data_dir)
N_CLASSES = len(np.unique(class_names))

print(f"Number of classes: {N_CLASSES}")

print("Class names:", class_names)

for i, data in enumerate(matrix_maker):
  image, label = data
  print(torch.unique(label))

np.random.seed(42)
from mpl_toolkits.axes_grid1 import ImageGrid

fig = plt.figure(1, figsize=(18, 16))
grid = ImageGrid(fig, 111, nrows_ncols=(5, 5), axes_pad=0.05)
for i, data in enumerate(test_loader):
  image, label = data

images = image[0:25, :, :, :]
labels = label[0:25]

images = torch.transpose(images, 1, 3)
images = torch.transpose(images, 1, 2)

for i in range(25):

  ax = grid[i]
  ax.imshow(images[i,:,:,:])
  ax.text(0, 32, 'Label: {}'.format(labels[i]), color='k', backgroundcolor='w', alpha=0.8)
  ax.axis(False)

plt.show()

In [None]:
#add directories for models and results

#generic directories
working_root = '/blue/ruogu.fang/skylastolte4444/Airplanes/SAR_for_Uncertainty-main/SAR_for_Uncertainty-main/'
model_save_path = working_root + 'models_cars/'
results_save_path = working_root + 'results_cars/'

#specific model definitions
model_name = 'resnet50_CE'#DOMINO-SSIM_REAL'
DOMINO = False
DOMINOW = False

#specific model path
results_model = results_save_path + model_name

#make all non-existing directories
isModelPath = os.path.exists(model_save_path)
isResultsPath = os.path.exists(results_save_path)
isModelResultsPath = os.path.exists(results_model)

if not isModelPath:
    os.mkdir(model_save_path)
    
if not isResultsPath:
    os.mkdir(results_save_path)

if not isModelResultsPath:
    os.mkdir(results_model)

#os.makedirs(results_save_path + model_name

In [None]:
if DOMINO or DOMINOW:
    #matrix_dir = working_root + 'scripts/OxfordSubDir/' #matrix_penalties_pet/'#matrix_penalties_pet/'
    matrix_dir = '/blue/ruogu.fang/skylastolte4444/Airplanes/Diffusion/'
    #matrix_vals = pd.read_csv(matrix_dir + 'oxfordpets_ssim_matrix_norm4.csv', header = 0, index_col=0) #'Dictionary_matrixpenalty_inv_patches_v1_1024.csv', index_col = 0) #header=None
    #matrix_vals = pd.read_csv(matrix_dir + 'hc_matrixpenalty.csv', index_col = None, header=None)
    matrix_vals = pd.read_csv(matrix_dir + 'cars_s64_e100_t900.csv', index_col=0, header=0)
    matrix_penalty = 3.0 * torch.from_numpy(matrix_vals.to_numpy())
    matrix_penalty = matrix_penalty.float().cuda()
    print(matrix_penalty.shape)
    
if DOMINO:
    a = 0.5
    b = 0.5

In [None]:
# load a pre-trained model
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
#model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
#model.fc.out_features = len(object_categories)
model.fc = nn.Linear(2048, len(class_names))
#model.fc = nn.Linear(512, len(object_categories))

#pretrained = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
#backbone = torch.nn.Sequential(*(list(pretrained.children())[:-1]))
#model = torch.nn.Sequential(backbone, nn.Linear(2048, len(object_categories)))

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

# Specify Loss
if DOMINO:
    criterion = DOMINO_Loss()
elif DOMINOW:
    criterion = DOMINO_Loss_W()
else:
    criterion = nn.CrossEntropyLoss()

 # construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# let's train it for 1 epoch
num_epochs = 50
logging_freq = 147
save_freq = 5

# Get a batch of training data
##inputs, classes = next(iter(train_loader))
#inputs, classes = next(iter(dataloaders['train']))
#print(inputs.shape)
##inputs_min = inputs[0:32, :, :, :]
# Make a grid from batch
##out = torchvision.utils.make_grid(inputs_min)
##imshow(out, title=[class_names[x] for x in classes])
#plt.show()

#print("Train size: %d Val size: %d" % (train_size, valid_size))#dataset_sizes['train'], dataset_sizes['val']))

In [None]:
val_acc_best = 0
for epoch in range(num_epochs):
    running_loss = 0.
    correct = 0.
    seen = 0.
    val_correct = 0.
    val_seen = 0.
    logging_step = 1
    
    for i, data in enumerate(train_loader, 0):##dataloaders['train'], 0):
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)

        bs, c, h, w = inputs.shape
        if c == 1:
            inputs = inputs.repeat(1, 3, 1, 1).float()

        optimizer.zero_grad()
        outputs = model(inputs)
        #outputs = outputs.logits
        if DOMINO:
            loss = criterion(outputs, labels, matrix_penalty, a, b)
        elif DOMINOW:
            loss = criterion(outputs, labels, matrix_penalty, 1)
        else:
            loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        correct += (outputs.argmax(dim=1) == labels).float().sum()
        seen += len(labels)

    for i, data in enumerate(val_loader, 0):##dataloaders['val'], 0):
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)

        bs, c, h, w = inputs.shape
        if c == 1:
            inputs = inputs.repeat(1, 3, 1, 1).float()

        outputs = model(inputs)
    
        val_correct += (outputs.argmax(dim=1) == labels).float().sum()
        val_seen += len(labels)
    
    #changed to only save models when validation improves
    val_acc = val_correct/val_seen
    if val_acc>val_acc_best:
        torch.save(model.state_dict(), model_save_path + model_name + '.pth')
        val_acc_best=val_acc
        print('The new best validation accuracy is %.4f, saving model' % (val_acc_best))

    print("Epoch %d, loss: %.3f, Train acc: %.4f, Val acc: %.4f" % (epoch + 1,  running_loss/seen, correct/seen, val_correct/val_seen))
 

    #if (epoch +1) % save_freq == 0:
    #    torch.save(model.state_dict(),'./models/checkpoint_%d.pth'%(epoch+1))

In [None]:
#reload best model
model.load_state_dict(torch.load(model_save_path + model_name + '.pth'))

#recompute the validation using only the best performing model 
#(hopefully the following steps may be replaced by a testing dataset)

val_correct = 0.
val_seen = 0.

for i, data in enumerate(matrix_maker, 0):##dataloaders['val'], 0):
    inputs, labels = data
    inputs = inputs.to(device)
    labels = labels.to(device)

    bs, c, h, w = inputs.shape
    if c == 1:
        inputs = inputs.repeat(1, 3, 1, 1).float()

    outputs = model(inputs)
    
    val_correct += (outputs.argmax(dim=1) == labels).float().sum()
    val_seen += len(labels)
    
    #save targets, predictions, and outputs for analysis
    if i==0:
        outputs_total = outputs
        #preds_total = outputs.argmax(dim=1)
        labels_total = labels
    else:
        outputs_total = torch.cat((outputs_total, outputs), dim=0)
        #preds_total = torch.cat((preds_total, outputs.argmax(dim=1)), dim=0)
        labels_total = torch.cat((labels_total, labels), dim=0)

preds_total = outputs_total.argmax(dim=1)        
        
#verify sizes
print(outputs_total.shape)
print(preds_total.shape)
print(labels_total.shape)

#outputs_total = outputs_total.cpu().detach().numpy()
preds_total = preds_total.cpu().detach().numpy()
labels_total = labels_total.cpu().detach().numpy()

print('The accuracy on the validation set is: %.4f' % (val_correct/val_seen))

#confusion matrix on validation data

def plot_confusion_matrix(labels, pred_labels, classes):

    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(1, 1, 1)
    cm = confusion_matrix(labels, pred_labels)
    cm = ConfusionMatrixDisplay(cm, display_labels=classes)
    cm.plot(values_format='d', cmap='Blues', ax=ax)
    plt.grid(False)
    plt.xticks(rotation=90)

plot_confusion_matrix(labels_total, preds_total, class_names)
plt.tight_layout()
plt.savefig(results_model + '/confusionmatrix_validmat.png')

#will need this to compute loss term
df_cm = pd.DataFrame(confusion_matrix(labels_total,preds_total))
df_cm.to_csv(results_model + '/confusionmatrix_validmat.csv')

del outputs_total 
#del preds_total
del labels_total

torch.cuda.empty_cache()

In [None]:
#compute test using only the best performing model 
#(hopefully the following steps may be replaced by a testing dataset)

test_correct = 0.
test_seen = 0.
#rcrm_metric_total = 0.
num_batches = 0.

for i, data in enumerate(test_loader, 0):##dataloaders['val'], 0):
    
    torch.cuda.empty_cache()
    
    inputs, labels = data
    inputs = inputs.to(device)
    labels = labels.to(device)

    bs, c, h, w = inputs.shape
    #print(inputs.shape)
    if c == 1:
        inputs = inputs.repeat(1, 3, 1, 1).float()

    outputs = model(inputs)
    
    test_correct += (outputs.argmax(dim=1) == labels).float().sum()
    test_seen += len(labels)
    
    # Calculate RCR-M metric
    #rcrm_metric = RCRMMetric().calculate_rcr_metric(model = model, data = torch.Tensor(inputs).cuda(), target = torch.Tensor(labels).cuda(), num_components=3)
    #print(f"RCR Metric: {rcrm_metric}")
    #rcrm_metric_total += rcrm_metric
    
    #save targets, predictions, and outputs for analysis
    if i==0:
        outputs_total = outputs.cpu().detach().numpy()
        #preds_total = outputs.argmax(dim=1)
        labels_total = labels.cpu().detach().numpy()
        inputs_total = inputs.cpu().detach().numpy()
    else:
        outputs_total = np.concatenate((outputs_total, outputs.cpu().detach().numpy()), axis=0)
        #preds_total = torch.cat((preds_total, outputs.argmax(dim=1)), dim=0)
        labels_total = np.concatenate((labels_total, labels.cpu().detach().numpy()), axis=0)
        inputs_total = np.concatenate((inputs_total, inputs.cpu().detach().numpy()), axis=0)
        
    num_batches += 1
        
    torch.cuda.empty_cache()
    
    #print(i)
    
    #if i > 945:
    #    break

preds_total = torch.Tensor(outputs_total).argmax(dim=1)

#rcrm_metric_total = rcrm_metric_total / num_batches
rcrm_metric = RCRMMetric().calculate_rcr_metric(model = model, data = torch.Tensor(inputs_total).cuda(), target = torch.Tensor(labels_total).cuda(), num_components=1)
print(f"RCR Metric: {rcrm_metric}")
        
#verify sizes
print(outputs_total.shape)
print(preds_total.shape)
print(labels_total.shape)

#outputs_total = outputs_total.cpu().detach().numpy()
preds_total = preds_total.cpu().detach().numpy()
#labels_total = labels_total.cpu().detach().numpy()

print('The accuracy on the testing set is: %.4f' % (test_correct/test_seen))

In [None]:
#confusion matrix on test data

def plot_confusion_matrix(labels, pred_labels, classes):

    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(1, 1, 1)
    cm = confusion_matrix(labels, pred_labels)
    cm = ConfusionMatrixDisplay(cm, display_labels=classes)
    cm.plot(values_format='d', cmap='Blues', ax=ax)
    plt.grid(False)
    plt.xticks(rotation=90)

plot_confusion_matrix(labels_total, preds_total, class_names)
plt.tight_layout()
plt.savefig(results_model + '/confusionmatrix_test.png')

#will need this to compute loss term
df_cm = pd.DataFrame(confusion_matrix(labels_total,preds_total))
df_cm.to_csv(results_model + '/confusionmatrix_test.csv')

In [None]:
#classification report on test data

report =  classification_report(labels_total, preds_total, target_names = class_names, output_dict = True)
print(classification_report(labels_total, preds_total, target_names = class_names))

df = pd.DataFrame(report).transpose()
df.to_csv(results_model + '/classificationreport.csv')

In [None]:
# Calculate RCR-G metric
rcrg_metric = RCRGMetric().calculate_rcr_metric(output = torch.Tensor(outputs_total).cuda(), target = torch.Tensor(preds_total).cuda())
print(f"RCR Metric: {rcrg_metric}")

In [None]:
#calibration metrics need output softmax

m = nn.Softmax(dim=1)
outputs_total = m(torch.Tensor(outputs_total))
outputs_total = outputs_total.cpu().detach().numpy()

In [None]:
plt.style.use("seaborn")

plt.rc("font", size=12)
plt.rc("axes", labelsize=12)
plt.rc("xtick", labelsize=12)
plt.rc("ytick", labelsize=12)
plt.rc("legend", fontsize=12)

plt.rc("axes", titlesize=16)
plt.rc("figure", titlesize=16)
title = "Total Calibration Curve"

output_conf = np.max(outputs_total, axis=1)

fig = reliability_diagram(labels_total, preds_total, output_conf, num_bins=10, draw_ece=True,
                          draw_bin_importance="alpha", draw_averages=True,
                          title=title, figsize=(6, 6), dpi=100, 
                          return_fig=True)
fig.tight_layout()

fig.savefig(results_model + '/' + 'allclass_calibrationcurve' + '.pdf')
plt.close()

In [None]:
#calibration curves and Brier Loss

for i in range(len(class_names)):
    
    labels_binary = np.zeros(len(labels_total))
    labels_binary[np.where(labels_total == i)] = 1
    #current_label = class_names[i]
    
    prob_true, prob_pred = calibration_curve(labels_binary, outputs_total[:,i], n_bins=10, strategy = 'quantile')
    
    #print('prob_true:')
    #print(prob_true)
    
    #print('prob_pred:')
    #print(prob_pred)
    
    clf_score = brier_score_loss(labels_binary, outputs_total[:,i], pos_label=1)
    clf_score = np.round(clf_score,3)
    
    # Plot perfectly calibrated
    plt.plot([0, 1], [0, 1], linestyle = '--', label = 'Ideally Calibrated')# + str(Standard))

    # Plot model's calibration curve
    plt.plot(prob_pred, prob_true, marker = '.', label = 'Baseline Model')
    leg = plt.legend(loc = 'upper left')
    plt.xlabel('Average Predicted Probability in each bin')
    plt.ylabel('Ratio of positives')
    plt.title('Calibration Curve for ' + class_names[i] + ' with Brier Loss: ' + str(clf_score))
    plt.savefig(results_model + '/Calibration-Curve_Class-' + class_names[i] + '.pdf')
    plt.close()
    
    #Plot all Confidence scores for this class
    plt.hist(outputs_total[:,i])
    plt.title('Full Histogram for ' + class_names[i])
    plt.savefig(results_model + '/Histogram_Full_Class-' + class_names[i] + '.pdf')
    plt.close()
    
    #Plot only Confidence scores for this class in which this was the correct class
    outputs_total_pos = np.delete(outputs_total[:,i], np.where(labels_binary == 0))
    plt.hist(outputs_total_pos)
    plt.title('Positive only Histogram for ' + class_names[i])
    plt.savefig(results_model + '/Histogram_Pos_Class-' + class_names[i] + '.pdf')
    plt.close()

In [None]:
#other calibration scores

o = torch.Tensor(outputs_total)
l = torch.Tensor(labels_total)

metric1 = MulticlassCalibrationError(num_classes=len(class_names), n_bins=10, norm='l1')
ECE = metric1(o,l)
metric2 = MulticlassCalibrationError(num_classes=len(class_names), n_bins=10, norm='l2')
RMSCE = metric2(o,l)
metric3 = MulticlassCalibrationError(num_classes=len(class_names), n_bins=10, norm='max')
MCE = metric3(o,l)

print('ECE: %.4f' % (ECE))
print('RMSCE: %.4f' % (RMSCE))
print('MCE: %.4f' % (MCE))

#will need this to compute loss term
data = [['ECE', ECE], ['RMSCE', RMSCE], ['MCE', MCE]]
df_calmet = pd.DataFrame(data=data, columns=['Metric', 'Value'])
df_calmet.to_csv(results_model + '/calibrationmetrics.csv')