# Classical Machine Learning Baseline for Coffee Bean Defect Detection
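This notebook extracts hand-crafted color, texture, shape, and frequency features from coffee bean images and trains traditional classifiers (Random Forest, Gradient Boosting, SVM) as a baseline.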

In [1]:
import json
import os
import pickle
import time
from pathlib import Path

import cv2
import mahotas
import numpy as np
from skimage.feature import graycomatrix, graycoprops, local_binary_pattern
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, roc_auc_score
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from tqdm.auto import tqdm

# Set random seed
# RANDOM_SEED seeds NumPy's global RNG here and is also passed as
# `random_state` to every classifier built in the training cell.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

In [2]:
# Numerical tolerance used by extract_color_features: a channel whose standard
# deviation is below EPS gets a skewness of 0, and EPS is added to the
# denominator when standardizing pixel values.
EPS = 1e-6

In [3]:
# Set project paths
# The project root can be overridden with the IATE_PROJECT_ROOT environment
# variable so the notebook runs on machines other than the original author's;
# when the variable is unset the original location is used unchanged.
PROJECT_ROOT = Path(os.environ.get('IATE_PROJECT_ROOT', '/home/tony/research_project/iate_project'))
SPLITS_DIR = PROJECT_ROOT / 'data' / 'splits'    # pickled train/val/test splits
RESULTS_DIR = PROJECT_ROOT / 'results'
MODELS_DIR = RESULTS_DIR / 'models'              # fitted scaler and classifiers
METRICS_DIR = RESULTS_DIR / 'metrics'            # JSON metric reports

In [4]:
# Make sure the output folders for models and metrics exist (safe to re-run)
for output_dir in (MODELS_DIR, METRICS_DIR):
    output_dir.mkdir(parents=True, exist_ok=True)

# 1. LOADING DATA SPLITS

In [5]:
# NOTE(review): pickle.load executes arbitrary code embedded in the file, so
# splits.pkl must come from a trusted source (presumably the project's own
# split-generation step — confirm).
with open(SPLITS_DIR / 'splits.pkl', 'rb') as f:
    splits = pickle.load(f)

# Paths and labels are zipped together during feature extraction, and zip()
# silently truncates to the shorter sequence, so check the lengths up front.
for split_name in ('train', 'val', 'test'):
    n_split_paths = len(splits[f'{split_name}_paths'])
    n_split_labels = len(splits[f'{split_name}_labels'])
    assert n_split_paths == n_split_labels, (
        f"{split_name} split has {n_split_paths} paths but {n_split_labels} labels"
    )

train_paths = splits['train_paths']
train_labels = splits['train_labels']
val_paths = splits['val_paths']
val_labels = splits['val_labels']
test_paths = splits['test_paths']
test_labels = splits['test_labels']

print(f"Train: {len(train_paths)} images")
print(f"Validation: {len(val_paths)} images")
print(f"Test: {len(test_paths)} images")

Train: 3780 images
Validation: 810 images
Test: 810 images


# 2. DEFINING FEATURE EXTRACTORS

In [6]:
def extract_color_features(image):
    """Build the color part of the feature vector for one BGR image.

    The returned list is laid out as:
      * for each channel index 0..2: a 32-bin BGR histogram followed by a
        32-bin HSV histogram of the same channel index (192 values);
      * for each BGR channel: mean, standard deviation and the absolute
        value of the skewness (9 values);
      * for each LAB channel: mean and standard deviation (6 values).

    Note that the skewness is stored without its sign.
    """
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    lab_image = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)

    n_bins = 32
    # Histogram range per HSV channel: hue uses [0, 180], saturation and value [0, 256]
    hsv_ranges = ([0, 180], [0, 256], [0, 256])

    color_vector = []

    # Interleaved BGR / HSV histograms, one pair per channel index
    for channel_idx, hsv_range in enumerate(hsv_ranges):
        bgr_hist = cv2.calcHist([image], [channel_idx], None, [n_bins], [0, 256])
        color_vector.extend(bgr_hist.flatten())

        hsv_hist = cv2.calcHist([hsv_image], [channel_idx], None, [n_bins], hsv_range)
        color_vector.extend(hsv_hist.flatten())

    # First three color moments of every BGR channel
    for plane in cv2.split(image):
        mean_val = float(np.mean(plane))
        std_val = float(np.std(plane))
        if std_val < EPS:
            # (Near-)constant channel: skewness is defined as zero here
            abs_skew = 0.0
        else:
            standardized = (plane - mean_val) / (std_val + EPS)
            abs_skew = abs(float(np.mean(standardized ** 3)))
        color_vector.extend([mean_val, std_val, abs_skew])

    # Mean / standard deviation of every LAB channel
    for plane in cv2.split(lab_image):
        color_vector.extend([np.mean(plane), np.std(plane)])

    return color_vector

In [7]:
def extract_texture_features(image):
    """Extract texture features using GLCM, LBP, and Haralick descriptors.

    Args:
        image: BGR image array as produced by cv2.imread.

    Returns:
        A flat list made of, in order:
          * GLCM properties (contrast, dissimilarity, homogeneity, energy,
            correlation), each evaluated for 3 distances x 4 angles (60 values);
          * a 26-bin histogram of uniform LBP codes (radius 3, 24 points);
          * 13 Haralick features averaged over axis 0 of the mahotas output,
            or 13 zeros if mahotas fails on this image.
    """
    features = []

    # Convert to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # GLCM features
    distances = [1, 3, 5]
    angles = [0, np.pi/4, np.pi/2, 3*np.pi/4]
    glcm = graycomatrix(gray, distances=distances, angles=angles,
                        levels=256, symmetric=True, normed=True)

    # GLCM properties
    props = ['contrast', 'dissimilarity', 'homogeneity', 'energy', 'correlation']
    for prop in props:
        glcm_prop = graycoprops(glcm, prop)
        features.extend(glcm_prop.flatten())

    # Local Binary Pattern
    radius = 3
    n_points = 8 * radius
    lbp = local_binary_pattern(gray, n_points, radius, method='uniform')
    # Explicit integer bin edges 0..n_points+2 give n_points+2 bins; a separate
    # `range` argument would be ignored by NumPy when edges are supplied.
    lbp_hist, _ = np.histogram(lbp.ravel(), bins=np.arange(0, n_points + 3))
    features.extend(lbp_hist)

    # Haralick texture features
    n_haralick = 13
    try:
        haralick = mahotas.features.haralick(gray).mean(axis=0)
        features.extend(haralick)
    except Exception:
        # Best-effort fallback keeps the vector length fixed. `Exception`
        # (rather than a bare except) lets KeyboardInterrupt / SystemExit
        # propagate so a long extraction run can still be interrupted.
        features.extend([0] * n_haralick)

    return features

In [8]:
def extract_shape_features(image):
    """Extract edge and contour-shape features from a BGR image.

    Args:
        image: BGR image array as produced by cv2.imread.

    Returns:
        A list of exactly 14 values:
          [0]     number of Canny edge pixels
          [1]     edge density (edge pixels / total pixels)
          [2:4]   area and perimeter of the largest external contour found
                  in the edge map
          [4]     circularity, 4*pi*area / perimeter**2 (0 if perimeter is 0)
          [5:7]   bounding-box aspect ratio (w / h) and extent (area / (w*h))
          [7:14]  the seven Hu moments of that contour
        When the edge map contains no contour, entries [2:14] are all zero.
    """
    # Number of values contributed by the contour branch:
    # area, perimeter, circularity, aspect ratio, extent + 7 Hu moments.
    n_contour_features = 12

    features = []

    # Convert to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Edge detection
    edges = cv2.Canny(gray, 50, 150)
    edge_count = np.sum(edges > 0)
    features.append(edge_count)  # Edge pixel count
    features.append(edge_count / edges.size)  # Edge density

    # Contour features
    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if contours:
        # Find largest contour
        largest_contour = max(contours, key=cv2.contourArea)

        # Contour area and perimeter
        area = cv2.contourArea(largest_contour)
        perimeter = cv2.arcLength(largest_contour, True)
        features.extend([area, perimeter])

        # Circularity
        if perimeter > 0:
            circularity = 4 * np.pi * area / (perimeter ** 2)
            features.append(circularity)
        else:
            features.append(0)

        # Bounding box
        x, y, w, h = cv2.boundingRect(largest_contour)
        aspect_ratio = float(w) / h if h > 0 else 0
        extent = float(area) / (w * h) if w * h > 0 else 0
        features.extend([aspect_ratio, extent])

        # Hu moments
        moments = cv2.moments(largest_contour)
        hu_moments = cv2.HuMoments(moments).flatten()
        features.extend(hu_moments)
    else:
        # Pad only the contour-derived slots. The two edge statistics are
        # already in the list, so padding with 14 zeros (as before) produced a
        # 16-element vector and a ragged feature matrix downstream.
        features.extend([0] * n_contour_features)

    return features

In [9]:
def extract_frequency_features(image, n_bins=20, bin_width=5):
    """Extract frequency-domain features from the 2-D FFT of a BGR image.

    Args:
        image: BGR image array as produced by cv2.imread.
        n_bins: number of radial bins to report (default 20).
        bin_width: width of each radial bin in integer-radius units (default 5).

    Returns:
        A list of n_bins + 3 values: the mean FFT magnitude inside each ring
        [i*bin_width, (i+1)*bin_width) around the spectrum centre, followed by
        the mean, standard deviation and maximum of the whole magnitude
        spectrum. Rings that contain no pixel (e.g. for small images)
        contribute 0.0, so the length does not depend on the image size.
    """
    features = []

    # Convert to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Apply FFT and move the zero-frequency component to the centre
    f_transform = np.fft.fft2(gray)
    f_shift = np.fft.fftshift(f_transform)
    magnitude_spectrum = np.abs(f_shift)

    # Integer (truncated) distance of every pixel from the spectrum centre
    center = np.array(magnitude_spectrum.shape) // 2
    y, x = np.ogrid[:magnitude_spectrum.shape[0], :magnitude_spectrum.shape[1]]
    r = np.sqrt((x - center[1])**2 + (y - center[0])**2).astype(int)

    # Binned radial average. Only the rings that are actually reported are
    # evaluated, and missing rings are zero-filled instead of being dropped.
    radial_prof = []
    for i in range(n_bins):
        mask = (r >= i * bin_width) & (r < (i + 1) * bin_width)
        radial_prof.append(float(np.mean(magnitude_spectrum[mask])) if np.any(mask) else 0.0)

    features.extend(radial_prof)

    # Frequency statistics
    features.append(np.mean(magnitude_spectrum))
    features.append(np.std(magnitude_spectrum))
    features.append(np.max(magnitude_spectrum))

    return features

In [10]:
def extract_all_features(image_path):
    """Extract the full hand-crafted feature vector for one image file.

    The image is read with OpenCV, resized to 256x256, and passed through the
    color, texture, shape and frequency extractors; their outputs are
    concatenated in that order.

    Args:
        image_path: path to the image, as a string or a path-like object.

    Returns:
        A 1-D float32 NumPy array with NaN and +/-inf replaced by 0.0, or
        None if OpenCV could not read the image.
    """
    # str() lets callers pass pathlib.Path objects as well as plain strings
    image = cv2.imread(str(image_path))
    if image is None:
        return None
    image = cv2.resize(image, (256, 256))

    # Combine all features (color, texture, shape, frequency — in this order)
    all_features = (
        extract_color_features(image)
        + extract_texture_features(image)
        + extract_shape_features(image)
        + extract_frequency_features(image)
    )

    # Cast to float32 first and sanitize afterwards, so that values which
    # overflow to inf during the down-cast are zeroed as well.
    with np.errstate(over='ignore'):
        feature_vector = np.asarray(all_features, dtype=np.float64).astype(np.float32)

    return np.nan_to_num(feature_vector, nan=0.0, posinf=0.0, neginf=0.0)

# 3. EXTRACTING FEATURES FROM TRAINING SET

In [11]:
# Build the training feature matrix; images that fail to load are skipped
train_vectors, train_targets = [], []
train_pairs = list(zip(train_paths, train_labels))
for img_path, target in tqdm(train_pairs, desc="Extracting train features"):
    vec = extract_all_features(img_path)
    if vec is None:
        continue
    train_vectors.append(vec)
    train_targets.append(target)

X_train = np.array(train_vectors)
y_train = np.array(train_targets)

print(f"Train features shape: {X_train.shape}")

Extracting train features:   0%|          | 0/3780 [00:00<?, ?it/s]

Train features shape: (3780, 343)


# 4. EXTRACTING FEATURES FROM VALIDATION SET

In [12]:
# Build the validation feature matrix; labels are kept only for images that
# were read successfully so X_val and y_val stay aligned
val_vectors, val_targets = [], []
val_pairs = list(zip(val_paths, val_labels))
for img_path, target in tqdm(val_pairs, desc="Extracting val features"):
    vec = extract_all_features(img_path)
    if vec is None:
        continue
    val_vectors.append(vec)
    val_targets.append(target)

X_val = np.array(val_vectors)
y_val = np.array(val_targets)

print(f"Validation features shape: {X_val.shape}")

Extracting val features:   0%|          | 0/810 [00:00<?, ?it/s]

Validation features shape: (810, 343)


# 5. NORMALIZING FEATURES

In [13]:
# Learn the scaling statistics from the training features only, then apply the
# same transform to both the training and the validation matrices
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_val_scaled = scaler.transform(X_val)

print(f"Features normalized using StandardScaler")
scaled_mean = np.mean(X_train_scaled)
scaled_std = np.std(X_train_scaled)
print(f"Mean: {scaled_mean:.4f}, Std: {scaled_std:.4f}")

# Persist the fitted scaler next to the models
scaler_path = MODELS_DIR / 'baseline_scaler.pkl'
with open(scaler_path, 'wb') as scaler_file:
    pickle.dump(scaler, scaler_file)

Features normalized using StandardScaler
Mean: 0.0000, Std: 0.9985


# 6. TRAINING CLASSIFIERS

In [14]:
# Candidate classifiers; every one is seeded with RANDOM_SEED.
classifiers = {
    'RandomForest': RandomForestClassifier(
        n_estimators=200,
        max_depth=20,
        min_samples_split=10,
        min_samples_leaf=5,
        random_state=RANDOM_SEED,
        n_jobs=-1
    ),
    'GradientBoosting': GradientBoostingClassifier(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=5,
        random_state=RANDOM_SEED
    ),
    'SVM': SVC(
        kernel='rbf',
        C=1.0,
        gamma='scale',
        probability=True,
        random_state=RANDOM_SEED
    )
}

# Validation metrics per classifier name
results = {}

for clf_name, clf in classifiers.items():
    print(f"\nTraining {clf_name}...")

    # Train. Only the fit call is timed, so 'train_time' is not inflated by
    # validation inference and metric computation.
    start_time = time.perf_counter()
    clf.fit(X_train_scaled, y_train)
    train_time = time.perf_counter() - start_time

    # Predict on validation
    y_pred = clf.predict(X_val_scaled)
    # Column 1 of predict_proba is used as the score for AUC; fall back to the
    # hard predictions if the model exposes no probabilities.
    y_pred_proba = clf.predict_proba(X_val_scaled)[:, 1] if hasattr(clf, 'predict_proba') else y_pred

    # Calculate metrics
    accuracy = accuracy_score(y_val, y_pred)
    precision, recall, f1, _ = precision_recall_fscore_support(y_val, y_pred, average='binary')
    auc = roc_auc_score(y_val, y_pred_proba)
    cm = confusion_matrix(y_val, y_pred)

    # Store results as plain Python types so the dict stays JSON-serializable
    # whatever scalar type the metric functions return.
    results[clf_name] = {
        'accuracy': float(accuracy),
        'precision': float(precision),
        'recall': float(recall),
        'f1_score': float(f1),
        'auc': float(auc),
        'confusion_matrix': cm.tolist(),
        'train_time': train_time
    }

    # Save model
    model_path = MODELS_DIR / f'baseline_{clf_name.lower()}.pkl'
    with open(model_path, 'wb') as f:
        pickle.dump(clf, f)

    print(f"  Accuracy: {accuracy:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall: {recall:.4f}")
    print(f"  F1-Score: {f1:.4f}")
    print(f"  AUC: {auc:.4f}")
    print(f"  Training time: {train_time:.2f}s")
    print(f"  Model saved to: {model_path}")

# Select best model based on validation F1 score
best_model_name = max(results, key=lambda x: results[x]['f1_score'])
print(f"\nBEST MODEL: {best_model_name}")
print("-"*40)
print(f"F1-Score: {results[best_model_name]['f1_score']:.4f}")


Training RandomForest...
  Accuracy: 0.8877
  Precision: 0.8874
  Recall: 0.7593
  F1-Score: 0.8184
  AUC: 0.9533
  Training time: 0.59s
  Model saved to: /home/tony/research_project/iate_project/results/models/baseline_randomforest.pkl

Training GradientBoosting...
  Accuracy: 0.8975
  Precision: 0.8880
  Recall: 0.7926
  F1-Score: 0.8376
  AUC: 0.9647
  Training time: 33.17s
  Model saved to: /home/tony/research_project/iate_project/results/models/baseline_gradientboosting.pkl

Training SVM...
  Accuracy: 0.8815
  Precision: 0.8718
  Recall: 0.7556
  F1-Score: 0.8095
  AUC: 0.9451
  Training time: 2.18s
  Model saved to: /home/tony/research_project/iate_project/results/models/baseline_svm.pkl

BEST MODEL: GradientBoosting
----------------------------------------
F1-Score: 0.8376


# 7. EVALUATING ON TEST SET

In [15]:
# Build the test feature matrix; images that fail to load are skipped
test_vectors, test_targets = [], []
test_pairs = list(zip(test_paths, test_labels))
for img_path, target in tqdm(test_pairs, desc="Extracting test features"):
    vec = extract_all_features(img_path)
    if vec is None:
        continue
    test_vectors.append(vec)
    test_targets.append(target)

X_test = np.array(test_vectors)
y_test = np.array(test_targets)
# Reuse the scaler fitted on the training features
X_test_scaled = scaler.transform(X_test)

print(f"Test features shape: {X_test.shape}")

# Reload the model that scored the best validation F1 from disk
best_model_path = MODELS_DIR / f'baseline_{best_model_name.lower()}.pkl'
with open(best_model_path, 'rb') as model_file:
    best_model = pickle.load(model_file)

# Hard predictions plus a score for AUC (column 1 of predict_proba when available)
y_test_pred = best_model.predict(X_test_scaled)
if hasattr(best_model, 'predict_proba'):
    y_test_proba = best_model.predict_proba(X_test_scaled)[:, 1]
else:
    y_test_proba = y_test_pred

# Test metrics
test_accuracy = accuracy_score(y_test, y_test_pred)
test_precision, test_recall, test_f1, _ = precision_recall_fscore_support(
    y_test, y_test_pred, average='binary'
)
test_auc = roc_auc_score(y_test, y_test_proba)
test_cm = confusion_matrix(y_test, y_test_pred)

print(f"\nTest Set Results ({best_model_name}):")
print(f"  Accuracy: {test_accuracy:.4f}")
print(f"  Precision: {test_precision:.4f}")
print(f"  Recall: {test_recall:.4f}")
print(f"  F1-Score: {test_f1:.4f}")
print(f"  AUC: {test_auc:.4f}")

# Rows of the matrix are true classes, columns are predicted classes
print(f"\nConfusion Matrix:")
print(f"  TN: {test_cm[0,0]}, FP: {test_cm[0,1]}")
print(f"  FN: {test_cm[1,0]}, TP: {test_cm[1,1]}")

Extracting test features:   0%|          | 0/810 [00:00<?, ?it/s]

Test features shape: (810, 343)

Test Set Results (GradientBoosting):
  Accuracy: 0.9123
  Precision: 0.9307
  Recall: 0.7963
  F1-Score: 0.8583
  AUC: 0.9663

Confusion Matrix:
  TN: 524, FP: 16
  FN: 55, TP: 215


# 8. SAVING RESULTS

In [16]:
# Assemble the report: validation metrics for every classifier, test metrics
# for the selected one, and a short description of the feature groups
test_summary = {
    'model': best_model_name,
    'accuracy': test_accuracy,
    'precision': test_precision,
    'recall': test_recall,
    'f1_score': test_f1,
    'auc': test_auc,
    'confusion_matrix': test_cm.tolist()
}
feature_type_notes = {
    'color': 'Histograms, moments, LAB statistics',
    'texture': 'GLCM, LBP, Haralick',
    'shape': 'Contours, Hu moments, edge density',
    'frequency': 'FFT radial profile'
}
all_results = {
    'validation_results': results,
    'test_results': test_summary,
    'feature_dimensions': X_train.shape[1],
    'feature_types': feature_type_notes
}

results_path = METRICS_DIR / 'baseline_results.json'
with open(results_path, 'w') as results_file:
    json.dump(all_results, results_file, indent=2)

print(f"\nResults saved to: {results_path}")

# Feature importance of the fitted Random Forest
if 'RandomForest' in classifiers:
    rf_model = classifiers['RandomForest']
    feature_importance = rf_model.feature_importances_

    importance_path = MODELS_DIR / 'baseline_feature_importance.npy'
    np.save(importance_path, feature_importance)
    print(f"\nFeature importance saved to: {importance_path}")

    # Ten largest importances, highest first
    top_indices = np.argsort(feature_importance)[::-1][:10]
    print("\nTop 10 Important Features:")
    for rank, idx in enumerate(top_indices, start=1):
        print(f"  {rank}. Feature {idx}: {feature_importance[idx]:.4f}")


Results saved to: /home/tony/research_project/iate_project/results/metrics/baseline_results.json

Feature importance saved to: /home/tony/research_project/iate_project/results/models/baseline_feature_importance.npy

Top 10 Important Features:
  1. Feature 310: 0.0891
  2. Feature 313: 0.0281
  3. Feature 312: 0.0249
  4. Feature 314: 0.0228
  5. Feature 263: 0.0174
  6. Feature 311: 0.0146
  7. Feature 315: 0.0144
  8. Feature 266: 0.0132
  9. Feature 200: 0.0126
  10. Feature 93: 0.0125


In [17]:
# Final summary: the classifier selected by validation F1 and its F1 on the test set
print(f"\nBest Model: {best_model_name}")
print(f"Test F1-Score: {test_f1:.4f}")


Best Model: GradientBoosting
Test F1-Score: 0.8583
