## Data Preparation

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import os, shutil, glob, random
import numpy as np
import cv2
from PIL import Image

In [None]:
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [None]:
df= pd.read_csv('./data/car_imgs_4000.csv')
df.shape

In [None]:
df.head()

In [None]:
df['perspective_score_hood']= round(df.perspective_score_hood, 2)
df['perspective_score_backdoor_left']= round(df.perspective_score_backdoor_left, 2)
df.head()

In [None]:
df.shape

In [None]:
# show the class imbalance in the data
fig, (ax1, ax2) = plt.subplots(2,1, figsize=(30,10))
sns.countplot((df.perspective_score_hood.values * 100).astype(int), ax= ax1)
plt.ylabel('hood_count');

sns.countplot((df.perspective_score_backdoor_left.values * 100).astype(int), ax= ax2)
plt.ylabel('door_count');


In [None]:
#df.perspective_score_backdoor_left.value_counts(normalize=False).sort_index(ascending=False)

In [None]:
# show 5 samples of the croped images
src_dir= 'data/croped_imgs2/'
fig, axes = plt.subplots(1, 5, figsize=(15, 3))
for i in range(5):
    f= os.listdir(src_dir)[i]
    img_path= os.path.join(src_dir, f)
    #print(img_path)
    img= image.load_img(img_path)
    img= img.resize((90,90))
    img= image.img_to_array(img)/255.
    axes[i].imshow(img)
    axes[i].axis('off')
plt.show();


In [None]:
# show the classes with majority sample

#np.where(df.perspective_score_hood.value_counts(normalize=True) > 0.050)
#df[df.groupby('perspective_score_hood')['perspective_score_hood'].transform('size') > 0.50]
count= df.perspective_score_hood.value_counts(normalize=True)
df[df.perspective_score_hood.map(count >0.06)]['perspective_score_hood'].unique()

In [None]:
# mergeing classes into 5 to have enough samples per class for training
#classes ranges for modelling:
# 0 = 0.0 - 0.20
# 1 = 0.21 - 0.40
# 2 = 0.41 - 0.60
# 3 = 0.61 - 0.80
# 4 = 0.81 - 0.93

src_dir= './data/croped_imgs/'
save_dir= './data/merged_hood/'
class_0 = []
#missing= [i for i in set(df.perspective_score_hood.values) if str(i) not in os.listdir(save_dir)]

if not os.path.exists(save_dir):
    os.makedirs(save_dir, exist_ok=True)
for image in os.listdir(src_dir):
    for i in np.arange(0.91,0.92,0.01):
        i = round(i,2)
        for v, f in zip(df.perspective_score_hood.values, df.filename):
            if v == i and f in os.path.join(src_dir, image):
                #print(v)
                class_0.append([f, v])
                sub_dir= os.path.join(save_dir, str(5))
                os.makedirs(sub_dir, exist_ok= True)
                #print('*'*50)
                shutil.move(os.path.join(src_dir, f), os.path.join(sub_dir, f))
                print(f'moved from {os.path.join(src_dir, f)} to { os.path.join(sub_dir, f)}')

In [None]:
# count the images in a subfolder
count = 0
save_dir= 'data/split_hood2/test'
# Iterate directory
for path in os.listdir(save_dir):
    for file in os.listdir(os.path.join(save_dir,path)):
    # check if current path is a file
        if os.path.isfile(os.path.join(save_dir, path, file)):
            count += 1
print('File count:', count)

In [None]:
# count images in each subfolder
for dir,subdir,files in os.walk('data/merged_hood'):
    print(np.sort([dir, str(len(files))]))

In [None]:
# sepecify images in each class up to max_number and remove the rest to a folder for the sake of class balancing

# Set the path to the directory containing labeled subdirectories
base_dir = 'data/merged_hood'

# Specify the desired subdirectory labels
desired_labels = ['0.00', '0.90', '0.91']

# Set the destination directory
dest_dir = 'data/residue_hood/'
if not os.path.exists(dest_dir):
    os.makedirs(dest_dir, exist_ok=True)
# Specify the maximum allowed count
max_count = 400

# Loop through each subdirectory, count images, and move if count exceeds max_count
for subdir_label in os.listdir(base_dir):
    if subdir_label == str(4):
        subdir_path = os.path.join(base_dir, subdir_label)
        # Get a list of files in the subdirectory
        files = [file for file in os.listdir(subdir_path) if os.path.isfile(os.path.join(subdir_path, file))]
        
        # Count the number of images in the subdirectory
        count = len(files)
        
        # Print or process the count
        print(f"Number of images in {subdir_label}: {count}")
        
        # Move images if count exceeds max_count
        if count > max_count:
            # Select the first (count - max_count) images to move
            images_to_move = files[:count - max_count]
            
            # Move selected images to the destination directory
            for image in images_to_move:
                source_path = os.path.join(subdir_path, image)
                sub_dest_dir = os.path.join(dest_dir, subdir_label)
                os.makedirs(sub_dest_dir, exist_ok= True)
                shutil.move(source_path, os.path.join(sub_dest_dir, str(image)))
                print(f"Moved {image} to {dest_dir}")


In [None]:
len(os.listdir('data/merged_hood/4'))

## Crop Images

In [None]:
from ultralytics import YOLO

In [None]:
# crop images with a pretrained Yolo-v8 model
# We trained Yolo with 500 samples and validated and tested with 500 each. We ran the model for 50 epochs on TPU
# Colab – the loss in the detection box was roughly 1.6 – we could have trained the model for 100 epochs with
# more samples (e.g 1000) for better croping accuracy

custom_model = YOLO('best.pt')
base_dir= 'data/original_data/imgs/'

for image in os.listdir(base_dir):
    img_path= os.path.join(base_dir, image)
    img = cv2.imread(img_path, cv2.IMREAD_COLOR)
    img= cv2.resize(img, (160, 160), interpolation= cv2.INTER_AREA)
    results = custom_model(img)
    for n, box in enumerate(results[0].boxes.xywhn):
        h, w = img.shape[:2]
        x1, y1, x2, y2 = box.numpy()
        x_center, y_center = int(float(x1) * w), int(float(y1) * h)
        box_width, box_height = int(float(x2) * w), int(float(y2) * h)

        x_min = int(x_center - (box_width / 2))
        y_min = int(y_center - (box_height / 2))
        crop_img= img[y_min:y_min+int(box_height), x_min:x_min+int(box_width)]

        save_path= f'data/croped_imgs/'
        if not os.path.exists(save_path):
            os.makedirs(save_path, exist_ok= True)
        cv2.imwrite(save_path + f"{image}", crop_img)
        print(f'crop image {img_path} to {os.path.join(save_path,image)}')

In [None]:
len(os.listdir('data/croped_imgs2'))

## Split Data for Modelling

In [None]:
import splitfolders
import cv2
import seaborn as sns
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import vgg16, resnet50
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import warnings
warnings.filterwarnings('ignore')

In [None]:
data_hood= 'data/merged_hood/'
#data_door= 'data/balanced_door_croped/'

split_hood= 'data/split_hood2/'
#split_door= 'data/split_door/'

#splitfolders.ratio(data_hood, split_hood, seed=1337, ratio= (0.70, 0.25, 0.05))
#splitfolders.ratio(data_door, split_door, seed=1337, ratio= (0.70, 0.25, 0.05))

In [None]:
# create features and corresponding labels

def create_dataset(img_folder):
    features=[]
    labels=[]
    IMG_WIDTH=160
    IMG_HEIGHT=160
    for label in os.listdir(img_folder):
        image_path= os.path.join(img_folder,label)
        for img in os.listdir(image_path):
            image= cv2.imread(os.path.join(image_path, img), cv2.COLOR_BGR2RGB)
            image=cv2.resize(image, (IMG_HEIGHT, IMG_WIDTH))
            features.append(image)
            labels.append(label)
    features, labels = shuffle(features, labels)
    return np.array(features), np.array(labels)

In [None]:
X_train_hood, y_train_hood= create_dataset(split_hood + 'train')
#X_train_door, y_train_door= create_dataset(split_door + 'train')

In [None]:
X_val_hood, y_val_hood= create_dataset(split_hood + 'val')
#X_val_door, y_val_door= create_dataset(split_door + 'val')

In [None]:
num_classes_hood= len(np.unique(y_train_hood, return_counts=False))
#num_classes_door= len(np.unique(y_train_door, return_counts=False))
print(num_classes_hood)

In [None]:
# manual split of data (or by using splitfolders library as above)

# train_size_hood= int(len(features_hood) * 0.80)

# X_train_hood= features_hood[:train_size_hood]
# y_train_hood= labels_hood[:train_size_hood]
# X_val_hood= features_hood[train_size_hood:]
# y_val_hood= labels_hood[train_size_hood:]

# train_size_door= int(len(features_door) * 0.80)

# X_train_door= features_door[:train_size_door]
# y_train_door= labels_door[:train_size_door]
# X_val_door= features_door[train_size_door:]
# y_val_door= labels_door[train_size_door:]

In [None]:
# check the shape of features and labels
X_train_hood.shape, y_train_hood.shape, X_val_hood.shape, y_val_hood.shape

In [None]:
#X_train_door.shape, y_train_door.shape, X_val_door.shape, y_val_door.shape

In [None]:
# categorical encoding for integer labels
from tensorflow.keras.utils import to_categorical

y_train_hood= to_categorical(y_train_hood, num_classes= num_classes_hood)
y_val_hood= to_categorical(y_val_hood, num_classes= num_classes_hood)

#y_train_door= to_categorical(y_train_door, num_classes= num_classes_door)
#y_val_door= to_categorical(y_val_door, num_classes= num_classes_door)

In [None]:
# For base Modelling or implement it in the model architecture as Rescaling layer
# X_train_hood= X_train_hood/255.
# X_val_hood= X_val_hood/255.

# X_train_door= X_train_door/255.
# X_val_door= X_val_door/255.

In [None]:
# For Transfer Learning Modelling
X_train_hood= resnet50.preprocess_input(X_train_hood).astype(float)
X_val_hood= resnet50.preprocess_input(X_val_hood).astype(float)

#X_train_door= vgg16.preprocess_input(X_train_door).astype(float)
#X_val_door= vgg16.preprocess_input(X_val_door).astype(float)

y_train_hood= y_train_hood.astype(float)
y_val_hood= y_val_hood.astype(float)

#y_train_door= y_train_door.astype(float)
#y_val_door= y_val_door.astype(float)

In [None]:
X_train_door[0,:,:].shape

In [None]:
def base_model():
    '''instanciate and return the CNN architecture with augmenting and rescaling layers'''
    
    augmentation = Sequential([
        layers.RandomFlip("horizontal"),
        layers.RandomZoom(0.1),
        layers.RandomTranslation(0.2, 0.2),
        layers.RandomRotation(0.1)
    ])
    
    model= Sequential([
        layers.Input(X_train_hood[0,:,:].shape),
        layers.Rescaling(scale= 1./255.),
        augmentation,
        layers.Conv2D(128, (3,3), activation='relu', padding='same'),
        layers.MaxPool2D(pool_size= (2,2), padding= 'same'),
        
        layers.Conv2D(128, (3,3), activation='relu', padding='same'),
        layers.MaxPool2D(pool_size= (2,2), padding= 'same'),

        layers.Conv2D(64, (3,3), activation='relu', padding='same'),
        layers.MaxPool2D(pool_size= (2,2), padding= 'same'),
        
        layers.Conv2D(64, (3,3), activation='relu', padding='same'),
        layers.MaxPool2D(pool_size= (2,2), padding= 'same'),
        
        layers.Conv2D(32, (3,3), activation='relu', padding='same'),
        layers.MaxPool2D(pool_size= (2,2), padding= 'same'),
        
        layers.Conv2D(32, (3,3), activation='relu', padding='same'),
        layers.MaxPool2D(pool_size= (2,2), padding= 'same'),

        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.2),
        layers.Dense(num_classes_hood, activation='softmax')

    ])
    
    return model

In [None]:
def transferLearn_model():
    '''Uses pretrained model as a base and build dense layers on top of it'''
    
    augmentation = Sequential([
        layers.RandomFlip("horizontal"),
        layers.RandomZoom(0.1),
        layers.RandomTranslation(0.2, 0.2),
        layers.RandomRotation(0.1)
    ])
    
    base_model= resnet50.ResNet50(weights='imagenet', input_shape=(160,160,3), include_top=False,\
                           pooling= None)
    
    base_model.trainable=False
    
    model= Sequential([
        layers.Input((160,160,3)),
        augmentation,
        base_model,
        layers.Flatten(),
        layers.Dense(2048, activation="relu"),
        layers.Dense(1024, activation="relu"),
        layers.Dense(512, activation="relu"),
        #layers.Dropout(0.2),
        layers.Dense(128, activation="relu"),
        layers.Dense(64, activation="relu"),
        layers.Dropout(0.2),
        layers.Dense(num_classes_hood, activation="softmax"),

    ])
    
    return model

In [None]:
model= base_model()
model.summary()

In [None]:
# plot the model architecture
model= transferLearn_model()
model.summary()
#plot_model(model, show_shapes= True)

In [None]:
def compile_model(model, lr, epochs=None):
    '''return a compiled model suited for the task'''
    
    #opt= Adam(learning_rate= lr, decay= lr/epochs)
    
    model.compile(optimizer=Adam(learning_rate= lr), loss= 'categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
def train_augment(model, batch_size, epochs, patience, train_flow=None):
    """This function returns the fitted model and its train history"""
    
    # apply model regularization techniques
    MODEL= 'model_base_door'
    modelCheckpoint= ModelCheckpoint("{}.h5".format(MODEL), monitor="val_loss", verbose=0,\
                                               save_best_only=True)
    earlyStop= EarlyStopping(monitor='val_loss', mode='min', restore_best_weights=True, patience=patience)
    lreducer= ReduceLROnPlateau(monitor="val_loss",factor=0.1,patience= patience, verbose=2
                            ,mode="min", min_delta=0.0001, cooldown=0, min_lr=0)
    
    # fit the model
    history = model.fit(X_train_hood,
                        y_train_hood,
                      batch_size=batch_size,
                      steps_per_epoch= int(len(X_train_hood)/batch_size),
                      epochs = epochs,
                      callbacks = [modelCheckpoint, earlyStop, lreducer],
                      validation_data = (X_val_hood, y_val_hood))

    return model, history

In [None]:
# train the base model
model= base_model()
compiled_model= compile_model(model, lr= 1e-3, epochs= 50)
model, history= train_augment(compiled_model, batch_size= 64, epochs= 50, patience= 10)
model.save('./baseModel_hood')

In [None]:
# train the transfer learning model
model= transferLearn_model()
compiled_model= compile_model(model, lr= 1e-4, epochs= 50)
model, history= train_augment(compiled_model, batch_size= 32, epochs= 50, patience= 10)
model.save('./transferLearn_model_hood')

## Alternative Way for more Augmentation

In [None]:
X_train_hood= X_train_hood / 255.

In [None]:
# apply augmentation to batches of images and store them in memory
train_datagen = ImageDataGenerator(
    featurewise_center = False,
    featurewise_std_normalization = False,
    rotation_range = 10,
    width_shift_range = 0.1,
    height_shift_range = 0.1,
    horizontal_flip = True,
    zoom_range = (0.8, 1.2)
)

train_datagen.fit(X_train_hood)

train_flow = train_datagen.flow(X_train_hood, shuffle= False, batch_size = 1)

In [None]:
# show augmented images alongside original ones
for i, (raw_img, aug_img) in enumerate(zip(X_train_hood, train_flow)):
    _,(ax1, ax2)= plt.subplots(1,2, figsize=(6,3))
    ax1.imshow(raw_img)
    ax2.imshow(aug_img[0])
    ax1.axis('off')
    ax2.axis('off')
    ax1.set_title('true_img')
    ax2.set_title('aug_img')
    plt.show();
    
    if i > 10:
        break

## Model Evaluation

In [None]:
from tensorflow.keras.preprocessing import image
from sklearn.metrics import classification_report, confusion_matrix, multilabel_confusion_matrix, ConfusionMatrixDisplay

In [None]:
# plot the model training history
def plot_history(history):
    fig, ax = plt.subplots(1, 2, figsize=(15,5))
    ax[0].set_title('loss')
    ax[0].plot(history.epoch, history.history["loss"], label="Train loss")
    ax[0].plot(history.epoch, history.history["val_loss"], label="Validation loss")
    ax[1].set_title('accuracy')
    ax[1].plot(history.epoch, history.history["accuracy"], label="Train acc")
    ax[1].plot(history.epoch, history.history["val_accuracy"], label="Validation acc")
    ax[0].legend()
    ax[1].legend()

In [None]:
plot_history(history)

In [None]:
# prepare testing data
split_hood= 'data/split_hood2/'
X_test_hood, y_test_hood= create_dataset(split_hood + 'test')
#X_test_hood, y_test_hood= shuffle(X_test_hood, y_test_hood)

In [None]:
# scale pixels of test data according to the trained model
X_test_hood= resnet50.preprocess_input(X_test_hood).astype(float)
y_test_hood= y_test_hood.astype(float)

In [None]:
X_test_hood.shape, y_test_hood.shape

In [None]:
num_classes_hood= len(np.unique(y_test_hood, return_counts=False))
num_classes_hood

In [None]:
# put the labels in categorical vectors
y_test_hood= to_categorical(y_test_hood, num_classes= num_classes_hood)
y_test_hood.shape

In [None]:
# load the saved model
model_path= './transferLearn_model_hood'
loaded_model= tf.keras.models.load_model(model_path)

In [None]:
# the test accuracy is low since we specified the test data to be only 5% of the total amount, we wanted to save 
# most of data for training, we could have split data as (0.6, 0.2, 0.2) to have enough for testing
metrics= loaded_model.evaluate(X_test_hood, y_test_hood, return_dict=True)
metrics['loss'], metrics['accuracy']

In [None]:
y_pred_hood= loaded_model.predict(X_test_hood)

In [None]:
# the rmetrics are bad for classes 3,4 due to lack of sample data
print(classification_report(y_test_hood, y_pred_hood))

In [None]:
# show the confusion matrix, focus on how many true positives and true negatives are captured compared to false 
# positives and false negatives for each class -- again we could have saved more samples for testing

labels= [str(i) for i in range(0,6)]
conf_mat= {}
for label in range(len(labels)):
    #print(label)
    y_test_label= y_test_hood[:,label]
    #print(y_test_label)
    y_pred_label= y_pred_hood[:, label]
    conf_mat[labels[label]]= confusion_matrix(y_pred= y_pred_label, y_true= y_test_label)
for label, matrix in conf_mat.items():
    print('confusion matrix for label {}:'.format(label))
    print(matrix)
    print()

In [None]:
y_pred_hood= np.round(y_pred_hood).astype(float)

In [None]:
cm= multilabel_confusion_matrix(y_pred_hood, y_test_hood)

In [None]:
#label= [i for i in labels]
cm= multilabel_confusion_matrix(y_pred_hood, y_test_hood)
for i, confusion_matrix in enumerate(cm):
    #print(confusion_matrix)
    disp = ConfusionMatrixDisplay(confusion_matrix)
    disp.plot()
    disp.ax_.set_title('{}'.format(labels[i]))

    #disp.plot(include_values=True, cmap="viridis", ax=None, xticks_rotation="vertical")
    #plt.show()


In [None]:
# show testing images from each class with corresponding true and predicted labels
for f in os.listdir(split_hood + '/test'):
    subfolders= os.path.join(split_hood, 'test',f)
    if os.path.isdir(subfolders):
        files = [file for file in os.listdir(subfolders) if os.path.isfile(os.path.join(subfolders, file))]
        # Select random images
        random_images = random.sample(files, min(5, len(files)))
        print(f'taking images from {subfolders}'); print()
        for i,img in enumerate(random_images):
            #print(f'loading image from {os.path.join(subfolders, img)}')
            loaded_img = image.load_img(os.path.join(subfolders, img), target_size=(60, 60))
            test_img= cv2.imread(os.path.join(subfolders, img), cv2.COLOR_BGR2RGB)
            test_img= cv2.resize(test_img, (160,160))
            img_array = image.img_to_array(test_img)

            img_batch = np.expand_dims(img_array, axis=0)

            img_preprocessed = resnet50.preprocess_input(img_batch)

            prediction = loaded_model.predict(img_preprocessed)
            prediction= np.argmax(prediction, axis=1)
            plt.subplot(1,5,i+1)
            #plt.subplots_adjust(top=0.88)
            plt.imshow(loaded_img)
            plt.title(f' predicted label: {prediction[0]}', fontsize=8)
            plt.axis('off')
        plt.suptitle(f'taking images from class {f}', ha='center', va='bottom', fontsize=12, y=0.65)
        plt.tight_layout()
        plt.show();
            #print(f'predicted label: {prediction[0]}')

## Approach2: Combined Model – One Input and Multitarget Output

In [None]:
import seaborn as sns
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn import set_config; set_config(display='diagram')
from sklearn.metrics import mean_absolute_error,mean_squared_error, r2_score
from sklearn.utils import class_weight

import imblearn
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline

In [None]:
# Weighted loss function to balance the data – can only be applied to training data after split
# class_counts= df['filename'].value_counts()
# class_weights= len(class_counts)/class_counts
# resample_df= df.sample(n= class_counts.max()*len(class_counts), weights= df['filename'].map(class_weights), replace= True)

In [None]:
def create_features_labels(base_dir):
    images=[]
    labels_hood=[]
    labels_door=[]

    IMG_WIDTH=160
    IMG_HEIGHT=160
    for img in df.filename:
        img_path= os.path.join(base_dir, img)
        image= cv2.imread(img_path, cv2.COLOR_BGR2RGB)
        image=cv2.resize(image, (IMG_HEIGHT, IMG_WIDTH))
        images.append(image)
    for label_hood, label_door in zip(df.perspective_score_hood.values, df.perspective_score_backdoor_left.values):
        labels_hood.append(label_hood)
        labels_door.append(label_door)
    
    #images, labels_hood, label_door= shuffle(images, label_hood, label_door)
    return np.array(images), np.array(labels_hood), np.array(labels_door) 

In [None]:
len(os.listdir('data/original_data/imgs'))

In [None]:
base_dir= 'data/original_data/imgs'
X_train, y_train_hood, y_train_door = create_features_labels(base_dir)

In [None]:
X_train.shape, y_train_hood.shape, y_train_door.shape

In [None]:
X_train, X_test, y_train_hood, y_test_hood, y_train_door, y_test_door= train_test_split(X_train, y_train_hood, \
                                                    y_train_door, test_size= 0.20, random_state=42)

In [None]:
X_train.shape, X_test.shape, y_train_hood.shape, y_test_hood.shape, y_train_door.shape, y_test_door.shape

In [None]:
X_train, X_val, y_train_hood, y_val_hood, y_train_door, y_val_door = train_test_split(X_train, y_train_hood,\
                                        y_train_door, test_size=0.20, random_state=42)

In [None]:
X_train.shape, X_val.shape, y_train_hood.shape, y_val_hood.shape, y_train_door.shape, y_val_door.shape

In [None]:
y_train_door

In [None]:
# This can be applied only to one target, unfortunately these methods dont accept multi-target variable

# over = SMOTE(sampling_strategy= 'minority')
# under= RandomUnderSampler(sampling_strategy= 'majority')
# steps= [('o', over), ('u', under)]
# pipeline= Pipeline(steps= steps)

# X_train_balance, y_train_hood_balance= under.fit_resample(X_train.reshape(len(X_train),160*160*3)\
#                         ,y_train_cat)

# X_train_balance, y_train_door_balance= under.fit_resample(X_train.reshape(len(X_train),160*160*3)\
#                         ,y_train_door_cat)

# X_train_balance.shape, y_train_hood_balance.reshape(-1,1).shape

# X_train_balance.shape, y_train_door_balance.reshape(-1,1).shape

In [None]:
def augment(x):
    x= layers.RandomFlip("horizontal")(x)
    x= layers.RandomZoom(0.1)(x)
    x= layers.RandomTranslation(0.2, 0.2)(x)
    x= layers.RandomRotation(0.1)(x)
    return x

In [None]:
def initialize_model():
    '''instanciate and return the CNN architecture'''
    
    input_layer= layers.Input(shape= (160,160,3))
    x= layers.Rescaling(scale= 1/255.)(input_layer)
    x= augment(x)
    x= layers.Conv2D(16, (3,3), activation='relu', padding='same')(x)
    x= layers.MaxPool2D(2,2)(x)

    x= layers.Conv2D(32, (3,3), activation='relu', padding='same')(x)
    x= layers.MaxPool2D(2,2)(x)

    x= layers.Conv2D(64, (2,2), activation='relu', padding='same')(x)
    x= layers.MaxPool2D(2,2)(x)

    x= layers.Flatten()(x)
    x= layers.Dense(64, activation= 'relu')(x)
    x= layers.Dense(32, activation= 'relu')(x)
    #x= layers.Dropout(0.2)(x)
    out_hood= layers.Dense(1, activation= 'sigmoid', name= 'out_hood')(x)
    out_door= layers.Dense(1, activation= 'sigmoid', name= 'out_door')(x)
    model= Model(inputs= input_layer, outputs= [out_hood, out_door])
    
    return model

In [None]:
initialize_model()

In [None]:
plot_model(initialize_model())

In [None]:
def compile_model(model):
    model.compile(loss= ['binary_crossentropy', 'binary_crossentropy'], optimizer= 'adam', metrics=['mae'])
    return model

In [None]:
# result for activation= sigmoid, loss= binary_crossentropy
model= initialize_model()
compiled_model= compile_model(model) 
compiled_model.fit(x= X_train, y= [y_train_hood, y_train_door], \
                   validation_data=(X_val, [y_val_hood, y_val_door]),\
                    epochs= 30, batch_size= 32, verbose=2)

In [None]:
# result for activation= linear, loss= mse

def compile_model(model):
    model.compile(loss= ['mse', 'mse'], optimizer= 'adam', metrics=['mae'])
    return model

model= initialize_model()
compiled_model= compile_model(model) 
compiled_model.fit(x= X_train, y= [y_train_hood, y_train_door], \
                   validation_data=(X_val, [y_val_hood, y_val_door]),\
                    epochs= 30, batch_size= 32, verbose=2)

In [None]:
out_hood= model.layers[-2].output
out_hood

In [None]:
#https://stackoverflow.com/questions/66715975/class-weights-in-cnn
hood_class_weight= class_weight.compute_class_weight(class_weight= 'balanced',\
                        classes= sorted(np.unique(y_train_hood)), y= y_train_hood)
door_class_weight= class_weight.compute_class_weight(class_weight= 'balanced',\
                        classes= sorted(np.unique(y_train_door)),y= y_train_door)
hood_class_weight= {i:hood_class_weight[i] for i, label in enumerate(sorted(np.unique(y_train_hood)))}
door_class_weight= {i:door_class_weight[i] for i, label in enumerate(sorted(np.unique(y_train_door)))}

class_weights={
    'out_hood':hood_class_weight,
    'out_door':door_class_weight
}
#hood_class_weight

In [None]:
#https://github.com/keras-team/keras/issues/11735
def class_loss(class_weight):
  """Returns a loss function for a specific class weight tensor
  
  Params:
    class_weight: 1-D constant tensor of class weights
    
  Returns:
    A loss function where each loss is scaled according to the observed class"""
  def loss(y_obs, y_pred):
    y_obs = tf.dtypes.cast(y_obs, tf.int32)
    hothot = tf.one_hot(tf.reshape(y_obs, [-1]), depth=class_weight.shape[0])
    weight = tf.math.multiply(class_weight, hothot)
    weight = tf.reduce_sum(weight, axis=-1)
    losses = tf.compat.v1.losses.sparse_softmax_cross_entropy(labels=y_obs,
                                                              logits=y_pred,
                                                              weights=weight)
    return losses
  return loss

In [None]:
# tried to balance the classes with compute_class_weight from sklearn, but the class_weight in the fit() method
# accepts only dict and neither list of dict or dict of dict for balancing multitargets simultaneously
def compile_model(model):
    
    model.compile(loss= {k: class_loss(v) for k,v in class_weights.items()}, optimizer= 'adam', metrics=['mae'])
    return model

In [None]:
compiled_model.history.history.keys()

In [None]:
compiled_model.save('regres_linear_act')

In [None]:
# Evaluation results for model with activation= linear, loss= mse
# plot the model training history

def plot_history(history):
    fig, ax = plt.subplots(1, 2, figsize=(15,5))
    ax[0].set_title('loss')
    ax[0].plot(history.epoch, history.history["loss"], label="Train loss")
    ax[0].plot(history.epoch, history.history["val_loss"], label="Validation loss")
    ax[1].set_title('mae')
    ax[1].plot(history.epoch, history.history["dense_41_mae"], label="Train mae")
    ax[1].plot(history.epoch, history.history["val_dense_41_mae"], label="Validation mae")
    ax[0].legend()
    ax[1].legend()
    

In [None]:
# one can increase the batch size to reduce the stochastic behaviour of the loss,
#however the model converged at end of training
plot_history(compiled_model.history)

In [None]:
X_test_scaled= X_test/255.

In [None]:
y_pred_hood, y_pred_door= compiled_model.predict(X_test_scaled)

In [None]:
y_pred_hood.shape, y_pred_door.shape, y_test_hood.shape, y_test_door.shape

In [None]:
mae_hood= mean_absolute_error(y_test_hood, y_pred_hood)
mae_door= mean_absolute_error(y_test_door, y_pred_door)
print('MAE hood error: %.3f' % mae_hood)
print('MAE door error: %.3f' % mae_door)

In [None]:
mse_hood_error= mean_squared_error(y_test_hood, y_pred_hood)
mse_door_error= mean_squared_error(y_test_door, y_pred_door)
print('mse hood score: %.3f' % mse_hood_error)
print('mse door score: %.3f' % mse_door_error)

In [None]:
metrics= compiled_model.evaluate(X_test_scaled, y_test_hood, return_dict=True)
metrics

In [None]:
metrics_door= compiled_model.evaluate(X_test_scaled, y_test_door, return_dict=True)
metrics_door

In [None]:
#model_path= './regres_linear_act'
#loaded_model= tf.keras.models.load_model(model_path)