------------------------------------------------------------------------------------------------------------------------------------------------

### ML/DL Project Group 12:
#### Classification Topic Choice:
Horse - Unicorn 

#### Authors:
- Tim Widholm (86179)
- Emre Tuelue (83128)

#### Team Member Contributions:
Tim Widholm:
- Image Collecting
- Data set Generation and Preprocessing
- Model training
- Model evaluation
- Model testing
- Programming

Emre Tuelue:
- Creation and Preparation of Presentation 

#### Data Collection
Horse data:
- Images of real horses found on the web

Unicorn data:
- Images generated with Stability AI (DreamStudio) and DALL-E 3 (Bulk Image Generator)
- Images from the web


#### Use of Generative AI:
- Image Generation (DALL-E 3, Stable Diffusion)
- Model performance improvements, Code improvements (GPT-4o) 

#### Sources:
Code:
- [1] A. Karpathy, "Neural Networks: Zero to Hero," YouTube, Oct. 13, 2022. [Online]. Available: https://www.youtube.com/watch?v=jztwpsIzEGc. [Accessed: Feb. 9, 2025].

Model for Transfer Learning: 
- [2] K. He, X. Zhang, S. Ren, and J. Sun, “Deep Residual Learning for Image Recognition,” in *Proceedings of the IEEE Conference on Computer Vision and Pattern Recognition (CVPR)*, 2016, pp. 770–778. [Online]. Available: https://arxiv.org/abs/1512.03385
- [3] Keras Team, “Keras Applications: ResNet,” Keras Documentation, 2024. [Online]. Available: https://keras.io/api/applications/resnet/. [Accessed: Feb. 9, 2025].

Data Collection:
- [4] Stability AI, "DreamStudio AI - Image Generation Platform," Stability AI, 2025. [Online]. Available: https://beta.dreamstudio.ai/generate. [Accessed: Feb. 9, 2025].
- [5] Bulk Image Generation, "Bulk Image Generation - AI Image Generator," 2025. [Online]. Available: https://bulkimagegeneration.com. [Accessed: Feb. 9, 2025].

Generative AI:
- [6] OpenAI, "Hello GPT-4o," 2024. [Online]. Available: https://openai.com/index/hello-gpt-4o/. [Accessed: Feb. 9, 2025].
- [7] OpenAI, "DALL·E 3," 2025. [Online]. Available: https://openai.com/index/dall-e-3/. [Accessed: Feb. 9, 2025].
- [8] Stability AI, "StableStudio: Open Source Community-Driven Future of DreamStudio," 2025. [Online]. Available: https://stability.ai/news/stablestudio-open-source-community-driven-future-dreamstudio-release. [Accessed: Feb. 9, 2025].

Packages/Libs:
- tensorflow
- opencv-python
- matplotlib
- numpy
- scikit-learn
- seaborn
- ipykernel

Frameworks:
- Anaconda 3, Python v3.12.7, pip v24.2
- Jupyter Lab/ Notebook
- VS Code Jupyter Extension
- Visual Studio 2019 C/C++ Build Tools

--------------------------------------------------------------------------------------------------

#### Table of Contents:
##### 1. General Code for both Models
##### 1.1 Data Selection

##### 2. pre-trained Model with Transfer Learning
##### 2.1 Imports
##### 2.2 Data Preprocessing
##### 2.3 Model
###### 2.3.1 Model Callbacks
###### 2.3.2 Model Structure and Configuration
###### 2.3.3 Model Training
##### 2.4 Fine-Tuning
###### 2.4.1 Fine-Tuning Callbacks
###### 2.4.2 Fine-Tuning Model Structure and Configuration
###### 2.4.3 Fine-Tuning Process
##### 2.5 Model Evaluation
##### 2.6 Model Saving

##### 3. own Model
##### 3.1 Imports
##### 3.2 Data Preprocessing
##### 3.3 Base Model
###### 3.3.1 Base Model Data Preprocessing
###### 3.3.2 Base Model Callbacks
###### 3.3.3 Base Model Structure and Configuration
###### 3.3.4 Base Model Training
##### 3.4 Fine-Tuning
###### 3.4.1 Fine-Tuning Model Data Preprocessing
###### 3.4.2 Fine-Tuning Model Callbacks
###### 3.4.3 Fine-Tuning Model Structure and Configuration
###### 3.4.4 Fine-Tuning Model Training
##### 3.5 Evaluation of Model
##### 3.6 Saving Model

---------------------------------------------------------------------------------------------------------------------------------------------------------------------

### 1. General Code for both Models

----------------------------------------------------------------------------------------------

1.1 Data Selection

In [None]:
import os
from pathlib import Path
import imghdr
import cv2

In [None]:
data_dir = 'data'

In [None]:
# [1]
image_exts = ['jpeg','jpg','bmp','png']

In [None]:
# Removes all files whose actual image type (detected from the file content) is not in image_exts [1]
for image_class in os.listdir(data_dir):
    for image in os.listdir(os.path.join(data_dir, image_class)):
        image_path = os.path.join(data_dir, image_class, image)
        try:
            img = cv2.imread(image_path)
            tip = imghdr.what(image_path)
            if tip not in image_exts:
                print('Image not in ext list {}'.format(image_path))
                os.remove(image_path)
        except Exception as e:
            print('Issue with image {}'.format(image_path))
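
The content check above relies on `imghdr`, which is deprecated since Python 3.11 and removed in Python 3.13. On the Python 3.12.7 setup listed above it still works, but a small signature check can stand in for it on newer interpreters. The following is a minimal sketch, not the notebook's method: `detect_image_type` and `SIGNATURES` are hypothetical names, and only the three formats kept by the notebook are covered.

```python
# File signatures ("magic bytes") for the formats the notebook keeps.
# detect_image_type is a hypothetical helper, not part of any library.
SIGNATURES = {
    "jpeg": b"\xff\xd8\xff",
    "png": b"\x89PNG\r\n\x1a\n",
    "bmp": b"BM",
}


def detect_image_type(path):
    """Return 'jpeg', 'png', 'bmp', or None based on the first bytes of the file."""
    with open(path, "rb") as handle:
        header = handle.read(8)
    for kind, signature in SIGNATURES.items():
        if header.startswith(signature):
            return kind
    return None
```

A call such as `detect_image_type(image_path)` could replace `imghdr.what(image_path)` in the loop; like `imghdr`, it reports `'jpeg'` for `.jpg` files.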



In [None]:
# Removes all images that do not meet the selected file endings [1]
base_path = Path(data_dir)
allowed_extensions = {'.jpg', '.jpeg', '.png', '.bmp'}

for file in base_path.rglob('*'):
    if file.is_file() and file.suffix.lower() not in allowed_extensions:
        print(f"Remove unsupported files: {file}")
        file.unlink()  

In [None]:
# Prints every path below data/ and reports names that cannot be encoded for output (nothing is deleted) [1]
base_path = 'data'
for root, dirs, files in os.walk(base_path):
    for name in dirs + files:
        try:
            print(os.path.join(root, name))
        except UnicodeEncodeError as e:
            print(f"Error with file: {name} - {e}")

In [None]:
# Checks that every path can be encoded as UTF-8 and reports the ones that cannot (nothing is renamed) [1]
base_path = 'data'
for root, dirs, files in os.walk(base_path):
    for name in dirs + files:
        try:
            full_path = os.path.join(root, name)
            full_path.encode('utf-8')  
        except UnicodeEncodeError as e:
            print(f"Problematic path: {full_path} - {e}")

In [None]:
# Removes parentheses from file names and replaces each whitespace with '_<counter>' [1]
base_path = 'data'
counter = 0
for root, dirs, files in os.walk(base_path):
    for file_name in files:
        counter=counter+1
        new_name = file_name.replace("(", "").replace(")", "").replace(" ", f'_{counter}')
       
        old_file = os.path.join(root, file_name)
        new_file = os.path.join(root, new_name)
        os.rename(old_file, new_file)
        print(f"Renamed: {old_file} → {new_file}")
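
The rename step can also be written as a pure function that is easy to check before any file is touched. This is a sketch of a variant, not the cell above: `sanitize_filename` is a hypothetical helper that drops parentheses, collapses whitespace runs into a single underscore and always appends the running index before the extension.

```python
import re
from pathlib import PurePath


def sanitize_filename(name, index):
    """Drop parentheses, turn whitespace runs into '_', append a running index."""
    path = PurePath(name)
    stem = re.sub(r"[()]", "", path.stem)
    stem = re.sub(r"\s+", "_", stem.strip())
    return f"{stem}_{index}{path.suffix}"


print(sanitize_filename("white horse (3).jpg", 7))  # white_horse_3_7.jpg
```

Appending the index to every name keeps the results unique even when two files differ only in their parentheses or spacing.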

In [None]:
# Strips all non-ASCII characters from file and folder names [1]
# Walks bottom-up so that renaming a folder does not break the traversal
base_path = data_dir

for root, dirs, files in os.walk(base_path, topdown=False):
    for name in dirs + files:
        new_name = name.encode('ascii', errors='ignore').decode('ascii')
        if new_name == name:
            continue  # nothing to strip
        old_path = os.path.join(root, name)
        new_path = os.path.join(root, new_name)
        os.rename(old_path, new_path)
        print(f"Renamed: {old_path} → {new_path}")

In [None]:
# Counts each folder of images of the two classes [1]

base_path = data_dir

folders = ['horse', 'unicorn']

for folder in folders:
    folder_path = os.path.join(base_path, folder)
    if os.path.exists(folder_path):
        num_files = sum([len(files) for _, _, files in os.walk(folder_path)])
        print(f"Number of files in '{folder}': {num_files}")
    else:
        print(f"Folder '{folder}' doesn't exist.")

---------------------------------------------------------------------------------------------------------------

### 2. Model 1: Pre-trained Model with Transfer Learning

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

2.1 Imports

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D, Input, BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.callbacks import Callback, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import load_model

from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, classification_report

import os
import shutil
import matplotlib.pyplot as plt
import random
import cv2
import numpy as np
import seaborn as sns


2.2 Data Preprocessing

In [None]:
# Gets all necessary paths
original_data_dir = "data"
base_dir = "split_data_model_1"
train_dir = os.path.join(base_dir, "train")
val_dir = os.path.join(base_dir, "val")
test_dir = os.path.join(base_dir, "test")

In [None]:
# Function for automatic and customized data split
def split_data(data_dir, train_dir, val_dir, test_dir, split_ratio=(0.5, 0.25, 0.25)):
    if os.path.exists(base_dir):
        shutil.rmtree(base_dir)
    os.makedirs(train_dir)
    os.makedirs(val_dir)
    os.makedirs(test_dir)

    for class_name in os.listdir(data_dir):
        class_path = os.path.join(data_dir, class_name)
        images = os.listdir(class_path)
        train_images, temp_images = train_test_split(images, test_size=split_ratio[1] + split_ratio[2], random_state=42)
        val_images, test_images = train_test_split(temp_images, test_size=split_ratio[2] / (split_ratio[1] + split_ratio[2]), random_state=42)

        for split, split_dir in zip([train_images, val_images, test_images], [train_dir, val_dir, test_dir]):
            class_split_dir = os.path.join(split_dir, class_name)
            os.makedirs(class_split_dir)
            for image in split:
                shutil.copy(os.path.join(class_path, image), os.path.join(class_split_dir, image))
                
split_data(original_data_dir, train_dir, val_dir, test_dir)
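
The two-stage split first holds out `val + test` of the images and then divides that holdout again, so the second `test_size` is a fraction of the holdout, not of the whole set. The sketch below reproduces only the arithmetic. `split_sizes` is a hypothetical helper, and it assumes that the held-out part of each split is rounded up while the remainder goes to the other part.

```python
import math


def split_sizes(n_images, split_ratio=(0.5, 0.25, 0.25)):
    """Return (train, val, test) counts for the two-stage split used above.

    Assumption: the held-out part of each split is rounded up and the
    remainder goes to the other part.
    """
    _, val_ratio, test_ratio = split_ratio
    holdout_fraction = val_ratio + test_ratio      # test_size of the first split
    test_fraction = test_ratio / holdout_fraction  # test_size of the second split
    n_holdout = math.ceil(holdout_fraction * n_images)
    n_train = n_images - n_holdout
    n_test = math.ceil(test_fraction * n_holdout)
    n_val = n_holdout - n_test
    return n_train, n_val, n_test


print(split_sizes(100))  # (50, 25, 25)
```

With the default ratio both `test_size` values equal 0.5: half of all images are held out, and half of the holdout becomes the test set.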

In [None]:
# Counting train data, validation data and test data
def count_images(directory):
    class_counts = {}
    for class_name in os.listdir(directory):
        class_path = os.path.join(directory, class_name)
        if os.path.isdir(class_path):
            class_counts[class_name] = len(os.listdir(class_path))
    return class_counts

print("Training data:", count_images(train_dir))
print("Validation data:", count_images(val_dir))
print("Test data:", count_images(test_dir))

In [None]:
# Function that shows random images to verify correct class labels
def show_random_images(directory, class_name, num_images=5):
    class_path = os.path.join(directory, class_name)
    image_files = random.sample(os.listdir(class_path), num_images)

    fig, axes = plt.subplots(1, num_images, figsize=(15, 5))
    for i, img_file in enumerate(image_files):
        img_path = os.path.join(class_path, img_file)
        img = cv2.imread(img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        axes[i].imshow(img)
        axes[i].axis("off")
    plt.show()

# Show 5 random unicorn images
show_random_images(train_dir, "unicorn")

# Show 5 random horse images
show_random_images(train_dir, "horse")

In [None]:
# Integrate Data Augmentation for improved learning and training
train_datagen = ImageDataGenerator(
    rescale=1.0 / 255,
    rotation_range=40,
    width_shift_range=0.3,
    height_shift_range=0.3,
    shear_range=0.3,
    zoom_range=0.3,
    horizontal_flip=True,
    brightness_range=[0.7, 1.3]  
)
# No Data augmentation for val_data, only rescaling
val_datagen = ImageDataGenerator(rescale=1.0 / 255)

In [None]:
# Data preparation and generation
# A batch size of 16 proved to be the best choice in this case
train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=(224, 224),
    batch_size=16,
    class_mode='binary'
)
val_generator = val_datagen.flow_from_directory(
    val_dir,
    target_size=(224, 224),
    batch_size=16,
    class_mode='binary'
)

2.3 Model

2.3.1 Model Callbacks

In [None]:
# Set callbacks used for training
# EarlyStopping stops training when val_loss has not improved for 10 epochs and restores the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# ReduceLROnPlateau halves the learning rate when val_loss has not improved for 4 epochs
# The learning rate is never reduced below 4e-6
reduce_lr = ReduceLROnPlateau(monitor='val_loss',factor=0.5, patience=4, min_lr=4e-6 )
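
To get a feeling for how far `factor=0.5` and `min_lr=4e-6` can take the learning rate, the reductions can be listed by hand. The sketch below is plain arithmetic, not Keras code: `plateau_schedule` is a hypothetical helper, and it assumes a starting rate of 0.001 (the value the model is compiled with) and that every plateau triggers a reduction.

```python
def plateau_schedule(initial_lr=1e-3, factor=0.5, min_lr=4e-6):
    """List the learning rates reached if every plateau triggers a reduction."""
    rates = []
    lr = initial_lr
    while lr > min_lr:
        # Each reduction multiplies by the factor but never goes below min_lr
        lr = max(lr * factor, min_lr)
        rates.append(lr)
    return rates


rates = plateau_schedule()
print(len(rates), rates[-1])  # 8 4e-06
```

Under these assumptions the floor is reached after eight reductions, which needs at least 32 epochs without improvement at a patience of 4. In practice the early-stopping patience of 10 will usually end training before that.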

In [None]:
# Custom callback: training stops once the training loss falls below a target (here 0.2)
class CustomStop(Callback):
    def __init__(self, target_loss=0.2):
        super().__init__()
        self.target_loss = target_loss

    def on_epoch_end(self, epoch, logs=None):
        # logs can be None, so fall back to an empty dict
        loss = (logs or {}).get("loss")
        if loss is not None and loss < self.target_loss:
            print(f"\n Stopping training: Loss {loss:.4f} reached target {self.target_loss:.4f}!")
            self.model.stop_training = True

custom_stop = CustomStop(target_loss=0.2)

In [None]:
# Get Class weights to ensure balanced training
# In our training we had almost no difference in dataset size of the two classes, so class weights weren't essential

# Counting Classes
class_counts = count_images(train_dir)
classes = np.array(list(class_counts.keys()))
class_weights = compute_class_weight("balanced", classes=np.arange(len(classes)), y=np.hstack([np.ones(v) * i for i, v in enumerate(class_counts.values())]))

# Dictionary for Model-Training
class_weights_dict = {i: weight for i, weight in enumerate(class_weights)}
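
The "balanced" heuristic boils down to one formula, `n_samples / (n_classes * count)`, so the weights can be verified without scikit-learn. The sketch below uses a hypothetical helper `balanced_class_weights` and made-up counts. It sorts the class names alphabetically, assuming the class indices of the data generator follow the same order.

```python
def balanced_class_weights(class_counts):
    """Map class index -> weight using n_samples / (n_classes * count)."""
    names = sorted(class_counts)  # alphabetical order, assumed to match the generator's class indices
    n_samples = sum(class_counts.values())
    n_classes = len(names)
    return {
        index: n_samples / (n_classes * class_counts[name])
        for index, name in enumerate(names)
    }


# Hypothetical counts, chosen only to make the arithmetic visible
weights = balanced_class_weights({"unicorn": 200, "horse": 300})
print(weights)
```

Here the smaller class (unicorn, index 1) receives the weight 1.25 and the larger class (horse, index 0) about 0.83. For two classes of equal size both weights are 1.0.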

2.3.2 Model Structure and Configuration

In [None]:
# Loading pre-trained model (ResNet50) [2],[3]
base_model = ResNet50(weights="imagenet", include_top=False, input_shape=(224, 224, 3))

In [None]:
# Freeze base model to only train the added layers
base_model.trainable = False

In [None]:
# Create the transfer-learning model
# GlobalAveragePooling2D averages each feature map of the base model into a single value
# Dropout reduces the chance of overfitting and stabilizes the loss
# Batch normalization stabilizes training and also helps against overfitting
model = Sequential([
    base_model,
    GlobalAveragePooling2D(),
    Dense(128, activation='relu'),
    BatchNormalization(),
    Dropout(0.4),
    Dense(64, activation='relu'),
    BatchNormalization(),
    Dropout(0.4),
    Dense(1, activation='sigmoid')
])


In [None]:
# Compile Model
# Learning rate of 0.001 was the optimum in our case to start with
# Using optimizer Adam
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=['accuracy']
)

In [None]:
# Print the model architecture summary (WARNING: very long output)
model.summary()

2.3.3 Model Training

In [None]:
# Training with 50 epochs
# Training stops at epoch 42 because of Early Stopping
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=50,
    class_weight=class_weights_dict,
    callbacks=[reduce_lr,custom_stop,early_stopping]
)

In [None]:
# Plotting of loss and accuracy 

fig, ax = plt.subplots(1, 2, figsize=(12, 6))

ax[0].plot(history.history['loss'], label='Train Loss')
ax[0].plot(history.history['val_loss'], label='Val Loss')
ax[0].set_title('Loss')
ax[0].legend()

ax[1].plot(history.history['accuracy'], label='Train Accuracy')
ax[1].plot(history.history['val_accuracy'], label='Val Accuracy')
ax[1].set_title('Accuracy')
ax[1].legend()

plt.show()

2.4 Fine-Tuning Model

2.4.1 Fine-Tuning Callbacks

In [None]:
# ReduceLROnPlateau for fine-tuning
reduce_lr_fine = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=4,  
    min_lr=4e-6 
)


2.4.2 Fine-Tuning Model Structure and Configuration

In [None]:
# Activate base model for fine-tuning
base_model.trainable = True

In [None]:
# Only train the last 50 layers: freeze every earlier layer of the base model
for layer in base_model.layers[:-50]:
    layer.trainable = False

In [None]:
# Compile the model for fine-tuning with learning rate 1e-5 (0.00001)
# A small learning rate keeps the weight updates small so the pre-trained features are not destroyed
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss='binary_crossentropy',
    metrics=['accuracy']
)

2.4.3 Fine-Tuning Process

In [None]:
# Fine-Tuning Training with 70 epochs
# Training stops at 59 epochs due to Early stopping
history_fine = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=70,
    callbacks=[early_stopping, reduce_lr_fine]
)

In [None]:
# Fine-Tuning + Training Results plotting
# plotting loss and accuracy

fig, ax = plt.subplots(1, 2, figsize=(12, 6))


ax[0].plot(history.history['loss'] + history_fine.history['loss'], label='Train Loss')
ax[0].plot(history.history['val_loss'] + history_fine.history['val_loss'], label='Val Loss')
ax[0].set_title('Loss')
ax[0].legend()


ax[1].plot(history.history['accuracy'] + history_fine.history['accuracy'], label='Train Accuracy')
ax[1].plot(history.history['val_accuracy'] + history_fine.history['val_accuracy'], label='Val Accuracy')
ax[1].set_title('Accuracy')
ax[1].legend()

plt.show()


2.5 Model Evaluation

In [None]:
# Load Test data
# No shuffle so that the order of the predictions matches the labels. 
test_generator = val_datagen.flow_from_directory(
    test_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode='binary',
    shuffle=False  
)

# Predictions for test set
# Threshold at 0.5 for binary classification
predictions = model.predict(test_generator)
predicted_classes = (predictions > 0.5).astype("int").flatten()  
true_classes = test_generator.classes  
class_labels = list(test_generator.class_indices.keys()) 

# Create Confusion Matrix
cm = confusion_matrix(true_classes, predicted_classes)

# Plotting of Confusion Matrix
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()

# Print metrics
print(classification_report(true_classes, predicted_classes, target_names=class_labels))
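
The figures in the classification report follow directly from the confusion matrix. As a minimal sketch with a made-up 2x2 matrix, the hypothetical helper `binary_metrics` below computes precision, recall and F1 for class index 1, using the convention that `cm[i][j]` counts samples with true label `i` and predicted label `j`.

```python
def binary_metrics(cm):
    """Precision, recall and F1 for class 1 from a 2x2 confusion matrix.

    cm[i][j] counts samples with true label i and predicted label j.
    """
    tp = cm[1][1]  # true 1, predicted 1
    fp = cm[0][1]  # true 0, predicted 1
    fn = cm[1][0]  # true 1, predicted 0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denominator = precision + recall
    f1 = 2 * precision * recall / denominator if denominator else 0.0
    return precision, recall, f1


# Hypothetical matrix: 40 + 10 samples of class 0, 5 + 45 samples of class 1
precision, recall, f1 = binary_metrics([[40, 10], [5, 45]])
print(f"precision={precision:.3f} recall={recall:.3f} f1={f1:.3f}")
```

For this matrix the precision is 45/55, the recall 45/50 and the F1 score 90/105.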

In [None]:
# Load image for operation mode test
img_path = "./testdata/test_horse.jpg"
img = image.load_img(img_path, target_size=(224, 224))
img_array = np.expand_dims(np.array(img) / 255.0, axis=0)

In [None]:
# Prediction (the sigmoid output is the score for class index 1, i.e. "unicorn")
prediction = model.predict(img_array)
prob_unicorn = float(prediction[0][0])
class_label = "unicorn" if prob_unicorn > 0.5 else "horse"
print(f"The model predicts: {class_label} (P(unicorn) = {prob_unicorn:.2f})")
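
The sigmoid output is the score for class index 1 (unicorn here, assuming alphabetical class indices), so for a horse prediction the confidence in the predicted label is one minus that value. The following sketch makes this explicit; `label_from_sigmoid` is a hypothetical helper and the class order is an assumption.

```python
def label_from_sigmoid(p_positive, class_names=("horse", "unicorn"), threshold=0.5):
    """Return (label, confidence) for one sigmoid output.

    p_positive is the model output, read as the probability of class index 1.
    """
    if p_positive > threshold:
        return class_names[1], p_positive
    return class_names[0], 1.0 - p_positive


label, confidence = label_from_sigmoid(0.2)
print(f"{label} with confidence {confidence:.2f}")  # horse with confidence 0.80
```

With a model output it could be called as `label_from_sigmoid(float(prediction[0][0]))`.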

2.6 Saving Model

In [None]:
model_1_name = 'ML_DL_Gruppe_12_Model_1'

In [None]:
# Saving as .h5-file (creates the target folder first if it is missing)
os.makedirs('models', exist_ok=True)
model.save(os.path.join('models',f'{model_1_name}.h5'))

In [None]:
# Saving as .keras-file
model.save(os.path.join('models',f'{model_1_name}.keras'))

In [None]:
model = None

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### 3. Model 2: Own Model

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

3.1 Imports

In [None]:

import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import RandomFlip, RandomRotation, RandomZoom, RandomContrast
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense,Flatten,Dropout,Input, BatchNormalization,LeakyReLU,GlobalAveragePooling2D
from tensorflow.keras.regularizers import l2
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import load_model

from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, classification_report

import os
import imghdr
import shutil
import cv2
import matplotlib.pyplot as plt
import random
import numpy as np
import seaborn as sns

3.2 Data Preprocessing

In [None]:
# Gets all necessary paths
original_data_dir = "data"
base_dir = "split_data_model_2"
train_dir = os.path.join(base_dir, "train")
val_dir = os.path.join(base_dir, "val")
test_dir = os.path.join(base_dir, "test")

In [None]:
# Function for automatic and customized split
def split_data(data_dir, train_dir, val_dir, test_dir, split_ratio=(0.5, 0.25, 0.25)):
    if os.path.exists(base_dir):
        shutil.rmtree(base_dir)
    os.makedirs(train_dir)
    os.makedirs(val_dir)
    os.makedirs(test_dir)

    for class_name in os.listdir(data_dir):
        class_path = os.path.join(data_dir, class_name)
        images = os.listdir(class_path)
        train_images, temp_images = train_test_split(images, test_size=split_ratio[1] + split_ratio[2], random_state=42)
        val_images, test_images = train_test_split(temp_images, test_size=split_ratio[2] / (split_ratio[1] + split_ratio[2]), random_state=42)

        for split, split_dir in zip([train_images, val_images, test_images], [train_dir, val_dir, test_dir]):
            class_split_dir = os.path.join(split_dir, class_name)
            os.makedirs(class_split_dir)
            for image in split:
                shutil.copy(os.path.join(class_path, image), os.path.join(class_split_dir, image))

split_data(original_data_dir, train_dir, val_dir, test_dir)

In [None]:
# Counting train data, validation data and test data
def count_images(directory):
    class_counts = {}
    for class_name in os.listdir(directory):
        class_path = os.path.join(directory, class_name)
        if os.path.isdir(class_path):
            class_counts[class_name] = len(os.listdir(class_path))
    return class_counts

print("Training data:", count_images(train_dir))
print("Validation data:", count_images(val_dir))
print("Test data:", count_images(test_dir))

In [None]:
# Function that shows random images to verify correct class labels
def show_random_images(directory, class_name, num_images=5):
    class_path = os.path.join(directory, class_name)
    image_files = random.sample(os.listdir(class_path), num_images)

    fig, axes = plt.subplots(1, num_images, figsize=(15, 5))
    for i, img_file in enumerate(image_files):
        img_path = os.path.join(class_path, img_file)
        img = cv2.imread(img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        axes[i].imshow(img)
        axes[i].axis("off")
    plt.show()

# Show an example set of images to check correctness of class labels
# Show 5 random unicorn images
show_random_images(train_dir, "unicorn")

# Show 5 random horse images 
show_random_images(train_dir, "horse")

3.3 Base Model 

3.3.1 Base Model Data Preprocessing

In [None]:
# Data augmentation 
train_datagen = ImageDataGenerator(
    rescale=1.0 / 255,
    rotation_range=25,  
    width_shift_range=0.2,  
    height_shift_range=0.2,
    shear_range=0.2,  
    zoom_range=0.2,  
    horizontal_flip=True,
    brightness_range=[0.7, 1.3],  
    channel_shift_range=20.0,  
    fill_mode='reflect'
)

val_datagen = ImageDataGenerator(rescale=1.0 / 255)

In [None]:
# Prepare and generate Data
# A batch size of 16 proved to be the best choice in this case
train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=(224, 224),
    batch_size=16,
    class_mode='binary'
)

val_generator = val_datagen.flow_from_directory(
    val_dir,
    target_size=(224, 224),
    batch_size=16,
    class_mode='binary'
)

In [None]:
# Data Augmentation directly integrated in Model architecture
data_augmentation = tf.keras.Sequential([
    RandomFlip("horizontal_and_vertical"),  
    RandomRotation(0.3), 
    RandomZoom(0.3),  
    RandomContrast(0.3), 
])

3.3.2 Base Model Callbacks

In [None]:
# Function for gradually decreasing the learning rate
# Sets/ schedules the exact epoch where learning rate gets decreased
def scheduler(epoch, lr):
    if epoch < 5:
        return 5e-3  
    elif epoch < 10:
        return 1e-3 
    elif epoch < 15:
        return 5e-4 

    elif epoch < 25:  
        return 1e-4  
    elif epoch < 35:
        return 5e-5 
    else:
        return 2e-5  

lr_callback = LearningRateScheduler(scheduler)
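
The step schedule can also be expressed as a lookup table, which makes the epoch boundaries easier to read and change. This is a sketch of an equivalent formulation with the same values as `scheduler`; `SCHEDULE`, `FINAL_LR` and `table_scheduler` are hypothetical names.

```python
# (first epoch at which the rate no longer applies, learning rate)
SCHEDULE = [(5, 5e-3), (10, 1e-3), (15, 5e-4), (25, 1e-4), (35, 5e-5)]
FINAL_LR = 2e-5


def table_scheduler(epoch, lr=None):
    """Same values as scheduler(), looked up from a table; lr is ignored."""
    for end_epoch, rate in SCHEDULE:
        if epoch < end_epoch:
            return rate
    return FINAL_LR


print([table_scheduler(epoch) for epoch in (0, 5, 10, 15, 25, 35)])
```

Because the function ignores its `lr` argument, the value it returns replaces whatever rate the optimizer held at the start of each epoch, including a rate lowered by another callback in the meantime.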

In [None]:
# Gets the tensorboard callback [1]
logdir='logs'
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
# This callback decreases the learning rate when the val_loss doesn't change after 3 epochs 
# Each function call decreases the learning rate to 50% 
# Learning rate decreases down to 1e-6
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,         
    patience=3,         
    min_lr=1e-6         
)

In [None]:
# CustomStop callback automatically stops training once the training loss falls below 0.2
class CustomStop(Callback):
    def __init__(self, target_loss = 0.2):
        super(CustomStop, self).__init__()
        self.target_loss = target_loss
    def on_epoch_end(self, epoch, logs = None):
        if logs.get("loss") and logs["loss"] < self.target_loss:
            print(f"\n Stopping training: Loss {logs['loss']:.4f} reached target {self.target_loss:.4f}!")
            self.model.stop_training=True
            
custom_stop = CustomStop(target_loss=0.2)

In [None]:
# Early Stopping Callback stops training when val_loss doesn't change after 5 epochs
# Loads best weights after training is finished/ stopped
early_stopping = EarlyStopping(
    monitor='val_loss',  
    patience=5,          
    restore_best_weights=True  
)

In [None]:
# Class weights to improve training when data set is unbalanced
class_counts = count_images(train_dir)
classes = np.array(list(class_counts.keys()))
class_weights = compute_class_weight("balanced", classes=np.arange(len(classes)), y=np.hstack([np.ones(v) * i for i, v in enumerate(class_counts.values())]))

# Dictionary for Model-Training
class_weights_dict = {i: weight for i, weight in enumerate(class_weights)}

3.3.3 Base Model Initializing and Structure Configuration

In [None]:
# Create an empty Sequential model
model = Sequential()

In [None]:
# Create Model Architecture [1]
# Dropout rises from 0.25 to 0.4 across the convolutional blocks to limit overfitting
# Batch normalization stabilizes and accelerates training and reduces the risk of exploding gradients
# LeakyReLU keeps a small gradient for negative inputs, which helps backpropagation in the dense layers
# L2 (ridge) regularization penalizes large weights, which counteracts overfitting and stabilizes training


# Input Layer
# Data Augmentation as first layer
model.add(data_augmentation)

# Convolutional Block 1

model.add(Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=(224, 224, 3)))
model.add(BatchNormalization())
model.add(MaxPooling2D())
model.add(Dropout(0.25))

# Convolutional Block 2
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(MaxPooling2D())
model.add(Dropout(0.3))

# Convolutional Block 3
model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(MaxPooling2D())
model.add(Dropout(0.4))

# Convolutional Block 4
model.add(Conv2D(256, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(MaxPooling2D())
model.add(Dropout(0.4))

# Global Pooling
model.add(GlobalAveragePooling2D())


# First Dense layer
model.add(Dense(512, kernel_regularizer=l2(0.01)))
model.add(LeakyReLU(alpha=0.1))
model.add(BatchNormalization())
model.add(Dropout(0.65))

# Second Dense layer
model.add(Dense(256, kernel_regularizer=l2(0.01)))
model.add(LeakyReLU(alpha=0.1))
model.add(BatchNormalization())
model.add(Dropout(0.7))

# Third Dense layer
model.add(Dense(128, kernel_regularizer=l2(0.01)))
model.add(LeakyReLU(alpha=0.1))
model.add(BatchNormalization())
model.add(Dropout(0.5))  

# Fourth Dense layer
model.add(Dense(64, kernel_regularizer=l2(0.01)))
model.add(LeakyReLU(alpha=0.1))
model.add(BatchNormalization())
model.add(Dropout(0.5))

# Output Layer
model.add(Dense(1, activation='sigmoid'))

In [None]:
# Compile model with Adam optimizer at an initial learning rate of 0.005
# Loss function is binary crossentropy
model.compile(optimizer=Adam(learning_rate=0.005),  
              loss='binary_crossentropy',
              metrics=['accuracy'])

In [None]:
# Plotting of model architecture
model.summary()

3.3.4 Base Model Training

In [None]:
# Training the model for up to 60 epochs
hist = model.fit(
    train_generator,
    epochs=60,
    validation_data=val_generator,
    class_weight=class_weights_dict,
    callbacks=[reduce_lr, tensorboard_callback, early_stopping, lr_callback]
)

In [None]:
# Plotting the loss
fig = plt.figure()
plt.plot(hist.history['loss'], color='teal',label='loss')
plt.plot(hist.history['val_loss'], color='orange',label='val_loss')
fig.suptitle('Loss',fontsize=20)
plt.legend(loc="upper left")
plt.show()

In [None]:
# Plotting the accuracy
fig = plt.figure()
plt.plot(hist.history['accuracy'], color='teal',label='accuracy')
plt.plot(hist.history['val_accuracy'], color='orange',label='val_accuracy')
fig.suptitle('Accuracy',fontsize=20)
plt.legend(loc="upper left")
plt.show()

3.4 Fine-Tuning Model

3.4.1 Fine-Tuning Data Preprocessing

In [None]:
# Weaker Data augmentation for fine-tuning 
fine_tune_datagen = ImageDataGenerator(
    rescale=1.0 / 255,
    rotation_range=10,  
    width_shift_range=0.1,  
    height_shift_range=0.1,
    shear_range=0.1,  
    zoom_range=0.1, 
    horizontal_flip=True, 
    brightness_range=[0.9, 1.1],  
    channel_shift_range=10.0,  
    fill_mode='nearest'
)

# Create fine-tune training data generator
# Batch size of 8 
fine_tune_generator = fine_tune_datagen.flow_from_directory(
    train_dir,
    target_size=(224, 224),
    batch_size=8,
    class_mode='binary'
)

In [None]:
# No data augmentation for val_data, only rescaling
val_datagen = ImageDataGenerator(rescale=1.0 / 255)

# Create Fine-Tuning val data generator
# Batch size of 8
val_generator = val_datagen.flow_from_directory(
    val_dir,
    target_size=(224, 224),
    batch_size=8,
    class_mode='binary',
    shuffle=False
)

In [None]:
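# Weaker data augmentation layers for fine-tuning
# Note: the model already contains the augmentation block added in 3.3.3; this new block only takes effect if it is added to a model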
data_augmentation = tf.keras.Sequential([
    RandomFlip("horizontal"),
    RandomRotation(0.05),  
    RandomZoom(0.1),  
    RandomContrast(0.1),  
])

3.4.2 Fine-Tuning Callbacks

In [None]:
# Learning rate schedule for fine-tuning
def fine_tune_scheduler(epoch, lr):
    if epoch < 10:
        return 5e-5  
    elif epoch < 20:
        return 2e-5 
    else:
        return 1e-5 

fine_tune_callback = LearningRateScheduler(fine_tune_scheduler)

3.4.3 Fine-Tuning Model Structure 

In [None]:
# Freeze all layers except the last 8, so only the top of the dense head (including the Dense(64) and output layers) keeps training
for layer in model.layers[:-8]: 
    layer.trainable = False

# Recompile the model with the SGD optimizer and a momentum of 0.9
# The initial learning rate is 1e-5; the fine-tuning scheduler then sets the rate per epoch
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=1e-5, momentum=0.9),
              loss='binary_crossentropy',
              metrics=['accuracy'])
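
Which layers the slice `model.layers[:-8]` freezes can be checked on a plain list of layer types. The sketch below rebuilds that list by hand from the architecture in 3.3.3, assuming `model.layers` holds the layers in the order they were added and counts the augmentation block as one layer. It does not touch the model itself.

```python
# Layer types in the order they were added to the Sequential model
conv_block = ["Conv2D", "BatchNormalization", "MaxPooling2D", "Dropout"]
dense_block = ["Dense", "LeakyReLU", "BatchNormalization", "Dropout"]
layer_types = (
    ["data_augmentation"]
    + conv_block * 4
    + ["GlobalAveragePooling2D"]
    + dense_block * 4
    + ["Dense"]  # sigmoid output layer
)

frozen = layer_types[:-8]     # what the loop sets to trainable = False
trainable = layer_types[-8:]  # what keeps training during fine-tuning

print(len(layer_types), len(frozen))  # 35 27
print(trainable)
```

Under these assumptions the last eight entries contain two `Dense` layers (the 64-unit layer and the output layer); the rest are activation, normalization and dropout layers. Printing `[layer.name for layer in model.layers if layer.trainable]` on the real model is a quick way to confirm this.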

3.4.4 Fine-Tuning Process

In [None]:
# Start Fine-Tuning Training with 20 epochs
fine_tune_hist = model.fit(
    fine_tune_generator,
    epochs=20,  
    validation_data=val_generator,
    callbacks=[fine_tune_callback]
)

In [None]:
# Plotting accuracy and loss of fine-tuning + base model training
fig, ax = plt.subplots(1, 2, figsize=(12, 6))

ax[0].plot(hist.history['loss'] + fine_tune_hist.history['loss'], label='Train Loss')
ax[0].plot(hist.history['val_loss'] + fine_tune_hist.history['val_loss'], label='Val Loss')
ax[0].set_title('Loss')
ax[0].legend()

ax[1].plot(hist.history['accuracy'] + fine_tune_hist.history['accuracy'], label='Train Accuracy')
ax[1].plot(hist.history['val_accuracy'] + fine_tune_hist.history['val_accuracy'], label='Val Accuracy')
ax[1].set_title('Accuracy')
ax[1].legend()

plt.show()

3.5 Evaluation of Model

In [None]:
# Load and generate test data
test_generator = val_datagen.flow_from_directory(
    test_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode='binary',
    shuffle=False  
)

# Get predictions of test data
predictions = model.predict(test_generator)
predicted_classes = (predictions > 0.5).astype("int").flatten()  
true_classes = test_generator.classes 
class_labels = list(test_generator.class_indices.keys())  

# Compute the confusion matrix
cm = confusion_matrix(true_classes, predicted_classes)

# Plotting of Confusion Matrix
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()

# Calculate metrics (Precision, Recall, F1-Score)
print(classification_report(true_classes, predicted_classes, target_names=class_labels))

In [None]:
# Load unseen Image for personal test
img_path = "./testdata/test_horse.jpg"
img = image.load_img(img_path, target_size=(224, 224))
img_array = np.expand_dims(np.array(img) / 255.0, axis=0)

In [None]:
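# Get the prediction for the personal test image (the sigmoid output is the score for class index 1, i.e. "unicorn")
prediction = model.predict(img_array)
prob_unicorn = float(prediction[0][0])
class_label = "unicorn" if prob_unicorn > 0.5 else "horse"
print(f"The model predicts: {class_label} (P(unicorn) = {prob_unicorn:.2f})")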

3.6 Saving Model

In [None]:
model_2_name = 'ML_DL_Gruppe_12_Model_2'

In [None]:
# Save Model as .h5-file (creates the target folder first if it is missing)
os.makedirs('models', exist_ok=True)
model.save(os.path.join('models',f'{model_2_name}.h5'))

In [None]:
# Save Model as .keras-file
model.save(os.path.join('models',f'{model_2_name}.keras'))

In [None]:
model = None