In [17]:
%matplotlib inline
import matplotlib.pyplot as plt

In [18]:
import numpy as np
import os
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.feature_selection import SelectFromModel
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier
from sklearn.pipeline import Pipeline
import time
import multiprocessing
from sklearn.model_selection import ParameterGrid
import matplotlib.pyplot as plt
from sklearn.model_selection import learning_curve
from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.metrics import make_scorer, accuracy_score
import matplotlib.pyplot as plt




In [19]:
def load_features_with_labels(root_folder):
    """Load per-sample feature vectors and integer class labels from ``.npz`` files.

    ``root_folder`` must contain the sub-folders ``Ictal``, ``Interictal`` and
    ``Normal``; every ``.npz`` file inside them yields one sample labelled
    0, 1 and 2 respectively. Other files are ignored.

    The feature vector of a file is chosen as follows:

    * if the archive has a ``features`` entry, that array is used as stored;
    * otherwise, if it has ``mobilenet_features``, the flattened
      ``mobilenet_features``, ``vgg16_features`` and ``lenet_features``
      entries are concatenated (a missing entry raises ``KeyError``);
    * otherwise every array in the archive is flattened and concatenated
      in the archive's own key order.

    Parameters
    ----------
    root_folder : str
        Directory holding the three class sub-folders.

    Returns
    -------
    X : numpy.ndarray
        Stacked feature vectors, one row per file.
    y : numpy.ndarray
        Integer class label for each row of ``X``.
    """
    class_labels = {'Ictal': 0, 'Interictal': 1, 'Normal': 2}
    X, y = [], []

    for class_name, label in class_labels.items():
        class_folder = os.path.join(root_folder, class_name)
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order (and any seeded split made from it) reproducible.
        for file_name in sorted(os.listdir(class_folder)):
            if not file_name.endswith('.npz'):
                continue
            file_path = os.path.join(class_folder, file_name)

            # The context manager closes the archive's file handle; the arrays
            # read inside the block are already in memory when it exits.
            with np.load(file_path) as data:
                keys = data.files
                if 'features' in keys:
                    features = data['features']
                elif 'mobilenet_features' in keys:
                    features = np.concatenate([data['mobilenet_features'].flatten(),
                                               data['vgg16_features'].flatten(),
                                               data['lenet_features'].flatten()])
                else:
                    arrays = [data[key] for key in keys]
                    features = np.concatenate([arr.flatten() for arr in arrays
                                               if isinstance(arr, np.ndarray)])

            X.append(features)
            y.append(label)

    return np.array(X), np.array(y)

def process_combination(params, estimator, X, y, cv=5):
    """Score one hyperparameter combination by cross-validated accuracy.

    The parameters are applied to ``estimator`` itself via ``set_params``
    (the object is modified, not copied), after which ``cross_val_score``
    is run with ``scoring='accuracy'``.

    Returns
    -------
    tuple
        ``(params, mean_accuracy)`` where ``mean_accuracy`` is the mean of
        the per-fold scores.
    """
    estimator.set_params(**params)
    fold_scores = cross_val_score(estimator, X, y, scoring='accuracy', cv=cv)
    mean_accuracy = fold_scores.mean()
    return params, mean_accuracy

def run_with_timeout(func, args, timeout):
    """Run ``func(*args)`` in a one-worker process pool with a time limit.

    Parameters
    ----------
    func : callable
        Function to execute in the worker process.
    args : tuple
        Positional arguments for ``func``.
    timeout : float
        Maximum number of seconds to wait for the result.

    Returns
    -------
    The value returned by ``func``, or ``None`` if the result did not arrive
    within ``timeout`` seconds (the worker is then terminated). Exceptions
    raised by ``func`` propagate to the caller.
    """
    worker_pool = multiprocessing.Pool(processes=1)
    pending = worker_pool.apply_async(func, args)
    try:
        try:
            outcome = pending.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            # Kill the still-running worker; the caller sees None.
            worker_pool.terminate()
            outcome = None
        return outcome
    finally:
        # Always release the pool, whether we finished, timed out or raised.
        worker_pool.close()
        worker_pool.join()



In [20]:
def custom_grid_search(estimator, param_grid, X, y, cv=5, timeout=300):
    """Exhaustive grid search in which every combination has its own time limit.

    Each combination produced by ``ParameterGrid(param_grid)`` is evaluated
    with ``process_combination`` through ``run_with_timeout``. Combinations
    that exceed ``timeout`` seconds are reported and skipped. Progress is
    printed as the search runs.

    Returns
    -------
    tuple
        ``(best_params, best_score)``: the combination with the highest mean
        CV score and that score. If no combination finished, this is
        ``(None, -inf)``.
    """
    param_list = list(ParameterGrid(param_grid))
    print(f"Total combinations to try: {len(param_list)}")

    best_params, best_score = None, -float('inf')
    start_time = time.time()

    for i, params in enumerate(param_list, 1):
        print(f"\nStarting combination {i}/{len(param_list)}: {params}")
        combination_start_time = time.time()

        job_args = (params, estimator, X, y, cv)
        result = run_with_timeout(process_combination, job_args, timeout)

        if result is None:
            print(f"Combination {i}/{len(param_list)} timed out after {timeout} seconds")
        else:
            score = result[1]
            elapsed_time = time.time() - combination_start_time
            print(f"Combination {i}/{len(param_list)}: {params}")
            print(f"Mean CV score: {score:.4f}")
            print(f"Time taken: {elapsed_time:.2f} seconds")

            # Strict comparison: on ties the earliest combination is kept.
            if score > best_score:
                best_params, best_score = params, score

        print(f"Total elapsed time: {time.time() - start_time:.2f} seconds")
        print("---")

    return best_params, best_score

def plot_learning_curve(estimator, title, X, y, axes=None, ylim=None, cv=None,
                        n_jobs=None, train_sizes=np.linspace(.1, 1.0, 5)):
    """Draw learning-curve, scalability and performance panels for an estimator.

    Three panels are drawn:

    1. training and cross-validation score versus number of training examples;
    2. fit time versus number of training examples;
    3. cross-validation score versus fit time.

    Parameters
    ----------
    estimator : estimator
        Passed to ``learning_curve``.
    title : str
        Title of the first panel.
    X, y : array-like
        Data passed to ``learning_curve``.
    axes : sequence of at least three matplotlib Axes, optional
        Axes to draw on. When ``None`` a new 1x3 figure of size 20x5 inches
        is created.
    ylim : tuple, optional
        ``(ymin, ymax)`` applied to the first panel.
    cv, n_jobs, train_sizes :
        Forwarded unchanged to ``learning_curve``.

    Returns
    -------
    matplotlib.figure.Figure
        The newly created figure, or the figure that owns ``axes[0]`` when
        ``axes`` was supplied.
    """
    if axes is None:
        fig, axes = plt.subplots(1, 3, figsize=(20, 5))
    else:
        # Previously `fig` was unbound on this path and `return fig` raised
        # UnboundLocalError; use the figure that owns the supplied axes.
        fig = axes[0].figure

    axes[0].set_title(title)
    if ylim is not None:
        axes[0].set_ylim(*ylim)
    axes[0].set_xlabel("Training examples")
    axes[0].set_ylabel("Score")

    # return_times=True adds fit times and score times; score times are unused.
    train_sizes, train_scores, test_scores, fit_times, _ = \
        learning_curve(estimator, X, y, cv=cv, n_jobs=n_jobs,
                       train_sizes=train_sizes,
                       return_times=True)
    # Aggregate across CV folds (axis 1) for each training-set size.
    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    test_scores_mean = np.mean(test_scores, axis=1)
    test_scores_std = np.std(test_scores, axis=1)
    fit_times_mean = np.mean(fit_times, axis=1)
    fit_times_std = np.std(fit_times, axis=1)

    # Plot learning curve
    axes[0].grid()
    axes[0].fill_between(train_sizes, train_scores_mean - train_scores_std,
                         train_scores_mean + train_scores_std, alpha=0.1,
                         color="r")
    axes[0].fill_between(train_sizes, test_scores_mean - test_scores_std,
                         test_scores_mean + test_scores_std, alpha=0.1,
                         color="g")
    axes[0].plot(train_sizes, train_scores_mean, 'o-', color="r",
                 label="Training score")
    axes[0].plot(train_sizes, test_scores_mean, 'o-', color="g",
                 label="Cross-validation score")
    axes[0].legend(loc="best")

    # Plot n_samples vs fit_times
    axes[1].grid()
    axes[1].plot(train_sizes, fit_times_mean, 'o-')
    axes[1].fill_between(train_sizes, fit_times_mean - fit_times_std,
                         fit_times_mean + fit_times_std, alpha=0.1)
    axes[1].set_xlabel("Training examples")
    axes[1].set_ylabel("fit_times")
    axes[1].set_title("Scalability of the model")

    # Plot fit_time vs score
    axes[2].grid()
    axes[2].plot(fit_times_mean, test_scores_mean, 'o-')
    axes[2].fill_between(fit_times_mean, test_scores_mean - test_scores_std,
                         test_scores_mean + test_scores_std, alpha=0.1)
    axes[2].set_xlabel("fit_times")
    axes[2].set_ylabel("Score")
    axes[2].set_title("Performance of the model")

    return fig



In [None]:
# Load the feature vectors and report the size and class balance of the dataset
X, y = load_features_with_labels('/content/drive/MyDrive/Bonn_processed5')
print(f"Loaded {len(X)} samples with {X.shape[1]} features each.")
print(f"Class distribution: {np.bincount(y)}")

# Stratified 80/20 hold-out split with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# Preprocessing: standardise -> keep at most 5000 features ranked by a random
# forest -> project onto 2 LDA components.
selector_forest = RandomForestClassifier(n_estimators=100, random_state=42)
pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('feature_selection', SelectFromModel(estimator=selector_forest, max_features=5000)),
    ('dim_reduction', LDA(n_components=2)),
])

# Fit the preprocessing on the training split only, then apply it to both splits.
# NOTE(review): the preprocessing is fit once on all of X_train, so any later
# cross-validation run on X_train_processed uses features that were selected
# with every fold's labels visible -- those CV scores may be optimistic.
X_train_processed = pipeline.fit_transform(X_train, y_train)
X_test_processed = pipeline.transform(X_test)

# Hyperparameter grids, one per base model
rf_grid = {
    'n_estimators': [100, 200],
    'max_depth': [None, 10, 20],
    'min_samples_split': [2, 5],
    'min_samples_leaf': [1, 2],
}
gb_grid = {
    'n_estimators': [100, 200],
    'learning_rate': [0.01, 0.1],
    'max_depth': [3, 5],
}
svm_grid = {
    'C': [0.1, 1, 10],
    'kernel': ['rbf', 'linear'],
    'gamma': ['scale', 'auto'],
}
knn_grid = {
    'n_neighbors': [3, 5, 7],
    'weights': ['uniform', 'distance'],
}
lr_grid = {
    'C': [0.1, 1, 10],
    'solver': ['liblinear', 'lbfgs'],
}

# (short name, unfitted estimator, grid) triples consumed by the tuning loop
base_models = [
    ('rf', RandomForestClassifier(random_state=42), rf_grid),
    ('gb', GradientBoostingClassifier(random_state=42), gb_grid),
    ('svm', SVC(probability=True, random_state=42), svm_grid),
    ('knn', KNeighborsClassifier(), knn_grid),
    ('lr', LogisticRegression(random_state=42), lr_grid),
]




Loaded 500 samples with 150642 features each.
Class distribution: [200 100 200]


In [None]:
# Tune every base model, refit it with its best parameters on the processed
# training data, and save its learning curve.
best_models = []
for name, model, param_grid in base_models:
    print(f"\nTuning {name}...")
    print(f"Parameters being tuned for {name}:")
    for param, values in param_grid.items():
        print(f"  - {param}: {values}")

    start_time = time.time()
    best_params, best_score = custom_grid_search(model, param_grid, X_train_processed, y_train, timeout=300)
    end_time = time.time()

    # best_params is None when every combination timed out.
    if best_params is None:
        print(f"\nNo valid parameters found for {name}. Skipping this model.")
        continue

    print(f"\nBest parameters for {name}: {best_params}")
    print(f"Best score for {name}: {best_score:.4f}")
    print(f"Total time for {name}: {end_time - start_time:.2f} seconds")

    model.set_params(**best_params)
    model.fit(X_train_processed, y_train)
    best_models.append((name, model))

    # Plot learning curve. plot_learning_curve creates its own figure, so no
    # plt.figure() call is made here: the extra empty figure it produced was
    # never closed. Save and close the returned figure explicitly.
    fig = plot_learning_curve(model, f"Learning Curve for {name}", X_train_processed, y_train, cv=5, n_jobs=-1)
    fig.savefig(f"learning_curve_{name}.png")
    plt.close(fig)

# Evaluate individual model performances on the held-out test split
print("\nIndividual Model Performances:")
for name, model in best_models:
    y_pred_individual = model.predict(X_test_processed)
    accuracy = accuracy_score(y_test, y_pred_individual)
    print(f"{name} Accuracy:", accuracy)
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred_individual))
    print("---")



In [None]:
# Soft-voting ensemble: average the class probabilities of all tuned models
# and pick the most probable class for each test sample.
per_model_probs = [model.predict_proba(X_test_processed) for _, model in best_models]
ensemble_probs = np.mean(per_model_probs, axis=0)
ensemble_preds = ensemble_probs.argmax(axis=1)

ensemble_accuracy = accuracy_score(y_test, ensemble_preds)
print("\nEnsemble Model Performance:")
print("Accuracy:", ensemble_accuracy)
print("\nClassification Report:")
print(classification_report(y_test, ensemble_preds))

# Collect the test accuracy of every individual model, then the ensemble's
model_names = []
accuracies = []
for name, model in best_models:
    model_names.append(name)
    accuracies.append(accuracy_score(y_test, model.predict(X_test_processed)))
model_names.append('Ensemble')
accuracies.append(ensemble_accuracy)

# Bar chart of the accuracies, each bar annotated with its value
fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(model_names, accuracies)
ax.set_title('Model Accuracies')
ax.set_xlabel('Models')
ax.set_ylabel('Accuracy')
ax.set_ylim(0, 1)
for bar_index, bar_height in enumerate(accuracies):
    ax.text(bar_index, bar_height + 0.01, f'{bar_height:.2f}', ha='center')
fig.savefig('model_accuracies.png')
plt.show()
plt.close(fig)



In [None]:
def plot_confusion_matrix(y_true, y_pred, classes, title):
    """Draw, save and show an annotated confusion-matrix heatmap.

    Parameters
    ----------
    y_true, y_pred : array-like
        True and predicted labels passed to ``confusion_matrix``.
    classes : sequence of str
        Tick labels used on both axes.
    title : str
        Figure title. The PNG file name is the lower-cased title with
        spaces replaced by underscores.
    """
    counts = confusion_matrix(y_true, y_pred)
    output_name = title.lower().replace(" ", "_")

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(
        counts,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
        ax=ax,
    )
    ax.set_title(title)
    ax.set_ylabel('True label')
    ax.set_xlabel('Predicted label')
    fig.tight_layout()
    fig.savefig(f'{output_name}.png')
    plt.show()
    plt.close(fig)


# Confusion matrix of the averaged-probability ensemble on the test split
plot_confusion_matrix(y_test, ensemble_preds, ['Ictal', 'Interictal', 'Normal'], 'Ensemble Model Confusion Matrix')

# Test accuracy of the ensemble model using cross-validation
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted



In [None]:
class EnsembleClassifier(BaseEstimator, ClassifierMixin):
    """Soft-voting ensemble that averages ``predict_proba`` of its members.

    Parameters
    ----------
    models : list of (name, estimator) tuples
        Member classifiers. Each estimator must implement ``fit`` and
        ``predict_proba``. The estimators passed in are used as templates:
        ``fit`` trains clones and leaves the originals untouched.

    Attributes
    ----------
    classes_ : numpy.ndarray
        Sorted unique labels seen during ``fit``.
    models_ : list of (name, estimator) tuples
        Fitted clones of ``models``.
    """

    def __init__(self, models):
        self.models = models

    def fit(self, X, y):
        """Fit a fresh clone of every member model on ``(X, y)``.

        Raises
        ------
        ValueError
            If ``models`` is empty.
        """
        # Local import: `clone` is not among the names imported at file level.
        from sklearn.base import clone

        # Check that X and y have correct shape
        X, y = check_X_y(X, y)
        if not self.models:
            raise ValueError("EnsembleClassifier requires at least one model.")

        # Store the classes seen during fit
        self.classes_ = np.unique(y)

        # Fit clones rather than the caller's estimator objects, so fitting
        # the ensemble never silently refits models the caller still uses.
        self.models_ = [(name, clone(model).fit(X, y)) for name, model in self.models]

        # Return the classifier
        return self

    def predict_proba(self, X):
        """Return the mean of the members' class-probability estimates."""
        # Check that fit has been called
        check_is_fitted(self)

        # Input validation
        X = check_array(X)

        return np.mean([model.predict_proba(X) for _, model in self.models_], axis=0)

    def predict(self, X):
        """Return the label with the highest averaged probability per sample."""
        # argmax yields column indices; map them through classes_ so the
        # result is an actual label even when labels are not 0..n_classes-1.
        best_columns = np.argmax(self.predict_proba(X), axis=1)
        return self.classes_[best_columns]

    def score(self, X, y):
        """Return the accuracy of ``predict(X)`` against ``y``."""
        return accuracy_score(y, self.predict(X))


In [None]:
# 5-fold cross-validated accuracy of the ensemble on the processed training data
ensemble_classifier = EnsembleClassifier(best_models)
cv_results = cross_validate(
    ensemble_classifier,
    X_train_processed,
    y_train,
    scoring='accuracy',
    cv=5,
)
cv_scores = cv_results['test_score']

# Report the per-fold scores and the mean with a +/- two-standard-deviation band
print("\nEnsemble Model Cross-Validation Results:")
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV score: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

In [None]:
# Plot learning curve for ensemble model.
# plot_learning_curve creates its own 1x3 figure, so no plt.figure() call is
# made first: it only produced an extra empty figure that was shown by
# plt.show() and never closed.
fig = plot_learning_curve(ensemble_classifier, "Learning Curve for Ensemble Model", X_train_processed, y_train, cv=5, n_jobs=-1)
fig.savefig("learning_curve_ensemble.png")
plt.show()
plt.close(fig)