In [5]:
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler

In [15]:
DATA_DIR = '/home/veeraitalinna/thesis/data'
TRAIN_DATA_FPATH = os.path.join(DATA_DIR, 'normative_data.csv')
BATCH_SIZE = 32
TEST_SPLIT = 0.2
RANDOM_SEED = 42
N_EPOCHS = 4
SKIP_TRAINING = False
LEARNING_RATE = 0.001

In [10]:
# get the computation device
def get_device():
    if torch.cuda.is_available():
        device = 'cuda:0'
    else:
        device = 'cpu'
    return device
device = get_device()

## Loading data

In [16]:
dataset = torch.Tensor(np.genfromtxt(TRAIN_DATA_FPATH, delimiter=','))
dataset.shape

  """Entry point for launching an IPython kernel.


torch.Size([0])

In [None]:
# Creating data indices for training and validation splits:
dataset_size = len(dataset)
test_size = int(TEST_SPLIT * dataset_size)
train_size = dataset_size - test_size
trainset, testset = torch.utils.data.random_split(dataset, [train_size, test_size])

In [None]:
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
testloader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)

## Defining the model

In [17]:
class Encoder(nn.Module):
    def __init__(self, n_components):
        super(Encoder, self).__init__()
        self.fc1 = nn.Linear(784, 1000)
        self.fc2 = nn.Linear(1000, 500)
        self.fc3 = nn.Linear(500, 250)
        self.fc4 = nn.Linear(250, n_components)

    def forward(self, x):
        x = x.reshape(-1, 784)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

In [18]:
def test_Encoder_shapes():
    n_components = 2
    encoder = Encoder(n_components)
    
    x = torch.randn(3, 1, 28, 28)
    y = encoder(x)
    assert y.shape == torch.Size([3, n_components]), f"Bad y.shape: {y.shape}"
    print('Success')

test_Encoder_shapes()

Success


In [19]:
class Decoder(nn.Module):
    def __init__(self, n_components):
        # YOUR CODE HERE
        super(Decoder, self).__init__()
        self.fc1 = nn.Linear(n_components, 250)
        self.fc2 = nn.Linear(250, 500)
        self.fc3 = nn.Linear(500, 1000)
        self.fc4 = nn.Linear(1000, 784)

    def forward(self, x):
        # YOUR CODE HERE
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        x = x.reshape(-1, 1, 28, 28)
        return x

In [20]:
def test_Decoder_shapes():
    n_components = 2
    decoder = Decoder(n_components)
    
    x = torch.randn(3, n_components)
    y = decoder(x)
    assert y.shape == torch.Size([3, 1, 28, 28]), "Bad shape of y: y.shape={}".format(y.shape)
    print('Success')

test_Decoder_shapes()

Success


## Training

In [None]:
# Create a deep autoencoder
n_components = 10
encoder = Encoder(n_components)
encoder.to(device)

decoder = Decoder(n_components)
decoder.to(device)

In [None]:
def fit(encoder, decoder, dataloader, epoch):
    print('Training')
    model.train()
    running_loss = 0.0

    for i, (images, _) in tqdm(enumerate(dataloader), total=int(len(trainset)/dataloader.batch_size)):
        optimizer.zero_grad()
        
        z = encoder(images)
        x_hat = decoder(z)
        
        loss = criterion(images, x_hat)

        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    epoch_loss = running_loss / len(dataloader)
    print(f"Train Loss: {epoch_loss:.3f}")

    return epoch_loss

In [None]:
def validate(encoder, decoder, dataloader, epoch):
    print('Validating')
    model.eval()
    running_loss = 0.0
    
    with torch.no_grad():
        for i, (images, _) in tqdm(enumerate(dataloader), total=int(len(testset)/dataloader.batch_size)):
            z = encoder(images)
            x_hat = decoder(z)

            loss = criterion(images, x_hat)
            
    epoch_loss = running_loss / len(dataloader)
    print(f"Val Loss: {epoch_loss:.3f}")  

    return epoch_loss

In [None]:
# Training loop
if not SKIP_TRAINING:
    parameters = list(encoder.parameters()) + list(decoder.parameters())
    optimizer = optim.Adam(parameters, lr=LEARNING_RATE)
    criterion = nn.MSELoss()
    
    train_loss = []
    val_loss = []
    
    start = time.time()
    for epoch in range(N_EPOCHS):
        print(f"Epoch {epoch+1} of {EPOCHS}")
        
        train_epoch_loss = fit(encoder, decoder, trainloader, epoch)
        val_epoch_loss = validate(encoder, decoder, testloader, epoch)
        
        train_loss.append(train_epoch_loss)
        val_loss.append(val_epoch_loss)
        
    end = time.time()

    print(f"{(end-start)/60:.3} minutes")

In [None]:
if not SKIP_TRAINING:
    tools.save_model(encoder, 'encoder.pth')
    tools.save_model(decoder, 'decoder.pth')
else:
    encoder = Encoder(n_components=10)
    tools.load_model(encoder, 'encoder.pth', device)

    decoder = Decoder(n_components=10)
    tools.load_model(decoder, 'decoder.pth', device)

In [None]:
# loss plots
plt.figure(figsize=(10, 7))
plt.plot(train_loss, color='orange', label='training loss')
plt.plot(val_loss, color='red', label='validataion loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()