In [1]:
import torch
from torchvision.datasets import CIFAR10
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as tvt
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
import random
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import os
from tqdm import tqdm
import pickle

seed = 0
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
torch.backends.cudnn.deterministic=True
torch.backends.cudnn.benchmarks=False
os.environ['PYTHONHASHSEED'] = str(seed)

In [None]:
def arrangeDataSIDD(file_path = './data/SIDD_Medium_Srgb/Data', train_ratio=0.8):

    print("Arranging the SIDD images")
    file_list = os.listdir(file_path)
    random.shuffle(file_list)
    training_data = {"noisy":[], "gt":[]}
    testing_data = {"noisy":[], "gt": []}
    train_files = file_list[:len(file_list)*train_ratio]
    test_files = file_list[len(file_list)*train_ratio : ]

    print('Creating training data')
    for i in tqdm(range(len(train_files))):
        if "GT_SRGB_010" in train_files[i]:
            gt_img = Image.open(train_files[i]).convert('RGB').resize((512,512))
            training_data['gt'].append(gt_img)
        if "NOISY_SRGB_010" in train_files[i]:
            noisy_img = Image.open(train_files[i]).convert('RGB').resize((512,512))
            training_data['noisy'].append(noisy_img)
    print("Number of ground truth images:",len(training_data['gt']))
    print("Number of noisy images:",len(training_data['noisy']))  


    print('Creating testing data')
    for i in tqdm(range(len(test_files))):
        if "GT_SRGB_010" in test_files[i]:
            gt_img = Image.open(test_files[i]).convert('RGB').resize((512,512))
            testing_data['gt'].append(gt_img)
        if "NOISY_SRGB_010" in test_files[i]:
            noisy_img = Image.open(test_files[i]).convert('RGB').resize((512,512))
            testing_data['noisy'].append(noisy_img)
    print("Number of ground truth images:",len(testing_data['gt']))
    print("Number of noisy images:",len(testing_data['noisy']))
    
    with open('./data_sidd.pkl','wb') as f:
        pickle.dump((training_data, testing_data), f, protocol=pickle.HIGHEST_PROTOCOL)
    print('Pickle File Created for SIDD Images')

In [None]:
def arrangeDataPolyU(file_path = './data/PolyU-Real-World-Noisy-Images-Dataset-master', train_ratio=0.8):
    
    print('Arranging the PolyU images')
    overall_set = []
    file_list = os.listdir(file_path)
    for i in range(0,len(file_list),2):
        overall_set.append[(file_list[i], file_list[i+1])]
    random.shuffle(file_list)
    train_files = file_list[:len(file_list)*train_ratio]
    test_files = file_list[len(file_list)*train_ratio : ]
    training_data = {"noisy":[], "gt":[]}
    testing_data = {"noisy":[], "gt": []}

    print('Creating training data')
    for i in range(len(train_files)):
        training_data['noisy'].append(Image.open(train_files[i][1]).convert('RGB').resize(512,512))
        training_data['gt'].append(Image.open(train_files[i][0]).convert('RGB').resize(512,512))
    print("Number of ground truth images:",len(training_data['gt']))
    print("Number of noisy images:",len(training_data['noisy']))  
    
    print('Creating testing data')
    for i in range(len(test_files)):
        testing_data['noisy'].append(Image.open(test_files[i][1]).convert('RGB').resize(512,512))
        testing_data['gt'].append(Image.open(test_files[i][0]).convert('RGB').resize(512,512))
    print("Number of ground truth images:",len(testing_data['gt']))
    print("Number of noisy images:",len(testing_data['noisy']))

    with open('./data_polyu.pkl','wb') as f:
        pickle.dump((training_data, testing_data), f, protocol=pickle.HIGHEST_PROTOCOL)
    print('Pickle File Created for PolyU Images')

In [None]:
class ImageDataset(Dataset):
    def __init__(self, data):
        super(ImageDataset, self).__init__()
        self.noisyData = data['noisy']
        self.gtData = data['gt']
        self.transform = tvt.Compose([tvt.ToTensor()])

    def __len__(self):
        return len(self.noisyData)
    
    def __getitem__(self, index):
        return self.transform(self.noisyData[index]), self.transform(self.gtData[index])

In [None]:
def createDataLoaders(type):
    if type=='SIDD':
        if not os.path.isfile('./data_sidd.pkl'):
            arrangeDataSIDD()
        with open('./data_sidd.pkl', 'rb') as f:
            data = pickle.load(f)
        train_dataset = ImageDataset(data[0])
        test_dataset = ImageDataset(data[1])
        train_DL = DataLoader(train_dataset, batch_size=4, num_workers=2, shuffle=False)
        test_DL = DataLoader(test_dataset, batch_size=4, num_workers=2, shuffle=False)
        return train_DL, test_DL
    if type=='PolyU':
        if not os.path.isfile('./data_polyu.pkl'):
            arrangeDataSIDD()
        with open('./data_polyu.pkl', 'rb') as f:
            data = pickle.load(f)
        train_dataset = ImageDataset(data[0])
        test_dataset = ImageDataset(data[1])
        train_DL = DataLoader(train_dataset, batch_size=4, num_workers=2, shuffle=False)
        test_DL = DataLoader(test_dataset, batch_size=4, num_workers=2, shuffle=False)
        return train_DL, test_DL

In [None]:
class dCNN(torch.nn.Module):
    def __init__(self, channels, layers=17):
        super(dCNN, self).__init__()
        self.conv1 = torch.nn.Conv2d(
            in_channels=channels, out_channels=64, kernel_size=3, padding=1, bias=False
        )
        self.convList = torch.nn.ModuleList()
        for i in range(layers - 2):
            self.convList.append(
                torch.nn.Conv2d(
                    in_channels=64,
                    out_channels=64,
                    kernel_size=3,
                    padding=1,
                    bias=False,
                )
            )
            self.convList.append(torch.nn.BatchNorm2d(64))
            self.convList.append(torch.nn.ReLU(inplace=True))
        self.conv2 = torch.nn.Conv2d(
            in_channels=64, out_channels=channels, kernel_size=3, padding=1, bias=False
        )

    def forward(self, x):
        x = self.conv1(x)
        x = torch.nn.functional.relu(x, inplace=True)
        for conv in self.convList:
            x = conv(x)
        x = self.conv2(x)
        return x

In [None]:
# function to check PSNR(dB)/SSIM values of the resulted images
def calculate_metrics(image, original, data_range):
    image = image.data.cpu().numpy().astype(np.float32)
    original = original.data.cpu().numpy().astype(np.float32)
    psnr_value = 0
    ssim_value = 0
    for i in range(image.shape[0]):
        psnr_value += psnr(
            original[i, :, :, :], image[i, :, :, :], data_range=data_range
        )
        ssim_value += ssim(
            original[i, :, :, :],
            image[i, :, :, :],
            channel_axis=0,
            data_range=data_range,
        )
    psnr_value /= image.shape[0]
    ssim_value /= image.shape[0]

    return psnr_value, ssim_value

In [None]:
def trainingR2R(model, device, dataloader, epochs, sigma,type):
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.MSELoss()
    alpha = 0.5
    for epoch in range(epochs):
        running_loss = 0.0
        psnr_value = 0.0
        for idx, data in enumerate(dataloader):
            model.train()
            optimizer.zero_grad()
            noisy_image, clean_image = data
            noisy_image = noisy_image.to(device)
            clean_image = clean_image.to(device)
            D = (sigma/255.00) * torch.FloatTensor(noisy_image.size()).normal_(mean=0, std=1.0).cuda()
            input = noisy_image + alpha * D
            target = noisy_image - D / alpha
            output = model(input)
            loss = criterion(output, target) / (target.size()[0]*2)
            running_loss += loss.item()
            loss.backward()
            optimizer.step()

            # evaluating the performance per batch
            model.eval()
            output = torch.clamp(model(noisy_image), 0, 1)
            psnr_val, ssim_value = calculate_metrics(output, clean_image, 1.0)
            psnr_value += psnr_val
            if idx % 50 == 0:
                print(
                    "Epoch:",epoch + 1,
                    "[",idx + 1,"/",
                    len(dataloader),"]",
                    "=>",
                    "PSNR:",psnr_value / 50,
                    "Loss:",running_loss / 50,
                )
                running_loss = 0.0
                psnr_value = 0.0
    file_name = "R2R_" + type + "_model_" + str(sigma) + ".pth"
    torch.save(model.state_dict(), file_name)

In [None]:
def trainingN2N(model, device, dataloader, epochs, sigma_1, sigma_2, type):
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.MSELoss()
    for epoch in range(epochs):
        running_loss = 0.0
        psnr_value = 0.0
        for idx, data in enumerate(dataloader):
            model.train()
            optimizer.zero_grad()
            noisy_image, clean_image = data
            noisy_image = noisy_image.to(device)
            clean_image = clean_image.to(device)
            input = noisy_image + torch.randn(noisy_image.size()) * sigma_1
            target = noisy_image + torch.randn(noisy_image.size()) * sigma_2
            output = model(input)
            loss = criterion(output, target) / (target.size()[0]*2)
            running_loss += loss.item()
            loss.backward()
            optimizer.step()

            # evaluating the performance per batch
            model.eval()
            output = torch.clamp(model(noisy_image), 0, 1)
            psnr_val, ssim_value = calculate_metrics(output, clean_image, 1.0)
            psnr_value += psnr_val
            if idx % 50 == 0:
                print(
                    "Epoch:",epoch + 1,
                    "[",idx + 1,"/",
                    len(dataloader),"]",
                    "=>",
                    "PSNR:",psnr_value / 50,
                    "Loss:",running_loss / 50,
                )
                running_loss = 0.0
                psnr_value = 0.0
    file_name = "N2N_" + type + "_model_" + str(sigma_1) + str(sigma_2) + ".pth"
    torch.save(model.state_dict(), file_name)

In [None]:
def trainingN2C(model, device, dataloader, epochs, type):
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.MSELoss()
    for epoch in range(epochs):
        running_loss = 0.0
        psnr_value = 0.0
        for idx, data in enumerate(dataloader):
            model.train()
            optimizer.zero_grad()
            noisy_image, clean_image = data
            noisy_image = noisy_image.to(device)
            clean_image = clean_image.to(device)
            input = noisy_image
            target = clean_image
            output = model(input)
            loss = criterion(output, target) / (target.size()[0]*2)
            running_loss += loss.item()
            loss.backward()
            optimizer.step()

            # evaluating the performance per batch
            model.eval()
            output = torch.clamp(model(noisy_image), 0, 1)
            psnr_val, ssim_value = calculate_metrics(output, clean_image, 1.0)
            psnr_value += psnr_val
            if idx % 50 == 0:
                print(
                    "Epoch:",epoch + 1,
                    "[",idx + 1,"/",
                    len(dataloader),"]",
                    "=>",
                    "PSNR:",psnr_value / 50,
                    "Loss:",running_loss / 50,
                )
                running_loss = 0.0
                psnr_value = 0.0
    file_name = "N2C_" + type + "_model_" + ".pth"
    torch.save(model.state_dict(), file_name)

In [None]:
#def sampleTestingR2R()

In [None]:
#def sampleTestingN2N()

In [None]:
#def noiseCheckR2R()

In [None]:
#def noiseCheckN2N()

In [None]:
# Main Pipeline using SIDD Dataset


In [None]:
# Main Pipeline using PolyU Dataset