In [None]:
# Import the necessary libraries and define the constant variables
# !pip install tensorflow_transform


import itertools
import tensorflow as tf
import numpy as np
from glob import glob
import time
import os

from datetime import datetime

from sklearn.metrics import roc_curve, auc, precision_score, recall_score

from matplotlib import pyplot as plt

# Target image height, width and channel count. IMG_H / IMG_W are the size
# passed to tf.image.resize_with_crop_or_pad in load_image.
IMG_H = 128
IMG_W = 128
IMG_C = 3  ## Change this to 1 for grayscale.


# Shared binary cross-entropy loss. from_logits=True means the values passed as
# predictions are treated as raw logits (the discriminator ends in Dense(1)
# with no activation), not as probabilities.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
def load_image(image_path):
    """Read one BMP file and return it as a float32 tensor scaled to [-1, 1].

    Args:
        image_path: path of the BMP file to read (string or string tensor).

    Returns:
        A float32 tensor whose height and width are IMG_H x IMG_W (obtained by
        centre crop / zero padding, not by rescaling) and whose values are
        mapped from [0, 255] to [-1, 1].
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.io.decode_bmp(raw_bytes)
    # Crop or pad around the centre so every image has the same spatial size.
    fitted = tf.image.resize_with_crop_or_pad(decoded, IMG_H, IMG_W)
    as_float = tf.cast(fitted, tf.float32)
    # 127.5 is half of 255: shift to zero-centre, then scale to unit range.
    return (as_float - 127.5) / 127.5

In [None]:
def tf_dataset(images_path, batch_size):
    """Build the training input pipeline from a list of image paths.

    Args:
        images_path: sequence of image file paths.
        batch_size: number of images per batch.

    Returns:
        A tf.data.Dataset that shuffles the paths, decodes each one with
        load_image in parallel, batches the images and prefetches batches.
    """
    autotune = tf.data.experimental.AUTOTUNE
    return (
        tf.data.Dataset.from_tensor_slices(images_path)
        # Shuffle the (cheap) path strings before decoding the images.
        .shuffle(buffer_size=10240)
        .map(load_image, num_parallel_calls=autotune)
        .batch(batch_size)
        .prefetch(buffer_size=autotune)
    )

In [None]:
# Load the labelled image dataset used for testing
def load_image_test(filename, class_names, size=(128,128)):
    """Create a labelled test dataset from a directory of class sub-folders.

    Args:
        filename: directory handed to image_dataset_from_directory; labels are
            inferred from its sub-directory names.
        class_names: explicit list of class sub-directory names, passed
            through to control the label order.
        size: (height, width) that every image is resized to.

    Returns:
        A shuffled dataset yielding (images, labels) with one image per batch
        and integer labels.
        NOTE(review): the images are not rescaled here, so they are presumably
        still in the raw [0, 255] range -- confirm against the Keras docs.
    """
    return tf.keras.preprocessing.image_dataset_from_directory(
        filename,
        labels='inferred',
        label_mode='int',
        image_size=size,
        batch_size=1,
        class_names=class_names,
        shuffle=True,
    )

In [None]:
def roc(labels, scores, saveto=None):
    """Compute the area under the ROC curve for binary labels and scores.

    Args:
        labels: ground-truth binary labels, forwarded to roc_curve as y_true.
        scores: predicted scores, forwarded to roc_curve as y_score.
        saveto: unused; kept only so existing callers keep working.

    Returns:
        The AUC value computed by auc() from the false/true positive rates.
    """
    # False/true positive rates at every decision threshold.
    fpr, tpr, _ = roc_curve(labels, scores)
    return auc(fpr, tpr)

In [None]:
def plot_confusion_matrix(cm, classes,
                        normalize=False,
                        title='Confusion matrix',
                        cmap=plt.cm.Blues):
    """Print and plot a confusion matrix.

    Args:
        cm: 2-D confusion matrix (rows are indexed first, columns second).
        classes: tick labels, used for both axes.
        normalize: when True, every row is divided by its sum before the
            matrix is printed and drawn.
        title: title of the plot.
        cmap: matplotlib colour map used for the heat-map.

    The function draws on the current matplotlib figure and returns nothing.
    """
    cm = np.asarray(cm)

    # Normalise BEFORE drawing so that the heat-map colours, the printed
    # matrix and the per-cell text all show the same values.
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Rows without any sample stay at 0 instead of producing NaN.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Integer counts are written as-is, fractions with two decimals.
    fmt = 'd' if np.issubdtype(cm.dtype, np.integer) else '.2f'
    # Cells darker than half of the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
            horizontalalignment="center",
            color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
def conv_block(input, num_filters):
    """Apply two Conv2D -> BatchNormalization -> ReLU stages.

    Args:
        input: tensor fed to the first convolution.
        num_filters: number of filters used by both convolutions.

    Returns:
        The output tensor of the second ReLU.
    """
    features = input
    # First stage uses a 1x1 kernel, second stage a 3x3 kernel.
    for kernel in ((1, 1), (3, 3)):
        features = tf.keras.layers.Conv2D(num_filters, kernel_size=kernel, padding="same")(features)
        features = tf.keras.layers.BatchNormalization()(features)
        features = tf.keras.layers.Activation("relu")(features)
    return features

def decoder_block(input, skip_features, num_filters):
    """Upsample by a factor of two, merge the skip connection, then convolve.

    Args:
        input: tensor coming from the previous (coarser) stage.
        skip_features: encoder tensor concatenated with the upsampled input.
        num_filters: number of filters for the transposed convolution and for
            the following conv_block.

    Returns:
        The output tensor of conv_block applied to the concatenation.
    """
    upsampled = tf.keras.layers.Conv2DTranspose(num_filters, (1, 1), strides=2, padding="same")(input)
    merged = tf.keras.layers.Concatenate()([upsampled, skip_features])
    return conv_block(merged, num_filters)

In [None]:
# generator loss function
# def generator_loss(fake_output):
#     return cross_entropy(tf.ones_like(fake_output), fake_output)

# generator loss function
# def generator_loss(adv_val, rec_val, ssim_val, feat_val):
#     return (adv_val * ADV_REG_RATE_LF) + (rec_val * REC_REG_RATE_LF) + (ssim_val * SSIM_REG_RATE_LF) + (feat_val * FEAT_REG_RATE_LF)

In [None]:
# discriminator loss function
# def discriminator_loss(real_output, fake_output):
#     real_loss = cross_entropy(tf.ones_like(real_output), real_output)
#     fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
#     total_loss = real_loss + fake_loss
#     return total_loss

# discriminator loss function
# def discriminator_loss(adv_val, feat_val):
#     return (adv_val * ADV_REG_RATE_LF) + (feat_val * FEAT_REG_RATE_LF)

In [None]:
# create generator model based on resnet50 and unet network
def build_generator_resnet50_unet(input_shape):
    """Build the generator: a ResNet50 encoder with a U-Net style decoder.

    Args:
        input_shape: despite its name this is the Keras input TENSOR of the
            network (it is handed to ResNet50 as `input_tensor`), not a shape
            tuple. The parameter name is kept for backward compatibility.

    Returns:
        A tf.keras Model mapping the input tensor to a 3-channel image with
        the same spatial size and values in [-1, 1].
    """
    # Pre-trained ResNet50 used as the encoder.
    resnet50 = tf.keras.applications.ResNet50(include_top=False, weights="imagenet", input_tensor=input_shape)

    # Encoder skip connections. Sizes in the comments assume a 128 x 128 input.
    # The input is taken from the model itself rather than looked up by the
    # hard-coded layer name "input_1", so any input layer name works.
    s1 = resnet50.input                                  ## (128 x 128)
    s2 = resnet50.get_layer("conv1_relu").output         ## (64 x 64)
    s3 = resnet50.get_layer("conv2_block3_out").output   ## (32 x 32)
    s4 = resnet50.get_layer("conv3_block4_out").output   ## (16 x 16)
    s5 = resnet50.get_layer("conv4_block6_out").output   ## (8 x 8)

    # Bridge
    b1 = resnet50.get_layer("conv5_block3_out").output   ## (4 x 4)

    # U-Net decoder: each block doubles the resolution and merges one skip.
    d1 = decoder_block(b1, s5, 128)                      ## (8 x 8)
    d2 = decoder_block(d1, s4, 64)                       ## (16 x 16)
    d3 = decoder_block(d2, s3, 32)                       ## (32 x 32)
    d4 = decoder_block(d3, s2, 16)                       ## (64 x 64)
    d5 = decoder_block(d4, s1, 8)                        ## (128 x 128)

    # Output. Training images are scaled to [-1, 1] by load_image, so the
    # reconstruction must be able to reach negative values: tanh, not sigmoid.
    outputs = tf.keras.layers.Conv2D(3, 1, padding="same", activation="tanh")(d5)

    # Build the model from its own input instead of a global named `inputs`.
    model = tf.keras.models.Model(resnet50.input, outputs)

    return model

In [None]:
# create discriminator model
def build_discriminator(inputs):
    """Build the discriminator network.

    Args:
        inputs: Keras input tensor of the model.

    Returns:
        A tf.keras Model made of two strided separable-convolution stages
        (32 then 64 filters, each followed by LeakyReLU and 30 % dropout), a
        Flatten layer and a single-unit Dense layer without activation, i.e.
        the model outputs one raw logit per image.
    """
    features = inputs
    for filters in (32, 64):
        features = tf.keras.layers.SeparableConvolution2D(filters, kernel_size=(1, 1), strides=(2, 2), padding='same')(features)
        features = tf.keras.layers.LeakyReLU()(features)
        features = tf.keras.layers.Dropout(0.3)(features)

    flattened = tf.keras.layers.Flatten()(features)
    logit = tf.keras.layers.Dense(1)(flattened)

    return tf.keras.models.Model(inputs, logit)
    # return x

In [None]:
class ResUnetGAN(tf.keras.models.Model):
    """GAN wrapper that trains a reconstruction generator against a discriminator.

    The generator reconstructs its input image; the discriminator scores real
    and reconstructed images with one raw logit each. At test time an anomaly
    score is built from the reconstruction error and the difference between
    the discriminator outputs for the real and the reconstructed image.
    """

    def __init__(self, discriminator, generator):
        """Store both sub-models, the loss weights and default optimizers.

        Args:
            discriminator: Keras model returning one logit per image.
            generator: Keras model returning a reconstruction of its input.
        """
        super(ResUnetGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        # Weight of each loss term in the combined losses.
        self.ADV_REG_RATE_LF = 1
        self.REC_REG_RATE_LF = 50
        self.SSIM_REG_RATE_LF = 50
        self.FEAT_REG_RATE_LF = 1

        # Default optimizers; compile() replaces them with the ones it is given.
        self.d_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5, beta_2=0.999)
        self.g_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5, beta_2=0.999)

    def compile(self, d_optimizer, g_optimizer):
        """Register the optimizers and create the checkpoint object.

        Args:
            d_optimizer: optimizer applied to the discriminator variables.
            g_optimizer: optimizer applied to the generator variables.
        """
        super(ResUnetGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer

        # setting for checkpoint
        checkpoint_dir = './training_checkpoints'
        self.checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
        self.checkpoint = tf.train.Checkpoint(generator_optimizer=self.g_optimizer,
                                 discriminator_optimizer=self.d_optimizer,
                                 generator=self.generator,
                                 discriminator=self.discriminator)

    # Notice the use of `tf.function`
    # This annotation causes the function to be "compiled".
    @tf.function
    def train_step(self, images):
        """Run one optimisation step of both networks on a batch of images.

        Args:
            images: batch of training images, expected in the [-1, 1] range
                produced by load_image.

        Returns:
            Dict with the combined generator / discriminator losses and the
            individual loss terms ("loss_adv" is the discriminator's
            adversarial loss).
        """
        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            reconstructed_images = self.generator(images, training=True)
            real_output = self.discriminator(images, training=True)
            fake_output = self.discriminator(reconstructed_images, training=True)

            # Loss 1: ADVERSARIAL loss.
            # Discriminator side: real images -> 1, reconstructions -> 0.
            d_loss_real = tf.reduce_mean(
              cross_entropy(tf.ones_like(real_output), real_output))
            d_loss_fake = tf.reduce_mean(
              cross_entropy(tf.zeros_like(fake_output), fake_output))
            loss_adv = d_loss_real + d_loss_fake

            # Generator side: the generator wants its reconstructions to be
            # scored as real. Re-using the discriminator loss here would push
            # the generator to HELP the discriminator spot its outputs.
            g_loss_adv = tf.reduce_mean(
              cross_entropy(tf.ones_like(fake_output), fake_output))

            # Loss 2: RECONSTRUCTION loss (L1)
            loss_rec = tf.reduce_mean(tf.keras.losses.mae(images, reconstructed_images))

            # Loss 3: SSIM loss, averaged over the WHOLE batch (tf.image.ssim
            # returns one value per image). max_val is the dynamic range of
            # the data: images live in [-1, 1], hence 2.0.
            loss_ssim = tf.reduce_mean(1 - tf.image.ssim(images, reconstructed_images, max_val=2.0))

            # Loss 4: FEATURE loss (L2) between the discriminator outputs.
            loss_feat = tf.reduce_mean(tf.keras.losses.mse(real_output, fake_output))

            gen_loss = tf.reduce_mean(
                (g_loss_adv * self.ADV_REG_RATE_LF)
                + (loss_rec * self.REC_REG_RATE_LF)
                + (loss_ssim * self.SSIM_REG_RATE_LF)
                + (loss_feat * self.FEAT_REG_RATE_LF))
            disc_loss = tf.reduce_mean(
                (loss_adv * self.ADV_REG_RATE_LF)
                + (loss_feat * self.FEAT_REG_RATE_LF))

        gradients_of_discriminator = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)
        gradients_of_generator = gen_tape.gradient(gen_loss, self.generator.trainable_variables)

        self.d_optimizer.apply_gradients(zip(gradients_of_discriminator, self.discriminator.trainable_variables))
        self.g_optimizer.apply_gradients(zip(gradients_of_generator, self.generator.trainable_variables))

        return {
            "gen_loss": gen_loss,
            "disc_loss": disc_loss,
            "loss_adv": loss_adv,
            "loss_rec": loss_rec,
            "loss_ssim": loss_ssim,
            "loss_feat": loss_feat
        }

    def saved_model(self, filepath, num_of_epoch):
        """Save both sub-models as .h5 files.

        Args:
            filepath: prefix that the file names are appended to verbatim
                (include a trailing separator for a directory).
            num_of_epoch: number embedded in the file names.
        """
        self.generator.save(filepath + "g_model" + str(num_of_epoch) + ".h5")
        self.discriminator.save(filepath + "d_model" + str(num_of_epoch) + ".h5")

    def loaded_model(self, g_filepath, d_filepath):
        """Load generator and discriminator weights from the given files."""
        self.generator.load_weights(g_filepath)
        self.discriminator.load_weights(d_filepath)

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Return numerator / denominator as a float, or NaN if the denominator is 0."""
        if not denominator:
            return float("nan")
        return float(numerator) / float(denominator)

    def test_and_eval(self, filepath, g_filepath, d_filepath):
        """Score every test image and print detection metrics.

        Args:
            filepath: test directory containing the "normal" and "defect"
                sub-directories.
            g_filepath: weights file of the generator.
            d_filepath: weights file of the discriminator.

        Raises:
            ValueError: if the test dataset yields no image.

        Label convention: "normal" is class 0, "defect" is class 1, and
        "defect" is the positive class in every metric below.
        """
        threshold = 0.8
        class_names = ["normal", "defect"]
        test_dataset = load_image_test(filepath, class_names)

        # Weight of the reconstruction error in the anomaly score (range 0-1);
        # the feature loss gets the remaining weight.
        anomaly_weight = 0.1

        self.loaded_model(g_filepath, d_filepath)

        # Plain lists: appending to a numpy array in a loop copies it each time.
        collected_scores = []
        collected_labels = []
        for i, (images, labels) in enumerate(test_dataset, start=1):
            # The directory loader yields raw [0, 255] pixels, whereas the
            # networks were trained on images scaled to [-1, 1] by load_image.
            images = (tf.cast(images, tf.float32) - 127.5) / 127.5

            reconstructed_images = self.generator(images, training=False)
            real_output = self.discriminator(images, training=False)
            fake_output = self.discriminator(reconstructed_images, training=False)

            # Loss 2: RECONSTRUCTION loss (L1)
            loss_rec = tf.reduce_mean(tf.keras.losses.mae(images, reconstructed_images))

            # Loss 4: FEATURE loss
            loss_feat = tf.reduce_mean(tf.keras.losses.mse(real_output, fake_output))

            score = (anomaly_weight * loss_rec) + ((1-anomaly_weight) * loss_feat)
            print(i, score.numpy(), labels.numpy()[0])

            collected_scores.append(float(score.numpy()))
            collected_labels.append(int(labels.numpy()[0]))

        if not collected_scores:
            raise ValueError("no test images found in " + str(filepath))

        scores_ano = np.asarray(collected_scores, dtype=np.float64)
        real_label = np.asarray(collected_labels, dtype=np.int64)

        # Scale the scores to [0, 1]; identical scores would give a zero range.
        score_range = np.ptp(scores_ano)
        if score_range > 0:
            scores_ano = (scores_ano - np.min(scores_ano)) / score_range
        else:
            scores_ano = np.zeros_like(scores_ano)
        print("before conversion: ", scores_ano)

        # The ROC curve needs the continuous scores; thresholded 0/1 values
        # would collapse the curve to a single operating point.
        auc_out = roc(real_label, scores_ano)

        predictions = (scores_ano > threshold).astype(int)
        print("scores_ano: ", predictions)
        print("real_label: ", real_label)
        print("auc: ", auc_out)

        # num_classes forces a 2 x 2 matrix even if one class never occurs.
        cm = tf.math.confusion_matrix(labels=real_label,
                                      predictions=predictions,
                                      num_classes=len(class_names)).numpy()
        # Rows are true labels, columns are predictions; class 1 ("defect")
        # is the positive class.
        TN = cm[0][0]
        FP = cm[0][1]
        FN = cm[1][0]
        TP = cm[1][1]
        plot_confusion_matrix(cm, class_names)

        diagonal_sum = cm.trace()
        sum_of_all_elements = cm.sum()

        print("Accuracy: ", diagonal_sum / sum_of_all_elements )
        # Leakage: defective samples that were predicted as normal.
        print("Leakage Rate: ", self._safe_ratio(FN, FN + TP))
        # False alarm: normal samples that were predicted as defective.
        print("False Alarm Rate: ", self._safe_ratio(FP, FP + TN))
        print("precision_score: ",precision_score(real_label, predictions))
        print("recall_score: ",recall_score(real_label, predictions))

In [None]:
if __name__ == "__main__":
    # End-to-end run: build both networks, train them, then evaluate on the
    # labelled test set.
    print("start")
    ## Hyperparameters
    batch_size = 24
    # Keras image tensors are (height, width, channels).
    input_shape = (IMG_H, IMG_W, IMG_C)

    # Shared input tensor of the generator and the discriminator.
    inputs = tf.keras.layers.Input(input_shape, name="input_1")

    num_epochs = 2000
    train_images_path = glob("mura_data/mura_data/train_data/*.bmp")
    if not train_images_path:
        # Fail early with a clear message instead of deep inside tf.data.
        raise FileNotFoundError("no training images match mura_data/mura_data/train_data/*.bmp")

    d_model = build_discriminator(inputs)
    g_model = build_generator_resnet50_unet(inputs)

    resunetgan = ResUnetGAN(d_model, g_model)

    g_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5, beta_2=0.999)
    d_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5, beta_2=0.999)
    resunetgan.compile(d_optimizer, g_optimizer)

    train_images_dataset = tf_dataset(train_images_path, batch_size)

    saved_model_path = "mura_data/mura_data/saved_model/"
    test_data_path = "mura_data/mura_data/test_data/"
    # Saving the .h5 files requires the target directory to exist.
    os.makedirs(saved_model_path, exist_ok=True)

    array_elapsed = []
    for epoch in range(num_epochs):
        print("Epoch: ", epoch)
        now = datetime.now()
        # One pass over the whole dataset; fit() iterates the batches itself,
        # which avoids the per-call overhead of invoking fit() on every batch.
        resunetgan.fit(train_images_dataset, epochs=1)
        # Save once per epoch (not once per batch). The file name is keyed on
        # num_epochs so it always holds the latest weights and matches the
        # paths loaded for evaluation below.
        resunetgan.saved_model(saved_model_path, num_epochs)
        later = datetime.now()
        elapsed_time =  (later - now).total_seconds()
        array_elapsed.append(elapsed_time)
        print("Time Consumend of this epoch: ", elapsed_time)

    print("Duration of trainning Data: ", np.sum(array_elapsed), " seconds")

    resunetgan.test_and_eval(test_data_path,
                             saved_model_path + "g_model" + str(num_epochs) + ".h5",
                             saved_model_path + "d_model" + str(num_epochs) + ".h5")
