# Installation

In [None]:
# !pip install -r requirements.txt
# !pip install keras
# Record the installed TensorFlow / Keras package versions alongside the results.
!pip list | grep "tensorflow"
!pip list | grep "keras"


tensorflow                       2.15.0
tensorflow-datasets              4.9.4
tensorflow-estimator             2.15.0
tensorflow-gcs-config            2.15.0
tensorflow-hub                   0.16.1
tensorflow-io-gcs-filesystem     0.37.0
tensorflow-metadata              1.15.0
tensorflow-probability           0.23.0
keras                            2.15.0
tf_keras                         2.15.1


In [None]:
# !pip uninstall -y keras tensorflow
# !pip install -r requirements.txt
# !pip install --upgrade keras

In [None]:
!pip install keras-metrics

Collecting keras-metrics
  Downloading keras_metrics-1.1.0-py2.py3-none-any.whl (5.6 kB)
Installing collected packages: keras-metrics
Successfully installed keras-metrics-1.1.0


# Models

## Model 1

In [None]:
'''
LeNet-1
'''

import keras
import tensorflow as tf
import numpy as np
from keras.layers import Input

def Model1(input_tensor=None, train=False):
    """Build the Model1 MNIST classifier (LeNet-1 layout), then train it or load its weights.

    Args:
        input_tensor: Must be non-None when ``train`` is False. NOTE(review): the
            value is only checked for presence; the network is built on its own
            ``InputLayer`` and is never wired to this tensor -- confirm that
            callers do not rely on a shared input.
        train: If True, load MNIST, fit for 10 epochs, save the weights to
            ``./Model1.weights.h5`` and print the test loss and accuracy.
            If False, load the weights from that file instead.

    Returns:
        The compiled ``keras.Sequential`` model.

    Raises:
        ValueError: If ``train`` is False and ``input_tensor`` is None.
    """
    nb_classes = 10
    # convolution kernel size
    kernel_size = (5, 5)
    # input image dimensions (single-channel 28x28 digits)
    image_size = 28
    input_shape = (image_size, image_size, 1)
    weights_path = './Model1.weights.h5'

    if train:
        nb_epoch = 10

        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
        # Add the channel axis and rescale pixel values to [0, 1].
        x_train = np.reshape(x_train, [-1, image_size, image_size, 1]).astype('float32') / 255
        x_test = np.reshape(x_test, [-1, image_size, image_size, 1]).astype('float32') / 255
    elif input_tensor is None:
        # Raise instead of calling exit(): exit() ends the interpreter session
        # rather than reporting a recoverable error to the caller.
        raise ValueError('You have to provide input_tensor when testing.')

    # Define the model architecture.
    model = keras.Sequential([
      keras.layers.InputLayer(input_shape=input_shape),
      keras.layers.Conv2D(filters=4, kernel_size=kernel_size, activation='relu'),
      keras.layers.MaxPooling2D(pool_size=(2, 2)),
      keras.layers.Conv2D(filters=12, kernel_size=kernel_size, activation='relu'),
      keras.layers.MaxPooling2D(pool_size=(2, 2)),
      keras.layers.Flatten(),
      keras.layers.Dense(nb_classes, name='before_softmax'),
      keras.layers.Activation('softmax')
    ])

    # The network ends in a softmax layer, so the loss receives probabilities
    # rather than logits; from_logits must therefore be False.
    model.compile(optimizer='adam',
                  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['accuracy'])

    if train:
        # No batch_size is passed, so fit() uses the Keras default.
        model.fit(
          x_train,
          y_train,
          epochs=nb_epoch,
          validation_split=0.1,
        )

        # save model
        model.save_weights(weights_path)
        score = model.evaluate(x_test, y_test, verbose=0)
        print('\n')
        print('Overall Test score:', score[0])
        print('Overall Test accuracy:', score[1])
    else:
        model.load_weights(weights_path)

    return model


# Running this cell trains Model1 and writes its weight file; the recorded
# output below comes from this call.
if __name__ == '__main__':
    Model1(train=True)


Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Epoch 1/10


  output, from_logits = _get_logits(


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Overall Test score: 0.046109363436698914
Overall Test accuracy: 0.9854000210762024


## Model 2

In [None]:
'''
Model 2: the LeNet-1 layout widened to 6 and 16 convolution filters
'''

import keras
import tensorflow as tf
import numpy as np
from keras.layers import Input

def Model2(input_tensor=None, train=False):
    """Build the Model2 MNIST classifier, then train it or load its weights.

    The network has two 5x5 convolution layers (6 and 16 filters), each followed
    by 2x2 max pooling, and a 10-way softmax output.

    Args:
        input_tensor: Must be non-None when ``train`` is False. NOTE(review): the
            value is only checked for presence; the network is built on its own
            ``InputLayer`` and is never wired to this tensor -- confirm that
            callers do not rely on a shared input.
        train: If True, load MNIST, fit for 10 epochs, save the weights to
            ``./Model2.weights.h5`` and print the test loss and accuracy.
            If False, load the weights from that file instead.

    Returns:
        The compiled ``keras.Sequential`` model.

    Raises:
        ValueError: If ``train`` is False and ``input_tensor`` is None.
    """
    nb_classes = 10
    # convolution kernel size
    kernel_size = (5, 5)
    # input image dimensions (single-channel 28x28 digits)
    image_size = 28
    input_shape = (image_size, image_size, 1)
    weights_path = './Model2.weights.h5'

    if train:
        nb_epoch = 10

        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
        # Add the channel axis and rescale pixel values to [0, 1].
        x_train = np.reshape(x_train, [-1, image_size, image_size, 1]).astype('float32') / 255
        x_test = np.reshape(x_test, [-1, image_size, image_size, 1]).astype('float32') / 255
    elif input_tensor is None:
        # Raise instead of calling exit(): exit() ends the interpreter session
        # rather than reporting a recoverable error to the caller.
        raise ValueError('You have to provide input_tensor when testing.')

    # Define the model architecture.
    model = keras.Sequential([
      keras.layers.InputLayer(input_shape=input_shape),
      keras.layers.Conv2D(filters=6, kernel_size=kernel_size, activation='relu'),
      keras.layers.MaxPooling2D(pool_size=(2, 2)),
      keras.layers.Conv2D(filters=16, kernel_size=kernel_size, activation='relu'),
      keras.layers.MaxPooling2D(pool_size=(2, 2)),
      keras.layers.Flatten(),
      keras.layers.Dense(nb_classes, name='before_softmax'),
      keras.layers.Activation('softmax')
    ])

    # The network ends in a softmax layer, so the loss receives probabilities
    # rather than logits; from_logits must therefore be False.
    model.compile(optimizer='adam',
                  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['accuracy'])

    if train:
        # No batch_size is passed, so fit() uses the Keras default.
        model.fit(
          x_train,
          y_train,
          epochs=nb_epoch,
          validation_split=0.1,
        )

        # save model
        model.save_weights(weights_path)
        score = model.evaluate(x_test, y_test, verbose=0)
        print('\n')
        print('Overall Test score:', score[0])
        print('Overall Test accuracy:', score[1])
    else:
        model.load_weights(weights_path)

    return model


# Running this cell trains Model2 and writes its weight file; the recorded
# output below comes from this call.
if __name__ == '__main__':
    Model2(train=True)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Overall Test score: 0.040065836161375046
Overall Test accuracy: 0.9868999719619751


## Model 3

In [None]:
'''
LeNet-5 (6 and 16 convolution filters followed by 120- and 84-unit dense layers)
'''

import keras
import tensorflow as tf
import numpy as np
from keras.layers import Input

def Model3(input_tensor=None, train=False):
    """Build the Model3 MNIST classifier, then train it or load its weights.

    The network has two 5x5 convolution layers (6 and 16 filters), each followed
    by 2x2 max pooling, then dense layers of 120 and 84 units and a 10-way
    softmax output.

    Args:
        input_tensor: Must be non-None when ``train`` is False. NOTE(review): the
            value is only checked for presence; the network is built on its own
            ``InputLayer`` and is never wired to this tensor -- confirm that
            callers do not rely on a shared input.
        train: If True, load MNIST, fit for 10 epochs, save the weights to
            ``./Model3.weights.h5`` and print the test loss and accuracy.
            If False, load the weights from that file instead.

    Returns:
        The compiled ``keras.Sequential`` model.

    Raises:
        ValueError: If ``train`` is False and ``input_tensor`` is None.
    """
    nb_classes = 10
    # convolution kernel size
    kernel_size = (5, 5)
    # input image dimensions (single-channel 28x28 digits)
    image_size = 28
    input_shape = (image_size, image_size, 1)
    weights_path = './Model3.weights.h5'

    if train:
        nb_epoch = 10

        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
        # Add the channel axis and rescale pixel values to [0, 1].
        x_train = np.reshape(x_train, [-1, image_size, image_size, 1]).astype('float32') / 255
        x_test = np.reshape(x_test, [-1, image_size, image_size, 1]).astype('float32') / 255
    elif input_tensor is None:
        # Raise instead of calling exit(): exit() ends the interpreter session
        # rather than reporting a recoverable error to the caller.
        raise ValueError('You have to provide input_tensor when testing.')

    # Define the model architecture.
    model = keras.Sequential([
      keras.layers.InputLayer(input_shape=input_shape),
      keras.layers.Conv2D(filters=6, kernel_size=kernel_size, activation='relu'),
      keras.layers.MaxPooling2D(pool_size=(2, 2)),
      keras.layers.Conv2D(filters=16, kernel_size=kernel_size, activation='relu'),
      keras.layers.MaxPooling2D(pool_size=(2, 2)),
      keras.layers.Flatten(),
      keras.layers.Dense(120, activation='relu'),
      keras.layers.Dense(84, activation='relu'),
      keras.layers.Dense(nb_classes, name='before_softmax'),
      keras.layers.Activation('softmax')
    ])

    # The network ends in a softmax layer, so the loss receives probabilities
    # rather than logits; from_logits must therefore be False.
    model.compile(optimizer='adam',
                  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['accuracy'])

    if train:
        # No batch_size is passed, so fit() uses the Keras default.
        model.fit(
          x_train,
          y_train,
          epochs=nb_epoch,
          validation_split=0.1,
        )

        # save model
        model.save_weights(weights_path)
        score = model.evaluate(x_test, y_test, verbose=0)
        print('\n')
        print('Overall Test score:', score[0])
        print('Overall Test accuracy:', score[1])
    else:
        model.load_weights(weights_path)

    return model


# Running this cell trains Model3 and writes its weight file; the recorded
# output below comes from this call.
if __name__ == '__main__':
    Model3(train=True)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Overall Test score: 0.037026043981313705
Overall Test accuracy: 0.9896000027656555


## Model 4

In [None]:
'''
Model 4: deeper CNN (two 32-filter and two 64-filter 5x5 convolutions, then two 200-unit dense layers)
'''

import keras
import tensorflow as tf
import numpy as np
from keras.layers import Input

def Model4(input_tensor=None, train=False):
    """Build the Model4 MNIST classifier, then train it or load its weights.

    The network stacks two 32-filter and two 64-filter 5x5 convolutions (2x2 max
    pooling after each pair), then two 200-unit dense layers and a 10-way
    softmax output.

    Args:
        input_tensor: Must be non-None when ``train`` is False. NOTE(review): the
            value is only checked for presence; the network is built on its own
            ``InputLayer`` and is never wired to this tensor -- confirm that
            callers do not rely on a shared input.
        train: If True, load MNIST, fit for 10 epochs, save the weights to
            ``./Model4.weights.h5`` and print the test loss and accuracy.
            If False, load the weights from that file instead.

    Returns:
        The compiled ``keras.Sequential`` model.

    Raises:
        ValueError: If ``train`` is False and ``input_tensor`` is None.
    """
    nb_classes = 10
    # convolution kernel size
    kernel_size = (5, 5)
    # input image dimensions (single-channel 28x28 digits)
    image_size = 28
    input_shape = (image_size, image_size, 1)
    weights_path = './Model4.weights.h5'

    if train:
        nb_epoch = 10

        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
        # Add the channel axis and rescale pixel values to [0, 1].
        x_train = np.reshape(x_train, [-1, image_size, image_size, 1]).astype('float32') / 255
        x_test = np.reshape(x_test, [-1, image_size, image_size, 1]).astype('float32') / 255
    elif input_tensor is None:
        # Raise instead of calling exit(): exit() ends the interpreter session
        # rather than reporting a recoverable error to the caller.
        raise ValueError('You have to provide input_tensor when testing.')

    # Define the model architecture.
    model = keras.Sequential([
      keras.layers.InputLayer(input_shape=input_shape),
      keras.layers.Conv2D(filters=32, kernel_size=kernel_size, activation='relu', padding='valid'),
      keras.layers.Conv2D(filters=32, kernel_size=kernel_size, activation='relu', padding='valid'),
      keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),
      keras.layers.Conv2D(filters=64, kernel_size=kernel_size, activation='relu', padding='valid'),
      keras.layers.Conv2D(filters=64, kernel_size=kernel_size, activation='relu', padding='valid'),
      keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),
      keras.layers.Flatten(),
      keras.layers.Dense(200, activation='relu'),
      keras.layers.Dense(200, activation='relu'),
      keras.layers.Dense(nb_classes, name='before_softmax'),
      keras.layers.Activation('softmax')
    ])

    # The network ends in a softmax layer, so the loss receives probabilities
    # rather than logits; from_logits must therefore be False.
    model.compile(optimizer='adam',
                  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['accuracy'])

    if train:
        # No batch_size is passed, so fit() uses the Keras default.
        model.fit(
          x_train,
          y_train,
          epochs=nb_epoch,
          validation_split=0.1,
        )

        # save model
        model.save_weights(weights_path)
        score = model.evaluate(x_test, y_test, verbose=0)
        print('\n')
        print('Overall Test score:', score[0])
        print('Overall Test accuracy:', score[1])
    else:
        model.load_weights(weights_path)

    return model


# Running this cell trains Model4 and writes its weight file; the recorded
# output below comes from this call.
if __name__ == '__main__':
    Model4(train=True)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Overall Test score: 0.04910917207598686
Overall Test accuracy: 0.9887999892234802


# Utils

In [None]:
import random
from collections import defaultdict
import os
import glob
import cv2
import numpy as np
from keras.datasets import mnist
from keras import backend as K
from keras.models import Model
# import scikitplot as skplt
import matplotlib.pyplot as plt
from PIL import Image
import keras
from sklearn.metrics import precision_score, recall_score, f1_score
import math

# util function to convert a tensor into a valid image
def deprocess_image(x):
    """Convert a one-image, one-channel batch into a 2-D ``uint8`` image.

    The values are multiplied by 255, clipped to [0, 255] and cast to ``uint8``.
    The result is reshaped to ``(x.shape[1], x.shape[2])``, which only succeeds
    when the batch and channel axes both have length 1.

    Args:
        x: Array whose axes 1 and 2 are the image rows and columns.

    Returns:
        A new 2-D ``uint8`` array. ``x`` itself is not modified, so the same
        input can safely be deprocessed more than once.
    """
    # Out-of-place multiply: an in-place `x *= 255` would rescale the caller's array.
    scaled = x * 255
    pixels = np.clip(scaled, 0, 255).astype('uint8')
    return pixels.reshape(pixels.shape[1], pixels.shape[2])  # original shape (img_rows, img_cols,1)


def normalize(x):
    """Divide a tensor by its root-mean-square value.

    The divisor is ``sqrt(mean(x ** 2)) + 1e-5``; the small constant keeps the
    division defined when the tensor is all zeros.
    """
    root_mean_square = K.sqrt(K.mean(K.square(x)))
    return x / (root_mean_square + 1e-5)


def constraint_occl(gradients, start_point, rect_shape):
    """Keep the gradients inside one rectangle and zero everything else.

    Args:
        gradients: Array indexed as (batch, row, column, ...).
        start_point: (row, column) of the rectangle's upper-left corner.
        rect_shape: (height, width) of the rectangle.

    Returns:
        A new array shaped like ``gradients`` that equals it inside the
        rectangle and is zero elsewhere.
    """
    top, left = start_point[0], start_point[1]
    rows = slice(top, top + rect_shape[0])
    cols = slice(left, left + rect_shape[1])

    masked = np.zeros_like(gradients)
    masked[:, rows, cols] = gradients[:, rows, cols]
    return masked


def constraint_light(gradients):
    """Replace every gradient entry with the mean over the whole array.

    Returns a new array shaped like ``gradients`` in which every element equals
    ``np.mean(gradients)``.
    """
    uniform = np.ones_like(gradients)
    return np.mean(gradients) * uniform


def constraint_black(gradients, rect_shape=(6, 6)):
    """Build a gradient that darkens one randomly placed rectangle.

    A rectangle of ``rect_shape`` is positioned uniformly at random (via the
    ``random`` module) so that it fits inside axes 1 and 2 of ``gradients``.
    If the mean gradient inside it is negative, the result holds -1 across the
    rectangle; otherwise the result is all zeros.

    Returns:
        A new array shaped like ``gradients``.
    """
    height = rect_shape[0]
    width = rect_shape[1]
    # Row is drawn before column, so the sequence of random draws is fixed.
    row = random.randint(0, gradients.shape[1] - height)
    col = random.randint(0, gradients.shape[2] - width)
    window = (slice(None), slice(row, row + height), slice(col, col + width))

    darkened = np.zeros_like(gradients)
    patch = gradients[window]
    if np.mean(patch) < 0:
        darkened[window] = -np.ones_like(patch)
    return darkened


def init_coverage_tables(model1, model2, model3):
    """Create one neuron-coverage table per model.

    Each table is a ``defaultdict(bool)`` filled by ``init_dict`` with a False
    entry for every tracked neuron of the corresponding model.

    Returns:
        A 3-tuple of tables, in the same order as the arguments.
    """
    tables = []
    for model in (model1, model2, model3):
        table = defaultdict(bool)
        init_dict(model, table)
        tables.append(table)
    return tuple(tables)


def init_dict(model, model_layer_dict):
    """Mark every tracked neuron of ``model`` as not covered.

    Layers whose name contains 'flatten' or 'input' are skipped. For every other
    layer, one ``(layer_name, index)`` key per position along the last axis of
    ``layer.output_shape`` is set to False in ``model_layer_dict``.
    """
    ignored_tags = ('flatten', 'input')
    for layer in model.layers:
        if any(tag in layer.name for tag in ignored_tags):
            continue
        neuron_count = layer.output_shape[-1]
        model_layer_dict.update({(layer.name, idx): False for idx in range(neuron_count)})


def neuron_to_cover(model_layer_dict):
    """Pick a neuron to target next, preferring ones that are not yet covered.

    Args:
        model_layer_dict: Mapping of ``(layer_name, index)`` to a covered flag.

    Returns:
        A ``(layer_name, index)`` tuple chosen at random from the uncovered
        neurons, or from all neurons once every one of them is covered.

    Raises:
        IndexError: If ``model_layer_dict`` is empty.
    """
    not_covered = [key for key, covered in model_layer_dict.items() if not covered]
    # random.choice needs a sequence; a dict view is not indexable, so the
    # fallback over all neurons goes through list().
    candidates = not_covered if not_covered else list(model_layer_dict)
    layer_name, index = random.choice(candidates)
    return layer_name, index


def neuron_covered(model_layer_dict):
    """Summarise a coverage table.

    Returns:
        ``(covered, total, covered / total)`` where ``covered`` counts the
        entries with a truthy flag. An empty table raises ZeroDivisionError.
    """
    total_neurons = len(model_layer_dict)
    covered_neurons = sum(1 for flag in model_layer_dict.values() if flag)
    ratio = covered_neurons / float(total_neurons)
    return covered_neurons, total_neurons, ratio


def update_coverage(input_data, model, model_layer_dict, threshold=0):
    """Mark the neurons that ``input_data`` activates above ``threshold``.

    An auxiliary model exposing the output of every layer (except those whose
    name contains 'flatten' or 'input') is built and run on ``input_data``.
    Only the first sample of each layer output is examined: it is rescaled with
    ``scale`` and a neuron counts as covered when the mean of its scaled
    activations exceeds ``threshold``. Flags are only ever switched to True.
    """
    tracked_names = []
    for layer in model.layers:
        if 'flatten' in layer.name or 'input' in layer.name:
            continue
        tracked_names.append(layer.name)

    tracked_outputs = [model.get_layer(name).output for name in tracked_names]
    probe = Model(inputs=model.input, outputs=tracked_outputs)
    activations = probe.predict(input_data)

    for position, activation in enumerate(activations):
        scaled = scale(activation[0])
        neuron_count = scaled.shape[-1]
        for neuron in range(neuron_count):
            fires = np.mean(scaled[..., neuron]) > threshold
            if fires and not model_layer_dict[(tracked_names[position], neuron)]:
                model_layer_dict[(tracked_names[position], neuron)] = True


def full_coverage(model_layer_dict):
    """Return True when no entry of the coverage table is False."""
    return False not in model_layer_dict.values()


def scale(intermediate_layer_output, rmax=1, rmin=0):
    """Min-max rescale an array into the range [rmin, rmax].

    The array's own minimum maps to ``rmin`` and its maximum to ``rmax``. A
    constant array has max == min and therefore divides by zero.
    """
    lowest = intermediate_layer_output.min()
    highest = intermediate_layer_output.max()
    unit_scaled = (intermediate_layer_output - lowest) / (highest - lowest)
    return unit_scaled * (rmax - rmin) + rmin


def fired(model, layer_name, index, input_data, threshold=0):
    """Tell whether one neuron is activated above ``threshold`` by ``input_data``.

    The output of ``layer_name`` is computed for ``input_data``; its first
    sample is rescaled with ``scale`` and the mean of the scaled activations at
    ``index`` (last axis) is compared against ``threshold``.
    """
    probe = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)
    first_sample_output = probe.predict(input_data)[0]
    mean_activation = np.mean(scale(first_sample_output)[..., index])
    return bool(mean_activation > threshold)


def diverged(predictions1, predictions2, predictions3, target):
    """Return True unless all three predictions are equal.

    ``target`` is accepted but not used by the comparison.
    """
    all_agree = predictions1 == predictions2 == predictions3
    return not all_agree


def cumulative_neuron_coverage(model_layer_dict1, model_layer_dict2, model_layer_dict3):
    """Write the union of two coverage tables into a third.

    For every key of ``model_layer_dict1``, ``model_layer_dict3`` receives the
    flag from the first table if it is truthy, otherwise the flag stored under
    the same key in ``model_layer_dict2``.
    """
    for neuron, covered in model_layer_dict1.items():
        model_layer_dict3[neuron] = covered or model_layer_dict2[neuron]


def neurons_covered_uncommon(model_layer_dict1, model_layer_dict2):
    """List the neurons covered in the second table but not in the first.

    Per the original note, dict1 holds the valid tests and dict2 the invalid
    ones. Keys are returned in the iteration order of ``model_layer_dict1``.
    """
    return [
        neuron
        for neuron, covered in model_layer_dict1.items()
        if (not covered) and model_layer_dict2[neuron]
    ]

def neuron_not_covered(model_layer_dict1):
    """Return the ``(layer_name, index)`` keys whose coverage flag is falsy."""
    return [neuron for neuron, covered in model_layer_dict1.items() if not covered]



def delete_files_from_dir(dirPath, ext):
    """Delete every ``*.<ext>`` file matched by the glob ``dirPath + '*.' + ext``.

    ``dirPath`` is used as a plain string prefix, so it has to end with a path
    separator to address the files inside a directory (e.g. '/tmp/' and 'txt'
    give '/tmp/*.txt'). Files that cannot be removed are reported and skipped.
    """
    pattern = dirPath + '*.' + ext
    for path in glob.glob(pattern):
        try:
            os.remove(path)
        except OSError as err:
            print("Error: %s : %s" % (path, err.strerror))


# This api is for sampling from latent space of VAE
def sampling(args):
    """Draw a latent vector with the reparameterization trick.

    NOTE(review): a function with the same name and body is defined again in
    the VAE cell; whichever cell runs last provides the active definition.

    # Arguments
        args (tensor): mean and log of variance of Q(z|X)
    # Returns
        z (tensor): sampled latent vector, z_mean + exp(0.5 * z_log_var) * epsilon
    """
    z_mean, z_log_var = args
    n_rows = K.shape(z_mean)[0]
    n_latent = K.int_shape(z_mean)[1]
    # epsilon ~ N(0, I): random_normal defaults to mean 0 and std 1.0
    noise = K.random_normal(shape=(n_rows, n_latent))
    std_dev = K.exp(0.5 * z_log_var)
    return z_mean + std_dev * noise


# Logic for calculating reconstruction probability
def reconstruction_probability(decoder, z_mean, z_log_var, X, num_samples=1000, verbose=True):
    """Estimate the reconstruction log-probability of each input row.

    For each of ``num_samples`` draws, a latent vector is sampled from
    (``z_mean``, ``z_log_var``), decoded into a mean and a log-variance, and the
    Gaussian log-density of ``X`` under them is summed over axis 1. The
    per-draw values are averaged.

    :param decoder: decoder model; ``predict`` must return (mean, log variance)
    :param z_mean: encoder predicted mean value
    :param z_log_var: encoder predicted sigma square value
    :param X: input data, one flattened sample per row
    :param num_samples: number of latent draws to average over (default 1000)
    :param verbose: when True (default), print the draw index on every iteration
    :return: float32 array of length ``X.shape[0]`` with the averaged values
    :raises ValueError: if ``num_samples`` is smaller than 1
    """
    if num_samples < 1:
        raise ValueError('num_samples must be at least 1, got %r' % (num_samples,))

    reconstructed_prob = np.zeros((X.shape[0],), dtype='float32')
    for draw in range(num_samples):
        if verbose:
            print("[DEBUG] l = ", draw)
        sampled_zs = sampling([z_mean, z_log_var])
        mu_hat, log_sigma_hat = decoder.predict(sampled_zs, steps=1)
        # float64 keeps exp() from overflowing as early as it would in float32.
        log_sigma_hat = np.float64(log_sigma_hat)
        # The small constant keeps the variance strictly positive.
        sigma_hat = np.exp(log_sigma_hat) + 0.00001

        loss_a = np.log(2 * np.pi * sigma_hat)
        loss_m = np.square(mu_hat - X) / sigma_hat
        reconstructed_prob += -0.5 * np.sum(loss_a + loss_m, axis=1)
    reconstructed_prob /= num_samples
    return reconstructed_prob


# Calculates and returns probability density of test input
def calculate_density(x_target_orig, vae):
    """Compute the VAE reconstruction probability of the given inputs.

    The inputs are clipped to [0, 1], encoded with the VAE's 'encoder' layer and
    scored against its 'decoder' layer by ``reconstruction_probability``. The
    "Flag N" prints trace progress through the steps.
    """
    print("Flag 7")

    flat_inputs = np.reshape(np.clip(x_target_orig, 0, 1), (-1, 28*28))
    image_inputs = np.reshape(flat_inputs, (-1, 28, 28, 1))
    print("Flag 8")

    encoder = vae.get_layer('encoder')
    z_mean, z_log_var, _ = encoder.predict(image_inputs, batch_size=128)
    print("Flag 9")

    decoder = vae.get_layer('decoder')
    density = reconstruction_probability(decoder, z_mean, z_log_var, flat_inputs)
    print("Flag 10")

    return density


# checks whether a test input is valid or invalid
#Returns true if invalid
def isInvalid(gen_img, vae, vae_threshold):
    """Classify a generated input as invalid based on its VAE density.

    Args:
        gen_img: Generated image(s) passed on to ``calculate_density``.
        vae: VAE model passed on to ``calculate_density``.
        vae_threshold: Densities below this value count as invalid.

    Returns:
        True if any computed density is NaN or below ``vae_threshold``,
        otherwise False. For a single image this is the plain
        "density < threshold or NaN" check.
    """
    print("Flag 5")
    gen_img_density = calculate_density(gen_img, vae)
    print("Flag 6")

    # The density comes back as an array; element-wise numpy checks avoid
    # relying on implicit array-to-scalar conversion, which only works for
    # size-1 arrays.
    density = np.asarray(gen_img_density, dtype='float64')
    return bool(np.any(np.isnan(density) | (density < vae_threshold)))


# VAE


In [None]:
'''
Code is implemented over the baseline provided in keras git repo
https://github.com/keras-team/keras/blob/master/examples/variational_autoencoder.py

Example of VAE on MNIST dataset using MLP
The VAE has a modular design. The encoder, decoder and VAE
are 3 models that share weights. After training the VAE model,
the encoder can be used to generate latent vectors.
The decoder can be used to generate MNIST digits by sampling the
latent vector from a Gaussian distribution with mean = 0 and std = 1.
# Reference
[1] Kingma, Diederik P., and Max Welling.
"Auto-Encoding Variational Bayes."
https://arxiv.org/abs/1312.6114
'''

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from keras.layers import Lambda, Input, Dense, Reshape
from keras.models import Model
from keras.datasets import mnist, fashion_mnist
from keras.losses import mse, binary_crossentropy
from keras import backend as K
# from keras.optimizers import Adam
from keras.models import model_from_json

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from scipy.stats import multivariate_normal

import argparse
import os
# from utils import *
import tensorflow as tf


# reparameterization trick
# instead of sampling from Q(z|X), sample epsilon = N(0,I)
# z = z_mean + sqrt(var) * epsilon
def sampling(args):
    """Reparameterization trick by sampling from an isotropic unit Gaussian.

    NOTE(review): the Utils cell defines a function with the same name and
    body; this later definition replaces it once this cell has run.

    # Arguments
        args (tensor): mean and log of variance of Q(z|X)
    # Returns
        z (tensor): sampled latent vector
    """
    mean, log_var = args
    latent_shape = (K.shape(mean)[0], K.int_shape(mean)[1])
    # random_normal draws with mean = 0 and std = 1.0 by default
    epsilon = K.random_normal(shape=latent_shape)
    return mean + K.exp(0.5 * log_var) * epsilon

def Vae_MNIST_NN1(input_tensor=None, train=False):
    """Build the MLP variational autoencoder for MNIST, then train it or load its weights.

    The encoder flattens a 28x28x1 image, applies one 400-unit ReLU layer and
    outputs ``z_mean``, ``z_log_var`` and a sampled ``z`` (latent size 200).
    The decoder mirrors it and outputs a per-pixel mean (``pos_mean``) and
    log-variance (``pos_log_var``). The training loss is the Gaussian negative
    log-likelihood of the input plus the KL term.

    Args:
        input_tensor: Keras input the VAE is built on. Required when ``train``
            is False; replaced by a fresh ``Input`` when ``train`` is True.
        train: If True, fit on MNIST for 1 epoch and save the weights to
            ``./vae_mnist_nn1.h5``. If False, load the weights from that file.

    Returns:
        The compiled VAE ``Model`` (named 'vae_mlp'), containing the 'encoder'
        and 'decoder' sub-models as layers.

    Raises:
        ValueError: If ``train`` is False and ``input_tensor`` is None.
    """
    np.random.seed(0)
    # MNIST dataset
    image_size = 28
    weights_path = './vae_mnist_nn1.h5'

    if train:
        (x_train, _), (x_test, _) = mnist.load_data()
        # Add the channel axis and rescale pixel values to [0, 1].
        x_train = np.reshape(x_train, [-1, image_size, image_size, 1]).astype('float32') / 255
        x_test = np.reshape(x_test, [-1, image_size, image_size, 1]).astype('float32') / 255

        # network parameters
        input_tensor = Input(shape=(image_size, image_size, 1))

        batch_size = 128
        epochs = 1
    elif input_tensor is None:
        # Raise instead of calling exit(): exit() ends the interpreter session
        # rather than reporting a recoverable error to the caller.
        raise ValueError('You have to provide input_tensor when testing.')

    latent_dim = 200
    intermediate_dims = np.array([400])

    # VAE model = encoder + decoder
    # build encoder model
    original_dim = image_size * image_size
    inputs = Reshape((original_dim,), name='encoder_input')(input_tensor)
    x = inputs
    for units in intermediate_dims:
        x = Dense(units, activation='relu')(x)
    z_mean = Dense(latent_dim, name='z_mean')(x)
    z_log_var = Dense(latent_dim, name='z_log_var')(x)

    # use reparameterization trick to push the sampling out as input
    # note that "output_shape" isn't necessary with the TensorFlow backend
    z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

    # instantiate encoder model
    encoder = Model(input_tensor, [z_mean, z_log_var, z], name='encoder')

    # build decoder model (hidden layers in reverse order of the encoder's)
    intermediate_dims = np.flipud(intermediate_dims)
    latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
    x = latent_inputs
    for units in intermediate_dims:
        x = Dense(units, activation='relu')(x)
    pos_mean = Dense(original_dim, name='pos_mean')(x)
    pos_log_var = Dense(original_dim, name='pos_log_var')(x)

    # instantiate decoder model
    decoder = Model(latent_inputs, [pos_mean, pos_log_var], name='decoder')

    # instantiate VAE model: the decoder consumes the sampled z (encoder output 2)
    outputs = decoder(encoder(input_tensor)[2])
    vae = Model(input_tensor, outputs, name='vae_mlp')

    # VAE loss = reconstruction_loss + kl_loss
    # reconstruction_loss holds the Gaussian log-likelihood of the input under
    # (pos_mean, pos_log_var); it enters the total loss with a minus sign.
    loss_a = float(np.log(2 * np.pi)) + outputs[1]
    loss_m = K.square(outputs[0] - inputs) / K.exp(outputs[1])
    reconstruction_loss = -0.5 * K.sum((loss_a + loss_m), axis=-1)

    kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
    kl_loss = K.sum(kl_loss, axis=-1)
    kl_loss *= -0.5
    vae_loss = K.mean(-reconstruction_loss + kl_loss)
    vae.add_loss(vae_loss)
    vae.compile(optimizer="adam")

    if train:
        vae.summary()
        vae.fit(x_train, epochs=epochs, batch_size=batch_size, validation_data=(x_test, None))
        # save model
        vae.save_weights(weights_path)
    else:
        vae.load_weights(weights_path)

    return vae

# Running this cell trains the VAE and writes ./vae_mnist_nn1.h5; the model
# summary recorded below comes from this call.
if __name__ == '__main__':
    Vae_MNIST_NN1(train=True)


Model: "vae_mlp"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_9 (InputLayer)        [(None, 28, 28, 1)]          0         []                            
                                                                                                  
 encoder (Functional)        [(None, 200),                474400    ['input_9[0][0]']             
                              (None, 200),                                                        
                              (None, 200)]                                                        
                                                                                                  
 decoder (Functional)        [(None, 784),                709168    ['encoder[0][2]']             
                              (None, 784)]                                                  

# Test-Case Generator

## Prepare

In [None]:
'''
Code is built on top of DeepXplore code base.
Density objective and VAE validation is added to the original objective function.
We use DeepXplore as a baseline technique for test generation.

DeepXplore: https://github.com/peikexin9/deepxplore
'''

from __future__ import print_function

import argparse

from keras.datasets import mnist
from keras.layers import Input
from keras.utils import to_categorical

import imageio
import numpy as np
import math
import time
import datetime
# Seed Python's `random` module, which constraint_black and neuron_to_cover draw from.
# NOTE(review): `random` is not imported in this cell; it comes from the Utils cell.
random.seed(3)

# from __future__ import print_function

# import argparse

# from keras.datasets import mnist
# from keras.layers import Input
# # from keras.utils import to_categorical
# # from Model1 import Model1
# # from Model2 import Model2
# # from Model3 import Model3
# # from Model4 import Model4
# # from Vae_MNIST_NN1 import Vae_MNIST_NN1
# # from utils import *
# import imageio
# import numpy as np
# import math
# import time
# import datetime
# import tensorflow as tf
# Switch TensorFlow to TF1-style graph execution from here on.
# NOTE(review): presumably required by the gradient-based generation loop below -- confirm.
# The model and VAE cells above have already run by this point.
tf.compat.v1.disable_eager_execution()

# random.seed(3)

# read the parameter
# argument parsing
# parser = argparse.ArgumentParser(description='Main function for difference-inducing input generation in MNIST dataset')
# parser.add_argument('transformation', help="realistic transformation type", choices=['light', 'occl', 'blackout'])
# parser.add_argument('weight_diff', help="weight hyperparm to control differential behavior", type=float)
# parser.add_argument('weight_nc', help="weight hyperparm to control neuron coverage", type=float)
# parser.add_argument('weight_vae', help="weight hyperparm to control vae goal", type=float)
# parser.add_argument('step', help="step size of gradient descent", type=float)
# parser.add_argument('seeds', help="number of seeds of input", type=int)
# parser.add_argument('grad_iterations', help="number of iterations of gradient descent", type=int)
# parser.add_argument('threshold', help="threshold for determining neuron activated", type=float)
# parser.add_argument('-t', '--target_model', help="target model that we want it predicts differently",
#                     choices=[0, 1, 2, 3], default=0, type=int)
# parser.add_argument('-sp', '--start_point', help="occlusion upper left corner coordinate", default=(0, 0), type=tuple)
# parser.add_argument('-occl_size', '--occlusion_size', help="occlusion size", default=(10, 10), type=tuple)
# args = parser.parse_args()

# python3 dist_gen_diff.py occl 3 .5 .1 .1 50 20 .25 --target_model=$model;

class Args:
    """Run configuration that stands in for the script's command-line arguments.

    The defaults correspond to the invocation
    ``occl 3 .5 .1 .1 50 20 .25 --target_model=0``. Any attribute can be
    overridden by keyword, e.g. ``Args(target_model=3)``.
    """

    def __init__(self, **overrides):
        self.transformation = 'occl'   # realistic transformation type: 'light', 'occl' or 'blackout'
        self.weight_diff = 3           # weight controlling differential behavior
        self.weight_nc = 0.5           # weight controlling neuron coverage
        self.weight_vae = .1           # weight controlling the VAE goal (0 selects the baseline run)
        self.step = .1                 # step size of gradient descent
        self.seeds = 50                # number of seed inputs
        self.grad_iterations = 20      # number of gradient-descent iterations
        self.threshold = .25           # threshold for considering a neuron activated
        self.target_model = 0          # index (0-3) of the model expected to predict differently
        # Optional occlusion settings of the command-line interface, with its defaults.
        self.start_point = (0, 0)        # occlusion upper-left corner coordinate
        self.occlusion_size = (10, 10)   # occlusion size

        for name, value in overrides.items():
            if not hasattr(self, name):
                raise AttributeError('Unknown argument: %s' % name)
            setattr(self, name, value)

# Run configuration for this session.
args = Args()


print("\n\n")

# Runs with the VAE objective switched off are stored separately as the baseline.
if args.weight_vae == 0:
    output_prefix = './baseline_generated_inputs_Model'
else:
    output_prefix = './generated_inputs_Model'
output_directory = output_prefix + str(args.target_model + 1) + '/' + (args.transformation) + '/'

# Subdirectory of the output directory that receives the original seed images
# used for test generation.
orig_directory = output_directory + 'seeds/'

# Create both directories if needed and clear PNGs left over from earlier runs
# (generated tests first, then seeds).
for directory in (output_directory, orig_directory):
    if not os.path.exists(directory):
        os.makedirs(directory)
    delete_files_from_dir(directory, 'png')

# VAE density threshold for classifying invalid inputs
vae_threshold = -2708.34

# input image dimensions
img_rows, img_cols = 28, 28
img_dim = img_rows * img_cols
input_shape = (img_rows, img_cols, 1)

# Only the MNIST test split is needed here; add the channel axis and rescale
# pixel values to [0, 1].
(_, _), (x_test, y_test) = mnist.load_data()
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1).astype('float32') / 255

# define input tensor as a placeholder
input_tensor = Input(shape=input_shape)

# load multiple models sharing same input tensor
model1 = Model1(input_tensor=input_tensor)
model2 = Model2(input_tensor=input_tensor)
# Model4 takes the third slot only when it is the target model.
third_model_builder = Model4 if args.target_model == 3 else Model3
model3 = third_model_builder(input_tensor=input_tensor)
vae = Vae_MNIST_NN1(input_tensor=input_tensor)

# init coverage table
model_layer_dict1, model_layer_dict2, model_layer_dict3 = init_coverage_tables(model1, model2, model3)

print("*****Running baseline test...." if args.weight_vae == 0 else "*****Running VAE+Baseline test....")








  output, from_logits = _get_logits(


*****Running VAE+Baseline test....



## Input Generation Loop

In [None]:

# ==============================================================================================
# start gen inputs

# For every seed image: if the three models already disagree on a valid input,
# record it; otherwise run gradient steps on a joint objective until the
# models disagree on an input the VAE check accepts, or the iteration budget
# is exhausted.
start_time = datetime.datetime.now()
seed_nums = np.load('./seeds/seeds_'+str(args.seeds)+'.npy')
result_loop_index = []   # loop indices of seeds that produced a generated test
result_coverage = []     # target-model coverage recorded after each saved input
loop_index = 0
for current_seed in seed_nums:
    # 1-based position of the seed; also used in the output file names
    loop_index += 1

    # Work on a private copy: expand_dims returns a view of x_test, so the
    # in-place gradient update below (and any in-place work done by helpers)
    # would otherwise write back into the dataset.
    gen_img = np.expand_dims(x_test[current_seed], axis=0).copy()
    orig_img = gen_img.copy()
    # first check if input already induces differences
    label1, label2, label3 = np.argmax(model1.predict(gen_img)[0]), np.argmax(model2.predict(gen_img)[0]), np.argmax(
        model3.predict(gen_img)[0])

    # NOTE(review): a seed on which the models disagree but which isInvalid()
    # rejects falls through to the gradient search below with
    # orig_label = label1 -- confirm this is intended.
    if not label1 == label2 == label3 and not isInvalid(gen_img, vae, vae_threshold):
        update_coverage(gen_img, model1, model_layer_dict1, args.threshold)
        update_coverage(gen_img, model2, model_layer_dict2, args.threshold)
        update_coverage(gen_img, model3, model_layer_dict3, args.threshold)
        # Record coverage of the target model only; targets 2 and 3 both live
        # in the third slot (model3 / model_layer_dict3).
        # presumably element [2] of neuron_covered() is the coverage ratio -- verify against the helper
        if args.target_model == 0:
            result_coverage.append(neuron_covered(model_layer_dict1)[2])
        elif args.target_model == 1:
            result_coverage.append(neuron_covered(model_layer_dict2)[2])
        elif args.target_model in (2, 3):
            result_coverage.append(neuron_covered(model_layer_dict3)[2])

        gen_img_deprocessed = deprocess_image(gen_img)

        # save the result to disk
        imageio.imwrite(output_directory + 'already_differ_' + str(current_seed) + '_' + str(label1) + '_' + str(label2) + '_' + str(label3) + '.png', gen_img_deprocessed)
        continue

    # The label to move the target model away from
    orig_label = label1
    layer_name1, index1 = neuron_to_cover(model_layer_dict1)
    layer_name2, index2 = neuron_to_cover(model_layer_dict2)
    layer_name3, index3 = neuron_to_cover(model_layer_dict3)

    # construct joint loss function: the target model's pre-softmax score for
    # orig_label enters negatively (scaled by weight_diff), the other two
    # models' scores enter positively
    if args.target_model == 0:
        loss1 = -args.weight_diff * K.mean(model1.get_layer('before_softmax').output[..., orig_label])
        loss2 = K.mean(model2.get_layer('before_softmax').output[..., orig_label])
        loss3 = K.mean(model3.get_layer('before_softmax').output[..., orig_label])
    elif args.target_model == 1:
        loss1 = K.mean(model1.get_layer('before_softmax').output[..., orig_label])
        loss2 = -args.weight_diff * K.mean(model2.get_layer('before_softmax').output[..., orig_label])
        loss3 = K.mean(model3.get_layer('before_softmax').output[..., orig_label])
    elif args.target_model in (2, 3):
        loss1 = K.mean(model1.get_layer('before_softmax').output[..., orig_label])
        loss2 = K.mean(model2.get_layer('before_softmax').output[..., orig_label])
        loss3 = -args.weight_diff * K.mean(model3.get_layer('before_softmax').output[..., orig_label])
    # activation of the selected not-yet-covered neuron in each model
    loss1_neuron = K.mean(model1.get_layer(layer_name1).output[..., index1])
    loss2_neuron = K.mean(model2.get_layer(layer_name2).output[..., index2])
    loss3_neuron = K.mean(model3.get_layer(layer_name3).output[..., index3])

    # vae reconstruction probability
    # (the formula treats vae_output[0] as a mean and vae_output[1] as a
    # log-variance -- presumably what Vae_MNIST_NN1 emits; verify)
    vae_input = vae.get_layer('encoder').get_layer('encoder_input').output
    vae_output = vae.outputs
    loss_a = float(np.log(2 * np.pi)) + vae_output[1]
    loss_m = K.square(vae_output[0] - vae_input) / K.exp(vae_output[1])
    vae_reconstruction_prob = -0.5 * K.sum((loss_a + loss_m), axis=-1)
    vae_reconstruction_prob = vae_reconstruction_prob/img_dim

    layer_output = (loss1 + loss2 + loss3) + args.weight_nc * (loss1_neuron + loss2_neuron + loss3_neuron) + args.weight_vae * vae_reconstruction_prob

    # for adversarial image generation
    final_loss = K.mean(layer_output)

    # we compute the gradient of the input picture wrt this loss
    grads = normalize(K.gradients(final_loss, input_tensor)[0])

    # this function returns the loss and grads given the input picture
    iterate = K.function([input_tensor], [loss1, loss2, loss3, loss1_neuron, loss2_neuron, loss3_neuron, grads])

    # Running gradient ascent
    for iters in range(args.grad_iterations):
        # keep the image as a single-element batch of shape (1, 28, 28, 1)
        gen_img = gen_img.reshape(1, 28, 28, 1)
        loss_value1, loss_value2, loss_value3, loss_neuron1, loss_neuron2, loss_neuron3, grads_value = iterate([gen_img])

        #Apply domain specific constraints
        if args.transformation == 'light':
            grads_value = constraint_light(grads_value)
        elif args.transformation == 'occl':
            grads_value = constraint_occl(grads_value, args.start_point,
                                          args.occlusion_size)
        elif args.transformation == 'blackout':
            grads_value = constraint_black(grads_value)

        # generate the new test input, kept inside the valid pixel range
        gen_img += grads_value * args.step
        gen_img = np.clip(gen_img, 0, 1)
        predictions1 = np.argmax(model1.predict(gen_img)[0])
        predictions2 = np.argmax(model2.predict(gen_img)[0])
        predictions3 = np.argmax(model3.predict(gen_img)[0])

        if not predictions1 == predictions2 == predictions3:
            if isInvalid(gen_img, vae, vae_threshold):
                # rejected by the VAE check: keep iterating from this image
                continue

            # Update coverage
            update_coverage(gen_img, model1, model_layer_dict1, args.threshold)
            update_coverage(gen_img, model2, model_layer_dict2, args.threshold)
            update_coverage(gen_img, model3, model_layer_dict3, args.threshold)

            # Track the seed numbers and coverage achieved for final result
            result_loop_index.append(loop_index)
            if args.target_model == 0:
                result_coverage.append(neuron_covered(model_layer_dict1)[2])
            elif args.target_model == 1:
                result_coverage.append(neuron_covered(model_layer_dict2)[2])
            elif args.target_model in (2, 3):
                result_coverage.append(neuron_covered(model_layer_dict3)[2])

            gen_img_deprocessed = deprocess_image(gen_img)
            orig_img_deprocessed = deprocess_image(orig_img)

            # save the generated test and the seed it came from
            imageio.imwrite(
                    output_directory + str(loop_index) + '_' + str(
                        predictions1) + '_' + str(predictions2) + '_' + str(predictions3)+'.png',
                    gen_img_deprocessed)
            imageio.imwrite(
                    orig_directory + str(loop_index) + '_' + str(
                        predictions1) + '_' + str(predictions2) + '_' + str(predictions3)+'_orig.png',
                    orig_img_deprocessed)
            # one generated test per seed is enough
            break

# Wall-clock duration of the whole generation run, in seconds
duration = (datetime.datetime.now() - start_time).total_seconds()
# Counts only inputs produced by the gradient search; seeds that already made
# the models disagree are not added to result_loop_index.
no_tests = len(result_loop_index)

if args.weight_vae == 0:
    print("**** Result of baseline test:")
else:
    print("**** Result of VAE+Baseline test:")

print("No of test inputs generated: ", no_tests)
if no_tests == 0:
    print("Cumulative coverage for tests: 0")
    print('Avg. test generation time: NA s')
else:
    print("Cumulative coverage for tests: ", round(result_coverage[-1],3))
    # round to two decimals, matching the total-time line; the precision must
    # be an argument of round(), not an extra argument of format()
    print('Avg. test generation time: {} s'.format(round(duration / no_tests, 2)))
print('Total time: {} s'.format(round(duration, 2)))


  updates=self.state_updates,


Flag 5
Flag 7
Flag 8
Flag 9
[DEBUG] l =  0
[DEBUG] l =  1
[DEBUG] l =  2
[DEBUG] l =  3
[DEBUG] l =  4
[DEBUG] l =  5
[DEBUG] l =  6
[DEBUG] l =  7
[DEBUG] l =  8
[DEBUG] l =  9
[DEBUG] l =  10
[DEBUG] l =  11
[DEBUG] l =  12
[DEBUG] l =  13
[DEBUG] l =  14
[DEBUG] l =  15
[DEBUG] l =  16
[DEBUG] l =  17
[DEBUG] l =  18
[DEBUG] l =  19
[DEBUG] l =  20
[DEBUG] l =  21
[DEBUG] l =  22
[DEBUG] l =  23
[DEBUG] l =  24
[DEBUG] l =  25
[DEBUG] l =  26
[DEBUG] l =  27
[DEBUG] l =  28
[DEBUG] l =  29
[DEBUG] l =  30
[DEBUG] l =  31
[DEBUG] l =  32
[DEBUG] l =  33
[DEBUG] l =  34
[DEBUG] l =  35
[DEBUG] l =  36
[DEBUG] l =  37
[DEBUG] l =  38
[DEBUG] l =  39
[DEBUG] l =  40
[DEBUG] l =  41
[DEBUG] l =  42
[DEBUG] l =  43
[DEBUG] l =  44
[DEBUG] l =  45
[DEBUG] l =  46
[DEBUG] l =  47
[DEBUG] l =  48
[DEBUG] l =  49
[DEBUG] l =  50
[DEBUG] l =  51
[DEBUG] l =  52
[DEBUG] l =  53
[DEBUG] l =  54
[DEBUG] l =  55
[DEBUG] l =  56
[DEBUG] l =  57
[DEBUG] l =  58
[DEBUG] l =  59
[DEBUG] l =  60
[DEBUG

  if gen_img_density < vae_threshold or math.isnan(gen_img_density):


Flag 5
Flag 7
Flag 8
Flag 9
[DEBUG] l =  0
[DEBUG] l =  1
[DEBUG] l =  2
[DEBUG] l =  3
[DEBUG] l =  4
[DEBUG] l =  5
[DEBUG] l =  6
[DEBUG] l =  7
[DEBUG] l =  8
[DEBUG] l =  9
[DEBUG] l =  10
[DEBUG] l =  11
[DEBUG] l =  12
[DEBUG] l =  13
[DEBUG] l =  14
[DEBUG] l =  15
[DEBUG] l =  16
[DEBUG] l =  17
[DEBUG] l =  18
[DEBUG] l =  19
[DEBUG] l =  20
[DEBUG] l =  21
[DEBUG] l =  22
[DEBUG] l =  23
[DEBUG] l =  24
[DEBUG] l =  25
[DEBUG] l =  26
[DEBUG] l =  27
[DEBUG] l =  28
[DEBUG] l =  29
[DEBUG] l =  30
[DEBUG] l =  31
[DEBUG] l =  32
[DEBUG] l =  33
[DEBUG] l =  34
[DEBUG] l =  35
[DEBUG] l =  36
[DEBUG] l =  37
[DEBUG] l =  38
[DEBUG] l =  39
[DEBUG] l =  40
[DEBUG] l =  41
[DEBUG] l =  42
[DEBUG] l =  43
[DEBUG] l =  44
[DEBUG] l =  45
[DEBUG] l =  46
[DEBUG] l =  47
[DEBUG] l =  48
[DEBUG] l =  49
[DEBUG] l =  50
[DEBUG] l =  51
[DEBUG] l =  52
[DEBUG] l =  53
[DEBUG] l =  54
[DEBUG] l =  55
[DEBUG] l =  56
[DEBUG] l =  57
[DEBUG] l =  58
[DEBUG] l =  59
[DEBUG] l =  60
[DEBUG

KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt

# Plot the coverage values collected by the generation loop, one point per
# recorded input, in the order they were recorded.
data = result_coverage
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(data, marker='o')
ax.set_xlabel('Time (Tick)')
ax.set_ylabel('Coverage(%)')
ax.set_title('Plot of the Provided Data List')
ax.grid(True)
plt.show()

In [None]:
# Pack the generated inputs into a gzip-compressed tarball (verbose listing).
# NOTE(review): the path is hard-coded; it matches output_directory only for
# target_model == 0, transformation == 'occl' and weight_vae != 0, and
# presumably assumes the notebook runs from /content (Colab) -- confirm.
!tar -czvf occl.tar.gz /content/generated_inputs_Model1/occl

In [None]:
# Copy the archive to ./drive/MyDrive (presumably a mounted Google Drive --
# the mount itself is not visible here, so verify it exists before running).
!cp occl.tar.gz ./drive/MyDrive/occl.tar.gz