In [1]:
# Root folder of the image dataset; handed to ImageFolder when the training set is built.
DATA_DIR = '../input/animefacedataset'

#### I am using a Kaggle notebook so that I don't have to download the data, and because the GPUs there are much faster

In [1]:
image_size = 64   # target side length used by the Resize / CenterCrop transforms
batch_size = 128  # number of images per DataLoader batch (also the number of fakes generated per step)
stats = (0.5, 0.5, 0.5), (0.5, 0.5, 0.5)  # (per-channel mean, per-channel std), unpacked into tt.Normalize

In [1]:
import os
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import torchvision.transforms as tt
import torch
import torch.nn as nn
import cv2
from tqdm.notebook import tqdm
import torch.nn.functional as F
from torchvision.utils import save_image
from torchvision.utils import make_grid
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

In [1]:
# Preprocessing applied to every image: resize, centre-crop to a square of side
# `image_size`, convert to a tensor and normalise with `stats`.
train_transform = tt.Compose([
    tt.Resize(image_size),
    tt.CenterCrop(image_size),
    tt.ToTensor(),
    tt.Normalize(*stats),
])

train_ds = ImageFolder(root=DATA_DIR, transform=train_transform)

In [1]:
# Shuffled training batches, loaded by two worker processes into pinned memory.
train_dl = DataLoader(
    dataset=train_ds,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
)

In [1]:
def denorm(img_tensors):
    """Undo the `stats` normalisation, i.e. compute ``x * std + mean``.

    Only the first channel's mean and std are used, so this is exact as long
    as every channel shares the same statistics (true for the current `stats`).
    """
    mean = stats[0][0]
    std = stats[1][0]
    return img_tensors * std + mean

def show_images(images, nmax=64):
    """Plot up to `nmax` images of a normalised batch as a grid with 8 images per row.

    Args:
        images: batch tensor of normalised images; may live on any device and
            may still be attached to an autograd graph.
        nmax: maximum number of images (taken from the start of the batch) to draw.
    """
    # matplotlib cannot read CUDA tensors, so copy the (detached) batch to host memory first.
    batch = images.detach().cpu()[:nmax]
    grid = make_grid(denorm(batch), nrow=8)

    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xticks([]); ax.set_yticks([])
    # make_grid returns channels-first data; imshow wants channels last.
    ax.imshow(grid.permute(1, 2, 0))

def show_batch(dl, nmax=64):
    """Display the first batch produced by `dl` (at most `nmax` images); do nothing if `dl` is empty."""
    first_batch = next(iter(dl), None)
    if first_batch is None:
        return
    images, _ = first_batch
    show_images(images, nmax)

In [1]:
show_batch(train_dl)

In [1]:
def to_device(data, device):
    """Move a tensor, or a (possibly nested) list/tuple of tensors, to `device`.

    Lists and tuples are both returned as lists; anything else is moved with
    ``.to(device, non_blocking=True)``.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Thin wrapper around a dataloader that moves every batch to a device."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __len__(self):
        """Number of batches in the wrapped dataloader."""
        return len(self.dl)

    def __iter__(self):
        """Lazily yield each batch of the wrapped dataloader after moving it to the device."""
        target = self.device
        yield from map(lambda batch: to_device(batch, target), self.dl)

In [1]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [1]:
# Wrap the loader so that every batch is moved to `device` as it is yielded.
# Guarded so that re-running this cell does not stack a second wrapper on top of the first.
if not isinstance(train_dl, DeviceDataLoader):
    train_dl = DeviceDataLoader(train_dl, device)

In [1]:
def _disc_block(in_channels, out_channels):
    """Layers of one discriminator stage: 4x4 stride-2 conv, BatchNorm, LeakyReLU(0.2)."""
    return [
        nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.LeakyReLU(0.2, inplace=True),
    ]


# Binary classifier over images: four down-sampling stages, then a 4x4 conv that
# reduces each image to a single value, squashed into (0, 1) by a sigmoid.
discriminator = nn.Sequential(
    # in: 3 x 64 x 64
    *_disc_block(3, 64),      # out: 64 x 32 x 32
    *_disc_block(64, 128),    # out: 128 x 16 x 16
    *_disc_block(128, 256),   # out: 256 x 8 x 8
    *_disc_block(256, 512),   # out: 512 x 4 x 4

    nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=0, bias=False),
    # out: 1 x 1 x 1

    nn.Flatten(),
    nn.Sigmoid(),
)

In [1]:
# Move the discriminator to the selected device.
discriminator = to_device(discriminator, device)

In [1]:
latent_size = 128  # number of channels of the random latent vector fed to the generator

In [1]:
def _gen_block(in_channels, out_channels, stride, padding):
    """Layers of one generator stage: 4x4 transposed conv, BatchNorm, in-place ReLU."""
    return [
        nn.ConvTranspose2d(in_channels, out_channels, kernel_size=4,
                           stride=stride, padding=padding, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]


# Maps a latent vector to an image: four up-sampling stages followed by a final
# transposed conv to 3 channels and a tanh.
generator = nn.Sequential(
    # in: latent_size x 1 x 1
    *_gen_block(latent_size, 512, stride=1, padding=0),  # out: 512 x 4 x 4
    *_gen_block(512, 256, stride=2, padding=1),          # out: 256 x 8 x 8
    *_gen_block(256, 128, stride=2, padding=1),          # out: 128 x 16 x 16
    *_gen_block(128, 64, stride=2, padding=1),           # out: 64 x 32 x 32

    nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False),
    nn.Tanh(),
    # out: 3 x 64 x 64
)

In [1]:
# Smoke test: push one batch of random latent vectors through the untrained generator.
# The noise is created on whatever device the generator currently lives on, so the cell
# also works when it is re-run after the generator has been moved to the GPU.
gen_device = next(generator.parameters()).device
xb = torch.randn(batch_size, latent_size, 1, 1, device=gen_device)
with torch.no_grad():  # only the output is inspected; no gradients are needed
    fake_images = generator(xb)
print(fake_images.shape)
show_images(fake_images.cpu())

In [1]:
# Move the generator to the selected device.
generator = to_device(generator, device)

In [1]:
# Output folder used by save_samples(); created up front, no error if it already exists.
sample_dir = 'generated'
os.makedirs(sample_dir, exist_ok=True)

In [1]:
# A fixed batch of 64 latent vectors, reused for every snapshot so that the generated
# images recorded at different epochs can be compared with each other.
fixed_latent = torch.randn(64, latent_size, 1, 1, device=device)

In [1]:
fixed_latent.shape

In [1]:
def save_samples(index, latent_tensors, show=True):
    """Generate images from `latent_tensors`, write them to disk as a grid and optionally show them.

    Args:
        index: identifier embedded (zero-padded to 4 characters) in the file name,
            so that successive calls do not overwrite each other.
        latent_tensors: latent batch passed to the global `generator`.
        show: when True, also display the grid inline.

    Returns:
        The generated images as a detached CPU tensor, still in the generator's
        normalised output range (not denormalised).
    """
    with torch.no_grad():  # sampling only; no autograd graph needed
        fake_images = generator(latent_tensors)
    fake_images = fake_images.detach().cpu()
    viewable = denorm(fake_images)

    # The original format string had no placeholder, so `index` was ignored and
    # every call wrote to the same 'generated-images.png'.
    fake_fname = 'generated-images-{:0>4}.png'.format(index)
    save_image(viewable, os.path.join(sample_dir, fake_fname), nrow=8)
    print('Saving', fake_fname)
    if show:
        fig, ax = plt.subplots(figsize=(8, 8))
        ax.set_xticks([]); ax.set_yticks([])
        # Show the denormalised images, matching what was written to disk.
        ax.imshow(make_grid(viewable, nrow=8).permute(1, 2, 0))
    return fake_images

In [1]:
# Both networks, keyed by role (moved to `device`).
model = dict(
    discriminator=discriminator.to(device),
    generator=generator.to(device),
)

# Binary cross-entropy for both players.
criterion = dict(
    discriminator=nn.BCELoss(),
    generator=nn.BCELoss(),
)

# Training hyper-parameters.
lr = 2e-4
epochs = 40


In [1]:
!nvidia-smi

In [1]:
# Put both networks in training mode and release cached GPU memory before starting.
model["discriminator"].train()
model["generator"].train()
torch.cuda.empty_cache()

# Losses & scores (one entry per epoch)
losses_g = []
losses_d = []
real_scores = []
fake_scores = []
saved_samples = []

# 1-based epochs after which the generator output for `fixed_latent` is recorded.
sample_epochs = (5, 10, 20, 40)

# Create optimizers
optimizer = {
    "discriminator": torch.optim.Adam(model["discriminator"].parameters(),
                                      lr=lr, betas=(0.5, 0.999)),
    "generator": torch.optim.Adam(model["generator"].parameters(),
                                  lr=lr, betas=(0.5, 0.999))
}

for epoch in range(epochs):
    loss_d_per_epoch = []
    loss_g_per_epoch = []
    real_score_per_epoch = []
    fake_score_per_epoch = []
    for real_images, _ in tqdm(train_dl):
        # ---- Train discriminator ----
        # Clear discriminator gradients
        optimizer["discriminator"].zero_grad()

        # Pass real images through discriminator (target: 1)
        real_preds = model["discriminator"](real_images)
        real_targets = torch.ones(real_images.size(0), 1, device=device)
        real_loss = criterion["discriminator"](real_preds, real_targets)
        cur_real_score = torch.mean(real_preds).item()

        # Generate fake images. They are detached because this step only updates the
        # discriminator; without detach() the backward pass would also run through the
        # generator and compute gradients that are discarded before the generator step.
        latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
        fake_images = model["generator"](latent).detach()

        # Pass fake images through discriminator (target: 0)
        fake_targets = torch.zeros(fake_images.size(0), 1, device=device)
        fake_preds = model["discriminator"](fake_images)
        fake_loss = criterion["discriminator"](fake_preds, fake_targets)
        cur_fake_score = torch.mean(fake_preds).item()

        real_score_per_epoch.append(cur_real_score)
        fake_score_per_epoch.append(cur_fake_score)

        # Update discriminator weights
        loss_d = real_loss + fake_loss
        loss_d.backward()
        optimizer["discriminator"].step()
        loss_d_per_epoch.append(loss_d.item())

        # ---- Train generator ----
        # Clear generator gradients
        optimizer["generator"].zero_grad()

        # Generate a fresh batch of fake images (not detached: the generator must get gradients)
        latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
        fake_images = model["generator"](latent)

        # Try to fool the discriminator: the generator is rewarded when fakes are scored as real
        preds = model["discriminator"](fake_images)
        targets = torch.ones(batch_size, 1, device=device)
        loss_g = criterion["generator"](preds, targets)

        # Update generator weights
        loss_g.backward()
        optimizer["generator"].step()
        loss_g_per_epoch.append(loss_g.item())

    # Record losses & scores (mean over all batches of the epoch)
    losses_g.append(np.mean(loss_g_per_epoch))
    losses_d.append(np.mean(loss_d_per_epoch))
    real_scores.append(np.mean(real_score_per_epoch))
    fake_scores.append(np.mean(fake_score_per_epoch))

    # Log losses & scores (epoch averages)
    print("Epoch [{}/{}], loss_g: {:.4f}, loss_d: {:.4f}, real_score: {:.4f}, fake_score: {:.4f}".format(
        epoch+1, epochs,
        losses_g[-1], losses_d[-1], real_scores[-1], fake_scores[-1]))

    # Record generated images for the fixed latent batch (kept in memory, not written to disk)
    if epoch+1 in sample_epochs:
        with torch.no_grad():  # sampling only; avoid building an autograd graph
            snapshot = model["generator"](fixed_latent)
        saved_samples.append(snapshot.cpu().numpy())
        print('Samples are saved')

#### Actually, the samples are not saved to disk; they are just appended to a Python list

In [1]:
# Turn the list of recorded snapshots into one array (first axis: one entry per recorded epoch).
saved_samples = np.array(saved_samples)
saved_samples.shape

In [1]:
# Take the first 64 real images of the first training batch as a NumPy array.
real_imgs = []
first_batch = next(iter(train_dl), None)
if first_batch is not None:
    images, _ = first_batch
    real_imgs = images[:64].cpu().detach().numpy()
real_imgs.shape

In [1]:
real_imgs.shape

In [1]:
show_images(torch.from_numpy(real_imgs)) # real images

In [1]:
show_images(torch.from_numpy(saved_samples[0])) # generated images after epoch 5

In [1]:
show_images(torch.from_numpy(saved_samples[1])) # generated images after epoch 10

In [1]:
show_images(torch.from_numpy(saved_samples[2])) # generated images after epoch 20

In [1]:
show_images(torch.from_numpy(saved_samples[3])) # generated images after epoch 40

#### Now let's cluster these images

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import pandas as pd
# for loading/processing the images  
from keras.preprocessing.image import load_img 
from keras.preprocessing.image import img_to_array 
from keras.applications.vgg16 import preprocess_input 

# models 
from keras.applications.vgg16 import VGG16 
from keras.models import Model

# clustering and dimension reduction
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA

# for everything else
import os
import numpy as np
import matplotlib.pyplot as plt
from random import randint
import pandas as pd
import pickle

In [1]:
import keras
from keras.applications.vgg16 import VGG16
from keras.models import Model
from keras.layers import Dense, GlobalAveragePooling2D, Dropout, UpSampling2D, Conv2D, MaxPooling2D, Activation, Flatten

# Pre-trained VGG16 without its top layers, used as a fixed feature extractor for 64x64 RGB inputs.
base_model = VGG16(include_top=False, weights='imagenet', input_shape=(64, 64, 3))

base_out = base_model.output
flt = Flatten()(base_out)

# dns = Dense(units=100)(flt)

# The flattened VGG16 output is used directly (no Dense projection), so that all the
# information about the image is kept; standardization is applied further below.
# NOTE(review): this rebinds `model`, which earlier in the notebook held the GAN dict.
model = Model(inputs=base_model.input, outputs=flt)



# model.layers[-1:][0].get_weights()[0]

# shape = model.layers[-1:][0].get_weights()[0].shape #get shape of the last dense layer
# print(shape)

# weights = np.ones(shape) #(2048, 100) shaped ones
# bias = np.zeros((100,)) #(100,) shaped zeros
# model.layers[-1].set_weights([weights, bias]) #setting new weights to the last dense layer

In [1]:
model.summary()

In [1]:
# Convert from channels-first (N, C, H, W) to channels-last (N, H, W, C), the layout the
# VGG16 model above was built for. Guarded on the size of the last axis (3 colour channels
# once converted) so that re-running this cell does not permute the axes a second time.
if real_imgs.shape[-1] != 3:
    real_imgs = real_imgs.transpose(0, 2, 3, 1)
real_imgs.shape

In [1]:
# Same layout change for the stacked snapshots: (S, N, C, H, W) -> (S, N, H, W, C).
# Guarded on the size of the last axis so that re-running this cell is harmless.
if saved_samples.shape[-1] != 3:
    saved_samples = saved_samples.transpose(0, 1, 3, 4, 2)
saved_samples.shape

In [1]:
# Stack the real images behind the generator snapshots as one extra group along axis 0.
all_classes = np.concatenate([saved_samples, real_imgs[np.newaxis]], axis=0)
all_classes.shape

In [1]:
 def extract_features(img, model):
     """Return the feature vector(s) `model` predicts for a single 64x64 RGB image.

     Args:
         img: array holding one 64x64x3 image. It is never modified.
             NOTE(review): Keras documents VGG16's preprocess_input for RGB pixel
             values in [0, 255]; callers holding normalised data should rescale first.
         model: object exposing ``predict`` (here the Keras feature extractor).

     Returns:
         Whatever ``model.predict`` returns for the one-image batch.
     """
     # preprocess_input may write its result over a float input array, and reshape() can
     # return a view of the caller's data, so work on a private float32 copy.
     batch = np.array(img, dtype='float32').reshape(1, 64, 64, 3)
     # prepare image for model
     imgx = preprocess_input(batch)
     # get the feature vector; `use_multiprocessing` was dropped because it has no effect on
     # in-memory arrays and is rejected by newer Keras versions. verbose=0 avoids printing
     # one progress bar per image.
     features = model.predict(imgx, verbose=0)
     return features

In [1]:
# Build the feature matrix: one VGG16 feature vector per image, group after group
# (generator snapshots first, real images last).
dataset = []
for class_images in tqdm(all_classes):
    for img in class_images:
        # The images are still normalised to [-1, 1], but VGG16's preprocess_input expects
        # RGB pixel values in [0, 255]: undo the normalisation and rescale first.
        # This also yields a fresh array, so `all_classes` itself is left untouched.
        pixels = np.clip(denorm(img), 0.0, 1.0) * 255.0
        dataset.append(extract_features(pixels, model)[0])

In [1]:
dataset = np.array(dataset)
dataset.shape

In [1]:
dataset.mean()

In [1]:
from sklearn.preprocessing import StandardScaler
# Standardise every feature column to zero mean and unit variance, then display the overall mean.
scaler = StandardScaler()
dataset = scaler.fit(dataset).transform(dataset)
dataset.mean()

In [1]:
# One cluster per image group in `all_classes` (four generator snapshots + the real images);
# the fixed random_state makes the k-means initialisation repeatable.
kmeans = KMeans(n_clusters=5, random_state=22)

In [1]:
%%time
kmeans.fit(dataset)

In [1]:
# Cluster id assigned to every image, and the ground-truth group id of every image
# (64 images per group, in the same order as the rows of `dataset`).
clusters = kmeans.predict(dataset)
classes = np.repeat(np.arange(5), 64)

In [1]:
np.unique(clusters, return_counts=True)

In [1]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

confusion_matrix(classes, clusters)

##### Judging by the confusion matrix cluster values should be adjusted (calibrated)
##### Let's do this as follows: 
##### cluster=0 gets value 2 (because 21 observations of 64 of 2nd class are concentrated at cluster 0)
#####                           cluster=1 gets value 4
#####                           cluster=2 gets value 1
#####                           cluster=3 gets value 3
#####                           cluster=4 gets value 0

In [1]:
clusters

In [1]:
# k-means cluster ids are arbitrary, so they have to be matched to the class ids before the
# two can be compared. Instead of hard-coding a mapping read off one particular run's confusion
# matrix, compute the one-to-one mapping that maximises the number of agreeing samples.
# Re-running this cell is harmless: once aligned, the best mapping is normally the identity.
from scipy.optimize import linear_sum_assignment

agreement = confusion_matrix(classes, clusters)  # rows: class id, columns: cluster id
class_ids, cluster_ids = linear_sum_assignment(agreement, maximize=True)

new_clusters = np.empty(agreement.shape[0], dtype=int)
new_clusters[cluster_ids] = class_ids  # lookup table: cluster id -> class id
clusters = new_clusters[clusters] # The values have been replaced

In [1]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

print(confusion_matrix(classes, clusters))
print(classification_report(classes, clusters))

### The results are not that bad. Now let's see whether it can distinguish between real images and images generated after 40 epochs

In [1]:
# Two-cluster k-means on the last 128 rows only (the final snapshot and the real images).
kmeans = KMeans(n_clusters=2, random_state=22).fit(dataset[-128:])
pred = kmeans.predict(dataset[-128:])

In [1]:
pred

In [1]:
# Ground truth for the last 128 rows: the first 64 are generated images (label 1),
# the last 64 are real images (label 0).
classes = np.repeat([1, 0], 64)
classes

In [1]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

# k-means cluster ids are arbitrary: if the two clusters came out with swapped ids,
# flip them so that they line up with the class labels before scoring.
aligned_pred = pred if (pred == classes).mean() >= 0.5 else 1 - pred

# scikit-learn expects (y_true, y_pred); the original call passed them the other way round,
# which transposes the matrix and swaps precision with recall in the report.
print(confusion_matrix(classes, aligned_pred), '\n\n')
print(classification_report(classes, aligned_pred))

### Great results! It does not struggle to identify fake images at all, although there are some misclassifications of real images