In [1]:
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset
import torch.nn as nn 
import os
from matplotlib import pyplot as plt
from skimage import io, transform
import numpy as np
from torch.utils.data import DataLoader
import torch.optim as optim

In [2]:
class ImageNetDataset(Dataset):
    """Dataset serving every regular file found in a single image folder.

    Each item is an ``(image, file_name)`` pair, where ``image`` is whatever
    ``io.imread`` returns for the file, passed through ``transform`` when one
    is given.

    Args:
        root_dir: Directory that contains ``data_folder``.
        data_folder: Folder, relative to ``root_dir``, holding the image files.
        transform: Optional callable applied to every loaded image.
    """

    def __init__(self, root_dir, data_folder, transform=None):
        self.data_path = os.path.join(root_dir, data_folder)
        # os.listdir returns entries in arbitrary order, so sort them to make
        # the index -> file mapping reproducible across runs and instances.
        # Sub-directories are skipped because they cannot be read as images.
        self.data = sorted(
            entry for entry in os.listdir(self.data_path)
            if os.path.isfile(os.path.join(self.data_path, entry))
        )
        self.root = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of files served by the dataset."""
        return len(self.data)

    def __getitem__(self, index):
        """Load the ``index``-th file and return ``(image, file_name)``."""
        img_name = self.data[index]
        image = io.imread(os.path.join(self.data_path, img_name))
        if self.transform is not None:
            image = self.transform(image)

        return image, img_name

In [3]:
class addGaussian:
    """Transform that adds zero-mean Gaussian noise to an image array.

    Args:
        std_range: ``(min_std, max_std)`` pair. Every call draws one noise
            standard deviation uniformly from this range.
    """

    def __init__(self, std_range):
        self.std_range = std_range

    def __call__(self, img):
        """Return a noisy ``uint8`` copy of ``img``; ``img`` itself is not modified.

        Args:
            img: NumPy array of pixel values; the result is clipped to 0..255.

        Returns:
            ``uint8`` array with the same shape as ``img``.
        """
        minval, maxval = self.std_range
        # np.float was only an alias of the builtin float (float64) and was
        # removed in NumPy 1.24; name the dtype explicitly.
        noise_img = img.astype(np.float64)
        stddev = np.random.uniform(minval, maxval)
        noise_img += np.random.randn(*img.shape) * stddev
        # Clip before the cast so out-of-range values saturate instead of wrapping.
        return np.clip(noise_img, 0, 255).astype(np.uint8)

In [4]:
class Rescale(object):
    """Callable transform that resizes an image to a fixed ``(height, width)``."""

    def __init__(self, output_size):
        # Stored as given; unpacked into (height, width) on every call.
        self.output_size = output_size

    def __call__(self, sample):
        """Return ``sample`` resized with ``transform.resize`` to the configured size."""
        target_h, target_w = self.output_size
        return transform.resize(sample, (target_h, target_w))


In [5]:
# Training hyperparameters; the final cell passes them to trainNet.
# Number of epochs to train for.
num_epochs = 200
# Batch size requested from the data loaders.
batch_size = 1
# Learning rate forwarded to the Adam optimizer.
learning_rate = 1e-3

In [6]:
# Dataset served by getTrainLoader: every image gets Gaussian noise added,
# is resized to 256x256 and is finally converted to a tensor.
train_dateset = ImageNetDataset(
    root_dir='/home/turing/Documents/BE/',
    data_folder='data/',
    transform=transforms.Compose([
        addGaussian((0, 50)),
        Rescale((256, 256)),
        transforms.ToTensor(),
    ]),
)

In [7]:
# Dataset served by getTestLoader. It reads the same folder through the same
# noise -> resize -> tensor pipeline as train_dateset; trainNet uses its images
# as the regression targets.
test_dataset = ImageNetDataset(
    root_dir='/home/turing/Documents/BE/',
    data_folder='data/',
    transform=transforms.Compose([
        addGaussian((0, 50)),
        Rescale((256, 256)),
        transforms.ToTensor(),
    ]),
)

In [8]:
def getTrainLoader(batch_size):
    """Return a shuffling DataLoader over ``train_dateset`` with two worker processes."""
    return DataLoader(
        train_dateset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
    )
    

In [9]:
def getTestLoader(batch_size):
    """Return a shuffling DataLoader over ``test_dataset`` with two worker processes."""
    return DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
    )
    

In [10]:
class Net(nn.Module):
    """Convolutional encoder/decoder mapping a 3-channel image to a 3-channel image.

    The encoder halves the spatial size four times with 2x2 max-pooling; the
    decoder doubles it four times with stride-2 transposed convolutions. All
    3x3 layers are padded, so the output has the same height and width as the
    input whenever both are multiples of 16 (e.g. the 256x256 training images).

    Args:
        negative_slope: Slope of every LeakyReLU for negative inputs.
            Defaults to 0.01, PyTorch's own default.
    """

    def __init__(self, negative_slope=0.01):
        super(Net, self).__init__()

        def act():
            # LeakyReLU's first positional argument is negative_slope, not
            # inplace: nn.LeakyReLU(True) yields slope 1.0, i.e. the identity.
            # The in-place flag therefore has to be passed by keyword.
            return nn.LeakyReLU(negative_slope, inplace=True)

        def conv(in_channels, out_channels):
            # padding=1 keeps height/width unchanged for a 3x3 kernel.
            return nn.Conv2d(in_channels, out_channels, 3, padding=1)

        def deconv(in_channels, out_channels, upsample=False):
            if upsample:
                # stride 2 with padding 1 and output_padding 1 doubles height/width exactly.
                return nn.ConvTranspose2d(in_channels, out_channels, 3,
                                          stride=2, padding=1, output_padding=1)
            return nn.ConvTranspose2d(in_channels, out_channels, 3, padding=1)

        # Layer order and parameter shapes are unchanged from the unpadded
        # version, so state_dict keys stay the same.
        self.encoder = nn.Sequential(
            # first and second convolution
            conv(3, 48), act(),
            conv(48, 48), act(),
            nn.MaxPool2d(2, 2),

            # third convolution
            conv(48, 48), act(),
            nn.MaxPool2d(2, 2),

            # fourth convolution
            conv(48, 48), act(),
            nn.MaxPool2d(2, 2),

            # fifth convolution
            conv(48, 48), act(),
            nn.MaxPool2d(2, 2),

            # bottleneck convolution
            conv(48, 48), act(),
        )

        self.decoder = nn.Sequential(
            # four stages, each undoing one pooling step
            deconv(48, 96, upsample=True), act(),
            deconv(96, 96), act(),

            deconv(96, 96, upsample=True), act(),
            deconv(96, 96), act(),

            deconv(96, 96, upsample=True), act(),
            deconv(96, 96), act(),

            deconv(96, 96, upsample=True), act(),
            deconv(96, 96), act(),

            # channel reduction at full resolution
            deconv(96, 64), act(),
            deconv(64, 32), act(),

            # linear projection back to 3 image channels
            conv(32, 3),
        )

    def forward(self, x):
        """Run ``x`` of shape (N, 3, H, W) through the encoder, then the decoder."""
        x = self.encoder(x)
        x = self.decoder(x)

        return x

In [11]:
def createLossAndOptimizer(net, learning_rate=0.001):
    """Build the training criterion and optimizer for ``net``.

    Args:
        net: Module whose parameters the optimizer will update.
        learning_rate: Learning rate given to Adam.

    Returns:
        ``(loss, optimizer)``: an ``MSELoss`` instance and an ``Adam``
        optimizer over ``net.parameters()``.
    """
    criterion = torch.nn.MSELoss()
    adam = optim.Adam(net.parameters(), lr=learning_rate)
    return criterion, adam

In [12]:
import time

In [15]:
class _PairedDataset(Dataset):
    """Serve the samples of two equally long datasets side by side, index by index."""

    def __init__(self, input_dataset, target_dataset):
        if len(input_dataset) != len(target_dataset):
            raise ValueError(
                "input and target datasets differ in length: {} vs {}".format(
                    len(input_dataset), len(target_dataset)))
        self.input_dataset = input_dataset
        self.target_dataset = target_dataset

    def __len__(self):
        return len(self.input_dataset)

    def __getitem__(self, index):
        return self.input_dataset[index], self.target_dataset[index]


def trainNet(net, batch_size, n_epochs, learning_rate):
    """Train ``net`` to map each input image to its matching target image.

    Inputs come from the dataset behind ``getTrainLoader`` and targets from
    the dataset behind ``getTestLoader``. Both datasets yield
    ``(image, file_name)`` samples and are read with the same index, so every
    input is paired with the target of the same file.

    Prints the hyperparameters, a running loss about ten times per epoch, the
    mean loss of every epoch and the total training time.

    Args:
        net: Module to optimise in place.
        batch_size: Number of image pairs per optimisation step.
        n_epochs: Number of passes over the data.
        learning_rate: Learning rate handed to ``createLossAndOptimizer``.

    Raises:
        ValueError: If the two datasets differ in length or return different
            file names for the same index.
    """
    print("===== HYPERPARAMETERS =====")
    print("batch_size=", batch_size)
    print("epochs=", n_epochs)
    print("learning_rate=", learning_rate)
    print("=" * 30)

    train_loader = getTrainLoader(batch_size)
    test_loader = getTestLoader(batch_size)

    # Zipping two independently shuffled loaders would pair every input with
    # an unrelated target. Shuffle a single loader over index-aligned pairs.
    paired_loader = DataLoader(
        _PairedDataset(train_loader.dataset, test_loader.dataset),
        batch_size=batch_size,
        shuffle=True,
        num_workers=train_loader.num_workers,
    )

    n_batches = len(paired_loader)
    # Report roughly ten times per epoch, but never with a period of zero.
    print_every = max(1, n_batches // 10)

    loss, optimizer = createLossAndOptimizer(net, learning_rate)

    training_start_time = time.time()

    for epoch in range(n_epochs):

        running_loss = 0.0
        running_batches = 0
        total_train_loss = 0.0
        start_time = time.time()

        for i, (inputs, labels) in enumerate(paired_loader):
            # Each batch element is [images, file_names].
            if list(inputs[1]) != list(labels[1]):
                raise ValueError(
                    "input/target file names do not match: {} vs {}".format(
                        list(inputs[1]), list(labels[1])))

            optimizer.zero_grad()

            outputs = net(inputs[0].float())
            loss_size = loss(outputs, labels[0].float())
            loss_size.backward()
            optimizer.step()

            # .item() reads the scalar out of the 0-dim loss tensor
            # (indexing it with [0] raises on current PyTorch).
            batch_loss = loss_size.item()
            running_loss += batch_loss
            running_batches += 1
            total_train_loss += batch_loss

            if (i + 1) % print_every == 0:
                print("Epoch {}, {:d}% \t train_loss: {:.2f} took: {:.2f}s".format(
                        epoch+1, int(100 * (i+1) / n_batches), running_loss / running_batches, time.time() - start_time))
                #Reset running loss and time
                running_loss = 0.0
                running_batches = 0
                start_time = time.time()

        if n_batches > 0:
            print("Epoch {} finished, mean train_loss: {:.4f}".format(
                epoch + 1, total_train_loss / n_batches))

    print("Training finished, took {:.2f}s".format(time.time() - training_start_time))

In [None]:
# Build a freshly initialised network and train it with the notebook's hyperparameters.
net = Net()
trainNet(net, batch_size, num_epochs, learning_rate)