In [23]:
import pandas as pd

# Custom dataset made by my partner: one CSV per split, each read into its own frame
train_dataset, test_dataset = (
    pd.read_csv(csv_name)
    for csv_name in ("custom_training_data.csv", "custom_test_data.csv")
)

# Features

In [24]:
# Strip the bookkeeping columns so only model inputs (plus the "ClassId" label
# for the training frame) remain; the test frame also loses "ClassId".
train_df = train_dataset.drop(["Unnamed: 0", "image_path", "id"], axis="columns")
test_df = test_dataset.drop(["Unnamed: 0", "image_path", "id", "ClassId"], axis="columns")

# Remove all columns starting with "hsv_bin"
# train_df = train_df.loc[:, ~train_df.columns.str.startswith("hsv_bin")]
# test_df = test_df.loc[:, ~test_df.columns.str.startswith("hsv_bin")]

In [25]:
import numpy as np
from PIL import Image
import cv2

# Default width and height (in pixels) that the feature extractors resize images to
IMAGE_SIZE = 96

def load_images(paths):
    """Load every image in ``paths`` fully into memory as an RGB PIL image.

    Args:
        paths: Iterable of file paths accepted by ``PIL.Image.open``.

    Returns:
        A list of ``PIL.Image.Image`` objects in mode ``"RGB"``, in the same
        order as ``paths``.
    """
    images = []
    for path in paths:
        # Image.open is lazy: it keeps the file handle open until the pixel
        # data is read. Opening thousands of images that way can exhaust the
        # process's file descriptors, so read the pixels and release the
        # handle straight away.
        with Image.open(path) as image:
            # convert() forces the pixel data to be read and returns a
            # detached copy. It also guarantees three channels, which the
            # feature extractors rely on (grayscale / RGBA / palette inputs
            # would otherwise break or mis-map channels).
            images.append(image.convert("RGB"))
    return images

# Get RGB color contrast
def get_color_contrast(images, width=IMAGE_SIZE, height=IMAGE_SIZE):
    """Convert images into a three-channel luma / opponent-colour representation.

    Each image is resized to ``(width, height)`` with bilinear resampling and
    its pixel values are divided by 255. The output channels are, in order:
    luma, red minus green, and blue minus the mean of red and green.

    Args:
        images: Iterable of PIL images with at least three colour channels.
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        A numpy array stacking one ``(height, width, 3)`` array per image.
    """
    target_size = (width, height)

    def _to_opponent_channels(picture):
        resized = picture.resize(target_size, Image.BILINEAR)

        # Normalised pixel values
        pixels = np.asarray(resized).astype("float32") / 255.0
        red = pixels[..., 0]
        green = pixels[..., 1]
        blue = pixels[..., 2]

        # Weighted sum rather than a plain average, to better emulate perceived brightness
        # https://en.wikipedia.org/wiki/Luma_(video)
        brightness = 0.299 * red + 0.587 * green + 0.114 * blue
        red_vs_green = red - green
        blue_vs_yellow = blue - (red + green) / 2.0

        return np.dstack((brightness, red_vs_green, blue_vs_yellow))

    return np.array([_to_opponent_channels(picture) for picture in images])

# Get normalised HSV channels
def get_hsv_features(images, width=IMAGE_SIZE, height=IMAGE_SIZE):
    """Convert images to HSV with every channel normalised to the unit range.

    Each image is resized to ``(width, height)`` with bilinear resampling,
    converted from RGB to HSV with OpenCV and scaled per channel.

    Args:
        images: Iterable of RGB PIL images.
        width: Target width in pixels.
        height: Target height in pixels.

    Returns:
        A float32 numpy array stacking one ``(height, width, 3)`` HSV array
        per image. Hue lies in [0, 1); saturation and value lie in [0, 1].
    """
    # For 8-bit input OpenCV stores hue as degrees / 2, i.e. in [0, 180),
    # while saturation and value use the full [0, 255] range. Dividing hue by
    # 255 would squeeze it into [0, ~0.70] and leave the highest hue bins
    # almost empty when the result is binned, so scale each channel by its
    # own range.
    # NOTE: this changes hue values compared with a plain "/ 255" scaling, so
    # features produced with the old scaling must be regenerated.
    channel_scale = np.array([180.0, 255.0, 255.0], dtype="float32")

    data = []

    for image in images:
        image = image.resize((width, height), Image.BILINEAR)

        # PIL delivers channels in RGB order, so convert directly from RGB
        rgb = np.asarray(image).astype("uint8")
        hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV)

        # Normalise HSV channel by channel
        data.append(hsv.astype("float32") / channel_scale)

    return np.array(data)

# Bin values chunk by chunk
def joint_bin_features_hsv(image_features, num_bins=3, grid_size=4):
    """Build per-cell joint histograms over three channels for each image.

    Every image is divided into a ``grid_size`` x ``grid_size`` grid of equal
    cells (rows/columns left over by the integer division are ignored). Within
    a cell each channel value is clipped to [0, 1] and quantised into
    ``num_bins`` levels; the three levels are combined into one joint index
    and counted, giving ``num_bins ** 3`` counts per cell.

    Args:
        image_features: Iterable of ``(height, width, 3)`` arrays.
        num_bins: Number of quantisation levels per channel.
        grid_size: Number of cells along each image axis.

    Returns:
        A DataFrame with one row per image and columns named
        ``HSV_{row}_{col}_bin_{k}``, ordered by row, then column, then bin.
    """
    counts_per_cell = num_bins ** 3
    cells = [(row, col) for row in range(grid_size) for col in range(grid_size)]

    rows = []
    for feature_map in image_features:
        cell_h = feature_map.shape[0] // grid_size
        cell_w = feature_map.shape[1] // grid_size

        row_counts = []
        for row, col in cells:
            cell = feature_map[
                row * cell_h:(row + 1) * cell_h,
                col * cell_w:(col + 1) * cell_w,
                :
            ]

            # Quantise; a value of exactly 1.0 would land in level num_bins,
            # so the second clip folds it back into the top level.
            levels = np.floor(np.clip(cell, 0.0, 1.0) * num_bins).astype(int)
            levels = np.clip(levels, 0, num_bins - 1)

            # Treat the three levels as digits of a base-num_bins number
            joint = (levels[..., 0] * num_bins + levels[..., 1]) * num_bins + levels[..., 2]

            row_counts.extend(np.bincount(joint.ravel(), minlength=counts_per_cell))

        rows.append(row_counts)

    column_names = [
        f"HSV_{row}_{col}_bin_{k}"
        for row, col in cells
        for k in range(counts_per_cell)
    ]

    return pd.DataFrame(rows, columns=column_names)

In [26]:
# Create additional features from images
def add_features(dataset, df):
    """Append binned HSV histogram features, computed from the images, to ``df``.

    Args:
        dataset: DataFrame with an ``image_path`` column; one image is loaded
            per row.
        df: Feature DataFrame derived from ``dataset`` that the new columns
            are appended to.

    Returns:
        A new DataFrame holding the columns of ``df`` followed by the binned
        HSV histogram columns.
    """
    images = load_images(dataset["image_path"].values)

    # Get binned 3D histogram of HSV
    # (get_color_contrast gives a Luma / R-G / B-Y alternative to get_hsv_features)
    hsv_images = get_hsv_features(images)
    hsv_histograms = joint_bin_features_hsv(hsv_images)

    # The histogram frame comes back with a fresh 0..n-1 index, but
    # pd.concat(axis=1) aligns rows by index label. Label each histogram row
    # with the index of the dataset row it was computed from, so a filtered or
    # re-indexed dataset cannot be silently misaligned / padded with NaN.
    hsv_histograms.index = dataset.index

    return pd.concat([df, hsv_histograms], axis=1)

# Model Training

In [27]:
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
from termcolor import colored

# Number of folds used for the KFold cross-validation in train_evaluate
N_SPLITS = 10

def train_evaluate(model, train_df):
    """Cross-validate ``model`` with shuffled K-fold and print per-fold accuracy.

    The same ``model`` object is refitted on every fold, so the returned
    estimator is the one fitted on the training part of the last fold.

    Args:
        model: Estimator exposing ``fit`` and ``predict``.
        train_df: DataFrame with the feature columns and a ``ClassId`` label
            column.

    Returns:
        The ``model`` object that was passed in.
    """
    features = train_df.drop(["ClassId"], axis=1)
    labels = train_df["ClassId"]

    splitter = KFold(n_splits=N_SPLITS, shuffle=True, random_state=42)
    fold_scores = []

    for fold, (fit_rows, holdout_rows) in enumerate(splitter.split(features)):
        # Train using sklearn
        model.fit(features.iloc[fit_rows], labels.iloc[fit_rows])

        predictions = model.predict(features.iloc[holdout_rows])
        acc = accuracy_score(labels.iloc[holdout_rows], predictions)
        fold_scores.append(acc)

        # Print fold results
        print(f"{'Fold':<6} {fold + 1:<4} | Accuracy: {acc:.4f}")

    print(colored(f"Average Accuracy: {sum(fold_scores)/len(fold_scores):.4f}", "cyan"))

    return model

# Train Models

In [28]:
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.linear_model import LogisticRegression
# from sklearn.neighbors import KNeighborsClassifier

# models = [
#     RandomForestClassifier(n_estimators=200, max_depth=20, random_state=42, n_jobs=-1), 
#     LogisticRegression(max_iter=5000),
#     KNeighborsClassifier(n_neighbors=1, n_jobs=-1),
# ]

# # Add additional features
# train_df = add_features(train_dataset, train_df)
# test_df = add_features(test_dataset, test_df)

# for model in models:
#     print(colored(f"Training {model.__class__.__name__}", "yellow"))

#     train_evaluate(model, train_df)

#     # Retrain on the entire training set
#     X, y = train_df.drop(["ClassId"], axis=1), train_df["ClassId"]
#     model.fit(X, y)

#     # Get accuracy on the test set
#     # Do it here so don't need to submit to Kaggle
#     y_pred = model.predict(test_df)
#     test_set_benchmark = pd.read_csv("test_100.csv")
#     labels = test_set_benchmark["ClassId"].values

#     accuracy = accuracy_score(labels, y_pred)

#     print(colored(f"Test Set Accuracy: {accuracy:.4f}", "green"))
#     print(colored(f"Finished training {model.__class__.__name__}", "yellow"))

# Hyperparameter Tuning

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import loguniform, randint
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Candidate estimators to tune. Order matters: the tuning loop below pairs
# models[i] with parameter_ranges[i] by position.
models = [
    RandomForestClassifier(random_state=42), 
    # The solver / penalty given here are only starting values: the search
    # space for this model also samples "solver" and "penalty".
    LogisticRegression(solver="saga", penalty="l2", max_iter=5000),
    KNeighborsClassifier(),
    SVC()
]

# One search space per entry in `models`, in the same order. Lists are sampled
# as discrete choices; scipy distributions are sampled from by the search.
parameter_ranges = [
    # Random forest
    {
        "n_estimators": randint(100, 500),
        "max_depth": [10, 20, None],
        "min_samples_split": [2, 5, 10],
        "min_samples_leaf": [1, 2, 4],
        "max_features": ["sqrt", "log2", None],
    },

    # Logistic regression
    # Values are chosen using the resources below as reference
    # https://medium.com/@agrawalsam1997/regularization-in-logistic-regression-3d854e79f07d
    # https://nachi-keta.medium.com/how-would-you-go-about-choosing-the-right-parameters-for-logistic-regression-9284fc4b560
    {
        "C": loguniform(1e-3, 1e2),
        "solver": ["lbfgs", "saga"],
        "penalty": ["l2"],
    },
    
    # KNN
    # NOTE(review): scipy's randint excludes its upper bound, so this samples
    # n_neighbors from 1..9 - confirm that 10 is meant to be excluded.
    {
        "n_neighbors": randint(1, 10),
        "weights": ["uniform", "distance"],
        "p": [1, 2],
    },

    # SVM, used the resource below as a starting point
    # https://www.geeksforgeeks.org/rbf-svm-parameters-in-scikit-learn/
    # NOTE(review): "gamma" presumably has no effect when the sampled kernel
    # is "linear", so some search iterations may be redundant - confirm.
    {
        "C": loguniform(1e-3, 1e2),
        "kernel": ["linear", "rbf"],
        "gamma": ["scale", "auto"]
    }
]

# Add additional features
# train_df = add_features(train_dataset, train_df)
# test_df = add_features(test_dataset, test_df)

# Random holdout
# The split and the fitted scaler are the same for every model (same data,
# fixed random_state), so they are built once here instead of being recomputed
# on every loop iteration.
X = train_df.drop("ClassId", axis=1)
y = train_df["ClassId"]
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

# Must scale features for distance based models
# NOTE(review): the scaler is fitted on all of X_train before the search runs
# its internal 5-fold CV, so every CV validation fold has already contributed
# to the scaling statistics. The hold-out validation set is not affected.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)

# Every model is paired with the search space at the same position
assert len(models) == len(parameter_ranges), "each model needs exactly one search space"

for model, search_space in zip(models, parameter_ranges):
    model_name = model.__class__.__name__

    # Tuning using CV
    print(colored(f"Tuning {model_name}", "yellow"))

    optimiser = RandomizedSearchCV(model, search_space, n_iter=20, cv=5, scoring="accuracy", random_state=42, n_jobs=-1)
    optimiser.fit(X_train_scaled, y_train)

    print("Best parameters:", optimiser.best_params_)

    # Predict on validation set
    print(colored(f"Evaluating {model_name} on validation set", "yellow"))

    best_model = optimiser.best_estimator_
    y_pred = best_model.predict(X_val_scaled)
    accuracy = accuracy_score(y_val, y_pred)

    print(colored(f"Validation accuracy: {accuracy:.4f}", "green"))

    # Predict on test set (instead of submitting to Kaggle)
    # This is only used for sanity checks during report writing
    # The test set is just the best CNN predictions, since we achieved a high accuracy in that

    # test_df_scaled = scaler.transform(test_df)
    # y_pred = best_model.predict(test_df_scaled)
    # test_set_benchmark = pd.read_csv("test_100.csv")
    # labels = test_set_benchmark["ClassId"].values
    # accuracy = accuracy_score(labels, y_pred)

    # print(colored(f"Test set accuracy: {accuracy:.4f}", "green"))
    # print(colored(f"Finished training {model.__class__.__name__}", "yellow"))

[33mTuning RandomForestClassifier[0m
Best parameters: {'max_depth': None, 'max_features': 'sqrt', 'min_samples_leaf': 1, 'min_samples_split': 2, 'n_estimators': 426}
[33mEvaluating RandomForestClassifier on validation set[0m
[32mValidation accuracy: 0.8716[0m
[33mTuning LogisticRegression[0m
Best parameters: {'C': 0.14445251022763064, 'penalty': 'l2', 'solver': 'lbfgs'}
[33mEvaluating LogisticRegression on validation set[0m
[32mValidation accuracy: 0.8925[0m
[33mTuning KNeighborsClassifier[0m
Best parameters: {'n_neighbors': 5, 'p': 1, 'weights': 'distance'}
[33mEvaluating KNeighborsClassifier on validation set[0m
[32mValidation accuracy: 0.8179[0m
[33mTuning SVC[0m
Best parameters: {'C': 0.033205591037519584, 'gamma': 'auto', 'kernel': 'linear'}
[33mEvaluating SVC on validation set[0m
[32mValidation accuracy: 0.9126[0m


# Stacking

In [30]:
# The code below is just pulled from my partner's notebook, but with parameters changed based on the tuning

from sklearn.ensemble import StackingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split

# Level-0 learners. Their hyper-parameters are the "Best parameters" values
# printed by the random search in the tuning cell.
base_estimators = [
    # Random forest
    (
        "rf",
        RandomForestClassifier(
            n_estimators=426,
            max_depth=None,
            max_features="sqrt",
            min_samples_split=2,
            min_samples_leaf=1,
            n_jobs=-1,
            random_state=42,
        ),
    ),

    # SVM (registered under the name "svm_rbf", although the tuned kernel is linear)
    (
        "svm_rbf",
        make_pipeline(
            SVC(
                kernel="linear",
                C=0.033205591037519584,
                gamma="auto",
                probability=True,
                random_state=42,
            )
        ),
    ),

    # Logistic regression, with its own scaling step inside the pipeline
    (
        "lr",
        make_pipeline(
            StandardScaler(),
            LogisticRegression(
                solver="lbfgs",
                penalty="l2",
                C=0.14445251022763064,
                max_iter=5000,
            ),
        ),
    ),
]

# Level-1 learner that combines the base predictions
meta_learner = RandomForestClassifier(
    n_estimators=500,
    max_depth=10,
    class_weight="balanced_subsample",
    n_jobs=-1,
    random_state=42,
)

# Defined for reference only: it is not passed to the StackingClassifier below
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

stacking = StackingClassifier(
    estimators=base_estimators,
    final_estimator=meta_learner,
    # cv=cv,
    n_jobs=-1,
    passthrough=True,
)


# Plain numpy arrays: features are every column except the label
X = train_df.drop(columns="ClassId").values
y = train_df["ClassId"].values

# Random holdout
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# Scale with statistics taken from the training part only
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)

# Fit on training data
stacking.fit(X_train_scaled, y_train)

# Validation
val_pred = stacking.predict(X_val_scaled)
val_acc = accuracy_score(y_val, val_pred)
print(f"Validation accuracy: {val_acc:.5f}")

# # Prepare test set
# X_test_scaled = scaler.transform(test_df)
# y_test_true = test_set_benchmark["ClassId"].values

# # Retrain on all data and fit on test set
# stacking.fit(X_train_scaled, y_train)
# test_pred = stacking.predict(X_test_scaled)
# test_acc = accuracy_score(y_test_true, test_pred)
# print(f"Test accuracy: {test_acc:.5f}")

Validation accuracy: 0.88889


In [31]:
# Compare predictions to the test set
# Again, this is only for sanity checks
# The test set is just the best CNN predictions, since we achieved a high accuracy in that

# import seaborn as sns
# import matplotlib.pyplot as plt
# from sklearn.metrics import confusion_matrix

# matrix = confusion_matrix(y_test_true, test_preds)

# plt.figure(figsize=(12, 10))
# sns.heatmap(matrix, annot=True, fmt="d", cmap="Blues")
# plt.xlabel("Predicted")
# plt.ylabel("True")
# plt.title("Confusion Matrix Heatmap")
# plt.show()