In [1]:
import numpy as np
import matplotlib.pyplot as plt
import os.path
from time import time, strftime, gmtime

from sklearn.model_selection import train_test_split

from torch import nn
from torch.utils.data import TensorDataset, DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda, Compose, Normalize

from tqdm import tqdm

import sys
sys.path.append('../')

from loss_functions import *
from train_test import train, test

In [2]:
def get_data(eta=0.0):
    
    # 100 features
    # 10 classes
    
    X_train = np.load('data/Xtrain-eta{}-data.npy'.format(eta))
    X_test  = np.load('data/Xtest-eta{}-data.npy'.format(eta))
    y_train = np.load('data/ytrain-eta{}-label.npy'.format(eta))
    y_test  = np.load('data/ytest-eta{}-label.npy'.format(eta))

    train_data = TensorDataset(torch.tensor(X_train, dtype=float), torch.tensor(y_train, dtype=int))
    test_data  = TensorDataset(torch.tensor(X_test, dtype=float), torch.tensor(y_test, dtype=int))

    # Create data loaders
    batch_size_train = 20
    batch_size_test  = 20

    train_dataloader = DataLoader(train_data, batch_size=batch_size_train, shuffle=True)
    test_dataloader  = DataLoader(test_data,  batch_size=batch_size_test,  shuffle=True)

    for X, y in test_dataloader:
        print("Shape of X [N, C, H, W]: ", X.shape)
        print("Shape of y: ", y.shape, y.dtype)
        break
        
    return train_dataloader, test_dataloader

In [3]:
# Create model

# Get cpu or gpu device for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using {} device".format(device))

# Define model: feedforward network 100->80->40->20->10
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.layers = nn.Sequential(
            nn.Linear(100, 80),
            nn.ReLU(),
            nn.Linear(80, 40),
            nn.ReLU(),
            nn.Linear(40, 20),
            nn.ReLU(),
            nn.Linear(20, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.layers(x)
        return logits
    
model = NeuralNetwork().to(device)
print(model)

Using cpu device
NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (layers): Sequential(
    (0): Linear(in_features=100, out_features=80, bias=True)
    (1): ReLU()
    (2): Linear(in_features=80, out_features=40, bias=True)
    (3): ReLU()
    (4): Linear(in_features=40, out_features=20, bias=True)
    (5): ReLU()
    (6): Linear(in_features=20, out_features=10, bias=True)
  )
)


In [4]:
def run(train_dataloader, test_dataloader, model, loss_fn, optimizer, epochs):

    train_loss0 = []
    train_acc0  = []

    train_loss = []
    train_acc  = []
    test_loss  = []
    test_acc   = []

    tr_ls, tr_ac = test(train_dataloader, model, loss_fn)
    train_loss.append(tr_ls)
    train_acc.append(tr_ac)

    te_ls, te_ac = test(test_dataloader, model, loss_fn)
    test_loss.append(te_ls)
    test_acc.append(te_ac)

    for t in tqdm(range(epochs)):
        #print(f"Epoch {t+1}")

        tr_ls0, tr_ac0 = train(train_dataloader, model, loss_fn, optimizer)
        train_loss0.extend(tr_ls0)
        train_acc0.extend(tr_ac0)  

        tr_ls, tr_ac = test(train_dataloader, model, loss_fn)
        train_loss.append(tr_ls)
        train_acc.append(tr_ac)

        te_ls, te_ac = test(test_dataloader, model, loss_fn)
        test_loss.append(te_ls)
        test_acc.append(te_ac)

        #print(f"Test accuracy: {(100*te_ac):>0.1f}%, Test loss: {te_ls:>8f}")
        
    return train_loss0, train_acc0, train_loss, train_acc, test_loss, test_acc

In [5]:
# Create folder
folder = 'results-test'
if not os.path.exists(folder):
    os.mkdir(folder)

def my_MSE_loss(x,y):
    return MSE_loss(x,y,10)

def my_MAE_loss(x,y):
    return MAE_loss(x,y,10)

n_trials = 5
epochs   = 20

# Loss functions
Loss_fn = [my_MSE_loss, my_MAE_loss, CE_loss, FR_loss, H_loss]
titles  = ['MSE', 'MAE', 'Cross-entropy', 'Fisher-Rao', 'Hellinger']

# Learning rates
Lr_all   = [[0.4], [0.2], [0.04], [0.04], [0.04]]

# Noise rates
eta_all = [0.0]

# For each eta
for eta in eta_all:
    
    print('\n***eta={}***'.format(eta))

    # Load data
    train_dataloader, test_dataloader = get_data(eta=eta)
    
    # For each experiment (trial)
    #for i_trial in range(n_trials):
    for i_trial in range(1,5):

        tic = time()
        print('\n**Experiment: {}/{}**'.format(i_trial,n_trials))

        # Choose loss function
        for i_loss in range(len(Loss_fn)):

            loss_fn = Loss_fn[i_loss]
            print(titles[i_loss])

            # Choose learning rate
            for lr in Lr_all[i_loss]:

                model = NeuralNetwork().to(device)

                optimizer = torch.optim.SGD(model.parameters(), lr=lr)

                train_loss0, train_acc0, train_loss, train_acc, test_loss, test_acc\
                        = run(train_dataloader, test_dataloader, model, loss_fn, optimizer, epochs)

                filename = folder+'/synthetic_{}_lr{}_eta{}_trial{}'.format(titles[i_loss],lr,eta,i_trial)
                np.save(filename, [train_loss0, train_acc0, train_loss, train_acc, test_loss, test_acc])

        toc = time()
    print('Elapsed time: ' + strftime("%H:%M:%S", gmtime(toc-tic)))


***eta=0.0***
Shape of X [N, C, H, W]:  torch.Size([20, 100])
Shape of y:  torch.Size([20]) torch.int64

**Experiment: 1/5**
MSE


 15%|██████▌                                     | 3/20 [00:02<00:14,  1.14it/s]


KeyboardInterrupt: 