In [0]:
# Mount Google Drive at /content/drive; the .npy files loaded later in this
# notebook are read from paths under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
import torch
import random
import numpy as np
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torchvision
import torch.optim as optim
import torchvision.transforms as transforms
from torch.autograd import Variable
import matplotlib.pyplot as plt

In [0]:
# Dataset wrapper around an in-memory NumPy array (e.g. the contents of a .npy file).
class numpyDataset(Dataset):
    """Serve the entries of a NumPy array as float32 tensors.

    Args:
        data: NumPy array; items are taken along its first axis.
        transform: optional callable applied to every item when it is fetched.
    """

    def __init__(self, data, transform=None):
        self.transform = transform
        # Convert once up front so item access is a plain tensor index.
        self.data = torch.from_numpy(data).float()

    def __len__(self):
        """Number of items, i.e. the length of the first axis."""
        return len(self.data)

    def __getitem__(self, index):
        """Return item ``index``, passed through ``transform`` when one is set."""
        sample = self.data[index]
        return self.transform(sample) if self.transform else sample

In [0]:
class DenoiseNetwork(nn.Module):
    """Residual convolutional denoiser.

    The encoder is a stack of ``depth`` same-padded convolutions
    (Conv+ReLU, then ``depth - 2`` Conv+BatchNorm+ReLU blocks, then a final
    Conv back to ``image_channels``) followed by a 1x1 convolution. The
    decoder is a single 1x1 transposed convolution. ``forward`` subtracts the
    decoder output from the input, so the stack is trained to predict the
    residual that has to be removed from the input.

    Args:
        depth: number of ``kernel_size`` convolutions in the encoder.
        n_channels: number of feature maps in the hidden layers.
        image_channels: number of channels of the input/output images.
        kernel_size: spatial size of the encoder convolutions. Must be odd so
            that same-padding keeps the spatial size, which the residual
            subtraction in ``forward`` requires.

    Raises:
        ValueError: if ``kernel_size`` is even.
    """

    def __init__(self, depth=17, n_channels=64, image_channels=1, kernel_size=3):
        # The original call named an undefined class (DnCNN) and raised
        # NameError on instantiation.
        super().__init__()

        if kernel_size % 2 == 0:
            raise ValueError(
                "kernel_size must be odd to preserve the spatial size, got %s" % kernel_size
            )
        # Same-padding for the requested kernel (1 for the default 3x3).
        padding = kernel_size // 2

        encoder_layers = [
            nn.Conv2d(in_channels=image_channels, out_channels=n_channels,
                      kernel_size=kernel_size, padding=padding, bias=True),
            nn.ReLU(inplace=True),
        ]
        for _ in range(depth - 2):
            encoder_layers.append(
                nn.Conv2d(in_channels=n_channels, out_channels=n_channels,
                          kernel_size=kernel_size, padding=padding, bias=False))
            # NOTE(review): in PyTorch, momentum is the weight given to the
            # *new* batch statistic, so 0.95 makes the running stats follow
            # the latest batch almost entirely - confirm this is intended.
            encoder_layers.append(nn.BatchNorm2d(n_channels, momentum=0.95))
            encoder_layers.append(nn.ReLU(inplace=True))
        encoder_layers.append(
            nn.Conv2d(in_channels=n_channels, out_channels=image_channels,
                      kernel_size=kernel_size, padding=padding, bias=False))
        # 1x1 layers follow image_channels instead of a hard-coded 1 so that
        # multi-channel images work too.
        encoder_layers.append(nn.Conv2d(image_channels, image_channels, 1))

        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(nn.ConvTranspose2d(image_channels, image_channels, 1))

        self._initialize_weights()

    def forward(self, x):
        """Return ``x`` minus the network's prediction (same shape as ``x``)."""
        out = self.encoder(x)
        out = self.decoder(out)
        return x - out

    def _initialize_weights(self):
        """Orthogonal init for Conv2d weights, zero biases, identity BatchNorm.

        ``nn.ConvTranspose2d`` is not an ``nn.Conv2d`` instance, so the
        decoder keeps PyTorch's default initialisation.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init.orthogonal_(m.weight)
                if m.bias is not None:
                    init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                init.constant_(m.weight, 1)
                init.constant_(m.bias, 0)

In [0]:
# Build the denoising network with its default hyper-parameters and move it to the GPU.
model = DenoiseNetwork()
model.cuda()


# Adam at 1e-3; MultiStepLR multiplies the learning rate by 0.2 at each of
# the milestones 30, 60 and 90.
learning_rate = 1e-3
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[30, 60, 90], gamma=0.2)
# reduction='sum': squared errors are summed over every element rather than averaged.
objective = nn.MSELoss(reduction = 'sum')


# NOTE(review): nothing in this cell appends to loss_train - confirm whether it is still needed.
loss_train = []
# Converter from tensors to PIL images.
to_img = transforms.ToPILImage()

In [0]:
# Number of passes over the training set.
NUM_EPOCH = 100

# Per-item preprocessing: tensor -> PIL image -> single-channel grayscale -> tensor.
# NOTE(review): the round trip through PIL presumably quantises the data to
# 8 bits - confirm that is acceptable for the input values.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
])

# Training set: loaded from Drive, wrapped in the dataset class and served in
# shuffled batches of 4 (an incomplete final batch is dropped).
traindata = np.load('/content/drive/My Drive/Colab Notebooks/train.npy')
traindataset = numpyDataset(traindata, transform)
trainloader = DataLoader(traindataset, batch_size=4, shuffle=True, drop_last=True, num_workers=2)

# Per-epoch loss statistics.
loss_train_mean, loss_train_std = [], []
# NOTE(review): these two are not modified anywhere in this cell - confirm they are still used.
prev_train_loss, next_train_loss = 0, 0

In [0]:
def train(batch_idx, epoch, clean_image, noisy_image):
    """Run one optimisation step on a single batch and return its per-image loss.

    Operates on the module-level ``model``, ``optimizer``, ``scheduler`` and
    ``objective``.

    Args:
        batch_idx: index of the batch within the epoch (currently unused).
        epoch: current epoch index, forwarded to the learning-rate scheduler.
        clean_image: batch of target images.
        noisy_image: batch of network inputs corresponding to ``clean_image``.

    Returns:
        float: the batch loss divided by the number of images in the batch.
    """
    # Send the batch to whichever device the model lives on, rather than
    # assuming CUDA.
    device = next(model.parameters()).device
    clean, noisy = clean_image.to(device), noisy_image.to(device)

    optimizer.zero_grad()
    output = model(noisy)

    loss = objective(output, clean)

    loss.backward()
    optimizer.step()
    # NOTE(review): this is called once per batch with an explicit epoch; the
    # epoch argument of scheduler.step() is deprecated in recent PyTorch.
    scheduler.step(epoch)

    # Normalise by this batch's own size. The original read a global named
    # `data`, which only worked because the calling loop happened to define it.
    loss_train_batch = loss.item() / len(clean_image)

    return loss_train_batch

In [0]:
# Figure laid out for a clean / noisy / denoised comparison.
# NOTE(review): nothing is drawn into these axes in this cell, so the figure
# renders empty - confirm whether the plotting code is missing.
fig, ax = plt.subplots(ncols=3, nrows=1, figsize=(9, 7))
ax[0].set_title("Clean")
ax[1].set_title("Noisy(input)")
ax[2].set_title("Denoised")

# Standard deviations of the additive Gaussian noise; one is picked at random per batch.
NOISE_SIGMAS = (0.01, 0.02, 0.03, 0.04)

model.train()
for epoch in range(NUM_EPOCH):

    batch_train_loss = []

    for batch_idx, data in enumerate(trainloader):

        # Corrupt the clean batch with zero-mean Gaussian noise at a randomly
        # chosen level. randn_like follows the batch's own shape, so no image
        # size is hard-coded, and only the noise tensor actually used is built.
        sigma = random.choice(NOISE_SIGMAS)
        noisy = data + sigma * torch.randn_like(data)

        # Use a dedicated name so the module-level `loss_train` list is not
        # rebound to a float.
        batch_loss = train(batch_idx, epoch, data, noisy)

        batch_train_loss.append(batch_loss)

    loss_train_mean.append(np.mean(batch_train_loss))
    loss_train_std.append(np.std(batch_train_loss))

    print("[epoch %s] loss(training): %s" % (epoch, loss_train_mean[-1]))

In [0]:
# for testing
testdata = np.load('/content/drive/My Drive/Colab Notebooks/test.npy')
testdataset = numpyDataset(testdata, transform)
testloader = DataLoader(testdataset, batch_size=1, shuffle=False, drop_last=False, num_workers=2)

model.eval()
to_img = transforms.ToPILImage()
# Run inference on whichever device the model already lives on.
model_device = next(model.parameters()).device

# Per-batch outputs, kept on the CPU and concatenated once after the loop
# instead of re-concatenating a growing tensor on every iteration.
denoised_batches = []

for batch_idx, data in enumerate(testloader):
    with torch.no_grad():
        result_of_test = model(data.to(model_device)).cpu()

    # One input/denoised pair per test item. plt.show() renders the figure
    # and plt.close() releases it so figures do not pile up over the loop.
    fig, ax = plt.subplots(ncols=2, nrows=1, figsize=(7, 9))
    ax[0].set_title("Input")
    ax[1].set_title("Denoised")
    ax[0].imshow(to_img(data[0]), cmap='gray')
    # Clamp for display only: ToPILImage scales floats by 255 and casts to
    # uint8, so values outside [0, 1] would wrap around.
    ax[1].imshow(to_img(result_of_test[0].clamp(0, 1)), cmap='gray')
    plt.show()
    plt.close(fig)

    # If an output's layout differs from the first one, swap its last two
    # axes so everything can be stacked (explicit form of the original
    # "concatenate, and on RuntimeError retry transposed" fallback).
    if denoised_batches and result_of_test.shape[1:] != denoised_batches[0].shape[1:]:
        result_of_test = torch.transpose(result_of_test, 2, 3)
    denoised_batches.append(result_of_test)

result_for_submit = torch.cat(denoised_batches, dim=0)  # this is for submit file

# the submit_file.shape must be (400,1,120,80)
# The tensors were moved to the CPU above; calling .numpy() on a CUDA tensor raises.
submit_file = result_for_submit.numpy()
print(submit_file.shape)
np.save('your_name.npy', submit_file)