Autoencoder build with convolutions on CIFAR10.

In [None]:
!pip install progressbar2
import progressbar
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.cifar10.load_data()

train_images = train_images.reshape((-1, 32, 32, 3)) / 255
test_images = test_images.reshape((-1, 32, 32, 3)) / 255



In [None]:
def build_model(reuse, is_training, name, **kwargs):
  with tf.variable_scope(name, reuse=reuse):
    
    def instance_normalization(x):
      return tf.contrib.layers.instance_norm(x)
    
    def layer(x, filter, strides, keep_prob, activation, name):
      with tf.variable_scope(name):
        filter = tf.get_variable('filter_weight', filter, initializer=tf.truncated_normal_initializer(stddev=5e-2, dtype=tf.float32), dtype=tf.float32)
        x = tf.nn.conv2d(input=x, filter=filter, strides=strides, padding="SAME")
        x = instance_normalization(x)
        x = activation(x)
        x = x = tf.nn.dropout(x, keep_prob)
        return x
    
    def layer_transpose(x, filter, strides, keep_prob, activation, name):
      with tf.variable_scope(name):
        x = tf.layers.conv2d_transpose(x, filters=filter[3], kernel_size=filter[1], strides=strides[1], padding="SAME")
        x = instance_normalization(x)
        x = activation(x)
        x = x = tf.nn.dropout(x, keep_prob)
        return x
      
    def residual_block(x, in_filters, keep_prob, activation, name):
      rx = x
      x = layer(x, filter=[3, 3, in_filters, in_filters], strides=[1, 1, 1, 1], keep_prob=keep_prob, activation=activation, name=name+"Residual1")
      x = layer(x, filter=[3, 3, in_filters, in_filters], strides=[1, 1, 1, 1], keep_prob=keep_prob, activation=activation, name=name+"Residual2")
      return x + rx
    
    def encoder(x, keep_prob):
      activation = tf.nn.relu
      with tf.variable_scope("encoder"): #supposse the input is 4 dimesional
        x = layer(x, filter=[3, 3, 3, 60], strides=[1, 1, 1, 1], keep_prob=keep_prob, activation=activation, name="el1")
        x = layer(x, filter=[3, 3, 60, 120], strides=[1, 2, 2, 1], keep_prob=keep_prob, activation=activation, name="el2")
        x = layer(x, filter=[3, 3, 120, 240], strides=[1, 2, 2, 1], keep_prob=keep_prob, activation=activation, name="el4")
        x = layer(x, filter=[3, 3, 240, 8], strides=[1, 2, 2, 1], keep_prob=keep_prob, activation=activation, name="el3")
        return x

    def decoder(x, keep_prob):
      activation = tf.nn.relu
      with tf.variable_scope("decoder"):
          x = layer(x, filter=[3, 3, 8, 240], strides=[1, 1, 1, 1], keep_prob=keep_prob, activation=activation, name="dl1")
          x = residual_block(x, in_filters=240, keep_prob=keep_prob, activation=activation, name="dr1")
          x = layer_transpose(x, filter=[3, 3, 240, 120], strides=[1, 2, 2, 1], keep_prob=keep_prob, activation=activation, name="dl2")
          x = layer_transpose(x, filter=[3, 3, 120, 60], strides=[1, 2, 2, 1], keep_prob=keep_prob, activation=activation, name="dl4")
          x = layer_transpose(x, filter=[3, 3, 60, 3], strides=[1, 2, 2, 1], keep_prob=keep_prob, activation=activation, name="dl3")
          return x
    
    class Model(object):
      pass
    
    model = Model()
  
    model.X = tf.placeholder(dtype=tf.float32, shape=[None, None, None, None], name='X')
    model.Y = tf.contrib.layers.flatten(inputs=model.X)
    keep_prob = kwargs["keep_prob"]

    model.sampled = encoder(model.X, keep_prob)
    model.dec = decoder(model.sampled, keep_prob)

    model.unreshaped = tf.contrib.layers.flatten(inputs=model.dec)
    model.img_loss = tf.reduce_sum(tf.squared_difference(model.unreshaped, model.Y), 1)
    model.loss = tf.reduce_mean(model.img_loss)
    model.optimizer = tf.train.AdamOptimizer(0.001).minimize(model.loss)
    
    return model

In [None]:

def print_reconstruction(n, batch, d):
  m = 32
  canvas_orig = np.empty((m * n, m * n, 3))
  canvas_recon = np.empty((m * n, m * n, 3))
  for i in range(n):
      t = n
      for j in range(n):
          canvas_orig[i * m:(i + 1) * m, j * m:(j + 1) * m,:] = \
              batch[i*n+j].reshape([32, 32, 3])
      for j in range(n):
          canvas_recon[i * m:(i + 1) * m, j * m:(j + 1) * m,:] = \
              d[i*n+j].reshape([32, 32, 3])

  print("Original Images")
  plt.figure(figsize=(n, n))
  plt.imshow(canvas_orig, origin="upper", cmap="gray")
  plt.show()

  print("Reconstructed Images")
  plt.figure(figsize=(n, n))
  plt.imshow(canvas_recon, origin="upper", cmap="gray")
  plt.show()
    
def create_cnnae(**params):
  tf.reset_default_graph()
  
  model_train = build_model(is_training=True, reuse=False, name = "VA1", **params)
  model_test = build_model(is_training=False, reuse=True, name = "VA1", **params)
  
  
  with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
 
    batch_size = params['batch_size']
    steps_per_epoch =  params['elems'] //batch_size 

    print(steps_per_epoch, params['elems'], params['batch_size'])
    for epoch in range(params['num_epochs']):
      print("EPOCH {}:".format(epoch))
      loss_history1 = []
      loss_history2 = []
      progres = 0
      with progressbar.ProgressBar(max_value = steps_per_epoch) as bar:
        while True:
          bar.update(progres)
          progres += 1
          if progres > steps_per_epoch:
            break
          step = epoch * steps_per_epoch + progres

          batch = train_images[(progres-1)*batch_size:progres*batch_size]

          ls1, _ = sess.run([model_train.loss, model_train.optimizer], feed_dict = {model_train.X: batch})

          loss_history1.append(ls1)

          if not progres % 200:
              ls, d = sess.run([model_train.loss, model_train.dec], \
                                                     feed_dict = {model_train.X: batch})
              print_reconstruction(3, batch, d)

              print("AUTOENCODER 1: loss: {}, image_loss:{}"\
                    .format(ls, np.mean(d)))

            
      print("Loss Function for AE1:")
      print(len(loss_history1))
      plt.plot(loss_history1)
      

In [None]:
batch_size = 32
params={
  'batch_size': batch_size,
  'num_epochs': 20,
  'elems': train_images.shape[0],
  'keep_prob': 1
}

  
create_cnnae(**params)