In [0]:
from __future__ import print_function

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
from PIL import Image
from IPython.display import display 
import matplotlib.pyplot as plt
from torchvision import datasets
import torchvision.transforms as transforms
import torchvision.models as models
import torchsummary
import numpy as np
from collections import namedtuple
from torch.utils.data.sampler import SubsetRandomSampler
from torchsummary import summary
from torch.autograd import Variable


import copy
import time
import glob
import re
import cv2

In [0]:
# Select the compute device used by every later cell: the GPU when CUDA is
# available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [0]:
# Colab only: mount Google Drive under /content/gdrive. The dataset and
# checkpoint paths used later in this notebook live below that mount point.
from google.colab import drive
drive.mount("/content/gdrive")

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
# ---- Run configuration shared by the cells below ----
batch_size = 32      # samples per DataLoader batch
random_seed = 10     # seeds NumPy before the train/test index shuffle
num_epochs = 280     # upper bound of the epoch loop in the training cell
initial_lr = 1e-3    # learning rate handed to Adam
# Directory that checkpoint file names are joined onto (Google Drive mount).
checkpoint_dir = "/content/gdrive/My Drive/Colab_Notebooks/best-artworks-of-all-time/data/" #FIXME

# NOTE(review): log_interval and checkpoint_interval are not referenced by any
# cell visible in this notebook -- confirm whether they are still needed.
log_interval = 20
checkpoint_interval = 210

# Selects which cell does work: "training" runs the training loop,
# "test" runs the evaluation cell.
running_option = "test"  
#running_option = "training" 

In [0]:
class ConvLayer(nn.Module):
    """Reflection-padded 2D convolution.

    The input is reflection-padded by ``kernel_size // 2`` pixels on every
    side and then passed through an unpadded ``nn.Conv2d``. With an odd
    kernel and ``stride=1`` the spatial size is therefore preserved.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        kernel_size: side length of the square kernel.
        stride: stride of the convolution.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride):
        super(ConvLayer, self).__init__()
        # Attribute names are part of the state_dict keys; keep them stable.
        self.reflection_pad = nn.ReflectionPad2d(kernel_size // 2)
        self.conv2d = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
        )

    def forward(self, x):
        """Return the convolution of the reflection-padded input."""
        return self.conv2d(self.reflection_pad(x))

    
    
class ResidualBlock(nn.Module):
    """Residual block: two 3x3 ConvLayer + InstanceNorm stages plus a skip.

    Computes ``x + in2(conv2(relu(in1(conv1(x)))))``. Both convolutions keep
    the channel count, so the input and output shapes match.

    Args:
        channels: number of input (and output) channels.
    """

    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        # Attribute names and creation order are kept as-is: they define the
        # state_dict keys and the order in which weights are initialised.
        self.conv1 = ConvLayer(channels, channels, kernel_size=3, stride=1)
        self.in1 = nn.InstanceNorm2d(channels, affine=True)
        self.conv2 = ConvLayer(channels, channels, kernel_size=3, stride=1)
        self.in2 = nn.InstanceNorm2d(channels, affine=True)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the two conv stages and add the untouched input back."""
        hidden = self.conv1(x)
        hidden = self.in1(hidden)
        hidden = self.relu(hidden)
        hidden = self.conv2(hidden)
        hidden = self.in2(hidden)
        # No activation after the addition.
        return hidden + x


class UpsampleConvLayer(nn.Module):
    """Optional nearest-neighbour upsampling followed by a padded convolution.

    When ``upsample`` is truthy the input is first resized with
    ``interpolate(mode='nearest', scale_factor=upsample)``; the result is then
    reflection-padded by ``kernel_size // 2`` and convolved.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        kernel_size: side length of the square kernel.
        stride: stride of the convolution.
        upsample: scale factor for the upsampling step, or ``None`` to skip it.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, upsample=None):
        super(UpsampleConvLayer, self).__init__()
        self.upsample = upsample
        self.reflection_pad = nn.ReflectionPad2d(kernel_size // 2)
        self.conv2d = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
        )

    def forward(self, x):
        """Upsample (if configured), pad by reflection, then convolve."""
        if self.upsample:
            resized = nn.functional.interpolate(x, scale_factor=self.upsample, mode='nearest')
        else:
            resized = x
        return self.conv2d(self.reflection_pad(resized))
    

    
class AutoencoderNet(nn.Module):
    """Convolutional autoencoder: encoder -> 5 residual blocks -> decoder.

    The encoder reduces the spatial resolution by a nominal factor of 32
    (three 2x2 max-pools and two stride-2 convolutions) while widening the
    channels 3 -> 32 -> 64 -> 128. The decoder applies five 2x upsamplings
    (three ``nn.Upsample`` modules plus the ``upsample=2`` step inside the two
    ``UpsampleConvLayer`` modules) and narrows the channels back to 3.

    Sub-module names are part of the state_dict keys used by checkpoints, so
    they must not be renamed.
    """

    def __init__(self):
        super(AutoencoderNet, self).__init__()

        # Encoder: three conv / instance-norm / ReLU / max-pool stages.
        self.encoder = nn.Sequential()
        encoder_layers = [
            ('conv1', ConvLayer(3, 32, kernel_size=9, stride=1)),
            ('in1', nn.InstanceNorm2d(32, affine=True)),
            ('relu1', nn.ReLU()),
            ('maxpool1', nn.MaxPool2d(2, 2)),

            ('conv2', ConvLayer(32, 64, kernel_size=3, stride=2)),
            ('in2', nn.InstanceNorm2d(64, affine=True)),
            ('relu2', nn.ReLU()),
            ('maxpool2', nn.MaxPool2d(2, 2)),

            ('conv3', ConvLayer(64, 128, kernel_size=3, stride=2)),
            ('in3', nn.InstanceNorm2d(128, affine=True)),
            ('relu3', nn.ReLU()),
            ('maxpool3', nn.MaxPool2d(2, 2)),
        ]
        for layer_name, layer in encoder_layers:
            self.encoder.add_module(layer_name, layer)

        # Bottleneck: five residual blocks named resblock_1 .. resblock_5.
        self.residual = nn.Sequential()
        for block_no in range(1, 6):
            self.residual.add_module('resblock_%d' % block_no, ResidualBlock(128))

        # Decoder: upsample + conv stages; the final conv has no activation.
        self.decoder = nn.Sequential()
        decoder_layers = [
            ('upsample1', nn.Upsample(scale_factor=2)),
            ('deconv1', UpsampleConvLayer(128, 64, kernel_size=3, stride=1, upsample=2)),
            ('in4', nn.InstanceNorm2d(64, affine=True)),
            ('relu4', nn.ReLU()),

            ('upsample2', nn.Upsample(scale_factor=2)),
            ('deconv2', UpsampleConvLayer(64, 32, kernel_size=3, stride=1, upsample=2)),
            ('in5', nn.InstanceNorm2d(32, affine=True)),
            ('relu5', nn.ReLU()),

            ('upsample3', nn.Upsample(scale_factor=2)),
            ('deconv3', ConvLayer(32, 3, kernel_size=9, stride=1)),
        ]
        for layer_name, layer in decoder_layers:
            self.decoder.add_module(layer_name, layer)

    def forward(self, X):
        """Run ``X`` through the encoder, residual blocks and decoder."""
        features = self.encoder(X)
        features = self.residual(features)
        return self.decoder(features)

    
class ClassifierNet(nn.Module):
    """Image classifier that shares AutoencoderNet's encoder/residual layout.

    The encoder and residual stack use the same sub-module names as
    ``AutoencoderNet``. The flattened residual output feeds a three-layer
    fully connected head that returns 50 raw scores (no softmax is applied).

    The first linear layer expects ``128 * 8 * 8`` features, i.e. an 8x8
    feature map after the encoder's nominal 32x reduction, which corresponds
    to 256x256 inputs.

    Sub-module names (including the ``'bc2'`` spelling) are part of the
    state_dict keys used by checkpoints, so they must not be renamed.
    """

    def __init__(self):
        super(ClassifierNet, self).__init__()

        # Encoder: three conv / instance-norm / ReLU / max-pool stages.
        self.encoder = nn.Sequential()
        encoder_layers = [
            ('conv1', ConvLayer(3, 32, kernel_size=9, stride=1)),
            ('in1', nn.InstanceNorm2d(32, affine=True)),
            ('relu1', nn.ReLU()),
            ('maxpool1', nn.MaxPool2d(2, 2)),

            ('conv2', ConvLayer(32, 64, kernel_size=3, stride=2)),
            ('in2', nn.InstanceNorm2d(64, affine=True)),
            ('relu2', nn.ReLU()),
            ('maxpool2', nn.MaxPool2d(2, 2)),

            ('conv3', ConvLayer(64, 128, kernel_size=3, stride=2)),
            ('in3', nn.InstanceNorm2d(128, affine=True)),
            ('relu3', nn.ReLU()),
            ('maxpool3', nn.MaxPool2d(2, 2)),
        ]
        for layer_name, layer in encoder_layers:
            self.encoder.add_module(layer_name, layer)

        # Five residual blocks named resblock_1 .. resblock_5.
        self.residual = nn.Sequential()
        for block_no in range(1, 6):
            self.residual.add_module('resblock_%d' % block_no, ResidualBlock(128))

        # Classification head. It ends in a plain Linear layer: the softmax
        # module stays disabled and forward() returns unnormalised scores.
        self.fc = nn.Sequential()
        head_layers = [
            ('fullyconnected1', nn.Linear(128*8*8, 1024)),
            ('bn1', nn.BatchNorm1d(1024)),
            ('relu4', nn.ReLU()),
            ('fullyconnected2', nn.Linear(1024, 500)),
            ('bc2', nn.BatchNorm1d(500)),
            ('relu5', nn.ReLU()),
            ('fullyconnected3', nn.Linear(500, 50)),
        ]
        for layer_name, layer in head_layers:
            self.fc.add_module(layer_name, layer)

    def forward(self, X):
        """Return the (batch, 50) class scores for the image batch ``X``."""
        features = self.residual(self.encoder(X))
        # Flatten everything except the batch dimension for the linear head.
        batch_size_ = features.size(0)
        flat_features = features.view(batch_size_, -1)
        return self.fc(flat_features)


In [0]:
def normalize_batch(batch):
    """Scale a 0-255 image batch to 0-1 and standardise it with ImageNet stats.

    The input tensor is left untouched. The previous in-place ``div_`` rescaled
    the caller's tensor as a side effect, so normalising the same tensor twice
    silently produced wrong values.

    Args:
        batch: image tensor with 3 channels on the third-from-last axis
            (e.g. ``(N, 3, H, W)``) and pixel values in the 0-255 range.
            Integer tensors are converted to float first.

    Returns:
        A new floating-point tensor ``(batch / 255 - mean) / std`` using the
        ImageNet per-channel mean and standard deviation.
    """
    if not batch.is_floating_point():
        # Integer pixels cannot hold the 0-1 range (and new_tensor would
        # truncate mean/std to zero), so promote to float.
        batch = batch.float()
    # new_tensor keeps the statistics on the same device/dtype as the batch.
    mean = batch.new_tensor([0.485, 0.456, 0.406]).view(-1, 1, 1)
    std = batch.new_tensor([0.229, 0.224, 0.225]).view(-1, 1, 1)
    return (batch / 255.0 - mean) / std

In [0]:
# Training pipeline: resize/crop to 256x256, random augmentation, then rescale
# ToTensor's 0-1 output back to 0-255 (normalize_batch divides by 255 later).
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(256),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomAffine(0, shear=10, scale=(0.8, 1.2)),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.mul(255))
])

# Evaluation pipeline: same geometry and value range, but no random
# augmentation, so test accuracy is deterministic and measured on clean images.
test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(256),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.mul(255))
])

# Both datasets read the same folder (so indices refer to the same files);
# they differ only in the transform applied.
train_dataset = datasets.ImageFolder("/content/gdrive/My Drive/Colab_Notebooks/best-artworks-of-all-time/images", transform) #FIXME
test_dataset = datasets.ImageFolder("/content/gdrive/My Drive/Colab_Notebooks/best-artworks-of-all-time/images", test_transform) #FIXME

# ---------------------- train / test split --------------------------
# Hold out 10% of the shuffled indices for testing. The seed and shuffle call
# are unchanged so the split is identical to the one earlier checkpoints used.
num_train = len(train_dataset)
indices = list(range(num_train))
split = int(np.floor(0.1* num_train))

np.random.seed(random_seed)
np.random.shuffle(indices)

train_idx, test_idx = indices[split:], indices[:split]
train_sampler = SubsetRandomSampler(train_idx)
test_sampler = SubsetRandomSampler(test_idx)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, sampler = train_sampler)
# Fixed: the test loader previously wrapped train_dataset and therefore
# evaluated on randomly augmented images.
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, sampler = test_sampler)
# Report sample counts; len(loader) alone is the number of batches.
print("train dataset num:", len(train_idx), "(batches: %d)" % len(train_loader))
print("test dataset num:", len(test_idx), "(batches: %d)" % len(test_loader))


def im_convert(tensor):
    """Convert a (C, H, W) image tensor into an (H, W, C) array for plotting.

    The loaders in this notebook already deliver pixel values in the 0-255
    range, so no de-normalisation is needed. The previous "denormalize" line
    had a misplaced parenthesis and only multiplied by ``[1, 1, 1]``; it has
    been removed. The tensor is moved to the CPU first so CUDA tensors work.

    Args:
        tensor: image tensor laid out as (channels, height, width).

    Returns:
        A new float64 NumPy array of shape (height, width, channels) with
        values clipped to [0, 255]. The input tensor is not modified.
    """
    image = tensor.detach().cpu().numpy().transpose(1, 2, 0)
    # astype copies, so the clipped result never aliases the tensor's memory;
    # float64 matches the dtype the original implementation returned.
    return np.clip(image.astype(np.float64), 0, 255)

# Preview one (augmented) training batch together with its class names.
dataiter = iter(train_loader)
# Use the built-in next(): DataLoader iterators in current PyTorch releases do
# not provide a .next() method.
images, classes = next(dataiter)
title = [train_dataset.classes[i] for i in classes]

fig = plt.figure(figsize=(25, 4))

# Show up to 20 images in a 2x10 grid; a batch smaller than 20 no longer
# raises an IndexError.
for i in range(min(20, len(images))):
    ax = fig.add_subplot(2, 10, i+1, xticks=[], yticks=[])
    ax.imshow(im_convert(images[i]).astype('uint8'))
    ax.set_title(title[i])

In [0]:
# Instantiate both networks and move them to the selected device.
classifier = ClassifierNet().to(device)
autoencoder = AutoencoderNet().to(device)
# Print torchsummary's report for the classifier given a 3x256x256 input.
summary(classifier, (3, 256, 256))

In [0]:
# Transfer learning / checkpoint restore
transfer_learning = True # False: start from freshly initialised weights / True: load a saved classifier checkpoint
ckpt_model_path = os.path.join(checkpoint_dir, "ckpt_epoch__114_batch_id_200.pth")

# Disabled: one-off initialisation of the classifier from a trained
# autoencoder checkpoint (copies every state_dict entry whose key also exists
# in the classifier). Kept for reference as comments instead of a bare string
# literal.
#
# if transfer_learning:
#     checkpoint = torch.load(ckpt_model_path, map_location=device)
#     autoencoder.load_state_dict(checkpoint['model_state_dict'])
#     autoencoder.to(device)
#
# pretrained_dict = autoencoder.state_dict()
# new_model_dict = classifier.state_dict()
# pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in new_model_dict}
# new_model_dict.update(pretrained_dict)
# classifier.load_state_dict(new_model_dict)
# classifier.train()


classifier_ckpt_model_path = os.path.join(checkpoint_dir, "ckpt_epoch_249.pth") #FIXME
if transfer_learning:
    # map_location lets a checkpoint saved on one device load on another.
    checkpoint = torch.load(classifier_ckpt_model_path ,map_location = device)
    classifier.load_state_dict(checkpoint['model_state_dict'])
    classifier.to(device)


# All classifier parameters are handed to the optimizer (nothing is frozen).
optimizer = torch.optim.Adam(classifier.parameters(), initial_lr)
if transfer_learning and 'optimizer_state_dict' in checkpoint:
    # The training loop stores the optimizer state in every checkpoint; restore
    # it so Adam's moment estimates survive a resume instead of restarting
    # from zero.
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    # load_state_dict also restores the saved hyper-parameters; keep the
    # learning rate configured in this notebook authoritative.
    for param_group in optimizer.param_groups:
        param_group['lr'] = initial_lr
criterion = nn.CrossEntropyLoss()


In [0]:
# Training loop: runs only when running_option == 'training'.
# After every epoch it prints the mean batch loss and the sample accuracy and
# writes a checkpoint named ckpt_epoch_<epoch>.pth (0-based epoch index).
if running_option == 'training':
    if transfer_learning:
        # checkpoint['epoch'] is the 0-based index of the epoch that had just
        # finished when the checkpoint was written, so resume with the next
        # one instead of repeating it and overwriting its checkpoint file.
        transfer_learning_epoch = checkpoint['epoch'] + 1
    else:
        transfer_learning_epoch = 0

    for epoch in range(transfer_learning_epoch, num_epochs):
        classifier.train()
        running_loss = 0.
        running_correct = 0
        count = 0  # number of samples seen in this epoch

        for batch_id, (x, labels) in enumerate(train_loader):
            n_batch = len(x)
            count += n_batch

            # ================= forward =====================
            x = x.to(device)
            labels = labels.to(device)
            x = normalize_batch(x)
            outputs = classifier(x)
            _, preds = torch.max(outputs, 1)

            # ================= loss =====================
            loss = criterion(outputs, labels)

            # =============== backward ===================
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # .item() keeps the counter a plain Python int.
            running_correct += (preds == labels).sum().item()

        # Loss is averaged over batches; accuracy over samples. Accuracy was
        # previously divided by the number of batches, which inflated it by
        # roughly the batch size. The progress denominator is the size of the
        # training subset, not of the whole image folder.
        mesg = "{}\tEpoch {}:\t[{}/{}]\tloss: {:.4f}\tacc: {:.4f}".format(
            time.ctime(), epoch + 1, count, len(train_loader.sampler),
            running_loss / len(train_loader), running_correct / count)
        print(mesg)

        # Save from the CPU so the stored model weights are device independent.
        classifier.eval().cpu()
        ckpt_model_filename = "ckpt_epoch_" + str(epoch) + ".pth"
        ckpt_model_path = os.path.join(checkpoint_dir, ckpt_model_filename)
        torch.save({
        'epoch': epoch,
        'model_state_dict': classifier.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss.data
        }, ckpt_model_path)
        # Announce the checkpoint only after it has actually been written.
        print(str(epoch), "th checkpoint is saved!")

        classifier.to(device).train()

In [0]:
# Evaluation: runs only when running_option == "test".
# Loads a classifier checkpoint, scores the held-out split and prints the
# overall and per-class accuracy.
correct = 0
total = 0
classes = train_dataset.classes
if running_option == "test":
    ckpt_model_path = os.path.join(checkpoint_dir, "ckpt_epoch_264.pth") #FIXME
    checkpoint = torch.load(ckpt_model_path, map_location=device)
    classifier.load_state_dict(checkpoint['model_state_dict'])
    classifier.to(device)
    # Inference mode: BatchNorm1d must use its running statistics rather than
    # the statistics of the current test batch (which also fails outright for
    # a batch of one sample).
    classifier.eval()

    class_correct = [0.] * len(classes)
    class_total = [0.] * len(classes)
    with torch.no_grad():
        for images, labels in test_loader:
            # Apply the same normalisation the training loop uses; feeding raw
            # 0-255 pixels to a model trained on normalised inputs makes the
            # reported accuracy meaningless.
            images = normalize_batch(images.to(device))
            labels = labels.to(device)
            outputs = classifier(images)
            _, predicted = torch.max(outputs, 1)

            hits = (predicted == labels)
            total += labels.size(0)
            correct += hits.sum().item()
            # Iterate over plain Python lists: the former .squeeze() collapsed
            # a single-sample batch to a 0-d tensor that cannot be indexed.
            for label, hit in zip(labels.tolist(), hits.tolist()):
                class_correct[label] += hit
                class_total[label] += 1

    # The report lives inside the branch: outside it the per-class lists do
    # not exist and total is 0.
    print(len(classes), len(class_correct), len(class_total))
    if total:
        print('Accuracy of the network on the test images: %d %%' % (100 * correct / total))
    # Cover every class instead of a hard-coded 46.
    for i in range(len(classes)):
        if(class_total[i] != 0):
            print('Accuracy of %5s : %2d %%' % (classes[i], 100 * class_correct[i] / class_total[i]))

Reference on sparse loss functions: https://excelsior-cjh.tistory.com/187
