# In [None]:  (notebook cell marker left over from the Jupyter/Colab export)
# Multi-Objective Genetic Algorithm for Feature Selection with KNN
# Optimizing precision, recall, and F1-score simultaneously

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import StratifiedKFold, cross_val_score, cross_validate, train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import (
    precision_score, recall_score, f1_score, accuracy_score, 
    roc_auc_score, average_precision_score, classification_report, 
    confusion_matrix
)
from sklearn.impute import SimpleImputer
from imblearn.over_sampling import SMOTE
from deap import base, creator, tools, algorithms
import random
import warnings
import io
from io import StringIO
from collections import defaultdict
import statistics
from scipy.io import arff
from google.colab import files
import joblib
import os

# Silence every warning category for the whole session and seed both the
# NumPy and the stdlib random generators so runs are repeatable.
warnings.filterwarnings('ignore')
np.random.seed(42)
random.seed(42)

# DEAP's creator module keeps the classes it generates as module attributes.
# Drop any existing FitnessMulti / MultiIndividual before they are created
# again below, so this section can be re-executed (e.g. re-running a cell).
if 'FitnessMulti' in creator.__dict__:
    del creator.FitnessMulti
if 'MultiIndividual' in creator.__dict__:
    del creator.MultiIndividual

# Create DEAP multi-objective fitness and individual classes.
# Weights (1.0, 1.0, 1.0) mean all three objectives (precision, recall, f1)
# are maximized; an individual is a plain list carrying that fitness.
creator.create("FitnessMulti", base.Fitness, weights=(1.0, 1.0, 1.0))
creator.create("MultiIndividual", list, fitness=creator.FitnessMulti)

# --- Data Loading and Preprocessing Functions ---

def upload_and_load_file():
    """Prompt for a file upload in Colab and load it into a DataFrame.

    Only the first uploaded file is read. Files ending in ``.csv`` are parsed
    with ``pd.read_csv``; files ending in ``.arff`` are parsed with
    ``scipy.io.arff.loadarff`` and any ``bytes`` values in object columns are
    decoded as UTF-8. The extension check is case-insensitive.

    Returns:
        The loaded ``pandas.DataFrame``, or ``None`` when nothing was uploaded
        or the file extension is not supported.
    """
    print("Please upload your Promise dataset file (CSV or ARFF format)...")
    uploaded = files.upload()

    if not uploaded:
        print("No file uploaded.")
        return None

    filename = next(iter(uploaded))  # Only the first uploaded file is used
    payload = uploaded[filename]
    # Compare on a lower-cased name so "DATA.CSV" / "data.ARFF" are accepted.
    lowered = filename.lower()

    if lowered.endswith('.csv'):
        df = pd.read_csv(io.BytesIO(payload))
        print(f"Loaded CSV {filename}: {df.shape}")
    elif lowered.endswith('.arff'):
        try:
            # Try the raw byte stream first
            data, meta = arff.loadarff(io.BytesIO(payload))
        except TypeError:
            # The parser rejected bytes: decode to text and retry
            data, meta = arff.loadarff(io.StringIO(payload.decode('utf-8')))
        df = pd.DataFrame(data)

        # Decode only values that really are bytes; anything else is left
        # untouched instead of relying on a swallowed AttributeError.
        for col in df.columns:
            if df[col].dtype == 'object':
                df[col] = df[col].map(
                    lambda value: value.decode('utf-8') if isinstance(value, bytes) else value
                )

        print(f"Loaded ARFF {filename}: {df.shape}")
        print(f"Attributes: {list(meta.names())}")
    else:
        print(f"Unsupported file format: {filename}")
        return None

    return df

def preprocess_dataset(df, target_column=None):
    """Impute, encode and scale a raw dataset for the KNN pipeline.

    Steps, in order: median-impute numeric columns and mode-impute object
    columns (the target column is still part of the frame at this point),
    split off the target, label-encode object feature columns, label-encode
    an object/bool target, and standardize all features.

    Args:
        df: Raw input DataFrame; it is copied, not modified.
        target_column: Name of the target column. When ``None`` the first
            match among common Promise names is used, otherwise the user is
            prompted. If the resulting name is not a column, the last column
            is used as the target.

    Returns:
        ``(X_scaled, y, scaler, label_encoders, target_encoder)`` where
        ``X_scaled`` is a DataFrame with the original feature names and index,
        ``y`` is an ndarray of encoded labels (object/bool target) or a Series
        (numeric target), ``label_encoders`` maps feature name to its fitted
        ``LabelEncoder`` and ``target_encoder`` is ``None`` for numeric targets.
    """
    df = df.copy()

    # Handle missing values
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    categorical_columns = df.select_dtypes(include=['object']).columns

    if len(numeric_columns) > 0:
        num_imputer = SimpleImputer(strategy='median')
        df[numeric_columns] = num_imputer.fit_transform(df[numeric_columns])

    if len(categorical_columns) > 0:
        cat_imputer = SimpleImputer(strategy='most_frequent')
        df[categorical_columns] = cat_imputer.fit_transform(df[categorical_columns])

    # Resolve the target column
    if target_column is None:
        # Common target column names in Promise datasets
        possible_targets = ['bug', 'defects', 'class', 'defective', 'Class']
        target_column = next((col for col in possible_targets if col in df.columns), None)

        if target_column is None:
            print(f"Available columns: {list(df.columns)}")
            target_column = input("Please specify the target column name: ")

    if target_column in df.columns:
        y = df[target_column]
        X = df.drop(columns=[target_column])
        print(f"Using '{target_column}' as target column")
    else:
        # Assume last column is target; copy so the column assignments below
        # do not write into a slice of df.
        y = df.iloc[:, -1]
        X = df.iloc[:, :-1].copy()
        print(f"Using last column '{df.columns[-1]}' as target column")

    # Encode categorical features
    categorical_features = X.select_dtypes(include=['object']).columns
    label_encoders = {}

    for col in categorical_features:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col].astype(str))
        label_encoders[col] = le

    # Encode target if categorical
    if y.dtype == 'object' or y.dtype == 'bool':
        target_encoder = LabelEncoder()
        y = target_encoder.fit_transform(y)
        print(f"Target classes: {list(target_encoder.classes_)}")
        print(f"Target encoding: {dict(zip(target_encoder.classes_, range(len(target_encoder.classes_))))}")
    else:
        target_encoder = None
        # The imputer hands numeric columns back as floats, so an integer
        # label column arrives here as float64. np.bincount (used by the
        # callers) refuses floats, so restore integer labels whenever every
        # value is a finite whole number.
        if (
            pd.api.types.is_float_dtype(y)
            and np.isfinite(y).all()
            and (y == np.round(y)).all()
        ):
            y = y.astype(int)

    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    X_scaled = pd.DataFrame(X_scaled, columns=X.columns, index=X.index)

    return X_scaled, y, scaler, label_encoders, target_encoder

def apply_smote(X, y, random_state=42):
    """Oversample with SMOTE and print the class counts before and after.

    Args:
        X: Feature matrix passed to ``SMOTE.fit_resample``.
        y: Class labels; must be acceptable to ``np.bincount``.
        random_state: Seed forwarded to ``SMOTE``.

    Returns:
        The resampled ``(X, y)`` pair.
    """
    resampled_X, resampled_y = SMOTE(random_state=random_state).fit_resample(X, y)

    # Report the label histogram on both sides of the resampling
    print(f"Original class distribution: {np.bincount(y)}")
    print(f"Balanced class distribution: {np.bincount(resampled_y)}")

    return resampled_X, resampled_y

# --- Multi-Objective Genetic Algorithm Functions ---

def evaluate_knn_multi_cv(individual, X, y, cv_folds=5):
    """Score one feature mask with cross-validated KNN (GA fitness function).

    Genes greater than 0.5 mark selected columns of ``X``. A KNN classifier
    (k=5 capped for small data, uniform weights, Euclidean distance) is
    cross-validated once with stratified, shuffled folds and the mean
    precision, recall and F1 are returned.

    Args:
        individual: Sequence of gene values, one per column of ``X``.
        X: Feature DataFrame (indexed positionally via ``.iloc``).
        y: Class labels.
        cv_folds: Number of stratified folds.

    Returns:
        ``(precision, recall, f1)`` as fold means. ``(0.0, 0.0, 0.0)`` when no
        feature is selected or when evaluation raises (the error is printed so
        a single bad individual does not abort the whole GA run).
    """
    try:
        # Select features
        selected_features = [i for i, mask in enumerate(individual) if mask > 0.5]
        if not selected_features:
            return (0.0, 0.0, 0.0)  # No features selected

        X_selected = X.iloc[:, selected_features]

        # Default k is 5; cap it at min(20, n_samples // 5) and keep it >= 1
        k = 5
        max_k = min(20, len(X_selected) // 5)
        k = max(1, min(k, max_k))

        knn = KNeighborsClassifier(n_neighbors=k, weights='uniform', p=2)
        cv = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=42)

        # One multi-metric pass: the folds are fixed by random_state, so this
        # yields the same scores as three separate cross_val_score runs while
        # fitting the model a third as often.
        scores = cross_validate(
            knn, X_selected, y, cv=cv, scoring=('precision', 'recall', 'f1')
        )

        return (
            np.mean(scores['test_precision']),
            np.mean(scores['test_recall']),
            np.mean(scores['test_f1']),
        )

    except Exception as e:
        print(f"Evaluation error: {str(e)}")
        return (0.0, 0.0, 0.0)

def run_multi_objective_ga(X, y, n_runs=10, generations=20, pop_size=50):
    """Run NSGA-II feature selection several times and merge the fronts.

    Each run evolves a fresh population of bit-mask individuals (one gene per
    column of ``X``) with ``eaMuPlusLambda`` and ``selNSGA2`` selection. The
    non-dominated individuals of every run are folded into one shared Pareto
    front, so the returned set contains no solution dominated by one from
    another run and no exact duplicates.

    Args:
        X: Feature DataFrame.
        y: Class labels.
        n_runs: Number of independent GA runs.
        generations: Generations per run.
        pop_size: Population size (used for both mu and lambda).

    Returns:
        List of non-dominated individuals, each with evaluated
        ``fitness.values`` of ``(precision, recall, f1)``.
    """
    print(f"Running Multi-Objective Feature Selection (precision, recall, f1)")

    # The toolbox does not change between runs, so build it once.
    toolbox = base.Toolbox()
    # Genes must be real 0/1 bits: mutFlipBit computes type(g)(not g), which
    # maps any non-zero float to 0.0. Float genes from random.random() could
    # therefore only ever be switched off by their first mutation.
    toolbox.register("attr_bool", random.randint, 0, 1)
    toolbox.register("individual", tools.initRepeat, creator.MultiIndividual,
                     toolbox.attr_bool, n=X.shape[1])
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", evaluate_knn_multi_cv, X=X, y=y)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
    toolbox.register("select", tools.selNSGA2)

    # Front shared by all runs; update() drops dominated and duplicate entries
    combined_front = tools.ParetoFront()

    for run in range(n_runs):
        print(f"  Run {run + 1}/{n_runs}")

        population = toolbox.population(n=pop_size)
        hof = tools.ParetoFront()  # Non-dominated solutions of this run

        algorithms.eaMuPlusLambda(
            population, toolbox,
            mu=pop_size,          # Number of individuals to select for next generation
            lambda_=pop_size,     # Number of children to produce
            cxpb=0.7,             # Crossover probability
            mutpb=0.2,            # Mutation probability
            ngen=generations,     # Number of generations
            halloffame=hof,       # Hall of Fame with Pareto-optimal solutions
            verbose=False
        )

        combined_front.update(hof)

    return list(combined_front)

def evaluate_solution_with_cv(individual, X, y, cv_folds=5):
    """Cross-validate one feature mask and return detailed metrics.

    Genes greater than 0.5 mark selected columns of ``X``. A KNN classifier
    (k=5, uniform weights, Euclidean distance) is evaluated with stratified,
    shuffled folds on accuracy, precision, recall, F1 and ROC-AUC.

    Args:
        individual: Sequence of gene values, one per column of ``X``.
        X: Feature DataFrame (indexed positionally via ``.iloc``).
        y: Class labels.
        cv_folds: Number of stratified folds.

    Returns:
        Dict with, for every metric, its fold mean under ``<metric>`` and its
        fold standard deviation under ``<metric>_std``, plus
        ``selected_features`` (count) and ``feature_indices`` (positions).
        When no feature is selected or evaluation raises, the same keys are
        returned with zero / empty values, so callers can always read the
        ``*_std`` and feature keys.
    """
    scoring = ['accuracy', 'precision', 'recall', 'f1', 'roc_auc']

    def failed_result():
        # Same schema as a successful evaluation, with neutral values
        result = {}
        for metric in scoring:
            result[metric] = 0.0
            result[f'{metric}_std'] = 0.0
        result['selected_features'] = 0
        result['feature_indices'] = []
        return result

    try:
        # Select features
        selected_features = [i for i, mask in enumerate(individual) if mask > 0.5]
        if not selected_features:
            return failed_result()

        X_selected = X.iloc[:, selected_features]

        # Create model
        knn = KNeighborsClassifier(n_neighbors=5, weights='uniform', p=2)

        # Define CV strategy
        cv = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=42)

        # Perform cross-validation with multiple metrics
        cv_results = cross_validate(knn, X_selected, y, cv=cv, scoring=scoring)

        # Aggregate results
        metrics = {}
        for metric in scoring:
            fold_scores = cv_results[f'test_{metric}']
            metrics[metric] = np.mean(fold_scores)
            metrics[f'{metric}_std'] = np.std(fold_scores)

        metrics['selected_features'] = len(selected_features)
        metrics['feature_indices'] = selected_features

        return metrics

    except Exception as e:
        print(f"Evaluation error: {str(e)}")
        return failed_result()

def select_best_solution(solutions, evaluation_results, metric='f1'):
    """Pick the solution whose evaluation has the highest ``metric``.

    Args:
        solutions: Candidate solutions.
        evaluation_results: One metrics dict per solution, in the same order.
        metric: Key of the score to maximize.

    Returns:
        ``(best_solution, best_result)``, where ``best_result`` is the very
        dict object stored in ``evaluation_results``; ``(None, None)`` when
        either input is empty. Ties go to the earliest solution.
    """
    if not solutions or not evaluation_results:
        return None, None

    scores = np.asarray([result[metric] for result in evaluation_results], dtype=float)
    # np.argmax treats NaN as the maximum, so a failed evaluation would win.
    # Rank NaN below every real score instead; if all scores are NaN this
    # still falls back to the first solution.
    scores = np.where(np.isnan(scores), -np.inf, scores)
    best_idx = int(np.argmax(scores))

    return solutions[best_idx], evaluation_results[best_idx]

def build_and_save_best_model(best_solution, X, y, feature_names, save_path='best_knn_model.pkl'):
    """Fit the final KNN on the chosen features, persist it and download it.

    Genes of ``best_solution`` greater than 0.5 select columns of ``X``. The
    classifier is fitted on all rows of those columns, bundled with the
    selected indices, names and hyperparameters, written with
    ``joblib.dump`` and handed to ``files.download``.

    Args:
        best_solution: Feature mask, one gene per column of ``X``.
        X: Feature DataFrame.
        y: Class labels.
        feature_names: Column names aligned with the mask positions.
        save_path: Destination file for the pickled bundle.

    Returns:
        The dict that was saved (``model``, ``selected_feature_indices``,
        ``selected_feature_names``, ``hyperparameters``).
    """
    # Fixed KNN configuration: k=5, uniform weights, Euclidean distance (p=2)
    hyperparameters = {
        'n_neighbors': 5,
        'weights': 'uniform',
        'p': 2,
    }

    # Translate the mask into column positions
    chosen = [pos for pos, gene in enumerate(best_solution) if gene > 0.5]

    classifier = KNeighborsClassifier(**hyperparameters)
    classifier.fit(X.iloc[:, chosen], y)

    # Bundle the fitted model with the metadata needed to reuse it
    model_info = {
        'model': classifier,
        'selected_feature_indices': chosen,
        'selected_feature_names': [feature_names[pos] for pos in chosen],
        'hyperparameters': hyperparameters,
    }

    joblib.dump(model_info, save_path)
    print(f"\nBest model saved to: {save_path}")
    files.download(save_path)

    return model_info

# --- Results Analysis Functions ---

def display_feature_selection_results(solutions, feature_names):
    """Print how many and which features the given solutions select.

    Reports the number of solutions, the mean and range of selected-feature
    counts, and up to ten feature names ordered by how many solutions include
    them. Mask positions beyond ``feature_names`` count towards the sizes but
    are not attributed to any name.

    Args:
        solutions: Feature masks; genes greater than 0.5 are selected.
        feature_names: Names aligned with the mask positions.
    """
    print("\n=== Feature Selection Analysis ===")

    if not solutions:
        print("No solutions to analyze")
        return

    usage = {}   # feature name -> number of solutions using it
    sizes = []   # selected-feature count per solution

    for candidate in solutions:
        chosen = [pos for pos, gene in enumerate(candidate) if gene > 0.5]
        sizes.append(len(chosen))
        for pos in chosen:
            if pos < len(feature_names):
                label = feature_names[pos]
                usage[label] = usage.get(label, 0) + 1

    total = len(solutions)

    # Feature count statistics
    print(f"Number of Pareto-optimal solutions: {total}")
    print(f"Average number of features: {np.mean(sizes):.2f}")
    print(f"Range of features: {min(sizes)} - {max(sizes)}")

    # Most frequently selected features (stable sort keeps first-seen order on ties)
    ranked = sorted(usage.items(), key=lambda pair: pair[1], reverse=True)
    print("\nTop 10 most frequently selected features:")
    for label, hits in ranked[:10]:
        share = (hits / total) * 100
        print(f"  • {label}: {hits}/{total} solutions ({share:.1f}%)")

def plot_pareto_front(solutions, evaluation_results):
    """Draw a 2x2 summary figure for the given solutions.

    Panels: (1) precision vs. recall coloured by F1 and sized by feature
    count, (2) feature count vs. F1 coloured by precision, (3) the ten most
    frequently selected features, (4) mean cross-validated precision, recall,
    F1 and accuracy. Objective values come from each solution's
    ``fitness.values``; feature names are read from
    ``evaluation_results[0]['feature_names']``.

    Args:
        solutions: Individuals with evaluated ``fitness.values``.
        evaluation_results: Metric dicts, one per solution.
    """
    if not solutions or not evaluation_results:
        print("Not enough data for visualization")
        return

    plt.figure(figsize=(15, 12))

    # Panel 1: precision vs recall (marker size tracks the feature count)
    plt.subplot(2, 2, 1)
    objective_rows = [ind.fitness.values for ind in solutions]
    prec_axis = [row[0] for row in objective_rows]
    rec_axis = [row[1] for row in objective_rows]
    f1_axis = [row[2] for row in objective_rows]
    subset_sizes = [len([gene for gene in ind if gene > 0.5]) for ind in solutions]
    marker_sizes = [size * 10 for size in subset_sizes]

    points = plt.scatter(prec_axis, rec_axis, c=f1_axis, s=marker_sizes,
                         alpha=0.7, cmap='viridis')
    plt.colorbar(points, label='F1 Score')
    plt.xlabel('Precision')
    plt.ylabel('Recall')
    plt.title('Pareto Front: Precision vs Recall')
    plt.grid(True, linestyle='--', alpha=0.7)

    # Panel 2: feature count vs F1
    plt.subplot(2, 2, 2)
    plt.scatter(subset_sizes, f1_axis, c=prec_axis, cmap='plasma', alpha=0.7)
    plt.colorbar(label='Precision')
    plt.xlabel('Number of Features')
    plt.ylabel('F1 Score')
    plt.title('Feature Count vs F1 Score')
    plt.grid(True, linestyle='--', alpha=0.7)

    # Panel 3: how often each feature is selected
    plt.subplot(2, 2, 3)
    tally = defaultdict(int)
    for ind in solutions:
        for pos, gene in enumerate(ind):
            if not gene > 0.5:
                continue
            names = evaluation_results[0]['feature_names']
            if pos < len(names):
                tally[names[pos]] += 1

    top_n = 10
    leaders = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)[:top_n]
    tick_labels = []
    bar_heights = []
    for name, hits in leaders:
        # Long names are cut to 15 characters so the tick labels stay readable
        tick_labels.append(f"{name[:15]}..." if len(name) > 15 else name)
        bar_heights.append(hits)
    slots = range(len(leaders))

    plt.bar(slots, bar_heights, color='skyblue')
    plt.xticks(slots, tick_labels, rotation=45, ha='right')
    plt.xlabel('Feature')
    plt.ylabel('Selection Frequency')
    plt.title(f'Top {top_n} Most Selected Features')

    # Panel 4: mean cross-validated metrics over all solutions
    plt.subplot(2, 2, 4)
    metric_keys = ['precision', 'recall', 'f1', 'accuracy']
    metric_means = {}
    for key in metric_keys:
        metric_means[key] = np.mean([result[key] for result in evaluation_results])

    rectangles = plt.bar(metric_keys, [metric_means[key] for key in metric_keys],
                         color=['blue', 'green', 'red', 'purple'])

    # Print each mean just above its bar
    for rect in rectangles:
        top = rect.get_height()
        plt.text(rect.get_x() + rect.get_width()/2., top + 0.02,
                 f'{top:.3f}', ha='center', va='bottom')

    plt.ylim(0, 1.1)
    plt.title('Average Performance Metrics (Cross-Validation)')
    plt.grid(True, linestyle='--', alpha=0.4)

    plt.tight_layout()
    plt.show()

def display_best_model_details(best_solution, best_result, feature_names):
    """Print a three-part report for the chosen model.

    Sections: cross-validated metrics with their standard deviations, the
    names of the selected features, and the (fixed) KNN hyperparameters.

    Args:
        best_solution: Feature mask; genes greater than 0.5 are selected.
        best_result: Metrics dict holding ``precision``, ``recall``, ``f1``,
            ``accuracy``, ``roc_auc`` and the matching ``*_std`` keys.
        feature_names: Names aligned with the mask positions.
    """
    banner = "=" * 80
    rule = "-" * 60

    print("\n" + banner)
    print("BEST MODEL DETAILS")
    print(banner)

    # Names of the selected features (positions without a name are skipped)
    chosen_names = [
        feature_names[pos]
        for pos, gene in enumerate(best_solution)
        if gene > 0.5 and pos < len(feature_names)
    ]

    # Display label -> key in best_result
    report_rows = (
        ('Precision', 'precision'),
        ('Recall', 'recall'),
        ('F1-Score', 'f1'),
        ('Accuracy', 'accuracy'),
        ('ROC-AUC', 'roc_auc'),
    )
    means = {label: best_result[key] for label, key in report_rows}
    spreads = {label: best_result[f'{key}_std'] for label, key in report_rows}

    print("\n1. PERFORMANCE METRICS (5-fold Cross-Validation)")
    print(rule)
    for label in means:
        print(f"  • {label}: {means[label]:.4f} (±{spreads[label]:.4f})")

    print("\n2. SELECTED FEATURES")
    print(rule)
    print(f"  • Number of selected features: {len(chosen_names)} out of {len(feature_names)}")
    print("  • Selected features:")
    for rank, name in enumerate(chosen_names, start=1):
        print(f"    {rank}. {name}")

    print("\n3. MODEL HYPERPARAMETERS")
    print(rule)
    print("  • KNN Algorithm: K-Nearest Neighbors")
    print("  • n_neighbors (k): 5")
    print("  • weights: uniform")
    print("  • metric: euclidean (p=2)")

    print(banner)

def save_results_to_excel(pareto_solutions, evaluation_results, feature_names, best_idx=None):
    """Write one row per solution to an Excel workbook and download it.

    The ``Pareto_Solutions`` sheet lists every solution with its GA fitness,
    cross-validated metrics and selected feature names. When ``best_idx`` is
    given, the matching row is flagged in ``Is_Best`` and copied to a
    ``Best_Solution`` sheet.

    Args:
        pareto_solutions: Individuals with evaluated ``fitness.values``.
        evaluation_results: Metrics dicts, indexed like ``pareto_solutions``.
        feature_names: Names aligned with the mask positions.
        best_idx: Zero-based position of the best solution, or ``None``.
    """
    rows = []

    for position, candidate in enumerate(pareto_solutions):
        objectives = candidate.fitness.values
        chosen = [col for col, gene in enumerate(candidate) if gene > 0.5]
        chosen_names = [feature_names[col] for col in chosen if col < len(feature_names)]

        # Cross-validation metrics recorded for this same solution
        cv_metrics = evaluation_results[position]

        rows.append({
            'Solution': position + 1,
            'Is_Best': 'Yes' if position == best_idx else 'No',
            'Precision': objectives[0],
            'Recall': objectives[1],
            'F1': objectives[2],
            'Feature_Count': len(chosen),
            'CV_Precision': cv_metrics['precision'],
            'CV_Precision_STD': cv_metrics['precision_std'],
            'CV_Recall': cv_metrics['recall'],
            'CV_Recall_STD': cv_metrics['recall_std'],
            'CV_F1': cv_metrics['f1'],
            'CV_F1_STD': cv_metrics['f1_std'],
            'CV_Accuracy': cv_metrics['accuracy'],
            'CV_ROC_AUC': cv_metrics['roc_auc'],
            'Selected_Features': ', '.join(chosen_names),
        })

    solutions_df = pd.DataFrame(rows)

    with pd.ExcelWriter('multi_objective_feature_selection_results.xlsx') as writer:
        solutions_df.to_excel(writer, sheet_name='Pareto_Solutions', index=False)

        # Separate sheet holding only the flagged row
        if best_idx is not None:
            flagged = solutions_df[solutions_df['Is_Best'] == 'Yes']
            flagged.to_excel(writer, sheet_name='Best_Solution', index=False)

    print("Results saved to 'multi_objective_feature_selection_results.xlsx'")
    files.download('multi_objective_feature_selection_results.xlsx')

# --- Main Pipeline ---

def main():
    """Run the whole pipeline: load, preprocess, search, evaluate, report.

    Steps: upload a dataset, preprocess it (with SMOTE when the class ratio
    exceeds 1.5), run the multi-objective GA, cross-validate every returned
    solution, pick the one with the best F1, save that model, then print,
    plot and export the results. The function returns early, with a message,
    when no dataset is uploaded or the search yields no usable solution.
    """
    print("=== Multi-Objective Feature Selection for KNN with GA ===")
    print("Simultaneously optimizing: Precision, Recall, and F1-score")
    print("-" * 60)

    # Step 1: Upload and load dataset
    print("\n=== Step 1: Upload Dataset File ===")
    df = upload_and_load_file()

    if df is None:
        print("No dataset uploaded. Please restart and upload a dataset file.")
        return

    print(f"Dataset shape: {df.shape}")

    # Step 2: Preprocess dataset
    print("\n=== Step 2: Preprocessing ===")
    X, y, scaler, label_encoders, target_encoder = preprocess_dataset(df)

    # Apply SMOTE if there's class imbalance. Labels that never occur (gaps in
    # the label range) have a zero count and are ignored so the ratio is not
    # a division by zero.
    # NOTE(review): resampling before cross-validation puts synthetic samples
    # into the validation folds, which can inflate the reported scores -
    # consider resampling inside each training fold instead.
    class_counts = np.bincount(y)
    present_counts = class_counts[class_counts > 0]
    if present_counts.max() / present_counts.min() > 1.5:
        print("Class imbalance detected, applying SMOTE...")
        X, y = apply_smote(X, y)
    else:
        print("Class distribution is relatively balanced, no SMOTE needed.")
        print(f"Class distribution: {class_counts}")

    # Store feature names for analysis
    feature_names = list(X.columns)

    print(f"Features: {len(feature_names)}")
    print(f"First 5 features: {feature_names[:5]}")

    # Step 3: Run Multi-Objective GA
    print("\n=== Step 3: Running Multi-Objective Feature Selection ===")
    pareto_solutions = run_multi_objective_ga(
        X, y,
        n_runs=5,          # Number of independent GA runs
        generations=20,    # Number of generations per run
        pop_size=50        # Population size per generation
    )

    print(f"\nFound {len(pareto_solutions)} Pareto-optimal solutions")

    if not pareto_solutions:
        print("No Pareto-optimal solutions were found; nothing to evaluate.")
        return

    # Print Pareto-optimal solutions
    print("\n=== Pareto-Optimal Solutions ===")
    for i, ind in enumerate(pareto_solutions):
        fitness = ind.fitness.values
        selected_features = sum(1 for mask in ind if mask > 0.5)
        print(f"Solution {i+1}: Precision={fitness[0]:.4f}, Recall={fitness[1]:.4f}, F1={fitness[2]:.4f}, Features={selected_features}")

    # Step 4: Evaluate solutions using cross-validation
    print("\n=== Step 4: Evaluating Solutions with Cross-Validation ===")

    evaluation_results = []
    metric_keys = ('accuracy', 'precision', 'recall', 'f1', 'roc_auc')

    for i, solution in enumerate(pareto_solutions):
        print(f"Evaluating solution {i+1}/{len(pareto_solutions)}")

        # Evaluate with 5-fold cross-validation
        metrics = evaluate_solution_with_cv(solution, X, y, cv_folds=5)

        # A failed evaluation may come back without spread / feature entries;
        # fill them in so the reporting steps below never hit a KeyError.
        for key in metric_keys:
            metrics.setdefault(key, 0.0)
            metrics.setdefault(f'{key}_std', 0.0)
        metrics.setdefault('selected_features', 0)
        metrics.setdefault('feature_indices', [])

        # Add additional info
        metrics['solution_id'] = i+1
        metrics['feature_names'] = feature_names

        evaluation_results.append(metrics)

        print(f"  Precision: {metrics['precision']:.4f} (±{metrics['precision_std']:.4f})")
        print(f"  Recall: {metrics['recall']:.4f} (±{metrics['recall_std']:.4f})")
        print(f"  F1-Score: {metrics['f1']:.4f} (±{metrics['f1_std']:.4f})")
        print(f"  Selected Features: {metrics['selected_features']}")

    # Step 5: Select best solution (using F1 score by default)
    print("\n=== Step 5: Selecting Best Solution ===")
    best_solution, best_result = select_best_solution(pareto_solutions, evaluation_results, metric='f1')

    if best_solution is None:
        print("No best solution could be selected.")
        return

    # Locate the winner by identity: the selector hands back the stored dict
    # itself, so this avoids deep dict comparisons and equal-valued lookalikes.
    best_idx = next(
        (i for i, result in enumerate(evaluation_results) if result is best_result),
        None,
    )

    # Step 6: Build and save the best model
    print("\n=== Step 6: Building and Saving Best Model ===")
    model_info = build_and_save_best_model(best_solution, X, y, feature_names, 'best_knn_multi_objective_model.pkl')

    # Display detailed results for the best model (no step banner of its own)
    display_best_model_details(best_solution, best_result, feature_names)

    # Step 7: Analyze and visualize results
    print("\n=== Step 7: Analyzing Results ===")

    # Display feature selection statistics
    display_feature_selection_results(pareto_solutions, feature_names)

    # Plot Pareto front and other visualizations
    plot_pareto_front(pareto_solutions, evaluation_results)

    # Save results to Excel with best solution highlighted
    save_results_to_excel(pareto_solutions, evaluation_results, feature_names, best_idx=best_idx)

    print("\n=== Pipeline Completed Successfully ===")
    print("Multi-objective feature selection has provided a set of Pareto-optimal solutions")
    print("Each solution represents a different trade-off between precision, recall, and F1-score")
    print("The best model has been saved and results have been generated")
# Run the pipeline only when this file is executed as the main program
# (not when it is imported as a module).
if __name__ == "__main__":
    main()