## Imports & Configs

In [None]:
%matplotlib inline 
from matplotlib import rcParams
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import cv2
import os
import imutils
import seaborn as sns
import platform
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelBinarizer

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import keras
from keras import backend as K, optimizers
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import Activation, Dropout, Flatten, Dense
from keras.callbacks import ModelCheckpoint, TensorBoard
from keras_tqdm import TQDMNotebookCallback

# Plot / pandas display options used by the whole notebook.
pd.options.mode.chained_assignment = None
rcParams['font.family'] = 'serif'

# Fixed seed so the NumPy-based shuffles below are repeatable.
np.random.seed(44)

# Sample counts per split; filled in when the sets are generated or loaded.
train_step_size = valid_step_size = test_step_size = 0

# Accumulators that DataGenerator appends to when return_original=True.
y_oneHot, y_normalized = [], []

In [None]:
# Ask the Keras TensorFlow backend which GPUs it can see; as the last
# expression of the cell, the notebook displays the returned value.
# NOTE(review): `_get_available_gpus` is a private helper (leading underscore)
# and is presumably only available in older standalone Keras -- confirm
# before upgrading the Keras/TensorFlow versions.
keras.backend.tensorflow_backend._get_available_gpus()

In [None]:
class DataConfig(object):
    """Static configuration for paths, split ratios, image size and training.

    The class body runs once, at definition time: it chooses ``ROOT`` from the
    host operating system, prints which one was detected, and derives every
    other path from ``ROOT``.
    """

    # The original code had an extra first branch that compared
    # ``platform.system()[2]`` (a single character) with 'xenial'.  That test
    # could never be true and raised IndexError whenever ``platform.system()``
    # returned an empty string.  It assigned the same ROOT as the Linux
    # branch, so dropping it changes nothing on any platform that worked.
    if platform.system() == 'Linux':
        print("[INFO] Running on Linux")
        ROOT = "/home/Develope/dataset/"  # in Linux
    else:
        print("[INFO] Running on Mac")
        ROOT = "/Users/../Develope/dataset/"  # in Mac

    # Locations derived from ROOT.
    DATA_PATH = ROOT + "experiments/"
    SET_DIR = ROOT + "sets/CNN/"          # train/val/test CSVs and split sizes
    SAVE_DIR = ROOT + "weights/CNN/"      # model checkpoints
    DATA_SET = ROOT + "@List/data_DL.csv"  # dataset_noSmooth  #Dataset.csv
    IMG_EXTENTION = ".tiff"
    IMG_SAVE_DIR = ROOT + "charts/"       # exported figures

    # Fraction of all rows kept for train+validation, then the fraction of
    # that subset kept for training.
    TRAIN_TEST_RATIO = 0.80
    TRAIN_VAL_RATIO = 0.80

    # True: re-split the dataset and overwrite the set files.
    # False: reuse the set files already stored in SET_DIR.
    GEN_SETS = False

    # ORG IMG=272
    IMG_RESHAPE = True
    IMG_HEIGHT = 300
    IMG_WIDTH = IMG_HEIGHT
    IMG_CHANNEL_NUM = 1

    EPOCH_NUM = 30
    BATCH_SIZE = 25  # 30
    WORKERS_NUM = 2  # 2


config = DataConfig()

## Load / Generate sets


In [None]:
def load_dataset(path, name=""):
    """Read a CSV file into a DataFrame whose index is named ``"index"``.

    Args:
        path: Location of the CSV file.
        name: Label used only in the progress message.

    Returns:
        The loaded DataFrame; the ``img_path`` column is kept as text.
    """
    print("".join(("[INFO] Loading ", name, ".")))
    # Passing ``str`` as converter keeps image paths as text (no numeric parsing).
    frame = pd.read_csv(path, converters={'img_path': str}).rename_axis("index")
    print("[INFO] Done. ")
    return frame


# Load the full sample list once and work on a copy, so the frame as read
# from disk (dataset_org) is never modified by later cells.
dataset_org = load_dataset(config.DATA_SET, name="data_DL")
data = dataset_org.copy()

In [None]:
# Distinct values of the 'Label' column; these become the one-hot classes.
labels = np.unique(data[['Label']])
# The binarizer is configured by assigning its fitted attributes directly
# instead of calling fit().
# NOTE(review): classes_/y_type_/sparse_input_ are attributes normally set by
# LabelBinarizer.fit -- confirm they are still sufficient for
# transform()/inverse_transform() in the installed scikit-learn version.
label_transformer = LabelBinarizer()
label_transformer.classes_ = labels
label_transformer.y_type_ = "multiclass"
label_transformer.sparse_input_ = False

In [None]:
def generate_train_val_test(dataFrame):
    """Shuffle *dataFrame*, split it into train/val/test and persist the sets.

    The rows are permuted with ``np.random.permutation``.  The first
    ``TRAIN_TEST_RATIO`` share becomes train+validation and the rest is the
    test set; of the first part, ``TRAIN_VAL_RATIO`` is kept for training and
    the remainder for validation.

    Side effects:
        * sets the module-level ``train_step_size``, ``valid_step_size`` and
          ``test_step_size`` to the size of each set rounded down to a
          multiple of ``config.BATCH_SIZE``;
        * creates ``config.SET_DIR`` if it is missing, then writes
          ``train_Direction.csv``, ``val_Direction.csv``,
          ``test_Direction.csv`` and ``sets_Direction.txt`` into it.

    Args:
        dataFrame: The complete sample table to split.
    """
    global train_step_size, valid_step_size, test_step_size

    def _full_batch_count(frame):
        # Number of rows that fit into complete batches.
        count = len(frame)
        return count - count % config.BATCH_SIZE

    shuffled_indices = np.random.permutation(len(dataFrame))

    train_size = int(len(dataFrame) * config.TRAIN_TEST_RATIO)
    valid_size = int(train_size * config.TRAIN_VAL_RATIO)

    # [0, valid_size) -> train, [valid_size, train_size) -> val, rest -> test
    train_data = dataFrame.iloc[shuffled_indices[:valid_size]]
    val_data = dataFrame.iloc[shuffled_indices[valid_size:train_size]]
    test_data = dataFrame.iloc[shuffled_indices[train_size:]]

    train_step_size = _full_batch_count(train_data)
    print("Training Steps: ", train_step_size)

    valid_step_size = _full_batch_count(val_data)
    print("Validation Steps: ", valid_step_size)

    test_step_size = _full_batch_count(test_data)
    print("Test Steps: ", test_step_size)

    # The writes below fail when the target directory is missing, so create it.
    if config.SET_DIR:
        os.makedirs(config.SET_DIR, exist_ok=True)

    train_data.to_csv(config.SET_DIR + "train_Direction.csv", index=False)
    print("\nTrain set:", config.SET_DIR + "train_Direction.csv")

    val_data.to_csv(config.SET_DIR + "val_Direction.csv", index=False)
    print("Valid set:", config.SET_DIR + "val_Direction.csv")

    test_data.to_csv(config.SET_DIR + "test_Direction.csv", index=False)
    print("Test  set:", config.SET_DIR + "test_Direction.csv")

    np.savetxt(config.SET_DIR + 'sets_Direction.txt',
               (train_step_size, valid_step_size, test_step_size),
               fmt='%i')

In [None]:
if config.GEN_SETS:
    # Re-split the data and overwrite the set files in SET_DIR.
    generate_train_val_test(data)

else:
    # Reuse the split written by an earlier run: only the three size values
    # are read here; the CSV files themselves are read by the generators.
    print("[INFO] Loading sets.........\n")
    sets = np.loadtxt(config.SET_DIR + 'sets_Direction.txt')
    train_step_size, valid_step_size, test_step_size = int(sets[0]), int(sets[1]), int(sets[2])

    print("[INFO] Training Steps:" + "{:12d}".format(train_step_size))
    print("[INFO] Validation Steps:" + "{:10d}".format(valid_step_size))
    print("[INFO] Test Steps:" + "{:16d}".format(test_step_size))

# Locations of the three set files, whichever branch ran.
path_train = config.SET_DIR + "train_Direction.csv"
path_val = config.SET_DIR + "val_Direction.csv"
path_test = config.SET_DIR + "test_Direction.csv"


## Batch Generator

In [None]:
def grayscale(img):
    """Convert *img* to a single-channel image with OpenCV's RGB-to-gray code.

    NOTE(review): the images passed in come from ``cv2.imread``, which
    presumably returns BGR channel order -- confirm RGB2GRAY is intended.
    """
    conversion_code = cv2.COLOR_RGB2GRAY
    return cv2.cvtColor(img, conversion_code)


def normalize(img, min_range=0.0, max_range=1.0):
    """Linearly map values from ``[0, 255]`` onto ``[min_range, max_range]``.

    Args:
        img: Scalar or array of pixel intensities on a 0-255 scale.
        min_range: Output value for input 0.
        max_range: Output value for input 255.

    Returns:
        The rescaled value(s).
    """
    stretched = img * (max_range - min_range)
    return min_range + stretched / 255.0

In [None]:
class DataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` serving batches of grayscale images and encoded labels.

    Each item is a tuple ``(X, y)``: ``X`` has shape
    ``(BATCH_SIZE, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL_NUM)`` and ``y`` is the
    result of ``lbl_transformer.transform`` applied to the batch labels.
    Rows that do not fill a complete batch are never served, because
    ``__len__`` uses floor division.

    The CSV is read positionally: column 0 is the image path, column 1 the
    label and column 2 the value collected in ``y_normalized``.

    Side effects: when ``return_original`` is true, every served batch appends
    its encoded labels to the module-level ``y_oneHot`` list and the third CSV
    column of every row to the module-level ``y_normalized`` list.

    Args:
        path: CSV file listing the samples of this set.
        config: Configuration object; all sizes and paths are read from it.
        lbl_transformer: Object whose ``transform`` encodes the labels.
        shuffle: Reshuffle the sample order at the end of every epoch.
        display: Print the label of every loaded sample.
        return_original: Enable the ``y_oneHot`` / ``y_normalized`` side effect.
    """

    def __init__(self, path, config, lbl_transformer=label_transformer, shuffle=True, display=False,
                 return_original=False):
        super().__init__()

        # Keep the configuration that was passed in.  The methods previously
        # read the module-level ``config``, silently ignoring this argument.
        self.config = config
        self.df_train = None
        self.num_iter = 0
        self.indexes = 0
        self.dim = (config.IMG_HEIGHT, config.IMG_WIDTH)
        self.shuffle = shuffle
        self.display = display
        self.return_y = return_original
        self.lbl_transformer = lbl_transformer
        self.read_dataFrame(path)
        self.on_epoch_end()

    def read_dataFrame(self, path):
        """Load the sample list from *path* and shuffle its rows once."""
        self.df_train = pd.read_csv(path, header=0, converters={'img_path': lambda x: str(x)})

        self.df_train = self.df_train.sample(frac=1).reset_index(drop=True)
        self.num_iter = len(self.df_train) // self.config.BATCH_SIZE

    def __len__(self):
        """Return the number of complete batches per epoch."""
        return len(self.df_train) // self.config.BATCH_SIZE

    def on_epoch_end(self):
        """Reset the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.df_train))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __getitem__(self, index):
        """Build and return batch number *index* as ``(X, y)``."""
        batch_size = self.config.BATCH_SIZE
        indexes = self.indexes[index * batch_size:(index + 1) * batch_size]

        # find rows
        list_rows = [self.df_train.iloc[k] for k in indexes]
        X, y = self.__data_generation(list_rows)

        if self.return_y:
            global y_oneHot
            y_oneHot.append(y)

        return X, y

    def __data_generation(self, list_rows):
        """Load, preprocess and stack the images and labels of one batch."""
        cfg = self.config

        X = np.empty((cfg.BATCH_SIZE, cfg.IMG_HEIGHT, cfg.IMG_WIDTH, cfg.IMG_CHANNEL_NUM))
        y = np.empty((cfg.BATCH_SIZE, 1), np.float32)

        for i, row in enumerate(list_rows):
            # Explicit positional access: plain ``row[0]`` on a label-indexed
            # Series relies on a deprecated positional fallback.
            img_path = row.iloc[0]
            if platform.system() == 'Linux':
                # Drops the first 41 characters of the stored path and
                # re-roots the remainder under ROOT.
                # NOTE(review): presumably 41 is the length of the absolute
                # prefix on the machine that wrote the CSV -- confirm.
                img_path = cfg.ROOT + img_path[41:]

            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread signals failure by returning None; fail with a
                # clear message instead of an obscure error further down.
                raise FileNotFoundError("Could not read image: " + str(img_path))

            if cfg.IMG_RESHAPE:
                img = imutils.resize(img, width=cfg.IMG_WIDTH)
            img = grayscale(img)
            img = normalize(img)

            img = img.reshape(cfg.IMG_HEIGHT, cfg.IMG_WIDTH, cfg.IMG_CHANNEL_NUM)

            # np.asarray lets the cast work for Python and NumPy scalars alike.
            label = np.asarray(row.iloc[1]).astype(np.float16).reshape([-1])

            X[i,] = img
            y[i] = label

            if self.display:
                print("Label: ", label)

            if self.return_y:
                global y_normalized
                y_normalized.append(np.asarray(row.iloc[2]).astype(np.float16).reshape([-1]))

        y = self.lbl_transformer.transform(np.array(y))
        # print(y)

        return X, y

In [None]:
# y_normalized=[]
# y_oneHot=[]
# test = DataGenerator(path_train,config,display=False,return_original=True,shuffle=True)
# x = test[1]
# x[1][2]
# y_oneHot

## CNN Model

In [None]:
class Model(object):
    """Factory for the notebook's CNNs: a shared conv stack plus two heads.

    NOTE: this class has the same name as ``keras.models.Model``.  Code that
    needs the Keras class must refer to it as ``keras.models.Model``.
    """

    # (filters, kernel_size) of conv2d_1 .. conv2d_5, in order.
    _CONV_SPECS = (
        (36, (5, 5)),
        (36, (5, 5)),
        (48, (5, 5)),
        (64, (3, 3)),
        (64, (3, 3)),
    )
    # Units of the hidden Dense layers shared by both heads.
    _DENSE_UNITS = (100, 50, 20)

    def conv_block(self, model_name):
        """Return a ``Sequential`` with five stride-2 conv layers and a Flatten.

        Args:
            model_name: Name given to the Sequential model.
        """
        input_shape = (config.IMG_HEIGHT, config.IMG_WIDTH, config.IMG_CHANNEL_NUM)

        model = Sequential(name=model_name)
        K.set_image_data_format('channels_last')

        for number, (filters, kernel_size) in enumerate(self._CONV_SPECS, start=1):
            # Only the first layer declares the input shape.
            first_layer_args = {"input_shape": input_shape} if number == 1 else {}
            model.add(Conv2D(filters,
                             kernel_size=kernel_size,
                             strides=(2, 2),
                             padding="same",
                             activation='relu',
                             name="conv2d_%d" % number,
                             **first_layer_args))

        model.add(Flatten(name="flatten_1"))
        return model

    def _add_dense_stack(self, model):
        """Append Dropout, ReLU and the hidden Dense+ReLU layers to *model*."""
        model.add(Dropout(0.5))
        model.add(Activation('relu'))
        for units in self._DENSE_UNITS:
            model.add(Dense(units))
            model.add(Activation('relu'))

    def CNN_direction(self):
        """Build and compile the 25-way softmax classifier."""
        model = self.conv_block(model_name="CNN_Pred")
        self._add_dense_stack(model)
        model.add(Dense(25, activation='softmax'))
        model.compile(loss=keras.losses.categorical_crossentropy,
                      optimizer=keras.optimizers.Adadelta(),
                      metrics=['accuracy'])

        print('CNN is created and compiled..')
        # summary() prints the table itself; wrapping it in print() only added
        # a stray "None" line.
        model.summary()
        return model

    def CNN_reg(self):
        """Build and compile the single-output regressor trained with MSE."""
        model = self.conv_block(model_name="CNN_reg")
        self._add_dense_stack(model)
        model.add(Dense(1))

        adam_custom = optimizers.Adam(lr=0.0001, beta_1=0.9, beta_2=0.999, epsilon=1e-8)
        model.compile(optimizer=adam_custom, loss="mse")
        print('CNN_reg is created and compiled..')
        model.summary()
        return model


In [None]:
model = Model().CNN_direction()
# ModelCheckpoint fills the placeholders from the epoch number and the logged
# metrics, so the metric name must be one Keras actually logs.  'val_acc' is
# the name the checkpoint monitors; the previous 'val_accurasy' was a typo
# that raised KeyError when the first checkpoint was written.
model_save_path = config.SAVE_DIR + str(model.name) + "-{epoch:02d}-{val_acc:.2f}.hdf5"

In [None]:
# Checkpoint: save the model to model_save_path, keeping only models that
# improve the monitored 'val_acc' metric.
checkpoint = ModelCheckpoint(model_save_path, monitor='val_acc', verbose=0, save_best_only=True)

# TensorBoard: write logs under ROOT/log/CNN/ without the graph or images.
vis = TensorBoard(log_dir=config.ROOT + "log/CNN/",
                  batch_size=config.BATCH_SIZE,
                  write_graph=False,
                  write_images=False)

# Callbacks handed to training: notebook progress bars, checkpointing, logging.
callback_list = [TQDMNotebookCallback(leave_inner=False, leave_outer=True), checkpoint, vis]

## Train / Validation / Test

In [None]:
# One batch generator per set.  Only the test generator records the labels it
# serves (return_original=True) so they can be compared with the predictions.
train_generator = DataGenerator(path_train, config)
valid_generator = DataGenerator(path_val, config)
test_generator = DataGenerator(path_test, config, return_original=True)

In [None]:
# model.fit_generator(generator=train_generator,
#                     validation_data=valid_generator,
#                     verbose=0,
#                     callbacks=callback_list,
#                     epochs=config.EPOCH_NUM,
#                     use_multiprocessing=True,
#                     workers=config.WORKERS_NUM
#                     )

In [None]:
# Start from empty accumulators so only labels served during this prediction
# run are collected by the test generator.
y_oneHot = []
y_normalized = []

# Load previously trained weights and predict on every batch of the test set.
model.load_weights(config.SAVE_DIR + "trained_model.hdf5")
predict = model.predict_generator(generator=test_generator,
                                  verbose=1)


## Visualization: Confusion Matrix

In [None]:
# The generator appended one (batch, classes) array per served batch; stack
# them and merge the batch axis so there is one row per sample.
y_oneHot_arr = np.asarray(y_oneHot)
y_oneHot_arr = y_oneHot_arr.reshape(-1, y_oneHot_arr.shape[-1])

# Naming caveat: y_test holds the decoded *predictions*, while y_hat holds the
# decoded labels that the generator served (the ground truth).
y_test = label_transformer.inverse_transform(predict)
y_hat = label_transformer.inverse_transform(y_oneHot_arr)

# The generator may have served more samples than were predicted, so trim the
# recorded values to the number of predictions (previously hard-coded as 19575).
num_predictions = predict.shape[0]
y_hat = y_hat[:num_predictions]
y_normalized = y_normalized[:num_predictions]
y_normalized_arr = np.asarray(y_normalized)

In [None]:
# NOTE(review): this rotates `labels` in place every time the cell runs;
# re-running the cell shifts the order again.
labels = np.roll(labels, 1)

# y_hat holds the ground-truth labels served by the generator and y_test the
# decoded predictions, so they are passed by keyword to put the true classes
# on the rows.
matrix = confusion_matrix(y_true=y_hat, y_pred=y_test)

# Normalise every row to fractions of that row's total.  keepdims=True keeps
# the totals as a column vector so the division is applied row by row; rows
# without samples are left at zero instead of producing NaN.  (`np.float` was
# removed from NumPy, hence the builtin `float`.)
row_totals = matrix.sum(axis=1, keepdims=True).astype(float)
row_totals[row_totals == 0] = 1.0
matrix = matrix / row_totals

df_cm = pd.DataFrame(matrix,
                     index=list(labels),
                     columns=list(labels)).round(2)

# Tick labels shown on both axes of the heatmap.
axis_labels = np.array(['center', 0, 15, 30, 45, 60, 75, 90,
                        105, 120, 135, 150, 165, 180, 195, 210, 225,
                        240, 255, 270, 285, 300, 315, 330, 345], dtype=str)

In [None]:
# Square figure; fig_size is passed to matplotlib's figsize, font_size is used
# for cell annotations and tick labels.
fig_size = 5.05
font_size = 8
plt.figure(figsize=(fig_size, fig_size))
sns.set(font_scale=1)

# Annotated heatmap of the normalised confusion matrix, without colour bar.
plot = sns.heatmap(df_cm,
                   annot=True,
                   square=0,
                   xticklabels=axis_labels,
                   yticklabels=axis_labels,
                   annot_kws={"size": font_size, 'rotation': 0},
                   fmt='.0%',  # format cell values as whole percentages
                   linecolor='black',
                   linewidths=0.5,
                   cbar=0,
                   cmap='gist_yarg'  # alternatives tried: "YlGn", viridis
                   )

# Tidy the cell annotations: blank out zero cells, drop the '%' sign from the
# others, and shrink the three-digit '100' entries.
for txt in plot.texts:
    if txt.get_text() == '0%':
        txt.set_text('')
    else:
        txt.set_text(txt.get_text().replace('%', ''))

    if txt.get_text() == '100':
        txt.set_size(7)

# Column labels on top, row labels on the left.
plot.xaxis.set_ticks_position('top')
plot.yaxis.set_ticks_position('left')
plt.xticks(size=font_size, rotation=90)
plt.yticks(size=font_size, rotation=0)

# Set the y-limits explicitly, slightly beyond the matrix extent.
# NOTE(review): presumably a workaround for clipped first/last rows -- confirm.
axes = plot.axes
axes.set_ylim(len(df_cm) + 0.01, -0.01)
plt.tight_layout()
# plt.savefig(config.IMG_SAVE_DIR+"razmy5",dpi=350, bbox_inches='tight')



## Visualization: Location of the misclassified samples

In [None]:
# One row per predicted sample.  Column order: y_hat, y, is_correct,
# y_normalized.  (y holds the predictions, y_hat the labels served by the
# generator.)
df_results = pd.DataFrame({'y_hat': y_hat, 'y': y_test})
# Compare as text so differing numeric dtypes cannot cause false mismatches.
df_results['is_correct'] = df_results.y_hat.astype(str) == df_results.y.astype(str)
# reshape(-1) flattens whatever number of samples was collected (previously a
# hard-coded 19575).
df_results['y_normalized'] = y_normalized_arr.reshape(-1)

# Misclassified samples, sorted by y and restricted to non-negative values.
df_error = df_results.loc[~df_results['is_correct']].sort_values(by=['y'])
df_error = df_error.loc[df_error['y'] >= 0]

# Correctly classified samples, treated the same way.
df_correct = df_results.loc[df_results['is_correct']].sort_values(by=['y'])
df_correct = df_correct.loc[df_correct['y'] >= 0]

In [None]:
# Plot style: white background with visible bottom/left ticks.
sns.set_style('white', {'axes.linewidth': 0.5})
plt.rcParams['xtick.major.width'] = 1
plt.rcParams['xtick.bottom'] = True
plt.rcParams['ytick.left'] = True
# Scale applied to the mean-centred y_normalized values before plotting.
factor=20

font_size=18
plt.figure(figsize = (10,5))

# Misclassified samples: centre on their mean, then scale.
# NOTE(review): this curve is scaled by factor/2 while the correctly
# classified curve below is scaled by factor -- confirm the difference is
# intended.
x_values = df_error['y_normalized']
x_values = x_values-np.mean(x_values)
x_values*=factor/2

# Dashed red density curve; the legend shows the mean and standard deviation.
# NOTE(review): the legend uses "\u03B4" (delta) for the standard deviation.
# NOTE(review): sns.distplot is presumably deprecated in newer seaborn
# releases -- confirm against the installed version.
sns.distplot(x_values  , color='r',   hist=0,
                
            kde_kws={"color": "r", "lw": 3.5, 'ls':'--'},
             label="miss-classified\n"+
             "\u03BC:" + str(round(np.mean(x_values),3))+ 
             "   \u03B4:"+ str(round(np.std(x_values),2)))

# Correctly classified samples: centre on their mean, then scale by factor.
y = df_correct['y_normalized']

y_scaled = y-np.mean(y)
y_scaled*=factor
# Solid black density curve with the same legend layout.
ax=sns.distplot(y_scaled  , color='k', hist=0, bins=100,
                kde_kws={"color": "k", "lw": 3.5},
            label="\ncorrectly-classified\n"+
             "\u03BC:" + str(round(np.mean(y_scaled),3))+ 
             "   \u03B4:"+ str(round(np.std(y_scaled),2))
            )

# Faint dashed grid on both axes.
ax.xaxis.grid(color='gray', linestyle='--', linewidth=0.05)
ax.yaxis.grid(color='gray', linestyle='--', linewidth=0.05)


# Legend, axis labels, a vertical reference line at 0, and tick sizes.
plt.legend(fontsize=font_size)
plt.xlabel('Tip deflection from the resting position (deg)', size=font_size+2) 
plt.ylabel('PDF', size=font_size)
plt.axvline(0, color='b',linestyle='-')
plt.xticks(size=font_size)
plt.yticks(size=font_size)

plt.setp(ax.get_legend().get_texts(), fontsize=font_size) # for legend text
# Export the figure to IMG_SAVE_DIR as "deviation".
plt.savefig(config.IMG_SAVE_DIR+"deviation",dpi=350, bbox_inches='tight');

## Visualization of the layers

In [None]:
# First test batch as (X, y).  Because test_generator was created with
# return_original=True, indexing it also appends to y_oneHot / y_normalized.
sample = test_generator[0]
# Third image of the batch as a 2-D array; sizes come from the config instead
# of a hard-coded 300x300.
img = sample[0][2].reshape(config.IMG_HEIGHT, config.IMG_WIDTH)
plt.imshow(img, cmap='hot')

In [None]:
def display_activation(activations, col_size, row_size, act_index):
    """Plot the feature maps of one layer on a ``row_size`` x ``col_size`` grid.

    Args:
        activations: Sequence of per-layer outputs; entry ``act_index`` is
            indexed as ``[0, :, :, channel]``.
        col_size: Number of grid columns.
        row_size: Number of grid rows.
        act_index: Index of the layer to display.

    Channels are drawn row by row.  Grid cells beyond the layer's last channel
    are left empty with their axes hidden.
    """
    activation = activations[act_index]
    print("layer: ", str(act_index + 1), activation.shape)
    channel_count = activation.shape[-1]

    # figsize is (width, height): width follows the columns, height the rows.
    # squeeze=False always returns a 2-D axes array, so grids with a single
    # row or column work too.
    fig, ax = plt.subplots(row_size, col_size,
                           figsize=(col_size * 3, row_size * 3),
                           squeeze=False)

    for activation_index, cell in enumerate(ax.flat):
        if activation_index < channel_count:
            cell.imshow(activation[0, :, :, activation_index], cmap='hot')
        else:
            # Fewer channels than grid cells: hide the unused axes.
            cell.axis('off')

In [None]:
# Model that returns the output of every layer for a given input.
layer_outputs = [layer.output for layer in model.layers]
# The bare name `Model` refers to the notebook's own Model class, which takes
# no constructor arguments; the Keras functional Model is needed here.
activation_model = keras.models.Model(inputs=model.input, outputs=layer_outputs)
# Add the batch and channel axes expected by the network's input layer.
activations = activation_model.predict(
    img.reshape(1, config.IMG_HEIGHT, config.IMG_WIDTH, config.IMG_CHANNEL_NUM))

In [None]:
# Show the outputs of the first layer (index 0) on a 6 x 6 grid.
display_activation(activations, 6, 6, 0)