# Imports

In [None]:
!pip install scikit-posthocs

In [45]:
# Files
import os

# Linear Algebra
import numpy as np

# Multi thread & processing
import multiprocessing

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Machine learning general and Data Modelling
import pandas as pd
import sklearn
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances
from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler

# Image processing
import cv2

# Evaluation
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import learning_curve, cross_val_score
import scikit_posthocs as sp
from scipy.stats import friedmanchisquare

# Models
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier

In [2]:
# Mount Google Drive (Colab only) so the dataset folders under
# /content/drive are reachable from this notebook.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Prepare Dataset

In [3]:
def get_list_of_files(directory):
    """
    Recursively collect the path of every file below a directory.

    Parameters:
    - directory: Root folder to walk.

    Returns:
    - List of file paths (each joined onto the folder that contains it),
      in the order produced by os.walk.
    """
    return [
        os.path.join(parent, filename)
        for parent, _subdirs, filenames in os.walk(directory)
        for filename in filenames
    ]

def load_image_with_path(image_path):
    """
    Load one image, resize it to 150x150 and derive its label from its folder.

    Parameters:
    - image_path: Path of the image file to read.

    Returns:
    - Tuple (img, label, image_path) where img is the resized image as read
      by cv2.imread and label is the name of the file's parent directory.

    Raises:
    - ValueError: If the file cannot be decoded as an image.
    """
    img = cv2.imread(image_path)
    # cv2.imread signals failure by returning None instead of raising; fail
    # here with the offending path rather than with an opaque resize error.
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")
    img = cv2.resize(img, (150, 150), interpolation=cv2.INTER_AREA)
    label = os.path.basename(os.path.dirname(image_path))
    return img, label, image_path

def prepare_data(dataset, num_workers=2):
    """
    Load every image path in a dataset using a pool of worker processes.

    Parameters:
    - dataset: Iterable of image file paths.
    - num_workers: Number of worker processes in the pool.

    Returns:
    - Tuple (data, labels, paths): data and labels are NumPy arrays built
      from the loaded images and their folder-name labels; paths is a plain
      list of the file paths in the same order.
    """
    with multiprocessing.Pool(num_workers) as workers:
        loaded = workers.map(load_image_with_path, dataset)

    images, folder_labels, file_paths = [], [], []
    for image, folder_label, file_path in loaded:
        images.append(image)
        folder_labels.append(folder_label)
        file_paths.append(file_path)

    return np.array(images), np.array(folder_labels), file_paths

In [4]:
# Collect every file path under the Train and Test folders on the mounted drive.
train_ds = get_list_of_files("/content/drive/MyDrive/Research Leaf/ASHIIQ/compute_ashiq/Train")
test_ds = get_list_of_files("/content/drive/MyDrive/Research Leaf/ASHIIQ/compute_ashiq/Test")

In [5]:
# Load and resize all images with 2 worker processes. y_* hold the class
# labels, taken from the name of each file's parent folder.
X_train, y_train, train_paths = prepare_data(train_ds, 2)
X_test, y_test, test_paths = prepare_data(test_ds, 2)

# Image Processing

## Remove Background

In [6]:
def remove_background_binary(image, saturation_threshold=85):
    """
    Black out low-saturation pixels of a BGR image.

    The image is converted to HSV and a binary mask is built from the
    saturation channel; pixels outside the mask are set to zero.

    Parameters:
    - image: BGR image (as converted with cv2.COLOR_BGR2HSV).
    - saturation_threshold: Saturation value a pixel must exceed to be kept.
      Defaults to 85, the value previously hard-coded here.

    Returns:
    - Copy of the image in which only the pixels selected by the mask keep
      their original values.
    """
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    saturation = hsv_image[:, :, 1]
    # THRESH_BINARY maps pixels above the threshold to 255 and the rest to 0.
    _, binary_mask = cv2.threshold(
        saturation, saturation_threshold, 255, cv2.THRESH_BINARY
    )
    masked_image = cv2.bitwise_and(image, image, mask=binary_mask)
    return masked_image

## Find Optimal Cluster

In [7]:
def find_optimal_k(gaps, s_k, max_k):
    """
    Pick the number of clusters from gap-statistic results.

    Parameters:
    - gaps: Sequence where gaps[i] is the gap value for i + 1 clusters.
    - s_k: Sequence where s_k[i] is the error term for i + 1 clusters.
    - max_k: Largest number of clusters that was evaluated.

    Returns:
    - The smallest k in [1, max_k) with Gap(k) >= Gap(k + 1) - s(k + 1),
      or max_k when no such k exists.
    """
    candidates = (
        k for k in range(1, max_k)
        if gaps[k - 1] >= gaps[k] - s_k[k]
    )
    return next(candidates, max_k)

def gap_statistic_for_k(pixel_values, k, n_refs=10, random_state=42):
    """
    Compute the gap statistic and its error term for one cluster count.

    Parameters:
    - pixel_values: 2-D array of samples (one row per pixel).
    - k: Number of clusters to evaluate.
    - n_refs: Number of uniform reference datasets to draw (must be >= 1).
    - random_state: Seed passed to KMeans.

    Returns:
    - Tuple (gap, s_k): gap is mean(log(reference dispersion)) minus
      log(observed dispersion); s_k is the standard deviation of the log
      reference dispersions scaled by sqrt(1 + 1 / n_refs).

    Raises:
    - ValueError: If n_refs is smaller than 1.
    """
    if n_refs < 1:
        raise ValueError("n_refs must be at least 1.")

    kmeans = KMeans(n_clusters=k, random_state=random_state, n_init='auto')
    kmeans.fit(pixel_values)

    # Within-cluster dispersion: squared distance of each sample to its
    # nearest cluster centre, summed over all samples.
    disp = np.sum(np.min(pairwise_distances(pixel_values, kmeans.cluster_centers_, metric='euclidean')**2, axis=1))

    # The bounding box of the data does not change between reference draws.
    lower = pixel_values.min(axis=0)
    upper = pixel_values.max(axis=0)

    ref_disps = np.zeros(n_refs)
    for i in range(n_refs):
        # NOTE(review): this draws from NumPy's global RNG, so the reference
        # datasets are not controlled by random_state.
        random_reference = np.random.uniform(
            low=lower,
            high=upper,
            size=pixel_values.shape
        )
        kmeans.fit(random_reference)
        ref_disps[i] = np.sum(np.min(pairwise_distances(random_reference, kmeans.cluster_centers_, metric='euclidean')**2, axis=1))

    # The small epsilon keeps log() finite when a dispersion is exactly zero.
    log_ref_disp = np.log(ref_disps + 1e-10)
    gap = np.mean(log_ref_disp) - np.log(disp + 1e-10)

    sd_k = np.sqrt(np.mean((log_ref_disp - np.mean(log_ref_disp))**2))
    # The gap-statistic definition inflates the standard deviation by
    # sqrt(1 + 1/B) to account for the simulation error of B reference sets.
    s_k = sd_k * np.sqrt(1.0 + 1.0 / n_refs)
    return gap, s_k

def calculate_gap_statistic(image, max_k=10, n_refs=10, random_state=42):
    """
    Evaluate the gap statistic of an image for every k from 1 to max_k.

    Parameters:
    - image: Image array; it is reshaped to rows of 3 values per pixel.
    - max_k: Largest number of clusters to evaluate.
    - n_refs: Number of reference datasets per k.
    - random_state: Seed forwarded to gap_statistic_for_k.

    Returns:
    - Tuple (gaps, s_k) of arrays of length max_k; index i holds the result
      for i + 1 clusters.
    """
    flat_pixels = np.ascontiguousarray(image.reshape((-1, 3)))
    gap_values, error_terms = np.zeros(max_k), np.zeros(max_k)

    for idx in range(max_k):
        gap_values[idx], error_terms[idx] = gap_statistic_for_k(
            flat_pixels, idx + 1, n_refs=n_refs, random_state=random_state
        )

    return gap_values, error_terms

# Image Segmentation

In [8]:
def k_mean_segmentation(disease, attempts=10, K=2):
    """
    Segment an image into K colour clusters with OpenCV k-means.

    The background is removed first, then all pixels (including the
    blacked-out ones) are clustered on their 3 channel values.

    Parameters:
    - disease: Input image.
    - attempts: Number of k-means attempts passed to cv2.kmeans.
    - K: Number of clusters.

    Returns:
    - Tuple (segmented_image, labels, centers): the image with every pixel
      replaced by its cluster centre, the labels returned by cv2.kmeans,
      and the cluster centres converted to uint8.
    """
    foreground = remove_background_binary(disease)
    samples = foreground.reshape((-1, 3)).astype(np.float32)

    # Stop after 10 iterations or once the epsilon of 1.0 is reached.
    stop_criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
    _compactness, labels, centers = cv2.kmeans(
        samples, K, None, stop_criteria, attempts, cv2.KMEANS_PP_CENTERS
    )

    centers = centers.astype(np.uint8)
    recoloured = centers[labels.ravel()]
    return recoloured.reshape(foreground.shape), labels, centers

def segment_image(image, K=2):
    """
    Segment an image into K clusters using k_mean_segmentation defaults.

    Parameters:
    - image: Input image.
    - K: Number of clusters.

    Returns:
    - The (segmented_image, labels, centers) tuple from k_mean_segmentation.
    """
    segmentation = k_mean_segmentation(image, K=K)
    return segmentation

def process_image(args):
    """
    Worker entry point: segment one image with a fixed number of clusters.

    Parameters:
    - args: Tuple (image, max_k). max_k is currently unused.

    Returns:
    - The (segmented_image, labels, centers) tuple from k_mean_segmentation.
    """
    image, _max_k = args
    # Choosing K per image via calculate_gap_statistic / find_optimal_k is
    # disabled; every image is segmented with K=8.
    return k_mean_segmentation(image, K=8)

def parallel_segmentation(images, max_k=10, num_workers=2):
    """
    Segment a collection of images in parallel worker processes.

    Parameters:
    - images: Iterable of images.
    - max_k: Value paired with every image and handed to process_image.
    - num_workers: Number of processes in the pool.

    Returns:
    - List with one process_image result per image, in input order.
    """
    tasks = [(image, max_k) for image in images]
    # The pool is created from the "fork" start-method context.
    fork_context = multiprocessing.get_context("fork")
    with fork_context.Pool(processes=num_workers) as pool:
        return pool.map(process_image, tasks)

# Feature Extraction

In [17]:
def extract_features(segmented_results, labels, centers):
    """
    Build one feature row per image from the mean HSV value of each cluster.

    Parameters:
    - segmented_results: List of segmented images (output from k_mean_segmentation).
    - labels: List of label arrays corresponding to each image.
    - centers: List of cluster center arrays corresponding to each image.

    Returns:
    - NumPy array with one row per image; each row concatenates the mean
      (H, S, V) of every cluster in cluster-id order, using [0, 0, 0] for a
      cluster that has no pixels.
    """
    # NOTE(review): rows are ordered by k-means cluster id; confirm that ids
    # are comparable across images before relying on column meaning.

    def _image_row(segmented_image, label_array, center_array):
        hsv_image = cv2.cvtColor(segmented_image, cv2.COLOR_BGR2HSV)
        # Per-pixel cluster ids laid out like the image (height x width).
        label_map = label_array.reshape(segmented_image.shape[:2])

        row = []
        for cluster_id in range(len(center_array)):
            members = hsv_image[label_map == cluster_id]
            row.extend(members.mean(axis=0) if members.size > 0 else [0, 0, 0])
        return row

    rows = [
        _image_row(segmented_image, label_array, center_array)
        for segmented_image, label_array, center_array
        in zip(segmented_results, labels, centers)
    ]
    return np.array(rows)

# Normalization

In [19]:
def normalize_features(features, method="standard"):
    """
    Scale a feature matrix with a freshly fitted scikit-learn scaler.

    A new scaler is fitted on `features` in every call.

    Parameters:
    - features: Feature matrix (NumPy array)
    - method: "standard" (default) for StandardScaler or "minmax" for
      MinMaxScaler.

    Returns:
    - Scaled feature matrix.

    Raises:
    - ValueError: If method is neither "standard" nor "minmax".
    """
    if method not in ("standard", "minmax"):
        raise ValueError("Invalid method. Choose 'standard' or 'minmax'.")

    scaler = StandardScaler() if method == "standard" else MinMaxScaler()
    return scaler.fit_transform(features)

---

# Execution

---

In [9]:
# Segment every training image in 2 worker processes. max_k is forwarded,
# but process_image currently ignores it and always uses K=8.
train_segmented_images = parallel_segmentation(X_train, max_k=8, num_workers=2)

In [10]:
# Segment every test image the same way as the training images.
test_segmented_images = parallel_segmentation(X_test, max_k=8, num_workers=2)

# Testing Divider

In [12]:
# Split the (segmented_image, labels, centers) result tuples into three
# parallel lists. train_labels holds the k-means cluster labels of each
# image, not the class labels (those are in y_train).
train_segmented = [segmented_image for segmented_image, _, _ in train_segmented_images]
train_labels = [label for _, label, _ in train_segmented_images]
train_centers = [center for _, _, center in train_segmented_images]

In [13]:
# Same split for the test results; test_labels are k-means cluster labels,
# not the class labels (those are in y_test).
test_segmented = [segmented_image for segmented_image, _, _ in test_segmented_images]
test_labels = [label for _, label, _ in test_segmented_images]
test_centers = [center for _, _, center in test_segmented_images]

In [20]:
# Build the per-cluster mean-HSV feature matrices. This rebinds X_train and
# X_test: from here on they hold features, not the raw image arrays.
X_train = extract_features(train_segmented, train_labels, train_centers)
X_test = extract_features(test_segmented, test_labels, test_centers)

In [21]:
# Fit the min-max scaler on the training features only and reuse it for the
# test features, so both sets are mapped with the same per-column range.
# Fitting a second scaler on the test set would put the two sets on
# different scales and leak test-set statistics into preprocessing.
feature_scaler = MinMaxScaler()
X_train_normalized = feature_scaler.fit_transform(X_train)
X_test_normalized = feature_scaler.transform(X_test)

In [22]:
# Encode the folder-name class labels as integers. The encoder is fitted on
# the training labels and then applied unchanged to the test labels.
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

# Evaluation

In [40]:
def evaluate_confusion_matrix(models, X_train, y_train, X_test, y_test):
    """
    Fit each model and score it on the test set.

    Every model is fitted in place on the training data; accuracy plus
    weighted precision, recall and F1 are computed from its test predictions.

    Parameters:
    - models: Dictionary of model names and instantiated objects.
    - X_train, y_train: Training features and labels.
    - X_test, y_test: Testing features and labels.

    Returns:
    - Dictionary mapping each model name to a dictionary with the keys
      "Accuracy", "Precision", "Recall" and "F1 Score".
    """

    def _fit_and_score(estimator):
        estimator.fit(X_train, y_train)
        predicted = estimator.predict(X_test)
        weighted = {"average": 'weighted'}
        return {
            "Accuracy": accuracy_score(y_test, predicted),
            "Precision": precision_score(y_test, predicted, **weighted),
            "Recall": recall_score(y_test, predicted, **weighted),
            "F1 Score": f1_score(y_test, predicted, **weighted),
        }

    return {name: _fit_and_score(model) for name, model in models.items()}

In [None]:
def evaluate_learning_curves(models, X, y, cv=5, scoring="accuracy"):
    """
    Plot cross-validated learning curves for multiple models on one figure.

    For each model the mean cross-validation score is drawn against the
    training-set size, with a shaded band of one standard deviation.

    Parameters:
    - models: Dictionary of model names and instantiated models.
    - X: Features.
    - y: Labels.
    - cv: Number of cross-validation folds.
    - scoring: Scoring metric name (string); its capitalized form is also
      used as the y-axis label.

    Returns:
    - None. The figure is displayed with plt.show().
    """
    plt.figure(figsize=(15, 8))

    for name, model in models.items():
        # Only the validation scores are plotted, so the training-fold
        # scores returned by learning_curve are discarded.
        train_sizes, _, test_scores = learning_curve(
            model, X, y, cv=cv, scoring=scoring, n_jobs=-1
        )
        test_mean = np.mean(test_scores, axis=1)
        test_std = np.std(test_scores, axis=1)

        plt.plot(train_sizes, test_mean, label=f"{name} (CV Score)", marker="o")
        plt.fill_between(train_sizes, test_mean - test_std, test_mean + test_std, alpha=0.2)

    plt.title("Learning Curves for Multiple Models")
    plt.xlabel("Training Size")
    plt.ylabel(scoring.capitalize())
    plt.legend()
    plt.grid()
    plt.show()

In [None]:
# Classifiers to compare, keyed by the name used in the printed report.
models = {
    "Decision Tree": DecisionTreeClassifier(random_state=42),
    "Random Forest": RandomForestClassifier(random_state=42),
    "SVM": SVC(kernel='linear', C=1.0),
    "Naive Bayes": GaussianNB(),
    "KNN": KNeighborsClassifier(n_neighbors=3),
}
# NOTE: the raw class labels (y_train / y_test) are passed here, not the
# integer-encoded y_train_encoded / y_test_encoded.
results = evaluate_confusion_matrix(models, X_train_normalized, y_train, X_test_normalized, y_test)
# Print every metric of every model rounded to two decimals.
for model, metrics in results.items():
    print(f"{model} Performance:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.2f}")
    print()