In [19]:
import time
import keras
import pandas as pd
import numpy as np
import os
import cv2

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, BatchNormalization, Dropout
import tensorflow as tf

from keras import backend as k

ImportError: No module named keras

In [None]:
class RoadSignClassifier:
    """Namespace for the road-sign CNN factory; instances carry no state of their own."""

    def __init__(self):
        pass

    @staticmethod
    def createCNN(width, height, depth, classes):
        """Assemble the (uncompiled) sequential convolutional network.

        Args:
            width: Input image width in pixels.
            height: Input image height in pixels.
            depth: Number of input channels.
            classes: Size of the final softmax layer.

        Returns:
            A ``Sequential`` model with 15 layers: one 5x5 convolution with
            pooling, two 16-filter 3x3 convolutions (each followed by batch
            normalisation) with pooling, two "same"-padded 32-filter 3x3
            convolutions (each followed by batch normalisation), and a
            flatten / dropout / dense(512) / softmax head.
        """
        # Input tensors are laid out rows first: (height, width, channels).
        image_shape = (height, width, depth)

        stack = [
            Conv2D(filters=8, kernel_size=(5, 5), input_shape=image_shape, activation="relu"),
            MaxPooling2D(pool_size=(2, 2)),
        ]

        # Two unpadded 3x3 convolutions, each followed by batch normalisation.
        for _ in range(2):
            stack.append(Conv2D(filters=16, kernel_size=(3, 3), activation="relu"))
            stack.append(BatchNormalization())
        stack.append(MaxPooling2D(pool_size=(2, 2)))

        # Two "same"-padded 3x3 convolutions, each followed by batch normalisation.
        for _ in range(2):
            stack.append(Conv2D(filters=32, kernel_size=(3, 3), padding="same", activation="relu"))
            stack.append(BatchNormalization())

        # Classification head.
        stack.append(Flatten())
        stack.append(Dropout(0.5))
        stack.append(Dense(512, activation="relu"))
        stack.append(Dense(classes, activation="softmax"))

        network = Sequential()
        for layer in stack:
            network.add(layer)
        return network

In [None]:
def load_data(dataset, csv, size=(30, 30), random_state=None):
    """Load, shuffle and preprocess the images listed in a CSV index file.

    Each image is read with OpenCV, resized to ``size`` and histogram-equalised
    one channel at a time.

    Args:
        dataset: Dataset root directory. Image paths from the CSV are joined
            onto it with ``os.path.join``.
        csv: CSV file name, concatenated directly onto ``dataset`` (callers
            pass it with a leading separator, e.g. ``"/Train.csv"``). The file
            must provide ``ClassId`` and ``Path`` columns.
        size: ``(width, height)`` passed to ``cv2.resize``.
        random_state: Optional seed forwarded to ``DataFrame.sample`` so the
            shuffle can be reproduced; ``None`` keeps it non-deterministic.

    Returns:
        ``(x, y)``: ``x`` is the array of preprocessed images and ``y`` the
        array of the matching ``ClassId`` values, in shuffled order.

    Raises:
        OSError: If OpenCV cannot read one of the listed image files.
    """
    rows = pd.read_csv(dataset + csv)
    rows = rows.sample(frac=1, random_state=random_state).reset_index(drop=True)

    images = []
    classes = []
    for i, row in rows.iterrows():
        img_path = os.path.join(dataset, row["Path"])
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise OSError(f"cv2.imread could not read image: {img_path}")

        # A third positional argument to cv2.resize is `dst`, not the
        # interpolation mode, so only the source and target size are passed.
        image_rs = cv2.resize(image, size)

        # Equalise each channel independently and merge them in the same order.
        channels = cv2.split(image_rs)
        new_image = cv2.merge(tuple(cv2.equalizeHist(channel) for channel in channels))

        if i % 500 == 0:
            print(f"loaded: {i}")
        images.append(new_image)
        classes.append(row["ClassId"])

    x = np.array(images)
    y = np.array(classes)
    return x, y

In [None]:
# Root folder of the GTSRB dataset. Set the GTSRB_DIR environment variable to
# point at a different location; the default is the original author's path.
dataset_root = os.environ.get("GTSRB_DIR", r"/home/sspc/Desktop/gtsrb-german-traffic-sign")
train_data = dataset_root
test_data = dataset_root

# Train.csv and Test.csv live in the same folder and index the image files.
(train_X, train_Y) = load_data(train_data, "/Train.csv")
(test_X, test_Y) = load_data(test_data, "/Test.csv")

In [None]:
# Scale pixel values from [0, 255] to [0, 1].
print("UPDATE: Normalizing data")
x_train = train_X.astype("float64") / 255.0
x_test = test_X.astype("float64") / 255.0

print("UPDATE: One-Hot Encoding data")
num_labels = len(np.unique(train_Y))
y_train = to_categorical(train_Y)
# Force the test encoding to the same width as the training encoding so both
# matrices line up with the network output even if the highest class id is
# absent from the test set.
y_test = to_categorical(test_Y, num_classes=y_train.shape[1])

# Despite the names, these hold the integer class ids recovered from the
# one-hot rows (they are what calculate_SI consumes as labels).
y_test_one_hot = np.argmax(y_test, 1)
y_train_one_hot = np.argmax(y_train, 1)

# Inverse-frequency class weights: the most frequent class gets 1.0 and rarer
# classes get proportionally more. Keras `fit` expects a dict mapping class
# index -> weight, so build one rather than passing the raw array. Classes
# with no training samples get a neutral 1.0 instead of a division by zero.
class_totals = y_train.sum(axis=0)
class_weight = {
    class_id: (float(class_totals.max() / total) if total > 0 else 1.0)
    for class_id, total in enumerate(class_totals)
}

In [None]:
# Training hyper-parameters.
epochs = 100
batch_size = 64
learning_rate = 1e-3

# Adam with a decay term of learning_rate / epochs.
# NOTE(review): `lr` and `decay` are legacy keyword names for Adam; newer Keras
# releases expect `learning_rate` and a schedule object. Confirm against the
# installed TensorFlow/Keras version before changing.
optimizer = Adam(lr=learning_rate, decay=learning_rate / epochs)

# 30x30 three-channel inputs, 43 output classes.
model = RoadSignClassifier.createCNN(30, 30, 3, 43)


In [None]:
# Configure the model for multi-class training on one-hot targets.
model.compile(
    loss="categorical_crossentropy",
    optimizer=optimizer,
    metrics=["accuracy"],
)

# Train, holding out 20% of the training data for validation and weighting
# classes by `class_weight`. The returned history object is kept in `model_fit`.
model_fit = model.fit(
    x_train,
    y_train,
    epochs=epochs,
    batch_size=batch_size,
    class_weight=class_weight,
    validation_split=0.2,
    verbose=1,
)

In [None]:
def submodel(self, number_layer, flatten=True):
    """Build a model that maps the network input to one intermediate layer's output.

    Args:
        self: Object exposing the wrapped Keras model as ``self.model``.
        number_layer: Index into ``self.model.layers`` of the layer to expose.
        flatten: When True, the layer output is passed through ``Flatten`` so
            the sub-model returns one feature vector per sample.

    Returns:
        A ``tf.keras.Model`` sharing the layers of ``self.model`` whose output
        is the (optionally flattened) output of the selected layer.
    """
    layer = self.model.layers[number_layer]
    out = Flatten()(layer.output) if flatten else layer.output
    print('name: ', layer.name)
    # The functional-API keyword is `outputs` (plural). tf.keras is used so the
    # wrapper comes from the same package as the tensorflow.keras layers.
    new_model = tf.keras.Model(inputs=[self.model.input], outputs=out)
    return new_model

In [None]:
def calculate_SI(featuremap, label, mode='dontCare'):
    """Count the samples whose nearest neighbour carries the same label.

    For every row ``i`` of ``featuremap`` the closest *other* row is found by
    squared Euclidean distance and the two labels are compared. The ranking
    uses ``||a_j||^2 - 2 * a_j . a_i``, which differs from the squared distance
    only by a term that is constant in ``j``.

    The computation is pure NumPy, so it does not create a TensorFlow session
    or reset the default graph, and Keras models stay usable afterwards.

    Args:
        featuremap: 2-D array-like of shape ``(number, size)``, one feature
            vector per row. Values are processed as float32.
        label: 1-D array-like with ``number`` class labels.
        mode: Unused; kept for backward compatibility with existing callers.

    Returns:
        ``(count, number)``: the number of samples whose nearest neighbour
        shares their label, and the total number of samples. With a single
        sample no other row exists, the sample is matched with itself and the
        result is ``(1, 1)``.

    Raises:
        ValueError: If ``featuremap`` is not 2-D or ``label`` is not a 1-D
            sequence with one entry per row.
    """
    features = np.asarray(featuremap, dtype=np.float32)
    if features.ndim != 2:
        raise ValueError(
            "featuremap must be 2-D (number, size), got shape %s" % (features.shape,)
        )
    number = int(features.shape[0])

    labels = np.asarray(label)
    if labels.shape != (number,):
        raise ValueError(
            "label must be 1-D with %d entries, got shape %s" % (number, labels.shape)
        )

    # ||a_j||^2 for every row; shared by all queries.
    square = np.sum(np.square(features), axis=1)

    count = 0
    # Process queries in chunks so the distance matrix stays at chunk x number
    # instead of number x number.
    chunk = 256
    for start in range(0, number, chunk):
        stop = min(start + chunk, number)
        norm = square[np.newaxis, :] - 2.0 * (features[start:stop] @ features.T)
        # A sample must never be selected as its own nearest neighbour.
        norm[np.arange(stop - start), np.arange(start, stop)] = np.inf
        nearest = np.argmin(norm, axis=1)
        count += int(np.count_nonzero(labels[nearest] == labels[start:stop]))

    print(count)
    return count, number

In [None]:
# --- Classification error / accuracy of the trained model on both splits ---
for split, inputs, targets in (("train", x_train, y_train), ("test", x_test, y_test)):
    print(f"predict({split})")
    # Keras `predict` has no `normalize` keyword, so only the inputs are passed.
    predicted_x = model.predict(inputs)
    predicted_labels = np.argmax(predicted_x, 1)
    true_labels = np.argmax(targets, 1)

    # "loss" here is the misclassification rate, not the cross-entropy.
    loss = np.mean(predicted_labels != true_labels)
    print(f"loss ({split}): ", loss)

    acc = np.mean(predicted_labels == true_labels)
    print(f"accuracy ({split}): ", acc)

model.summary()

# --- Separability index (SI) of every layer's features ---
# Keep the trained weights as plain arrays: each iteration rebuilds the network
# and clears the Keras session, so the weights must be restored explicitly or
# SI would be measured on a randomly initialised model.
trained_weights = model.get_weights()
num_layers = len(model.layers)

with open('train.txt', 'a+', 1) as output_file_train, open('test.txt', 'a+', 1) as output_file_test:
    for i in range(num_layers):
        print("_____________________________________________________")
        probe_model = RoadSignClassifier.createCNN(width=30, height=30, depth=3, classes=43)
        probe_model.set_weights(trained_weights)

        # `submodel` reads the network from `.model` on the object it is given.
        classifier = RoadSignClassifier()
        classifier.model = probe_model

        # calculate_SI needs one feature vector per sample, so flatten any
        # layer output that has more than (batch, features) dimensions.
        needs_flatten = len(probe_model.layers[i].output.shape) > 2
        sub_model = submodel(classifier, i, needs_flatten)

        x_train_out = sub_model.predict(x_train)
        x_test_out = sub_model.predict(x_test)
        print('predict: ', i, x_train_out.shape, x_test_out.shape)

        print('calculate SI test')
        result2, number2 = calculate_SI(x_test_out, y_test_one_hot)
        output_file_test.write("%i %f %i %f\n" % (i, result2, number2, float(result2 / number2)))
        print(i, result2, number2, float(result2 / number2))

        print('calculate SI train')
        prev = time.time()  # start of the timed section, so the printed value is elapsed seconds
        result1, number1 = calculate_SI(x_train_out, y_train_one_hot)
        output_file_train.write("%i %f %i %f\n" % (i, result1, number1, float(result1 / number1)))
        print(i, result1, number1, float(result1 / number1), time.time() - prev)

        # Drop the large intermediates before the next layer is processed.
        x_train_out = None
        x_test_out = None
        sub_model = None
        probe_model = None
        classifier = None
        tf.keras.backend.clear_session()  # release the per-iteration model state
