In [None]:
import os
import cv2
import glob
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from ast import literal_eval
from IPython import display
# Ask TensorFlow's native logger to hide INFO and WARNING messages.
# NOTE(review): tensorflow is already imported above, so this may be set too
# late to affect messages emitted during import — confirm and move it before
# the import if needed.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# Run tf.function-decorated code eagerly instead of as compiled graphs.
tf.config.run_functions_eagerly(True)
# Last expression of the cell: shows the installed TensorFlow version.
tf.__version__

In [None]:
def extract_iris_rectangle(image_dir, points, resize = False, size = (256, 256)):
  """Crop the bounding rectangle of ``points`` (plus a 1 px margin) from an image file.

  Args:
    image_dir: Path of the image file to read (a file path, despite the name).
    points: Point array handed to ``cv2.boundingRect``.
    resize: When true, the crop is resized to ``size`` before it is returned.
    size: Target size handed to ``cv2.resize``.

  Returns:
    The cropped (and optionally resized) image array.

  Raises:
    OSError: If the image could not be read.
    ValueError: If the bounding rectangle does not intersect the image.
  """
  image = cv2.imread(image_dir)
  if image is None:
    # cv2.imread reports failure by returning None instead of raising.
    raise OSError(f'Could not read image: {image_dir}')

  x, y, w, h = cv2.boundingRect(points)
  margin = 1
  img_h, img_w = image.shape[:2]

  # Clamp the window to the image. Without this, a rectangle touching the
  # top/left edge gives a negative start index, which NumPy interprets as
  # "count from the end" and the slice comes back empty or wrong.
  top = max(y - margin, 0)
  left = max(x - margin, 0)
  bottom = min(y + h + margin, img_h)
  right = min(x + w + margin, img_w)
  if top >= bottom or left >= right:
    raise ValueError(f'Bounding rectangle lies outside the image: {image_dir}')

  crop = image[top:bottom, left:right]
  if resize:
    crop = cv2.resize(crop, size)
  return crop

def save_image(img, dir):
  """Write ``img`` to the file path ``dir`` using ``cv2.imwrite``.

  Args:
    img: Image array to encode.
    dir: Destination file path (a file path, despite the name).

  Raises:
    OSError: If OpenCV reports that the file was not written.
  """
  # cv2.imwrite returns False on failure rather than raising, so the result
  # has to be checked or a failed write goes unnoticed.
  if not cv2.imwrite(dir, img):
    raise OSError(f'Could not write image: {dir}')

def convert_string_to_np_array(value):
  """Parse a string holding a Python literal and return it as an int32 NumPy array."""
  return np.array(literal_eval(value), dtype=np.int32)

In [None]:
def create_dataset(df, data_dir, output_dir):
  """Crop every image listed in ``df`` and store the crops in ``output_dir``.

  For each row the source image is looked up at
  ``data_dir/<identity>/<file>``, cropped to the bounding rectangle of the
  row's ``outerPts`` and written to ``output_dir/<file>``. Rows that fail are
  skipped, and a summary of the failures is printed afterwards.

  Args:
    df: DataFrame with the columns ``identity``, ``file`` and ``outerPts``.
    data_dir: Root directory of the source images.
    output_dir: Directory that receives the crops; created if missing.
  """
  print(f'Number of images: {len(df)}')
  # makedirs also creates missing parents and tolerates an existing directory.
  os.makedirs(output_dir, exist_ok=True)

  failures = []
  for _, row in df.iterrows():
    try:
      file = os.path.join(data_dir, str(row["identity"]), row["file"])
      dest = os.path.join(output_dir, row["file"])
      points = convert_string_to_np_array(row["outerPts"])
      iris = extract_iris_rectangle(file, points)
      save_image(iris, dest)
    except Exception as e:
      # Best effort: one bad row must not abort the whole export, but the
      # failure is recorded so it does not disappear silently.
      failures.append((row.get("file"), e))

  if failures:
    first_file, first_error = failures[0]
    print(f'Skipped {len(failures)} images; first failure: {first_file}: {first_error!r}')
  print(f'Number of processed images: {len(glob.glob(output_dir+"/*"))}')

In [None]:
def get_dataset(datapath, image_size, batch_size):
  """Load the images under ``datapath`` as an unlabeled, batched dataset.

  The loaded dataset is shuffled with a buffer as large as its own length and
  every batch is transformed with ``(x - 127.5) / 127.5``.

  Args:
    datapath: Directory passed to ``image_dataset_from_directory``.
    image_size: Size the images are loaded at.
    batch_size: Number of images per batch.

  Returns:
    The shuffled and rescaled dataset.
  """
  def _rescale(batch):
    return (batch - 127.5) / 127.5

  images = keras.preprocessing.image_dataset_from_directory(
      datapath,
      label_mode=None,
      image_size=image_size,
      batch_size=batch_size,
      color_mode='rgb',
  )
  buffer_size = len(images)
  return images.shuffle(buffer_size).map(_rescale)

In [None]:
class GAN():
    """Convolutional GAN: a generator/discriminator pair with a manual training loop.

    The generator maps a latent vector of length ``latent_dim`` to an image
    with ``tanh`` output; the discriminator maps an image of shape
    ``input_shape`` to a single sigmoid probability. Per-step losses are
    collected in ``d_losses`` and ``g_losses``.
    """

    # The discriminator ends in a sigmoid layer, so its outputs are already
    # probabilities; the loss must therefore not interpret them as logits.
    cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=False)

    def __init__(self, latent_dim, input_shape, batch_size, l_rate):
        """Build both networks and one Adam optimizer for each.

        Args:
            latent_dim: Length of the generator's noise vector.
            input_shape: Shape of the images fed to the discriminator.
            batch_size: Number of noise vectors drawn per training step.
            l_rate: Learning rate used by both optimizers.
        """
        self.latent_dim = latent_dim
        self.input_shape = input_shape
        self.batch_size = batch_size
        self.l_rate = l_rate
        self.d_losses = []
        self.g_losses = []
        self.generator_optimizer = tf.keras.optimizers.Adam(l_rate)
        self.discriminator_optimizer = tf.keras.optimizers.Adam(l_rate)
        self.generator = self.make_generator_model()
        self.discriminator = self.make_discriminator_model()

    def make_generator_model(self):
        """Return the generator network.

        The spatial size goes 8 -> 16 -> 32 -> 64 -> 64 -> 128, so the output
        is always 128x128x3 and does not depend on ``self.input_shape``.
        """
        model = tf.keras.Sequential()
        model.add(layers.Dense(8*8*512, use_bias=False, input_shape=(self.latent_dim,)))
        model.add(layers.Reshape((8, 8, 512)))
        model.add(layers.LeakyReLU())

        model.add(layers.Conv2DTranspose(256, (4, 4), strides=(2, 2), padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Conv2DTranspose(128, (4, 4), strides=(1, 1), padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        # tanh keeps pixel values in [-1, 1]; they are mapped back to 0..255
        # with ``* 127.5 + 127.5`` wherever images are displayed.
        model.add(layers.Conv2DTranspose(3, (4, 4), strides=(2, 2), padding='same', activation='tanh'))

        return model

    def test_generator(self):
        """Generate one image from random noise and show it with matplotlib."""
        noise = tf.random.normal([1, self.latent_dim])
        generated_image = self.generator(noise, training=False)
        plt.imshow(np.array(generated_image[0] * 127.5 + 127.5).astype(np.uint8))

    def make_discriminator_model(self):
        """Return the discriminator network (one sigmoid probability per image)."""
        model = tf.keras.Sequential()
        model.add(layers.Conv2D(64, (4, 4), strides=(2, 2), padding='same', input_shape=self.input_shape))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Conv2D(128, (4, 4), strides=(2, 2), padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Conv2D(128, (4, 4), strides=(1, 1), padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Conv2D(256, (4, 4), strides=(2, 2), padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Conv2D(512, (4, 4), strides=(2, 2), padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Flatten())
        model.add(layers.Dropout(0.4))

        model.add(layers.Dense(1, activation="sigmoid"))

        return model

    def test_discriminator(self):
        """Return the discriminator's output for one freshly generated image."""
        noise = tf.random.normal([1, self.latent_dim])
        generated_image = self.generator(noise, training=False)
        # Inference mode, matching the generator call above.
        return self.discriminator(generated_image, training=False)

    def discriminator_loss(self, real_output, fake_output):
        """Cross-entropy of real outputs against 1 plus fake outputs against 0."""
        real_loss = self.cross_entropy(tf.ones_like(real_output), real_output)
        fake_loss = self.cross_entropy(tf.zeros_like(fake_output), fake_output)
        return real_loss + fake_loss

    def generator_loss(self, fake_output):
        """Cross-entropy of the discriminator's fake outputs against 1."""
        return self.cross_entropy(tf.ones_like(fake_output), fake_output)

    def train(self, dataset, epochs):
        """Train both networks for ``epochs`` passes over ``dataset``.

        After every epoch the cell output is cleared and a preview grid is
        drawn; the final grid is drawn once more when training ends.

        Args:
            dataset: Iterable yielding batches of real images.
            epochs: Number of passes over ``dataset``.
        """
        # Draw the preview noise once: it is defined even when epochs == 0,
        # and the per-epoch previews show the same latent points.
        seed = tf.random.normal([16, self.latent_dim])

        for epoch in range(epochs):
            start = time.time()

            for image_batch in dataset:
                self.train_step(image_batch)
            display.clear_output(wait=True)
            self.show_generated_images(epoch + 1, seed)

            print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))
        display.clear_output(wait=True)
        self.show_generated_images(epochs, seed)

    def train_step(self, images):
        """Run one optimisation step and record and print both losses.

        Args:
            images: Batch of real images.
        """
        gen_loss, disc_loss = self._apply_gradients(images)
        # Bookkeeping happens outside the tf.function so that the numeric
        # update itself does not depend on eager execution.
        d_l = disc_loss.numpy()
        g_l = gen_loss.numpy()
        self.d_losses.append(d_l)
        self.g_losses.append(g_l)
        print(f'\rd_loss: {d_l}, g_loss: {g_l}',end="")

    @tf.function
    def _apply_gradients(self, images):
        """Compute both losses for one batch and update both networks."""
        noise = tf.random.normal([self.batch_size, self.latent_dim])

        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            generated_images = self.generator(noise, training=True)

            real_output = self.discriminator(images, training=True)
            fake_output = self.discriminator(generated_images, training=True)

            gen_loss = self.generator_loss(fake_output)
            disc_loss = self.discriminator_loss(real_output, fake_output)

        gradients_of_generator = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
        gradients_of_discriminator = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)

        self.generator_optimizer.apply_gradients(zip(gradients_of_generator, self.generator.trainable_variables))
        self.discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, self.discriminator.trainable_variables))
        return gen_loss, disc_loss

    def show_generated_images(self, epoch, test_input):
        """Generate images for ``test_input`` and draw them in a grid.

        Args:
            epoch: Epoch number supplied by the caller (not used for drawing).
            test_input: Batch of latent vectors.

        Returns:
            The last generated image as a uint8 array, or ``None`` when
            ``test_input`` is empty.
        """
        predictions = self.generator(test_input, training=False)
        count = predictions.shape[0]
        # Near-square grid: 16 inputs still give 4x4, other sizes fit too.
        cols = max(1, int(np.ceil(np.sqrt(count))))
        rows = max(1, -(-count // cols))

        fig = plt.figure(figsize=(4, 4))
        img = None
        for i in range(count):
            plt.subplot(rows, cols, i+1)
            img = np.array(predictions[i] * 127.5 + 127.5).astype(np.uint8)
            plt.imshow(img)
            plt.axis('off')
        print(f'\r',end="")
        plt.show()
        # Release the figure so previews do not pile up over many epochs.
        plt.close(fig)
        return img

    def save_model(self, dir):
        """Save the generator (only) to the path ``dir``."""
        self.generator.save(dir)


In [None]:
# Image geometry shared by the data loader and the discriminator input.
image_w, image_h = 128, 128
channels = 3
image_size = (image_w, image_h)
image_shape = [*image_size, channels]

# Training hyper-parameters.
latent_dim = 128  # length of the generator's noise vector
batch_size = 16
l_rate = 1e-4     # learning rate handed to both Adam optimizers
epochs = 300

In [None]:
# Source data: image folders plus the CSV that describes them.
info_csv = "../input/iris-data/data/info.csv"
data_dir = "../input/iris-data/data"

# Destinations written by this notebook: cropped images and saved models.
models_dir = "../working/models/"
output_dir = "../working/data"

In [None]:
k_fold = 5
df = pd.read_csv(info_csv, sep=";")

# makedirs(exist_ok=True) replaces the check-then-create pair: it also creates
# missing parent directories and is a no-op when the directory already exists.
os.makedirs(output_dir, exist_ok=True)
os.makedirs(models_dir, exist_ok=True)

# NOTE(review): this range yields only i == k_fold, so a single fold is
# trained per run — confirm that this is intended.
for i in range(k_fold,k_fold+1):
    dest = os.path.join(output_dir, str(i))
    # The crops are generated once; an existing directory is reused as-is.
    if not os.path.exists(dest):
        # Rows whose "group" equals i are left out of this fold's data.
        create_dataset(df[df["group"] != i], data_dir, dest)
    model = GAN(latent_dim, image_shape, batch_size, l_rate)
    dataset = get_dataset(dest, image_size, batch_size)
    model.train(dataset, epochs)
    model.save_model(os.path.join(models_dir, str(i)+".h5"))

In [None]:
# Saves the generator again to the same path the training loop used.
# Relies on `model` and `i` still being defined by the training cell above.
model.save_model(os.path.join(models_dir, str(i)+".h5"))

In [None]:
# Shell command: recursively pack the models directory into file.zip.
!zip -r file.zip "../working/models/"

In [None]:
# Show a link to file.zip (the archive name used by the zip cell above) as this cell's output.
from IPython.display import FileLink 
FileLink(r'file.zip')