In [1]:
import os
import time
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix, classification_report, f1_score, accuracy_score
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
import matplotlib.pyplot as plt
from tqdm import tqdm
import seaborn as sns
import pandas as pd
import json
import pickle

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Where the images are read from and where every artifact is written.
DATASET_PATH = "../archive/images/images"
RESULTS_DIR = "svm_hyperopt_results_for_kaggleds"

# Feature-extraction settings.
FEATURE_EXTRACTOR = 'resnet50'  # backbone that turns images into feature vectors
BATCH_SIZE = 32
NUM_WORKERS = 4  # DataLoader worker processes; tune to the available CPU cores

# ---------------------------------------------------------------------------
# Reproducibility: fix the NumPy and PyTorch global seeds
# ---------------------------------------------------------------------------
np.random.seed(42)
torch.manual_seed(42)

# ---------------------------------------------------------------------------
# Filesystem checks: fail fast without a dataset, otherwise prepare the
# output directory
# ---------------------------------------------------------------------------
if os.path.exists(DATASET_PATH):
    os.makedirs(RESULTS_DIR, exist_ok=True)
else:
    raise FileNotFoundError(f"Dataset path {DATASET_PATH} does not exist")

# Function to check device availability
def get_device():
    """Select the compute device used for feature extraction.

    The preference order is MPS, then CUDA, then CPU. The PyTorch version
    and the selected device are printed as a side effect.

    Returns:
        torch.device: The selected device.
    """
    print("PyTorch version:", torch.__version__)

    if torch.backends.mps.is_available():
        chosen = torch.device("mps")
        print("Using MPS device for feature extraction")
        return chosen

    if torch.cuda.is_available():
        chosen = torch.device("cuda")
        print(f"Using CUDA device for feature extraction: {torch.cuda.get_device_name(0)}")
        return chosen

    chosen = torch.device("cpu")
    print("Using CPU device for feature extraction")
    return chosen

# Data preprocessing and loading functions
def get_transforms():
    """Build the preprocessing pipeline applied to every image.

    Images are resized to 224x224, converted to tensors, and normalised
    per channel. The mean/std values are the ones conventionally used with
    ImageNet-pretrained torchvision models.

    Returns:
        transforms.Compose: The composed transformation.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(channel_mean, channel_std),
    ]
    return transforms.Compose(steps)

def load_data(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, device=None):
    """Load the image folder and split it into train/val/test data loaders.

    The dataset at ``DATASET_PATH`` is read with ``ImageFolder`` and split
    70% / 20% / 10% using a generator seeded with 42, so the split is the
    same on every run. None of the loaders shuffle, because they are only
    used for feature extraction.

    Args:
        batch_size: Number of images per batch.
        num_workers: Number of DataLoader worker processes.
        device: Optional ``torch.device`` the batches will be moved to. Only
            used to decide whether to pin host memory.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader, class_names,
        num_classes)``.

    Raises:
        Exception: Any error raised while loading is printed and re-raised.
    """
    transform = get_transforms()

    try:
        # Load the full dataset
        full_dataset = datasets.ImageFolder(DATASET_PATH, transform=transform)
        class_names = full_dataset.classes
        num_classes = len(class_names)

        # Calculate sizes for splits (70% train, 20% val, 10% test); the test
        # split takes the remainder so the three sizes always sum to the total.
        total_size = len(full_dataset)
        train_size = int(0.7 * total_size)
        val_size = int(0.2 * total_size)
        test_size = total_size - train_size - val_size

        # Split the dataset
        train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
            full_dataset, [train_size, val_size, test_size],
            generator=torch.Generator().manual_seed(42)  # Ensure reproducibility
        )

        # Pinned (page-locked) host memory only speeds up host-to-CUDA copies.
        # Enabling it for other accelerators such as MPS brings no benefit, and
        # recent PyTorch versions warn that it is unsupported there.
        pin_memory = device is not None and device.type == 'cuda'

        # All three loaders share the same settings; order is preserved
        # (shuffle=False) because the loaders feed feature extraction only.
        loader_kwargs = dict(
            batch_size=batch_size,
            shuffle=False,
            num_workers=num_workers,
            pin_memory=pin_memory,
        )
        train_loader = DataLoader(train_dataset, **loader_kwargs)
        val_loader = DataLoader(val_dataset, **loader_kwargs)
        test_loader = DataLoader(test_dataset, **loader_kwargs)

        print(f"Dataset loaded: {len(train_dataset)} training, {len(val_dataset)} validation, {len(test_dataset)} test images")
        print(f"Classes: {class_names}")

        # Print dataset distribution (computed over the full dataset, not a split)
        class_counts = {class_name: 0 for class_name in class_names}
        for _, class_idx in full_dataset.samples:
            class_counts[class_names[class_idx]] += 1

        print("Class distribution:")
        for class_name, count in class_counts.items():
            print(f"  {class_name}: {count} images ({count/total_size:.1%})")

        return train_loader, val_loader, test_loader, class_names, num_classes

    except Exception as e:
        print(f"Error loading dataset: {e}")
        raise

# Feature extraction functions
def create_feature_extractor(model_name=FEATURE_EXTRACTOR, device=None):
    """Build a pretrained backbone with its last child module removed.

    Only ``'resnet50'`` is supported. The network is loaded with the
    ``IMAGENET1K_V2`` weights, stripped of its final child module (the
    classification layer), switched to evaluation mode and, when a device
    is given, moved to it.

    Args:
        model_name: Name of the backbone to build.
        device: Optional ``torch.device`` to move the model to.

    Returns:
        nn.Sequential: The feature extractor.

    Raises:
        ValueError: If ``model_name`` is not supported. Like every other
            error raised here, it is printed and then re-raised.
    """
    try:
        if model_name != 'resnet50':
            raise ValueError(f"Unsupported model: {model_name}")

        backbone = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)

        # Keep every child module except the last one (the classifier head).
        child_modules = list(backbone.children())
        extractor = nn.Sequential(*child_modules[:-1])

        # Evaluation mode: the network is used for inference only.
        extractor.eval()

        if device is not None:
            extractor = extractor.to(device)

        print(f"Feature extractor created: {model_name}")
        return extractor

    except Exception as err:
        print(f"Error creating feature extractor: {err}")
        raise

def extract_features(model, data_loader, device):
    """Run every batch through ``model`` and collect features and labels.

    Args:
        model: Callable feature extractor applied to each input batch.
        data_loader: Iterable yielding ``(inputs, targets)`` tensor pairs.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        tuple: ``(features, labels)`` as NumPy arrays. Each row of
        ``features`` is one sample's model output flattened to a vector.
    """
    feature_batches, label_batches = [], []

    # Inference only: no gradients are needed.
    with torch.no_grad():
        for images, targets in tqdm(data_loader, desc="Extracting features"):
            activations = model(images.to(device))
            # Collapse everything after the batch dimension into one vector.
            flattened = activations.view(activations.size(0), -1)
            feature_batches.append(flattened.cpu().numpy())
            label_batches.append(targets.numpy())

    # Stitch the per-batch arrays into one array each.
    return np.vstack(feature_batches), np.concatenate(label_batches)

# SVM hyperparameter optimization
def optimize_svm_hyperparameters(X_train, y_train, X_val, y_val, cv=5, n_iter=25):
    """Tune an SVC with a randomized, cross-validated hyperparameter search.

    The search runs on the training features only (cross-validation scored
    by weighted F1). The refit best estimator is then checked once on the
    held-out validation set.

    Args:
        X_train: Training feature matrix passed to the search.
        y_train: Training labels.
        X_val: Validation feature matrix used for the final check.
        y_val: Validation labels.
        cv: Cross-validation setting forwarded to ``RandomizedSearchCV``.
        n_iter: Number of parameter settings sampled by the search.

    Returns:
        tuple: ``(best_svm, results, cv_results_df)`` where ``best_svm`` is
        the refit best estimator, ``results`` is a summary dict (best
        parameters, CV score, validation accuracy/F1, search time and the
        CV results as a dict) and ``cv_results_df`` is the CV results table.
    """
    print(f"Starting SVM hyperparameter optimization with {n_iter} iterations...")

    # Define hyperparameter space to search
    param_grid = {
        'C': np.logspace(-3, 3, 20),  # Regularization parameter
        'gamma': np.logspace(-4, 1, 20),  # Kernel coefficient
        'kernel': ['rbf', 'linear', 'poly', 'sigmoid'],
        'class_weight': ['balanced', None],
        'decision_function_shape': ['ovr', 'ovo'],
    }

    # Create base SVM model
    svm = SVC(probability=True, random_state=42)

    # Use RandomizedSearchCV for efficient search
    # Note: This uses cross-validation on the training set
    random_search = RandomizedSearchCV(
        estimator=svm,
        param_distributions=param_grid,
        n_iter=n_iter,
        cv=cv,
        scoring='f1_weighted',
        n_jobs=-1,  # Use all available cores
        verbose=2,
        random_state=42
    )

    # Fit the random search on training data
    start_time = time.time()
    random_search.fit(X_train, y_train)
    search_time = time.time() - start_time

    print(f"Best parameters found: {random_search.best_params_}")
    print(f"Best cross-validation score: {random_search.best_score_:.4f}")
    print(f"Hyperparameter search completed in {search_time:.2f} seconds")

    # Evaluate on validation set. Predict once and derive both metrics from
    # the same predictions: a classifier's ``score`` is the accuracy of
    # ``predict``, so calling both would run the prediction pass twice.
    best_svm = random_search.best_estimator_
    y_val_pred = best_svm.predict(X_val)
    val_accuracy = accuracy_score(y_val, y_val_pred)
    val_f1 = f1_score(y_val, y_val_pred, average='weighted')

    print(f"Validation accuracy with best parameters: {val_accuracy:.4f}")
    print(f"Validation F1-score with best parameters: {val_f1:.4f}")

    # Build the CV results table once; it is returned as a DataFrame for
    # plotting and embedded in the summary as a plain dict.
    cv_results_df = pd.DataFrame(random_search.cv_results_)

    # Create search results summary
    results = {
        'best_params': random_search.best_params_,
        'best_cv_score': float(random_search.best_score_),
        'validation_accuracy': float(val_accuracy),
        'validation_f1': float(val_f1),
        'search_time': search_time,
        'cv_results': cv_results_df.to_dict()
    }

    return best_svm, results, cv_results_df

# Evaluation Function for SVM
def _resolve_label_ids(y_true, y_pred, num_classes):
    """Return ``[0, ..., num_classes - 1]`` when labels are class indices, else ``None``."""
    observed = np.unique(
        np.concatenate([np.asarray(y_true).ravel(), np.asarray(y_pred).ravel()])
    )
    if observed.size == 0 or not np.issubdtype(observed.dtype, np.integer):
        return None
    if observed.min() < 0 or observed.max() >= num_classes:
        return None
    return list(range(num_classes))


def evaluate_model(model, X, y, class_names, save_path=None):
    """Evaluate a fitted classifier and optionally save reports and plots.

    Accuracy, weighted F1, a classification report and a confusion matrix
    are computed from ``model.predict(X)``. When ``save_path`` is given, a
    text report, a JSON report and raw/normalised confusion-matrix images
    are written using ``save_path`` as the filename prefix.

    Args:
        model: Fitted estimator exposing ``predict``.
        X: Feature matrix to evaluate on.
        y: True labels.
        class_names: Display names of the classes, ordered by class index.
        save_path: Optional filename prefix for the saved artifacts.

    Returns:
        tuple: ``(accuracy, report)`` where ``report`` is the classification
        report as a dict.

    Raises:
        Exception: Any error raised during evaluation is printed and
            re-raised.
    """
    try:
        # Predict
        y_pred = model.predict(X)

        # When the labels are integer class indices, pin the label set to
        # every known class. Otherwise a class absent from this split would
        # shrink the confusion matrix (misaligning the tick labels) and make
        # classification_report reject target_names. For any other label
        # type, fall back to letting scikit-learn infer the labels.
        label_ids = _resolve_label_ids(y, y_pred, len(class_names))

        # Calculate metrics
        accuracy = accuracy_score(y, y_pred)
        cm = confusion_matrix(y, y_pred, labels=label_ids)
        report = classification_report(
            y, y_pred, labels=label_ids, target_names=class_names, output_dict=True
        )
        report_str = classification_report(
            y, y_pred, labels=label_ids, target_names=class_names
        )
        f1 = f1_score(y, y_pred, average='weighted')

        # Print report
        print(f'Accuracy: {accuracy:.4f}, F1 Score: {f1:.4f}')
        print('\nClassification Report:')
        print(report_str)

        if save_path:
            # Save detailed report
            with open(f"{save_path}_report.txt", 'w') as f:
                f.write(f'Accuracy: {accuracy:.4f}\n')
                f.write(f'F1 Score: {f1:.4f}\n\n')
                f.write(report_str)

            # Save report as JSON for further analysis
            with open(f"{save_path}_report.json", 'w') as f:
                json.dump(report, f, indent=4)

            # Plot confusion matrix
            plt.figure(figsize=(10, 8))
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
            plt.xlabel('Predicted')
            plt.ylabel('True')
            plt.title('Confusion Matrix')
            plt.tight_layout()
            plt.savefig(f"{save_path}_confusion_matrix.png")
            plt.close()

            # Plot normalized confusion matrix. Rows with no true samples
            # are left at 0 instead of dividing by zero (which would yield
            # NaN cells and a RuntimeWarning).
            plt.figure(figsize=(10, 8))
            row_totals = cm.sum(axis=1, keepdims=True)
            cm_norm = np.divide(
                cm.astype('float'),
                row_totals,
                out=np.zeros(cm.shape, dtype=float),
                where=row_totals != 0,
            )
            sns.heatmap(cm_norm, annot=True, fmt='.2f', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
            plt.xlabel('Predicted')
            plt.ylabel('True')
            plt.title('Normalized Confusion Matrix')
            plt.tight_layout()
            plt.savefig(f"{save_path}_confusion_matrix_norm.png")
            plt.close()

            print(f'Evaluation results saved to {save_path}')

        return accuracy, report

    except Exception as e:
        print(f"Error evaluating model: {e}")
        raise

# Plot hyperparameter search results
def plot_search_results(cv_results_df, save_path):
    """Plot summaries of the hyperparameter search and save them as PNGs.

    Three images are written to ``<save_path>/hyperparameter_plots``:
    ``parameter_effects.png`` (mean score by C, gamma, kernel and class
    weight), ``top_parameters.png`` (best-scoring combinations) and
    ``score_distribution.png`` (histogram of mean scores).

    Args:
        cv_results_df: DataFrame built from ``cv_results_`` of the search.
            It must contain ``mean_test_score`` and the ``param_C``,
            ``param_gamma``, ``param_kernel`` and ``param_class_weight``
            columns.
        save_path: Directory under which the plots folder is created.
    """
    # Create directory for plots
    plots_dir = os.path.join(save_path, 'hyperparameter_plots')
    os.makedirs(plots_dir, exist_ok=True)

    # 1. Plot mean test scores by parameters
    plt.figure(figsize=(15, 10))

    # 1.1 C parameter
    plt.subplot(2, 2, 1)
    # Group by C parameter and calculate mean score
    C_scores = cv_results_df.groupby('param_C')['mean_test_score'].mean().reset_index()
    C_scores['param_C'] = C_scores['param_C'].astype(float)
    C_scores = C_scores.sort_values('param_C')

    plt.semilogx(C_scores['param_C'], C_scores['mean_test_score'], 'o-')
    plt.xlabel('C parameter (log scale)')
    plt.ylabel('Mean F1 Score')
    plt.title('Effect of C Parameter on Performance')
    plt.grid(True)

    # 1.2 Gamma parameter (for non-linear kernels)
    plt.subplot(2, 2, 2)
    # Filter for non-linear kernels
    non_linear_kernels = cv_results_df[cv_results_df['param_kernel'].isin(['rbf', 'poly', 'sigmoid'])]

    if not non_linear_kernels.empty:
        # Group by gamma parameter and calculate mean score
        gamma_scores = non_linear_kernels.groupby('param_gamma')['mean_test_score'].mean().reset_index()
        gamma_scores['param_gamma'] = gamma_scores['param_gamma'].astype(float)
        gamma_scores = gamma_scores.sort_values('param_gamma')

        plt.semilogx(gamma_scores['param_gamma'], gamma_scores['mean_test_score'], 'o-')
        plt.xlabel('Gamma parameter (log scale)')
        plt.ylabel('Mean F1 Score')
        plt.title('Effect of Gamma Parameter on Performance')
        plt.grid(True)

    # 1.3 Kernel type
    plt.subplot(2, 2, 3)
    kernel_scores = cv_results_df.groupby('param_kernel')['mean_test_score'].mean().reset_index()
    plt.bar(kernel_scores['param_kernel'], kernel_scores['mean_test_score'])
    plt.xlabel('Kernel Type')
    plt.ylabel('Mean F1 Score')
    plt.title('Effect of Kernel Type on Performance')
    plt.grid(True)

    # 1.4 Class weight
    plt.subplot(2, 2, 4)
    # class_weight=None is a real search setting, but pandas treats None/NaN
    # group keys as missing and drops them from groupby by default. Convert
    # the values to text labels first so the "None" bar is not silently lost.
    weight_labels = cv_results_df['param_class_weight'].map(
        lambda w: 'None' if w is None or (isinstance(w, float) and np.isnan(w)) else str(w)
    )
    weight_scores = (
        cv_results_df.assign(param_class_weight=weight_labels)
        .groupby('param_class_weight')['mean_test_score']
        .mean()
        .reset_index()
    )
    plt.bar(weight_scores['param_class_weight'], weight_scores['mean_test_score'])
    plt.xlabel('Class Weight')
    plt.ylabel('Mean F1 Score')
    plt.title('Effect of Class Weight on Performance')
    plt.grid(True)

    plt.tight_layout()
    plt.savefig(os.path.join(plots_dir, 'parameter_effects.png'))
    plt.close()

    # 2. Plot top N parameter combinations
    plt.figure(figsize=(12, 8))
    top_n = min(20, len(cv_results_df))
    top_results = cv_results_df.sort_values('mean_test_score', ascending=False).head(top_n)

    # Create parameter combination labels (kernel name abbreviated to 3 chars)
    param_labels = [
        f"C={row['param_C']:.2e}, γ={row['param_gamma']:.2e}, {row['param_kernel'][:3]}"
        for _, row in top_results.iterrows()
    ]

    plt.barh(range(len(top_results)), top_results['mean_test_score'], align='center')
    plt.yticks(range(len(top_results)), param_labels)
    plt.xlabel('Mean F1 Score')
    plt.title(f'Top {top_n} Parameter Combinations')
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(os.path.join(plots_dir, 'top_parameters.png'))
    plt.close()

    # 3. Plot score distribution
    best_score = cv_results_df['mean_test_score'].max()
    plt.figure(figsize=(10, 6))
    plt.hist(cv_results_df['mean_test_score'], bins=20, alpha=0.7, color='blue')
    plt.axvline(best_score, color='red', linestyle='dashed',
                linewidth=2, label=f'Best score: {best_score:.4f}')
    plt.xlabel('Mean F1 Score')
    plt.ylabel('Count')
    plt.title('Distribution of F1 Scores')
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(plots_dir, 'score_distribution.png'))
    plt.close()

    print(f"Hyperparameter search plots saved to {plots_dir}")

# Main function for SVM hyperparameter optimization
def main(n_iter=25, cv=5):
    """Run the full pipeline: features, SVM search, evaluation and reports.

    Features are extracted for the train/val/test splits, an SVM is tuned
    on the training features, and the best model is evaluated on the
    validation and test sets. Search results, plots, the pickled model,
    the configuration and a model summary are written under
    ``RESULTS_DIR/hyperopt``.

    Args:
        n_iter: Number of parameter settings sampled by the randomized
            search. The same value is recorded in ``config.json``.
        cv: Number of cross-validation folds used by the search. The same
            value is recorded in ``config.json``.

    Returns:
        tuple: ``(best_svm, test_accuracy, test_report)``.
    """
    print("\n" + "="*50)
    print("SVM Hyperparameter Optimization for Recycling Material Classification")
    print("="*50)

    # Create directory for hyperopt results
    hyperopt_dir = os.path.join(RESULTS_DIR, "hyperopt")
    os.makedirs(hyperopt_dir, exist_ok=True)

    # Get device
    device = get_device()

    # Load data - pass device to function
    train_loader, val_loader, test_loader, class_names, num_classes = load_data(device=device)

    # Create feature extractor
    feature_extractor = create_feature_extractor(model_name=FEATURE_EXTRACTOR, device=device)

    # Extract features for all sets
    print("\nExtracting features from training data...")
    X_train, y_train = extract_features(feature_extractor, train_loader, device)
    print(f"Training features shape: {X_train.shape}, Labels shape: {y_train.shape}")

    print("\nExtracting features from validation data...")
    X_val, y_val = extract_features(feature_extractor, val_loader, device)
    print(f"Validation features shape: {X_val.shape}, Labels shape: {y_val.shape}")

    print("\nExtracting features from test data...")
    X_test, y_test = extract_features(feature_extractor, test_loader, device)
    print(f"Test features shape: {X_test.shape}, Labels shape: {y_test.shape}")

    # Perform hyperparameter optimization
    print("\nPerforming SVM hyperparameter optimization...")
    best_svm, opt_results, cv_results_df = optimize_svm_hyperparameters(
        X_train, y_train, X_val, y_val, cv=cv, n_iter=n_iter
    )

    # Save hyperparameter search results
    with open(os.path.join(hyperopt_dir, 'hyperparameter_search_results.json'), 'w') as f:
        # Leave out the bulky per-candidate CV table; it is saved as CSV below
        results_json = {k: v for k, v in opt_results.items() if k != 'cv_results'}
        json.dump(results_json, f, indent=4)

    # Save CV results DataFrame
    cv_results_df.to_csv(os.path.join(hyperopt_dir, 'cv_results.csv'), index=False)

    # Plot hyperparameter search results
    plot_search_results(cv_results_df, hyperopt_dir)

    # Save the best model
    best_model_path = os.path.join(hyperopt_dir, 'best_svm_model.pkl')
    with open(best_model_path, 'wb') as f:
        pickle.dump(best_svm, f)
    print(f"Best SVM model saved to {best_model_path}")

    # Save configuration. The search settings are taken from the same
    # variables passed to the search, so the record cannot drift from what
    # was actually run.
    config = {
        'model': 'SVM',
        'feature_extractor': FEATURE_EXTRACTOR,
        'best_parameters': best_svm.get_params(),
        'batch_size': BATCH_SIZE,
        'num_workers': NUM_WORKERS,
        'device': str(device),
        'dataset_path': DATASET_PATH,
        'num_classes': num_classes,
        'class_names': class_names,
        'feature_dimension': X_train.shape[1],
        'hyperparameter_search': {
            'method': 'RandomizedSearchCV',
            'n_iter': n_iter,
            'cv': cv,
            'scoring': 'f1_weighted'
        }
    }

    with open(os.path.join(hyperopt_dir, 'config.json'), 'w') as f:
        json.dump(config, f, indent=4)

    # Evaluate best model on validation set
    print("\nEvaluating best model on validation set:")
    val_results_path = os.path.join(hyperopt_dir, 'validation_results')
    val_accuracy, val_report = evaluate_model(best_svm, X_val, y_val, class_names, save_path=val_results_path)

    # Evaluate best model on test set
    print("\nEvaluating best model on test set:")
    test_results_path = os.path.join(hyperopt_dir, 'test_results')
    test_accuracy, test_report = evaluate_model(best_svm, X_test, y_test, class_names, save_path=test_results_path)

    # Save model summary
    val_f1 = val_report['weighted avg']['f1-score']
    test_f1 = test_report['weighted avg']['f1-score']
    model_summary = {
        'model_type': f'SVM with {FEATURE_EXTRACTOR} features (optimized)',
        'feature_extractor': FEATURE_EXTRACTOR,
        'num_classes': num_classes,
        'class_names': class_names,
        'hyperparameters': best_svm.get_params(),
        'val_accuracy': val_accuracy,
        'test_accuracy': test_accuracy,
        'val_f1_score': val_f1,
        'test_f1_score': test_f1,
        'per_class_f1': {cls: test_report[cls]['f1-score'] for cls in class_names}
    }

    with open(os.path.join(hyperopt_dir, 'model_summary.json'), 'w') as f:
        json.dump(model_summary, f, indent=4)

    print("\n" + "="*50)
    print("OPTIMIZED SVM MODEL SUMMARY")
    print("="*50)
    print(f"Model: SVM with {FEATURE_EXTRACTOR} features")
    print(f"Classes: {class_names}")
    print(f"Best Parameters: {best_svm.get_params()}")
    print(f"Feature extractor: {FEATURE_EXTRACTOR}")
    print(f"Feature dimension: {X_train.shape[1]}")
    print(f"Validation Accuracy: {val_accuracy:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Validation F1 Score: {val_f1:.4f}")
    print(f"Test F1 Score: {test_f1:.4f}")
    print("="*50)
    print(f"Full results saved to {hyperopt_dir}")

    return best_svm, test_accuracy, test_report

# Entry point: run the full pipeline only when this file is executed directly
# (as a script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


SVM Hyperparameter Optimization for Recycling Material Classification
PyTorch version: 2.6.0
Using MPS device for feature extraction
Dataset loaded: 10500 training, 3000 validation, 1500 test images
Classes: ['aerosol_cans', 'aluminum_food_cans', 'aluminum_soda_cans', 'cardboard_boxes', 'cardboard_packaging', 'clothing', 'coffee_grounds', 'disposable_plastic_cutlery', 'eggshells', 'food_waste', 'glass_beverage_bottles', 'glass_cosmetic_containers', 'glass_food_jars', 'magazines', 'newspaper', 'office_paper', 'paper_cups', 'plastic_cup_lids', 'plastic_detergent_bottles', 'plastic_food_containers', 'plastic_shopping_bags', 'plastic_soda_bottles', 'plastic_straws', 'plastic_trash_bags', 'plastic_water_bottles', 'shoes', 'steel_food_cans', 'styrofoam_cups', 'styrofoam_food_containers', 'tea_bags']
Class distribution:
  aerosol_cans: 500 images (3.3%)
  aluminum_food_cans: 500 images (3.3%)
  aluminum_soda_cans: 500 images (3.3%)
  cardboard_boxes: 500 images (3.3%)
  cardboard_packaging: 

Extracting features: 100%|████████████████████| 329/329 [01:30<00:00,  3.64it/s]


Training features shape: (10500, 2048), Labels shape: (10500,)

Extracting features from validation data...


Extracting features: 100%|██████████████████████| 94/94 [00:42<00:00,  2.19it/s]


Validation features shape: (3000, 2048), Labels shape: (3000,)

Extracting features from test data...


Extracting features: 100%|██████████████████████| 47/47 [00:33<00:00,  1.41it/s]

Test features shape: (1500, 2048), Labels shape: (1500,)

Performing SVM hyperparameter optimization...
Starting SVM hyperparameter optimization with 25 iterations...
Fitting 5 folds for each of 25 candidates, totalling 125 fits





[CV] END C=0.004281332398719396, class_weight=None, decision_function_shape=ovr, gamma=0.8858667904100823, kernel=rbf; total time=20.1min
[CV] END C=2.976351441631316, class_weight=None, decision_function_shape=ovo, gamma=0.0006158482110660267, kernel=rbf; total time= 4.7min
[CV] END C=0.6951927961775606, class_weight=None, decision_function_shape=ovr, gamma=0.26366508987303583, kernel=rbf; total time=18.9min
[CV] END C=12.742749857031322, class_weight=None, decision_function_shape=ovo, gamma=0.00379269019073225, kernel=poly; total time= 5.9min
[CV] END C=1000.0, class_weight=balanced, decision_function_shape=ovo, gamma=2.9763514416313193, kernel=sigmoid; total time=17.4min
[CV] END C=0.6951927961775606, class_weight=balanced, decision_function_shape=ovr, gamma=0.023357214690901212, kernel=sigmoid; total time= 4.2min
[CV] END C=0.001, class_weight=balanced, decision_function_shape=ovo, gamma=0.14384498882876628, kernel=poly; total time= 5.7min
[CV] END C=0.0379269019073225, class_weigh