In [1]:
# handle imports 
import matplotlib.pyplot as plt 
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
from pathlib import Path

from tensorflow.keras import layers, Model
from tensorflow.keras.layers import Input, Conv2D, Dense, Flatten, Reshape, Conv2DTranspose, MaxPooling2D, UpSampling2D, LeakyReLU
from tensorflow.keras.activations import relu
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.optimizers import RMSprop, Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import binary_accuracy

import os

In [2]:
img_data_dir  = Path("C:/Users/ryanm/anaconda3/envs/SoftComputing/Lib/site-packages/tensorflow_datasets/downloads")
ds_train, ds_info = tfds.load('celeb_a', split='test', shuffle_files=False, with_info=True)

[1mDownloading and preparing dataset celeb_a (1.38 GiB) to C:\Users\ryanm\tensorflow_datasets\celeb_a\0.3.0...[0m


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

NotFoundError: NewRandomAccessFile failed to Create/Open: C:/Users/ryanm/anaconda3/envs/SoftComputing/Lib/site-packages/tensorflow_datasets/downloads/img_align_celeba.zip : The system cannot find the path specified.
; No such process

In [None]:
fig = tfds.show_examples(ds_train, ds_info)

In [None]:
features['attributes'].keys()

In [None]:
blond = features['attributes']['Blond_Hair']

In [None]:
sample_size = 1000
batch_train = ds_train.batch(sample_size)
features = next(iter(batch_train.take(1)))
sample_images = features['image'][blond]
new_image = np.median(sample_images, axis=0)
plt.imshow(new_image.astype(np.uint8))

In [None]:
def sample(req_attribs, features):
    sample_bool = []
    for i in range(sample_size):
        match = True
        for req_attrib in req_attribs:
            if features['attributes'][req_attrib][i] == False:
                match = False
                break
        sample_bool.append(match)
    return features['image'][np.array(sample_bool, dtype=np.bool)]

In [None]:
male = sample({"Male":True}, features)
female = sample({"Male":False}, features)

In [None]:
# average male
avg_male = np.mean(male, axis=0)
plt.imshow(avg_male.astype(np.uint8))

In [None]:
# average female
avg_female = np.mean(female, axis=0)
plt.imshow(avg_female.astype(np.uint8))

In [None]:
# random sample

In [None]:
sample_images = features['image']
rand_img = np.zeros(sample_images.shape[1:], dtype=np.uint8)
for i in range(rand_img.shape[0])
    for j in range(rand_img.shape[1]):
        rand_int = np.random.randint(0, sample_images.shape[0])
        new_image[i, j] = sample_images[rand_int, i, j]
        
plt.imshow(rand_img.astype(np.uint8))

https://www.tensorflow.org/tutorials/generative/dcgan

In [3]:
def binarize(image, label):
    image = tf.cast(image, tf.float32)
    image = tf.math.round(image / 255.)
    return image, tf.cast(image, tf.int32)

(mnist_train, mnist_test), mnist_info = tfds.load('mnist',
                                                 split = ['test', 'test'],
                                                 shuffle_files=True,
                                                 as_supervised=True,
                                                 with_info=True)

In [4]:
mnist_train

<_OptionsDataset shapes: ((28, 28, 1), ()), types: (tf.uint8, tf.int64)>

In [8]:
class MaskedConv2D(tf.keras.layers.Layer):
    def __init__(self, mask_type, kernel=5, filters=1):
        super(MaskedConv2D, self).__init__()
        self.kernel = kernel
        self.filters= filters
        self.mask_type = mask_type
        
    def build(self, input_shape):
        self.w = self.add_weight(shape=[self.kernel,
                                       self.kernel,
                                       input_shape[-1],
                                       self.filters],
                                       initializer='glorot_normal',
                                       trainable = True)
        
        self.b = self.add_weight(shape=(self.filters), initializer = 'zeros', trainable=True)
                
        mask = np.ones(self.kernel**2, dtype=np.float32)
        center = len(mask) // 2
        mask[center+1:] = 0
        if self.mask_type == 'A':
            mask[center]=0
        mask = mask.reshape((self.kernel, self.kernel, 1, 1))
        self.mask = tf.constant(mask, dtype='float32')
        
    def call(self, inputs):
        masked_w = tf.math.multiply(self.w, self.mask)
        output = tf.nn.conv2d(inputs, masked_w, 1 , "SAME") + self.b
        return tf.nn.relu(output)
    

In [9]:
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential


class ResidualBlock(layers.Layer):
    def __init__(self, h=32):
        super(ResidualBlock, self).__init__()
        
        self.forward = Sequential([MaskedConv2D('B', kernel=1, filters=h),
                                  MaskedConv2D('B', kernel=3, filters=h),
                                  MaskedConv2D('B', kernel=1, filters=2*h)])
        
    def call(self, inputs):
        x = self.forward(inputs)
        return x + inputs
    

In [12]:
def SimplePixelCnn(hidden_features=64,
                  output_features=64,
                  resblocks_num=7):
    
    inputs= layers.Input(shape=[28,28,1])
    x = inputs
    
    x = MaskedConv2D('A', kernel=7, filters=2*hidden_features)(x)
    
    for _ in range(resblocks_num):
        x = ResidualBlock(hidden_features)(x)
        
    x = layers.Conv2D(output_features, (1,1), padding='same', activation='relu')(x)
    x = layers.Conv2D(1, (1,1), padding='same', activation='sigmoid')(x)
    
    return tf.keras.Model(inputs=inputs, outputs=x, name='PixelCnn')

In [13]:
pixelcnn = SimplePixelCnn()
pixelcnn.summary()

Model: "PixelCnn"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_2 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
masked_conv2d_22 (MaskedConv (None, 28, 28, 128)       6400      
_________________________________________________________________
residual_block_7 (ResidualBl (None, 28, 28, 128)       53504     
_________________________________________________________________
residual_block_8 (ResidualBl (None, 28, 28, 128)       53504     
_________________________________________________________________
residual_block_9 (ResidualBl (None, 28, 28, 128)       53504     
_________________________________________________________________
residual_block_10 (ResidualB (None, 28, 28, 128)       53504     
_________________________________________________________________
residual_block_11 (ResidualB (None, 28, 28, 128)       535

In [None]:
(ds_train, ds_test_), ds_info = tfds.load('mnist', 
                              split=['train', 'test'], 
                              shuffle_files=True,
                              as_supervised=True,
                              with_info=True)


batch_size = 256
def preprocess(image, label):
    image = tf.cast(image, tf.float32)
    image = image/255.
    return image, image


ds_train = ds_train.map(preprocess)
ds_train = ds_train.cache() # put dataset into memory
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)

ds_test = ds_test_.map(preprocess).batch(batch_size).cache().prefetch(batch_size)

In [None]:
def preprocess_with_label(image, label):
    image = tf.cast(image, tf.float32)
    image = tf.math.round(image/255.)
    return image, label

ds_test_label = ds_test_.map(preprocess_with_label).batch(1000)

In [None]:
def Encoder(z_dim):
    inputs  = layers.Input(shape=[28,28,1])
    
    x = inputs    
    x = Conv2D(filters=8,  kernel_size=(3,3), strides=2, padding='same', activation='relu')(x)
    x = Conv2D(filters=8,  kernel_size=(3,3), strides=1, padding='same', activation='relu')(x)
    x = Conv2D(filters=8,  kernel_size=(3,3), strides=2, padding='same', activation='relu')(x)
    x = Conv2D(filters=8,  kernel_size=(3,3), strides=1, padding='same', activation='relu')(x)
    x = Flatten()(x)
    out = Dense(z_dim)(x)
    
    return Model(inputs=inputs, outputs=out, name='encoder')

def Decoder(z_dim):
    inputs  = layers.Input(shape=[z_dim])
    x = inputs    
    x = Dense(7*7*64, activation='relu')(x)
    x = Reshape((7,7,64))(x)

    x = Conv2D(filters=64, kernel_size=(3,3), strides=1, padding='same', activation='relu')(x)
    x = UpSampling2D((2,2))(x)
    
    x = Conv2D(filters=32, kernel_size=(3,3), strides=1, padding='same', activation='relu')(x)
    x = UpSampling2D((2,2))(x)    

    out = Conv2D(filters=1, kernel_size=(3,3), strides=1, padding='same', activation='sigmoid')(x)
    
    #return out          
    return Model(inputs=inputs, outputs=out, name='decoder')

class Autoencoder:
    def __init__(self, z_dim):
        self.encoder = Encoder(z_dim)
        self.decoder = Decoder(z_dim)
        
        model_input = self.encoder.input
        model_output = self.decoder(self.encoder.output)
        self.model = Model(model_input, model_output)

In [None]:
autoencoder = Autoencoder(z_dim=10)

In [None]:
model_path = "./models/autoencoder.h5"
os.makedirs("./models", exist_ok=True)

checkpoint = ModelCheckpoint(model_path, 
                             monitor= "val_loss", 
                             verbose=1, 
                             save_best_only=True, 
                             mode= "auto", 
                             save_weights_only = False)

early = EarlyStopping(monitor= "val_loss", 
                      mode= "auto", 
                      patience = 5)

callbacks_list = [checkpoint, early]

autoencoder.model.compile(
    loss = "mse",
    optimizer=tf.keras.optimizers.RMSprop(learning_rate=3e-4))
    #metrics=[tf.keras.losses.BinaryCrossentropy()])
autoencoder.model.fit(ds_train, validation_data=ds_test,
                epochs = 5, callbacks = callbacks_list)

In [None]:
images, labels = next(iter(ds_test))
autoencoder.model = load_model(model_path)
outputs = autoencoder.model.predict(images)

# Display
grid_col = 10
grid_row = 2

f, axarr = plt.subplots(grid_row, grid_col, figsize=(grid_col*1.1, grid_row))

i = 0
for row in range(0, grid_row, 2):
    for col in range(grid_col):
        axarr[row,col].imshow(images[i,:,:,0], cmap='gray')
        axarr[row,col].axis('off')
        axarr[row+1,col].imshow(outputs[i,:,:,0], cmap='gray')
        axarr[row+1,col].axis('off')        
        i += 1
f.tight_layout(0.1, h_pad=0.2, w_pad=0.1)        
plt.show()

In [None]:
autoencoder_2 = Autoencoder(z_dim=2)

early = EarlyStopping(monitor= "val_loss", 
                      mode= "auto", 
                      patience = 5)

callbacks_list = [early]

autoencoder_2.model.compile(
    loss = "mse",
    optimizer=tf.keras.optimizers.RMSprop(learning_rate=1e-3))

autoencoder_2.model.fit(ds_train, validation_data=ds_test,
                epochs = 5, callbacks = callbacks_list)

In [None]:
images, labels = next(iter(ds_test_label))
outputs = autoencoder_2.encoder.predict(images)
plt.figure(figsize=(8,8))
plt.scatter(outputs[:,0], outputs[:,1], c=labels, cmap='RdYlBu', s=3)
plt.colorbar()

In [None]:
z_samples = np.array([[z1, z2] for z2 in np.arange(-5, 5, 1.) for z1 in np.arange(-5, 5, 1.)])
images = autoencoder_2.decoder.predict(z_samples)
grid_col = 10
grid_row = 10

f, axarr = plt.subplots(grid_row, grid_col, figsize=(grid_col, grid_row))

i = 0
for row in range(grid_row):
    for col in range(grid_col):
        axarr[row,col].imshow(images[i,:,:,0], cmap='gray')
        axarr[row,col].axis('off')   
        i += 1
f.tight_layout(0.1, h_pad=0.2, w_pad=0.1)        
plt.show() 

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import Layer, Input, Conv2D, Dense, Flatten, Reshape, Lambda, Dropout
from tensorflow.keras.layers import Conv2DTranspose, MaxPooling2D, UpSampling2D, LeakyReLU, BatchNormalization
from tensorflow.keras.activations import relu
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import tensorflow_datasets as tfds

import cv2
import pickle
import numpy as np
import matplotlib.pyplot as plt
import datetime, os
import warnings
warnings.filterwarnings('ignore')

In [None]:
strategy = tf.distribute.MirroredStrategy()
num_devices = strategy.num_replicas_in_sync
print('Number of devices: {}'.format(num_devices))

In [None]:
batch_size = 128*num_devices

def preprocess(sample):
    image = sample['image']
    image = tf.image.resize(image, [112,112])
    image = tf.cast(image, tf.float32)/255.
    return image, image

ds_train = ds_train.map(preprocess)
ds_train = ds_train.shuffle(batch_size*4)
ds_train = ds_train.batch(batch_size).prefetch(batch_size)

ds_test = ds_test_.map(preprocess).batch(batch_size).prefetch(batch_size)

train_num = ds_info.splits['train'].num_examples
test_num = ds_info.splits['test'].num_examples

In [None]:
class GaussianSampling(Layer):        
    def call(self, inputs):
        means, logvar = inputs
        epsilon = tf.random.normal(shape=tf.shape(means), mean=0., stddev=1.)
        samples = means + tf.exp(0.5*logvar)*epsilon

        return samples
    
class DownConvBlock(Layer):
    count = 0
    def __init__(self, filters, kernel_size=(3,3), strides=1, padding='same'):
        super(DownConvBlock, self).__init__(name=f"DownConvBlock_{DownConvBlock.count}")
        DownConvBlock.count+=1
        self.forward = Sequential([Conv2D(filters, kernel_size, strides, padding)])
        self.forward.add(BatchNormalization())
        self.forward.add(layers.LeakyReLU(0.2))
        
    def call(self, inputs):
        return self.forward(inputs)

class UpConvBlock(Layer):
    count = 0
    def __init__(self, filters, kernel_size=(3,3), padding='same'):
        super(UpConvBlock, self).__init__(name=f"UpConvBlock_{UpConvBlock.count}")
        UpConvBlock.count += 1
        self.forward = Sequential([Conv2D(filters, kernel_size, 1, padding),])
        self.forward.add(layers.LeakyReLU(0.2))
        self.forward.add(UpSampling2D((2,2)))
        
    def call(self, inputs):
        return self.forward(inputs)
    
class Encoder(Layer):
    def __init__(self, z_dim, name='encoder'):
        super(Encoder, self).__init__(name=name)
        
        self.features_extract = Sequential([
            DownConvBlock(filters = 32, kernel_size=(3,3), strides=2),
            DownConvBlock(filters = 32, kernel_size=(3,3), strides=2),
            DownConvBlock(filters = 64, kernel_size=(3,3), strides=2),
            DownConvBlock(filters = 64, kernel_size=(3,3), strides=2),
            Flatten()])
        
        self.dense_mean = Dense(z_dim, name='mean')
        self.dense_logvar = Dense(z_dim, name='logvar')
        self.sampler = GaussianSampling()
        
    def call(self, inputs):
        x = self.features_extract(inputs)
        mean = self.dense_mean(x)
        logvar = self.dense_logvar(x)
        z = self.sampler([mean, logvar])
        return z, mean, logvar

class Decoder(Layer):
    def __init__(self, z_dim, name='decoder'):
        super(Decoder, self).__init__(name=name)
            
        self.forward = Sequential([
                        Dense(7*7*64, activation='relu'),
                        Reshape((7,7,64)),
                        UpConvBlock(filters=64, kernel_size=(3,3)),
                        UpConvBlock(filters=64, kernel_size=(3,3)),
                        UpConvBlock(filters=32, kernel_size=(3,3)),
                        UpConvBlock(filters=32, kernel_size=(3,3)),
                        Conv2D(filters=3, kernel_size=(3,3), strides=1, padding='same', activation='sigmoid'),
                
        ])

    def call(self, inputs):
        return self.forward(inputs)

    
class VAE(Model):
    def __init__(self, z_dim, name='VAE'):
        super(VAE, self).__init__(name=name)
        self.encoder = Encoder(z_dim)
        self.decoder = Decoder(z_dim)
        self.mean = None
        self.logvar = None
        
    def call(self, inputs):
        z, self.mean, self.logvar = self.encoder(inputs)
        out = self.decoder(z)           
        return out

In [None]:
if num_devices>1:
    with strategy.scope():
        vae = VAE(z_dim=200)
else:
    vae = VAE(z_dim=200)

In [None]:
def vae_kl_loss(y_true, y_pred):
    kl_loss =  - 0.5 * tf.reduce_mean(1 + vae.logvar - tf.square(vae.mean) - tf.exp(vae.logvar))
    return kl_loss    

def vae_rc_loss(y_true, y_pred):
    #rc_loss = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    rc_loss = tf.keras.losses.MSE(y_true, y_pred)
    return rc_loss

def vae_loss(y_true, y_pred):
    kl_loss = vae_kl_loss(y_true, y_pred)
    rc_loss = vae_rc_loss(y_true, y_pred)
    kl_weight_const = 0.01
    return kl_weight_const*kl_loss + rc_loss

In [None]:
model_path = "./models/my_vae_celeb_a.h5"
os.makedirs("./models", exist_ok=True)

checkpoint = ModelCheckpoint(model_path, 
                             monitor= "vae_rc_loss", 
                             verbose=1, 
                             save_best_only=True, 
                             mode= "auto", 
                             save_weights_only = True)

early = EarlyStopping(monitor= "vae_rc_loss", 
                      mode= "auto", 
                      patience = 3)

callbacks_list = [checkpoint, early]

initial_learning_rate = 1e-3
steps_per_epoch = int(np.round(train_num/batch_size))
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=steps_per_epoch,
    decay_rate=0.96,
    staircase=True)

vae.compile(
    loss = [vae_loss],
    optimizer=tf.keras.optimizers.RMSprop(learning_rate=3e-3),
    metrics=[vae_kl_loss,vae_rc_loss])


history = vae.fit(ds_train, validation_data=ds_test,
                epochs = 5, callbacks = callbacks_list)

In [None]:
model_path = "./models/vae_celeb_a.h5"
vae.load_weights(model_path)

In [None]:
images, labels = next(iter(ds_train))
vae.load_weights(model_path)
outputs = vae.predict(images)

# Display
grid_col = 8
grid_row = 2

f, axarr = plt.subplots(grid_row, grid_col, figsize=(grid_col*2, grid_row*2))

i = 0
for row in range(0, grid_row, 2):
    for col in range(grid_col):
        axarr[row,col].imshow(images[i])
        axarr[row,col].axis('off')
        axarr[row+1,col].imshow(outputs[i])
        axarr[row+1,col].axis('off')        
        i += 1
f.tight_layout(0.1, h_pad=0.2, w_pad=0.1)        
plt.show()

In [None]:
load_from_file = True # Set to False to calculate the statistic but it can be slow

if load_from_file:
    stats = pickle.load(open('stats.p','wb'))
    avg_z_mean = stats['avg_z_mean']
    avg_z_std = stats['avg_z_std']
else:    
    avg_z_mean = []
    avg_z_std = []
    for i in range(steps_per_epoch):
        images, labels = next(iter(ds_train))    
        z, z_mean, z_logvar = vae.encoder(images)
        avg_z_mean.append(np.mean(z_mean, axis=0))
        avg_z_std.append(np.mean(np.exp(0.5*z_logvar),axis=0))

    avg_z_mean = np.mean(avg_z_mean, axis=0)
    avg_z_std = np.mean(avg_z_std, axis=0)

In [None]:
z_dim = 200
z_samples = np.random.normal(loc=0, scale=1, size=(25, z_dim))
images = vae.decoder(z_samples.astype(np.float32))
grid_col = 7
grid_row = 2

f, axarr = plt.subplots(grid_row, grid_col, figsize=(2*grid_col, 2*grid_row))

i = 0
for row in range(grid_row):
    for col in range(grid_col):
        axarr[row,col].imshow(images[i])
        axarr[row,col].axis('off')   
        i += 1
f.tight_layout(0.1, h_pad=0.2, w_pad=0.1)        
plt.show() 

## Problem 2 

### Part a 

In [3]:
# reload mnist dataset
(ds_train, ds_test_), ds_info = tfds.load('mnist', 
                              split=['train', 'test'], 
                              shuffle_files=True,
                              as_supervised=True,
                              with_info=True)
def preprocess(image, label):
    image = tf.cast(image, tf.float32)
    image = image/255.
    return image, image

def preprocess_with_label(image, label):
    image = tf.cast(image, tf.float32)
    image = tf.math.round(image/255.)
    return image, label


batch_size = 256
ds_train = ds_train.map(preprocess)
ds_train = ds_train.cache() # put dataset into memory
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)

ds_test = ds_test_.map(preprocess).batch(batch_size).cache().prefetch(batch_size)
ds_test_label = ds_test.map(preprocess_with_label).batch(1000)

In [4]:
class MaskedConv2D(tf.keras.layers.Layer):
    def __init__(self, mask_type, kernel=5, filters=1):
        super(MaskedConv2D, self).__init__()
        self.kernel = kernel
        self.filters= filters
        self.mask_type = mask_type
        
    def build(self, input_shape):
        self.w = self.add_weight(shape=[self.kernel,
                                       self.kernel,
                                       input_shape[-1],
                                       self.filters],
                                       initializer='glorot_normal',
                                       trainable = True)
        
        self.b = self.add_weight(shape=(self.filters), initializer = 'zeros', trainable=True)
                
        mask = np.ones(self.kernel**2, dtype=np.float32)
        center = len(mask) // 2
        mask[center+1:] = 0
        if self.mask_type == 'A':
            mask[center]=0
        mask = mask.reshape((self.kernel, self.kernel, 1, 1))
        self.mask = tf.constant(mask, dtype='float32')
        
    def call(self, inputs):
        masked_w = tf.math.multiply(self.w, self.mask)
        output = tf.nn.conv2d(inputs, masked_w, 1 , "SAME") + self.b
        return tf.nn.relu(output)

In [5]:
def preprocess_with_label(image, label):
    image = tf.cast(image, tf.float32)
    image = tf.math.round(image/255.)
    return image, label

ds_test_label = ds_test_.map(preprocess_with_label).batch(1000)

In [6]:
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential


class ResidualBlock(layers.Layer):
    def __init__(self, h=32):
        super(ResidualBlock, self).__init__()
        
        self.forward = Sequential([MaskedConv2D('B', kernel=1, filters=h),
                                  MaskedConv2D('B', kernel=3, filters=h),
                                  MaskedConv2D('B', kernel=1, filters=2*h)])
        
    def call(self, inputs):
        x = self.forward(inputs)
        return x + inputs

In [10]:
def SimplePixelCnn(hidden_features=64,
                  output_features=64,
                  resblocks_num=7):
    
    inputs= layers.Input(shape=[28,28,1])
    x = inputs
    
    x = MaskedConv2D('A', kernel=7, filters=2*hidden_features)(x)
    
    for _ in range(resblocks_num):
        x = ResidualBlock(hidden_features)(x)
        
    x = layers.Conv2D(output_features,(1,1), padding='same', activation='relu')(x)
    x = layers.Conv2D(1, (1,1), padding='same', activation='sigmoid')(x)
    
    return tf.keras.Model(inputs=inputs, outputs=x, name='PixelCnn')

In [11]:
pixelcnn = SimplePixelCnn()

### Part b 

In [12]:
pixelcnn.summary()

Model: "PixelCnn"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_2 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
masked_conv2d_22 (MaskedConv (None, 28, 28, 128)       6400      
_________________________________________________________________
residual_block_7 (ResidualBl (None, 28, 28, 128)       53504     
_________________________________________________________________
residual_block_8 (ResidualBl (None, 28, 28, 128)       53504     
_________________________________________________________________
residual_block_9 (ResidualBl (None, 28, 28, 128)       53504     
_________________________________________________________________
residual_block_10 (ResidualB (None, 28, 28, 128)       53504     
_________________________________________________________________
residual_block_11 (ResidualB (None, 28, 28, 128)       535

### Part c

In [None]:
pixelcnn.compile(
    loss = tf.keras.losses.BinaryCrossentropy(),
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001),
    metrics = [tf.keras.metrics.BinaryCrossentropy()]
)

pixelcnn.fit(ds_train, epochs = 10, validation_data=ds_test)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10

In [None]:
batches = 25
h = 28
w = 28
images = np.zeros((batches,h,w,1), dtype=np.float32)

for row in range(h):

    for col in range(w):

        prob = pixel_cnn.predict(images)[:,row,col,0]

        pixel_samples = tf.random.categorical(tf.math.log(np.stack([1-prob, prob],1)), 1)

        images[:,row,col,0] = tf.reshape(pixel_samples,[batch])

In [None]:
# display the 25 images
f, axarr = plt.subplots(5, 5, figsize=(5*1.1,5))

i = 0
for row in range(5):
    for col in range(5):
        axarr[row,col].imshow(images[i,:,:,0], cmap='gray')
        axarr[row,col].axis('off')
        i += 1
f.tight_layout(0.1, h_pad=0.2, w_pad=0.1)        
plt.show()

### Part d

In [None]:
pixelcnn.fit(ds_train, epochs = 25, validation_data=ds_test)

In [None]:
batches = 25
h = 28
w = 28
images = np.zeros((batches,h,w,1), dtype=np.float32)

for row in range(h):

    for col in range(w):

        prob = pixel_cnn.predict(images)[:,row,col,0]

        pixel_samples = tf.random.categorical(tf.math.log(np.stack([1-prob, prob],1)), 1)

        images[:,row,col,0] = tf.reshape(pixel_samples,[batch])

In [None]:
# display the 25 images
f, axarr = plt.subplots(5, 5, figsize=(5*1.1,5))

i = 0
for row in range(5):
    for col in range(5):
        axarr[row,col].imshow(images[i,:,:,0], cmap='gray')
        axarr[row,col].axis('off')
        i += 1
f.tight_layout(0.1, h_pad=0.2, w_pad=0.1)        
plt.show()

### Part e

analysis on part e

## Problem 3

### Part a 

In [None]:
class GAN():
    def __init__(self, generator, discriminator):        
        # discriminator
        self.D = discriminator
        self.G = generator

        self.bce = tf.keras.losses.BinaryCrossentropy()
        self.d_loss = {}
        self.g_loss = {}
        self.accuracy = {}        
        self.g_gradients = []

    def discriminator_loss(self, pred_fake, pred_real):
        real_loss = self.bce(tf.ones_like(pred_real), pred_real)
        fake_loss = self.bce(tf.zeros_like(pred_fake), pred_fake)
        
        d_loss = 0.5*(real_loss + fake_loss)
        return d_loss
    
    def generator_loss(self, pred_fake):
        g_loss = self.bce(tf.ones_like(pred_fake), pred_fake)
        return g_loss
    
    def train_step(self, g_input, real_input):

        with tf.GradientTape() as g_tape,\
             tf.GradientTape() as d_tape:
            # Feed forward
            fake_input = self.G(g_input)

            pred_fake = self.D(fake_input)
            pred_real = self.D(real_input)

            # Calculate losses
            d_loss = self.discriminator_loss(pred_fake, pred_real)
            g_loss = self.generator_loss(pred_fake)
            
            # Accuracy
            fake_accuracy = tf.math.reduce_mean(binary_accuracy(tf.zeros_like(pred_fake), pred_fake))
            real_accuracy = tf.math.reduce_mean(binary_accuracy(tf.ones_like(pred_real), pred_real))
            
            # backprop gradients
            gradient_g = g_tape.gradient(g_loss, self.G.trainable_variables)
            gradient_d = d_tape.gradient(d_loss, self.D.trainable_variables)
            
            gradient_g_l1_norm = [tf.norm(gradient).numpy() for gradient in gradient_g]
            self.g_gradients.append(gradient_g_l1_norm) 
            # update weights
            self.G_optimizer.apply_gradients(zip(gradient_g, self.G.trainable_variables))
            self.D_optimizer.apply_gradients(zip(gradient_d, self.D.trainable_variables))


        return g_loss, d_loss, fake_accuracy, real_accuracy
    
    def train(self, data_generator, 
                    z_generator,
                    g_optimizer, d_optimizer,
                    steps, interval=100):
        self.D_optimizer = d_optimizer
        self.G_optimizer = g_optimizer          
        val_g_input = next(z_generator)
        for i in range(steps):
            g_input = next(z_generator)
            real_input = next(data_generator)
            
            g_loss, d_loss, fake_accuracy, real_accuracy = self.train_step(g_input, real_input)
            self.d_loss[i] = d_loss.numpy()
            self.g_loss[i] = g_loss.numpy()
            self.accuracy[i] = 0.5*(fake_accuracy.numpy() + real_accuracy.numpy())
            if i%interval == 0:
                msg = "Step {}: d_loss {:.4f} g_loss {:.4f} Accuracy. real : {:.3f} fake : {:.3f}"\
                .format(i, d_loss, g_loss, real_accuracy, fake_accuracy)
                print(msg)
                
                fake_images = self.G(val_g_input)
                self.plot_images(fake_images)

    def plot_images(self, images):
        pass

In [None]:
class DCGAN(GAN):
    def __init__(self, z_dim, input_shape):
        
        discriminator = self.Discriminator(input_shape)
        generator = self.Generator(z_dim)
        
        GAN.__init__(self, generator, discriminator)
        
    def Discriminator(self, input_shape): 

        model = tf.keras.Sequential(name='Discriminator') 
        model.add(layers.Input(shape=input_shape)) 

        model.add(layers.Conv2D(32, 7, strides=(2,2), padding='same'))
        model.add(layers.BatchNormalization(momentum=0.9))
        model.add(layers.LeakyReLU(0.2)) 
        model.add(layers.Dropout(0.2))

        model.add(layers.Conv2D(64, 7, strides=(2,2), padding='same')) 
        model.add(layers.BatchNormalization(momentum=0.9)) 
        model.add(layers.LeakyReLU(0.2))
        model.add(layers.Dropout(0.2))

        model.add(layers.Flatten()) 
        model.add(layers.Dense(1, activation='sigmoid')) 

        return model 

    def Generator(self, z_dim): 

        model = tf.keras.Sequential(name='Generator') 
        model.add(layers.Input(shape=[z_dim])) 

        model.add(layers.Dense(7*7*64))        
        model.add(layers.BatchNormalization(momentum=0.9)) 
        model.add(layers.LeakyReLU(0.2))
        model.add(layers.Reshape((7,7,64))) 

        model.add(layers.Conv2D(64, 7, padding='same')) 
        model.add(layers.BatchNormalization(momentum=0.9)) 
        model.add(layers.LeakyReLU(0.2))         
        model.add(layers.UpSampling2D((2,2), interpolation="bilinear"))

        model.add(layers.Conv2D(32, 7, padding='same')) 
        model.add(layers.ReLU()) 
        model.add(layers.UpSampling2D((2,2), interpolation="bilinear")) 

        model.add(layers.Conv2D(image_shape[-1], 3, padding='same', activation='tanh')) 

        return model     
    
    def plot_images(self, images):   
        grid_row = 1
        grid_col = 8
        f, axarr = plt.subplots(grid_row, grid_col, figsize=(grid_col*1.5, grid_row*1.5))
        for col in range(grid_col):
            axarr[col].imshow((images[col,:,:,0]+1)/2, cmap='gray')
            axarr[col].axis('off') 
        plt.show()


In [None]:
z_dim = 100

def z_generator(batch_size, z_dim):
    while True:
         yield tf.random.normal((batch_size, z_dim))        
            
z_gen = z_generator(batch_size, z_dim)

In [None]:
batch_size = 200
z_dim = 100
image_shape = (28,28,1)

In [None]:
# noise image 
noise_image = tf.random.normal((28*28, 100))
noise_image = noise_image.reshape((28,28))
plt.imshow(noise_image, cmap='gray'
plt.show()

In [None]:
ds_train, ds_info = tfds.load('mnist', split='train', shuffle_files=True, with_info=True)
ds_train = ds_train.map(preprocess)
ds_train = ds_train.cache() # put dataset into memory
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size).repeat()
train_num = ds_info.splits['train'].num_examples
train_steps_per_epoch = round(train_num/batch_size)

In [None]:
mnist_gan = DCGAN(z_dim, image_shape)
mnist_gan.D.summary()

In [None]:
mnist_gan.G.summary()

In [None]:
num_epochs = 10
#the display component is included in the train method
mnist_gan.train(iter(ds_train), z_gen, 
          Adam(2e-3), Adam(2e-3),
          num_epochs*train_steps_per_epoch, 
          1*train_steps_per_epoch)

In [None]:
num_epochs = 25
#the display component is included in the train method
mnist_gan.train(iter(ds_train), z_gen, 
          Adam(2e-3), Adam(2e-3),
          num_epochs*train_steps_per_epoch, 
          2*train_steps_per_epoch)