# Libraries

In [None]:
import os
import cv2
import sys
import uuid
import joblib
import random
import datetime
import numpy as np
import pandas as pd
import seaborn as sns
from pathlib import Path
import concurrent.futures
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import albumentations as A

import torch
from torch.utils.data import Dataset, DataLoader

from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

from sklearn.svm import SVC
from sklearn.base import BaseEstimator
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier

import xgboost as xgb

In [None]:
# Append the parent of the current working directory to the import search
# path so that the project-local `src` import below can be resolved.
# NOTE(review): this depends on where the notebook kernel was started —
# presumably from a sub-folder of the project root; confirm.
sys.path.append(os.path.dirname(os.getcwd()))

# Project-local helpers: GetPath is used to locate the shared data directory
# and FullMHI to build the dataset in the cells that follow.
from src import GetPath, FullMHI

In [None]:
# Experiment-level settings.
# NOTE(review): SIZE and NORMALIZE are not referenced by any of the visible
# cells (the active transform resizes to 224x224) — confirm whether they are
# still needed.
SIZE = (32, 32)
NORMALIZE = False

# Single seed shared by Python's `random`, NumPy's global RNG and the
# `random_state` arguments passed to the scikit-learn objects below.
RANDOM_STATE = 42
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)

In [None]:
# Root of the shared data directory, as reported by the project's GetPath helper.
DATA_PATH = GetPath().shared_data()

# Directory handed to FullMHI as `root_dir`: <DATA_PATH>/preprocess/mhi
MHI_DATA = os.path.join(DATA_PATH, 'preprocess', 'mhi')

# Alternative configuration for a local (Windows) checkout, kept disabled:
# # Local Path
# D_PATH = 'D:/fish_behavior'
# MHI_DATA = os.path.join(D_PATH, 'data', 'preprocess', 'mhi_right_tail')
# MHI_CNTRS_SAMPLE = os.path.join(MHI_DATA, 'samples', 'mhi_contour_sampling.csv')

# Inspecting Data Samples for B1 batch - 15 frame

In [None]:
# Active augmentation pipeline, passed to FullMHI as its `transform`.
# Stage order: resize -> optional flip -> optional geometric jitter -> normalize.
transform = A.Compose(
    [
        # Bring every image to a fixed 224x224 size.
        A.Resize(224, 224),
        # Half of the time, apply one of the two flips.
        A.OneOf(
            [
                A.HorizontalFlip(p=0.5),
                A.VerticalFlip(p=0.5),
            ],
            p=0.5,
        ),
        # Half of the time, apply either a 90-degree rotation or a combined
        # shift / scale / rotate jitter.
        A.OneOf(
            [
                A.RandomRotate90(),
                A.ShiftScaleRotate(
                    shift_limit=0.0625,
                    scale_limit=0.50,
                    rotate_limit=45,
                    p=0.75,
                ),
            ],
            p=0.5,
        ),
        # mean=0 / std=1 leaves only the library's pixel-range scaling in effect.
        # NOTE(review): presumably this maps 8-bit pixels to [0, 1] — confirm
        # against the installed albumentations version.
        A.Normalize(mean=(0, 0, 0), std=(1, 1, 1)),
    ]
)

# Disabled alternative 1: normalize first, then rotate/jitter, then resize.
# transform = A.Compose([
#     # Normalize
#     A.Normalize(mean=(0,0,0), std=(1,1,1)),
#     A.RandomRotate90(),
#     A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.50, rotate_limit=45, p=.75),
#     A.Resize(224, 224)
# ])

# Disabled alternative 2: no augmentation, normalize and shrink to 32x32.
# transform = A.Compose([
#     # Normalize
#     A.Normalize(mean=(0,0,0), std=(1,1,1)),
#     A.Resize(32, 32)
# ])

In [None]:
# Build the MHI dataset rooted at MHI_DATA with the augmentation pipeline
# attached, then report how many samples it exposes.
datasets = FullMHI(
    transform=transform,
    root_dir=MHI_DATA,
)
print(f"Total experiment datasets: {len(datasets)}")

In [None]:
# Per-experiment bookkeeping: dataset indices (idx_track) and the matching
# labels (labels_track), keyed by the experiment identifier that the dataset
# returns as the first element of each sample.
# 'B1', 'B2' and 'B3' are pre-seeded because later cells index these keys
# directly and must find a (possibly empty) list there.
idx_track = {'B1': [], 'B2': [], 'B3': []}
labels_track = {'B1': [], 'B2': [], 'B3': []}

for idx in range(len(datasets)):
    # Only the metadata is needed here; the image itself is discarded.
    experiment, label, _image = datasets[idx]
    # setdefault keeps the pre-seeded keys and also tolerates an experiment
    # identifier that is not listed above instead of raising KeyError.
    idx_track.setdefault(experiment, []).append(idx)
    labels_track.setdefault(experiment, []).append(label)

In [None]:
# Pool the sample indices of experiments B1 and B2 and hold out 20% of them
# for testing; RANDOM_STATE makes the shuffle reproducible.
idx_train, idx_test = train_test_split(
    idx_track['B1'] + idx_track['B2'],
    random_state=RANDOM_STATE,
    test_size=0.2,
)

In [None]:
# Number of times every training index is read from the dataset.
# NOTE(review): the dataset was built with a randomised transform, so each
# pass presumably yields a differently augmented copy of the same sample —
# confirm against FullMHI. The same pipeline is also attached when the test
# and validation samples are read below; confirm that this is intended.
N_TRAIN_PASSES = 10


def _load_flat_samples(indices, passes=1):
    """Read samples from `datasets` and return them as flat feature vectors.

    Args:
        indices: Dataset indices to read, in order.
        passes: How many times the whole index sequence is read.

    Returns:
        A `(features, labels)` pair of equally long lists; each feature is
        the sample image after `.flatten()`.
    """
    features, labels = [], []
    for _ in range(passes):
        for idx in indices:
            _experiment, label, image = datasets[idx]
            features.append(image.flatten())
            labels.append(label)
    return features, labels


# Training set: the B1/B2 training indices, read N_TRAIN_PASSES times.
X_train, y_train = _load_flat_samples(idx_train, passes=N_TRAIN_PASSES)

# Test set: the held-out B1/B2 indices, read once.
X_test, y_test = _load_flat_samples(idx_test)

# Validation set: every sample of experiment B3, read once.
X_val, y_val = _load_flat_samples(idx_track['B3'])

In [None]:
class ClassificationModel:
    """Thin wrapper that trains, evaluates and plots a single classifier.

    Attributes:
        model: The wrapped estimator; it must provide `fit`, `score` and
            `predict`.
        model_name: Human-readable name used in printed output and plot titles.
        is_trained: False until `train` has completed successfully.
    """

    def __init__(self, model: "BaseEstimator", model_name: str):
        self.model = model
        self.model_name = model_name
        self.is_trained = False

    def train(self, X_train, y_train):
        """Fit the wrapped model and print its accuracy on the training data."""
        self.model.fit(X_train, y_train)
        # Record the state change only after `fit` returned without raising.
        self.is_trained = True
        train_accuracy = self.model.score(X_train, y_train)
        print(f"{self.model_name} Training accuracy: {train_accuracy:.4f}")

    def evaluate(self, X_test, y_test):
        """Print the test score and a classification report.

        Returns:
            The pair `(y_test, y_pred)` so it can be fed straight into
            `plot_confusion_matrix`.
        """
        test_accuracy = self.model.score(X_test, y_test)
        y_pred = self.model.predict(X_test)

        print(f"{self.model_name} Test accuracy: {test_accuracy:.4f}")
        print("\n-----CLASSIFICATION REPORT-----\n")
        print(classification_report(y_test, y_pred))

        return y_test, y_pred

    def plot_confusion_matrix(self, y_true, y_pred):
        """Show a confusion matrix for the given true and predicted labels."""
        disp = ConfusionMatrixDisplay.from_predictions(y_true, y_pred, xticks_rotation='vertical')
        disp.figure_.suptitle(f"{self.model_name} Confusion Matrix")
        plt.show()

    def run_full_analysis(self, X_train, y_train, X_test, y_test):
        """Train, evaluate and plot the confusion matrix in one call."""
        self.train(X_train, y_train)
        y_true, y_pred = self.evaluate(X_test, y_test)
        self.plot_confusion_matrix(y_true, y_pred)

In [None]:
# Evaluate the performance of each base model
def train_and_evaluate(name, model, images_train, labels_train, images_test, labels_test):
    """Fit one model and score it on the test split.

    The training labels are integer-encoded before fitting, and the model's
    predictions are decoded back to the original labels before scoring.
    `model` is fitted in place.

    Returns:
        `[name, accuracy, f1, precision, recall]`, where the last three are
        weighted averages with undefined values counted as 0.0.
    """
    encoder = LabelEncoder()
    encoded_train = encoder.fit_transform(labels_train)
    model.fit(images_train, encoded_train)

    # Decode predictions so they are comparable with the raw test labels.
    predicted = encoder.inverse_transform(model.predict(images_test))

    averaging = {'average': 'weighted', 'zero_division': 0.0}
    accuracy = accuracy_score(labels_test, predicted)
    f1 = f1_score(labels_test, predicted, **averaging)
    precision = precision_score(labels_test, predicted, **averaging)
    recall = recall_score(labels_test, predicted, **averaging)
    return [name, accuracy, f1, precision, recall]

def model_selection(models, X_train, y_train, X_test, y_test, max_workers=10):
    """Train and score every model concurrently, with a progress bar.

    Args:
        models: Mapping of display name to an unfitted model.
        X_train, y_train: Training features and labels, shared by all models.
        X_test, y_test: Test features and labels, shared by all models.
        max_workers: Size of the thread pool used to run the models.

    Returns:
        A list with one `train_and_evaluate` result per model, in completion
        order (not in the order of `models`).

    Raises:
        Whatever exception a model raised while being trained or scored; it
        propagates from the first failing future that is collected.
    """
    results = []
    with tqdm(total=len(models), desc='Model Training', unit='model') as progress:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(
                    train_and_evaluate,
                    name,
                    model,
                    X_train,
                    y_train,
                    X_test,
                    y_test,
                )
                for name, model in models.items()
            ]

            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())
                # The future is already finished here, so advance the bar
                # directly instead of registering a done-callback on it.
                progress.update()

    return results

In [None]:
# Define the base models
models = {
    'Logistic Regression': LogisticRegression(random_state=RANDOM_STATE, max_iter=1000),
    'SGD Classifier': SGDClassifier(random_state=RANDOM_STATE),
    'Decision Tree': DecisionTreeClassifier(random_state=RANDOM_STATE),
    'Random Forest': RandomForestClassifier(random_state=RANDOM_STATE),
    'Gradient Boosting': GradientBoostingClassifier(random_state=RANDOM_STATE),
    'AdaBoost': AdaBoostClassifier(random_state=RANDOM_STATE, algorithm='SAMME'),
    'K-Nearest Neighbors': KNeighborsClassifier(),
    'Support Vector Machine': SVC(random_state=RANDOM_STATE),
    'Gaussian Naive Bayes': GaussianNB(),
    'Multi-layer Perceptron': MLPClassifier(random_state=RANDOM_STATE),
    'XGBoost': xgb.XGBClassifier()
}

# run baseline selector
results = model_selection(models, X_train, y_train, X_test, y_test)

# Sort the results by F1-score in descending order
results = sorted(results, key=lambda x: x[2], reverse=True)

# Print the results
print("Model Performance:")
print("+-----------------------+----------+----------+----------+----------+")
print("| Model                 | Accuracy | F1-score | Precision | Recall   |")
print("+-----------------------+----------+----------+----------+----------+")
for result in results:
    print("| {:<20} | {:.4f}   | {:.4f}   | {:.4f}   | {:.4f}   |".format(*result))
print("+-----------------------+----------+----------+----------+----------+")

# Choose the best model based on the results
best_model = models[results[0][0]]
print(f"The best model is {results[0][0]}")