In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
from sklearn import metrics
from tqdm import tqdm # progress bar
import pandas as pd
import seaborn as sn # fancy plots

# Deep Learning imports
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from torchvision import transforms

In [None]:
# Select the compute device: CUDA when a GPU is visible to PyTorch, the CPU otherwise
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Training device: {device}")

In [None]:
# `trained` switches the expensive cells below: True loads previously saved weights,
# False trains from scratch (and runs the hyperparameter search) and saves the result.
trained = True

# Seed every random number generator used in the notebook with the same value
SEED = 0
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

**DataSets**

In [None]:
# Both splits are stored under (and downloaded into, if missing) the same local folder
data_root = 'classifier_data'
train_dataset = torchvision.datasets.FashionMNIST(root=data_root, train=True, download=True)
print("Train dataset loaded!")
test_dataset = torchvision.datasets.FashionMNIST(root=data_root, train=False, download=True)
print("Test dataset loaded!")

In [None]:
print("Random examples from the dataset: \n")
# Class names, looked up by the integer label returned by the dataset
label_names = [
    't-shirt/top', 'trouser', 'pullover', 'dress', 'coat',
    'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot',
]
# 3x3 grid of randomly drawn training images, each titled with its label
fig, axs = plt.subplots(nrows=3, ncols=3, figsize=(8, 8))
for ax in axs.flat:
    img, label = random.choice(train_dataset)
    ax.imshow(np.array(img), cmap='gist_gray')
    ax.set_title(f'Label: {label_names[label]} [{label}]')
    # Hide the tick marks: the pixel coordinates carry no information here
    ax.set(xticks=[], yticks=[])
plt.tight_layout()

In [None]:
# Single-step pipeline (ToTensor) applied on access; both splits share the same transform object
transform = transforms.Compose([transforms.ToTensor()])
train_dataset.transform = test_dataset.transform = transform

In [None]:
# Check what the dataset yields AFTER the transformation is in place.
# A compact summary of the first sample is printed instead of the raw pixel values:
# printing the (image, label) tuple and then the image again dumps the full tensor twice.
print(train_dataset)
sample_img, sample_label = train_dataset[0]
print(f"Sample type: {type(sample_img).__name__}, dtype: {sample_img.dtype}, shape: {tuple(sample_img.shape)}")
print(f"Pixel range: [{sample_img.min().item():.3f}, {sample_img.max().item():.3f}]")
print(f"Label: {sample_label}")

**DataLoader**

In [None]:
batch_size = 128
validation_split = .2

# The first `validation_split` fraction of the training indices is held out for validation
dataset_size = len(train_dataset)
split = int(np.floor(validation_split * dataset_size))
indices = list(range(dataset_size))
val_indices = indices[:split]
train_indices = indices[split:]

# Each sampler draws, in random order, only from its own subset of indices
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)

# Train and validation loaders read the same `train_dataset` through disjoint samplers
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
validation_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=valid_sampler, shuffle=False)
# The test loader walks the separate test split in order
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Sanity check: pull one batch from each loader and report its shapes
for split_name, loader in (("TRAIN", train_loader),
                           ("VALIDATION", validation_loader),
                           ("TEST", test_loader)):
    batch_data, batch_labels = next(iter(loader))
    print(f"{split_name} BATCH SHAPE")
    print(f"\t Data: {batch_data.shape}")
    print(f"\t Labels: {batch_labels.shape}")

**CLASSIFICATION USING CNN**

In [None]:
class FashionNet(nn.Module):
    """Convolutional classifier: three conv layers followed by three linear layers.

    The first linear layer hard-codes ``64*4*4`` input features, which matches
    1-channel 28x28 inputs: 28 -> conv(k5, p2) 28 -> pool 14 -> conv(k5, p2) 14
    -> pool 7 -> conv(k3, s2, p1) 4, with 64 output channels.

    Args:
        n_classes: number of output logits.
        p1: drop probability of the ``Dropout2d`` closing the convolutional stack.
        p2: drop probability of the ``Dropout`` used after the first two linear layers.
        act: activation module used inside the convolutional stack. ``None``
            (default) creates a fresh ``nn.ELU()`` for this instance.
        batch_norm: when True, a ``BatchNorm2d`` follows each convolution's activation.

    Layer positions inside ``self.cnn`` (and therefore the state-dict keys) depend
    on ``batch_norm``: the convolutions sit at indices 0, 4, 8 with batch norm and
    at 0, 3, 6 without it.
    """

    def __init__(self, n_classes, p1=0.5, p2=0.1, act=None, batch_norm=True):
        super().__init__()

        # Build the default activation per instance. The previous default,
        # `nn.ELU(True)`, was created once at definition time (shared by every
        # model) and passed True as ELU's first positional argument, which is
        # `alpha`, not `inplace`. `nn.ELU()` is numerically the same (alpha=1.0).
        if act is None:
            act = nn.ELU()

        # Convolutions are created in network order so that weight initialisation
        # consumes the random number generator in the same sequence as before.
        conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5, stride=1, padding=2)
        conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=2)
        conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=2, padding=1)

        # Parameter-free modules; `pool` and `act` are reused at several positions
        pool = nn.MaxPool2d(kernel_size=2)
        drop1 = nn.Dropout2d(p1)
        drop2 = nn.Dropout(p2)

        if batch_norm:
            self.cnn = nn.Sequential(
                conv1, pool, act, nn.BatchNorm2d(16),
                conv2, pool, act, nn.BatchNorm2d(32),
                conv3, act, nn.BatchNorm2d(64),
                drop1)
        else:
            self.cnn = nn.Sequential(conv1, pool, act, conv2, pool, act, conv3, act, drop1)

        # Flatten everything except the batch dimension
        self.flatten = nn.Flatten(start_dim=1)

        # Fully connected head; the same `drop2` module is applied after both hidden layers
        self.fc = nn.Sequential(
            nn.Linear(in_features=64*4*4, out_features=392),
            nn.ReLU(True),
            drop2,
            nn.Linear(in_features=392, out_features=98),
            nn.ReLU(True),
            drop2,
            nn.Linear(in_features=98, out_features=n_classes))
        print("Network Initialized!")

    def forward(self, x):
        """Return the class logits for a batch `x`: conv stack -> flatten -> linear head."""
        x = self.cnn(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x

In [None]:
def training_step(model, train_loader, loss_fn, optimizer, train_loss_log, printer=True, device=None):
    """Run one training epoch and append its mean batch loss to `train_loss_log`.

    Args:
        model: network to train; switched to training mode here.
        train_loader: iterable of batches whose first two items are inputs and labels.
        loss_fn: callable ``loss_fn(outputs, labels)`` returning a scalar loss tensor.
        optimizer: optimizer whose ``step()`` updates the model parameters.
        train_loss_log: list that receives the epoch's average loss.
        printer: when True, print the average loss and the accuracy (in percent).
        device: device the batches are moved to. ``None`` (default) uses the device
            of the model's first parameter (CPU if the model has no parameters), so
            the function no longer depends on a notebook-global ``device``.

    Returns:
        ``(train_loss, train_accuracy)``: the mean of the per-batch losses and the
        percentage of correctly classified samples seen during the epoch. Both are
        NaN when the loader yields no batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    batch_losses = []
    n_correct = 0
    n_seen = 0
    for sample_batched in train_loader:
        # Move data to device
        x_batch = sample_batched[0].to(device)
        label_batch = sample_batched[1].to(device)

        # Forward pass and loss
        out = model(x_batch)
        loss = loss_fn(out, label_batch)

        # Backpropagation: clear stale gradients, then accumulate the new ones
        model.zero_grad()
        loss.backward()

        # Update the weights
        optimizer.step()

        # Record this batch's loss and its number of correct predictions
        batch_losses.append(loss.detach().cpu().numpy())
        _, predictions = torch.max(out.detach(), 1)
        n_correct += (predictions == label_batch).sum().item()
        # Count the samples actually processed rather than relying on the sampler length
        n_seen += label_batch.size(0)

    # Average train loss over the batches (each batch weighs the same)
    train_loss = np.mean(batch_losses) if batch_losses else float("nan")
    train_accuracy = n_correct*100/n_seen if n_seen else float("nan")
    if printer:
        print(f"AVERAGE TRAIN LOSS: {train_loss}")
        print(f"TRAINING ACCURACY: {train_accuracy}")
    train_loss_log.append(train_loss)
    return train_loss, train_accuracy
    
def validation_step(model, val_loader, loss_fn, val_loss_log, printer = True, device=None):
    """Evaluate the model on `val_loader` and append the mean batch loss to `val_loss_log`.

    The model is switched to evaluation mode and no gradients are tracked; the
    model parameters are not modified.

    Args:
        model: network to evaluate.
        val_loader: iterable of batches whose first two items are inputs and labels.
        loss_fn: callable ``loss_fn(outputs, labels)`` returning a scalar loss tensor.
        val_loss_log: list that receives the average validation loss.
        printer: when True, print the average loss and the accuracy (in percent).
        device: device the batches are moved to. ``None`` (default) uses the device
            of the model's first parameter (CPU if the model has no parameters), so
            the function no longer depends on a notebook-global ``device``.

    Returns:
        ``(val_loss, val_accuracy)``: the mean of the per-batch losses and the
        percentage of correctly classified samples. Both are NaN when the loader
        yields no batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    batch_losses = []
    n_correct = 0
    n_seen = 0
    model.eval()  # evaluation mode
    with torch.no_grad():
        for sample_batched in val_loader:
            x_batch = sample_batched[0].to(device)
            label_batch = sample_batched[1].to(device)

            # Predict using the current model
            y_pred = model(x_batch)

            # Loss and number of correct predictions for this batch
            batch_losses.append(loss_fn(y_pred, label_batch).detach().cpu().numpy())
            _, predictions = torch.max(y_pred.detach(), 1)
            n_correct += (predictions == label_batch).sum().item()
            # Count the samples actually processed rather than relying on the sampler length
            n_seen += label_batch.size(0)

    # Average validation loss over the batches (each batch weighs the same)
    val_loss = np.mean(batch_losses) if batch_losses else float("nan")
    val_accuracy = n_correct*100/n_seen if n_seen else float("nan")
    if printer:
        print(f"AVERAGE VALIDATION LOSS: {val_loss}")
        print(f"VALIDATION ACCURACY: {val_accuracy}")
    val_loss_log.append(val_loss)
    return val_loss, val_accuracy

In [None]:
# Baseline network: both dropout probabilities set to 0 and no batch normalization
cnn = FashionNet(n_classes=10, p1=0, p2=0, batch_norm=False).to(device)

In [None]:
# Cross-entropy on the raw network outputs; this object is passed to the training,
# validation and test passes below.
loss_fn = nn.CrossEntropyLoss()

In [None]:
# SGD with a fixed learning rate over the baseline network's parameters
optimizer = optim.SGD(params=cnn.parameters(), lr=1e-2)

In [None]:
if trained:
    # Reuse the weights written by an earlier run of the training branch below.
    # They are read onto the CPU; load_state_dict copies them into the model's parameters.
    net_state_dict = torch.load('simple_classifier.torch', map_location=torch.device('cpu'))
    cnn.load_state_dict(net_state_dict)
else:
    num_epochs = 30
    train_loss_log, validation_loss_log = [], []
    for i in range(num_epochs):
        print('#################')
        print(f'# EPOCH {i}')
        print('#################')
        # One pass over the training split, then one over the validation split
        training_step(cnn, train_loader, loss_fn, optimizer, train_loss_log, printer=True)
        validation_step(cnn, validation_loader, loss_fn, validation_loss_log, printer = True)

    # Persist the trained weights so later runs can take the `trained` branch
    net_state_dict = cnn.state_dict()
    print(net_state_dict.keys())
    torch.save(net_state_dict, 'simple_classifier.torch')

## Result analysis

In [None]:
### Testing function
def test_step(model, device, dataloader, loss_fn):
    # Set evaluation mode
    model.eval()
    with torch.no_grad():
        # Define the lists to store the outputs for each batch
        conc_out = []
        conc_label = []
        for image_batch, label_batch in tqdm(dataloader):
            # Move tensor to the proper device
            image_batch = image_batch.to(device)
            label_batch = label_batch.to(device)
            # Forward pass
            out = model(image_batch)
            # Append the network output and the original image label to the lists
            conc_out.append(out)
            conc_label.append(label_batch)

        # Create a single tensor with all the values in the lists
        conc_out = torch.cat(conc_out)
        conc_label = torch.cat(conc_label)
        # Evaluate global loss
        val_loss = loss_fn(conc_out, conc_label)
    return conc_out, conc_label, val_loss.data # We return all the outputs, all the labels and the global loss

In [None]:
### Test loss
test_outputs, test_labels, test_loss = test_step(cnn, device, test_loader, loss_fn)

# Accuracy: percentage of test samples whose highest-scoring class equals the label
_, predictions = torch.max(test_outputs.data, 1)
accuracy = (predictions == test_labels).sum().item() / len(test_loader.sampler) * 100

print(f"\n\nTEST LOSS : {test_loss}")
print(f"\nTEST ACCURACY : {accuracy}")

## Optimization

In [None]:
if not trained:
    ###########OPTIMIZATION###########
    ! pip install optuna
    import optuna
    from optuna.integration import PyTorchLightningPruningCallback
    EPOCHS = 10

    def objective(trial):

        # Lerning rate
        lr = trial.suggest_float("learning rate", 1e-5, 1e-1)

        # Dropout probability
        drop_prob1 = trial.suggest_float("dropout1", 0.1, 0.6)
        drop_prob2 = trial.suggest_float("dropout2", 0.1, 0.6)

        # Optimizer
        optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "Adagrad"])

        loss_func = nn.CrossEntropyLoss()

        model = FashionNet(10, drop_prob1, drop_prob2).to(device)
        optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)

        train_loss_log = []
        val_loss_log = []

        for epoch in range(EPOCHS):
            training_step(model, train_loader, loss_func, optimizer, train_loss_log, printer=False)
            validation_step(model, validation_loader, loss_func, val_loss_log, printer = False)

            trial.report(val_loss_log[-1], epoch)

            # Handle pruning based on the intermediate value.
            if trial.should_prune():
                raise optuna.exceptions.TrialPruned()

        return val_loss_log[-1]

    pruner: optuna.pruners.BasePruner = optuna.pruners.MedianPruner(n_startup_trials=3, n_warmup_steps=0, interval_steps=1)

    study = optuna.create_study(study_name="CNN", direction="minimize", pruner=pruner)
    study.optimize(objective, n_trials=40)

    print("Number of finished trials: {}".format(len(study.trials)))

    print("Best trial:")
    trial = study.best_trial

    print("  Value: {}".format(trial.value))

    print("  Params: ")
    for key, value in trial.params.items():
        print("    {}: {}".format(key, value))

In [None]:
# Hard-coded summary of a best trial (presumably copied from an earlier run of the
# hyperparameter search; nothing here reads it from a study object).
# NOTE(review): these values differ from the learning_rate / dropout1 / dropout2
# assigned in the next cell — confirm which run each set of numbers belongs to.
print('#############')
print("BEST TRIAL")
print('#############')
print("\nParameters:")
print("learning rate: 0.009857165743292085")
print("dropout1: 0.4799839200065334")
print("dropout2: 0.24844218906669455")
print("optimizer: Adagrad")


In [None]:
# Hyperparameters of the tuned network.
# NOTE(review): these numbers are not the ones printed as "BEST TRIAL" in the previous
# cell — confirm which search run they come from.
learning_rate = 0.010723917200331065
dropout1 = 0.4540275192876409
dropout2 = 0.16484825460665797

# batch_norm keeps its default (True), so the conv stack includes BatchNorm2d layers
cnn_adv = FashionNet(n_classes=10, p1=dropout1, p2=dropout2).to(device)
optimizer = optim.Adagrad(params=cnn_adv.parameters(), lr=learning_rate)

In [None]:
if not trained:
    num_epochs = 50
    train_loss_log = []
    validation_loss_log = []
    # np.inf instead of np.infty: the `infty` alias was removed in NumPy 2.0
    best_loss = np.inf
    patience = 5
    for i in range(num_epochs):
        print('#################')
        print(f'# EPOCH {i}')
        print('#################')
        #Train pass
        training_step(cnn_adv, train_loader, loss_fn, optimizer, train_loss_log, printer=True)
        #Validation pass
        validation_step(cnn_adv, validation_loader, loss_fn, validation_loss_log, printer = True)

        # Early stopping: reset the patience budget on every improvement of the
        # validation loss, otherwise spend one unit; stop when it is used up.
        if(validation_loss_log[-1] < best_loss):
            best_loss = validation_loss_log[-1]
            patience = 5
        else:
            patience -= 1
            if(patience == 0):
                print("#################\nLearning stopped because the validation error was not improving\n#################")
                break
    # Save the model. Note: these are the weights after the last epoch that ran,
    # not the weights of the epoch with the best validation loss.
    net_state_dict = cnn_adv.state_dict()
    print(net_state_dict.keys())
    # Save the state dict to a file
    torch.save(net_state_dict, 'classifier.torch')
else:
    # Load the state dict previously saved
    net_state_dict = torch.load('classifier.torch', map_location=torch.device('cpu'))
    # Update the network parameters
    cnn_adv.load_state_dict(net_state_dict)

In [None]:
# Evaluate the tuned network on the test split
test_outputs, test_labels, test_loss = test_step(cnn_adv, device, test_loader, loss_fn)

# Accuracy: percentage of test samples whose highest-scoring class equals the label
_, predictions = torch.max(test_outputs.data, 1)
accuracy = (predictions == test_labels).sum().item() / len(test_loader.sampler) * 100

print(f"\n\nTEST LOSS : {test_loss}")
print(f"\nTEST ACCURACY : {accuracy}")

### Analyze network weights

In [None]:
# Show the module structure of the tuned network; the positions listed for the
# `cnn` container are the indices used to address `cnn_adv.cnn[...]` below.
print(cnn_adv)

In [None]:
### Extract weights
# With batch norm enabled, the three Conv2d layers sit at positions 0, 4 and 8 of `cnn_adv.cnn`
weights_l1, weights_l2, weights_l3 = (
    cnn_adv.cnn[layer_idx].weight.data.cpu().numpy() for layer_idx in (0, 4, 8)
)

### Plot the weights (this is a utility function, no need to analyze this code)
def plot_nchw_data(data, h_num, v_num, title):
    """Draw the leading 2-D slices of a 4-D array on a grid of subplots.

    `data` is merged over its first two axes into a stack of 2-D slices. One
    slice is drawn per panel, in order; if there are more panels than slices the
    remaining panels stay empty, and if there are more slices than panels only
    the first `h_num * v_num` slices are shown.

    Args:
        data: 4-D array (supports `.shape` and `.reshape`).
        h_num: number of subplot rows (first argument of `plt.subplots`).
        v_num: number of subplot columns (second argument of `plt.subplots`).
        title: figure title.
    """
    fig, axs = plt.subplots(h_num, v_num, figsize=(8,8))
    dims = data.shape
    slices = data.reshape(dims[0] * dims[1], dims[2], dims[3])
    n_slices = len(slices)
    for idx, ax in enumerate(axs.flatten()):
        ax.set_xticks([])
        ax.set_yticks([])
        if idx >= n_slices:
            # No slice left for this panel: leave it blank
            continue
        ax.imshow(slices[idx], cmap='gist_yarg')
    plt.suptitle(title, fontsize = 18)
    # Reserve the top 3% of the figure for the title
    plt.tight_layout(rect=[0, 0, 1, 0.97], h_pad=0, w_pad=0)
    #plt.savefig(title + ".pdf", format='pdf', bbox_inches = 'tight')
    plt.show()

# Layer 1 has 16 single-channel kernels, so the 4x4 grid shows all of them.
# NOTE(review): layers 2 and 3 have 16 and 32 input channels, so after the reshape in
# plot_nchw_data there are 32*16 and 64*32 slices; the 4x8 and 8x8 grids show only the
# first 32 / 64 of them (the input-channel slices of the first two output filters),
# not one kernel per output filter — confirm this is the intended view.
plot_nchw_data(weights_l1, 4, 4, 'Layer 1 convolutional kernels')
plot_nchw_data(weights_l2, 4, 8, 'Layer 2 convolutional kernels')
plot_nchw_data(weights_l3, 8, 8, 'Layer 3 convolutional kernels')

## Feature maps

In [None]:
# Registry of captured layer outputs, keyed by the name passed to `get_activation`
activation = {}
def get_activation(name):
    """Build a forward hook that stores the hooked layer's detached output in `activation[name]`."""
    def _save_output(module, inputs, output):
        # Detach so the stored tensor is not tied to the autograd graph
        activation.update({name: output.detach()})
    return _save_output

In [None]:
# Capture the output of the first convolution for a single training image
hook = cnn_adv.cnn[0].register_forward_hook(get_activation('conv1'))
data, _ = train_dataset[10]
# Move to the model's device and add the batch dimension in place
data = data.to(device).unsqueeze_(0)
output = cnn_adv(data)

# Drop the batch dimension: one 2-D map per output channel
act = activation['conv1'].squeeze()
fig, ax = plt.subplots(4, 4, figsize=(12, 15))

# Fill the grid row by row, one feature map per panel
grid_side = act.size(0) // 4
k = 0
for i, j in np.ndindex(grid_side, grid_side):
    panel = ax[i, j]
    panel.set_xticks([])
    panel.set_yticks([])
    panel.imshow(act[k].detach().cpu().numpy(), cmap='gray')
    k += 1
plt.suptitle('Layer 1 feature maps', fontsize = 22)
plt.tight_layout(rect=[0, 0, 1, 0.97], h_pad=0, w_pad=0)
#plt.savefig('conv1_feature_map.pdf', format='pdf', bbox_inches = 'tight')
# Detach the hook so later forward passes no longer overwrite activation['conv1']
hook.remove()

In [None]:
# Capture the output of the second convolution for a single training image.
# The hook handle is kept and removed once the forward pass is done; without this,
# every re-run of the cell would stack another hook on the layer.
hook = cnn_adv.cnn[4].register_forward_hook(get_activation('conv2'))
try:
    data, _ = train_dataset[10]
    data = data.to(device)
    data.unsqueeze_(0)  # add the batch dimension in place
    # Visualisation only: no gradients are needed for this forward pass
    with torch.no_grad():
        output = cnn_adv(data)
finally:
    hook.remove()

# Drop the batch dimension: one 2-D map per output channel
act = activation['conv2'].squeeze()
print(act.shape)

# Four maps per row; the number of rows follows from the channel count
n_cols = 4
n_rows = act.size(0) // n_cols
fig, axarr = plt.subplots(n_rows, n_cols, figsize=(12, 16))
for k, panel in enumerate(axarr.flat):
    panel.set_xticks([])
    panel.set_yticks([])
    panel.imshow(act[k].detach().cpu().numpy(), cmap='gray')
plt.suptitle('Layer 2 feature maps', fontsize = 22)
#plt.savefig('conv2_feature_map.pdf', format='pdf', bbox_inches = 'tight')
plt.tight_layout(rect=[0, 0, 1, 0.97], h_pad=0, w_pad=0)

## Confusion matrix

In [None]:
# True labels and predicted classes (index of the largest output) from the latest test pass
y_true = test_labels.detach().cpu().numpy()
y_pred = test_outputs.cpu().argmax(dim=1).numpy()
cm = metrics.confusion_matrix(y_true=y_true, y_pred=y_pred)

# Wrap the matrix in a DataFrame for plotting
CM_df = pd.DataFrame(data=cm)

# Annotated heatmap; the colour scale is capped at 450
# (presumably so that the off-diagonal counts remain distinguishable)
fig, ax = plt.subplots(figsize=(8, 6))
ax = sn.heatmap(data=CM_df, vmax=450, cmap='rocket_r', annot=True, fmt='d')
ax.set_xlabel("Predicted label", fontsize=12)
ax.set_ylabel("True label", fontsize=12)
#plt.savefig("confusion_matrix.pdf", format='pdf', bbox_inches = 'tight')