In [26]:
from keras.layers import Conv2D, BatchNormalization, Input, GlobalAveragePooling2D, Dense
from keras.models import Model, Sequential
from keras.layers.advanced_activations import LeakyReLU
from keras.layers import Conv2DTranspose, Reshape

import numpy as np
import pandas as pd
import os
from keras.optimizers import Adam

#import cv2
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output

from bs4 import BeautifulSoup
import requests
from re import sub

%matplotlib inline

In [27]:
import sys
sys.path.append('..')
import utils as p

## load data and setup environment

In [28]:
def getPunkFilename(d, x):
    return f'''{d}/punk{"%04d" % x.id}.png'''

__ROOT_DIR__ = "/home/david/artwork/zombiepunx"
__IMAGE_DIR__=f"{__ROOT_DIR__}/data/punx/images/training"
df = pd.read_csv(f"{__ROOT_DIR__}/data/punx/punks_df.csv", index_col='pid')
df['img_uri'] = df.agg(lambda x: getPunkFilename(__IMAGE_DIR__, x), axis=1)
df.iloc[0].img_uri

'/home/david/artwork/zombiepunx/data/punx/images/training/punk0000.png'

In [29]:
df.head(3)

Unnamed: 0_level_0,id,alien,ape,zombie,female,male,beanie,choker,pilotHelmet,tiara,...,bigShades,nerdGlasses,blackLipstick,mole,purpleLipstick,hotLipstick,cigarette,earring,straightHairDark,img_uri
pid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0,0,-1.0,-1.0,-1.0,1.0,-1.0,-1.0,-1.0,-1.0,-1.0,...,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,1.0,-1.0,/home/david/artwork/zombiepunx/data/punx/image...
1,1,-1.0,-1.0,-1.0,-1.0,1.0,-1.0,-1.0,-1.0,-1.0,...,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,/home/david/artwork/zombiepunx/data/punx/image...
2,2,-1.0,-1.0,-1.0,1.0,-1.0,-1.0,-1.0,-1.0,-1.0,...,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,/home/david/artwork/zombiepunx/data/punx/image...


### GAN Hyperparameters

In [70]:

DIM = 24

# size of noise vector
LATENT_DIM_GAN = 24 

# filter size in conv layer
FILTER_SIZE = 5

# number of filters in conv layer
NET_CAPACITY = 16

# batch size
BATCH_SIZE_GAN = 32

# interval for displaying generated images
PROGRESS_INTERVAL = 80 

# number of discriminator updates per alternating training iteration
DISC_UPDATES = 1  
# number of generator updates per alternating training iteration
GEN_UPDATES = 1 

# directory for storing generated images
ROOT_DIR = '../viz'
if not os.path.isdir(ROOT_DIR):
    os.mkdir(ROOT_DIR)


In [76]:
p_9999 = p.get_punk(9999)
p_9999_flat = p.flatten(p_9999)
p_9999_encoded = p.v_color_map_encode(p_9999_flat)
p_9999_encoded.shape

(24, 24, 1)

In [59]:
def sample_punks(df, size):
    samples = df.sample(frac=1).iloc[0:size]
    X = np.empty(shape=(size, DIM, DIM, 1))
    for i in range(0, size):
        file = samples.iloc[i]
        img = mpimg.imread(file.img_uri)
        img_flat = p.flatten(img)
        img_encoded = p.v_color_map_encode(img_flat)
        X[i] = img_encoded
    return X

In [64]:
sample_punks(df, 1)[0][0][0:5]

array([[220.],
       [220.],
       [220.],
       [220.],
       [220.]])

In [71]:

def build_generator(start_filters, filter_size, latent_dim):
  
  # function for building a CNN block for upsampling the image
  def add_generator_block(x, filters, filter_size):
      x = Conv2DTranspose(filters, filter_size, strides=2, padding='same')(x)
      x = BatchNormalization()(x)
      x = LeakyReLU(0.3)(x)
      return x

  # input is a noise vector 
  inp = Input(shape=(latent_dim,))

  # projection of the noise vector into a tensor with 
  # same shape as last conv layer in discriminator
  x = Dense(4 * 4 * (start_filters * 8), input_dim=latent_dim)(inp)
  x = BatchNormalization()(x)
  x = Reshape(target_shape=(4, 4, start_filters * 8))(x)

  # design the generator to upsample the image 4x
  x = add_generator_block(x, start_filters * 4, filter_size)
  x = add_generator_block(x, start_filters * 2, filter_size)
  x = add_generator_block(x, start_filters, filter_size)
  x = add_generator_block(x, start_filters, filter_size)    

  # turn the output into a 3D tensor, an image with 1 channels 
  x = Conv2D(1, kernel_size=5, padding='same', activation='tanh')(x)
  
  return Model(inputs=inp, outputs=x)

### discriminator

In [72]:

def build_discriminator(start_filters, spatial_dim, filter_size):
    
    # function for building a CNN block for downsampling the image
    def add_discriminator_block(x, filters, filter_size):
      x = Conv2D(filters, filter_size, padding='same')(x)
      x = BatchNormalization()(x)
      x = Conv2D(filters, filter_size, padding='same', strides=2)(x)
      x = BatchNormalization()(x)
      x = LeakyReLU(0.3)(x)
      return x
    
    # input is an image with shape spatial_dim x spatial_dim and 1 channels
    inp = Input(shape=(spatial_dim, spatial_dim, 1))

    #  to downsample the image 4x
    x = add_discriminator_block(inp, start_filters, filter_size)
    x = add_discriminator_block(x, start_filters * 2, filter_size)
    x = add_discriminator_block(x, start_filters * 4, filter_size)
    x = add_discriminator_block(x, start_filters * 8, filter_size)
    
    # average and return a binary output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1, activation='sigmoid')(x)
    
    return Model(inputs=inp, outputs=x)

## build GAN from generator and discriminator

In [73]:
def construct_models(verbose=False):
    ### discriminator
    discriminator = build_discriminator(NET_CAPACITY, DIM, FILTER_SIZE)
    # compile discriminator
    discriminator.compile(loss='binary_crossentropy', 
                          optimizer=Adam(lr=0.0002), 
                          metrics=['mae'])

    ### generator
    # do not compile generator
    generator = build_generator(NET_CAPACITY, FILTER_SIZE, LATENT_DIM_GAN)

    ### DCGAN 
    gan = Sequential()
    gan.add(generator)
    gan.add(discriminator)
    discriminator.trainable = False 
    gan.compile(loss='binary_crossentropy', 
                optimizer=Adam(lr=0.0002), 
                metrics=['mae'])

    if verbose: 
        generator.summary()
        discriminator.summary()
        gan.summary()
        
    return generator, discriminator, gan
  
(generator_punk, 
discriminator_punk,
gan_punk) = construct_models(verbose=True)

Model: "model_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_4 (InputLayer)         [(None, 24)]              0         
_________________________________________________________________
dense_3 (Dense)              (None, 2048)              51200     
_________________________________________________________________
batch_normalization_21 (Batc (None, 2048)              8192      
_________________________________________________________________
reshape_1 (Reshape)          (None, 4, 4, 128)         0         
_________________________________________________________________
conv2d_transpose_4 (Conv2DTr (None, 8, 8, 64)          204864    
_________________________________________________________________
batch_normalization_22 (Batc (None, 8, 8, 64)          256       
_________________________________________________________________
leaky_re_lu_12 (LeakyReLU)   (None, 8, 8, 64)          0   

# Training

ref: https://towardsdatascience.com/generative-adversarial-network-gan-for-dummies-a-step-by-step-tutorial-fdefff170391

In [74]:
def run_training(generator, 
                 discriminator, 
                 gan, 
                 df, start_it=0, 
                 num_epochs=1000, 
                 get_real_images=sample_punks):
  # list for storing loss
  avg_loss_discriminator = []
  avg_loss_generator = []
  total_it = start_it

  # main training loop
  for epoch in range(num_epochs):

      # alternating training loop
      loss_discriminator = []
      loss_generator = []
      for it in range(200): 

          #### Discriminator training loop ####
          for i in range(DISC_UPDATES): 
              # select a random set of real images
              imgs_real = get_real_images(df, BATCH_SIZE_GAN)
              # generate a set of random noise vectors
              noise = np.random.randn(BATCH_SIZE_GAN, LATENT_DIM_GAN)
              # generate a set of fake images using the generator
              imgs_fake = generator.predict(noise)
              # train the discriminator on real images with label 1
              d_loss_real = discriminator.train_on_batch(imgs_real, 
                                                         np.ones([BATCH_SIZE_GAN]))[1]
              # train the discriminator on fake images with label 0
              d_loss_fake = discriminator.train_on_batch(imgs_fake, 
                                                         np.zeros([BATCH_SIZE_GAN]))[1]

          # display some fake images for visual control of convergence
          if total_it % PROGRESS_INTERVAL == 0:
              plt.figure(figsize=(5,2))
              num_vis = min(BATCH_SIZE_GAN, 5)
              imgs_real = get_real_images(df, num_vis)
              noise = np.random.randn(num_vis, LATENT_DIM_GAN)
              imgs_fake = generator.predict(noise)
              for obj_plot in [imgs_fake, imgs_real]:
                  plt.figure(figsize=(num_vis * 3, 3))
                  for b in range(num_vis):
                      disc_score = float(discriminator.predict(np.expand_dims(obj_plot[b], axis=0))[0])
                      plt.subplot(1, num_vis, b + 1)
                      plt.title(str(round(disc_score, 3)))
                      plt.imshow(obj_plot[b] * 0.5 + 0.5) 
                  if obj_plot is imgs_fake:
                      plt.savefig(os.path.join(ROOT_DIR, 
                                               str(total_it).zfill(10) + '.jpg'), 
                                  format='jpg',
                                  bbox_inches='tight')
                  plt.show()  

          #### Generator training loop ####
          loss = 0
          y = np.ones([BATCH_SIZE_GAN, 1]) 
          for j in range(GEN_UPDATES):
              # generate a set of random noise vectors
              noise = np.random.randn(BATCH_SIZE_GAN, LATENT_DIM_GAN)
              # train the generator on fake images with label 1
              loss += gan.train_on_batch(noise, y)[1]

          # store loss
          loss_discriminator.append((d_loss_real + d_loss_fake) / 2.)        
          loss_generator.append(loss / GEN_UPDATES)
          total_it += 1

      # visualize loss
      clear_output(True)
      print('Epoch', epoch)
      avg_loss_discriminator.append(np.mean(loss_discriminator))
      avg_loss_generator.append(np.mean(loss_generator))
      plt.plot(range(len(avg_loss_discriminator)), avg_loss_discriminator)
      plt.plot(range(len(avg_loss_generator)), avg_loss_generator)
      plt.legend(['discriminator loss', 'generator loss'])
      plt.show()

  return generator, discriminator, gan


In [75]:

(generator_punk,
 discriminator_punk,
 gan_punk) = run_training(generator_punk,
                          discriminator_punk,
                          gan_punk,
                          num_epochs=5,
                          df=df)                                                 

ValueError: in user code:

    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py:805 train_function  *
        return step_function(self, iterator)
    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py:795 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:1259 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:2730 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/distribute/distribute_lib.py:3417 _call_for_each_replica
        return fn(*args, **kwargs)
    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py:788 run_step  **
        outputs = model.train_step(data)
    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py:754 train_step
        y_pred = self(x, training=True)
    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/keras/engine/base_layer.py:998 __call__
        input_spec.assert_input_compatibility(self.input_spec, inputs, self.name)
    /home/david/miniconda3/envs/zombiepunx/lib/python3.8/site-packages/tensorflow/python/keras/engine/input_spec.py:271 assert_input_compatibility
        raise ValueError('Input ' + str(input_index) +

    ValueError: Input 0 is incompatible with layer model_2: expected shape=(None, 24, 24, 1), found shape=(32, 64, 64, 1)


In [135]:
generator_punk

<tensorflow.python.keras.engine.functional.Functional at 0x7fe7587893a0>