In [141]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yaml

from sklearn.impute import KNNImputer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder

import torch
import torch.nn as nn
import torch.optim as optim

In [142]:
""""Configuration"""

with open('config.yaml', 'r') as file:
    config = yaml.safe_load(file)

print_graphs = config['print_graphs']
features = config['features']
target = config['target']
num_epochs = config['num_epochs']
learning_rate = config['learning_rate']
dropout_rate = config['dropout_rate']

In [143]:
"""Classes"""


class MLP(nn.Module):
    def __init__(self, input_size, output_size, dropout_rate):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, 256)
        self.bn1 = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        self.fc3 = nn.Linear(128, 64)
        self.bn3 = nn.BatchNorm1d(64)
        self.fc4 = nn.Linear(64, 32)
        self.bn4 = nn.BatchNorm1d(32)
        self.output_layer = nn.Linear(32, output_size)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        x = self.relu(self.bn1(self.fc1(x)))
        x = self.dropout(x)
        x = self.relu(self.bn2(self.fc2(x)))
        x = self.dropout(x)
        x = self.relu(self.bn3(self.fc3(x)))
        x = self.dropout(x)
        x = self.relu(self.bn4(self.fc4(x)))
        x = self.dropout(x)
        x = self.output_layer(x)
        return x


class EarlyStopping:
    def __init__(self, patience=10, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

In [144]:
"""Functions"""


def cap_outliers_percentiles(df, feature, lower_percentile=5, upper_percentile=95):
    lower_limit = np.percentile(df[feature], lower_percentile)
    upper_limit = np.percentile(df[feature], upper_percentile)

    df[feature] = np.where(df[feature] < lower_limit, lower_limit, df[feature])
    df[feature] = np.where(df[feature] > upper_limit, upper_limit, df[feature])


def plot_feature_distributions(data, data_imputed, features):
    plt.figure(figsize=(15, 10))
    for i, feature in enumerate(features):
        plt.subplot(4, 3, i + 1)
        sns.histplot(
            data[feature].dropna(),
            kde=True,
            label="Before Imputation",
            color="blue",
            bins=30,
            alpha=0.5,
        )
        sns.histplot(
            data_imputed[feature].dropna(),
            kde=True,
            label="After Imputation",
            color="red",
            bins=30,
            alpha=0.5,
        )
        plt.title(f"{feature}")
        plt.xlabel("")
        plt.ylabel("")
        plt.legend()
    plt.tight_layout()
    plt.show()


def plot_box_plots_comparison(data, data_imputed, features):
    plt.figure(figsize=(15, 12))

    for i, feature in enumerate(features):
        plt.subplot(4, 3, i + 1)

        df_before_plot = data[[feature]].copy()
        df_before_plot["Imputation Status"] = "Before Imputation"

        df_after_plot = data_imputed[[feature]].copy()
        df_after_plot["Imputation Status"] = "After Imputation"

        df_plot = pd.concat([df_before_plot, df_after_plot], ignore_index=True)

        sns.boxplot(
            x="Imputation Status",
            y=feature,
            data=df_plot,
            hue="Imputation Status",
            palette="Set2",
            showfliers=True,
        )

        plt.title(f"{feature}")
        plt.xlabel("")
        plt.ylabel("")

    plt.tight_layout()
    plt.show()


def plot_correlation_heatmaps(data, data_imputed, features):
    corr_before = data[features].corr()
    corr_after = data_imputed[features].corr()

    plt.figure(figsize=(16, 8))

    plt.subplot(1, 2, 1)
    sns.heatmap(corr_before, annot=True, cmap="coolwarm", fmt=".2f")
    plt.title("Correlation Before Imputation")

    plt.subplot(1, 2, 2)
    sns.heatmap(corr_after, annot=True, cmap="coolwarm", fmt=".2f")
    plt.title("Correlation After Imputation")

    plt.tight_layout()
    plt.show()


def single_plot_feature_distributions(data, features):
    plt.figure(figsize=(15, 10))
    for i, feature in enumerate(features):
        plt.subplot(4, 3, i + 1)
        sns.histplot(
            data[feature].dropna(),
            kde=True,
            label="Normalized",
            color="blue",
            bins=30,
            alpha=1,
        )
        plt.title(f"{feature}")
        plt.xlabel("")
        plt.ylabel("")
        plt.legend()
    plt.tight_layout()
    plt.show()

In [145]:
"""Read Data"""

data = pd.read_csv("almond_data.csv")

data = data.iloc[:, 1:]

data.head()

data.columns = data.columns.str.strip()

In [146]:
"""Impute Features"""

impute_features = [
    "Length (major axis)",
    "Width (minor axis)",
    "Thickness (depth)",
    "Area",
    "Perimeter",
    "Solidity",
    "Compactness",
    "Extent",
    "Convex hull(convex area)",
]

knn_imputer = KNNImputer(n_neighbors=53)  # n_neighbors = root of dataset size

data_imputed = data.copy()

types = data[target].unique()
for almond_type in types:

    type_data = data[data[target] == almond_type].copy()

    type_features = type_data[impute_features]

    imputed_values = knn_imputer.fit_transform(type_features)

    type_data[impute_features] = imputed_values

    data_imputed.update(type_data)

data_imputed["Roundness"] = (4 * data_imputed["Area"]) / (
    np.pi * data_imputed["Length (major axis)"] ** 2
)

data_imputed["Aspect Ratio"] = (
    data_imputed["Length (major axis)"] / data_imputed["Width (minor axis)"]
)

data_imputed["Eccentricity"] = np.sqrt(
    1 - (data_imputed["Width (minor axis)"] / data_imputed["Length (major axis)"]) ** 2
)

In [147]:
"""Cap Outliers"""

for feature in features:
    cap_outliers_percentiles(data_imputed, feature)

In [148]:
if print_graphs:
    plot_feature_distributions(data, data_imputed, features)

In [149]:
if print_graphs:
    plot_box_plots_comparison(data, data_imputed, features)

In [150]:
if print_graphs:
    plot_correlation_heatmaps(data, data_imputed, features)

In [151]:
"""Normalize Data"""

scaler = StandardScaler()

normalized_data = data_imputed.copy()
normalized_data[features] = scaler.fit_transform(data_imputed[features])

# print(normalized_data.head())

In [152]:
if print_graphs:
    single_plot_feature_distributions(normalized_data, features)

In [154]:
x = normalized_data[features].values
y = normalized_data[target].values

label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

X_train, X_val, y_train, y_val = train_test_split(x, y_encoded, test_size=0.2, random_state=42)

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.long)

input_size = len(features)
output_size = len(label_encoder.classes_)

model = MLP(input_size, output_size, dropout_rate)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

early_stopping = EarlyStopping(patience=50, delta=0.001)

for epoch in range(num_epochs):
    model.train()

    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensor)
        val_loss = criterion(val_outputs, y_val_tensor).item()

    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Training Loss: {loss.item():.4f}, Validation Loss: {val_loss:.4f}')

    early_stopping(val_loss)
    if early_stopping.early_stop:
        print(f'Early stopping triggered, Epoch [{epoch + 1}/{num_epochs}]')
        break

model.eval()
with torch.no_grad():
    val_outputs = model(X_val_tensor)
    _, predicted = torch.max(val_outputs.data, 1)
    accuracy = (predicted == y_val_tensor).float().mean()
    print(f'Final Validation Accuracy: {accuracy:.4f}')


Epoch [100/10000], Training Loss: 0.2665, Validation Loss: 0.2177
Epoch [200/10000], Training Loss: 0.1577, Validation Loss: 0.1428
Epoch [300/10000], Training Loss: 0.1029, Validation Loss: 0.1234
Epoch [400/10000], Training Loss: 0.0852, Validation Loss: 0.1193
Epoch [500/10000], Training Loss: 0.0834, Validation Loss: 0.1241
Early stopping triggered, Epoch [502/10000]
Final Validation Accuracy: 0.9590
