# Case Study 2022 - Group 3

In [None]:
# For use in Colab only: remove the surrounding triple quotes to mount
# Google Drive and change into the project folder (the %cd lines are
# IPython magics, so they only work inside a notebook cell).
'''
from google.colab import drive
drive.mount('/content/drive')
%cd drive/MyDrive/
%cd ./da2-project2
'''

In [None]:
# Import the project's helper modules and reload each of them right away,
# so that edits made to the module files after the first import are picked
# up when this cell is re-run, without restarting the kernel.
from importlib import reload
import augmentData
import loadAndStoreData
import processData
import drawImages
import loadAdditionalData
reload(augmentData)
reload(loadAndStoreData)
reload(processData)
reload(drawImages)
reload(loadAdditionalData)

## Data Augmentation

In this section, the training data is augmented. The function lets you choose the classes for which the augmentations are performed.
It also lets you define which augmentation techniques are used.

For each augmentation technique a new subfolder is created. Each subfolder contains the augmented images of the chosen classes.
Depending on the augmentation techniques chosen, this process may need a minute or two.

In [None]:
# Load additional data
# Loads the additional training data from the specified directories.
# Each tuple specifies the path to the additional training images and the class that the images belong to.
# NOTE(review): the original comment about where the results are saved was left unfinished;
# presumably they end up under `path` ("additional_data") - confirm in loadAdditionalData.
loadAdditionalData.loadAdditionalData(path="additional_data", directories_and_labels=[("solar_alt", "solar")])

In [None]:
# Augment the original training patches of the four object classes
# ("background" is not in the list) using rotation only.
augmentData.performDataAugmentation(
    directory="training_patches/",
    categories=["pond", "pool","solar","trampoline"],
    augmentations=["rotate_images"]
)

In [None]:
# Augment the additional data of the same four classes, this time with
# rotation, zoom and brightness changes.
augmentData.performDataAugmentation(
    directory="additional_data/",
    categories=["pond", "pool","solar","trampoline"],
    augmentations=["rotate_images", "zoom_images", "change_brightness"]
)

## Data Loading

This section loads the training patches into a numpy array and creates the corresponding label vector.
The results are X_train, X_val, y_train and y_val.

The images are converted to RGB values, which is why there are 3 channels in the training data.

The training data sets are of dimension (number_of_instances x height x width x 3 channels). 
The label vectors only have one dimension (number_of_instances).

In [None]:
# Read the images of all five classes from the listed folders into
# `training_data` and the matching `labels`.
# NOTE(review): only "training_patches_rotation" is listed as an augmentation
# folder; the subfolders created by augmenting "additional_data/" (rotation,
# zoom, brightness) are not in this list - confirm whether that is intended.
training_data, labels = loadAndStoreData.loadTrainingDataAndLabels(
    folders=[
        "training_patches/",
        "training_patches_rotation",
        "additional_data"
    ],
    subdirectories=["background", "pond", "pool", "solar", "trampoline"])

In [None]:
# Old version (classical convnet) - disabled; kept inside a string for reference only.
'''
from sklearn.model_selection import train_test_split

labels_categorical = processData.labels_to_categorical(labels)
X_train, X_val, y_train, y_val = train_test_split(training_data, labels_categorical, test_size=0.33, random_state=1, stratify=labels)
print(X_train.shape)
print(y_train.shape)
'''



In [None]:

# Inception v3 version
from keras.applications.inception_v3 import preprocess_input
from sklearn.model_selection import train_test_split

# Preprocessing only needed for inception net
# NOTE(review): the call below is commented out although the InceptionV3 model
# is the one being trained, so the network sees the images in whatever value
# range loadTrainingDataAndLabels returns - confirm this is intended and that
# processData.makePredictions feeds the model the same range.
# training_data =  preprocess_input(training_data)

# Hold out 33% of the data for validation; the split is stratified on the
# original labels and uses a fixed seed so it is reproducible.
labels_categorical = processData.labels_to_categorical(labels)
X_train, X_val, y_train, y_val = train_test_split(training_data, labels_categorical, test_size=0.33, random_state=1, stratify=labels)


## Model Training 

In [None]:
# Encode the training labels into the target format that is passed to
# model.fit (the model is compiled with categorical_crossentropy).
# Only y_train is encoded; y_val is later compared directly against the
# argmax class indices of the predictions.
y_train_encoded = processData.encodeLabels(y_train)


In [None]:
# Old version (classical convnet) - disabled; kept inside a string for reference only.
'''
from tensorflow.keras.layers import InputLayer, Dense, Flatten, Conv2D, MaxPool2D
from tensorflow import keras

model = keras.models.Sequential()
model.add(InputLayer(input_shape=(256,256,3)))
model.add(Conv2D(filters=10, kernel_size=(3,3), strides=1, padding="same", activation="relu"))
model.add(MaxPool2D(pool_size=(2,2)))
model.add(Flatten())
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dense(256, activation='relu'))
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dense(128, activation='relu'))
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dense(64, activation='relu'))
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dense(5, activation='softmax'))

model.summary()
'''

In [None]:
# Inception v3 as a frozen feature extractor with a small dense classifier on top
from tensorflow import keras

# ImageNet-pretrained backbone without its classification head; the global
# average pooling already yields one feature vector per image.
base_model = keras.applications.InceptionV3(
    weights="imagenet",
    include_top=False,
    pooling="avg",
    input_shape=(256, 256, 3),
)

model = keras.models.Sequential()
model.add(base_model)
model.add(keras.layers.Flatten())

# Three identical hidden stages: normalise -> dense -> dropout
for hidden_units in (256, 128, 64):
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.Dense(hidden_units, activation='relu'))
    model.add(keras.layers.Dropout(0.5))

# Output stage: one softmax unit per class (5 classes)
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dense(5, activation='softmax'))

# Freeze every backbone layer so only the new head is trained
for layer in base_model.layers:
    layer.trainable = False

model.summary()

In [None]:
# Train on X_train
# 10% of X_train is additionally set aside by Keras (validation_split) to
# monitor training; X_val is not used here and stays available for the
# hold-out evaluation.
model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])


history = model.fit(X_train,
                    y_train_encoded,
                    epochs=25,
                    batch_size=64,
                    validation_split=0.1,
                   )

In [None]:
# Disabled alternative: train on the complete data set (training_data)
# instead of the X_train split.
# NOTE(review): `labels_encoded` is not defined in any visible cell; it would
# have to be created before this code can be re-enabled.
'''
model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])


history = model.fit(training_data, 
                    labels_encoded, 
                    epochs=20,
                    batch_size=64,
                    validation_split=0.1,
                   )
'''

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

# Predict class scores for the hold-out set and reduce each row to the index
# of its highest-scoring class.
preds = model.predict(X_val)
preds_argmaxed = np.argmax(preds, axis=1)

# Shown as the cell result: (macro-averaged F1, accuracy)
f1_score(y_val, preds_argmaxed, average='macro'), accuracy_score(y_val, preds_argmaxed)

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix of the true hold-out labels (y_val) against the predicted
# class indices computed in the evaluation cell.
confusion_matrix(y_val, preds_argmaxed)

## Create predictions

In [None]:
# Run the trained model over the images in "validation_images".
# Presumably a sliding-window scan with 256x256 windows moved in steps of 64
# pixels - confirm in processData.makePredictions.
processData.makePredictions("validation_images", convnet=model, stepSize=64, windowSize=(256,256))

In [None]:
import gc

# Drop the references to the large intermediate arrays so that the garbage
# collector can reclaim their memory before the next steps run.
# globals().pop(name, None) removes a name if it exists and is a no-op
# otherwise, so this cell can be re-run safely and no longer needs the
# "assign None, then del" workaround. y_val is now removed like all the other
# names (it used to be left behind, bound to None).
for _name in (
    "preprocessed_patches",
    "patch_coordinates",
    "X_train",
    "X_val",
    "y_train",
    "y_val",
    "training_data",
    "X_train_preprocessed",
    "predictions_array",
):
    globals().pop(_name, None)
del _name  # do not leave the loop variable behind in the notebook namespace
gc.collect()

In [None]:
# Apply non-maximum suppression to the predicted bounding boxes of the
# validation images.
# Presumably only boxes with a score of at least 0.9 are kept and, with
# iou_threshold=0.0, any box overlapping an already kept box is dropped -
# confirm in processData.nonMaxSuppressBoundingBoxes.
processData.nonMaxSuppressBoundingBoxes("validation_images/", iou_threshold=0.0, score_threshold=0.9)

## Draw Images with predictions

In [None]:
# Draw the bounding boxes for the images in ./validation_images and save the
# results to ./validation_images instead of printing them to the cell output
# (print_to_output=False).
drawImages.saveOrPrintImages(path="./validation_images", print_to_output=False, valBoundingBoxes=True,saveImagesPath="./validation_images", thickness=5)

## Validating the results

In [None]:
import csv
import json
import glob
import cv2
import random
import os
from shapely.geometry import Polygon

def _read_boxes(csv_path):
    """Read a bounding-box CSV into a list of dicts (one per row).

    Every column except ``label`` is converted to ``int``.
    """
    # newline='' is what the csv module expects for files it reads itself.
    with open(csv_path, newline='') as f:
        return [
            {key: value if key == 'label' else int(value)
             for key, value in row.items()}
            for row in csv.DictReader(f)
        ]


def _box_bounds(box):
    """Return ``(y_min, x_min, y_max, x_max)`` of a box row, tolerating swapped corners."""
    y_min, y_max = sorted((box['y_upper_left'], box['y_lower_right']))
    x_min, x_max = sorted((box['x_upper_left'], box['x_lower_right']))
    return y_min, x_min, y_max, x_max


def _box_area(bounds):
    """Return the area of an axis-aligned box given as ``_box_bounds`` output."""
    y_min, x_min, y_max, x_max = bounds
    return float((y_max - y_min) * (x_max - x_min))


def _smoothed_iou(a, b):
    """Return ``(intersection + 1) / (union + 1)`` of two axis-aligned boxes."""
    overlap_h = min(a[2], b[2]) - max(a[0], b[0])
    overlap_w = min(a[3], b[3]) - max(a[1], b[1])
    if overlap_h > 0 and overlap_w > 0:
        intersection = float(overlap_h * overlap_w)
    else:
        intersection = 0.0
    union = _box_area(a) + _box_area(b) - intersection
    return (intersection + 1) / (union + 1)


def calc_performance(gt_path, pred_path, image_name=None, verbose=0):
    """Score the predicted bounding boxes of one image against its ground truth.

    Each ground-truth box is matched with the not-yet-used prediction of the
    same label that has the highest IoU. A match with IoU >= 0.5 is a true
    positive and consumes that prediction; otherwise the ground-truth box is a
    false negative. Predictions that are still unmatched at the end are false
    positives.

    Both CSV files need the columns ``label``, ``x_upper_left``,
    ``y_upper_left``, ``x_lower_right`` and ``y_lower_right``; all columns
    other than ``label`` must hold integers. The boxes are axis-aligned
    rectangles spanned by those two corners.

    Args:
        gt_path: Path of the ground-truth CSV. Must exist.
        pred_path: Path of the prediction CSV. A missing file is treated as
            "no predictions".
        image_name: Name stored under ``'file'`` in the result and used in
            warning messages.
        verbose: If ``1``, print one line per ground-truth box.

    Returns:
        dict with the keys ``'file'``, ``'tp'``, ``'fn'``, ``'fp'`` and
        ``'f1'``. Because of the small epsilon in the F1 formula, an image
        without ground truth and without predictions gets an F1 of 1.0.

    Raises:
        FileNotFoundError: If ``gt_path`` does not exist.
        ValueError: If a non-label column cannot be converted to ``int``.
    """
    expected_area = 256. * 256.  # all boxes are expected to be 256x256 patches

    ground_truth = _read_boxes(gt_path)
    predictions = _read_boxes(pred_path) if os.path.exists(pred_path) else []

    # Create default performance values
    performances = {
        'file': image_name,
        'tp': 0,
        'fn': 0,
        'fp': 0,
        'f1': 0,
    }

    for j, gt in enumerate(ground_truth):
        gt_bounds = _box_bounds(gt)
        gt_area = _box_area(gt_bounds)

        if gt_area != expected_area:
            print(
                f'### Warning {j}: false ground truth shape of {gt_area} detected in {image_name}!')
            print(gt['y_lower_right'] - gt['y_upper_left'],
                  gt['x_lower_right'] - gt['x_upper_left'])

        best_idx, best_iou = None, 0.
        for i, pred in enumerate(predictions):
            # Only predictions with the correct label can match
            if gt['label'] != pred['label']:
                continue

            pred_bounds = _box_bounds(pred)
            pred_area = _box_area(pred_bounds)
            if pred_area != expected_area:
                print(
                    f'### Warning {i}: false predicted shape of {pred_area} detected in {image_name}!')
                print(pred['y_lower_right'] - pred['y_upper_left'],
                      pred['x_lower_right'] - pred['x_upper_left'])

            next_iou = _smoothed_iou(gt_bounds, pred_bounds)

            # Keep the candidate with the highest IoU seen so far
            if next_iou > best_iou:
                best_idx, best_iou = i, next_iou

        # IoU of at least 0.5 -> true positive, otherwise false negative
        if best_idx is not None and best_iou >= 0.5:
            # A prediction may only be matched once, so take it out of the pool
            predictions.pop(best_idx)
            performances['tp'] += 1
            if verbose == 1:
                print(
                    f'Found correct prediction with IoU of {round(best_iou, 3)} and label {gt["label"]}!')
        else:
            performances['fn'] += 1
            if verbose == 1:
                print(
                    f'Found false prediction with IoU of {round(best_iou, 3)} and label {gt["label"]}!')

    # Every prediction that was not matched to a ground-truth box is a false
    # positive. (Counting max(#predictions - #ground truth, 0) up front, as
    # before, missed mislocated or mislabelled predictions.)
    performances['fp'] = len(predictions)

    # Calculate F1-Score; the epsilon avoids a division by zero
    performances['f1'] = (performances['tp'] + 1e-8) / \
        (performances['tp'] + 0.5 *
         (performances['fp'] + performances['fn']) + 1e-8)
    return performances


if __name__ == "__main__":
    path = 'validation_images'  # Change if needed

    # Iterate over all validation images (sorted, so the output order is
    # the same on every run and every platform)
    for image_path in sorted(glob.glob(os.path.join(path, '*.png'))):
        # os.path handles both '/' and '\\' separators, unlike split('/')
        image_name = os.path.basename(image_path)
        stem = os.path.splitext(image_path)[0]
        gt_path = stem + '.csv'  # Ground Truth path
        pred_path = stem + '_prediction_suppressed.csv'  # Prediction path
        performance = calc_performance(gt_path, pred_path, image_name)
        print(performance)

## 