In [None]:
"""code required to preprocess downloaded data to appropriate directories"""
#import os, shutil
#from PIL import Image  
#import PIL
#parent_dir = r"C:\Users\aniru\Downloads"
#annotations = r"C:\Users\aniru\Downloads\mirflickr25k_annotations_v080"
#images = r"C:\Users\aniru\Downloads\mirflickr25k\mirflickr"
#dest = r"C:\Users\aniru\Downloads\final"
#cat = [f for f in os.listdir(annotations)]
#fs_img = [f for f in os.listdir(images) if '.jpg' in f]
#for f in cat:
#    det = f[0:-4]
#    path = os.path.join(dest,det)
#    os.mkdir(path)
#    src = os.path.join(annotations,f)
#    with open(src, 'r') as f:
#        for line in f:
#            cnt = 0
#            a = line.strip()
#            name = "im"+a+".jpg"
#            shutil.copy(os.path.join(images,name),path)
#            cnt = cnt + 1
#            if cnt>=100:
#                break

In [None]:
from __future__ import print_function, division

from keras.datasets import mnist
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, multiply, Concatenate
from keras.layers import BatchNormalization, Activation, Embedding, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam
import tensorflow as tf
import os 
import numpy as np
from keras.applications.vgg19 import VGG19, preprocess_input
import matplotlib.pyplot as plt

In [None]:
from google.colab import drive
drive.mount('/drive',force_remount=True)

In [None]:
#create database file from the directories with images stored
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
  "/drive/MyDrive/Data/final",
   labels='inferred', label_mode='int',
   image_size=(224, 224),
   batch_size= 512
  )

In [None]:
!wget http://nlp.stanford.edu/data/glove.6B.zip

In [None]:
!unzip -q glove.6B.zip

In [None]:
#implemeting pretrained vgg19 model for image processing
model = VGG19()
imagevec = Model(inputs=model.inputs, outputs=model.layers[-2].output)

In [None]:
path_to_glove_file =  "/content/glove.6B.200d.txt"
#creating embedding matrix from glove file

embeddings_index = {}
with open(path_to_glove_file) as f:
    for line in f:
        word, coefs = line.split(maxsplit=1)
        coefs = np.fromstring(coefs, "f", sep=" ")
        embeddings_index[word] = coefs

print("Found %s word vectors." % len(embeddings_index))


In [None]:
class CGAN():
    def __init__(self):
        self.img_shape = 4096
        self.latent_dim = 4096
        optimizer = Adam(0.0002, 0.5)

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss=['binary_crossentropy'],
            optimizer=optimizer,
            metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise and the img as input
        # and generates the corresponding digit of that label
        noise = Input(shape=(self.latent_dim,))
        img = Input(shape=(self.img_shape,))
        label = self.generator([noise, img])

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated image and label as input and determines validity
        valid = self.discriminator([img, label])

        # The combined model  (stacked generator and discriminator)
        # Trains generator to fool discriminator
        self.combined = Model([noise, img], valid)
        self.combined.compile(loss=['binary_crossentropy'],
            optimizer=optimizer)
        

    def sortsecond(val):
      return val[1] #defined for use in cosine loss


    def build_generator(self):

        model = Sequential()

        model.add(Dense(256, input_dim=self.latent_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(1024))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(np.prod(200), activation='tanh'))
        model.add(Reshape((200,)))

        model.summary()

        noise = Input(shape=(self.latent_dim,)) #noise vector input in generator
        image = Input(shape=(4096,)) #image vector input in generator

        model_input = multiply([noise, image])
        label = model(model_input)

        return Model([noise, image], label)

    def build_discriminator(self):

        model = Sequential()

        model.add(Dense(512, input_dim=4296))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.4))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.4))
        model.add(Dense(1, activation='sigmoid'))
        model.summary()

        # img = Input(shape=self.img_shape)
        wordvec = Input(shape=(200,),dtype='float32') #label vector input in discriminator
        imageVec = Input(shape=(4096,))               #image vector input in discriminator
        print('dis wordvc=' , np.shape(wordvec))
        print('dis imgvec=', np.shape(imageVec))


        model_input = Concatenate(axis=1)([imageVec,wordvec])

        validity = model(model_input)

        return Model([imageVec, wordvec], validity)

    def train(self, epochs, batch_size=256, sample_interval=100):

        # initiate arrays
        epoch_array,g_loss_array,d_loss_real_array,d_loss_fake_array = [],[],[],[]

        #load dataset as tensors
        image_batch, labels_batch = next(iter(train_ds))

        #convert into respective vectors through vgg19 and glove
        labelsf = []
        imagesf = np.array(imagevec.predict(image_batch))
        for i in range(np.shape(image_batch)[0]):
          labelsf.append(embeddings_index[train_ds.class_names[labels_batch[i]]])
        labelsf = np.array(labelsf)
        

        # Adversarial ground truths
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):
            

            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random half batch of image
            idx = np.random.randint(0, imagesf.shape[0], size=batch_size)
            images, labels = imagesf[idx], labelsf[idx]


            # Sample noise as generator input
            noise = np.random.normal(0, 1, (batch_size, 4096))

            # Generate a half batch of new images
            gen_labels = self.generator.predict([noise, images])

            # Train the discriminator
            d_loss_real = self.discriminator.train_on_batch([images, labels], valid)
            d_loss_fake = self.discriminator.train_on_batch([images, gen_labels], fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            # Train the generator
            g_loss = self.combined.train_on_batch([noise, images], valid)
            epoch_array.append(epoch)
            g_loss_array.append(g_loss)
            d_loss_real_array.append(d_loss_real)
            d_loss_fake_array.append(d_loss_fake)

            # Plot the progress
            if epoch%sample_interval==0:
              self.loss_plot(epoch_array,g_loss_array,d_loss_real_array,d_loss_fake_array)
              print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))
        
        sampleint = np.random.randint(0, imagesf.shape[0], size=1)
        print(sampleint)
        sample, labels = imagesf[sampleint], labelsf[idx]
        t=0
        
        cosine_loss = tf.keras.losses.CosineSimilarity(axis=0)
        tags = []
        for t in range(100):
          noiser = np.random.normal(0, 1, (1, 4096))
          k = self.generator.predict([noiser, sample])
          cosine = 0
          tag = ''
          for key,_ in embeddings_index.items():
            diff = cosine_loss(k,embeddings_index[key])
            if cosine>diff:
              cosine = diff
              tag = key
          word = [tag, cosine]
          tags.append(word)
          t = t+1
          print(t)
        print(tags)
        plt.imshow(image_batch[int(sampleint)])
        tags.sort(key=sortsecond)

        print(tags[:20][0])
        plt.imshow(image_batch[sampleint])



    def loss_plot(self,epoch_array,g_loss_array,d_loss_real_array,d_loss_fake_array):
      plt.clf()
      plt.plot(epoch_array, g_loss_array, label = "Generator Loss")
      plt.plot(epoch_array, d_loss_real_array, label = "Discriminator Loss for Real Image")
      plt.plot(epoch_array, d_loss_fake_array, label = "Discriminator Loss for Fake Image")
      plt.xlabel("number of epochs")
      plt.ylabel("Loss")
      plt.title("Loss Plots")
      plt.legend()
      # plt.savefig("/ritik/Desktop/nnflsinglemodal/nnfl1/lossplots/lossplot"+str(epoch_array[-1]))
      plt.savefig("/lossplots/lossplot"+str(epoch_array[-1]))

      plt.close()


if __name__ == '__main__':
    cgan = CGAN()
    cgan.train(epochs=2000, batch_size=256, sample_interval=100)
  