In [1]:
from __future__ import print_function, division   # make print a function & changing a division operator

from keras.layers import Input, Dense, Reshape, Flatten, Dropout, multiply
# https://keras.io/api/layers/
# Keras layers API.
# class Dense: Regular densely-connected NN layer.
# class Reshape: Layer that reshapes inputs into the given shape.
""" class Flatten: Flattens the input. Does not affect the batch size.
 The Dropout layer randomly sets input units to 0 with a frequency of rate at each step during training time,
 which helps prevent overfitting. Inputs not set to 0 are scaled up by 1/(1 - rate) 
 such that the sum over all inputs is unchanged."""
# class Multiply: Layer that multiplies (element-wise) a list of inputs.

from keras.layers import BatchNormalization, Activation, Embedding, ZeroPadding2D
# https://www.tensorflow.org/api_docs/python/tf/keras/layers
# Batch normalization applies a transformation that maintains the mean output close to 0 and the output standard deviation close to 1.
# class Activation: Applies an activation function to an output.
# class Embedding: Turns positive integers (indexes) into dense vectors of fixed size.
# class ZeroPadding2D: Zero-padding layer for 2D input (e.g. picture).

from keras.layers.advanced_activations import LeakyReLU   # Leaky version of a Rectified Linear Unit.

# Upsampling layer for 2D inputs. Repeats the rows and columns of the data by size[0] and size[1] respectively.
from keras.layers.convolutional import UpSampling2D, Conv2D

from keras.models import Sequential, Model

# Adam optimization is a stochastic gradient descent method that is based on adaptive estimation of first-order and second-order moments.
from keras.optimizers import Adam

import matplotlib.pyplot as plt
import numpy as np
import pathlib
import os
import matplotlib.image as mpimg
import cv2
from sklearn import preprocessing

class CGAN():
    """Conditional GAN made of fully connected layers for grayscale images.

    The generator maps a (noise, label) pair to an image of shape
    ``img_shape``; the discriminator maps an (image, label) pair to a single
    sigmoid "real/fake" score.  ``combined`` stacks the two so that the
    generator can be trained against the discriminator.

    Attributes:
        img_rows, img_cols, channels: Dimensions of the generated images.
        img_shape: ``(img_rows, img_cols, channels)``.
        num_classes: Size of the label vocabulary of both embedding layers.
        latent_dim: Length of the noise vector fed to the generator.
        discriminator, generator, combined: The three Keras models.
    """

    # Default location of the training images (Google Drive mount).
    DEFAULT_DATA_DIR = '/content/drive/MyDrive/Colab Notebooks/Data Mining and Visualization/Assessment 1/DMV_Assess_1_Covid-19_Dataset/Covid-19'

    def __init__(self):
        """Build and compile the discriminator, generator and combined model."""
        # Input shape
        self.img_rows = 128
        self.img_cols = 128
        self.channels = 1
        self.img_shape = (self.img_rows, self.img_cols, self.channels)
        self.num_classes = 5
        self.latent_dim = 128

        # Adam with learning rate 0.0002 and beta_1 = 0.8, shared by both
        # compiled models.
        optimizer = Adam(0.0002, 0.8)

        # Build and compile the discriminator.  This has to happen BEFORE the
        # `trainable = False` assignment below so that the stand-alone
        # discriminator still learns when trained directly (this relies on
        # Keras reading the `trainable` flag at compile time).
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
                                   optimizer=optimizer,
                                   metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise and a class label and produces an image.
        noise = Input(shape=(self.latent_dim,))
        label = Input(shape=(1,))
        img = self.generator([noise, label])

        # For the combined model we will only train the generator.
        self.discriminator.trainable = False

        # The discriminator scores the generated image for the given label.
        valid = self.discriminator([img, label])

        # The combined model (stacked generator and discriminator)
        # trains the generator to fool the discriminator.
        self.combined = Model([noise, label], valid)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)

    @staticmethod
    def _normalise(pixels):
        """Scale 8-bit pixel values to [0, 1] and subtract the per-image mean.

        Args:
            pixels: Array of pixel intensities (any numeric dtype).

        Returns:
            A new ``float32`` array of the same shape; the input is not
            modified.
        """
        scaled = pixels.astype('float32')
        scaled /= 255.0
        return scaled - np.mean(scaled)

    def load_images(self, data_dir=None):
        """Load, resize and normalise every readable image in ``data_dir``.

        Only the first channel of each decoded image is kept.  Files that
        OpenCV cannot decode are reported and skipped instead of aborting
        the whole load.

        Args:
            data_dir: Directory containing the images.  Defaults to
                ``DEFAULT_DATA_DIR``.

        Returns:
            ``(x_train, y_train, count)`` where ``x_train`` is an array of
            ``count`` normalised images, ``y_train`` holds the integer class
            id of each image and ``count`` is the number of images loaded.
        """
        data = pathlib.Path(self.DEFAULT_DATA_DIR if data_dir is None else data_dir)

        # We only have real images of a single class.
        classes = ['Covid-19']
        lb = preprocessing.LabelEncoder()
        lb.fit(classes)
        # Every image gets the same label, so encode it once.
        class_label = lb.transform([classes[0]])[0]

        # cv2.resize expects the target size as (width, height).
        target_size = (self.img_cols, self.img_rows)

        x_train = []
        y_train = []
        # The pattern matches every file with an extension (.jpg, .png and
        # .jpeg are expected); anything that is not a decodable image is
        # skipped below.
        for path in sorted(data.glob('*.*')):
            image = cv2.imread(str(path))
            if image is None:
                # imread signals failure by returning None rather than raising.
                print("Skipping unreadable file: {}".format(path))
                continue
            resized = cv2.resize(image[:, :, 0], target_size)
            x_train.append(self._normalise(resized))
            y_train.append(class_label)

        count = len(x_train)
        return np.asarray(x_train), np.asarray(y_train), count

    def build_generator(self):
        """Create the generator model.

        The label is embedded into a ``latent_dim``-long vector and
        multiplied element-wise with the noise vector; the product is passed
        through a stack of dense layers ending in a ``tanh`` layer that is
        reshaped to ``img_shape``.

        Returns:
            A Keras ``Model`` mapping ``[noise, label]`` to an image.
        """
        model = Sequential()

        # Only the first layer needs to declare the input size.
        model.add(Dense(128, input_dim=self.latent_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(256))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(1024))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(int(np.prod(self.img_shape)), activation='tanh'))
        model.add(Reshape(self.img_shape))

        model.summary()

        noise = Input(shape=(self.latent_dim,))
        label = Input(shape=(1,), dtype='int32')
        label_embedding = Flatten()(Embedding(self.num_classes, self.latent_dim)(label))

        # Condition the noise on the label.
        model_input = multiply([noise, label_embedding])
        img = model(model_input)

        return Model([noise, label], img)

    def build_discriminator(self):
        """Create the discriminator model.

        The label is embedded into a vector as long as the flattened image
        and multiplied element-wise with the flattened image; the product is
        passed through a stack of dense layers ending in one sigmoid unit.

        Returns:
            A Keras ``Model`` mapping ``[img, label]`` to a validity score.
        """
        flat_dim = int(np.prod(self.img_shape))

        model = Sequential()

        # Only the first layer needs to declare the input size.
        model.add(Dense(512, input_dim=flat_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.3))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.3))
        model.add(Dense(1, activation='sigmoid'))

        model.summary()

        img = Input(shape=self.img_shape)
        label = Input(shape=(1,), dtype='int32')

        label_embedding = Flatten()(Embedding(self.num_classes, flat_dim)(label))
        flat_img = Flatten()(img)

        # Condition the image on the label.
        model_input = multiply([flat_img, label_embedding])

        validity = model(model_input)

        return Model([img, label], validity)

    def train(self, epochs, batch_size, sample_interval):
        """Alternate discriminator and generator updates on random batches.

        Args:
            epochs: Number of training iterations; each iteration trains on
                one random batch, not on a full pass over the data.
            batch_size: Number of real and of generated images per iteration.
            sample_interval: Currently unused; periodic sample saving is
                disabled.

        Raises:
            ValueError: If no training image could be loaded.
        """
        # Load the dataset
        X_train, y_train, count = self.load_images()
        if count == 0:
            raise ValueError("No training images could be loaded; "
                             "check the dataset directory.")

        print("shape of x train: {}".format(len(X_train)))
        print("Y SHAPE BEFORE RESHAPING: {}".format(y_train.shape))

        # Add the trailing channel axis expected by the discriminator input.
        X_train = X_train.reshape(count, self.img_rows, self.img_cols, self.channels)
        y_train = y_train.reshape(-1, 1)

        print("Y SHAPE: {}".format(y_train.shape))

        # Adversarial ground truths
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):

            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random batch of real images (sampled with replacement)
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            imgs, labels = X_train[idx], y_train[idx]

            # Sample noise as generator input
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

            # Generate a batch of new images for the same labels
            gen_imgs = self.generator.predict([noise, labels])

            # Train the discriminator on real, then on generated images;
            # each call returns [loss, accuracy], which are averaged.
            d_loss_real = self.discriminator.train_on_batch([imgs, labels], valid)
            d_loss_fake = self.discriminator.train_on_batch([gen_imgs, labels], fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            # Condition on labels drawn uniformly from the label vocabulary
            sampled_labels = np.random.randint(0, self.num_classes, batch_size).reshape(-1, 1)

            # Train the generator to make the discriminator answer "valid"
            g_loss = self.combined.train_on_batch([noise, sampled_labels], valid)

            # Plot the progress
            print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))

            # If at save interval => save generated image samples
    #         if epoch % sample_interval == 0:
    #             self.sample_images(epoch)

    # def sample_images(self, epoch):
    #     r, c = 2, 2
    #     noise = np.random.normal(0, 1, (r * c, 128))
    #     sampled_labels = np.arange(0, 4).reshape(-1, 1)

    #     gen_imgs = self.generator.predict([noise, sampled_labels])

    #     # Rescale images 0 - 1
    #     gen_imgs = 0.5 * gen_imgs + 0.5

    #     fig, axs = plt.subplots(r, c)
    #     cnt = 0
    #     for i in range(r):
    #         for j in range(c):
    #             axs[i,j].imshow(gen_imgs[cnt,:,:,0], cmap='gray')
    #             axs[i,j].set_title("Covid: %d" % sampled_labels[cnt])
    #             axs[i,j].axis('off')
    #             cnt += 1
    #     fig.savefig("/content/drive/MyDrive/Colab Notebooks/Data Mining and Visualization/Assessment 1/samples/%d.png" % epoch)
    #     plt.close()



if __name__ == '__main__':
    # Train the conditional GAN, persist both networks, then write a set of
    # synthetic images per class to Google Drive.
    cgan = CGAN()
    cgan.train(epochs=40000, batch_size=256, sample_interval=5000)

    # Make sure the output directory exists so the long training run is not
    # lost to a failed save.
    os.makedirs('models', exist_ok=True)
    cgan.generator.save('models/gen.h5')
    cgan.discriminator.save('models/disc.h5')

    # LabelEncoder yields integer class ids (not one-hot vectors), which is
    # what the generator's label input takes.
    classes = ['Covid-19']
    encoded_labels = preprocessing.LabelEncoder().fit_transform(classes)
    print(encoded_labels)

    synthetic_images_path = '/content/drive/MyDrive/Colab Notebooks/Data Mining and Visualization/Assessment 1/images-covid/'
    os.makedirs(synthetic_images_path, exist_ok=True)

    # Generate 50 images per class, one prediction at a time.
    fig, ax = plt.subplots()
    for label in encoded_labels:
        nlab = np.asarray([label]).reshape(-1, 1)
        for num in range(50):
            noise1 = np.random.normal(0, 1, (1, cgan.latent_dim))
            img = cgan.generator.predict([noise1, nlab])
            # The prediction batch holds a single image; drop the channel axis.
            plt.imshow(img[0, :, :, 0], cmap='gray', interpolation='nearest')
            plt.axis('off')
            fig.savefig(synthetic_images_path + str(num) + ".png", bbox_inches='tight')
            # Clear the figure so the next image is drawn on a blank canvas.
            plt.clf()
    # Release the figure so no empty figure is left behind.
    plt.close(fig)

Streaming output truncated to the last 5000 lines.
35001 [D loss: 0.088906, acc.: 97.66%] [G loss: 0.624208]
35002 [D loss: 0.081840, acc.: 97.85%] [G loss: 0.583617]
35003 [D loss: 0.096635, acc.: 97.46%] [G loss: 0.600395]
35004 [D loss: 0.088780, acc.: 97.85%] [G loss: 0.792050]
35005 [D loss: 0.079609, acc.: 98.05%] [G loss: 0.635988]
35006 [D loss: 0.059955, acc.: 98.63%] [G loss: 0.844545]
35007 [D loss: 0.108281, acc.: 97.07%] [G loss: 0.754439]
35008 [D loss: 0.078644, acc.: 98.05%] [G loss: 0.790570]
35009 [D loss: 0.076821, acc.: 98.05%] [G loss: 0.680676]
35010 [D loss: 0.095441, acc.: 97.27%] [G loss: 0.604977]
35011 [D loss: 0.067752, acc.: 98.24%] [G loss: 0.808682]
35012 [D loss: 0.091998, acc.: 97.66%] [G loss: 0.783223]
35013 [D loss: 0.064310, acc.: 98.44%] [G loss: 0.816440]
35014 [D loss: 0.107380, acc.: 97.07%] [G loss: 0.572617]
35015 [D loss: 0.082457, acc.: 98.05%] [G loss: 0.577208]
35016 [D loss: 0.111234, acc.: 97.27%] [G loss: 0.818388]
35017 [

<Figure size 432x288 with 0 Axes>