<a href="https://colab.research.google.com/github/mgupta325/colab-project1/blob/master/ECE_6258_Autoencoders_assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**The ECE 6258 assingment for autoencoders**



*   shallower network
*   reconstruction(fc nonlinear , fc linear, sparse, denoise)
*   denoise (3 level)














## Enabling and testing the GPU

First, you'll need to enable GPUs for the notebook:

- Navigate to Edit→Notebook Settings
- select GPU from the Hardware Accelerator drop-down

Next, we'll confirm that we can connect to the GPU with PyTorch:

In [0]:
import torch

print(torch.cuda.current_device())
#0
print(torch.cuda.device_count())
#1
print(torch.cuda.get_device_name(0))
#Tesla K80
print(torch.cuda.is_available())
#True

0
1
Tesla K80
True


In [0]:
####### importing python packages ######
import os

import torch
import torchvision
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
from torchvision.utils import save_image 
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F
import gzip
import pickle
from PIL import Image
import cv2
# make dir to save images
if not os.path.exists('./in_out_img'):
    os.mkdir('./in_out_img')

**training configs (user defined)**      
**please adjust the training hyperparameters and architectures in this part**

**models (nonlinear_AE  |  linear_AE  |  sparse_AE| conv_AE | denoise_AE | )** 

In [0]:
# define the non-linear autoencoder  
class nonlinear_AE(nn.Module):
    def __init__(self):
        super(nonlinear_AE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 128),
            nn.ReLU(True),)
        self.decoder = nn.Sequential(
            nn.Linear(128, 28 * 28), 
            nn.Sigmoid())

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


# define the linear autoencoder 
class linear_AE(nn.Module):
    def __init__(self):
        super(linear_AE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 128),
            )
        self.decoder = nn.Sequential(
            nn.Linear(128, 28 * 28), 
            )
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

#define the sparse autoencoder
class sparse_AE(nn.Module):
    def __init__(self):
        super(sparse_AE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.ReLU(inplace=True),
        )
        self.decoder = nn.Sequential(
            nn.Linear(128, 28*28),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
 
# define the convolutional autoencoder  16 -- 8 -- 16 -- 1  # the conv2d + unpooling implementation
class conv_AE(nn.Module):
    def __init__(self):
        super(conv_AE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, 3, padding=1),   # single channel grayscale images  
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 8, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2)
        )  ## latent features has dimention of N, 8, 7, 7

        self.decoder = nn.Sequential(
            # the Transposed Convolutions in decoder layers
            # output_size = stride*(input-1)+kernel_size-2*padding
            nn.ConvTranspose2d(8, 16, 2, stride=2),  #2x (7-1) + 2 = 14
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(16, 1, 2, stride=2),    # 2x(14-1)+2 = 28
            nn.Sigmoid()
        )
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
    


def sparse_loss(autoencoder, inputs):
    loss = 0
    out = inputs
    fc_layer = list(autoencoder.encoder.children())[0]
    relu = list(autoencoder.encoder.children())[1]
    out = relu(fc_layer(out))
    loss += kl_divergence(sparsity_parameter, out)
    return loss

def kl_divergence(p, p_hat):
    funcs = nn.Sigmoid()
    p_hat = torch.mean(funcs(p_hat), 1)
    p_tensor = torch.Tensor([p] * len(p_hat)).cuda()
    return torch.sum(p_tensor * torch.log(p_tensor) - p_tensor * torch.log(p_hat) + (1 - p_tensor) * torch.log(1 - p_tensor) - (1 - p_tensor) * torch.log(1 - p_hat))

In [0]:
## Choosing model
arch = 'linear_AE'   # 'nonlinear_AE' | 'linear_AE' | 'sparse_AE' | 'conv_AE' | 'denoise_AE' | 

**load model**
1. Upload the provided 5 models
2.run the cell below to load the model based on the arch you choose

In [0]:
model_state_dict = torch.load('./' + arch + '_best.pth')

if arch == 'nonlinear_AE':  
    model = nonlinear_AE().cuda()
elif arch == 'linear_AE':
    model = linear_AE().cuda()
elif arch == 'sparse_AE':
    model = sparse_AE().cuda()
elif arch == 'conv_AE':
    model = conv_AE().cuda()
elif arch == 'denoise_AE':
    model = nonlinear_AE().cuda()

model.load_state_dict(model_state_dict)
print(model)


nonlinear_AE(
  (encoder): Sequential(
    (0): Linear(in_features=784, out_features=128, bias=True)
    (1): ReLU(inplace=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=128, out_features=784, bias=True)
    (1): Sigmoid()
  )
)


# New Section

**upload your own local image file to notebook directory**    
everytime you restart notebook, you need to upload local files


In [0]:
dataset = MNIST('./data', train=True, download=True,transform=None)
# fix random seed for reproducible results
seed = 0
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)
# 
# define the transformation applied to MNIST images
transform = transforms.Compose([
    transforms.ToTensor(),
])

# train/test loader 
train_loader = DataLoader(MNIST('./data', train=True, download=True, 
                          transform=transform),  shuffle=True)
test_loader = DataLoader(MNIST('./data', train=False, 
                          transform=transform), shuffle=False)
filename = [
["training_images","./data/MNIST/raw/train-images-idx3-ubyte.gz"],
["test_images","./data/MNIST/raw/t10k-images-idx3-ubyte.gz"],
["training_labels","./data/MNIST/raw/train-labels-idx1-ubyte.gz"],
["test_labels","./data/MNIST/raw/t10k-labels-idx1-ubyte.gz"]
]

# load downloaded .gz files to and save images as numpy arrays 
def save_mnist():
    mnist = {}
    for name in filename[:2]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1,28*28)
    # for name in filename[-2:]:
    #     with gzip.open(name[1], 'rb') as f:
    #         mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=8)
    with open("mnist.pkl", 'wb') as f:
        pickle.dump(mnist,f)
    print("Save complete.")

save_mnist()

with open("mnist.pkl",'rb') as f:
    mnist = pickle.load(f)
    mnist_train_X = mnist["training_images"]  # 60000, 784
    mnist_test_X = mnist["test_images"]  # 10000, 784

# TODO:
##1.use your own image and resize it to (28,28) as test image 1
##2.define idx[0,9999] to pick one test image from mnist as test image 2

##enter your image path here(you have to upload it first)
img = Image.open('/content/airplane_downsample_gray_square.jpg') ##You can change this
test_img_1 = np.array(img.resize((28,28)))
##choose an idx here
idx = 0 ##You can change this
test_img_2 = mnist_test_X[idx].reshape(28,28)  / 255.



Save complete.


In [0]:
#TODO: define a noise ratio to evaluate the denoise autoencoder[0.1,0.4,0.9]
noise_ratio = 0.5
def evaluate(model, img):
    recons_loss = nn.MSELoss()
    model.eval()
    if arch != 'conv_AE':
            # TODO: 1 1 784
            img = torch.from_numpy(img.reshape(1,1,784)).float().cuda()
          
            print(img.shape)

    elif arch == 'denoise_AE':
            noise_img = img * (1 - noise_ratio) + torch.rand(img.size()) * noise_ratio      # add noise onto input images
            noise_img = Variable(noise_img.cuda())
            img = Variable(img.cuda())
            out = model(noise_img)
    else:
         img = Variable(torch.from_numpy(img.reshape(1,1,28,28)).float().cuda())
    out = model(img.cuda())
            
    loss = recons_loss(out, img)
    
    return loss
  
if arch != 'denoise_AE':
  MSEloss_1 = evaluate(model,test_img_1)
  MSEloss_2 = evaluate(model,test_img_2)
  print("loss for test_img1 = ",MSEloss_1)
  print("loss for test_img2 = ",MSEloss_2)
else:
  plt.figure()
  plt.imshow(test_img_2, cmap='gray')
  plt.axis('off')
  Image.fromarray(test_img_2).convert("L").save("clean_input.jpg")
  plt.savefig('clean_input.png')

  noise = torch.rand(torch.Size([28, 28])).numpy()
  plt.figure()
  plt.imshow(noise,cmap='gray')
  plt.axis('off')
  Image.fromarray(noise).convert("L").save("rand_noise.jpg")
  plt.savefig('rand_noise.png')

  corrupt_img =test_img_2 * (1 - noise_ratio) + noise * noise_ratio 
  plt.figure()
  plt.imshow(corrupt_img,cmap='gray')
  plt.axis('off')
  Image.fromarray(corrupt_img).convert("L").save("corrupt_input.jpg")
  plt.savefig('corrupt_input.png')
    

  img = torch.from_numpy(test_img_2.reshape(1,1,784)).float()
  noise_img =  img * (1 - noise_ratio) + torch.from_numpy(noise.reshape(1,1,784))*noise_ratio 
  out = model(noise_img.cuda()).view(28,28).cpu().detach().numpy()
  plt.figure()
  plt.imshow(out,cmap='gray')
  plt.axis('off')
  Image.fromarray(corrupt_img).convert("L").save("denoised_output.jpg")
  plt.savefig('denoised_output.png')
  
  



torch.Size([1, 1, 784])
torch.Size([1, 1, 784])
loss for test_img1 =  tensor(22393.8730, device='cuda:0', grad_fn=<MseLossBackward>)
loss for test_img2 =  tensor(0.0026, device='cuda:0', grad_fn=<MseLossBackward>)
