In [1]:
import os, warnings
import json

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report


import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import regularizers
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import load_img, img_to_array

# Load configuration data.
# Note: when running outside the provided Docker environment, edit
# dataset_path in config.json so that it points at your local copy of the dataset.

with open('/workspace/folio_project/config.json') as f:
    config = json.load(f)

# Unpack the settings the rest of the notebook relies on
input_shape, target_size = (tuple(config[key]) for key in ('input_shape', 'target_size'))
seed, dataset_path = config['seed'], config['dataset_path']

# Fail fast with an actionable message when the configured dataset location is missing
if not os.path.exists(dataset_path):
    raise FileNotFoundError(f"Dataset path {dataset_path} does not exist. Please update dataset_path in config.json.")
    

def set_seed(seed):
    """
    Sets the random seeds for reproducibility.

    Seeds Python's built-in ``random`` module, NumPy's global RNG and
    TensorFlow's global seed, and exports ``PYTHONHASHSEED``.

    Parameters:
    seed (int): The seed value.
    """
    # Local import: the notebook's import cell does not bring in `random`.
    import random

    # PYTHONHASHSEED is read when an interpreter starts, so exporting it here
    # affects child processes rather than hashing in the running kernel.
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
set_seed(seed)

# Notebook-wide matplotlib defaults: automatic layout, bold axis labels/titles
# and the 'magma' colormap for images.
plt.rcParams.update({
    'figure.autolayout': True,
    'axes.labelweight': 'bold',
    'axes.labelsize': 'large',
    'axes.titleweight': 'bold',
    'axes.titlesize': 18,
    'axes.titlepad': 10,
    'image.cmap': 'magma',
})

# Silence every Python warning for the remainder of the session
warnings.filterwarnings('ignore')

2023-10-22 10:36:40.805729: I tensorflow/core/platform/cpu_feature_guard.cc:183] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE3 SSE4.1 SSE4.2 AVX, in other operations, rebuild TensorFlow with the appropriate compiler flags.


FileNotFoundError: [Errno 2] No such file or directory: 'config.json'

In [None]:
# Path to the data: a folder containing 32 sub-folders, one per species, each
# holding that species' photos. Taken from config.json so the notebook uses the
# same dataset_path that was validated when the configuration was loaded.
ds_dir = dataset_path

# Accumulators for the image arrays and their class names
image_arrays = []
image_labels = []

# Iterate through each folder (which represents a class) in the dataset directory.
# os.listdir returns entries in arbitrary order, so sort them to keep the sample
# order (and therefore the cross-validation folds) reproducible across runs.
for class_folder in sorted(os.listdir(ds_dir)):
    class_folder_path = os.path.join(ds_dir, class_folder)
    if not os.path.isdir(class_folder_path):
        continue
    # Iterate through each image file in the folder
    for img_file in sorted(os.listdir(class_folder_path)):
        img_file_path = os.path.join(class_folder_path, img_file)
        # Nested directories cannot be opened as images; skip them
        if not os.path.isfile(img_file_path):
            continue
        # Load the image resized to target_size and convert it to an array
        img = load_img(img_file_path, target_size=target_size)
        image_arrays.append(img_to_array(img))
        # The folder name is the class label
        image_labels.append(class_folder)

if not image_arrays:
    raise ValueError(f"No images found under {ds_dir}; expected one sub-folder of images per class.")

data = np.array(image_arrays)
labels = np.array(image_labels)

In [None]:
# Map the string class names to integer ids; feeding strings to the model
# fails with "Cast string to float is not supported".
encoder = LabelEncoder().fit(labels)
encoded_labels = encoder.transform(labels)

# Sanity check: both counts should be identical
print(f'Original unique labels: {len(np.unique(labels))}')
print(f'Encoded unique labels: {len(np.unique(encoded_labels))}')

In [None]:
def convert_to_float(image, label):
    """
    Converts an image to float32 and passes its label through unchanged.

    Parameters:
    image: The image tensor to convert.
    label: The label paired with the image.

    Returns:
    tuple: (converted image, label).
    """
    # NOTE(review): convert_image_dtype presumably leaves inputs that are already
    # float32 untouched (no rescaling) - confirm the pixel range the model receives.
    as_float32 = tf.image.convert_image_dtype(image, dtype=tf.float32)
    return as_float32, label

def prepare_data(X_train, y_train, X_val, y_val, batch_size=128):
    """
    Builds the batched tf.data pipelines for one train/validation split.

    The training pipeline is cached, shuffled, converted to float, randomly
    flipped horizontally, batched and prefetched. The validation pipeline is
    cached, converted, batched and prefetched, with no shuffling or
    augmentation, so its element order matches ``X_val`` / ``y_val``.

    Parameters:
    X_train: Training images, sliced along the first axis.
    y_train: Labels for X_train.
    X_val: Validation images, sliced along the first axis.
    y_val: Labels for X_val.
    batch_size (int): Number of samples per batch (default 128).

    Returns:
    tuple: (train_ds, val_ds) as tf.data.Dataset objects.
    """
    AUTOTUNE = tf.data.experimental.AUTOTUNE

    data_augmentation = tf.keras.Sequential([
        tf.keras.layers.experimental.preprocessing.RandomFlip('horizontal'),
    ])

    def augment(image, label):
        # training=True keeps the random flip active inside the pipeline
        return data_augmentation(image, training=True), label

    train_ds = (
        tf.data.Dataset.from_tensor_slices((X_train, y_train))
        .cache()
        # Keras does not shuffle tf.data inputs itself, and this notebook loads
        # the samples folder by folder (i.e. grouped by class). Without a shuffle
        # every batch would be dominated by a single class. Shuffling after
        # cache() gives a fresh order each epoch; the buffer spans the whole split.
        .shuffle(buffer_size=max(len(X_train), 1))
        .map(convert_to_float)
        .map(augment)
        .batch(batch_size)
        .prefetch(buffer_size=AUTOTUNE)
    )

    val_ds = (
        tf.data.Dataset.from_tensor_slices((X_val, y_val))
        .cache()
        .map(convert_to_float)
        .batch(batch_size)
        .prefetch(buffer_size=AUTOTUNE)
    )

    return train_ds, val_ds


def create_model(input_shape, num_classes=32):
    """
    Creates and compiles the transfer-learning classifier.

    A frozen ImageNet-pretrained MobileNetV2 backbone is followed by global
    average pooling, a 1024-unit ReLU layer and a softmax output layer.

    Inputs are expected as raw RGB pixel values in [0, 255], which is what the
    notebook's loading code produces. The MobileNetV2 preprocessing (scaling
    to [-1, 1]) is applied inside the model, so training and prediction always
    see identically scaled data.

    Parameters:
    input_shape (tuple): The shape of the input data.
    num_classes (int): Number of output classes (default 32).

    Returns:
    model (tf.keras.Model): The compiled model.
    """
    base_model = tf.keras.applications.MobileNetV2(weights='imagenet', include_top=False, input_shape=input_shape)
    # Freeze the whole pretrained backbone; only the new head is trained
    base_model.trainable = False

    model = tf.keras.Sequential([
        # The ImageNet weights were trained on inputs scaled to [-1, 1]
        tf.keras.layers.Lambda(
            tf.keras.applications.mobilenet_v2.preprocess_input,
            input_shape=input_shape,
        ),
        base_model,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(1024, activation='relu'),
        tf.keras.layers.Dense(num_classes, activation='softmax')
    ])
    # Sparse loss: labels are integer class ids, not one-hot vectors
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

def train_model_on_fold(model, train_ds, val_ds, epochs=20, batch_size=128):
    """
    Fits the model on one fold and returns its training history.

    Parameters:
    model: Object exposing a Keras-style ``fit`` method.
    train_ds: Training data passed to ``fit``.
    val_ds: Validation data passed to ``fit`` as ``validation_data``.
    epochs (int): Number of training epochs.
    batch_size (int): Not used by this function; kept so existing callers
        that pass it keep working.

    Returns:
    The ``history`` attribute of the object returned by ``model.fit``.
    """
    fit_result = model.fit(train_ds, epochs=epochs, validation_data=val_ds)
    return fit_result.history


def average_history(histories):
    """
    Averages the per-epoch training curves across folds.

    Parameters:
    histories (list): One dict per fold with the keys 'loss', 'val_loss',
        'accuracy' and 'val_accuracy', each holding a per-epoch sequence.

    Returns:
    pd.DataFrame: One row per epoch and one column per metric, containing the
        mean over all folds.
    """
    metric_names = ('loss', 'val_loss', 'accuracy', 'val_accuracy')
    fold_means = {
        name: np.mean([fold[name] for fold in histories], axis=0)
        for name in metric_names
    }
    return pd.DataFrame(fold_means)

def plot_history(avg_history_frame):
    """
    Plots the averaged training curves: one figure for the losses, one for
    the accuracies.

    Parameters:
    avg_history_frame: Frame with the columns 'loss', 'val_loss', 'accuracy'
        and 'val_accuracy'.
    """
    curve_groups = (['loss', 'val_loss'], ['accuracy', 'val_accuracy'])
    for columns in curve_groups:
        avg_history_frame.loc[:, columns].plot()

def perform_kfold_cross_validation(data, encoded_labels, n_splits=5, random_state=1, class_names=None):
    """
    Runs stratified K-fold cross-validation and reports per-fold results.

    For every fold a fresh model is created and trained. A confusion matrix
    is then plotted and a per-class classification report printed for the
    fold's validation split. Finally the training curves are averaged over
    all folds and plotted.

    Relies on the module-level ``input_shape`` and on ``prepare_data``,
    ``create_model``, ``train_model_on_fold``, ``average_history`` and
    ``plot_history`` defined in this notebook.

    Parameters:
    data (np.ndarray): Image array, indexed along the first axis.
    encoded_labels (np.ndarray): Integer class id for each image.
    n_splits (int): Number of folds.
    random_state (int): Seed for the shuffled stratified split.
    class_names (sequence, optional): Human-readable name for each class id,
        in sorted-id order; used to label the confusion matrix and the
        report. When omitted, the integer ids are shown.

    Returns:
    pd.DataFrame: Fold-averaged training history (see ``average_history``).

    Raises:
    ValueError: If class_names is given but its length does not match the
        number of distinct labels.
    """
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    # The full label set is the same for every fold; compute it once
    unique_labels = np.unique(encoded_labels)
    if class_names is not None and len(class_names) != len(unique_labels):
        raise ValueError(
            f"class_names has {len(class_names)} entries but there are {len(unique_labels)} distinct labels."
        )
    display_labels = unique_labels if class_names is None else class_names
    all_histories = []

    for train_index, val_index in skf.split(data, encoded_labels):
        X_train, X_val = data[train_index], data[val_index]
        y_train, y_val = encoded_labels[train_index], encoded_labels[val_index]

        train_ds, val_ds = prepare_data(X_train, y_train, X_val, y_val)

        model = create_model(input_shape=input_shape)
        history = train_model_on_fold(model, train_ds, val_ds)
        all_histories.append(history)

        # Predict through the same input pipeline used for validation during
        # training. val_ds is not shuffled, so predictions line up with y_val.
        y_pred = model.predict(val_ds)
        y_pred_classes = np.argmax(y_pred, axis=1)

        # Confusion Matrix. Passing labels= pins the matrix to the full class
        # set, so its size always matches display_labels even when a class is
        # absent from this fold's validation split and predictions.
        cm = confusion_matrix(y_val, y_pred_classes, labels=unique_labels)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels)
        disp.plot(cmap=plt.cm.Blues)
        plt.show()

        # Per-class Performance
        print(classification_report(y_val, y_pred_classes, labels=unique_labels, target_names=class_names))

    avg_history_frame = average_history(all_histories)
    plot_history(avg_history_frame)

    return avg_history_frame

# Run the K-fold cross-validation on the full dataset and keep the
# fold-averaged training curves for later inspection.
avg_history_frame = perform_kfold_cross_validation(data=data, encoded_labels=encoded_labels)

