In [1]:
from torch import nn
from torch.autograd import Variable
import torch
import torch.nn.functional as F
import torchvision
import torch.utils.data as data
import torchvision.transforms as transforms
import torchvision.utils as vutils
import numpy as np
from PIL import Image
import os
import matplotlib.pyplot as plt
import time
from torchsummary import summary
import config
from facenet_pytorch import training
from torch.utils.data import DataLoader, SubsetRandomSampler
from torch import optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from PIL import Image
import glob
import torchvision.models as models
from util import AverageMeter, learning_rate_decay, Logger
import collections
import math

In [2]:
# Location of the CelebA images and of the attribute annotation file.
# The defaults are the original author's paths; set CELEBA_DATA_ROOT /
# CELEBA_ATTR_FILE in the environment to run on another machine without
# editing the notebook.
data_root = os.environ.get("CELEBA_DATA_ROOT", "/home/mehmetyavuz/datasets/CelebA128/")
attr_root = os.environ.get("CELEBA_ATTR_FILE", "/home/mehmetyavuz/datasets/list_attr_celeba.txt")

# Number of worker processes used by each DataLoader
workers = 4

# Batch size used by the train, validation and test loaders
batch_size = 64

# Spatial size of the images. It is passed to the CelebA dataset class, which
# does not resize anything itself, so the files on disk are expected to
# already have this size.
image_size = (128, 128)

# Number of passes over the training set
epochs = 100

In [3]:
class CelebA(data.Dataset):
    """CelebA images with binary attribute labels, split by row range.

    Args:
        data_path: Directory containing the image files.
        attr_path: Attribute annotation file. Its second line lists the
            attribute names; data rows start on the third line, with the
            image file name in column 0.
        image_size: Accepted for interface compatibility; it is not used and
            images are returned at their on-disk size.
        mode: One of ``'train'``, ``'valid'`` or ``'test'``.
        selected_attrs: Attribute names to return, in output order.

    Each item is ``(image, attributes)``: the normalized image tensor and a
    float32 tensor with one 0/1 entry per selected attribute.

    Raises:
        ValueError: If ``mode`` is not one of the three supported splits.
    """

    # Row offsets separating the train / valid / test partitions
    _TRAIN_END = 162770
    _VALID_END = 182637

    def __init__(self, data_path, attr_path, image_size, mode, selected_attrs):
        super(CelebA, self).__init__()
        self.data_path = data_path

        # Only the header is needed here; close the file right away.
        with open(attr_path, 'r', encoding='utf-8') as attr_file:
            attr_file.readline()
            att_list = attr_file.readline().split()
        # +1 because column 0 of every data row is the image file name
        atts = [att_list.index(att) + 1 for att in selected_attrs]
        # Builtin str/int: the np.str / np.int aliases no longer exist in NumPy >= 1.24
        images = np.loadtxt(attr_path, skiprows=2, usecols=[0], dtype=str)
        labels = np.loadtxt(attr_path, skiprows=2, usecols=atts, dtype=int)

        # Applied to every image: to tensor, then scale each channel to [-1, 1]
        self.tf = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
            ])
        # Random augmentation, applied to training images only
        self.tf_a = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomApply([
                transforms.ColorJitter(hue=.05, saturation=.05),
            ], p=0.8),
            transforms.RandomGrayscale(0.2),
        ])

        if mode == 'train':
            rows = slice(None, self._TRAIN_END)
        elif mode == 'valid':
            rows = slice(self._TRAIN_END, self._VALID_END)
        elif mode == 'test':
            rows = slice(self._VALID_END, None)
        else:
            raise ValueError(
                "mode must be 'train', 'valid' or 'test', got {!r}".format(mode)
            )

        self.mode = mode
        # Indices passed to __getitem__ are relative to the split, so the
        # decision to augment must come from the mode, not from the index.
        self.augment = mode == 'train'
        self.images = images[rows]
        self.labels = labels[rows]
        self.length = len(self.images)

    def _load_image(self, filename):
        """Open the image stored under ``data_path`` with the given file name."""
        return Image.open(os.path.join(self.data_path, filename))

    def __getitem__(self, index):
        """Return the ``index``-th ``(image, attributes)`` pair of this split."""
        img = self._load_image(self.images[index])
        if self.augment:
            img = self.tf_a(img)
        img = self.tf(img)
        # Labels are stored as -1 / +1; map them to 0 / 1
        att = torch.tensor((self.labels[index] + 1) // 2)
        return img, att.to(torch.float32)

    def __len__(self):
        """Number of samples in this split."""
        return self.length

In [4]:
# The 40 CelebA attribute names used as prediction targets, in output order.
attrs_default = [
    "5_o_Clock_Shadow", "Arched_Eyebrows", "Attractive", "Bags_Under_Eyes",
    "Bald", "Bangs", "Big_Lips", "Big_Nose",
    "Black_Hair", "Blond_Hair", "Blurry", "Brown_Hair",
    "Bushy_Eyebrows", "Chubby", "Double_Chin", "Eyeglasses",
    "Goatee", "Gray_Hair", "Heavy_Makeup", "High_Cheekbones",
    "Male", "Mouth_Slightly_Open", "Mustache", "Narrow_Eyes",
    "No_Beard", "Oval_Face", "Pale_Skin", "Pointy_Nose",
    "Receding_Hairline", "Rosy_Cheeks", "Sideburns", "Smiling",
    "Straight_Hair", "Wavy_Hair", "Wearing_Earrings", "Wearing_Hat",
    "Wearing_Lipstick", "Wearing_Necklace", "Wearing_Necktie", "Young",
]

In [5]:
# Training split: reshuffled every epoch.
train_dataset = CelebA(data_root, attr_root, image_size, 'train', attrs_default)
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=workers
)

# Validation split: fixed order.
dataset = CelebA(data_root, attr_root, image_size, 'valid', attrs_default)
val_loader = DataLoader(
    dataset, batch_size=batch_size, shuffle=False, num_workers=workers
)

# Test split: fixed order. `dataset` is rebound and ends up referring to the
# test split.
dataset = CelebA(data_root, attr_root, image_size, 'test', attrs_default)
test_loader = DataLoader(
    dataset, batch_size=batch_size, shuffle=False, num_workers=workers
)

In [6]:
# Run on the first GPU when CUDA is available, otherwise fall back to the CPU
# so the notebook still executes on machines without a GPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [7]:
class Encoder(nn.Module):
    """Variational head that maps features to a sampled latent vector.

    Two parallel linear layers produce the mean and the log-variance of a
    diagonal Gaussian; ``forward`` returns one sample drawn from it.

    Args:
        hidden_n_1: Size of the input feature vector.
        hidden_n_2: Size of the latent vector.
    """

    def __init__(self, hidden_n_1=2048, hidden_n_2=400):
        super(Encoder, self).__init__()
        self.fc_mu = nn.Linear(hidden_n_1, hidden_n_2)
        self.fc_var  = nn.Linear(hidden_n_1, hidden_n_2)

    def reparameterize(self, mu, log_var):
        """Sample ``mu + eps * std`` with ``eps ~ N(0, 1)`` and ``std = exp(log_var / 2)``.

        The noise is created with the same shape, dtype and device as
        ``log_var``, so this works on CPU and on any GPU rather than only on
        the default CUDA device.
        """
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        """Return a latent sample for the input features ``x``."""
        return self.reparameterize(self.fc_mu(x), self.fc_var(x))

In [8]:
class VGG(nn.Module):
    """VGG-style network: optional fixed Sobel front-end, conv features, MLP head.

    Args:
        features: Convolutional trunk; it must output 512 channels.
        num_classes: Number of outputs of the final linear layer.
        sobel: If true, inputs are first converted to grayscale and filtered
            with two frozen 3x3 Sobel kernels, so ``features`` receives a
            2-channel tensor.

    The feature map is adaptively average-pooled to 3x3 before the
    classifier. For maps that are already 3x3 this is the identity, and it
    keeps the classifier usable for other input resolutions.
    ``top_layer`` may be set to ``None``, in which case ``forward`` returns
    the classifier output.
    """

    # Spatial size the classifier's first Linear layer is built for
    _POOLED_SIZE = (3, 3)

    def __init__(self, features, num_classes, sobel):
        super(VGG, self).__init__()
        self.features = features
        self.classifier = nn.Sequential(
            nn.Linear(512 * 3 * 3, 4096),
            nn.ReLU(True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(True)
        )
        self.top_layer = nn.Linear(4096, num_classes)
        # Must run before the Sobel layers are created, otherwise their
        # hand-set coefficients would be overwritten by the random init.
        self._initialize_weights()
        if sobel:
            # 1x1 conv averaging the three colour channels
            grayscale = nn.Conv2d(3, 1, kernel_size=1, stride=1, padding=0)
            grayscale.weight.data.fill_(1.0 / 3.0)
            grayscale.bias.data.zero_()
            # Two 3x3 kernels: the Sobel operator and its transpose
            sobel_filter = nn.Conv2d(1, 2, kernel_size=3, stride=1, padding=1)
            sobel_filter.weight.data[0,0].copy_(
                torch.FloatTensor([[1, 0, -1], [2, 0, -2], [1, 0, -1]])
            )
            sobel_filter.weight.data[1,0].copy_(
                torch.FloatTensor([[1, 2, 1], [0, 0, 0], [-1, -2, -1]])
            )
            sobel_filter.bias.data.zero_()
            self.sobel = nn.Sequential(grayscale, sobel_filter)
            # The front-end is a fixed filter bank, never trained
            for p in self.sobel.parameters():
                p.requires_grad = False
        else:
            self.sobel = None

    def forward(self, x):
        """Run the optional Sobel filter, the trunk, the classifier and the top layer."""
        # Explicit None checks: the truth value of a Module depends on
        # whether it happens to define __len__.
        if self.sobel is not None:
            x = self.sobel(x)
        x = self.features(x)
        x = F.adaptive_avg_pool2d(x, self._POOLED_SIZE)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        if self.top_layer is not None:
            x = self.top_layer(x)
        return x

    def _initialize_weights(self):
        """Initialize conv (He-style normal), batch-norm (1/0) and linear (N(0, 0.01)) layers."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                # Filled one output channel at a time, as in the original, so
                # seeded initializations stay reproducible.
                for i in range(m.out_channels):
                    m.weight.data[i].normal_(0, math.sqrt(2. / n))
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()


def make_layers(input_dim, batch_norm):
    """Build the VGG-16 convolutional trunk.

    Args:
        input_dim: Number of channels of the input tensor.
        batch_norm: If true, insert a BatchNorm2d after every convolution.

    Returns:
        An ``nn.Sequential`` of 3x3 convolutions (each followed by an
        optional batch norm and an in-place ReLU) and 2x2 max-pooling layers.
    """
    # Integers are conv output widths, 'M' marks a 2x2 max-pooling step.
    plan = (64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M',
            512, 512, 512, 'M', 512, 512, 512, 'M')
    modules = []
    channels = input_dim
    for entry in plan:
        if entry == 'M':
            modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
            continue
        modules.append(nn.Conv2d(channels, entry, kernel_size=3, padding=1))
        if batch_norm:
            modules.append(nn.BatchNorm2d(entry))
        modules.append(nn.ReLU(inplace=True))
        channels = entry
    return nn.Sequential(*modules)


def vgg16(sobel=False, bn=True, out=40):
    """Create a VGG-16 model.

    Args:
        sobel: Prepend the fixed Sobel front-end; the trunk then takes the
            2-channel filtered tensor instead of a 3-channel image.
        bn: Use batch normalization after every convolution.
        out: Number of outputs of the final linear layer.
    """
    in_channels = 2 if sobel else 3
    return VGG(make_layers(in_channels, bn), out, sobel)

In [9]:
# Model trained below: a from-scratch VGG-16 with the Sobel front-end (the
# variable name is historical; this is not a ResNet). It is moved to `device`
# here, before the optimizer is built, because pass_epoch is given
# device=device for the batches.
resnet = vgg16(sobel=True).to(device)

In [10]:
vgg16 = models.vgg16(pretrained=True)

In [27]:
epoch = 1
for i, (name, param) in enumerate(vgg16.named_parameters()):
    j = i // 2
    if epoch % 2 == 0:
        if j % 2 == 0:
            if 'weight' in name:
                param.requires_grad = False
            if 'bias' in name:
                param.requires_grad = False
        else:
            if 'weight' in name:
                param.requires_grad = True
            if 'bias' in name:
                param.requires_grad = True       
    else:
        if j % 2 == 0:        
            if 'weight' in name:
                param.requires_grad = True
            if 'bias' in name:
                param.requires_grad = True
        else:
            if 'weight' in name:
                param.requires_grad = False
            if 'bias' in name:
                param.requires_grad = False         

In [28]:
for i, (name, param) in enumerate(vgg16.named_parameters()):
    if param.requires_grad:
        print(name)

features.0.weight
features.0.bias
features.5.weight
features.5.bias
features.10.weight
features.10.bias
features.14.weight
features.14.bias
features.19.weight
features.19.bias
features.24.weight
features.24.bias
features.28.weight
features.28.bias
classifier.3.weight
classifier.3.bias


In [None]:
# Adam over all parameters of the model, base learning rate 1e-5
optimizer = optim.Adam(params=resnet.parameters(), lr=1e-5)
# Cosine-annealing horizon: training samples x epochs.
# NOTE(review): T_max is counted in scheduler.step() calls. Confirm how often
# training.pass_epoch steps the scheduler; if it is once per epoch or per
# batch, this horizon is far longer than the run and the learning rate will
# barely decay.
Q = len(train_dataset) * epochs
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=Q)

In [None]:
# Binary cross-entropy on raw logits, used for the multi-label attribute targets
loss_fn = nn.BCEWithLogitsLoss()
# Additional per-batch metrics handed to training.pass_epoch
metrics = dict(acc=training.accuracy_ml)

In [None]:
# Baseline: evaluate the untrained model on the test split.
print('\n\nInitial')
print('-' * 10)
resnet.eval()
# Evaluation passes never call backward, so autograd tracking is switched off
# to save memory and time.
with torch.no_grad():
    training.pass_epoch(
        resnet, loss_fn, test_loader,
        batch_metrics=metrics, show_running=True, device=device,
    )

# Lowest validation loss seen so far. Starts at +inf so the first epoch always
# counts as an improvement and a checkpoint is always written.
val_loss = float('inf')
for epoch in range(epochs):
    print('\nEpoch {}/{}'.format(epoch + 1, epochs))
    print('-' * 10)

    resnet.train()
    training.pass_epoch(
        resnet, loss_fn, train_loader, optimizer, scheduler,
        batch_metrics=metrics, show_running=True, device=device,
    )

    resnet.eval()
    with torch.no_grad():
        val_metrics = training.pass_epoch(
            resnet, loss_fn, val_loader,
            batch_metrics=metrics, show_running=True, device=device,
        )

    # The first element of pass_epoch's result is used as the validation loss
    epoch_val_loss = val_metrics[0].item()
    if epoch_val_loss < val_loss:
        val_loss = epoch_val_loss
        # New best model: report its test-set metrics and checkpoint it
        print('Test set Accuracy Lowest Validation Loss:')
        with torch.no_grad():
            training.pass_epoch(
                resnet, loss_fn, test_loader,
                batch_metrics=metrics, show_running=True, device=device,
            )
        # NOTE(review): the file name says "alexnet" but the weights are those
        # of the VGG model above; kept unchanged for compatibility.
        torch.save(resnet.state_dict(), "alexnet.pth")