# Get Dataset from Google Drive
please upload your dataset on google drive first.

In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
import os
import zipfile
import tqdm

file_name = "Multimedia_dataset.zip"
zip_path = os.path.join('/content/drive/MyDrive/Multimedia_dataset.zip')

!cp "{zip_path}" .
!unzip -q "{file_name}"
!rm "{file_name}"

# Noise Transform
If you want to change how much noise you are giving, change the stddev and mean values at 'gaussian_noise' function.

In [None]:
import torch
from torch.autograd import Variable
from torchvision import transforms

import random

class NoiseTransform(object):
  def __init__(self, size=180, mode="training"):
    super(NoiseTransform, self).__init__()
    self.size = size
    self.mode = mode
  
  def gaussian_noise(self, img):
    mean = 0
    stddev = 25
    noise = Variable(torch.zeros(img.size()))
    noise = noise.data.normal_(mean, stddev/255.)

    return noise

  def __call__(self, img):
    if (self.mode == "training") | (self.mode == "validation"):
      self.gt_transform = transforms.Compose([
        # transforms.RandomCrop(self.size),
        transforms.Resize((self.size, self.size), interpolation=2),
        transforms.ToTensor()])
      self.noise_transform = transforms.Compose([
        # transforms.RandomCrop(self.size),
        transforms.Resize((self.size, self.size), interpolation=2),
        transforms.ToTensor(),
        transforms.Lambda(self.gaussian_noise),
      ])
      return self.gt_transform(img), self.noise_transform(img)

    elif self.mode == "testing":
      self.gt_transform = transforms.Compose([
        transforms.Resize((self.size, self.size), interpolation=2),
        transforms.ToTensor()])
      return self.gt_transform(img)
    else:
      return NotImplementedError

#Dataloader for Noise Dataset

In [None]:
import torch
import torch.utils.data  as data
import os
from PIL import Image

class NoiseDataset(data.Dataset):
  def __init__(self, root_path, size):
    super(NoiseDataset, self).__init__()

    self.root_path = root_path
    self.size = size
    self.transforms = None
    self.examples = None

  def set_mode(self, mode):
    self.mode = mode
    self.transforms = NoiseTransform(self.size, mode)
    if mode == "training":
      train_dir = os.path.join(self.root_path, "train")
      self.examples = [os.path.join(self.root_path, "train", dirs) for dirs in os.listdir(train_dir)]
    elif mode == "validation":
      val_dir = os.path.join(self.root_path, "validation")
      self.examples = [os.path.join(self.root_path, "validation", dirs) for dirs in os.listdir(val_dir)]
    elif mode == "testing":
      test_dir = os.path.join(self.root_path, "test")
      self.examples = [os.path.join(self.root_path, "test", dirs) for dirs in os.listdir(test_dir)]
    else:
      raise NotImplementedError
  
  def __len__(self):
    return len(self.examples)

  def __getitem__(self, idx):
    file_name = self.examples[idx]
    image = Image.open(file_name)

    if self.mode == "testing":
      input_img = self.transforms(image)
      sample = {"img": input_img, "file_name": self.examples[idx].split("/")[-1]}
    else:
      clean, noise = self.transforms(image)
      sample = {"img": clean, "noise": noise, "file_name": self.examples[idx].split("/")[-1]}

    return sample

# Network
Simplified DNCNN network for example.

In [None]:
import torch.nn as nn


class DNCNN(nn.Module):
  def __init__(self, in_planes=3, blocks=17, hidden=64, kernel_size=3, padding=1, bias=False):
    super(DNCNN, self).__init__()
    self.conv_f = nn.Conv2d(in_channels=in_planes, out_channels=hidden, kernel_size=kernel_size, padding=padding, bias=bias)
    self.conv_h = nn.Conv2d(in_channels=hidden, out_channels=hidden, kernel_size=kernel_size, padding=padding, bias=bias)
    self.conv_l = nn.Conv2d(in_channels=hidden, out_channels=in_planes, kernel_size=kernel_size, padding=padding, bias=bias)

    self.bn = nn.BatchNorm2d(hidden)
    self.relu = nn.ReLU(inplace=True)

    self.hidden_layer = self.mk_hidden_layer(blocks)

  def mk_hidden_layer(self, blocks=17):
    layers = []
    for _ in range(blocks-2):
      layers.append(self.conv_h)
      layers.append(self.bn)
      layers.append(self.relu)
    return nn.Sequential(*layers)

  def forward(self, x):
    out = self.conv_f(x)
    out = self.relu(out)

    out = self.hidden_layer(out)

    out = self.conv_l(out)

    return out

# Tensorboard
For training progress visualization. Run before training phase.

In [None]:
!pip install tensorboardX

Collecting tensorboardX
[?25l  Downloading https://files.pythonhosted.org/packages/07/84/46421bd3e0e89a92682b1a38b40efc22dafb6d8e3d947e4ceefd4a5fabc7/tensorboardX-2.2-py2.py3-none-any.whl (120kB)
[K     |██▊                             | 10kB 21.6MB/s eta 0:00:01[K     |█████▍                          | 20kB 19.5MB/s eta 0:00:01[K     |████████▏                       | 30kB 10.9MB/s eta 0:00:01[K     |██████████▉                     | 40kB 9.1MB/s eta 0:00:01[K     |█████████████▋                  | 51kB 7.5MB/s eta 0:00:01[K     |████████████████▎               | 61kB 8.7MB/s eta 0:00:01[K     |███████████████████             | 71kB 8.6MB/s eta 0:00:01[K     |█████████████████████▊          | 81kB 8.8MB/s eta 0:00:01[K     |████████████████████████▌       | 92kB 7.8MB/s eta 0:00:01[K     |███████████████████████████▏    | 102kB 7.3MB/s eta 0:00:01[K     |██████████████████████████████  | 112kB 7.3MB/s eta 0:00:01[K     |████████████████████████████████| 122kB 

In [None]:
# %load_ext tensorboard
%reload_ext tensorboard

In [None]:
%tensorboard --logdir logs

# Training Phase
Simple DNCNN training code.

In [None]:
import torch
import torch.nn as nn
import torch.utils.data  as data

from torchvision import transforms
from tensorboardX import SummaryWriter
import torchvision.utils as tvutils

import os
import matplotlib.pyplot as plt
import numpy as np
import tqdm.notebook as tq
from PIL import Image
from skimage.measure.simple_metrics import compare_psnr

def batch_PSNR(img, imclean, data_range):
    Img = img.data.cpu().numpy().astype(np.float32)
    Iclean = imclean.data.cpu().numpy().astype(np.float32)
    PSNR = 0
    for i in range(Img.shape[0]):
        PSNR += compare_psnr(Iclean[i, :, :, :], Img[i, :, :, :], data_range=data_range)
    return (PSNR/Img.shape[0])

# Change to your data root directory
root_path = "/content/"
save_path = "/content/drive/MyDrive/DNCNN_models"

# Depend on runtime setting
use_cuda = True

# Dataloader setting
train_dataset = NoiseDataset(root_path, 128)
train_dataset.set_mode("training")

val_dataset = NoiseDataset(root_path, 128)
val_dataset.set_mode("validation")

train_dataloader = data.DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = data.DataLoader(val_dataset, batch_size=8, shuffle=True)

# Model declaration
net = DNCNN()
model = nn.DataParallel(net)

# loss
criterion = nn.MSELoss(size_average=False)

if use_cuda:
  model.to('cuda')
  criterion.to('cuda')

# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, eps=1e-08)

step = 0
# Tensorboard writer
writer = SummaryWriter("logs")

for epoch in range(20):
  print('Epoch {}/{}'.format(epoch + 1, 50))
  print('-' * 10)

  for i, data in enumerate(tq.tqdm(train_dataloader)):
    if use_cuda:
      img = data["img"].to('cuda')
      noise = data["noise"].to('cuda')
    
    model.train()
    model.zero_grad()
    optimizer.zero_grad()
    
    noise_image = img + noise
    model_input = torch.clamp(noise_image, 0, 1)

    preds = model(model_input)
    
    loss = criterion(preds, noise) / (model_input.size()[0] * 2)

    loss.backward()
    optimizer.step()

    model.eval()

    out_train = torch.clamp(model_input - model(model_input), 0., 1.)
    psnr_train = batch_PSNR(out_train, model_input, 1.)

    if step % 100 == 0:
      # Log the scalar values
      writer.add_scalar('loss', loss.item(), step)
      writer.add_scalar('PSNR on training data', psnr_train, step)

      # log the images => Tensorboard
      Img = tvutils.make_grid(img.data, nrow=4, normalize=True, scale_each=True)
      Imgn = tvutils.make_grid(model_input.data, nrow=4, normalize=True, scale_each=True)
      Irecon = tvutils.make_grid(out_train.data, nrow=4, normalize=True, scale_each=True)
      writer.add_image('clean image', Img, epoch)
      writer.add_image('noisy image', Imgn, epoch)
      writer.add_image('reconstructed image', Irecon, epoch)
      print("[epoch %d][%d/%d] loss: %.4f PSNR_train: %.4f" %
          (epoch + 1, i + 1, len(train_dataloader), loss.item(), psnr_train))
    step += 1
    
  torch.save(model.module.state_dict(), os.path.join(save_path, "{}.tar".format(epoch+30)))
  print("saved at {}".format(os.path.join(save_path, "{}.tar".format(epoch+30))))

  psnr_val = 0

  # Validation on training phase
  model.eval()
  with torch.no_grad():
    for val_data in tq.tqdm(val_dataloader):
      img = val_data["img"].to('cuda')
      noise = val_data["noise"].to('cuda')

      val_image = img + noise
      preds = model(val_image)

      out_val = torch.clamp(val_image - preds, 0., 1.)
      psnr_val += batch_PSNR(out_val, val_image, 1.)

      val_Img = tvutils.make_grid(img.data, nrow=4, normalize=True, scale_each=True)
      val_Imgn = tvutils.make_grid(val_image.data, nrow=4, normalize=True, scale_each=True)
      val_Irecon = tvutils.make_grid(out_val.data, nrow=4, normalize=True, scale_each=True)
      writer.add_image('clean image', val_Img, epoch)
      writer.add_image('noisy image', val_Imgn, epoch)
      writer.add_image('validation reconstructed image', val_Irecon, epoch)
    
    psnr_val /= len(val_dataloader)
    print("\n[epoch %d] PSNR_val: %.4f" % (epoch + 1, psnr_val))
    writer.add_scalar('PSNR on validation data', psnr_val, epoch)

In [None]:
import torch
import torch.nn as nn
import torch.utils.data  as data

import os
import numpy as np
import tqdm.notebook as tq
from PIL import Image
from skimage.measure.simple_metrics import compare_psnr

def image_save(img, path):
  if isinstance(img, torch.Tensor):
    img = transforms.ToPILImage()(img)
  img.save(path)

def batch_PSNR(img, imclean, data_range):
    Img = img.data.cpu().numpy().astype(np.float32)
    Iclean = imclean.data.cpu().numpy().astype(np.float32)
    PSNR = 0
    for i in range(Img.shape[0]):
        PSNR += compare_psnr(Iclean[i, :, :, :], Img[i, :, :, :], data_range=data_range)
    return (PSNR/Img.shape[0])

# Change to your data root directory
image_path = "/content/drive/MyDrive/Multimedia_test_dataset/denoising2/"
checkpoint_path = "/content/drive/MyDrive/DNCNN_models/37.tar"
result_save_path = "/content/drive/MyDrive/Multimedia_test_dataset/dncnn_test_result"

# Depend on runtime setting
use_cuda = True

test_dataset = NoiseDataset(image_path, 128)
test_dataset.set_mode("testing")

test_dataloader = data.DataLoader(test_dataset, batch_size=1, shuffle=False)

net = DNCNN()

if use_cuda:
  net.to('cuda')

net.load_state_dict(torch.load(checkpoint_path))
model = nn.DataParallel(net)

model.eval()

psnr_test = 0

for i, data in enumerate(tq.tqdm(test_dataloader)):
  if use_cuda:
    img = data["img"].to('cuda')
  file_name = data["file_name"]

  with torch.no_grad():
    out_test = torch.clamp(img - model(img), 0., 1.)
    psnr = batch_PSNR(out_test, img, 1.)
    psnr_test += psnr
    for idx in range(len(img)):
      image_save(out_test[idx], os.path.join(result_save_path, file_name[idx]))

psnr_test /= len(test_dataloader)
print("\nPSNR on test data %f" % psnr_test)

HBox(children=(FloatProgress(value=0.0, max=1000.0), HTML(value='')))





PSNR on test data 30.914452
