# Image denoising
The aim of this project is to prepare neural network which takes images with noise and returns clean one.

### Data equality
Firstly, I want to check if all files in distorted directory have corresponding clean images. If not, data will be equalize.

In [1]:
import os

def get_filenames(path_to_data):
    return sorted([file_name for file_name in os.listdir(path_to_data) 
                        if file_name.endswith('.jpg')], key=lambda x: int(x.split('.')[0]))

    
def check_data_equality(distorted_filenames, clean_filenames):
    try:
        if distorted_filenames != clean_filenames:
            raise ValueError('Filenames for distorted and clean data are not the same.')
    except ValueError as e:
        print(str(e) + ' Making them equal.')
        distorted_filenames = sorted(list(set(distorted_filenames) & set(clean_filenames)), key=lambda x: int(x.split('.')[0]))
        clean_filenames = distorted_filenames
    return distorted_filenames, clean_filenames
    
    
distorted_train_path = 'data/train/distorted/'
clean_train_path = 'data/train/clean/'

distorted_validation_path = 'data/validation/distorted/'
clean_validation_path = 'data/validation/clean/'

distorted_train_filenames, clean_train_filenames = check_data_equality(
    get_filenames(distorted_train_path + '/images'), get_filenames(clean_train_path + '/images'))

distorted_validation_filenames, clean_validation_filenames = check_data_equality(
    get_filenames(distorted_validation_path + '/images'), get_filenames(clean_validation_path + '/images'))

Data as tensors will be stored in datasets prepared for working with images.

In [2]:
import torch
import torchvision
from torchvision.transforms import Compose, ToTensor, Normalize, Lambda
torch.manual_seed(1)

def create_dataset(data_path: str) -> torch.utils.data.Dataset:
    dataset = torchvision.datasets.ImageFolder(
        root=data_path,
        transform=ToTensor()
    )
    return dataset

As PyTorch does not have dataset for images as input and output, I have prepared custom one.

In [3]:
class CustomImageDataset(torch.utils.data.Dataset):
    
    def __init__(self, distorted_dataset, clean_dataset):
        self.distorted_dataset = distorted_dataset
        self.clean_dataset = clean_dataset

    def __getitem__(self, index):
        return self.distorted_dataset[index][0], self.clean_dataset[index][0]

    def __len__(self):
        return len(self.clean_dataset)

### Data split (train & validation)
Data from distorted and clean directories was splitted into train and validation.
Train consists of 80% of examples. Other 20% is assigned to validation.
I splitted data manually - first two hundreds of each thousand is taken into validation data.

In [4]:
distorted_train_dataset = create_dataset(distorted_train_path)
clean_train_dataset = create_dataset(clean_train_path)
train_dataset = CustomImageDataset(distorted_train_dataset, clean_train_dataset)

In [5]:
batch_size = 64
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [6]:
distorted_validation_dataset = create_dataset(distorted_validation_path)
clean_validation_dataset = create_dataset(clean_validation_path)
validation_dataset = CustomImageDataset(distorted_validation_dataset, clean_validation_dataset)

In [7]:
validation_dataloader = torch.utils.data.DataLoader(validation_dataset, batch_size=batch_size, shuffle=True)

### Model
As a model I chose convolutional autoencoder. <br>
It consists of two parts: encoder (convolution + max pooling) and decoder (transpose convolution + upsampling).

In [8]:
class ConvolutionalAutoencoder(torch.nn.Module):
    
    def __init__(self):
        
        super(ConvolutionalAutoencoder, self).__init__()
        
        self.encoder = torch.nn.Sequential(
            torch.nn.Conv2d(in_channels=3, out_channels=64 ,kernel_size=3 , stride=1, padding=0),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=3),
            torch.nn.Conv2d(in_channels=64, out_channels=32 ,kernel_size=2 , stride=1, padding=1),
            torch.nn.MaxPool2d(kernel_size=3), 
            torch.nn.ReLU(),
            torch.nn.Conv2d(in_channels=32, out_channels=8 ,kernel_size=2 , stride=1, padding=1),
            torch.nn.MaxPool2d(kernel_size=3),
            torch.nn.ReLU()
        )
        
        self.decoder = torch.nn.Sequential(
            torch.nn.Upsample(scale_factor=3, mode='bilinear', align_corners=True),
            torch.nn.ConvTranspose2d(in_channels=8, out_channels=32, kernel_size=2, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.Upsample(scale_factor=3, mode='bilinear', align_corners=True),
            torch.nn.ConvTranspose2d(in_channels=32, out_channels=64, kernel_size=2, stride=1, padding=0),
            torch.nn.ReLU(),
            torch.nn.Upsample(scale_factor=3, mode='bilinear', align_corners=True),
            torch.nn.ConvTranspose2d(in_channels=64, out_channels=3, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU()
        )
        
    def decode(self, x):
        return self.decoder(x)
    
    def encode(self, x):
        return self.encoder(x)

    def forward(self, x):
        encoded = self.encode(x)
        decoded = self.decode(encoded)
        return encoded, decoded

### torchsummary
`torchsummary` provides nice summary of model and output shape after each layer. It was useful during creation of decoder while having encoder architecture.

In [9]:
from torchsummary import summary

autoencoder = ConvolutionalAutoencoder()
summary(autoencoder, input_size=(3, 48, 48))   

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 46, 46]           1,792
              ReLU-2           [-1, 64, 46, 46]               0
         MaxPool2d-3           [-1, 64, 15, 15]               0
            Conv2d-4           [-1, 32, 16, 16]           8,224
         MaxPool2d-5             [-1, 32, 5, 5]               0
              ReLU-6             [-1, 32, 5, 5]               0
            Conv2d-7              [-1, 8, 6, 6]           1,032
         MaxPool2d-8              [-1, 8, 2, 2]               0
              ReLU-9              [-1, 8, 2, 2]               0
         Upsample-10              [-1, 8, 6, 6]               0
  ConvTranspose2d-11             [-1, 32, 5, 5]           1,056
             ReLU-12             [-1, 32, 5, 5]               0
         Upsample-13           [-1, 32, 15, 15]               0
  ConvTranspose2d-14           [-1, 64,



### RMSE loss
As RMSE loss will be used for evaluating models, I implemented class which can be used during training to observe validation loss.

In [10]:
class RMSELoss(torch.nn.Module):
    def __init__(self, eps=1e-6):
        super().__init__()
        self.mse = torch.nn.MSELoss()
        self.eps = eps
        
    def forward(self,input_out, target_out):
        loss = torch.sqrt(self.mse(input_out, target_out) + self.eps)
        return loss

### Test data preparation

In [11]:
distorted_test_path = 'data/test/'
test_dataset = create_dataset(distorted_test_path)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

### Utils for creating not existing directory

In [12]:
import pathlib

def create_not_existing_directory(directory: str):
    """
    Create not existing directory. 
    If directory exists, do nothing.
    :param directory: str
        directory to create
    """
    p = pathlib.Path(directory)
    if not p.is_dir():
        print(f'Creating directory: {directory} as it does not exist')
        p.mkdir(parents=True, exist_ok=True)
    
create_not_existing_directory('model/')

### Training
Training is conducted for 100 epochs and can be stopped if there is no improvements for loss value (should decrease) in following 8 epochs.

The best model so far is saved for predictions purposes.

In [13]:
import os
import numpy as np
np.random.seed(1)

optimizer = torch.optim.Adam(autoencoder.parameters(), lr=0.001)

epochs = 150
loss = RMSELoss()

epochs_without_improvement: int = 0
max_epochs_without_improvement: int = 8
the_lowest_loss: int = 1

for epoch in range(epochs):
    for x, y in train_dataloader:
        _, decoded_out = autoencoder(x)
        loss_value = loss(y, decoded_out)
        optimizer.zero_grad()
        loss_value.backward()
        optimizer.step()
    
    with torch.no_grad():
        loss_value_per_epoch = 0
        for x, y in validation_dataloader:
            _, decoded_out = autoencoder(x)
            loss_value = loss(y, decoded_out)
            loss_value_per_epoch = loss_value_per_epoch + loss_value.item()
        
        loss_value_per_epoch = loss_value_per_epoch / len(distorted_validation_dataset)
        print(f"Epoch: {epoch}, loss: {loss_value_per_epoch}")
        if loss_value_per_epoch < the_lowest_loss:
            the_lowest_loss = loss_value_per_epoch
            epochs_without_improvement = 0
            torch.save(autoencoder.state_dict(), os.path.join('model', 'model' + '.pt'))
        else:
            epochs_without_improvement = epochs_without_improvement + 1
            if epochs_without_improvement == max_epochs_without_improvement:
                 break

Epoch: 0, loss: 0.0035788718909025193
Epoch: 1, loss: 0.0033174886927008628
Epoch: 2, loss: 0.0030996461808681488
Epoch: 3, loss: 0.0030359924733638763
Epoch: 4, loss: 0.0029196545854210856
Epoch: 5, loss: 0.002964951179921627
Epoch: 6, loss: 0.002847883924841881
Epoch: 7, loss: 0.002823504202067852
Epoch: 8, loss: 0.002795414872467518
Epoch: 9, loss: 0.002812020227313042
Epoch: 10, loss: 0.002781913921236992
Epoch: 11, loss: 0.0027661576345562934
Epoch: 12, loss: 0.0027250415235757827
Epoch: 13, loss: 0.0027250478565692903
Epoch: 14, loss: 0.002702657103538513
Epoch: 15, loss: 0.002700040742754936
Epoch: 16, loss: 0.002687894992530346
Epoch: 17, loss: 0.002682480715215206
Epoch: 18, loss: 0.0026886307448148726
Epoch: 19, loss: 0.002677454359829426
Epoch: 20, loss: 0.0026683810502290727
Epoch: 21, loss: 0.002658243544399738
Epoch: 22, loss: 0.0026534142345190047
Epoch: 23, loss: 0.002647374853491783
Epoch: 24, loss: 0.002648480162024498
Epoch: 25, loss: 0.002630806252360344
Epoch: 26, 

KeyboardInterrupt: 

In [14]:
print(f'The lowest loss: {the_lowest_loss}')

The lowest loss: 0.0025031174868345263


### Predictions
The best model is used for preapring predictions for test data.

In [15]:
autoencoder = ConvolutionalAutoencoder()
autoencoder.load_state_dict(torch.load(os.path.join('model', 'model' + '.pt')))
with torch.no_grad():
    decoded_images = []
    for x in test_dataloader:
        _, decoded_out = autoencoder(x[0])
        decoded_images.append(decoded_out)    

In [16]:
stacked_test = torch.cat(decoded_images, dim=0)
stacked_test = stacked_test.reshape(-1, 3, 48, 48)

### Saving results 
Results are saved to csv file matching kaggle submission format.

In [17]:
import numpy as np

def save_result(images: np.ndarray, out_path: str):
    
    assert images.shape == (400, 3, 48, 48)
    
    flat_img = images.reshape(400, -1)
    n_rows = np.prod(images.shape)
    
    y_with_id = np.concatenate([np.arange(n_rows).reshape(-1, 1), flat_img.reshape(n_rows, 1)], axis=1)
    np.savetxt(out_path, y_with_id, delimiter=",", fmt=['%d', '%.4f'], header="id,expetced", comments='')

In [18]:
create_not_existing_directory('results/')
save_result(stacked_test.detach().numpy(), 'results/result.csv')