In [None]:
# import libraries
import os
#os.environ["CUDA_VISIBLE_DEVICES"]="-1"
import tensorflow as tf # for modell training
import keras_tuner as kt
import matplotlib.pyplot as plt # to show graphical results
from matplotlib.ticker import MaxNLocator
from datetime import datetime
import cv2
import math
from datetime import datetime

In [None]:
# own modules
import modules.config as config
from modules.ai_dataset import ai_dataset
from modules.export_trained_model import export_trained_model
from modules.conf_matrix import conf_matrix
from modules.roc import roc

In [None]:
# load the image data set configured for this project
# NOTE(review): the meaning of the third positional flag is defined in
# modules.ai_dataset and is not visible here - confirm before changing it
raw_data = ai_dataset(
    config.TRAINING_DATA,
    config.IMG_SIZE,
    True,
)

In [None]:
# Partition the raw data: the leading (1 - PERCENT_TEST) share becomes the
# training set, the remaining tail is held back as test/validation set.
split_index = math.ceil((1 - config.PERCENT_TEST) * len(raw_data.get_tf_images()))

training_images = raw_data.get_tf_images()[:split_index]
training_labels = raw_data.get_tf_labels()[:split_index]

test_images = raw_data.get_tf_images()[split_index:]
test_labels = raw_data.get_tf_labels()[split_index:]

# report the overall tensor shape and the size of both partitions
print("tensor shape:", raw_data.get_tf_images().get_shape())
print("training images:", len(training_images))
print("validation images:", len(test_images))

In [None]:
# preview the first image of the data set; the channels are converted from
# BGR to RGB before handing the image to matplotlib
first_image_rgb = cv2.cvtColor(raw_data.get_image(0), cv2.COLOR_BGR2RGB)

plt.figure()
plt.imshow(first_image_rgb)
plt.colorbar()
plt.grid(False)
plt.show()

In [None]:
# verify the preparation: show the first 25 images in a 5x5 grid, each one
# captioned with its label
grid_rows, grid_cols = 5, 5

plt.figure(figsize=(10, 10))
for idx in range(grid_rows * grid_cols):
    sample_rgb = cv2.cvtColor(raw_data.get_image(idx), cv2.COLOR_BGR2RGB)
    plt.subplot(grid_rows, grid_cols, idx + 1)
    # hide ticks and grid so that only the picture and its label remain
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(sample_rgb)
    plt.xlabel(raw_data.get_label(idx), color="blue")
plt.show()

In [None]:
# debug aid: list the hardware devices TensorFlow can see
available_devices = tf.config.list_physical_devices()
available_devices

In [None]:
# names of the optimizers considered in the experiments
optimizers = [
    "adadelta",
    "adagrad",
    "adam",
    "adamax",
    "ftrl",
    "nadam",
    "rmsprop",
    "sgd",
]

In [None]:
# Create the convolutional base
# https://www.mydatahack.com/building-alexnet-with-keras/
def build_model(hp, optim="adadelta"):
    """Build and compile the AlexNet-style classifier for one tuner trial.

    Tunable hyperparameters (registered on ``hp``):
      * ``learning_rate``            - one of 1e-2, 1e-3, 1e-4
      * ``Dense_01`` / ``Dense_02``   - units of the two hidden dense layers
                                       (2048..8192, step 1024)
      * ``Dropout_01`` / ``Dropout_02`` - dropout rate after each of them
                                       (0.0..0.8, step 0.05)

    Args:
        hp: KerasTuner hyperparameter container supplied by the tuner.
        optim: lower-case name of the optimizer to compile with. The tuner
            calls this function with ``hp`` only, so the default is used
            unless the function is wrapped (e.g. ``functools.partial``).

    Returns:
        A compiled ``tf.keras`` Sequential model named ``bo-<optim>`` with a
        2-unit softmax output, sparse categorical cross-entropy loss and the
        ``accuracy`` metric.

    Raises:
        ValueError: if ``optim`` is not a supported optimizer name. (The
            previous version returned ``0`` here, which the tuner cannot
            use as a model.)
    """
    # optimizer name -> class name in tf.keras.optimizers; resolved lazily so
    # only the selected class has to exist in the installed version
    optimizer_class_names = {
        "adadelta": "Adadelta",
        "adagrad": "Adagrad",
        "adam": "Adam",
        "adamax": "Adamax",
        "ftrl": "Ftrl",
        "nadam": "Nadam",
        "rmsprop": "RMSprop",
        "sgd": "SGD",
    }
    if optim not in optimizer_class_names:
        raise ValueError(
            f"unsupported optimizer {optim!r}; "
            f"expected one of {sorted(optimizer_class_names)}"
        )

    learning_rate = hp.Choice("learning_rate", values=[1e-2, 1e-3, 1e-4])
    optimizer_class = getattr(tf.keras.optimizers, optimizer_class_names[optim])
    opt = optimizer_class(learning_rate=learning_rate)

    model = tf.keras.models.Sequential(name=f"bo-{optim}")

    # convolutional feature extractor (fixed, not tuned)
    model.add(tf.keras.layers.Conv2D(filters=96, kernel_size=(11, 11), strides=4, padding="valid", activation="relu", input_shape=(config.IMG_SIZE, config.IMG_SIZE, 3)))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=2, padding="valid"))
    model.add(tf.keras.layers.Conv2D(filters=256, kernel_size=(5, 5), strides=1, padding="same", activation="relu"))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=2, padding="valid"))
    model.add(tf.keras.layers.Conv2D(filters=384, kernel_size=(3, 3), strides=1, padding="same", activation="relu"))
    model.add(tf.keras.layers.Conv2D(filters=384, kernel_size=(3, 3), strides=1, padding="same", activation="relu"))
    model.add(tf.keras.layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding="same", activation="relu"))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=2, padding="valid"))

    model.add(tf.keras.layers.Flatten())

    # classifier head: two tunable dense/dropout stages, then the 2-class output
    model.add(tf.keras.layers.Dense(units=hp.Int("Dense_01", min_value=2048, max_value=8192, step=1024), activation="relu", bias_initializer="random_normal"))
    model.add(tf.keras.layers.Dropout(rate=hp.Float("Dropout_01", min_value=0.0, max_value=0.8, step=0.05)))
    model.add(tf.keras.layers.Dense(units=hp.Int("Dense_02", min_value=2048, max_value=8192, step=1024), activation="relu", bias_initializer="random_normal"))
    model.add(tf.keras.layers.Dropout(rate=hp.Float("Dropout_02", min_value=0.0, max_value=0.8, step=0.05)))
    model.add(tf.keras.layers.Dense(units=2, activation="softmax", bias_initializer="random_normal"))

    # Compile the model
    model.compile(
        optimizer=opt,
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    return model

In [None]:
# Bayesian hyperparameter search over build_model, scored on validation
# accuracy; each run is stored under its own time-stamped project name
search_options = {
    "objective": "val_accuracy",
    "max_trials": 100,  # number of different configurations
    "executions_per_trial": 3,  # number of trainings per configuration
    "overwrite": True,  # ignore previous results
    "directory": config.MODEL_PATH,
    "project_name": f"bo_{datetime.now().strftime('%y%m%d%H%M')}",
}
tuner = kt.BayesianOptimization(build_model, **search_options)

tuner.search_space_summary()

In [None]:
# remember when the search/training run started; the stop time is added later
start_time = datetime.now()
exec_times = [start_time]
print("Start:", start_time)

In [None]:
# run the hyperparameter search; the held-back split is used as validation data
tuner.search(
    training_images,
    training_labels,
    epochs=config.TRAINING_EPOCHS,
    validation_data=(test_images, test_labels),
)

In [None]:
# print the summary of the single best trial
tuner.results_summary(num_trials=1)

In [None]:
# ask the tuner for its top-ranked model and keep it as `model`
best_models = tuner.get_best_models(num_models=1)
model = best_models[0]

In [None]:
# train the selected model and keep the per-epoch metrics in `history`
# NOTE(review): `model` comes from the tuner, so this presumably continues
# training from the best trial's weights rather than from scratch - confirm
# that this is intended
history = model.fit(
    training_images,
    training_labels,
    epochs=config.TRAINING_EPOCHS,
    validation_data=(test_images, test_labels),
)

In [None]:
# score the trained model on the held-back split and report both values
# with four decimals
evaluation = model.evaluate(test_images, test_labels, verbose=2)
test_loss, test_acc = evaluation

for caption, value in (
    ("validated accuracy:", test_acc),
    ("validated loss:", test_loss),
):
    print(caption, f"{value:.4f}")

In [None]:
# Stop the clock. The slice assignment keeps the start time at index 0 and
# replaces any stop time from an earlier run of this cell, so re-running it
# no longer leaves a stale value at index 1 (append() did).
exec_times[1:] = [datetime.now()]
print("Stop:", exec_times[1])
print("Elapsed:", exec_times[1] - exec_times[0])

In [None]:
# Learning curves: accuracy on the left axis (black), loss on the right
# axis (red); solid lines = training, dashed lines = validation.
fig_hist, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax1.set_title(f"model: {model.name}")

# (axis, history key / legend label, colour, line style)
curves = (
    (ax1, "accuracy", "k", "-"),
    (ax1, "val_accuracy", "k", "--"),
    (ax2, "loss", "r", "-"),
    (ax2, "val_loss", "r", "--"),
)
for axis, metric, colour, line_style in curves:
    axis.plot(history.history[metric], label=metric, color=colour, linestyle=line_style)

ax1.set_xlabel("epoch")
# epochs are whole numbers, so only allow integer tick positions
ax1.xaxis.set_major_locator(MaxNLocator(integer=True))
ax1.set_ylabel("accuracy")
ax2.set_ylabel("loss")
fig_hist.legend(bbox_to_anchor=(1.22, 0.8), loc="center right", ncol=1)

In [None]:
# one-hot encode the integer test labels;
# column order: 'non-micrometeorite', 'micrometeorite'
test_labels_for_dataframe = [
    [1, 0] if int(label) == 0 else [0, 1]
    for label in test_labels
]


In [None]:
# build the confusion matrix for the trained model and print the rates
# delivered alongside it
cm, ekm = conf_matrix(model, test_images, test_labels_for_dataframe)
print(
    "TPR: ", ekm["TPR"],
    " FPR: ", ekm["FPR"],
    " Precision: ", ekm["Precision"],
)

In [None]:
# ROC evaluation of the trained model on the one-hot encoded test labels
roc_plt = roc(
    model,
    test_images,
    test_labels_for_dataframe,
)

In [None]:
# hand the model together with all collected metrics, figures and timings
# to the project's export helper
export_trained_model(
    model,
    config.TRAINING_EPOCHS,
    test_loss,
    test_acc,
    history,
    fig_hist,
    cm,
    ekm,
    roc_plt,
    exec_times,
    raw_data,
    config.MODEL_PATH,
)

In [None]:
# Optional: power the machine down after an unattended run.
# Disabled by default - set the flag to True to enable it.
SHUTDOWN_AFTER_RUN = False

if SHUTDOWN_AFTER_RUN:
    # `time` was used here without ever being imported (NameError as soon as
    # the cell is enabled); it is imported locally because it is only needed
    # when the shutdown is switched on
    import time

    time.sleep(60)  # wait a minute before shutting down
    # NOTE(review): `shutdown -s` looks like Windows syntax - confirm target OS
    os.system('shutdown -s')