0. Use GPU for faster computing.

In [None]:
import tensorflow as tf
import torch
print("GPU available:", torch.cuda.is_available())
print("TensorFlow version:", tf.__version__)
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))


1. Run on CLASSIC DATA

1.1 Import CLASSIC DATA

In [None]:
#FOR CLASSIC DATA ORDERING

import numpy as np
import pooch
import tensorflow as tf
import os
import random
import shutil
from pathlib import Path
import pandas as pd

# to make this notebook's output stable across runs
rnd_seed = 42
rnd_gen = np.random.default_rng(rnd_seed)

# Define paths
base_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Copy_full_fish_dataset')
#base_dir = Path(r'C:\Users\bapti\Downloads\Mask_creation')
output_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Data_ready')
#output_dir = Path(r'C:\Users\bapti\Downloads\Mask_data_ready')
train_dir = output_dir / 'Train'
valid_dir = output_dir / 'Valid'
test_dir = output_dir / 'Test'

# Ensure output directories exist
for path in [train_dir, valid_dir, test_dir]:
    path.mkdir(parents=True, exist_ok=True)

# Collect all images with metadata
data = []
test_data = []

for class_name in ["LEFT", "RIGHT"]:
    class_dir = base_dir / class_name
    for species_name in ["Grayling", "Trout"]:
        species_dir = class_dir / species_name
        for date_folder in species_dir.iterdir():
            if date_folder.is_dir():
                date = date_folder.name
                for image_file in date_folder.glob('*.*'):
                    # Store metadata with each image path
                    item = {
                        "path": image_file,
                        "class": class_name,
                        "species": species_name,
                        "date": date
                    }
                    # Assign test data based on specified conditions
                    if (species_name == "Grayling" and date == "Day_35") or \
                       (species_name == "Trout" and date == "Day_146"):
                        test_data.append(item)
                    else:
                        data.append(item)

# Shuffle data for random splitting
random.seed(123)  # For reproducibility
random.shuffle(data)

# Split remaining data into train (85%) and validation (15%)
total_count = len(data)
train_count = int(0.85 * total_count)

train_data = data[:train_count]
valid_data = data[train_count:]

# Convert lists of metadata to DataFrames for easier access
train_df = pd.DataFrame(train_data)
valid_df = pd.DataFrame(valid_data)
test_df = pd.DataFrame(test_data)

# Save metadata DataFrames to CSV files for future use
train_df.to_csv(output_dir / 'train_metadata.csv', index=False)
valid_df.to_csv(output_dir / 'valid_metadata.csv', index=False)
test_df.to_csv(output_dir / 'test_metadata.csv', index=False)

# Function to copy images to output directory with new structure
def copy_images(data_split, target_dir):
    for item in data_split:
        target_class_dir = target_dir / item["class"]
        target_species_dir = target_class_dir / item["species"]
        target_species_dir.mkdir(parents=True, exist_ok=True)
        
        # Copy image to the new directory, preserving original filename
        shutil.copy(item["path"], target_species_dir / item["path"].name)

# Copy images to respective folders
copy_images(train_data, train_dir)
copy_images(valid_data, valid_dir)
copy_images(test_data, test_dir)

# Load the datasets from the new directories
train_set = tf.keras.preprocessing.image_dataset_from_directory(
    train_dir,
    image_size=(256, 256),  # Specify your desired image size
    batch_size=32,          # Specify your desired batch size
    seed=123,
)

valid_set = tf.keras.preprocessing.image_dataset_from_directory(
    valid_dir,
    image_size=(256, 256),
    batch_size=32,
    seed=123,
)

test_set = tf.keras.preprocessing.image_dataset_from_directory(
    test_dir,
    image_size=(256, 256),
    batch_size=32,
    seed=123,
)

train_set.name = 'Training'
valid_set.name = 'Validation'
test_set.name = 'Test'

# Print class names and dataset sizes
class_names = train_set.class_names
print("Class names:", class_names)
print("Training dataset size:", len(train_set))
print("Validation dataset size:", len(valid_set))
print("Test dataset size:", len(test_set))

# Print number of test images
print("\n","Total number of images in the input dataset:", "\n","train + valid:",len(data), "\n","test:",len(test_data))

In [None]:

# #FOLLOWING CODE FOR CLASSIC DATA WITHOUT ORDERING (IF THE ORDERING HAS ALREADY BEEN DONE WITH THE ABOVE CELL)

# from PIL import Image
# import tensorflow as tf
# import os
# import random
# import shutil
# from pathlib import Path
# import pandas as pd

# # Define paths
# base_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Copy_full_fish_dataset')
# #base_dir = Path(r'C:\Users\bapti\Downloads\Mask_creation')
# output_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Data_ready')
# #output_dir = Path(r'C:\Users\bapti\Downloads\Mask_data_ready')

# #For Ubuntu: 
# base_dir = Path('/mnt/c/Users/bapti/Downloads/ML_Project/Copy_full_fish_dataset')
# output_dir = Path('/mnt/c/Users/bapti/Downloads/ML_Project/Data_ready')

# train_dir = output_dir / 'Train'
# valid_dir = output_dir / 'Valid'
# test_dir = output_dir / 'Test'

# # Ensure output directories exist
# for path in [train_dir, valid_dir, test_dir]:
#     path.mkdir(parents=True, exist_ok=True)

# # Collect all images with metadata
# data = []
# test_data = []

# for class_name in ["LEFT", "RIGHT"]:
#     class_dir = base_dir / class_name
#     for species_name in ["Grayling", "Trout"]:
#         species_dir = class_dir / species_name
#         for date_folder in species_dir.iterdir():
#             if date_folder.is_dir():
#                 date = date_folder.name
#                 for image_file in date_folder.glob('*.*'):
#                     # Store metadata with each image path
#                     item = {
#                         "path": image_file,
#                         "class": class_name,
#                         "species": species_name,
#                         "date": date
#                     }
#                     # Assign test data based on specified conditions
#                     if (species_name == "Grayling" and date == "Day_35") or \
#                        (species_name == "Trout" and date == "Day_146"):
#                         test_data.append(item)
#                     else:
#                         data.append(item)

# #FOR CLASSIC DATA ORDERING

# import tensorflow as tf
# import os
# import random
# import shutil
# from pathlib import Path
# import pandas as pd

# # Define paths
# #base_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Copy_full_fish_dataset')
# base_dir = Path(r'C:\Users\bapti\Downloads\Mask_creation')
# #output_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Data_ready')
# output_dir = Path(r'C:\Users\bapti\Downloads\Mask_data_ready')
# train_dir = output_dir / 'Train'
# valid_dir = output_dir / 'Valid'
# test_dir = output_dir / 'Test'

# # Ensure output directories exist
# for path in [train_dir, valid_dir, test_dir]:
#     path.mkdir(parents=True, exist_ok=True)

# # Collect all images with metadata
# data = []
# test_data = []

# for class_name in ["LEFT", "RIGHT"]:
#     class_dir = base_dir / class_name
#     for species_name in ["Grayling", "Trout"]:
#         species_dir = class_dir / species_name
#         for date_folder in species_dir.iterdir():
#             if date_folder.is_dir():
#                 date = date_folder.name
#                 for image_file in date_folder.glob('*.*'):
#                     # Store metadata with each image path
#                     item = {
#                         "path": image_file,
#                         "class": class_name,
#                         "species": species_name,
#                         "date": date
#                     }
#                     # Assign test data based on specified conditions
#                     if (species_name == "Grayling" and date == "Day_35") or \
#                        (species_name == "Trout" and date == "Day_146"):
#                         test_data.append(item)
#                     else:
#                         data.append(item)

# # Shuffle data for random splitting
# random.seed(123)  # For reproducibility
# random.shuffle(data)

# # Split remaining data into train (85%) and validation (15%)
# total_count = len(data)
# train_count = int(0.85 * total_count)

# train_data = data[:train_count]
# valid_data = data[train_count:]

# # Convert lists of metadata to DataFrames for easier access
# train_df = pd.DataFrame(train_data)
# valid_df = pd.DataFrame(valid_data)
# test_df = pd.DataFrame(test_data)

# # Save metadata DataFrames to CSV files for future use
# train_df.to_csv(output_dir / 'train_metadata.csv', index=False)
# valid_df.to_csv(output_dir / 'valid_metadata.csv', index=False)
# test_df.to_csv(output_dir / 'test_metadata.csv', index=False)

# # Function to copy images to output directory with new structure
# def copy_images(data_split, target_dir):
#     for item in data_split:
#         target_class_dir = target_dir / item["class"]
#         target_species_dir = target_class_dir / item["species"]
#         target_species_dir.mkdir(parents=True, exist_ok=True)
        
#         # Copy image to the new directory, preserving original filename
#         shutil.copy(item["path"], target_species_dir / item["path"].name)

# # Copy images to respective folders
# copy_images(train_data, train_dir)
# copy_images(valid_data, valid_dir)
# copy_images(test_data, test_dir)

# # Load the datasets from the new directories
# train_set = tf.keras.preprocessing.image_dataset_from_directory(
#     train_dir,
#     image_size=(256, 256),  # Specify your desired image size
#     batch_size=32,          # Specify your desired batch size
#     seed=123,
# )

# valid_set = tf.keras.preprocessing.image_dataset_from_directory(
#     valid_dir,
#     image_size=(256, 256),
#     batch_size=32,
#     seed=123,
# )

# test_set = tf.keras.preprocessing.image_dataset_from_directory(
#     test_dir,
#     image_size=(256, 256),
#     batch_size=32,
#     seed=123,
# )

# train_set.name = 'Training'
# valid_set.name = 'Validation'
# test_set.name = 'Test'

# # Print class names and dataset sizes
# class_names = train_set.class_names
# print("Class names:", class_names)
# print("Training dataset size:", len(train_set))
# print("Validation dataset size:", len(valid_set))
# print("Test dataset size:", len(test_set))

# # Print number of test images
# print("\n","Total number of images in the input dataset:", "\n","train + valid:",len(data), "\n","test:",len(test_data))

1.2. Preprocess Data for Baseline and Augmented Baseline

In [144]:
#PREPROCESSING FOR THE BASELINE MODELS:
def preprocessing_function(image, label):

    num_classes = 2

    # Cast the image and label datatypes
    image = tf.cast(image, tf.float32)
    label = tf.cast(label,tf.int32)

    # Normalize the pixel values. Use a float value in the denominator!
    image = image / 255.0
    # Cast the label to int32 and one-hot encode
    label = tf.one_hot(label, num_classes)
    # Recast label to Float32
    label = tf.cast(label, tf.float32)

    return image, label

train = train_set.map(preprocessing_function)
validation = valid_set.map(preprocessing_function)
test = test_set.map(preprocessing_function)


1.3. Runs for the Baseline

1.3.1 Define Baseline callback

In [46]:
def get_CNN_logdir():
    time = np.datetime64('now').astype(str)[:-3].replace(':', '-')
    run_logdir = os.path.join(os.curdir, "Final_CNN_logs", f"Baseline_run_on_classic_data{time}") # time goes in the fstring
    return run_logdir


early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=10)
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("Baseline_run_on_classic_data.keras",
                                                   save_best_only=True,
                                                   monitor='val_loss')
tensorboard_cb = tf.keras.callbacks.TensorBoard(get_CNN_logdir())

# Let's clear out the backend and set our random seeds.
# Consistency is key :)
keras.backend.clear_session()
tf.random.set_seed(rnd_seed)
np.random.seed(rnd_seed)


1.3.2 Build Baseline architecture

In [47]:
model = keras.models.Sequential([
    # Convolution 1
    keras.layers.Conv2D(filters=32, kernel_size=7, padding="same", activation="relu"),
    keras.layers.MaxPool2D((3,3)),

    # Convolution 2
    keras.layers.Conv2D(filters=64, kernel_size=5, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    # Convolution 3
    keras.layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    # Convolution 4
    keras.layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    keras.layers.Flatten(),
    keras.layers.Dense(4096, activation="relu"),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(2, activation="softmax") #Change the last layer from 5 classes to 2 classes
])

In [48]:
model.build((None, 256 , 256, 3))
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

1.3.3 Training the Baseline model

In [None]:
import datetime
import tensorflow as tf

# Adjust timestamp to avoid colons
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H_%M_%S")
log_dir = f".\\Final_CNN_logs\\Baseline_run_on_classic_data{timestamp}"

#For Ubuntu 
log_dir = Path(f"Final_CNN_logs/Baseline_run_on_classic_data{timestamp}")
log_dir_str = log_dir.as_posix()

# Define TensorBoard callback with corrected log directory path
tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir=log_dir)

# Now run the model training
history = model.fit(train, # Training data generator
                    epochs=30,
                    validation_data=validation, # Validation data generator
                    callbacks=[early_stopping_cb,
                               checkpoint_cb,
                               tensorboard_cb])

1.4. Runs for the Augmented Baseline

1.4.1 Define callback Augmented Baseline

In [50]:
keras.backend.clear_session()
tf.random.set_seed(rnd_seed)
np.random.seed(rnd_seed)

In [51]:
early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=10)
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("Augmented_run_on_classic_data.keras",
                                                   save_best_only=True,
                                                   monitor='val_loss')
tensorboard_cb = tf.keras.callbacks.TensorBoard(get_CNN_logdir())

1.4.2 Build Augmented Baseline architecture

In [52]:
aug_model = keras.models.Sequential([
    #keras.layers.RandomFlip(), # Flip augmentation removed 
    keras.layers.RandomRotation(0.08), # Rotation Aumentation
    #keras.layers.RandomBrightness([0.2,1.0]),
    keras.layers.RandomContrast(0.9),
    keras.layers.RandomTranslation(-0.08,0.08),
    keras.layers.GaussianNoise( 0.1 , seed =42),
    # Convolution 1
    keras.layers.Conv2D(filters=32, kernel_size=7, padding="same", activation="relu"),
    keras.layers.MaxPool2D((3,3)),

    # Convolution 2
    keras.layers.Conv2D(filters=64, kernel_size=5, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    # Convolution 3
    keras.layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    # Convolution 4
    keras.layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    keras.layers.Flatten(),
    keras.layers.Dense(4096, activation="relu"),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(2, activation="softmax") #Change the last layer from 5 classes to 2 classes

    # Copy your previous model's layers here
])
aug_model.build((None, 256 , 256, 3))

In [53]:
import tensorflow
from tensorflow.keras.optimizers import Adam


# Set a custom learning rate
learning_rate = 0.001  # Try different values: 0.0001, 0.001, 0.005, etc.
optimizer = Adam(learning_rate=learning_rate)

#changed optimizer = "adam" to this:
aug_model.compile(loss="categorical_crossentropy",
              optimizer=optimizer,
              metrics=['accuracy'])

1.4.3 Training Augmented Baseline

In [None]:


import datetime

# Generate a valid directory path
log_dir = f".\\Final_CNN_logs\\Augmented_run_on_classic_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}"
#For Ubuntu
log_dir = Path(f"Final_CNN_logs/Augmented_run_on_classic_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}")
log_dir_str = log_dir.as_posix()
#
tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir=log_dir)

history = aug_model.fit(train, # Training data generator
                    epochs=30,
                    validation_data=validation, # Validation data generator
                    callbacks=[early_stopping_cb,
                               checkpoint_cb,
                               tensorboard_cb])

1.5. Save the best models for Baseline and Augmented

In [None]:
import os
print(os.listdir())

current_directory = os.getcwd()
print(current_directory)
model.save('Baseline_run_on_classic_data.h5')
aug_model.save('Augmented_run_on_classic_data.h5')

# Let's load the models!
non_aug_model = keras.models.load_model(r'Baseline_run_on_classic_data.h5')
aug_model = keras.models.load_model(r'Augmented_run_on_classic_data.h5')

# And test them on the testing dataset
non_aug_model.evaluate(test)
aug_model.evaluate(test)

1.6. Preprocess for ResNet 

In [147]:
def preprocessing_function(image, label):
    num_classes = 2

    # Cast the image and label datatypes
    image = tf.cast(image, tf.float32)
    label = tf.cast(label,tf.int32)

    # Resize the image to (224, 224) for ResNet50
    image = tf.image.resize(image, (224, 224))
    
    # Normalize the pixel values. Use a float value in the denominator!
    image = image / 255.0

    # Cast the label to int32 and one-hot encode
    label = tf.one_hot(label, num_classes)
    # Recast label to Float32
    label = tf.cast(label, tf.float32)

    return image, label

train = train_set.map(preprocessing_function)
valid = valid_set.map(preprocessing_function)
test = test_set.map(preprocessing_function)

In [None]:

for images, labels in train.take(1):
  print(f'Images shape: {images.numpy().shape} Labels: {labels.numpy().shape}')

1.7. Run for ResNet


1.7.1 Define Callback

In [None]:
def get_CNN_logdir():
    time = np.datetime64('now').astype(str)[:-3].replace(':', '-')
    run_logdir = os.path.join(os.curdir, "Final_CNN_logs", f"ResNet_no_weight_run_on_classic_data{time}") # change "ResNet_no_weight_run_on_classic_data{time}" to f"ResNet_with_weight_run_on_classic_data{time}" 
    return run_logdir


early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=10)
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("CNN_ResNet_no_weight_classic_data.keras", # change "CNN_ResNet_no_weight_classic_data.keras" to "CNN_ResNet_with_weight_classic_data.keras" 
                                                   save_best_only=True,
                                                   monitor='val_loss')
tensorboard_cb = tf.keras.callbacks.TensorBoard(get_CNN_logdir())


1.7.2 Import ResNet model with or without weights and train model

In [None]:
# Let's clear out the backend and set our random seeds.
# Consistency is key :)
keras.backend.clear_session()
tf.random.set_seed(rnd_seed)
np.random.seed(rnd_seed)

n_classes = 2

# Load ResNet50 with no top layers (without classification layers)
#base_model = keras.applications.resnet50.ResNet50(weights="imagenet", include_top=False, input_shape=(224, 224, 3)) #Change to Weight or No Weight
base_model = keras.applications.resnet50.ResNet50(weights=None, include_top=False, input_shape=(224, 224, 3))
avg = keras.layers.GlobalAveragePooling2D()(base_model.output)
output = keras.layers.Dense(n_classes, activation="softmax")(avg)
model = keras.Model(inputs=base_model.input, outputs=output)

# Freeze the base model
for layer in base_model.layers[-10:]:
    layer.trainable = False
# base_model.trainable = False

inputs = train.map(lambda image, label: (image, label))


optimizer = keras.optimizers.SGD(learning_rate=0.01, momentum=0.9, decay=0.01) 
model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"]) 

import datetime

# Generate a valid directory path

log_dir = f".\\Final_CNN_logs\\ResNet_no_weight_run_on_classic_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}" # change "ResNet_no_weight_run_on_classic_data{time}" to f"ResNet_with_weight_run_on_classic_data{time}" 

#For Ubuntu
log_dir = Path(f"Final_CNN_logs/ResNet_no_weight_run_on_classic_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}") # change "ResNet_no_weight_run_on_classic_data{time}" to f"ResNet_with_weight_run_on_classic_data{time}" 
log_dir_str = log_dir.as_posix()

tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir=log_dir)
history = model.fit(train, epochs=30, validation_data=valid, callbacks=[early_stopping_cb,checkpoint_cb,tensorboard_cb])



1.7.3 Save best ResNet model with or without weights 

In [None]:
model.save('CNN_ResNet_no_weight_classic_data.h5')
#model.save('CNN_ResNet_with_weight_classic_data.h5')

2. Run on Mask Data

2.1. Import Mask Data

In [None]:
#FOR MASK DATA ORDERING

import numpy as np
import pooch
import tensorflow as tf
import os
import random
import shutil
from pathlib import Path
import pandas as pd

# to make this notebook's output stable across runs
rnd_seed = 42
rnd_gen = np.random.default_rng(rnd_seed)

# Define paths
#base_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Copy_full_fish_dataset')
base_dir = Path(r'C:\Users\bapti\Downloads\Mask_creation')
#output_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Data_ready')
output_dir = Path(r'C:\Users\bapti\Downloads\Mask_data_ready')
train_dir = output_dir / 'Train'
valid_dir = output_dir / 'Valid'
test_dir = output_dir / 'Test'

# Ensure output directories exist
for path in [train_dir, valid_dir, test_dir]:
    path.mkdir(parents=True, exist_ok=True)

# Collect all images with metadata
data = []
test_data = []

for class_name in ["LEFT", "RIGHT"]:
    class_dir = base_dir / class_name
    for species_name in ["Grayling", "Trout"]:
        species_dir = class_dir / species_name
        for date_folder in species_dir.iterdir():
            if date_folder.is_dir():
                date = date_folder.name
                for image_file in date_folder.glob('*.*'):
                    # Store metadata with each image path
                    item = {
                        "path": image_file,
                        "class": class_name,
                        "species": species_name,
                        "date": date
                    }
                    # Assign test data based on specified conditions
                    if (species_name == "Grayling" and date == "Day_35") or \
                       (species_name == "Trout" and date == "Day_146"):
                        test_data.append(item)
                    else:
                        data.append(item)

# Shuffle data for random splitting
random.seed(123)  # For reproducibility
random.shuffle(data)

# Split remaining data into train (85%) and validation (15%)
total_count = len(data)
train_count = int(0.85 * total_count)

train_data = data[:train_count]
valid_data = data[train_count:]

# Convert lists of metadata to DataFrames for easier access
train_df = pd.DataFrame(train_data)
valid_df = pd.DataFrame(valid_data)
test_df = pd.DataFrame(test_data)

# Save metadata DataFrames to CSV files for future use
train_df.to_csv(output_dir / 'train_metadata.csv', index=False)
valid_df.to_csv(output_dir / 'valid_metadata.csv', index=False)
test_df.to_csv(output_dir / 'test_metadata.csv', index=False)

# Function to copy images to output directory with new structure
def copy_images(data_split, target_dir):
    for item in data_split:
        target_class_dir = target_dir / item["class"]
        target_species_dir = target_class_dir / item["species"]
        target_species_dir.mkdir(parents=True, exist_ok=True)
        
        # Copy image to the new directory, preserving original filename
        shutil.copy(item["path"], target_species_dir / item["path"].name)

# Copy images to respective folders
copy_images(train_data, train_dir)
copy_images(valid_data, valid_dir)
copy_images(test_data, test_dir)

# Load the datasets from the new directories
train_set = tf.keras.preprocessing.image_dataset_from_directory(
    train_dir,
    image_size=(256, 256),  # Specify your desired image size
    batch_size=32,          # Specify your desired batch size
    seed=123,
)

valid_set = tf.keras.preprocessing.image_dataset_from_directory(
    valid_dir,
    image_size=(256, 256),
    batch_size=32,
    seed=123,
)

test_set = tf.keras.preprocessing.image_dataset_from_directory(
    test_dir,
    image_size=(256, 256),
    batch_size=32,
    seed=123,
)

train_set.name = 'Training'
valid_set.name = 'Validation'
test_set.name = 'Test'

# Print class names and dataset sizes
class_names = train_set.class_names
print("Class names:", class_names)
print("Training dataset size:", len(train_set))
print("Validation dataset size:", len(valid_set))
print("Test dataset size:", len(test_set))

# Print number of test images
print("\n","Total number of images in the input dataset:", "\n","train + valid:",len(data), "\n","test:",len(test_data))

In [None]:
# #FOLLOWING CODE FOR CLASSIC DATA WITHOUT ORDERING (IF THE ORDERING HAS ALREADY BEEN DONE WITH THE ABOVE CELL)

# #FOR CLASSIC DATA WITHOUT ORDERING
# from PIL import Image
# import tensorflow as tf
# import os
# import random
# import shutil
# from pathlib import Path
# import pandas as pd

# # Define paths
# #base_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Copy_full_fish_dataset')
# base_dir = Path(r'C:\Users\bapti\Downloads\Mask_creation')
# #output_dir = Path(r'C:\Users\bapti\Downloads\ML_Project\Data_ready')
# output_dir = Path(r'C:\Users\bapti\Downloads\Mask_data_ready')
# #For Ubuntu: 
# base_dir = Path('/mnt/c/Users/bapti/Downloads/Mask_creation')
# output_dir = Path('/mnt/c/Users/bapti/Downloads/Mask_data_ready')

# train_dir = output_dir / 'Train'
# valid_dir = output_dir / 'Valid'
# test_dir = output_dir / 'Test'

# # Ensure output directories exist
# for path in [train_dir, valid_dir, test_dir]:
#     path.mkdir(parents=True, exist_ok=True)

# # Collect all images with metadata
# data = []
# test_data = []

# for class_name in ["LEFT", "RIGHT"]:
#     class_dir = base_dir / class_name
#     for species_name in ["Grayling", "Trout"]:
#         species_dir = class_dir / species_name
#         for date_folder in species_dir.iterdir():
#             if date_folder.is_dir():
#                 date = date_folder.name
#                 for image_file in date_folder.glob('*.*'):
#                     # Store metadata with each image path
#                     item = {
#                         "path": image_file,
#                         "class": class_name,
#                         "species": species_name,
#                         "date": date
#                     }
#                     # Assign test data based on specified conditions
#                     if (species_name == "Grayling" and date == "Day_35") or \
#                        (species_name == "Trout" and date == "Day_146"):
#                         test_data.append(item)
#                     else:
#                         data.append(item)

# # Shuffle data for random splitting
# random.seed(123)  # For reproducibility
# random.shuffle(data)

# # Split remaining data into train (85%) and validation (15%)
# total_count = len(data)
# train_count = int(0.85 * total_count)

# train_data = data[:train_count]
# valid_data = data[train_count:]

# # Load the datasets from the new directories
# train_set = tf.keras.preprocessing.image_dataset_from_directory(
#     train_dir,
#     image_size=(256, 256),  # Specify your desired image size
#     batch_size=32,          # Specify your desired batch size
#     seed=123,
# )

# valid_set = tf.keras.preprocessing.image_dataset_from_directory(
#     valid_dir,
#     image_size=(256, 256),
#     batch_size=32,
#     seed=123,
# )

# test_set = tf.keras.preprocessing.image_dataset_from_directory(
#     test_dir,
#     image_size=(256, 256),
#     batch_size=32,
#     seed=123,
# )

# train_set.name = 'Training'
# valid_set.name = 'Validation'
# test_set.name = 'Test'

# # Print class names and dataset sizes
# class_names = train_set.class_names
# print("Class names:", class_names)
# print("Training dataset size:", len(train_set))
# print("Validation dataset size:", len(valid_set))
# print("Test dataset size:", len(test_set))

# # Print number of test images
# print("\n","Total number of images in the input dataset:", "\n","train + valid:",len(data), "\n","test:",len(test_data))


2.2 Preprocess for ResNet models

In [None]:

def preprocessing_function(image, label):
    # We're going to hard code the image size we want to use. We can define this
    # with a lambda function, but we won't really need to change this and it's
    # more trouble than it's worth for us right now :)
    # image_size = 128
    num_classes = 2

    # Cast the image and label datatypes
    image = tf.cast(image, tf.float32)
    label = tf.cast(label,tf.int32)

    # Resize the image to (224, 224) for ResNet50
    image = tf.image.resize(image, (224, 224))
    
    # Normalize the pixel values. Use a float value in the denominator!
    image = image / 255.0

    # Resize the image
    # image = tf.image.resize(image, ( image_size,  image_size))

    # Cast the label to int32 and one-hot encode
    label = tf.one_hot(label, num_classes)
    # Recast label to Float32
    label = tf.cast(label, tf.float32)

    return image, label

train = train_set.map(preprocessing_function)
validation = valid_set.map(preprocessing_function)
test = test_set.map(preprocessing_function)

2.3. Run for ResNet

2.3.1 Define callbacks

In [None]:
def get_CNN_logdir():
    time = np.datetime64('now').astype(str)[:-3].replace(':', '-')
    run_logdir = os.path.join(os.curdir, "Final_CNN_logs", f"ResNet_no_weight_run_on_mask_data{time}") # Replace "ResNet_no_weight_run_on_mask_data{time}" with "ResNet_with_weight_run_on_mask_data{time}" for training with weights
    return run_logdir


early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=10)
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("CNN_ResNet_no_weight_mask_data.keras", #Replace "CNN_ResNet_no_weight_mask_data.keras" with "CNN_ResNet_with_weight_mask_data.keras" 
                                                   save_best_only=True,
                                                   monitor='val_loss')
tensorboard_cb = tf.keras.callbacks.TensorBoard(get_CNN_logdir())


2.3.2 Import ResNet model with or without weights and train model

In [None]:
# Let's clear out the backend and set our random seeds.
# Consistency is key :)
keras.backend.clear_session()
tf.random.set_seed(rnd_seed)
np.random.seed(rnd_seed)

n_classes = 2

# Load ResNet50 with no top layers (without classification layers)
base_model = keras.applications.resnet50.ResNet50(weights="imagenet", include_top=False, input_shape=(224, 224, 3))
avg = keras.layers.GlobalAveragePooling2D()(base_model.output)
output = keras.layers.Dense(n_classes, activation="softmax")(avg)
model = keras.Model(inputs=base_model.input, outputs=output)

# Freeze the base model
for layer in base_model.layers[-10:]:
    layer.trainable = False
# base_model.trainable = False

inputs = train.map(lambda image, label: (image, label))


optimizer = keras.optimizers.SGD(learning_rate=0.01, momentum=0.9, decay=0.01) 
model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"]) 

import datetime

# Generate a valid directory path

log_dir = f".\\Final_CNN_logs\\ResNet_no_weight_run_on_mask_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}" # Replace "ResNet_no_weight_run_on_mask_data" with "ResNet_with_weight_run_on_mask_data" for training with weights
#For Ubuntu
log_dir = Path(f"Final_CNN_logs/ResNet_no_weight_run_on_mask_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}") # Replace "ResNet_no_weight_run_on_mask_data" with "ResNet_with_weight_run_on_mask_data" for training with weights
log_dir_str = log_dir.as_posix()
tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir=log_dir)
history = model.fit(train, epochs=30, validation_data=valid, callbacks=[early_stopping_cb,checkpoint_cb,tensorboard_cb])


2.3.3 Save ResNet models with and without weights

In [None]:
model.save('CNN_ResNet_no_weight_mask_data.h5')
#model.save('CNN_ResNet_with_weight_mask_data.h5')

2.3.4 Optional, Load the models and evaluate

In [None]:
import os
print(os.listdir())

current_directory = os.getcwd()
print(current_directory)

# #aug_model = keras.models.load_model('path/to/your/directory/CNN_augmented.h5')
classic_model = keras.models.load_model(r'CNN_ResNet_no_weight_mask_data.keras')
mask_model = keras.models.load_model(r'CNN_ResNet_no_weight_classic_data.keras')

# And test them on the testing dataset
classic_model.evaluate(test)
mask_model.evaluate(test)


In [None]:
# # To visualize Model performance on Tensorboard:
#%tensorboard --logdir=./Final_CNN_logs --port=6006

2.4 Preprocess for Baseline and Augmented Baseline models

In [None]:
import numpy as np
# BASELINE RUN ON MASK DATA:

 #PREPROCESSING FOR THE BASELINE MODELS:
def preprocessing_function(image, label):
    num_classes = 2

    # Cast the image and label datatypes
    image = tf.cast(image, tf.float32)
    label = tf.cast(label,tf.int32)

    # Normalize the pixel values. Use a float value in the denominator!
    image = image / 255.0

    # Cast the label to int32 and one-hot encode
    label = tf.one_hot(label, num_classes)
    # Recast label to Float32
    label = tf.cast(label, tf.float32)

    return image, label

train = train_set.map(preprocessing_function)
validation = valid_set.map(preprocessing_function)
test = test_set.map(preprocessing_function)


2.5. Runs for Baseline model

2.5.1 Define callback

In [None]:
def get_CNN_logdir():
    time = np.datetime64('now').astype(str)[:-3].replace(':', '-')
    run_logdir = os.path.join(os.curdir, "Final_CNN_logs", f"Baseline_run_on_mask_data{time}") # time goes in the fstring
    return run_logdir


early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=10)
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("Baseline_run_on_mask_data.keras",
                                                   save_best_only=True,
                                                   monitor='val_loss')
tensorboard_cb = tf.keras.callbacks.TensorBoard(get_CNN_logdir())


2.5.2 Build Baseline Architecture

In [None]:
# Let's clear out the backend and set our random seeds.
# Consistency is key :)
keras.backend.clear_session()
tf.random.set_seed(rnd_seed)
np.random.seed(rnd_seed)

model = keras.models.Sequential([
    # Convolution 1
    keras.layers.Conv2D(filters=32, kernel_size=7, padding="same", activation="relu"),
    keras.layers.MaxPool2D((3,3)),

    # Convolution 2
    keras.layers.Conv2D(filters=64, kernel_size=5, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    # Convolution 3
    keras.layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    # Convolution 4
    keras.layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    keras.layers.Flatten(),
    keras.layers.Dense(4096, activation="relu"),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(2, activation="softmax") #Change the last layer from 5 classes to 2 classes
])

model.build((None, 256 , 256, 3))
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

import datetime



2.5.3 train Baseline Architecure

In [None]:
# Adjust timestamp to avoid colons
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H_%M_%S")
log_dir = f".\\Final_CNN_logs\\Baseline_run_on_mask_data{timestamp}"
#For Ubuntu
log_dir = Path(f"Final_CNN_logs/Baseline_run_on_mask_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}")
log_dir_str = log_dir.as_posix()

# Define TensorBoard callback with corrected log directory path
tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir=log_dir)

# Now run the model training
history = model.fit(train, # Training data generator
                    epochs=30,
                    validation_data=validation, # Validation data generator
                    callbacks=[early_stopping_cb,
                               checkpoint_cb,
                               tensorboard_cb])

2.6. Runs for Augmented Baseline model

2.6.1 Define callback

In [None]:
def get_CNN_logdir():
    time = np.datetime64('now').astype(str)[:-3].replace(':', '-')
    run_logdir = os.path.join(os.curdir, "Final_CNN_logs", f"Augmented_run_on_mask_data{time}") # time goes in the fstring
    return run_logdir

early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=10)
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("Augmnented_run_on_mask_data.keras",
                                                   save_best_only=True,
                                                   monitor='val_loss')
tensorboard_cb = tf.keras.callbacks.TensorBoard(get_CNN_logdir())


2.6.2 Build Augmented Baseline Architecture

In [None]:
# AUGMENTED RUN ON MASK DATA

aug_model = keras.models.Sequential([
    #keras.layers.RandomFlip(), # Flip augmentation removed 
    keras.layers.RandomRotation(0.08), # Rotation Aumentation
    #keras.layers.RandomBrightness([0.2,1.0]),
    keras.layers.RandomContrast(0.9),
    keras.layers.RandomTranslation(-0.08,0.08),
    # Convolution 1
    keras.layers.Conv2D(filters=32, kernel_size=7, padding="same", activation="relu"),
    keras.layers.MaxPool2D((3,3)),

    # Convolution 2
    keras.layers.Conv2D(filters=64, kernel_size=5, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    # Convolution 3
    keras.layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    # Convolution 4
    keras.layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
    keras.layers.MaxPool2D((2,2)),


    keras.layers.Flatten(),
    keras.layers.Dense(4096, activation="relu"),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(2, activation="softmax") #Change the last layer from 5 classes to 2 classes

    # Copy your previous model's layers here
])
aug_model.build((None, 256 , 256, 3))

from tensorflow.keras.optimizers import Adam

# Set a custom learning rate
learning_rate = 0.001  # Try different values: 0.0001, 0.001, 0.005, etc.
optimizer = Adam(learning_rate=learning_rate)

#changed optimizer = "adam" to this:
aug_model.compile(loss="categorical_crossentropy",
              optimizer=optimizer,
              metrics=['accuracy'])

2.6.3 Train Augmented baseline

In [None]:
import datetime

# Generate a valid directory path
log_dir = f".\\Final_CNN_logs\\Augmented_run_on_mask_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}"
#For Ubuntu
log_dir = Path(f"Final_CNN_logs/Augmented_run_on_mask_data{datetime.datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}")
log_dir_str = log_dir.as_posix()
tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir=log_dir)

history = aug_model.fit(train, # Training data generator
                    epochs=30,
                    validation_data=validation, # Validation data generator
                    callbacks=[early_stopping_cb,
                               checkpoint_cb,
                               tensorboard_cb])

2.7 Save models

In [None]:
import os
print(os.listdir())

current_directory = os.getcwd()
print(current_directory)
model.save('Baseline_run_on_mask_data.h5')
aug_model.save('Augmented_run_on_mask_data.h5')


2.8 Optional Load the models and evaluate their performance

In [None]:
non_aug_model = keras.models.load_model(r'Baseline_run_on_mask_data.h5')
aug_model = keras.models.load_model(r'Augmented_run_on_mask_data.h5')

# And test them on the testing dataset
# non_aug_model.evaluate(test)
# aug_model.evaluate(test)

In [None]:
from tensorflow.keras.utils import image_dataset_from_directory

#"Test the model on new data:

##Test on "mask data" (if trained on classic data)
#data_to_test = r"C:\Users\bapti\Downloads\ML_Project\Data_ready\Test"
##Test on "classic data" (if trained on mask data)

path_data_to_test = r"C:\Users\bapti\Downloads\Mask_data_ready\Test"
#For UBUNTU:
#On mask
path_data_to_test = Path('/mnt/c/Users/bapti/Downloads/Mask_data_ready/Test')
#On classic
path_data_to_test = Path('/mnt/c/Users/bapti/Downloads/ML_Project/Data_ready/Test')

test_dataset = image_dataset_from_directory(
    path_data_to_test,
    image_size=(256, 256), #CHANGE RESIER TO SIZE ACCEPTED BY MODEL
    batch_size=16,
    shuffle=False  # Do not shuffle if the dataset order matters
)
def one_hot_encode_labels(images, labels):
    labels = tf.one_hot(labels, depth=2)
    return images, labels
test_dataset = test_dataset.map(one_hot_encode_labels)

current_directory = os.getcwd()
# #Which model?
# model_ready =  keras.models.load_model(r'CNN_ResNet_classic_data.h5')
# model_ready =  keras.models.load_model(r'CNN_ResNet_mask_data.h5')
model_ready = keras.models.load_model(r'Baseline_run_on_mask_data.h5')

# Evaluate the augmented model
aug_results = model_ready.evaluate(test) 
print("Augmented Model - Test Loss:", aug_results[0])
print("Augmented Model - Test Accuracy:", aug_results[1])

# Evaluate the non-augmented model
non_aug_results = model.evaluate(test_dataset)
print("Non-Augmented Model - Test Loss:", non_aug_results[0])
print("Non-Augmented Model - Test Accuracy:", non_aug_results[1])


In [None]:
#EVALUATE MODELS ON TEST SETS:

baseline_model_ready_classic = keras.models.load_model(r'Baseline_run_on_classic_data.h5')
baseline_model_ready_mask = keras.models.load_model(r'Baseline_run_on_mask_data.h5')
Augmented_model_ready_classic = keras.models.load_model(r'Augmented_run_on_classic_data.h5')
Augmented_model_ready_mask = keras.models.load_model(r'Augmented_run_on_mask_data.h5')
ResNet_no_weight_model_ready_classic = keras.models.load_model(r'CNN_ResNet_no_weight_classic_data.h5')
ResNet_no_weight_model_ready_mask = keras.models.load_model(r'CNN_ResNet_no_weight_mask_data.h5')
ResNet_weight_model_ready_classic = keras.models.load_model(r'CNN_ResNet_classic_data.h5')
ResNet_weight_model_ready_mask = keras.models.load_model(r'CNN_ResNet_mask_data.h5')


#PREPROCESSING FOR THE BASELINE MODELS:
def preprocessing_function_256(image, label):
    num_classes = 2

    # Cast the image and label datatypes
    image = tf.cast(image, tf.float32)
    label = tf.cast(label,tf.int32)

    # Normalize the pixel values. Use a float value in the denominator!
    image = image / 255.0

    # Cast the label to int32 and one-hot encode
    label = tf.one_hot(label, num_classes)
    # Recast label to Float32
    label = tf.cast(label, tf.float32)

    return image, label

#PREPROCESSING FOR THE RESNET:

def preprocessing_function_224(image, label):

    num_classes = 2

    # Cast the image and label datatypes
    image = tf.cast(image, tf.float32)
    label = tf.cast(label,tf.int32)

    # Resize the image to (224, 224) for ResNet50
    image = tf.image.resize(image, (224, 224))
    
    # Normalize the pixel values. Use a float value in the denominator!
    image = image / 255.0

    # Cast the label to int32 and one-hot encode
    label = tf.one_hot(label, num_classes)
    # Recast label to Float32
    label = tf.cast(label, tf.float32)

    return image, label



test_256 = test_set.map(preprocessing_function_256)
test_224 = test_set.map(preprocessing_function_224)

baseline_model_ready_classic.evaluate(test_256) 
baseline_model_ready_mask.evaluate(test_256) 
Augmented_model_ready_classic.evaluate(test_256) 
Augmented_model_ready_mask.evaluate(test_256) 
ResNet_no_weight_model_ready_classic.evaluate(test_224) 
ResNet_no_weight_model_ready_mask.evaluate(test_224) 
ResNet_weight_model_ready_classic.evaluate(test_224) 
ResNet_weight_model_ready_mask.evaluate(test_224) 


3. Salency Maps

In [None]:
# # First test of Salency maps, kept for personnal interest. You may want to run it by curiosity

# from tensorflow import keras
# from tensorflow.keras.models import load_model
# from tensorflow.keras.preprocessing.image import img_to_array, load_img
# from tf_keras_vis.saliency import Saliency
# from tf_keras_vis.utils import normalize
# from tf_keras_vis.utils.scores import CategoricalScore
# import matplotlib.pyplot as plt
# import tensorflow as tf

# # Load your model
# # model_ready = load_model(r'CNN_ResNet_classic_data.h5')
# # model_ready =  keras.models.load_model(r'CNN_ResNet_classic_data.h5')
# # model_ready =  keras.models.load_model(r'CNN_ResNet_classic_data.h5')
# # model_ready =  keras.models.load_model(r'CNN_ResNet_mask_data.h5')
# model_ready = keras.models.load_model(r'Baseline_run_on_mask_data.h5')
# # Prepare the image
# path_data_to_test = r"C:\Users\bapti\Downloads\Mask_data_ready\Test\LEFT\Trout\IMG_7946_processed.jpg"

# #For UBUNTU:
# path_data_to_test = Path('/mnt/c/Users/bapti/Downloads/Mask_data_ready/Test/LEFT/Trout/IMG_7946_processed.jpg')
# # path_data_to_test = Path('/mnt/c/Users/bapti/Downloads/ML_Project/Data_ready/Test/LEFT/Trout/IMG_7946.jpg')
# print(model_ready.input_shape)

# img = load_img(path_data_to_test, target_size=(256, 256)) # CHANGE THE RESIZE FOR THE CORECT MODEL + HAVE THE RIGHT PREPROCESSING FUNCTION
# x = img_to_array(img)  # Convert to numpy array
# x = x.reshape((1,) + x.shape)  # Add batch dimension
# x = x / 255.0  # Normalize image data if your model expects normalized inputs

# # Convert the last activation layer to linear
# model_ready.layers[-1].activation = None  # For TensorFlow 2.x compatibility

# # Define the class index you want to visualize
# class_index = 0  # Change this to match your target class index
# score = CategoricalScore([class_index])

# # Create Saliency object
# saliency = Saliency(model_ready, clone=False)

# # Generate saliency map
# saliency_map = saliency(score, x, smooth_samples=20, smooth_noise=0.2)
# saliency_map = normalize(saliency_map)  # Normalize values for visualization

# # Plot saliency map
# plt.figure(figsize=(5, 4))
# plt.imshow(saliency_map[0], cmap='hot')
# plt.axis('off')
# plt.show()


3.1 Load the test dataset

In [None]:
import time
from pathlib import Path
from PIL import Image
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tf_keras_vis.saliency import Saliency
from tf_keras_vis.utils import normalize
from tf_keras_vis.utils.scores import CategoricalScore

# Start the timer
start_time = time.time()

# Define the base directory for the test data
base_dir = Path('/mnt/c/Users/bapti/Downloads/ML_Project/Data_ready/Test')
#base_dir = Path('/mnt/c/Users/bapti/Downloads/Mask_data_ready/Test')

# Initialize an empty list to hold test data
test_data = []

# Check if the base directory exists
if not base_dir.exists():
    raise ValueError(f"Base directory does not exist: {base_dir}")

# Loop through the 'LEFT' and 'RIGHT' directories
for class_name in ["LEFT", "RIGHT"]:
    class_dir = base_dir / class_name
    print(f"Checking class directory: {class_dir}")
    
    if not class_dir.exists():
        print(f"Class directory does not exist: {class_dir}")
        continue

    for species_name in ["Grayling", "Trout"]:
        species_dir = class_dir / species_name
        print(f"Checking species directory: {species_dir}")
        
        if not species_dir.exists():
            print(f"Species directory does not exist: {species_dir}")
            continue
        
        # List all image files in the directory
        for image_file in species_dir.glob('*.*'):
            if image_file.is_file():  # Ensure it's an actual file
                item = {
                    "path": image_file,
                    "class": class_name,
                    "species": species_name
                }
                test_data.append(item)

# Check if test_data is populated
print(f"Number of test data items: {len(test_data)}")


3.2 Load the model (you may manualy chose whichever you prefer)

In [None]:
# Load model
model_ready = keras.models.load_model(r'CNN_ResNet_classic_data.h5')

3.3 Preprocess the data (you may change adapt the target size to 224 for ResNet and 256 for Baseline and Augmented baseline models)

In [None]:
# Ensure test data is populated correctly
if len(test_data) == 0:
    raise ValueError("Test data is empty. Please check the data loading process.")

# Image preprocessing function
def image_path_to_array(image_path, target_size=(224, 224)):
    image = Image.open(image_path)
    image = image.resize(target_size)  # Resize to the target size
    image_array = np.array(image)  # Convert to numpy array
    image_array = image_array / 255.0  # Normalize to [0, 1]
    return image_array

# Get the actual images and their corresponding filenames
test_images = np.array([image_path_to_array(item["path"]) for item in test_data])  # Load images and convert to arrays
print(f"Shape of test_images before reshape: {test_images.shape}")

# Ensure the test_images are in the correct shape (num_samples, 256, 256, 3)
test_images = test_images.reshape(-1, 224, 224, 3)  # Explicitly reshape
print(f"Shape of test_images after reshape: {test_images.shape}")

filenames = [item["path"].name for item in test_data]  # Extract filenames for reference

# Convert class labels to 0 (LEFT) and 1 (RIGHT)
test_labels = [1 if item["class"] == "RIGHT" else 0 for item in test_data]
print(f"Number of test labels: {len(test_labels)}")


3.4 Get the best and worst predictions

In [None]:
# Get model predictions
predictions = model_ready.predict(test_images)
predicted_classes = np.argmax(predictions, axis=1)  # Convert to class indices

# Check if predicted_classes has the correct shape
print(f"Shape of predicted_classes: {predicted_classes.shape}")
if len(predicted_classes) != len(test_labels):
    raise ValueError(f"Mismatch between predicted classes and test labels. "
                     f"Predicted: {len(predicted_classes)}, Labels: {len(test_labels)}")

# Correct and wrong predictions
correct_preds = predicted_classes == test_labels
wrong_preds = ~correct_preds

# Calculate best and worst confidence
best_idx = None
worst_idx = None

if np.any(correct_preds):
    best_confidence = predictions[correct_preds, predicted_classes[correct_preds]]
    global_correct_indices = np.where(correct_preds)[0]
    best_idx = global_correct_indices[np.argmax(best_confidence)]
else:
    print("No correct predictions found.")

if np.any(wrong_preds):
    worst_confidence = predictions[wrong_preds, predicted_classes[wrong_preds]]
    global_wrong_indices = np.where(wrong_preds)[0]
    worst_idx = global_wrong_indices[np.argmin(worst_confidence)]
else:
    print("No wrong predictions found.")


# Define class mapping
class_mapping = {0: "LEFT", 1: "RIGHT"}

# Display best and worst prediction details
if best_idx is not None:
    best_image = test_images[best_idx]
    best_label = test_labels[best_idx]
    best_filename = filenames[best_idx]
    best_pred_class = predicted_classes[best_idx]
    best_pred_value = predictions[best_idx][best_pred_class]

    print(f"Best Prediction Details:")
    print(f"Index: {best_idx}, True Label: {class_mapping[best_label]}, Predicted Class: {class_mapping[best_pred_class]}")
    print(f"Confidence: {best_pred_value:.4f}, Filename: {best_filename}")

if worst_idx is not None:
    worst_image = test_images[worst_idx]
    worst_label = test_labels[worst_idx]
    worst_filename = filenames[worst_idx]
    worst_pred_class = predicted_classes[worst_idx]
    worst_pred_value = predictions[worst_idx][worst_pred_class]

    print(f"Worst Prediction Details:")
    print(f"Index: {worst_idx}, True Label: {class_mapping[worst_label]}, Predicted Class: {class_mapping[worst_pred_class]}")
    print(f"Confidence: {worst_pred_value:.4f}, Filename: {worst_filename}")

# Find all indices with the highest confidence among correct predictions (best case)
if np.any(correct_preds):
    best_confidence = predictions[correct_preds, predicted_classes[correct_preds]]
    global_correct_indices = np.where(correct_preds)[0]
    best_idx = global_correct_indices[np.argmax(best_confidence)]
    # Find indices where confidence matches the highest value
    duplicate_best_indices = global_correct_indices[best_confidence == max_confidence]
    
    # Print the number of "best" predictions
    num_best_predictions = len(duplicate_best_indices)
    print(f"Number of best predictions with the highest confidence ({max_confidence:.4f}): {num_best_predictions}")
    
    # Optional: Print their details if needed
    if num_best_predictions > 1:
        print("Details of the best predictions:")
        for idx in duplicate_best_indices:
            print(f" - Filename: {filenames[idx]}, True Label: {class_mapping[test_labels[idx]]}, "
                  f"Predicted: {class_mapping[predicted_classes[idx]]}")
else:
    print("No correct predictions found to determine 'best' cases.")
    # Check for duplicates
    max_confidence = np.max(best_confidence)
    duplicate_best_indices = global_correct_indices[best_confidence == max_confidence]
    
    if len(duplicate_best_indices) > 1:
        print(f"Multiple best predictions found with the same confidence ({max_confidence:.4f}):")
        for idx in duplicate_best_indices:
            print(f" - Filename: {filenames[idx]}, True Label: {class_mapping[test_labels[idx]]}, "
                  f"Predicted: {class_mapping[predicted_classes[idx]]}")
    else:
        print(f"No duplicate best predictions found.")

# Find all indices with the lowest confidence among incorrect predictions (worst case)
if np.any(wrong_preds):
    worst_confidence = predictions[wrong_preds, predicted_classes[wrong_preds]]
    global_wrong_indices = np.where(wrong_preds)[0]
    worst_idx = global_wrong_indices[np.argmin(worst_confidence)]
    
    # Check for duplicates
    min_confidence = np.min(worst_confidence)
    duplicate_worst_indices = global_wrong_indices[worst_confidence == min_confidence]
    
    if len(duplicate_worst_indices) > 1:
        print(f"Multiple worst predictions found with the same confidence ({min_confidence:.4f}):")
        for idx in duplicate_worst_indices:
            print(f" - Filename: {filenames[idx]}, True Label: {class_mapping[test_labels[idx]]}, "
                  f"Predicted: {class_mapping[predicted_classes[idx]]}")
    else:
        print(f"No duplicate worst predictions found.")


3.5 Plot the Saliency maps 

In [None]:
# Define function to plot saliency map
def plot_saliency_map(image, model, class_index, smooth_samples=20, smooth_noise=0.2):
    x = image.reshape((1,) + image.shape)  # Add batch dimension
    
    # Ensure normalization
    if x.max() > 1:
        x = x / 255.0  # Normalize if required
    
    model.layers[-1].activation = None  # Set the last layer activation to linear for saliency
    
    score = CategoricalScore([class_index])
    saliency = Saliency(model, clone=False)
    
    saliency_map = saliency(score, x, smooth_samples=smooth_samples, smooth_noise=smooth_noise)
    saliency_map = normalize(saliency_map)  # Normalize for visualization
    
    # Rescale and prepare for visualization
    saliency_map_rescaled = (saliency_map[0] - saliency_map[0].min()) / (saliency_map[0].max() - saliency_map[0].min())
    saliency_map_rescaled = np.clip(saliency_map_rescaled * 1.5, 0, 1)
    saliency_map_rescaled = np.uint8(255 * saliency_map_rescaled)
    
    # Create a red-highlighted colormap
    saliency_colormap = np.zeros((*saliency_map_rescaled.shape, 3), dtype=np.uint8)
    saliency_colormap[..., 0] = saliency_map_rescaled
    
    # Original image preparation
    original_image = np.clip(x[0], 0, 1)  # Clamp values to [0, 1]
    original_image = np.uint8(original_image * 255)  # Scale to 0-255
    
    # Blend original image and saliency map
    alpha = 0.5
    blended = np.uint8(original_image * (1 - alpha) + saliency_colormap * alpha)
    
    # Plot original, saliency map, and overlay
    plt.figure(figsize=(15, 5))
    plt.subplot(1, 3, 1)
    plt.title('Original Image')
    plt.imshow(original_image.astype('uint8'))
    plt.axis('off')
    
    plt.subplot(1, 3, 2)
    plt.title('Saliency Map')
    plt.imshow(saliency_map[0], cmap='hot')
    plt.axis('off')
    
    plt.subplot(1, 3, 3)
    plt.title('Overlay with Saliency')
    plt.imshow(blended)
    plt.axis('off')
    
    plt.tight_layout()
    plt.show()

# Plot saliency maps for the best and worst predictions
if worst_idx is not None:
    print(f"Plotting saliency map for worst prediction (Index: {worst_idx}, True Label: {class_mapping[worst_label]}, Predicted: {class_mapping[worst_pred_class]}, Filename: {worst_filename})")
    plot_saliency_map(worst_image, model_ready, np.argmax(predictions[worst_idx]))

print(f"Plotting saliency map for best prediction (Index: {best_idx}, True Label: {class_mapping[best_label]}, Predicted: {class_mapping[best_pred_class]}, Filename: {best_filename})")
plot_saliency_map(best_image, model_ready, np.argmax(predictions[best_idx]))

# Processing time
print(f"Processing time: {time.time() - start_time} seconds")
