In [2]:
import torch
import torch.nn as nn #oop
import torch.nn.functional as F #functions
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import numpy as np
import matplotlib.pyplot as plt

#from sklearn import datasets
from tqdm import tqdm

import sklearn.metrics as sk_m
from sklearn.metrics import plot_confusion_matrix
import pandas as pd
import seaborn as sn

from IPython import embed

import os
import pickle
import shutil

%matplotlib notebook

In [3]:
class Pytorch_Dataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs every sample with its label.

    `data` must expose `samples` and `labels` attributes that can be iterated
    in parallel. `transforms` is a callable that is applied lazily, at lookup
    time, to any sample that is not already a `torch.Tensor`.
    """

    def __init__(self, data, transforms):
        super().__init__()
        self.data = data
        self.transforms = transforms
        self.create_dataset()

    def create_dataset(self):
        """Materialise `self.dataset` as a list of `[sample, label]` pairs."""
        self.dataset = [
            [sample, label]
            for sample, label in zip(self.data.samples, self.data.labels)
        ]

    def transform_samples(self, samples):
        """Run the configured transform callable on `samples`."""
        return self.transforms(samples)

    def transform_labels(self, labels):
        """Wrap `labels` in a `torch.Tensor`."""
        return torch.tensor(labels)

    def __getitem__(self, index):
        """Return the `(sample, label)` pair stored at `index`.

        Anything that is not yet a tensor is converted on the fly and cast to
        float; values that already are tensors are returned untouched.
        """
        sample, label = self.dataset[index]

        if not isinstance(sample, torch.Tensor):
            sample = self.transform_samples(sample).float()

        if not isinstance(label, torch.Tensor):
            label = self.transform_labels(label).float()

        return sample, label

    def __len__(self):
        """Number of `[sample, label]` pairs held by the dataset."""
        return len(self.dataset)

In [4]:
class Dataset:
    """Plain container that keeps a split's samples next to its labels."""

    def __init__(self, samples, labels):
        self.samples, self.labels = samples, labels

In [5]:
# Helper methods for dealing with data (software engineering utilities)

def create_folder(path):
    """Create an empty directory at `path`, discarding whatever was there.

    If `path` already exists its whole tree is removed first, so the directory
    is always empty when this function returns.
    """
    already_present = os.path.exists(path)
    if already_present:
        # Start from a clean slate: drop the previous contents entirely.
        shutil.rmtree(path)

    os.makedirs(path)

def one_hot_encode(all_labels, num_classes):
    """Return a `(len(all_labels), num_classes)` float array of one-hot rows.

    Row `k` is all zeros except for a 1 at the column given by `all_labels[k]`.
    """
    encoded = np.zeros((len(all_labels), num_classes))

    # Iterating a 2-D array yields row views, so writing into `row` updates
    # `encoded` in place.
    for row, hot_column in zip(encoded, all_labels):
        row[hot_column] = 1

    return encoded
    
def get_dataset(which_dataset, batch_size=16):
    """Build train/test DataLoaders for MNIST or Fashion-MNIST.

    The data is downloaded to '../DATA' when missing. Integer targets are
    one-hot encoded over 10 classes (so an MSE objective can be used), and
    every image is converted to a tensor and normalised with mean 0.5 / std 0.5
    when it is fetched from the dataset.

    Args:
        which_dataset: "mnist" or "fashion", matched case-insensitively.
        batch_size: Batch size of the training loader. Defaults to 16.

    Returns:
        A `(train_loader, test_loader)` tuple. The training loader is shuffled;
        the test loader is not shuffled and yields one sample per batch.

    Raises:
        ValueError: If `which_dataset` does not name a supported dataset.
    """
    # Lower-case the argument itself so "MNIST" / "Fashion" are accepted too.
    dataset_key = which_dataset.lower()

    if dataset_key == "mnist":
        dataset_cls = datasets.MNIST
    elif dataset_key == "fashion":
        dataset_cls = datasets.FashionMNIST
    else:
        # Fail loudly instead of hitting an unbound local further down.
        raise ValueError(
            "Unknown dataset %r: expected 'mnist' or 'fashion'" % (which_dataset,)
        )

    # Mean/std are given as 1-tuples: one entry for the single image channel.
    sample_transforms = transforms.Compose([transforms.ToTensor(),
                                            transforms.Normalize((0.5,), (0.5,))])

    wrapped = []
    for is_train in (True, False):
        raw = dataset_cls(root='../DATA', train=is_train, download=True)

        samples = raw.data.numpy()

        # This allows MSE - if torch.nn.CrossEntropyLoss() is used instead,
        # skip the one-hot encoding and keep the integer targets.
        labels = one_hot_encode(raw.targets.numpy(), 10)

        wrapped.append(Pytorch_Dataset(Dataset(samples, labels), sample_transforms))

    train, test = wrapped

    # batch_size: how many samples at a time we pass to the model (generally 8 to 64).
    # Helps generalization (optimizations that "stick around" across batches tend to be
    # general, rather than overfit, features) and also helps training time.
    # shuffle: helps with generalization. We don't want to learn all 1s, then all 2s, and so on.

    train = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True)
    test = torch.utils.data.DataLoader(test, batch_size=1, shuffle=False)

    return train, test

In [6]:
class Net(torch.nn.Module): #inherits from nn.Module
    """Fully connected classifier: num_features -> 512 -> 256 -> num_classes.

    ReLU activations sit between the linear layers; the last layer returns raw
    scores. The learning rate and the loss name are stored on the instance;
    `objective` always computes a mean-squared error and does not consult
    `loss_choice`.
    """

    def __init__(self, lr, num_features, num_classes, loss):
        super().__init__() #initialize nn.Module

        # Hyper-parameters kept on the instance for later use.
        self.alpha, self.loss_choice = lr, loss
        self.num_features = num_features
        self.num_classes = num_classes

        # MLP: layers are created in forward order.
        layers = [
            torch.nn.Linear(num_features, 512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 256),
            torch.nn.ReLU(),
            torch.nn.Linear(256, num_classes),
        ]
        self.network = torch.nn.Sequential(*layers)

        # CNN and RBFN variants are placeholders only; nothing is defined for them.

    def init_optimizer(self):
        """Attach an Adam optimizer over all parameters, using `self.alpha` as lr."""
        # Alternative: torch.optim.SGD(self.parameters(), lr = self.alpha)
        self.optimizer = torch.optim.Adam(self.parameters(), lr = self.alpha)

    def objective(self, preds, labels): # this is the loss function
        """Return the mean-squared error between `preds` and `labels`."""
        return torch.nn.functional.mse_loss(preds, labels)

    def forward(self, x): # you can complicate the network in the forward() method.
        """Pass `x` through the sequential network and return its output."""
        return self.network(x)
    
def _model_device(model):
    """Return the device of the model's first parameter (CPU if it has none)."""
    first_param = next(model.parameters(), None)
    if first_param is None:
        return torch.device("cpu")
    return first_param.device


def _validation_accuracy(model, dataset, device, desc):
    """Return the fraction of samples whose arg-max prediction matches the label.

    Assumes every batch is `(samples, one_hot_labels)` with labels shaped
    `(batch, num_classes)`. Returns NaN when `dataset` yields no samples.
    """
    correct, total = 0, 0

    # No gradients are needed for evaluation; skip building the autograd graph.
    with torch.no_grad():
        for sample, label in tqdm(dataset, desc = desc):

            sample = sample.view(sample.size(0), -1).to(device)

            # Arg-max per row so batches larger than one are scored correctly.
            preds = model(sample).argmax(dim = 1).cpu()
            targets = label.argmax(dim = 1).cpu()

            correct += (preds == targets).sum().item()
            total += targets.numel()

    return correct / total if total else float("nan")


def train(model, train_dataset, test_dataset, title, num_epochs = 9, rate = 2):
    """Train `model` and periodically report its accuracy on `test_dataset`.

    Batches are moved to whatever device the model's parameters live on, so
    the same loop works on CPU, CUDA or MPS.

    Args:
        model: Module exposing `init_optimizer()`, `objective(preds, labels)`
            and, after `init_optimizer()`, an `optimizer` attribute.
        train_dataset: Iterable of `(samples, labels)` training batches.
        test_dataset: Iterable of `(samples, one_hot_labels)` validation batches.
        title: Accepted for API compatibility; not used by the loop.
        num_epochs: Number of passes over `train_dataset`.
        rate: Validation runs on every epoch where `epoch % rate == 0`.

    Returns:
        `(training_loss, training_metrics)`: the mean batch loss of every
        epoch (NaN for an epoch without batches), and a list that is currently
        always empty because no metrics are recorded.
    """
    training_loss, training_metrics = [], []

    model.init_optimizer()

    device = _model_device(model)

    for epoch in range(num_epochs):

        # Train network

        running_loss, num_batches = 0.0, 0

        for sample, label in tqdm(train_dataset, desc = "Train Epoch %s" % epoch):

            # For CNN turn off the flattening below!
            sample = sample.view(sample.size(0), -1).to(device)
            label = label.to(device)

            preds = model(sample)

            loss = model.objective(preds, label.float())

            running_loss += loss.item()
            num_batches += 1

            model.optimizer.zero_grad() # clear gradients left over from the previous batch

            loss.backward()

            model.optimizer.step() # adjust the weights

        # Count batches explicitly so an empty loader cannot leave an index undefined.
        epoch_loss = running_loss / num_batches if num_batches else float("nan")

        print(epoch_loss)

        training_loss.append(epoch_loss)

        # Validate network

        if(epoch % rate == 0):

            model.eval()
            try:
                acc = _validation_accuracy(model, test_dataset, device, "Test Epoch %s" % epoch)
            finally:
                # Always return to training mode, even if validation fails.
                model.train()

            print("Valid Accuracy %s" % acc)

    return training_loss, training_metrics

In [7]:
# Experiment configuration and launch: build the results folder, load the data,
# create the model on the best available device and train it.

title = 'test'

which_dataset = "mnist" # "mnist" or "fashion"

loss_choice = "mse"

# NOTE: batch_size is not passed to get_dataset() below, so the loaders use
# get_dataset's own batch size rather than this value.
batch_size = 10
num_features = 784
num_classes = 10

alpha = 1e-4

exp_name = f"MLP on {which_dataset}"

# The results root can be overridden with the NN_RESULTS_ROOT environment
# variable; the default is the original location.
results_root = os.environ.get(
    "NN_RESULTS_ROOT",
    "/Users/andyvarner/Documents/NN_Spring2023/project_1/results",
)

path_save = f"{results_root}/MLP_{which_dataset}/"
path = os.path.join(path_save, exp_name)

# WARNING: create_folder() deletes any existing folder at `path` before recreating it.
create_folder(path)

# Prefer Apple's MPS backend, then CUDA, and fall back to the CPU so that
# model creation does not fail on machines without MPS.
if torch.backends.mps.is_available():
    device = "mps"
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

train_dataset, test_dataset = get_dataset(which_dataset)

model = Net(alpha, num_features, num_classes, loss_choice).to(device)

train_loss, train_metrics = train(model, train_dataset, test_dataset, title)

Train Epoch 0:  10%|█████████████▌                                                                                                                       | 382/3750 [00:02<00:22, 147.93it/s]


KeyboardInterrupt: 

In [None]:
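# NOTE(review): disabled draft of a per-task experiment loop. It refers to `MLP` and `j`,
# neither of which is defined in the visible cells (the model class here is `Net`);
# fix those names before re-enabling it.
# for batch, (x_train, y_train) in enumerate(train_dataset):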
#     print(batch)
        
#     title = "task_%s" % (batch)
              
#     model = MLP(alpha, num_features, num_classes, loss_choice)
#     train_results, valid_results = train(model, train_dataset, test_dataset, title)

#     results = {}
#     results["train"] = train_results
#     results["test"] = valid_results

#     #dump train_results

#     path_save = os.path.join(path, "task_%s" % (str(batch).zfill(3)))

#     if(not(os.path.exists(path_save))):
#         create_folder(path_save)

#     title = "fold_%s.pkl" % (str(j).zfill(3))

#     filename = os.path.join(path_save, title)

#     pickle.dump(results, open(filename, "wb"))




In [None]:
# NOTE(review): `all_samples` is not assigned in any visible cell, so this leftover
# debugging cell raises NameError on a fresh kernel — confirm whether it can be removed.
type(all_samples)

In [None]:
# NOTE(review): `all_labels` is never assigned at notebook (global) scope in the visible
# cells, so this leftover debugging cell raises NameError on a fresh kernel — confirm
# whether it can be removed.
type(all_labels)