In [1]:
%load_ext autoreload

import os
import sys
import glob

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

import torch
import torch.nn as nn

module_path = os.path.abspath(os.path.join('../../py-conjugated/'))
if module_path not in sys.path:
    sys.path.append(module_path)
import morphology_networks as net
import model_training as train
import model_testing as test
import physically_informed_loss_functions as pilf
import network_utils as nuts

torch.manual_seed(28)

<torch._C.Generator at 0x1a211101b0>

In [2]:
# Root directory holding the train_set/ and test_set/ folders. The default is the
# original external-drive location; set the OPV_MORPH_MAPS_DIR environment variable
# to point the notebook at a copy of the data on another machine.
morph_maps_root = os.environ.get(
    'OPV_MORPH_MAPS_DIR',
    '/Volumes/Tatum_SSD-1/Grad_School/m2py/Morphology_labels/OPV_morph_maps',
)

# The trailing '' component keeps the final path separator that the original
# string literals carried, so the resulting default paths are unchanged.
train_data_path = os.path.join(morph_maps_root, 'train_set', '')
test_data_path = os.path.join(morph_maps_root, 'test_set', '')

In [4]:
# Number of samples drawn per batch for each split
TRAIN_BATCH_SIZE = 13
TEST_BATCH_SIZE = 10

# Build one dataset object per split from its data path
train_dataset = nuts.local_OPV_ImDataset(train_data_path)
test_dataset = nuts.local_OPV_ImDataset(test_data_path)

# Batched iterators over each dataset (default DataLoader ordering, no shuffling)
train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=TRAIN_BATCH_SIZE,
)
test_dataloader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=TEST_BATCH_SIZE,
)

In [15]:
class ConvolutionalEncoder(nn.Module):
    """
    ConvolutionalEncoder() relies on PyTorch API to convolve and compress image-like data
    from SPM analyses. The stack of 2-dimensional SPM channels is encoded into a 1-dimensional
    torch.Tensor() per sample, which describes the image encoding in feature-space.

    Parameters
    ----------
    im_z : int
        Number of channels in the input image stack.
    fc_nodes : int
        Number of features produced by flattening the output of the third conv/pool
        stage; used as the input width of the first fully-connected layer.
    """

    def __init__(self, im_z, fc_nodes):
        super(ConvolutionalEncoder, self).__init__()

        self.z = im_z
        self.fc_nodes = fc_nodes

        # Every stage ends in a 2x2, stride-2 max-pool, which halves x and y.
        # NOTE(review): padding = 4 with a 5x5 kernel grows x and y by 4 before the first
        # pool. The pooling floors absorb that for sizes that are multiples of 8
        # (e.g. 256 -> 260 -> 130 -> 65 -> 32), but padding = 2 would be the
        # size-preserving choice -- confirm which was intended.
        self.conv_pool1 = nn.Sequential(
            nn.Conv2d(im_z, 32, kernel_size = 5, stride = 1, padding = 4),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )

        self.conv_pool2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size = 3, stride = 1, padding = 1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )

        # The last stage also flattens (N, 128, H, W) into (N, 128*H*W)
        self.conv_pool3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size = 3, stride = 1, padding = 1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2),
            nn.Flatten()
        )

        self.linear_layers = nn.Sequential(
            nn.Dropout(),               #to avoid over-fitting
            nn.Linear(self.fc_nodes, 5000),
            nn.ReLU(),
            nn.Linear(5000, 100),
            nn.ReLU()                   #spelled nn.ReLU; nn.ReLu does not exist
        )

    def forward(self, im):
        """
        Encode a batch of images.

        Parameters
        ----------
        im : torch.Tensor
            Batch of shape (N, im_z, x, y).

        Returns
        -------
        torch.Tensor
            Non-negative encoding of shape (N, 100).
        """
        conv_enc = self.conv_pool1(im)
        conv_enc = self.conv_pool2(conv_enc)
        conv_enc = self.conv_pool3(conv_enc)

        linear_enc = self.linear_layers(conv_enc)

        return linear_enc

In [16]:
class ConvolutionalDecoder(nn.Module):
    """
    ConvolutionalDecoder() relies on PyTorch API to convolve and decompress encodings of
    image-like data from SPM analyses. The stack of 2-dimensional SPM channels is decoded
    from a 1-dimensional torch.Tensor() per sample, reconstructing the image from feature-space.

    Parameters
    ----------
    im_z : int
        Number of channels in the reconstructed image stack.
    fc_nodes : int
        Width of the flat feature vector that is reshaped into a 128-channel map before
        the transposed convolutions. Must equal 128 * height * width of that map.
    enc_shape : tuple of (int, int), optional
        (height, width) of the 128-channel map. When omitted, a square map is assumed
        and its side is inferred from fc_nodes.

    Raises
    ------
    ValueError
        If fc_nodes is not equal to 128 * height * width.
    """

    def __init__(self, im_z, fc_nodes, enc_shape = None):
        super(ConvolutionalDecoder, self).__init__()

        self.z = im_z
        self.fc_nodes = fc_nodes

        # Channel count fed to the first transposed convolution below
        enc_channels = 128

        if enc_shape is None:
            # Without an explicit shape, assume the feature map is square
            side = int(round((fc_nodes / enc_channels) ** 0.5))
            enc_shape = (side, side)

        enc_h, enc_w = int(enc_shape[0]), int(enc_shape[1])
        if enc_channels * enc_h * enc_w != fc_nodes:
            raise ValueError(
                f'fc_nodes = {fc_nodes} cannot be reshaped into '
                f'({enc_channels}, {enc_h}, {enc_w}); pass enc_shape explicitly'
            )

        # Target shape (without the batch axis) used to un-flatten in forward()
        self.enc_shape = (enc_channels, enc_h, enc_w)

        self.linear_layers = nn.Sequential(
            nn.Linear(100, 5000),
            nn.ReLU(),                  #spelled nn.ReLU; nn.ReLu does not exist
            nn.Linear(5000, self.fc_nodes),
            nn.ReLU(),
        )

        # Each 2x2, stride-2 transposed convolution doubles x and y
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 2, stride = 2),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 2, stride = 2),
            nn.ReLU(),
            nn.ConvTranspose2d(32, im_z, 2, stride = 2),
            nn.ReLU()
        )

    def forward(self, linear_enc):
        """
        Decode a batch of encodings.

        Parameters
        ----------
        linear_enc : torch.Tensor
            Batch of shape (N, 100).

        Returns
        -------
        torch.Tensor
            Non-negative reconstruction of shape (N, im_z, 8*height, 8*width).
        """
        im_enc = self.linear_layers(linear_enc)

        # Undo the flattening so the transposed convolutions receive (N, C, H, W)
        im_enc = im_enc.view(im_enc.size(0), *self.enc_shape)

        decoded_image = self.deconv(im_enc)

        return decoded_image

In [17]:
class VAE(nn.Module):
    """
    Autoencoder based on PyTorch module framework. VAE takes in SPM images or
    m2py_labels as a stack of 2-dimensional channels, encodes them with
    ConvolutionalEncoder and reconstructs them with ConvolutionalDecoder.

    As written, forward() passes the encoding straight to the decoder; no latent
    sampling step is performed.

    Parameters
    ----------
    im_z : int
        Number of channels in the image stack.
    im_x, im_y : int, optional
        Image width and height in pixels (default 256 each).
        NOTE(review): the fc_nodes formula assumes both are multiples of 8 -- confirm
        for any other size.
    """

    def __init__(self, im_z, im_x = 256, im_y = 256):
        super(VAE, self).__init__()

        self.x = im_x
        self.y = im_y
        self.z = im_z

        #After *3* convolution and pooling stages in the encoder (decreasing x and y by
        #50% each), there are *128* channels in the feature map that gets flattened.
        #May need modification if stride, padding, convolution layers, or number of
        #channels in output of convolution and pooling are changed.

        #2**3, not 2^3: in Python ^ is bitwise XOR, so 2^3 evaluates to 1
        downsample = 2 ** 3
        self.fc_nodes = (im_x // downsample) * (im_y // downsample) * 128

        self.encoder = ConvolutionalEncoder(self.z, self.fc_nodes)
        self.decoder = ConvolutionalDecoder(self.z, self.fc_nodes)

    def forward(self, original_im):
        """
        Encode and then reconstruct a batch of images.

        Returns
        -------
        tuple
            (decoded_image, encoding) as produced by the decoder and the encoder.
        """
        encoding = self.encoder(original_im)
        decoded_image = self.decoder(encoding)

        return decoded_image, encoding

In [18]:
def train(model, training_dataset, criterion, optimizer):
    """
    Run one training epoch of a reconstruction model.

    NOTE: defining this function rebinds the name `train`, which the imports cell
    bound to the model_training module.

    Parameters
    ----------
    model : nn.Module
        Model whose forward pass returns (decoded_images, encoding).
    training_dataset : iterable
        Yields (images, labels) batches; the labels are not used.
    criterion : callable
        Loss called as criterion(decoded_images, images).
    optimizer : torch.optim.Optimizer
        Optimizer over the model's parameters.

    Returns
    -------
    float
        Mean of the per-batch losses over the epoch.

    Raises
    ------
    ValueError
        If training_dataset yields no batches.
    """
    model.train()

    # Follow the model's own device rather than depending on a notebook-level global
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    loss_list = []

    for batch_iterator, (images, _labels) in enumerate(training_dataset, start = 1):
        print(f'image # {batch_iterator}')

        images = images.to(device)

        # Run the forward pass
        optimizer.zero_grad()
        decoded_images, encoding = model(images)

        # Reconstruction loss: the input images are their own targets
        loss = criterion(decoded_images, images)
        loss.backward()
        optimizer.step()

        loss_list.append(loss.item())

    samples = len(loss_list)
    if samples == 0:
        raise ValueError('training_dataset yielded no batches')

    epoch_loss = sum(loss_list)/samples

    return epoch_loss
        

In [19]:
def test(model, test_dataset, criterion):
    model.eval()
    
    with torch.no_grad():
        
        results_dict = {}

        batch_iterator = 0
        for images in training_data_set:
            batch_iterator += 1
            print(f'image # {batch_iterator}')
            
            images = images.to(device)
#             labels = labels.to(device)

            # Run the forward pass     
            optimizer.zero_grad()
            decoded_images, encoding = model(images)
            
            results_dict[batch_iterator] = {'original': images,
                                            'decoded': decoded_images}

            loss = criterion(decoded_images, images)
            torch.autograd.backward(loss)

            loss_list.append(loss.item())

    samples = len(loss_list)
    epoch_loss = sum(loss_list)/samples
    
    return epoch_loss, results_dict

In [None]:
# Use the GPU when one is available; otherwise fall back to the CPU. The training and
# evaluation cells move each batch to this same device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = VAE(im_z = 2).to(device)

# BCELoss lives in torch.nn; the bare name BCELoss is never imported in this notebook.
# NOTE(review): nn.BCELoss requires predictions and targets in [0, 1], while the decoder
# ends in a ReLU whose output can exceed 1 -- confirm the image scaling, or switch the
# final activation / the loss accordingly.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Epoch indices 0 through 9
epochs = np.arange(10)

# Per-epoch bookkeeping: mean losses for both splits, plus the reconstructions
# returned by each evaluation pass keyed by epoch index
train_losses, test_losses = [], []
test_results = {}

for ep in epochs:
    # One optimisation pass over the training batches
    train_loss = train(model, train_dataloader, criterion, optimizer)
    train_losses.append(train_loss)

    # Followed by one evaluation pass over the held-out batches
    test_loss, results_dict = test(model, test_dataloader, criterion)
    test_losses.append(test_loss)
    test_results[ep] = results_dict