# Imports

In [None]:
import math
import os
import random
from enum import Enum

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn
import sklearn
import sklearn.metrics
import sklearn.model_selection
import sklearn.preprocessing
import tensorflow as tf
from keras.callbacks import CSVLogger
from tensorflow.keras import datasets, layers, models

# Hyperparameters

In [None]:
model_name = "CNN" # name of the directory that holds the model's weights and training history (created by the Train cell)

size_batch = 2048 # batch size passed to model.fit; kept large so each batch has a decent chance of containing a few positive samples
epochs_to_train = 300 # number of epochs passed to model.fit
# Disabled learning-rate schedule, kept for reference (the optimizer currently uses Adam's defaults):
# initial_learning_rate = 4.5e-03 #Eh?Predictor=0.05, default=0.001
# lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
#     initial_learning_rate,
#     decay_steps=100000,
#     decay_rate=0.94)

# True: 2-unit softmax output trained on one-hot labels.
# False: 1-unit sigmoid output trained on 0/1 labels.
use_softmax = True

# CNN

In [None]:
# Convolutional classifier over 23x23 windows with 6 channels.
# With Keras' default 'valid' padding and unit stride, each 3x3 convolution
# trims two pixels per axis (23 -> 21 -> 19 -> 17 -> 15 -> 13); the 3x3 average
# pooling then reduces the map to 4x4 before flattening.
model = models.Sequential()
model.add(layers.Conv2D(32, (3, 3), activation = 'relu', input_shape = (23, 23, 6)))
model.add(layers.Conv2D(32, (3, 3), activation = 'relu'))

model.add(layers.Conv2D(64, (3, 3), activation = 'relu'))
model.add(layers.Conv2D(64, (3, 3), activation = 'relu'))
model.add(layers.Conv2D(64, (3, 3), activation = 'relu'))

model.add(layers.AveragePooling2D((3, 3)))

model.add(layers.Flatten())

model.add(layers.Dense(128, activation = 'relu'))
model.add(layers.Dense(128, activation = 'relu'))

# NOTE(review): this 2-unit ReLU layer sits directly in front of the output
# layer and acts as a very narrow bottleneck. It is kept unchanged so that
# previously saved checkpoints still match the architecture - confirm it is
# intentional.
model.add(layers.Dense(2, activation = 'relu'))

# The output head, loss and metrics depend on the label encoding chosen by
# `use_softmax`.
loss_function = None
metrics_evaluate = None
if use_softmax:
    # Two-way softmax trained against one-hot labels.
    model.add(layers.Dense(2, activation = 'softmax'))
    loss_function = tf.keras.losses.CategoricalCrossentropy()
    metrics_evaluate = [tf.keras.metrics.CategoricalCrossentropy()]
else:
    # Single sigmoid unit trained against 0/1 labels. Categorical cross-entropy
    # is degenerate for a single output unit, so the binary variant is used.
    model.add(layers.Dense(1, activation = 'sigmoid'))
    loss_function = tf.keras.losses.BinaryCrossentropy()
    metrics_evaluate = [tf.keras.metrics.TruePositives(name = 'tp'),
                        tf.keras.metrics.FalsePositives(name = 'fp'),
                        tf.keras.metrics.TrueNegatives(name = 'tn'),
                        tf.keras.metrics.FalseNegatives(name = 'fn'),
                        tf.keras.metrics.BinaryAccuracy(name = 'accuracy'),
                        tf.keras.metrics.Precision(name = 'precision'),
                        tf.keras.metrics.Recall(name = 'recall'),
                        tf.keras.metrics.AUC(name = 'auc')]

model.compile(optimizer = tf.keras.optimizers.Adam(),
              loss = loss_function,
              metrics = metrics_evaluate)

# Load CNN data

In [None]:
# Training circuits: the ISPD19 benchmarks minus the held-out test circuit and
# a few excluded ones, plus a set of additional designs.
circuits = ["ispd19_test"+str(x) for x in range(1, 11)]
test_circuit = "ispd19_test10"
if test_circuit in circuits:
    circuits.remove(test_circuit)
circuits.remove("ispd19_test4")  # low density benchmark
circuits.remove("ispd19_test5")  # low density benchmark
circuits.remove("ispd19_test9")  # similar to test10

circuits.extend(['aes', 'blackParrot', 'dynamicNode', 'gcd', 'ibex', 'jpeg', 'swerv', 'TinyRocket'])

data_path = './NPYCNN/'

file_types = ['violatingNodes', 'surroundingViolNodes', 'nonViolatingNodes']

# Collect the per-file arrays and concatenate once at the end: concatenating
# inside the loop would re-copy everything loaded so far for every file.
# The empty seed arrays fix the shape/dtype of the result exactly as a
# pairwise concatenation starting from them would.
data_chunks = [np.empty((0,23,23,6), dtype=np.int32)]
if use_softmax:
    label_chunks = [np.empty((0,2), dtype=np.int32)]
else:
    label_chunks = [np.empty((0), dtype=np.int32)]

for circuit in circuits:
    for file_type in file_types:
        file_name = circuit+'_'+file_type+'.npy'
        print('Loading: ',circuit,' file_name: ',file_name)
        data = np.load(data_path+file_name)
        data_chunks.append(data)

        # Only 'violatingNodes' samples are positives; the other two file
        # types are labelled as negatives.
        is_violating = file_type == 'violatingNodes'
        if use_softmax:
            # One-hot labels: column 1 marks a positive, column 0 a negative.
            img = np.full((data.shape[0], 2), 0, dtype=np.int32)
            img[:, 1 if is_violating else 0] = 1
        else:
            # Scalar 0/1 labels.
            img = np.full((data.shape[0]), 1 if is_violating else 0, dtype=np.int32)
        label_chunks.append(img)

training_data = np.concatenate(data_chunks, axis=0)
image = np.concatenate(label_chunks, axis=0)
# Drop the chunk lists so the per-file arrays can be freed.
del data_chunks, label_chunks

print('training_data: ', training_data.shape)
print('image: ', image.shape)

In [None]:
# Restore the weights previously saved under the model's directory.
checkpoint_path = "/".join([model_name, "cp.ckpt"])
model.load_weights(checkpoint_path)

# Train

In [None]:
checkpoint_path = model_name+"/cp.ckpt"

# Make sure the output directory exists (no error if it already does).
os.makedirs(model_name, exist_ok=True)

# Resume from a previous run only when a checkpoint was actually written.
# The directory alone is not enough: it may exist without any saved weights
# (e.g. a run interrupted before the first epoch finished), in which case
# loading would fail. TensorFlow-format checkpoints are stored as
# "<prefix>.index" plus data shards; a single-file checkpoint is accepted too.
if os.path.exists(checkpoint_path + ".index") or os.path.exists(checkpoint_path):
    model.load_weights(checkpoint_path)

# Create a callback that saves the model's weights at the end of each epoch
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path, save_weights_only=True)
# Create a callback that saves model history at the end of each epoch
# (append=True keeps the history of earlier runs in the same file).
csv_logger = CSVLogger(model_name+"/model_history_log.csv", append=True)

train_history = model.fit(training_data,
                         image,
                         batch_size=size_batch,
                         epochs=epochs_to_train,
                         callbacks=[cp_callback, csv_logger])

# Some plot functions

In [None]:
#matplotlib.rcParams['figure.figsize'] = (12, 10)
# Colors of matplotlib's current default property cycle.
_prop_cycle = matplotlib.pyplot.rcParams['axes.prop_cycle']
colors = _prop_cycle.by_key()['color']

def computeFScoreAndMCC(df):
    """Add 'F-score' and 'MCC' columns to *df* in place.

    Args:
        df: DataFrame with the columns 'precision', 'recall', 'tp', 'fp',
            'tn' and 'fn' (one row per epoch in the training log).

    Returns:
        None. The two new columns are written into *df*.

    Rows whose denominator is zero are not special-cased: they end up as
    NaN/inf, following pandas' division semantics.
    """
    precision = df['precision']
    recall = df['recall']
    df['F-score'] = (2 * precision * recall)/(precision + recall)

    # Work in floating point: with integer count columns the product of the
    # four marginals overflows int64 silently for realistic dataset sizes.
    tp = df['tp'].astype(float)
    fp = df['fp'].astype(float)
    tn = df['tn'].astype(float)
    fn = df['fn'].astype(float)
    denominator = np.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn))
    df['MCC'] = (tp * tn - fp * fn)/denominator

# plot the training loss and accuracy
def plot_df(history_df, metric, size=None):
    """Plot one column of the training history against the epoch index.

    Args:
        history_df: DataFrame with one row per epoch.
        metric: Name of the column to plot.
        size: Number of leading epochs to draw; None draws the whole history.
            Values larger than the history are truncated to its length.
    """
    if size is None:
        size = history_df.shape[0]
    values = history_df[metric][0:size]
    # Derive the x axis from the slice actually taken, so that x and y always
    # have the same length even when `size` exceeds the number of rows.
    epochs = np.arange(0, len(values))
    matplotlib.pyplot.style.use("ggplot")
    matplotlib.pyplot.figure()
    matplotlib.pyplot.plot(epochs, values, label=metric)
    matplotlib.pyplot.title("Training performace: "+metric)
    matplotlib.pyplot.xlabel("Epoch #")
    matplotlib.pyplot.ylabel(metric)
    matplotlib.pyplot.show()

def plot_cm(labels, predictions, title=None, output_path=None, p=0.5):
    """Draw the confusion matrix of thresholded predictions as a heatmap.

    Args:
        labels: Ground-truth labels.
        predictions: Predicted scores; entries greater than *p* count as
            positive.
        title: Figure title; 'Confusion matrix' is used when None.
        output_path: When given, the figure is saved there instead of shown.
        p: Decision threshold applied to *predictions*.
    """
    pyplot = matplotlib.pyplot
    predicted_positive = predictions > p
    cm = sklearn.metrics.confusion_matrix(labels, predicted_positive)

    pyplot.figure(figsize=(5,5))
    seaborn.heatmap(cm, annot=True, fmt="d")
    pyplot.title('Confusion matrix' if title is None else title)
    pyplot.ylabel('Actual label')
    pyplot.xlabel('Predicted label')

    # Either display the figure or write it to disk, never both.
    if output_path is None:
        pyplot.show()
    else:
        pyplot.savefig(output_path)

# Predict

In [None]:
# Run the model over `training_data` and draw the resulting confusion matrix.
result = model.predict(training_data)
if use_softmax:
    # Collapse the two class scores to a class index and compare it with the
    # positive-class column of the one-hot labels.
    result2 = np.argmax(result, axis=1)
    plot_cm(image[:,1], result2, "Test Confusion Matrix")
else:
    # Sigmoid scores are thresholded inside plot_cm.
    plot_cm(image, result, "Test Confusion Matrix")

# Training performance

In [None]:
history_df = pd.read_csv(model_name+"/model_history_log.csv") #Path to "model_history_log.csv"

# F-score and MCC can only be derived when the log contains the confusion
# counts and precision/recall. Those columns are only logged by the sigmoid
# configuration, so skip the derivation instead of failing with a KeyError.
required_columns = {'precision', 'recall', 'tp', 'fp', 'tn', 'fn'}
if required_columns.issubset(history_df.columns):
    computeFScoreAndMCC(history_df)
else:
    print('Skipping F-score/MCC: missing columns',
          sorted(required_columns.difference(history_df.columns)))

metrics_to_draw = ['loss', 'F-score', 'MCC', 'precision', 'recall']
max_epochs = 30 #Use None to draw the entire history
for metric in metrics_to_draw:
    if metric not in history_df.columns:
        print('Skipping plot, metric not in history:', metric)
        continue
    plot_df(history_df, metric) #plot_df(history_df, metric, max_epochs)

# Test and check performance

In [None]:
def calculate_test_metrics(model, results):
    """Pair evaluation results with their metric names and add derived scores.

    Args:
        model: Object exposing `metrics_names`, the names matching *results*
            position by position.
        results: Metric values, in the order of `model.metrics_names`.

    Returns:
        A dict mapping each metric name to its value. 'F-score' is added when
        precision + recall is non-zero, and 'MCC' when its denominator is
        non-zero; otherwise the respective key is left out.

    Raises:
        KeyError: If 'precision', 'recall', 'tp', 'fp', 'tn' or 'fn' is not
            among the metric names.
    """
    m = dict(zip(model.metrics_names, results))

    precision_plus_recall = m['precision'] + m['recall']
    if precision_plus_recall != 0:
        m['F-score'] = (2 * m['precision'] * m['recall'])/precision_plus_recall

    denominator = math.sqrt((m['tp']+m['fp'])*(m['tp']+m['fn'])*(m['tn']+m['fp'])*(m['tn']+m['fn']))
    if denominator != 0:
        m['MCC'] = (m['tp'] * m['tn'] - m['fp'] * m['fn'])/denominator
    return m

# NOTE(review): `test_array` and `test_labels` are not defined in any of the
# visible cells; they must hold the held-out samples and their labels before
# this cell is run. calculate_test_metrics also needs the tp/fp/tn/fn,
# precision and recall metrics, which are only compiled into the model in the
# sigmoid configuration.
# The batch size hyperparameter is named `size_batch` (see Hyperparameters).
baseline_results = model.evaluate(test_array, test_labels, batch_size=size_batch, verbose=0)
metrics = calculate_test_metrics(model, baseline_results)
print(metrics)
test_predictions_baseline = model.predict(test_array, batch_size=size_batch)
plot_cm(test_labels, test_predictions_baseline, "Test Confusion Matrix")

# Pre-Save Data in NPY format

In [None]:
# Converts the text dumps in `data_path` into .npy files in `npy_path`.
#
# Circuit selection used for the training set:
# circuits = ["ispd19_test"+str(x) for x in range(1, 11)]
# circuits.remove("ispd19_test4")#low density benchmark (Older STD Lib)
# circuits.remove("ispd19_test5")#low density benchmark (Older STD Lib)
# circuits.remove("ispd19_test9")#Similar to test10
# circuits.extend(['aes', 'blackParrot', 'dynamicNode', 'gcd', 'ibex', 'jpeg', 'swerv', 'TinyRocket'])
#
#circuits = ['aes', 'blackParrot', 'dynamicNode', 'gcd', 'ibex', 'jpeg', 'swerv', 'TinyRocket']
circuits = ["ispd19_test10"]

data_path = '/home/sheiny/workspace/Data/CNN/'
npy_path = '/home/sheiny/workspace/Data/NPYCNN/'
file_list = os.listdir(data_path)

# Index the input files by "<circuit>_<file_type>". ISPD19 circuit names
# contain an underscore themselves ("ispd19_testN"), so their key spans three
# '_'-separated fields instead of two.
file_map = {}
for file_name in file_list:
    parts = file_name.split('_')
    key_fields = 3 if parts[0] == 'ispd19' else 2
    if len(parts) < key_fields:
        # Unrelated file in the directory: it cannot form a key, skip it
        # rather than fail with an IndexError.
        continue
    file_map['_'.join(parts[:key_fields])] = file_name

file_types = ['violatingNodes', 'surroundingViolNodes', 'nonViolatingNodes']

for circuit in circuits:
    print('Reading '+circuit+" files")
    for file_type in file_types:
        print("File: "+file_type)
        file_name = file_map[circuit+'_'+file_type]
        names = file_name.split('_')
        # Data (X)
        print('loading ',file_name)
        data = np.loadtxt(data_path+file_name, dtype=np.int32)
        print('reshaping')
        # The field right after the key is parsed as the number of samples in
        # the file and used as the leading dimension.
        count_field = 3 if names[0] == 'ispd19' else 2
        data = data.reshape(int(names[count_field]),23,23,6)
        if file_type == 'nonViolatingNodes':# Undersample
            # Keep a random 10% of the rows, preserving their original order.
            # NOTE(review): the draw is unseeded, so the selection differs
            # between runs.
            selected_rows = np.random.choice(data.shape[0], size=int(0.1*len(data)), replace=False)
            selected_rows.sort()
            data = data[selected_rows, :]
        np.save(npy_path+circuit+'_'+file_type+'.npy', data)
        print('Done, final shape is: ',data.shape)
