# Coffee Leaf Diseases Prediction

## Overview
This notebook is a reproduction of the coffee leaf disease classification method described in the research paper below, using machine learning techniques with RGB and CMY color features.

## References

### Research Paper
- **Title**: Comparative Analysis of the Performance of the Decision Tree and K-Nearest Neighbors Methods in Classifying Coffee Leaf Diseases
- **Authors**: Adie Suryadi, Murhaban Murhaban, Rivansyah Suhendra
- **Published in**: Department of Information Technology, Teuku Umar University, Indonesia
- **URL**: [https://aptikom-journal.id/conferenceseries/article/view/649/272](https://aptikom-journal.id/conferenceseries/article/view/649/272)

### Dataset
- **Dataset**: Coffee Leaf Diseases
- **Source**: Kaggle
- **URL**: [https://www.kaggle.com/datasets/badasstechie/coffee-leaf-diseases/code](https://www.kaggle.com/datasets/badasstechie/coffee-leaf-diseases/code)

## Methodology
This implementation extracts color-based features from coffee leaf images:
- **RGB features**: Mean and standard deviation for each R, G, B channel (6 features)
- **CMY features**: Mean and standard deviation for each C, M, Y channel (6 features)
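- **Total**: 12 color-based features per image

> **Note**: The code cells below do not compute these 12 color statistics. They flatten each resized, normalized image into a pixel vector and reduce it with PCA before classification.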

The features are then used to classify coffee leaves into four categories:
- Miner
- Phoma
- Rust
- No disease

## Preprocessing Data

In [None]:
import numpy as np
from PIL import Image
import os

def load_and_extract_features(image_dir, labels_df):
    """Load the images listed in ``labels_df`` as normalized pixel arrays.

    Each ``id`` value is mapped to ``<image_dir>/<id>.jpg``. Every image that
    exists is converted to RGB, resized to 100x50 (width x height) with
    bilinear resampling and scaled to ``float32`` values in [0, 1]. Rows whose
    image file is missing are reported with a printed warning and dropped.

    Args:
        image_dir: Directory containing the ``.jpg`` files.
        labels_df: DataFrame with an ``id`` column; every other column is
            returned as a label column.

    Returns:
        A tuple ``(features_array, labels)``: the stacked image arrays, and
        the label rows of the images that were found, re-indexed from 0 and
        with the ``id`` column removed.
    """
    features_list = []
    valid_indices = []

    # Iterate the 'id' column directly: iterrows() builds one Series per row
    # and may upcast an integer id to float ("1.0.jpg") in mixed-dtype frames.
    for idx, image_id in labels_df['id'].items():
        img_path = os.path.join(image_dir, f"{image_id}.jpg")

        if not os.path.exists(img_path):
            print(f"Warning: {img_path} not found")
            continue

        # The context manager closes the file handle. Converting to RGB keeps
        # every array 3-channel so the final stack has a uniform shape.
        with Image.open(img_path) as img:
            img_resized = img.convert('RGB').resize(
                (100, 50), Image.Resampling.BILINEAR
            )
        img_array = np.array(img_resized).astype('float32') / 255.0  # normalize

        features_list.append(img_array)
        valid_indices.append(idx)

    features_array = np.array(features_list)
    labels = labels_df.loc[valid_indices].reset_index(drop=True)
    labels = labels.drop(columns=['id'])

    return features_array, labels

In [None]:
from sklearn.model_selection import train_test_split
import pandas as pd

# Training data: label table first, then the pixel arrays for the images it lists.
train_label_df = pd.read_csv('dataset/train_classes.csv')
train_features, train_labels = load_and_extract_features(
    'dataset/coffee-leaf-diseases/train/images',
    train_label_df,
)

# Test data, loaded the same way.
test_label_df = pd.read_csv('dataset/test_classes.csv')
test_features, test_labels = load_and_extract_features(
    'dataset/coffee-leaf-diseases/test/images',
    test_label_df,
)

# Collapse every image to a single row vector (one row per image).
train_features_flat = train_features.reshape(len(train_features), -1)
test_features_flat = test_features.reshape(len(test_features), -1)

# Hold out 20% of the training images for validation, stratified on the labels.
X_train, X_valid, y_train, y_valid = train_test_split(
    train_features_flat, train_labels,
    test_size=0.2, stratify=train_labels, random_state=123,
)
print(f"\nShape of X_train: {X_train.shape}")

In [None]:
# Column-wise totals of the label table, printed as one sample count per label.
label_counts = train_labels.values.sum(axis=0)
print("Label distribution in training set:")
for column_name, n_samples in zip(train_labels.columns, label_counts):
    print(f"{column_name}: {n_samples} samples")

## Build and Evaluate Models

In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.pipeline import Pipeline as SKPipeline
from sklearn.multioutput import MultiOutputClassifier
from sklearn.model_selection import GridSearchCV

def build_model(model_type, model, param_grid):
    """Grid-search a PCA -> scaler -> multi-output classifier pipeline.

    The search is fitted on the module-level ``X_train`` / ``y_train`` using
    10-fold cross-validation scored with macro F1. The best parameters and
    best score are printed.

    Args:
        model_type: Display name used in the printed messages.
        model: Base classifier wrapped in ``MultiOutputClassifier``.
        param_grid: Parameter grid keyed by pipeline step names
            (``pca__...``, ``multi_output__estimator__...``).

    Returns:
        The best pipeline found by the search.
    """
    steps = [
        ('pca', PCA(random_state=123)),
        ('scaler', StandardScaler()),
        ('multi_output', MultiOutputClassifier(model, n_jobs=-1)),
    ]

    search = GridSearchCV(
        SKPipeline(steps),
        param_grid,
        scoring='f1_macro',
        cv=10,
        n_jobs=-1,
    )
    search.fit(X_train, y_train)

    print(f"Best parameters for {model_type}: {search.best_params_}")
    print(f"Best F1 Macro Score for {model_type}: {search.best_score_}")

    return search.best_estimator_

# ---------- Decision Tree ----------
# The tree is seeded with the same value (123) used for PCA and the data split,
# so the tuned model is reproducible from run to run.
best_multilable_dt = build_model(
    'Decision Tree',
    DecisionTreeClassifier(random_state=123),
    {
        'pca__n_components': [10, 20, 50],
        'multi_output__estimator__criterion': ['gini', 'entropy'],
        'multi_output__estimator__max_depth': [5, 8, 13, 18, None],
        'multi_output__estimator__min_samples_split': [2, 5],
        'multi_output__estimator__min_samples_leaf': [1, 3],
        'multi_output__estimator__class_weight': ['balanced', None],
        'multi_output__estimator__min_impurity_decrease': [0.0, 0.001, 0.01]
    }
)

# ---------- KNN ----------
best_multilable_knn = build_model(
    'KNN',
    KNeighborsClassifier(),
    {
        'pca__n_components': [10, 20, 50],
        'multi_output__estimator__n_neighbors': [1, 3, 5, 7, 9],
        'multi_output__estimator__metric': ['euclidean', 'manhattan', 'cosine'],
        'multi_output__estimator__weights': ['uniform', 'distance']
    }
)

#### Evaluation on Validation Set

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def show_evaluation_results(model_type, pred, actual):
    """Print accuracy plus micro- and macro-averaged precision, recall and F1.

    Args:
        model_type: Display name used in the header line.
        pred: Predicted labels.
        actual: Ground-truth labels.
    """
    print(f"\n=== {model_type} Overall Metrics ===")
    print("Accuracy (subset accuracy):", accuracy_score(actual, pred))

    scorers = (
        ("Precision", precision_score),
        ("Recall", recall_score),
        ("F1-score", f1_score),
    )
    # All micro-averaged metrics are printed first, then the macro-averaged ones.
    for averaging in ("micro", "macro"):
        for title, scorer in scorers:
            value = scorer(actual, pred, average=averaging, zero_division=0)
            print(f"{title} ({averaging}):", value)

# Predict the validation split with each tuned model, then print the metrics
# (Decision Tree first, KNN second).
y_pred_valid_multilabel_dt = best_multilable_dt.predict(X_valid)
y_pred_valid_multilabel_knn = best_multilable_knn.predict(X_valid)

for model_name, model_preds in (
    ("Decision Tree", y_pred_valid_multilabel_dt),
    ("KNN", y_pred_valid_multilabel_knn),
):
    show_evaluation_results(model_name, model_preds, y_valid)

##### Confusion Matrix Heatmap

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Label (class) column names taken from the validation targets; read as a
# module-level global by the plotting helpers defined below.
labels = y_valid.columns

def plot_confusion_matrix(model_type, pred, actual):
    """Draw one binary confusion-matrix heatmap per label, side by side.

    Label names come from the module-level ``labels``; column ``i`` of
    ``pred`` and ``actual`` is compared for the ``i``-th label.

    Args:
        model_type: Display name used in each subplot title.
        pred: Predicted labels, one column per label.
        actual: Ground-truth labels, one column per label.
    """
    pred_array = np.array(pred)
    actual_array = np.array(actual)

    plt.figure(figsize=(15, 4))

    for i, label_name in enumerate(labels):
        # Pin the classes to [0, 1] so the matrix is always 2x2 and matches the
        # two tick labels, even when one class is absent from both the truth
        # and the predictions. Assumes 0/1 indicator targets, as the
        # 'Not <label>' / '<label>' tick labels imply.
        cm = confusion_matrix(
            actual_array[:, i],
            pred_array[:, i],
            labels=[0, 1],
        )

        plt.subplot(1, len(labels), i + 1)
        sns.heatmap(
            cm,
            annot=True,
            fmt='d',
            cmap='Greens',
            xticklabels=['Not ' + label_name, label_name],
            yticklabels=['Not ' + label_name, label_name]
        )
        plt.title(f'{model_type} Confusion Matrix - {label_name}')
        plt.xlabel('Predicted')
        plt.ylabel('Actual')

    plt.tight_layout()
    plt.show()

# Per-label confusion matrices on the validation split, one figure per model.
for model_name, model_preds in (
    ('Decision Tree', y_pred_valid_multilabel_dt),
    ('KNN', y_pred_valid_multilabel_knn),
):
    plot_confusion_matrix(model_name, model_preds, y_valid)

##### ROC-AUC Curves

In [None]:
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc

def plot_roc_curve(model_type, model, pred_target, actual):
    """Plot one ROC curve per label, with its AUC in the legend.

    Label names come from the module-level ``labels``.

    Args:
        model_type: Display name used in the plot title.
        model: Fitted estimator exposing ``predict_proba``. A
            ``MultiOutputClassifier`` (bare or as the last pipeline step)
            returns one probability array per label; any other model is
            expected to return a single 2-D array with one column per label.
        pred_target: Feature matrix to score.
        actual: Ground-truth labels, one column per label.
    """
    actual_array = np.array(actual)
    y_score = model.predict_proba(pred_target)

    # Unwrap a pipeline so the check below looks at the actual classifier.
    final_step = model.steps[-1][1] if isinstance(model, SKPipeline) else model

    if isinstance(final_step, MultiOutputClassifier):
        # Keep only the positive-class column of each per-label array.
        y_score_array = np.column_stack([proba[:, 1] for proba in y_score])
    else:
        y_score_array = np.array(y_score)

    plt.figure(figsize=(8, 6))
    for i, class_name in enumerate(labels):
        fpr, tpr, _ = roc_curve(actual_array[:, i], y_score_array[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(
            fpr,
            tpr,
            lw=2,
            label=f'{class_name} (AUC = {roc_auc:.4f})'
        )
    # Diagonal reference line (FPR == TPR).
    plt.plot([0, 1], [0, 1], 'k--', lw=1)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    # The title names only the model passed in; the previous text hard-coded
    # "Test Set" and "Decision Tree" regardless of the model or data plotted.
    plt.title(f'{model_type} ROC Curve')
    plt.legend()
    plt.show()

# ROC curves on the validation split, one figure per tuned model.
for model_name, fitted_model in (
    ('Decision Tree', best_multilable_dt),
    ('KNN', best_multilable_knn),
):
    plot_roc_curve(model_name, fitted_model, X_valid, y_valid)

#### Evaluation on Test Set


In [None]:
# Predict the held-out test set with each tuned model, then print the metrics
# (Decision Tree first, KNN second).
y_pred_test_multilabel_dt = best_multilable_dt.predict(test_features_flat)
y_pred_test_multilabel_knn = best_multilable_knn.predict(test_features_flat)

for model_name, model_preds in (
    ("Decision Tree", y_pred_test_multilabel_dt),
    ("KNN", y_pred_test_multilabel_knn),
):
    show_evaluation_results(model_name, model_preds, test_labels)

##### Confusion Matrix Heatmap

In [None]:
# Per-label confusion matrices on the test set, one figure per model.
for model_name, model_preds in (
    ('Decision Tree', y_pred_test_multilabel_dt),
    ('KNN', y_pred_test_multilabel_knn),
):
    plot_confusion_matrix(model_name, model_preds, test_labels)

##### ROC-AUC Curves

In [None]:
# ROC curves on the test set, one figure per tuned model.
for model_name, fitted_model in (
    ('Decision Tree', best_multilable_dt),
    ('KNN', best_multilable_knn),
):
    plot_roc_curve(model_name, fitted_model, test_features_flat, test_labels)

### Apply SMOTE

In [None]:
from sklearn.base import clone
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import StratifiedKFold # CV with SMOTE

# Combine best estimators (custom multi-output)
class CustomMultiOutputEstimator:
    """Bundle independently fitted per-label classifiers behind one interface.

    ``estimators`` is an ordered sequence of fitted classifiers; output column
    ``j`` of ``predict`` / ``predict_proba`` comes from ``estimators[j]``.
    """

    def __init__(self, estimators):
        self.estimators = estimators

    def predict(self, X):
        """Return the estimators' predictions as side-by-side columns."""
        columns = []
        for estimator in self.estimators:
            columns.append(estimator.predict(X).reshape(-1, 1))
        return np.concatenate(columns, axis=1)

    def predict_proba(self, X):
        """Return column 1 of each estimator's ``predict_proba``, side by side."""
        columns = [
            estimator.predict_proba(X)[:, 1].reshape(-1, 1)
            for estimator in self.estimators
        ]
        return np.concatenate(columns, axis=1)


class _ConstantLabelEstimator:
    """Stand-in classifier for a label whose training column holds one class."""

    def __init__(self, constant):
        self.constant = constant

    def predict(self, X):
        return np.full(len(X), self.constant)

    def predict_proba(self, X):
        # Two columns, so callers can take [:, 1] as with any binary classifier.
        # Assumes 0/1 targets: the positive-class probability is 1 only when
        # the constant class is 1.
        positive = np.full(len(X), float(self.constant == 1))
        return np.column_stack([1.0 - positive, positive])


def build_model_smote(model_type, model, param_grid):
    """Tune one PCA -> scaler -> SMOTE -> classifier pipeline per label.

    Each column of the module-level ``y_train`` is treated as a separate
    binary problem. For every label a randomized search (200 sampled
    candidates, stratified 10-fold CV, F1 scoring) is fitted on ``X_train``
    and its best pipeline is kept. A label whose training column holds a
    single class cannot be resampled or tuned; it gets a constant predictor,
    so the returned estimator still outputs one column per label, in label
    order.

    Args:
        model_type: Display name used in the printed messages.
        model: Base classifier; a fresh clone is used for each label.
        param_grid: Parameter distributions keyed by pipeline step names
            (``pca__...``, ``smote__...``, ``model__...``).

    Returns:
        A ``CustomMultiOutputEstimator`` wrapping one estimator per label.
    """
    best_estimators_list = []

    y_train_np = np.array(y_train)
    X_train_np = np.array(X_train)
    n_labels = y_train_np.shape[1]

    # The splitter is identical for every label, so build it once.
    cv_splitter = StratifiedKFold(n_splits=10, shuffle=True, random_state=123)

    for i in range(n_labels):
        print(f"Processing label {i+1}/{n_labels}")

        y_label = y_train_np[:, i]
        observed_classes = np.unique(y_label)

        if len(observed_classes) < 2:
            # Previously this label was skipped, which silently shifted every
            # later output column relative to the label columns.
            print(f"Label {i+1} has only one class; using a constant predictor.")
            best_estimators_list.append(
                _ConstantLabelEstimator(observed_classes[0])
            )
            continue

        single_label_pipeline = ImbPipeline([
            ('pca', PCA(random_state=123)),
            ('scaler', StandardScaler()),
            ('smote', SMOTE(random_state=123)),
            ('model', clone(model))
        ])

        search = RandomizedSearchCV(
            estimator=single_label_pipeline,
            param_distributions=param_grid,
            n_iter=200,
            scoring='f1',
            cv=cv_splitter,
            n_jobs=-1,
            random_state=123
        )
        search.fit(X_train_np, y_label)

        print(f"Best parameters for {model_type} label {i+1}: {search.best_params_}")
        print(f"Best F1 score for label {i+1}: {search.best_score_}")
        best_estimators_list.append(search.best_estimator_)

    return CustomMultiOutputEstimator(best_estimators_list)

# ---------- Decision Tree ----------
# The tree is seeded with the same value (123) used for PCA, SMOTE, the CV
# splitter and the randomized search, so the tuned model is reproducible.
best_multilable_dt_smote = build_model_smote(
    'Decision Tree',
    DecisionTreeClassifier(random_state=123),
    {
        'pca__n_components': [10, 20, 50],
        'smote__k_neighbors': [3, 5, 7, 11],
        'model__criterion': ['gini', 'entropy'],
        'model__max_depth': [5, 8, 13, 18, None],
        'model__min_samples_split': [2, 5],
        'model__min_samples_leaf': [1, 3],
        'model__class_weight': ['balanced', None],
        'model__min_impurity_decrease': [0.0, 0.001, 0.01]
    }
)

# ---------- KNN ----------
best_multilable_knn_smote = build_model_smote(
    'KNN',
    KNeighborsClassifier(),
    {
        'pca__n_components': [10, 20, 50],
        'smote__k_neighbors': [3, 5, 7, 11],
        'model__n_neighbors': [1, 3, 5, 7, 9],
        'model__metric': ['euclidean', 'manhattan', 'cosine'],
        'model__weights': ['uniform', 'distance']
    }
)

#### Evaluation on Validation Set

In [None]:
# Predict the validation split with each SMOTE-tuned model, then print the
# metrics (Decision Tree first, KNN second).
y_pred_valid_multilabel_dt_smote = best_multilable_dt_smote.predict(X_valid)
y_pred_valid_multilabel_knn_smote = best_multilable_knn_smote.predict(X_valid)

for model_name, model_preds in (
    ("Decision Tree", y_pred_valid_multilabel_dt_smote),
    ("KNN", y_pred_valid_multilabel_knn_smote),
):
    show_evaluation_results(model_name, model_preds, y_valid)

##### Confusion Matrix Heatmap

In [None]:
# Per-label confusion matrices on the validation split for the SMOTE models.
for model_name, model_preds in (
    ('Decision Tree', y_pred_valid_multilabel_dt_smote),
    ('KNN', y_pred_valid_multilabel_knn_smote),
):
    plot_confusion_matrix(model_name, model_preds, y_valid)

##### ROC-AUC Curves

In [None]:
# ROC curves on the validation split for the SMOTE models.
for model_name, fitted_model in (
    ('Decision Tree', best_multilable_dt_smote),
    ('KNN', best_multilable_knn_smote),
):
    plot_roc_curve(model_name, fitted_model, X_valid, y_valid)

#### Evaluation on Test Set

In [None]:
# Predict the held-out test set with each SMOTE-tuned model, then print the
# metrics (Decision Tree first, KNN second).
y_pred_test_multilabel_dt_smote = best_multilable_dt_smote.predict(test_features_flat)
y_pred_test_multilabel_knn_smote = best_multilable_knn_smote.predict(test_features_flat)

for model_name, model_preds in (
    ("Decision Tree", y_pred_test_multilabel_dt_smote),
    ("KNN", y_pred_test_multilabel_knn_smote),
):
    show_evaluation_results(model_name, model_preds, test_labels)

##### Confusion Matrix Heatmap

In [None]:
# Per-label confusion matrices on the test set for the SMOTE models.
for model_name, model_preds in (
    ('Decision Tree', y_pred_test_multilabel_dt_smote),
    ('KNN', y_pred_test_multilabel_knn_smote),
):
    plot_confusion_matrix(model_name, model_preds, test_labels)

##### ROC-AUC Curves

In [None]:
# ROC curves on the test set for the SMOTE models.
for model_name, fitted_model in (
    ('Decision Tree', best_multilable_dt_smote),
    ('KNN', best_multilable_knn_smote),
):
    plot_roc_curve(model_name, fitted_model, test_features_flat, test_labels)

## Combine All Train and Test Images

In [None]:
# Pool the original train and test images, then draw a fresh stratified 80/20
# split from the combined set.
all_features = np.vstack([train_features, test_features])
# ignore_index=True: both label frames are indexed from 0, so a plain concat
# would leave duplicate index labels and make label-based lookups ambiguous.
all_labels = pd.concat([train_labels, test_labels], axis=0, ignore_index=True)

all_features_flat = all_features.reshape(all_features.shape[0], -1)

# NOTE: this rebinds the X_train / y_train globals that build_model and
# build_model_smote read, so re-running those functions after this cell fits
# on the pooled split instead of the original training split.
X_train, X_test, y_train, y_test = train_test_split(
    all_features_flat,
    all_labels,
    test_size=0.2,
    stratify=all_labels,
    random_state=123
)

## Save models

To save the scikit-learn models, we use `joblib`, which is more efficient than `pickle` for objects that carry large NumPy arrays:

In [None]:
import joblib

# Persist the two SMOTE-tuned models to disk.
# NOTE(review): both objects are CustomMultiOutputEstimator instances, a class
# defined in this notebook, so loading these files elsewhere presumably requires
# that class to be defined or importable first. Confirm before deploying.
joblib.dump(best_multilable_knn_smote, 'best_multilabel_knn_smote.pkl')
joblib.dump(best_multilable_dt_smote, 'best_multilabel_dt_smote.pkl')