<a href="https://colab.research.google.com/github/michaeldlee23/cs390-lab4/blob/master/lab4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Load the Drive helper and mount Google Drive at /content/drive.
# The DRIVE_PREFIX constant defined in the configuration cell points inside this
# mount, so outputs and logs are written to Drive.
# NOTE(review): google.colab is presumably only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# CS390-NIP GAN lab
# Max Jacobson / Sri Cherukuri / Anthony Niemiec
# FA2020
# uses Fashion MNIST https://www.kaggle.com/zalando-research/fashionmnist 
# uses CIFAR-10 https://www.cs.toronto.edu/~kriz/cifar.html

import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten
from tensorflow.keras.layers import Dropout, BatchNormalization, LeakyReLU
from tensorflow.keras.layers import Conv2D, MaxPool2D, Conv2DTranspose, UpSampling2D
from tensorflow.keras.optimizers import Adam
from PIL import Image
import random
import time
import imageio
import shutil

In [20]:
# Seed every random number generator used in this notebook (Python, NumPy,
# TensorFlow) so that repeated runs start from the same state.
random.seed(1618)
np.random.seed(1618)
tf.compat.v1.set_random_seed(1618)

# Only show TensorFlow log messages at ERROR level.
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
# NOTE(review): TF_CPP_MIN_LOG_LEVEL is normally read when TensorFlow is imported,
# and tensorflow is already imported in an earlier cell - confirm this has an effect.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Root folder (inside the mounted Drive) for all outputs and logs.
DRIVE_PREFIX = '/content/drive/My Drive/Colab Notebooks/lab4/'

# NOTE: mnist_d is no credit
# NOTE: cifar_10 is extra credit
# DATASET = "mnist_d"
DATASET = "mnist_f"
# DATASET = "cifar_10"

# Per-dataset image geometry (height, width, channels), class names and the
# single class the GAN is trained to generate.
if DATASET == "mnist_d":
    IMAGE_SHAPE = (IH, IW, IZ) = (28, 28, 1)
    LABEL = "numbers"
elif DATASET == "mnist_f":
    IMAGE_SHAPE = (IH, IW, IZ) = (28, 28, 1)
    CLASSLIST = ["top", "trouser", "pullover", "dress", "coat", "sandal", "shirt", "sneaker", "bag", "ankle boot"]
    # TODO: choose a label to train on from the CLASSLIST above
    LABEL = "dress"
elif DATASET == "cifar_10":
    IMAGE_SHAPE = (IH, IW, IZ) = (32, 32, 3)
    CLASSLIST = ["airplane", "automobile", "bird", "cat", "deer", "dog", "frog", "horse", "ship", "truck"]
    LABEL = "dog"
else:
    # Fail here with a clear message instead of a NameError on IH/IW/IZ below.
    raise ValueError("Unknown DATASET %r; expected 'mnist_d', 'mnist_f' or 'cifar_10'." % DATASET)

IMAGE_SIZE = IH*IW*IZ

NOISE_SIZE = 100    # length of noise array

TRAIN_RATIO = 1     # Number of generator updates per discriminator update

# file prefixes and directory
OUTPUT_NAME = DATASET + "_" + LABEL
OUTPUT_DIR = DRIVE_PREFIX + "./outputs/" + OUTPUT_NAME

# Tensorboard directories
ADV_LOG_DIR = DRIVE_PREFIX + './logs/' + OUTPUT_NAME + '/advLoss'
GEN_LOG_DIR = DRIVE_PREFIX + './logs/' + OUTPUT_NAME + '/genLoss'

# NOTE: switch to True in order to receive debug information
VERBOSE_OUTPUT = False

In [16]:
################################### HELPER FUNCTIONS ###################################
def namedLogs(model, logs):
    """Pair the model's metric names with logged values.

    Args:
        model: object exposing a ``metrics_names`` sequence.
        logs: sequence of values, in the same order as ``model.metrics_names``.

    Returns:
        A dict mapping each metric name to its value. Pairing stops at the
        shorter of the two sequences, so extra names or values are dropped.
    """
    return dict(zip(model.metrics_names, logs))


In [17]:
################################### DATA FUNCTIONS ###################################

# Load in and report the shape of dataset
def getRawData():
    """Load the dataset selected by the module-level DATASET and print its shapes.

    Returns:
        ((xTrain, yTrain), (xTest, yTest)) exactly as returned by the matching
        ``tf.keras.datasets.*.load_data()`` call.

    Raises:
        ValueError: if DATASET is not one of "mnist_f", "cifar_10" or "mnist_d".
    """
    if DATASET == "mnist_f":
        (xTrain, yTrain), (xTest, yTest) = tf.keras.datasets.fashion_mnist.load_data()
    elif DATASET == "cifar_10":
        (xTrain, yTrain), (xTest, yTest) = tf.keras.datasets.cifar10.load_data()
    elif DATASET == "mnist_d":
        (xTrain, yTrain), (xTest, yTest) = tf.keras.datasets.mnist.load_data()
    else:
        # Without this branch the prints below fail with an unbound-variable error.
        raise ValueError("Unknown DATASET %r; expected 'mnist_f', 'cifar_10' or 'mnist_d'." % DATASET)
    print("Shape of xTrain dataset: %s." % str(xTrain.shape))
    print("Shape of yTrain dataset: %s." % str(yTrain.shape))
    print("Shape of xTest dataset: %s." % str(xTest.shape))
    print("Shape of yTest dataset: %s." % str(yTest.shape))
    return ((xTrain, yTrain), (xTest, yTest))

# Filter out the dataset to only include images with our LABEL, meaning we may also discard
# class labels for the images because we know exactly what to expect
def preprocessData(raw):
    """Merge the train/test splits, keep only images of class LABEL, and rescale.

    Args:
        raw: ((xTrain, yTrain), (xTest, yTest)) as returned by getRawData().

    Returns:
        Array of the selected images with every value mapped through
        ``v / 127.5 - 1`` (i.e. [0, 255] becomes [-1, 1], matching a tanh
        generator output). For DATASET == "mnist_d" no class filtering is done.

    Raises:
        ValueError: if LABEL is not in CLASSLIST (raised by ``list.index``).
    """
    ((xTrain, yTrain), (xTest, yTest)) = raw
    # Train and test splits are pooled: the GAN needs no held-out set.
    x = np.r_[xTrain, xTest]
    if DATASET == "mnist_d":
        xP = x
    else:
        c = CLASSLIST.index(LABEL)
        # flatten() copes with label arrays shaped (N, 1) as well as (N,).
        y = np.r_[yTrain, yTest].flatten()
        # Boolean mask selects the matching rows without a Python-level loop.
        xP = x[y == c]
    # NOTE: Normalize from 0 to 1 or -1 to 1
    #xP = xP/255.0
    xP = xP/127.5 - 1
    print("Shape of Preprocessed dataset: %s." % str(xP.shape))
    return xP

In [25]:
################################# BUILDING NETWORKS ##################################
#################################      MNIST_F      ##################################

# Model that discriminates between fake and real dataset images
def buildDiscriminator(dropRate=0.2):
    """Build the MNIST-sized discriminator: image in, single REAL (1) / FAKE (0) score out.

    Args:
        dropRate: dropout rate after each convolution stage; the dense stage
            uses twice this rate.

    Returns:
        A Keras Model taking an IMAGE_SHAPE tensor and producing one sigmoid unit.
        The layer summary is printed as a side effect.

    NOTE: a later cell defines another buildDiscriminator (CIFAR variant); whichever
    cell was executed last is the one in effect.
    """
    # Output size of a Conv2D (H = output height/width, H1 = input, HF = filter):
    #   padding = same:   H = H1 / stride
    #   padding = valid:  H = (H1 - HF + 1) / stride
    stack = [
        # for a 28x28 input -> (14, 14, 64)
        Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=IMAGE_SHAPE),
        LeakyReLU(),
        Dropout(dropRate),
        # -> (7, 7, 128)
        Conv2D(128, (3, 3), strides=(2, 2), padding='same'),
        LeakyReLU(),
        Dropout(dropRate),
        # classifier head
        Flatten(),
        Dense(1024),
        LeakyReLU(),
        Dropout(2 * dropRate),
        Dense(1, activation='sigmoid'),
    ]
    network = Sequential(stack, name="MNIST_DISCRIMINATOR")
    network.summary()

    # Wrap the Sequential stack in a functional Model with an explicit input.
    imageInput = Input(shape = IMAGE_SHAPE)
    return Model(imageInput, network(imageInput))

# Model that generates a fake image from random noise
def buildGenerator():
    """Build the MNIST-sized generator: NOISE_SIZE noise vector in, fake image out.

    The noise is projected to a 7x7x128 volume and upsampled twice by stride-2
    transposed convolutions (7 -> 14 -> 28). The last layer has IZ channels and a
    tanh activation, so outputs lie in [-1, 1].

    Returns:
        A Keras Model mapping a (NOISE_SIZE,) tensor to an image tensor.
        The layer summary is printed as a side effect.

    Raises:
        AssertionError: if the network's output shape is not (None, IH, IW, IZ).

    NOTE: a later cell defines another buildGenerator (CIFAR variant); whichever
    cell was executed last is the one in effect.
    """
    # from https://datascience.stackexchange.com/questions/26451/how-to-calculate-the-output-shape-of-conv2d-transpose
    # Calculating output size for Conv2DTranspose
    # When padding = same:
    #   H = H1 * stride
    # When padding = valid:
    #   H = (H1-1) * stride + HF
    # Where H = output height (resp, width), H1 = input height, HF = filter height
    kernelSize = (5, 5)
    model = Sequential(name="MNIST_GENERATOR")

    model.add(Dense(7 * 7 * 128, input_dim = NOISE_SIZE))
    model.add(LeakyReLU())
    model.add(BatchNormalization())

    model.add(Reshape((7, 7, 128)))

    # Output shape: (14, 14, 64)
    # (No input_shape here: this is not the first layer, and its real input is
    # the (7, 7, 128) volume above, not IMAGE_SHAPE.)
    model.add(Conv2DTranspose(64, kernelSize, strides=(2, 2), padding='same'))
    model.add(LeakyReLU())
    model.add(BatchNormalization())

    # Output shape: (28, 28, IZ)
    model.add(Conv2DTranspose(IZ, kernelSize, strides=(2, 2), padding='same', activation='tanh'))

    model.summary()
    # The generated image must have the same shape as the real images.
    assert model.output_shape == (None, IH, IW, IZ)

    # Creating a Keras Model out of the network
    inputTensor = Input(shape = (NOISE_SIZE,))
    return Model(inputTensor, model(inputTensor))

In [None]:
################################# BUILDING NETWORKS ##################################
#################################     CIFAR_10      ##################################

# Model that discriminates between fake and real dataset images
def buildDiscriminator(dropRate=0.2):
    """Build the CIFAR-10 discriminator: image in, single REAL (1) / FAKE (0) score out.

    Three stride-2 3x3 convolution stages (32, 64, 128 filters) followed by a
    single sigmoid unit.

    Args:
        dropRate: dropout rate applied at the end of every convolution stage.

    Returns:
        A Keras Model taking an IMAGE_SHAPE tensor and producing one sigmoid unit.
        The layer summary is printed as a side effect.

    NOTE: this definition replaces the MNIST buildDiscriminator from the earlier cell
    once this cell is executed.
    """
    kernelSize = (3, 3)
    alpha = 0.2
    momentum = 0.8

    stack = [
        # for a 32x32 input -> (16, 16, 32); no batch norm on the first stage
        Conv2D(32, kernelSize, strides=(2, 2), padding='same', input_shape=IMAGE_SHAPE),
        LeakyReLU(alpha),
        Dropout(dropRate),
        # -> (8, 8, 64); activation comes before batch norm in this stage
        Conv2D(64, kernelSize, strides=(2, 2), padding='same'),
        LeakyReLU(alpha),
        BatchNormalization(momentum=momentum),
        Dropout(dropRate),
        # -> (4, 4, 128); batch norm comes before activation in this stage
        Conv2D(128, kernelSize, strides=(2, 2), padding='same'),
        BatchNormalization(momentum=momentum),
        LeakyReLU(alpha),
        Dropout(dropRate),
        # classifier head
        Flatten(),
        Dense(1, activation='sigmoid'),
    ]
    network = Sequential(stack, name="CIFAR_DISCRIMINATOR")
    network.summary()

    # Wrap the Sequential stack in a functional Model with an explicit input.
    imageInput = Input(shape = IMAGE_SHAPE)
    return Model(imageInput, network(imageInput))

# Model that generates a fake image from random noise
def buildGenerator():
    """Build the CIFAR-10 generator: NOISE_SIZE noise vector in, fake image out.

    The noise is projected to a (IH/4, IW/4, 256) volume, upsampled twice by a
    factor of two (UpSampling2D followed by a 3x3 convolution), and reduced to IZ
    channels by a final tanh convolution, so outputs lie in [-1, 1].

    The base grid is derived from IH and IW rather than hard-coded: a fixed 7x7
    base yields 28x28 images, which fails the shape check for 32x32 CIFAR-10.
    For 28x28 images the derived base is still 7x7.

    Returns:
        A Keras Model mapping a (NOISE_SIZE,) tensor to an (IH, IW, IZ) image.
        The layer summary is printed as a side effect.

    Raises:
        ValueError: if IH or IW is not divisible by the total upsampling factor.
        AssertionError: if the network's output shape is not (None, IH, IW, IZ).

    NOTE: this definition replaces the MNIST buildGenerator from the earlier cell
    once this cell is executed.
    """
    kernelSize = (3, 3)
    alpha = 0.2
    momentum = 0.8
    baseFilters = 256
    upsampleStages = 2                      # each stage doubles height and width
    scale = 2 ** upsampleStages

    if IH % scale != 0 or IW % scale != 0:
        raise ValueError(
            "Image size %dx%d is not divisible by the generator upsampling factor %d."
            % (IH, IW, scale))
    baseH, baseW = IH // scale, IW // scale

    model = Sequential(name="CIFAR_GENERATOR")

    # Project the noise vector and reshape it into the smallest feature map.
    model.add(Dense(baseH * baseW * baseFilters, input_dim = NOISE_SIZE))
    model.add(LeakyReLU(alpha))
    model.add(Reshape((baseH, baseW, baseFilters)))

    # (baseH, baseW) -> (2*baseH, 2*baseW) -> (IH, IW), all with 256 channels.
    for _ in range(upsampleStages):
        model.add(UpSampling2D())
        model.add(Conv2D(baseFilters, kernelSize, padding='same', activation='relu'))
        model.add(BatchNormalization(momentum=momentum))

    # output size will be (IH, IW, IZ)
    model.add(Conv2D(IZ, kernelSize, padding='same', activation='tanh'))

    model.summary()
    # The generated image must have the same shape as the real images.
    assert model.output_shape == (None, IH, IW, IZ)

    # Creating a Keras Model out of the network
    inputTensor = Input(shape = (NOISE_SIZE,))
    return Model(inputTensor, model(inputTensor))

In [None]:
################################### CREATING A GAN ###################################

def buildGAN(images, epochs = 40000, batchSize = 32, loggingInterval = 0):
    """Build the discriminator, generator and combined GAN, then train them.

    Each "epoch" here is one training iteration on a single batch, not a pass
    over the whole dataset.

    Args:
        images: array of real training images, sampled with replacement along axis 0.
        epochs: number of training iterations.
        batchSize: number of real (and of generated) images per iteration.
        loggingInterval: print progress, save a sample image and write TensorBoard
            logs every this many iterations. The final iteration is always logged;
            with a value of 0 or less it is the only one logged.

    Returns:
        Tuple (generator, adversary, gan) of trained Keras models.
    """
    # Setup
    loss = "binary_crossentropy"

    def makeOptimizer():
        # One optimizer per compiled model: newer Keras optimizers are bound to the
        # variables they are first applied to, so a single instance cannot be
        # shared by the adversary and the GAN (different trainable weights).
        return Adam(0.0002, 0.5)

    # Setup adversary
    adversary = buildDiscriminator()
    adversary.compile(loss = loss, optimizer = makeOptimizer(), metrics = ["accuracy"])

    # Setup generator and GAN
    adversary.trainable = False                     # freeze adversary's weights when training GAN
    generator = buildGenerator()                    # generator is trained within GAN in relation to adversary performance
    noiseInput = Input(shape = (NOISE_SIZE,))
    gan = Model(noiseInput, adversary(generator(noiseInput))) # GAN feeds generator into adversary
    gan.compile(loss = loss, optimizer = makeOptimizer(), metrics = ["accuracy"])

    # Tensorboard logging setup
    advTensorboard = tf.keras.callbacks.TensorBoard(
        log_dir=ADV_LOG_DIR,
    )
    advTensorboard.set_model(adversary)

    genTensorboard = tf.keras.callbacks.TensorBoard(
        log_dir=GEN_LOG_DIR,
    )
    genTensorboard.set_model(gan)

    # Training
    trueCol = np.ones((batchSize, 1))
    falseCol = np.zeros((batchSize, 1))
    start = time.time()
    for epoch in range(epochs):
        # Train discriminator with a true and false batch. The batches are only
        # produced on iterations where the discriminator is actually trained.
        # epoch 0 always qualifies, so advLoss is defined before it is first logged.
        if epoch % TRAIN_RATIO == 0:
            batch = images[np.random.randint(0, images.shape[0], batchSize)]
            noise = np.random.normal(0, 1, (batchSize, NOISE_SIZE))
            genImages = generator.predict(noise)
            advTrueLoss = adversary.train_on_batch(batch, trueCol)
            advFalseLoss = adversary.train_on_batch(genImages, falseCol)
            advLoss = np.add(advTrueLoss, advFalseLoss) * 0.5

        # Train generator by training GAN while keeping adversary component constant
        noise = np.random.normal(0, 1, (batchSize, NOISE_SIZE))
        genLoss = gan.train_on_batch(noise, trueCol)

        # Logging
        onInterval = loggingInterval > 0 and epoch % loggingInterval == 0
        isLastEpoch = epoch == epochs - 1
        if onInterval or isLastEpoch:
            end = time.time()
            print("\tEpoch %d:" % epoch)
            print("\t\tDiscriminator loss: %f." % advLoss[0])
            print("\t\tDiscriminator accuracy: %.2f%%." % (100 * advLoss[1]))
            print("\t\tGenerator loss: %f." % genLoss[0])
            print("\t\tCompleted in %ds" % (end - start))
            start = time.time()

            # Snapshot numbering: regular checkpoints get epoch / loggingInterval.
            # Rounding up gives an off-interval final iteration the next index
            # instead of overwriting the previous snapshot. With logging disabled
            # there is no interval to divide by; the single final snapshot is 0.
            if loggingInterval > 0:
                snapshotIndex = (epoch + loggingInterval - 1) // loggingInterval
            else:
                snapshotIndex = 0
            runGAN(generator, OUTPUT_DIR + "/" + OUTPUT_NAME + "_test_%d.png" % snapshotIndex)

            # Tensorboard logging
            advTensorboard.on_epoch_end(epoch, namedLogs(adversary, [advLoss[0]]))
            genTensorboard.on_epoch_end(epoch, namedLogs(gan, [genLoss[0]]))
    advTensorboard.on_train_end(None)
    genTensorboard.on_train_end(None)
    return (generator, adversary, gan)

# Generates an image using given generator
def runGAN(generator, outfile):
    """Generate one image from fresh random noise and write it to ``outfile``.

    Args:
        generator: model whose ``predict`` maps a (1, NOISE_SIZE) noise array to a
            batch of images with values expected in [-1, 1].
        outfile: destination path of the image file.
    """
    latent = np.random.normal(0, 1, (1, NOISE_SIZE))
    # Take the only image of the batch and drop size-1 axes (e.g. a single channel).
    sample = np.squeeze(generator.predict(latent)[0])
    # Map [-1, 1] onto [0, 255] and convert to 8-bit pixels.
    pixels = ((0.5*sample + 0.5)*255).astype('uint8')
    imageio.imwrite(outfile, pixels)
    print('\t\tSaved image to %s' % outfile)

In [26]:
################################### RUNNING THE PIPELINE #############################

def main():
    """Run the full pipeline: reset output folders, load data, train, save samples.

    Deletes and recreates OUTPUT_DIR, ADV_LOG_DIR and GEN_LOG_DIR, trains the GAN
    on the images of the configured class, then writes ten generated images to
    OUTPUT_DIR.
    """
    print("Starting %s image generator program." % LABEL)

    # (directory, message when removing it, message when creating it)
    managedDirs = (
        (OUTPUT_DIR, 'Removing old outputs...', 'Making new output directory...'),
        (ADV_LOG_DIR, 'Removing old adversary logs...', 'Making new adversary logs directory...'),
        (GEN_LOG_DIR, 'Removing old generator logs...', 'Making new generator logs directory...'),
    )
    # First clear out whatever a previous run left behind ...
    for path, removeMessage, _ in managedDirs:
        if os.path.exists(path):
            print(removeMessage)
            shutil.rmtree(path)
    # ... then start from empty directories.
    for path, _, makeMessage in managedDirs:
        print(makeMessage)
        os.makedirs(path)

    # Load the whole dataset, then keep just the class we are trying to generate.
    data = preprocessData(getRawData())
    # Create and train all facets of the GAN; only the generator is needed afterwards.
    generator, _adv, _gan = buildGAN(data, epochs = 10000, loggingInterval = 500)
    # Produce the final batch of counterfeit images.
    for i in range(10):
        runGAN(generator, OUTPUT_DIR + "/" + OUTPUT_NAME + "_final_%d.png" % i)
    print("Images saved in %s directory." % OUTPUT_DIR)

# Entry point: run the whole pipeline when this code is executed as the main
# module (which includes running this notebook cell).
if __name__ == '__main__':
    main()

Starting dress image generator program.
Removing old outputs...
Removing old adversary logs...
Removing old generator logs...
Making new output directory...
Making new adversary logs directory...
Making new generator logs directory...
Shape of xTrain dataset: (60000, 28, 28).
Shape of yTrain dataset: (60000,).
Shape of xTest dataset: (10000, 28, 28).
Shape of yTest dataset: (10000,).
Shape of Preprocessed dataset: (7000, 28, 28).
Model: "MNIST_DISCRIMINATOR"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_15 (Conv2D)           (None, 14, 14, 64)        1664      
_________________________________________________________________
leaky_re_lu_32 (LeakyReLU)   (None, 14, 14, 64)        0         
_________________________________________________________________
dropout_18 (Dropout)         (None, 14, 14, 64)        0         
_________________________________________________________________
conv2d_16