In [None]:
pip install boruta


In [None]:
pip install sklearn-genetic

In [None]:
import datetime
import os
import warnings
from collections import Counter

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
from boruta import BorutaPy
from imblearn.over_sampling import SMOTE
from scipy import sparse
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.feature_selection import f_classif, f_regression, mutual_info_classif, mutual_info_regression, SelectKBest, RFE
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, accuracy_score, classification_report
from sklearn.model_selection import train_test_split, learning_curve
from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler, OrdinalEncoder
from sklearn.svm import SVR, SVC
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
from sklearn.utils import check_X_y
warnings.filterwarnings('ignore')

# When True, plot_learning_curve() computes and renders learning curves;
# set to False to make plot_learning_curve() return immediately.
SAVE_LEARNING_CURVE = True
# NOTE(review): not read anywhere in the visible code — presumably meant to
# control saving the SHAP summary plot as a PNG; confirm or remove.
SAVE_SHAP_PNG = True

# === Feature Logging ===
def log_feature_importance(features, scores, method):
    """Write per-feature scores to ``feature_importance_<method>.csv``.

    Args:
        features: Feature names, one per score.
        scores: Score for each feature, aligned with ``features``.
        method: Label of the selection method; becomes part of the file name.
    """
    out_path = f"feature_importance_{method}.csv"
    table = pd.DataFrame({"Feature": features, "Importance": scores})
    # index=False keeps the CSV to exactly the two named columns.
    table.to_csv(out_path, index=False)
    print(f"📄 Feature importances saved to {out_path}")

# === Evaluate model performance ===
def evaluate_model(task_type, model, X_test, y_test):
    """Score a fitted model on held-out data.

    Args:
        task_type: ``'r'`` selects regression metrics; any other value
            selects classification metrics.
        model: Fitted estimator exposing ``predict``.
        X_test: Features to predict on.
        y_test: True targets for ``X_test``.

    Returns:
        Tuple ``(metrics, headline)``: for regression the metrics dict holds
        MAE/MSE/RMSE/R2 and the headline is R2; otherwise it holds accuracy
        plus the classification report dict and the headline is accuracy.
    """
    predictions = model.predict(X_test)

    if task_type != 'r':
        accuracy = accuracy_score(y_test, predictions)
        metrics = {
            'Accuracy': accuracy,
            'Report': classification_report(y_test, predictions, output_dict=True),
        }
        return metrics, accuracy

    squared_error = mean_squared_error(y_test, predictions)
    r_squared = r2_score(y_test, predictions)
    metrics = {
        'MAE': mean_absolute_error(y_test, predictions),
        'MSE': squared_error,
        'RMSE': np.sqrt(squared_error),
        'R2': r_squared,
    }
    return metrics, r_squared

# === Plot learning curve ===
def plot_learning_curve(estimator, X, y, task_type, title="Learning Curve", filename=None):
    """Plot mean training and cross-validation scores against training-set size.

    Does nothing when the module flag ``SAVE_LEARNING_CURVE`` is falsy.

    Args:
        estimator: Estimator passed to ``learning_curve``.
        X: Training features.
        y: Training targets.
        task_type: ``'r'`` scores with R2; any other value scores with accuracy.
        title: Figure title.
        filename: If truthy, the figure is saved there; otherwise it is shown.
    """
    if not SAVE_LEARNING_CURVE:
        return

    metric = 'r2' if task_type == 'r' else 'accuracy'
    sizes, fold_train_scores, fold_cv_scores = learning_curve(
        estimator,
        X,
        y,
        cv=5,
        scoring=metric,
        train_sizes=np.linspace(0.1, 1.0, 10),
        n_jobs=-1,
    )

    plt.figure(figsize=(8, 6))
    # Each curve is the per-size mean across the CV folds (axis 1).
    curves = (
        (fold_train_scores, "r", "Training score"),
        (fold_cv_scores, "g", "Cross-validation score"),
    )
    for fold_scores, colour, label in curves:
        plt.plot(sizes, np.mean(fold_scores, axis=1), 'o-', color=colour, label=label)

    plt.title(title)
    plt.xlabel("Training examples")
    plt.ylabel("Score")
    plt.legend(loc="best")
    plt.grid(True)

    if filename:
        plt.savefig(filename)
    else:
        plt.show()
    plt.close()

# === Encode categoricals ===
def encode_categoricals(df):
    """Return a copy of ``df`` with object/category columns ordinal-encoded.

    The selected columns are cast to ``str`` before a fresh ``OrdinalEncoder``
    is fitted on them; all other columns are copied unchanged.
    """
    encoded = df.copy()
    categorical = df.select_dtypes(include=['object', 'category']).columns
    if categorical.empty:
        return encoded

    as_text = encoded[categorical].astype(str)
    encoded[categorical] = OrdinalEncoder().fit_transform(as_text)
    return encoded

# === Feature Selection ===
def apply_feature_selection(method, X, y, task_type, mi_threshold=0.01, p_threshold=0.05):
    """Select a subset of the columns of ``X`` using the named method.

    Every method also writes its per-feature scores to a CSV through
    ``log_feature_importance``. The meaning of the logged "Importance" column
    depends on the method: p-values for ``'f_test'``, mutual-information
    scores for ``'mutual_info'``, and feature rankings for ``'boruta'`` and
    ``'rfe'``.

    Args:
        method: One of ``'f_test'``, ``'mutual_info'``, ``'boruta'``, ``'rfe'``.
        X: Feature DataFrame; must already be numeric.
        y: Target values aligned with ``X``.
        task_type: ``'r'`` for regression; any other value is treated as
            classification.
        mi_threshold: Features with mutual information above this are kept.
        p_threshold: Features with an F-test p-value below this are kept.

    Returns:
        ``X`` restricted to the selected columns. This can be a frame with no
        columns when nothing passes the selection criterion.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    if method == 'f_test':
        score_func = f_regression if task_type == 'r' else f_classif
        selector = SelectKBest(score_func=score_func, k='all')
        selector.fit(X, y)
        pvalues = selector.pvalues_
        selected_cols = X.columns[np.where(pvalues < p_threshold)[0]].tolist()
        log_feature_importance(X.columns, pvalues, method)

    elif method == 'mutual_info':
        score_func = mutual_info_regression if task_type == 'r' else mutual_info_classif
        selector = SelectKBest(score_func=score_func, k='all')
        selector.fit(X, y)
        scores = selector.scores_
        selected_cols = X.columns[np.where(scores > mi_threshold)[0]].tolist()
        log_feature_importance(X.columns, scores, method)

    elif method == 'boruta':
        model = RandomForestRegressor(n_jobs=-1, random_state=42) if task_type == 'r' else RandomForestClassifier(n_jobs=-1, random_state=42)
        # check_X_y validates the inputs and hands back plain arrays, which is
        # what BorutaPy.fit is given here.
        X_array, y_array = check_X_y(X, y)
        boruta_selector = BorutaPy(model, n_estimators='auto', verbose=0, random_state=42)
        boruta_selector.fit(X_array, y_array)
        selected_cols = X.columns[boruta_selector.support_].tolist()
        # Log like every other method so the run leaves a complete audit trail.
        log_feature_importance(X.columns, boruta_selector.ranking_, method)

    elif method == 'rfe':
        estimator = LinearRegression() if task_type == 'r' else LogisticRegression(solver='liblinear')
        # Keep half of the features, but never fewer than one: a plain
        # int(n * 0.5) is 0 for a single-column frame.
        n_keep = max(1, X.shape[1] // 2)
        selector = RFE(estimator, n_features_to_select=n_keep)
        selector.fit(X, y)
        selected_cols = X.columns[selector.support_].tolist()
        log_feature_importance(X.columns, selector.ranking_, method)

    else:
        raise ValueError("Unsupported feature selection method")

    print(f"✅ Selected Features by {method}: {selected_cols}")
    return X[selected_cols]





# === PSI Calculation ===
def calculate_psi(expected, actual, buckets=10):
    """Compute the Population Stability Index of ``actual`` against ``expected``.

    Both samples are binned with the SAME equal-width bin edges, spanning the
    range of ``expected``. Values of ``actual`` outside that range are counted
    in the nearest edge bin. Sharing the edges is what lets a shift or rescale
    of the distribution register as drift; normalising each sample by its own
    min/max would cancel such changes out.

    Args:
        expected: Reference (e.g. training) values, numeric array-like.
        actual: Values to compare against the reference, numeric array-like.
        buckets: Number of equal-width bins.

    Returns:
        The PSI as a float, or ``0`` when either sample has zero variance
        (constant inputs are treated as "no measurable drift").
    """
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    if np.std(expected) == 0 or np.std(actual) == 0:
        return 0

    low, high = expected.min(), expected.max()
    breakpoints = np.linspace(low, high, buckets + 1)

    expected_pct = np.histogram(expected, bins=breakpoints)[0] / len(expected)
    # Clip so out-of-range samples land in the edge bins instead of being
    # silently dropped by np.histogram.
    actual_pct = np.histogram(np.clip(actual, low, high), bins=breakpoints)[0] / len(actual)

    # Replace empty bins with a small floor so the log ratio stays finite.
    expected_pct = np.where(expected_pct == 0, 1e-4, expected_pct)
    actual_pct = np.where(actual_pct == 0, 1e-4, actual_pct)

    return np.sum((expected_pct - actual_pct) * np.log(expected_pct / actual_pct))

# === SHAP Explanation ===
def explain_model_with_shap(model, X_sample):
    """Render a SHAP summary plot for a tree-based model.

    Args:
        model: Fitted tree-based estimator accepted by ``shap.TreeExplainer``.
        X_sample: Rows to explain; also passed to the plot for feature values
            and names.
    """
    explainer = shap.TreeExplainer(model)
    # The additivity check is switched off per call: assigning an attribute on
    # the explainer has no effect on shap_values().
    shap_values = explainer.shap_values(X_sample, check_additivity=False)
    shap.summary_plot(shap_values, X_sample)

# === Conditional SMOTE ===
def apply_smote_if_needed(X_train, y_train, task_type):
    """Oversample minority classes with SMOTE when the training set is imbalanced.

    The data counts as imbalanced when any class makes up less than 20% of
    ``y_train``. Regression tasks and balanced data are returned unchanged.

    Args:
        X_train: Numeric training features.
        y_train: Training labels aligned with ``X_train``.
        task_type: ``'c'`` for classification; any other value skips SMOTE.

    Returns:
        Tuple ``(X, y)``: the resampled data when SMOTE ran, otherwise the
        inputs themselves.
    """
    if task_type != 'c':
        print("ℹ️ SMOTE not applied: Task is regression.")
        return X_train, y_train

    class_counts = Counter(y_train)
    total = sum(class_counts.values())
    imbalance = any((count / total) < 0.2 for count in class_counts.values())

    if not imbalance:
        print("✅ No significant class imbalance detected. Skipping SMOTE.")
        return X_train, y_train

    print(f"⚠️ Imbalanced classes detected: {class_counts}")

    # SMOTE interpolates between a sample and its same-class neighbours, so it
    # needs at least two samples in the smallest class, and k_neighbors must
    # stay below that class size (the default of 5 raises on tiny classes).
    smallest = min(class_counts.values())
    if smallest < 2:
        print("⚠️ SMOTE skipped: a class has fewer than 2 samples.")
        return X_train, y_train

    smote = SMOTE(random_state=42, k_neighbors=min(5, smallest - 1))
    X_res, y_res = smote.fit_resample(X_train, y_train)
    print(f"✅ SMOTE applied: {Counter(y_res)}")
    return X_res, y_res

# === Main Pipeline ===
def _as_dense(matrix):
    """Return ``matrix`` densified via ``toarray()`` when it has that method, else unchanged."""
    return matrix.toarray() if hasattr(matrix, "toarray") else matrix


def _preprocess_split(X_train, X_test):
    """Fit one-hot encoding + standard scaling on ``X_train`` and apply it to both splits.

    Returns:
        Tuple ``(preprocessor, X_train_dense, X_test_dense)``.
    """
    cat_cols = X_train.select_dtypes(include=['object', 'category']).columns.tolist()
    num_cols = X_train.select_dtypes(include=['int64', 'float64']).columns.tolist()

    preprocessor = ColumnTransformer([
        ('cat', OneHotEncoder(handle_unknown='ignore'), cat_cols),
        ('num', StandardScaler(), num_cols)
    ])
    preprocessor.fit(X_train)
    return (
        preprocessor,
        _as_dense(preprocessor.transform(X_train)),
        _as_dense(preprocessor.transform(X_test)),
    )


def self_healing_ai_framework():
    """Interactively train a baseline model and, if drift is found, search for a better one.

    Steps:
        1. Prompt for a CSV path, the target column and the task type.
        2. Drop rows with missing values and split 80/20 into train/test.
        3. Train a random-forest baseline (with conditional SMOTE), report its
           metrics, plot its learning curve and a SHAP summary.
        4. Compute a PSI per feature between train and test; if none exceeds
           0.2 the function returns.
        5. Otherwise ("healing"), try every feature-selection method with
           every candidate model and save the best-scoring fitted model with
           joblib.

    Returns:
        None. Results are printed; the best healed model, if any, is written
        to a timestamped ``.pkl`` file in the working directory.
    """
    # Local import keeps this block self-contained; scikit-learn is already a
    # dependency of this file.
    from sklearn.base import clone

    print("\U0001F4D8 Self-Healing AI Framework with Conditional Healing")
    file_path = input("Enter CSV file path or name: ").strip()
    df = pd.read_csv(file_path)
    print("✅ Dataset loaded successfully.")
    print("\n🧾 Dataset Columns:", list(df.columns))

    target_column = input("Enter Target Column: ").strip()
    if target_column not in df.columns:
        print(f"Target column '{target_column}' not found in dataset. Exiting.")
        return
    task_type = input("Task Type (c=Classification, r=Regression): ").strip().lower()
    if task_type not in ['c', 'r']:
        print("Invalid task type! Choose 'c' or 'r'. Exiting.")
        return

    X = df.drop(columns=[target_column])
    y = df[target_column]
    print("\n⚠️ Dropping rows with missing values (NaNs)...")
    initial_rows = X.shape[0]
    # A missing target breaks training just like a missing feature, so rows
    # are kept only when both the features and the target are present.
    complete_rows = X.notna().all(axis=1) & y.notna()
    X = X.loc[complete_rows]
    y = y.loc[complete_rows]
    print(f"Dropped {initial_rows - X.shape[0]} rows due to missing values.")
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42,
                                                        stratify=y if task_type == 'c' else None)

    preprocessor, X_train_arr, X_test_arr = _preprocess_split(X_train, X_test)

    # The resampled data gets its own names: y_train must stay aligned with
    # the un-resampled X_train because the healing phase reuses both.
    X_train_bal, y_train_bal = apply_smote_if_needed(X_train_arr, y_train, task_type)

    print("\n🔧 Training Base Model (Random Forest)")
    base_model = RandomForestRegressor(random_state=42) if task_type == 'r' else RandomForestClassifier(random_state=42)
    base_model.fit(X_train_bal, y_train_bal)
    base_perf, base_score = evaluate_model(task_type, base_model, X_test_arr, y_test)
    print(f"📊 Base Model Performance: {base_perf}")
    plot_learning_curve(base_model, X_train_bal, y_train_bal, task_type, title="Base Model Learning Curve")

    print("\n🔍 SHAP Feature Importance for Base Model")
    X_train_df = pd.DataFrame(X_train_bal, columns=preprocessor.get_feature_names_out())
    X_train_sampled = X_train_df.sample(min(100, len(X_train_df)), random_state=42)
    explain_model_with_shap(base_model, X_train_sampled)

    print("\n📈 Drift Detection (PSI Scores):")
    drift_detected = False
    # Encode once on the full frame and slice by index, so a category maps to
    # the same ordinal code in both splits. Encoding each split on its own
    # would give mismatched codes whenever their category sets differ.
    # Relies on the unique default index produced by read_csv.
    X_enc = encode_categoricals(X)
    X_train_enc = X_enc.loc[X_train.index]
    X_test_enc = X_enc.loc[X_test.index]

    for col in X_train_enc.columns:
        try:
            psi = calculate_psi(X_train_enc[col], X_test_enc[col])
            if psi > 0.2:
                print(f"⚠️ Drift detected in column '{col}' | PSI = {psi:.4f}")
                drift_detected = True
        except Exception as e:
            print(f"❌ Error calculating PSI for column {col}: {e}")

    if not drift_detected:
        print("✅ No significant drift detected. Healing phase skipped.")
        return

    print("\n🛠️ Healing Phase Triggered...")
    fs_methods = ['f_test', 'mutual_info', 'boruta', 'rfe']

    models = {
        'RandomForest': RandomForestRegressor(random_state=42) if task_type == 'r' else RandomForestClassifier(random_state=42),
        'DecisionTree': DecisionTreeRegressor(random_state=42) if task_type == 'r' else DecisionTreeClassifier(random_state=42),
        'SVM': SVR() if task_type == 'r' else SVC(probability=True),
        'KNN': KNeighborsRegressor() if task_type == 'r' else KNeighborsClassifier()
    }

    best_model, best_combo = None, (None, None)
    best_score = -np.inf if task_type == 'r' else 0

    for fs in fs_methods:
        print(f"\n🔍 Feature Selection: {fs}")
        try:
            # Select features on the training rows only, so the test set does
            # not leak into the choice of features.
            X_fs = apply_feature_selection(fs, X_train_enc, y_train, task_type)
            selected = X_fs.columns
            X_train_fs = X_train[selected]
            X_test_fs = X_test[selected]

            _, X_train_fs_arr, X_test_fs_arr = _preprocess_split(X_train_fs, X_test_fs)

            X_train_model, y_train_model = apply_smote_if_needed(X_train_fs_arr, y_train, task_type)

            for name, prototype in models.items():
                print(f"Training {name} with {fs}...")
                # Fit a fresh clone each time: refitting the shared instance
                # would overwrite an earlier "best" model with one trained on
                # a different feature set.
                model = clone(prototype)
                model.fit(X_train_model, y_train_model)
                perf, score = evaluate_model(task_type, model, X_test_fs_arr, y_test)
                print(f"Performance: {perf}")
                plot_learning_curve(model, X_train_model, y_train_model, task_type,
                                    title=f"{name} + {fs} Learning Curve")
                if score > best_score:
                    best_score, best_model, best_combo = score, model, (name, fs)

        except Exception as e:
            # Best effort: one failing selection method must not stop the others.
            print(f"❌ FS={fs} failed: {e}")

    if best_model is not None:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"best_model_{best_combo[0]}_{best_combo[1]}_{timestamp}.pkl"
        # NOTE(review): only the estimator is saved; the fitted preprocessor
        # and the selected column list are not, so the file cannot score raw
        # rows on its own.
        joblib.dump(best_model, filename)
        print(f"\n✅ Best Model Saved: {filename} | Model: {best_combo[0]} | FS: {best_combo[1]} | Score: {best_score:.4f}")
    else:
        print("\n⚠️ No suitable model found.")

# Run the interactive pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    self_healing_ai_framework()