In [2]:
"""
This code draws significant inspiration from Jason Brownlee's work on https://machinelearningmastery.com/ while incorporating my own modifications.
"""

"\nThis code draws significant inspiration from Jason Brownlee's work on https://machinelearningmastery.com/ while incorporating my own modifications.\n"

In [3]:
from numpy import expand_dims
from numpy import zeros
from numpy import ones
from numpy import vstack
from numpy.random import randn
from numpy.random import randint
from keras import models
from keras.datasets.cifar10 import load_data
from keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import *
from tensorflow.compat.v1.keras.layers import BatchNormalization
from keras.utils import image_dataset_from_directory
from keras.applications import ResNet50

import matplotlib.pyplot as plt
import csv
import os
import numpy as np
import tensorflow as tf




In [None]:
class DCGAN:
    """Pair of Keras models (generator + discriminator) forming a DCGAN.

    The generator maps a latent vector of length ``latent_dim`` to an image;
    the discriminator maps an image of shape ``img_shape`` to a single
    sigmoid probability.

    Note: the generator starts from a 4x4 feature map and applies four
    stride-2 transposed convolutions with 'same' padding, so it always emits
    64x64 images regardless of ``img_rows`` / ``img_col``.
    """

    def __init__(self, latent_dim, img_rows, img_col, channel, trained_generator=None, trained_discriminator=None):
        """Store the configuration and build any model that was not supplied.

        Args:
            latent_dim: length of the generator's input noise vector.
            img_rows: image height used for the discriminator input shape.
            img_col: image width used for the discriminator input shape.
            channel: number of image channels.
            trained_generator: optional existing generator to reuse.
            trained_discriminator: optional existing discriminator to reuse.
        """
        self.latent_dim = latent_dim
        self.channel = channel
        self.img_shape = (img_rows, img_col, channel)
        # Compare against None explicitly: relying on truthiness (`a or b`)
        # would silently replace a supplied model that evaluates as falsy.
        if trained_generator is not None:
            self.generator = trained_generator
        else:
            self.generator = self.build_generator()
        if trained_discriminator is not None:
            self.discriminator = trained_discriminator
        else:
            self.discriminator = self.build_discriminator()

    def build_discriminator(self):
        """Build and compile the convolutional discriminator.

        Returns:
            A compiled ``Sequential`` model: four stride-2 5x5 convolutions
            (64, 128, 256, 512 filters) with LeakyReLU(0.2), batch
            normalisation on all but the first, followed by a single sigmoid
            unit. Compiled with binary cross-entropy and Adam(2e-4, beta_1=0.5).
        """
        model = Sequential()

        model.add(Conv2D(64, kernel_size=(5, 5), strides=(2, 2), padding='same', input_shape=self.img_shape, kernel_initializer='glorot_uniform'))
        model.add(LeakyReLU(0.2))

        model.add(Conv2D(128, kernel_size=(5, 5), strides=(2, 2), padding='same', kernel_initializer='glorot_uniform'))
        model.add(BatchNormalization(momentum=0.5))
        model.add(LeakyReLU(0.2))

        model.add(Conv2D(256, kernel_size=(5, 5), strides=(2, 2), padding='same', kernel_initializer='glorot_uniform'))
        model.add(BatchNormalization(momentum=0.5))
        model.add(LeakyReLU(0.2))

        model.add(Conv2D(512, kernel_size=(5, 5), strides=(2, 2), padding='same', kernel_initializer='glorot_uniform'))
        model.add(BatchNormalization(momentum=0.5))
        model.add(LeakyReLU(0.2))

        model.add(Flatten())
        model.add(Dense(1))
        model.add(Activation('sigmoid'))

        optimizer = Adam(learning_rate=0.0002, beta_1=0.5)
        model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        return model

    def build_generator(self):
        """Build and compile the transposed-convolution generator.

        Returns:
            A compiled ``Sequential`` model that projects the latent vector to
            a 4x4x512 tensor and upsamples it through four stride-2 transposed
            convolutions to a 64x64 image with ``self.channel`` channels and
            tanh activation (values in [-1, 1]).
        """
        model = Sequential()
        n_nodes = 512 * 4 * 4

        model.add(Dense(n_nodes, input_dim=self.latent_dim, kernel_initializer='glorot_uniform'))
        model.add(Reshape((4, 4, 512)))
        model.add(BatchNormalization(momentum=0.5))
        model.add(ReLU())

        model.add(Conv2DTranspose(256, kernel_size=(5, 5), strides=(2, 2), padding='same', kernel_initializer='glorot_uniform'))
        model.add(BatchNormalization(momentum=0.5))
        model.add(ReLU())

        model.add(Conv2DTranspose(128, kernel_size=(5, 5), strides=(2, 2), padding='same', kernel_initializer='glorot_uniform'))
        model.add(BatchNormalization(momentum=0.5))
        model.add(ReLU())

        model.add(Conv2DTranspose(64, kernel_size=(5, 5), strides=(2, 2), padding='same', kernel_initializer='glorot_uniform'))
        model.add(BatchNormalization(momentum=0.5))
        model.add(ReLU())

        # Emit as many channels as the discriminator expects instead of a
        # hard-coded 3, so the two models stay compatible for any `channel`.
        model.add(Conv2DTranspose(self.channel, kernel_size=(5, 5), strides=(2, 2), padding='same', kernel_initializer='glorot_uniform'))
        model.add(Activation('tanh'))

        optimizer = Adam(learning_rate=0.00015, beta_1=0.5)
        model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        return model

    def generate_real_samples(self, dataset, n_samples):
        """Draw ``n_samples`` random rows (with replacement) from ``dataset``.

        Returns:
            ``(X, y)`` where ``X`` is ``dataset`` indexed by the random indices
            and ``y`` has shape ``(n_samples, 1)`` with smoothed "real" labels
            in (0.8, 1.0].
        """
        idx = np.random.randint(0, dataset.shape[0], n_samples)
        X = dataset[idx]
        # One-sided label smoothing: 1 minus uniform noise in [0, 0.2).
        y = np.ones((n_samples, 1)) - np.random.random_sample((n_samples, 1)) * 0.2
        return X, y

    def generate_latent_points(self, n_samples):
        """Return standard-normal noise of shape ``(n_samples, latent_dim)``."""
        x_input = np.random.normal(0, 1, size=(n_samples, self.latent_dim))
        return x_input

    def generate_fake_samples(self, n_samples):
        """Generate ``n_samples`` images with the generator.

        Returns:
            ``(X, y)`` where ``X`` is the generator output and ``y`` has shape
            ``(n_samples, 1)`` with smoothed "fake" labels in [0, 0.2).
        """
        x_input = self.generate_latent_points(n_samples)
        # verbose=0: this runs once per training batch, so the default
        # progress bar would flood the notebook output.
        X = self.generator.predict(x_input, verbose=0)
        y = np.random.random_sample((n_samples, 1)) * 0.2
        return X, y

def build_gan(generator,discriminator):
    """Stack generator and discriminator into one model for generator updates.

    The discriminator's ``trainable`` flag is set to False before the stacked
    model is compiled. The result is compiled with binary cross-entropy and
    Adam(learning_rate=0.00015, beta_1=0.5).

    Returns:
        The compiled ``Sequential`` model (generator followed by discriminator).
    """
    discriminator.trainable = False

    combined = Sequential()
    for stage in (generator, discriminator):
        combined.add(stage)

    combined.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.00015, beta_1=0.5),
    )
    return combined

In [7]:
def load_real_samples(batch_size=32, size=128):
    """Load every image under ``data/train`` into one in-memory array.

    Args:
        batch_size: number of images read per step while loading. It only
            affects loading speed (and the shuffle buffer), not the content
            of the returned array. Must be a positive integer.
        size: images are resized to ``(size, size)``.

    Returns:
        A NumPy array of all images, concatenated along the first axis, with
        pixel values transformed by ``(x - 127.5) / 127.5``.
    """
    # Honour `batch_size` (it used to be ignored in favour of a hard-coded 1,
    # which makes the load needlessly slow).
    dataset = tf.keras.utils.image_dataset_from_directory(
        'data/train',
        shuffle=True, seed=42, batch_size=batch_size,
        label_mode=None, image_size=(size, size))
    dataset = dataset.map(lambda x: (x - 127.5) / 127.5)
    # No dataset.cache(): the pipeline is consumed exactly once below, so a
    # cache would only keep a second copy of the data in memory.
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    samples = np.concatenate([batch for batch in dataset])
    return samples

def save_loss_to_file(epoch, d_loss, g_loss, file_path='losses.csv'):
    """Append one ``epoch, d_loss, g_loss`` row to a CSV file.

    A header row is written first when the file is empty (position 0 after
    opening in append mode). A confirmation line is printed afterwards.
    """
    header = ['Epoch', 'Discriminator Loss', 'Generator Loss']
    record = [epoch, d_loss, g_loss]
    with open(file_path, mode='a', newline='') as handle:
        # tell() == 0 right after opening for append means nothing was written yet.
        pending = [record] if handle.tell() else [header, record]
        csv.writer(handle).writerows(pending)
    print(f"Losses saved to {file_path}")

def save_checkpoint(generator,discriminator, gan, epoch):
    """Save generator and discriminator under ``checkpoints_1/ckpt_<epoch>``.

    ``gan`` is accepted but not used by this function.
    """
    tracked = tf.train.Checkpoint(generator=generator, discriminator=discriminator)
    prefix = os.path.join("checkpoints_1/", f'ckpt_{epoch}')
    tracked.save(file_prefix=prefix)

def restore_checkpoint(dc_gan: DCGAN, checkpoint_dir='checkpoints_1'):
    """Restore the newest checkpoint in ``checkpoint_dir`` into ``dc_gan``.

    Restores into ``dc_gan.generator`` and ``dc_gan.discriminator``. Prints
    whether a checkpoint was found; nothing is restored when none exists.
    """
    tracked = tf.train.Checkpoint(generator=dc_gan.generator,
                                  discriminator=dc_gan.discriminator)
    newest = tf.train.latest_checkpoint(checkpoint_dir)
    if not newest:
        print("No checkpoint found in the checkpoint directory.")
        return
    tracked.restore(newest)
    print(f"Checkpoint restored successfully: {newest}")

def save_plot(examples, epoch, n=5):
    """Save up to ``n * n`` images as a grid to ``images/generated_plot_<epoch+1>.png``.

    Args:
        examples: array of images with values in [-1, 1], indexed as
            ``examples[i, :, :, :]``.
        epoch: zero-based epoch index; the file name uses ``epoch + 1``.
        n: grid side length.
    """
    # Map [-1, 1] -> [0, 255] for display.
    pixels = (examples + 1) / 2.0
    pixels = (pixels * 255).astype(np.uint8)

    # savefig does not create missing directories; make sure the target exists.
    os.makedirs("images", exist_ok=True)

    plt.figure(figsize=(14,14))
    # Cap at the number of available images so a short batch does not raise IndexError.
    for i in range(min(n * n, pixels.shape[0])):
        plt.subplot(n, n, 1 + i)
        plt.axis('off')
        plt.imshow(pixels[i, :, :, :])
    filename = f'generated_plot_{epoch + 1}.png'
    plt.savefig("images/" + filename)
    plt.close()

def display(examples, n=5):
    """Show the first ``n * n`` images of ``examples`` in an ``n`` x ``n`` grid.

    ``examples`` is rescaled with ``(x + 1) / 2 * 255`` and cast to uint8
    before plotting; axes are hidden.
    """
    pixels = ((examples + 1) / 2.0 * 255).astype(np.uint8)
    plt.figure(figsize=(12,12))
    for slot in range(1, n * n + 1):
        plt.subplot(n, n, slot)
        plt.axis('off')
        plt.imshow(pixels[slot - 1, :, :, :])

def summarize_performance(gan: DCGAN, epoch, dataset, latent_dim, n_samples=100, d_loss=0, g_loss=0):
    """Report discriminator accuracy, log losses, save a sample grid and checkpoint.

    Args:
        gan: the DCGAN wrapper whose generator/discriminator are evaluated.
        epoch: zero-based epoch index; ``epoch + 1`` is used for logging.
        dataset: array of real images to sample from.
        latent_dim: accepted for compatibility; not used here.
        n_samples: number of real and of fake images to evaluate.
        d_loss: discriminator loss to record in the CSV log.
        g_loss: generator loss to record in the CSV log.

    A checkpoint is written only when ``epoch + 1`` is a multiple of 50.
    """
    # Evaluate against hard 1/0 targets. The sample helpers return smoothed
    # labels, and binary accuracy compares thresholded predictions with the
    # labels by equality, so smoothed targets would never count as correct.
    X_real, _ = gan.generate_real_samples(dataset, n_samples)
    _, acc_real = gan.discriminator.evaluate(X_real, np.ones((n_samples, 1)), verbose=0)

    x_fake, _ = gan.generate_fake_samples(n_samples)
    _, acc_fake = gan.discriminator.evaluate(x_fake, np.zeros((n_samples, 1)), verbose=0)

    print('>Accuracy real: %.0f%%, fake: %.0f%%' % (acc_real * 100, acc_fake * 100))
    save_loss_to_file(epoch+1, d_loss, g_loss, file_path='training_loss4.csv')
    save_plot(x_fake, epoch)
    if (epoch+1) % 50 != 0:
        return
    # Use the models of the `gan` argument rather than a notebook-level global.
    save_checkpoint(gan.generator, gan.discriminator, gan, epoch+1)


def train(dc_gan:DCGAN, gan, dataset, n_epochs=20, batch_size=64, start_epoch=0, augmentation_policies='color,translation,cutout'):
    """Alternate discriminator and generator updates over ``dataset``.

    Args:
        dc_gan: DCGAN wrapper providing the models and the sampling helpers.
        gan: stacked generator+discriminator model used for generator updates.
        dataset: array of real images; ``dataset.shape[0]`` sets batches per epoch.
        n_epochs: exclusive upper bound of the epoch index range.
        batch_size: each update uses ``int(batch_size / 2)`` samples.
        start_epoch: first epoch index (use a non-zero value to resume).
        augmentation_policies: not referenced anywhere in this function.

    Prints one line per batch and calls ``summarize_performance`` for epoch
    index 0 and whenever ``i + 1`` is a multiple of 50.

    NOTE(review): if ``dataset.shape[0] < batch_size`` the inner loop never
    runs, so ``d_loss`` / ``g_loss`` are unbound when ``summarize_performance``
    is reached.
    """
    bat_per_epo = int(dataset.shape[0] / batch_size)
    half_batch = int(batch_size / 2)
    # manually enumerate epochs
    for i in range(start_epoch,n_epochs):
        # enumerate batches over the training set
        for j in range(bat_per_epo):

            # NOTE(review): whether toggling `trainable` on an already-compiled
            # model has any effect depends on the Keras version — confirm.
            dc_gan.discriminator.trainable = True
            # get randomly selected 'real' samples and update the discriminator on them
            X_real, y_real = dc_gan.generate_real_samples(dataset, half_batch)
            d_loss_real, acc = dc_gan.discriminator.train_on_batch(X_real, y_real)
 

            # generate 'fake' examples and update the discriminator on them
            X_fake, y_fake = dc_gan.generate_fake_samples(half_batch)
            d_loss, _ = dc_gan.discriminator.train_on_batch(X_fake, y_fake)
            
            # reported discriminator loss is the sum of the real and fake batch losses
            d_loss = np.add(d_loss_real,d_loss)

            dc_gan.discriminator.trainable = False
            # generator update through the stacked model: latent points with
            # smoothed "real" targets, i.e. 1 minus uniform noise in [0, 0.2)
            X_gan = dc_gan.generate_latent_points(half_batch)
            y_gan = tf.convert_to_tensor(np.ones((half_batch, 1)) - np.random.random_sample((half_batch, 1)) * 0.2)
            g_loss = gan.train_on_batch(X_gan, y_gan)

            # summarize loss on this batch
            print('>%d, %d/%d, d=%.3f, g=%.3f' % (i+1, j+1, bat_per_epo, d_loss, g_loss))
        # periodic evaluation uses the losses of the last batch of the epoch
        if (i+1) % 50 == 0 or i==0:
            summarize_performance(dc_gan, i, dataset, dc_gan.latent_dim, d_loss=d_loss, g_loss=g_loss)

In [None]:
# Load all training images resized to 64x64 into a single in-memory array.
dataset = load_real_samples(size=64)

In [None]:
# Build fresh models (latent size 100, 64x64x3 images), then overwrite their
# weights with the latest checkpoint in checkpoints_1/ if one exists.
dc_gan = DCGAN(100, 64, 64, 3)
restore_checkpoint(dc_gan, checkpoint_dir="checkpoints_1/")
# Stacked generator -> discriminator model used for the generator updates.
gan = build_gan(dc_gan.generator, dc_gan.discriminator)

In [None]:
# Resume training at epoch index 3850 (upper bound 70000 epochs).
train(dc_gan, gan, dataset, n_epochs=70000, batch_size=64, start_epoch=3850)

>3851, 1/82, d=0.665, g=2.178
>3851, 2/82, d=0.649, g=2.234
>3851, 3/82, d=0.677, g=1.969
>3851, 4/82, d=0.657, g=2.036
>3851, 5/82, d=0.729, g=2.047
>3851, 6/82, d=0.652, g=1.761
>3851, 7/82, d=0.660, g=2.385
>3851, 8/82, d=0.688, g=2.632
>3851, 9/82, d=0.637, g=1.860
>3851, 10/82, d=0.597, g=2.135
>3851, 11/82, d=0.673, g=2.262
>3851, 12/82, d=0.697, g=2.195
>3851, 13/82, d=0.694, g=2.032
>3851, 14/82, d=0.686, g=2.341
>3851, 15/82, d=0.658, g=2.405
>3851, 16/82, d=0.682, g=2.108
>3851, 17/82, d=0.624, g=1.960
>3851, 18/82, d=0.658, g=2.390
>3851, 19/82, d=0.689, g=2.347
>3851, 20/82, d=0.671, g=1.436
>3851, 21/82, d=0.724, g=3.129
>3851, 22/82, d=0.794, g=2.848
>3851, 23/82, d=0.770, g=1.999
>3851, 24/82, d=0.666, g=1.888
>3851, 25/82, d=0.610, g=2.339
>3851, 26/82, d=0.703, g=1.743
>3851, 27/82, d=0.613, g=1.733
>3851, 28/82, d=0.657, g=2.036
>3851, 29/82, d=0.647, g=2.170
>3851, 30/82, d=0.656, g=2.531
>3851, 31/82, d=0.732, g=2.595
>3851, 32/82, d=0.668, g=1.664
>3851, 33/82, d=0

In [None]:
# Manual checkpoint of the current models.
# NOTE(review): the epoch label 920 does not match the start_epoch=3850 used
# in the training cell — confirm which value is intended.
save_checkpoint(dc_gan.generator,dc_gan.discriminator,gan,920)

In [None]:
# Generate 50 images (and their smoothed "fake" labels) for visual inspection.
x_fake, y_fake = dc_gan.generate_fake_samples(50)

In [None]:
# Show the first 25 generated images in a 5x5 grid (default n=5).
display(x_fake)

In [None]:
import shutil

In [None]:
# Zip the contents of the "images_3" directory into images.zip.
# NOTE(review): save_plot writes to "images/", not "images_3" — confirm the
# source directory is the intended one.
shutil.make_archive("images", 'zip', "images_3")