<a href="https://colab.research.google.com/github/sanggusti/30-days-kaggle/blob/main/jokian_jedoy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pytorch-lightning
!pip install medmnist
!pip install wandb

In [None]:
import os
import sys
from tqdm import trange
from tqdm import tqdm
from skimage.util import montage
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision.transforms as transforms
import torchvision.datasets as datasets


import medmnist
from medmnist.dataset import PathMNIST, ChestMNIST, DermaMNIST, OCTMNIST, PneumoniaMNIST, RetinaMNIST, BreastMNIST, OrganMNISTAxial, OrganMNISTCoronal, OrganMNISTSagittal
from medmnist.evaluator import getAUC, getACC
from medmnist.info import INFO

import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger


In [None]:
print("Version:", medmnist.__version__)

In [None]:
flag_to_class = {
    "pathmnist": PathMNIST,
    "chestmnist": ChestMNIST,
    "dermamnist": DermaMNIST,
    "octmnist": OCTMNIST,
    "pneumoniamnist": PneumoniaMNIST,
    "retinamnist": RetinaMNIST,
    "breastmnist": BreastMNIST,
    "organmnist_axial": OrganMNISTAxial,
    "organmnist_coronal": OrganMNISTCoronal,
    "organmnist_sagittal": OrganMNISTSagittal,
}

## B. Logistic Regression on BreastMNIST

Objectives:

Built 2 models with these criteria:
  - Using the built-in logistic regression functions in scikit-learn, train a logistic regression model with L2 regularisation on the training set, use the validation set to choose a good regularisation parameter (a hyperparameter) from at least three choices, and test the chosen model on the test set. Report the three metrics M1 to M3
  - Using PyTorch (see Question 5 of Lab 6), train a logistic regression model with L2 regularisation on the training set, use the validation set to choose a good regularisation parameter (a hyperparameter) from at least three choices, and test the chosen model on the test set. Report the three metrics M1 to M3

Get these metrics too

1. Data loading and Inspection
2. Logistic Regression using this [model](https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegression.html)
3. Track metrics:
  - M1 Training accuracy
  - M2 Validation accuracy
  - M3 Testing accuracy
4. Performance comparison:
  - summarise each of the three metrics from the two mdoels using one or more bar graphs
  - Describe at least two observations interesting to you


## Tuning Here

Current: *BreastMNIST*

In [None]:
data_flag = 'breastmnist'
download = True
input_root = 'breastmnist/'

NUM_EPOCHS = 10
BATCH_SIZE = 128
lr = 0.001

In [None]:
!mkdir breastmnist

In [None]:
DataClass = flag_to_class[data_flag]

info = INFO[data_flag]
task = info['task']
n_channels = info['n_channels']
n_classes = len(info['label'])

In [None]:
# preprocessing
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[.5], std=[.5])
])

# load the data
train_dataset = DataClass(root=input_root, split='train', transform=data_transform, download=download)
validation_dataset = DataClass(root=input_root, split='val', transform=data_transform, download=download)
test_dataset = DataClass(root=input_root, split='test', transform=data_transform, download=download)
nonorm_dataset = DataClass(root=input_root, split='train', transform=transforms.ToTensor(), download=download)

# encapsulate data into dataloader form
train_loader = data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
validation_loader = data.DataLoader(dataset=validation_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = data.DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
print(train_dataset)
print("===================")
print(validation_dataset)
print("===================")
print(test_dataset)
print("===================")
print(nonorm_dataset)

In [None]:
# visualization

img, target = nonorm_dataset[0]
if n_channels == 1:
    img = img.reshape(28, 28)
    plt.imshow(img, cmap='gray')
else:
    img = img.permute(1, 2, 0)
    plt.imshow(img)

In [None]:
# montage

def process(n_channels, length=20):
    scale = length * length

    image = np.zeros((scale, 28, 28, 3)) if n_channels == 3 else np.zeros((scale, 28, 28))
    index = [i for i in range(scale)]
    np.random.shuffle(index)
    
    for idx in range(scale):
        img, _ = nonorm_dataset[idx]
        if n_channels == 3:
            img = img.permute(1, 2, 0).numpy()
        else:
            img = img.reshape(28, 28).numpy()
        image[index[idx]] = img

    if n_channels == 1:
        image = image.reshape(scale, 28, 28)
        arr_out = montage(image)
        plt.imshow(arr_out, cmap='gray')
    else:
        image = image.reshape(scale, 28, 28, 3)
        arr_out = montage(image, multichannel=3)
        plt.imshow(arr_out)
    
process(n_channels=n_channels, length=20)

In [None]:
from sklearn.linear_model import LogisticRegression
np.random.seed(2020) #set a seed for reproducibility
clf = LogisticRegression(solver='lbfgs')  #clf: classifier
clf.fit(sgx, sgy)

In [None]:
# here lies the logistic regression model for BreastMNIST
# Logistic Regression using PyTorch
class LogisticRegression(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(LogisticRegression, self).__init__()
        self.fc1 = nn.Linear(input_dim, num_classes)

    def forward(self, x_in):
        z = self.fc1(x_in)
        return z

model = LogisticRegression(input_dim=INPUT_DIM, num_classes=NUM_CLASSES)
print (model.named_parameters)

In [None]:
# write a function to get random samples (a batch) from sgx and sgy
def get_batch_logi_regress(sgx,sgy,batch_size=32):

    #create a vector with the indexes: from 0 to the size of the input data
    indexes= torch.linspace(0,sgx.shape[0]-1,steps=sgx.shape[0])
    
    #sample random indicies from the vector above, these 32 
    #random numbers are the row indexes for the batch data.
    
    random_indexes=torch.multinomial(indexes,32)
    
    batch_x= sgx[random_indexes]
    batch_y= sgy[random_indexes]
    
    return batch_x, batch_y

In [None]:
criterion = torch.nn.BCELoss()


from itertools import count
for batch_idx in count(1):
    # Get data
    batch_x, batch_y = get_batch_logi_regress(sgx,sgy)
    # Reset gradients
    model.zero_grad()

    # Forward pass
    # however we need to change the above ndarray to torch
    # tensors before calling the function LR.forward
    
    #warping ndarray to torch tensors
    batch_x=(torch.from_numpy(batch_x.astype(np.float32)))
    batch_y=(torch.from_numpy(batch_y.astype(np.float32)))
    
    
    output = criterion(torch.squeeze(LR(batch_x)), batch_y)
    loss = output.item()
    
    # Backward pass
    output.backward()

    # Apply gradients
    for param in model.parameters():
        param.data.add_(-0.5 * param.grad.data)

    # Stop criterion
    if abs(loss) < 1e-2:
        break

print('Loss: {:.6f} after {} batches'.format(loss, batch_idx))

## C. Convolutional Neural Network on OctMNIST
1. Data loading and Inspection
2. CNN with metrics:
  - Training accuracy
  - Validation accuracy
  - Testing accuracy
  - Training time

Design two design of CNN with specified architecture
- Design a CNN with two Conv layers and two FC layers. Train the model on the training set, use the validation set to choose the best design among at least three different choices, and test the chosen model on the test set. Report the four metrics M1 to M4
- Design a CNN with three Conv layers and three FC layers. Train the model on the training set, use the validation set to choose the best design among at least three different choices, and test the chosen model on the test set. Report the four metrics M1 to M4

3. Performance comparison
  - Summarise each of the four metrics from the two models in 2 using one or more bar graphs
  - Describe at least two observations

In [None]:
# Preprocessing, Data Loading and Inspection
data_flag = 'octmnist'
download = True
input_root = 'octmnist/'

NUM_EPOCHS = 10
BATCH_SIZE = 128
lr = 0.001

In [None]:
!mkdir octmnist

In [None]:
DataClass = flag_to_class[data_flag]

info = INFO[data_flag]
task = info['task']
n_channels = info['n_channels']
n_classes = len(info['label'])

In [None]:
# preprocessing
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[.5], std=[.5])
])

# load the data
train_dataset = DataClass(root=input_root, split='train', transform=data_transform, download=download)
validation_dataset = DataClass(root=input_root, split='val', transform=data_transform, download=download)
test_dataset = DataClass(root=input_root, split='test', transform=data_transform, download=download)
nonorm_dataset = DataClass(root=input_root, split='train', transform=transforms.ToTensor(), download=download)

# encapsulate data into dataloader form
train_loader = data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
validation_loader = data.DataLoader(dataset=validation_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = data.DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
print(train_dataset)
print("===================")
print(validation_dataset)
print("===================")
print(test_dataset)
print("===================")
print(nonorm_dataset)

In [None]:
# visualization

img, target = nonorm_dataset[0]
if n_channels == 1:
    img = img.reshape(28, 28)
    plt.imshow(img, cmap='gray')
else:
    img = img.permute(1, 2, 0)
    plt.imshow(img)

In [None]:
# montage

def process(n_channels, length=20):
    scale = length * length

    image = np.zeros((scale, 28, 28, 3)) if n_channels == 3 else np.zeros((scale, 28, 28))
    index = [i for i in range(scale)]
    np.random.shuffle(index)
    
    for idx in range(scale):
        img, _ = nonorm_dataset[idx]
        if n_channels == 3:
            img = img.permute(1, 2, 0).numpy()
        else:
            img = img.reshape(28, 28).numpy()
        image[index[idx]] = img

    if n_channels == 1:
        image = image.reshape(scale, 28, 28)
        arr_out = montage(image)
        plt.imshow(arr_out, cmap='gray')
    else:
        image = image.reshape(scale, 28, 28, 3)
        arr_out = montage(image, multichannel=3)
        plt.imshow(arr_out)
    
process(n_channels=n_channels, length=20)

Design a CNN with **two Conv layers** and **two FC layers**. Train the model on the training set, use the validation set to choose the best design among at least three different choices, and test the chosen model on the test set. Report the four metrics M1 to M4

In [None]:
# define a simple CNN model

class CNN_A(nn.Module):
    """
    2 Convolutional layers and 2 FC layers
    """
    def __init__(self, in_channels, num_classes):
        super(CNN_A, self).__init__()

        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=3),
            nn.BatchNorm2d(32),
            nn.ReLU())

        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))

        # self.layer3 = nn.Sequential(
        #     nn.Conv2d(16, 64, kernel_size=3),
        #     nn.BatchNorm2d(64),
        #     nn.ReLU())
        
        # self.layer4 = nn.Sequential(
        #     nn.Conv2d(64, 64, kernel_size=3),
        #     nn.BatchNorm2d(64),
        #     nn.ReLU())

        # self.layer5 = nn.Sequential(
        #     nn.Conv2d(64, 64, kernel_size=3, padding=1),
        #     nn.BatchNorm2d(64),
        #     nn.ReLU(),
        #     nn.MaxPool2d(kernel_size=2, stride=2))

        self.fc = nn.Sequential(
            nn.Linear(64 * 4 * 4, 128),
            nn.ReLU()
        )


        self.fc2 = nn.Sequential(
            nn.Linear(128, num_classes)
        )


    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        x = self.fc2(x)
        return x

model = CNN_A(in_channels=n_channels, num_classes=n_classes)
print(model)
    
# define loss function and optimizer
if task == "multi-label, binary-class":
    criterion = nn.BCEWithLogitsLoss()
else:
    criterion = nn.CrossEntropyLoss()
    
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

In [None]:
# wandb_logger = WandbLogger()
# trainer = pl.Trainer(
#     gpus = min(1, torch.cuda.device_count()),
#     max_epochs=NUM_EPOCHS,
#     progress_bar_refresh_rate=20,
#     logger=wandb_logger
# )

# trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=validation_loader)

In [None]:
wandblogger = WandbLogger()

for epoch in range(NUM_EPOCHS):
    train_correct = 0
    train_total = 0
    test_correct = 0
    test_total = 0
    
    model.train()
    for inputs, targets in tqdm(train_loader):
        # forward + backward + optimize
        optimizer.zero_grad()
        outputs = model(inputs)
        
        if task == 'multi-label, binary-class':
            targets = targets.to(torch.float32)
            loss = criterion(outputs, targets)
        else:
            targets = targets.squeeze().long()
            loss = criterion(outputs, targets)
        
        loss.backward()
        optimizer.step()

In [None]:
# evaluation

def test(split):
    model.eval()
    y_true = torch.tensor([])
    y_score = torch.tensor([])
    
    data_loader = train_loader if split == 'train' else test_loader

    with torch.no_grad():
        for inputs, targets in data_loader:
            outputs = model(inputs)

            if task == 'multi-label, binary-class':
                targets = targets.to(torch.float32)
                m = nn.Sigmoid()
                outputs = m(outputs)
            else:
                targets = targets.squeeze().long()
                m = nn.Softmax(dim=1)
                outputs = m(outputs)
                targets = targets.float().resize_(len(targets), 1)

            y_true = torch.cat((y_true, targets), 0)
            y_score = torch.cat((y_score, outputs), 0)

        y_true = y_true.numpy()
        y_score = y_score.detach().numpy()
        auc = getAUC(y_true, y_score, task)
        acc = getACC(y_true, y_score, task)
    
        print('%s  acc: %.3f  auc:%.3f' % (split, acc, auc))

        
print('==> Evaluating ...')
test('train')
test('test')

Design a CNN with **three Conv layers** and **three FC layers**. Train the model on the training set, use the validation set to choose the best design among at least three different choices, and test the chosen model on the test set. Report the four metrics M1 to M4

In [None]:
# define a simple CNN model

class CNN_A(nn.Module):
    """
    3 Convolutional layers and 3 FC layers
    """
    def __init__(self, in_channels, num_classes):
        super(CNN_A, self).__init__()

        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=3),
            nn.BatchNorm2d(32),
            nn.ReLU())

        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))

        self.layer3 = nn.Sequential(
            nn.Conv2d(16, 64, kernel_size=3),
            nn.BatchNorm2d(64),
            nn.ReLU())
        
        # self.layer4 = nn.Sequential(
        #     nn.Conv2d(64, 64, kernel_size=3),
        #     nn.BatchNorm2d(64),
        #     nn.ReLU())

        # self.layer5 = nn.Sequential(
        #     nn.Conv2d(64, 64, kernel_size=3, padding=1),
        #     nn.BatchNorm2d(64),
        #     nn.ReLU(),
        #     nn.MaxPool2d(kernel_size=2, stride=2))

        self.fc = nn.Sequential(
            nn.Linear(64 * 4 * 4, 128),
            nn.ReLU()
        )

        self.fc2 = nn.Sequential(
            nn.Linear(128, 128),
            nn.ReLU()
        )

        self.fc3 = nn.Sequential(
            nn.Linear(128, num_classes)
        )


    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

model = CNN_A(in_channels=n_channels, num_classes=n_classes)
print(model)
    
# define loss function and optimizer
if task == "multi-label, binary-class":
    criterion = nn.BCEWithLogitsLoss()
else:
    criterion = nn.CrossEntropyLoss()
    
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

In [None]:
wandblogger = WandbLogger()

for epoch in range(NUM_EPOCHS):
    train_correct = 0
    train_total = 0
    test_correct = 0
    test_total = 0
    
    model.train()
    for inputs, targets in tqdm(train_loader):
        # forward + backward + optimize
        optimizer.zero_grad()
        outputs = model(inputs)
        
        if task == 'multi-label, binary-class':
            targets = targets.to(torch.float32)
            loss = criterion(outputs, targets)
        else:
            targets = targets.squeeze().long()
            loss = criterion(outputs, targets)
        
        loss.backward()
        optimizer.step()

In [None]:
# evaluation

def test(split):
    model.eval()
    y_true = torch.tensor([])
    y_score = torch.tensor([])
    
    data_loader = train_loader if split == 'train' else test_loader

    with torch.no_grad():
        for inputs, targets in data_loader:
            outputs = model(inputs)

            if task == 'multi-label, binary-class':
                targets = targets.to(torch.float32)
                m = nn.Sigmoid()
                outputs = m(outputs)
            else:
                targets = targets.squeeze().long()
                m = nn.Softmax(dim=1)
                outputs = m(outputs)
                targets = targets.float().resize_(len(targets), 1)

            y_true = torch.cat((y_true, targets), 0)
            y_score = torch.cat((y_score, outputs), 0)

        y_true = y_true.numpy()
        y_score = y_score.detach().numpy()
        auc = getAUC(y_true, y_score, task)
        acc = getACC(y_true, y_score, task)
    
        print('%s  acc: %.3f  auc:%.3f' % (split, acc, auc))

        
print('==> Evaluating ...')
test('train')
test('test')

## D. Unsupervised learning on Fashion-MNIST

- Apply PCA to all images of these two chosen classes. Visualise the top 5 eigenvectors as images and display them in the order of descending corresponding values (the one corresponding to the largest eigenvalue first). [1 marks]
- Use the top 30 PCs to reconstruct 10 images, with 5 from each class (any 5 images are fine from each class). Show these 10 pairs of reconstructed and original images. [1 marks]

- Visualise the two-dimensional PCA representations of all data points in a 2D plane (i.e. using the top two PCs). Use different colours/markers for the two classes for better visualisation (Hint: You need to use the class labels here for visualisation). [1 marks]
- Use spectral clustering to cluster all data points as represented by the top two PCs (clustering of two-dimensional vectors, where each vector has two values, PC1 and PC2). Visualise the two clusters with different colours/markers in 2D. [2 marks].
- Design a new autoencoder with five Conv2d layers and five ConvTranspose2d layers. You are free to choose the activation functions and settings such as stride and padding. Train this new autoencoder on all images of these two chosen classes for at least 20 epochs. Plot the loss against the epoch. [2 marks]

In [None]:
from sklearn.decomposition import PCA
import seaborn as sns
from torch.utils.data import DataLoader

In [None]:
def get_fashion_mnist_test_loaders(batch_size=128):
    """Fashion MNIST dataloader with (32, 32) sized images."""
    # Resize images so they are a power of 2
    all_transforms = transforms.Compose([
        transforms.Resize(32),
        transforms.ToTensor()
    ])
    # Get test data
    test_data = datasets.FashionMNIST('../fashion_data', train=False,
                                      transform=all_transforms, download=True)
    # Create dataloaders
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True)
    return test_loader

In [None]:
get_fashion_mnist_test_loaders()

In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import numpy as np
X = np.array(mnist_data[:][0][0].numpy()).reshape(1, 28*28)
for i in range(1, len(mnist_data[:])):
    X = np.append(X,np.array(mnist_data[:][i][0].numpy()).reshape(1, 28*28),axis = 0)
X = StandardScaler().fit_transform(X)

pca = PCA(n_components=0.95)
pca.fit(X)
cov_matrix = np.dot(X.T, X) / len(X)

for eigenvector in pca.components_[:30]:
    print(np.dot(eigenvector.T, np.dot(cov_matrix, eigenvector)))

In [None]:
class Autoencoder(nn.Module):
    """
    An Autoencoder with 5 Conv2d layers and 5 ConvTranspose2d layers
    """
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            # 1 input image channel, 16 output channel, 3x3 square convolution
            nn.Conv2d(1, 16, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 16, 3, stride=2, padding=1),
            nn.ReLu(),
            nn.Conv2d(16, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 7)
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, 7),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 32, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 16, 3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, 3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()  #to range [0, 1]
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
myAE=Autoencoder()
print(myAE)

In [None]:
params = list(myAE.parameters())
print(len(params))
print(params[0].size())  # First Conv2d's .weight
print(params[1].size())  # First Conv2d's .bias
print(params[1])

In [None]:
#Training (optimisation) parameters
batch_size=64
learning_rate=1e-3
max_epochs = 20

#Choose mean square error loss
criterion = nn.MSELoss() 
#Choose the Adam optimiser
optimizer = torch.optim.Adam(myAE.parameters(), lr=learning_rate, weight_decay=1e-5)
#Specify how the data will be loaded in batches (with random shuffling)
train_loader = torch.utils.data.DataLoader(mnist_data, batch_size=batch_size, shuffle=True)
#Storage
outputs = []

#Start training
for epoch in range(max_epochs):
    for data in train_loader:
        img, label = data
        optimizer.zero_grad()
        recon = myAE(img)
        loss = criterion(recon, img)
        loss.backward()
        optimizer.step()            
    if (epoch % 2) == 0:
        print('Epoch:{}, Loss:{:.4f}'.format(epoch+1, float(loss)))
    outputs.append((epoch, img, recon),)