In [1]:
import os
import pickle
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    classification_report, confusion_matrix
)
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression
import seaborn as sns
import matplotlib.pyplot as plt

# Load data from multiple .pkl files
def load_data_from_directory(directory):
    X_data, y_data = [], []
    for file in os.listdir(directory):
        if file.endswith(".pkl"):
            file_path = os.path.join(directory, file)
            with open(file_path, 'rb') as f:
                data = pickle.load(f)
                X_data.append(data['X'])
                y_data.append(data['y'])
    return np.concatenate(X_data), np.concatenate(y_data)  # Changed to concatenate

# Load data from train and test directories
train_dir = './preprocess/SLEEP_data/cassette_processed/train'
test_dir = './preprocess/SLEEP_data/cassette_processed/test'

X_train, y_train = load_data_from_directory(train_dir)
X_test, y_test = load_data_from_directory(test_dir)

# Create a test set from training data if necessary
if len(X_test) == 0 or len(y_test) == 0:
    print("Test set is empty. Splitting some data from the training set to use as the test set.")
    X_train, X_test, y_train, y_test = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

# Simplified label mapping
class_mapping = {
    'e': None,
    '1': 'N1',
    '2': 'N2',
    'W': 'Wake',
    'R': 'REM',
    '3': 'N3',
    '4': 'N3',
}

def modify_labels(y):
    return np.array([class_mapping.get(label, label) for label in y])

y_train_modified = modify_labels(y_train)
y_test_modified = modify_labels(y_test)

# Remove instances of class 'e'
train_mask = y_train_modified != None
test_mask = y_test_modified != None

X_train = X_train[train_mask]
y_train_modified = y_train_modified[train_mask]

X_test = X_test[test_mask]
y_test_modified = y_test_modified[test_mask]

# Encode the labels
label_encoder = LabelEncoder()
all_labels_modified = np.concatenate((y_train_modified, y_test_modified))
label_encoder.fit(all_labels_modified)

y_train_encoded = label_encoder.transform(y_train_modified)
y_test_encoded = label_encoder.transform(y_test_modified)

# Flatten the input for classifiers
X_train = X_train.reshape(X_train.shape[0], -1)
X_test = X_test.reshape(X_test.shape[0], -1)

# Define individual classifiers
rf_model = RandomForestClassifier(n_estimators=50, random_state=42)  # Fewer trees
log_reg_model = LogisticRegression(max_iter=500, random_state=42)  # Fewer iterations

# Create an ensemble classifier using voting
ensemble_model = VotingClassifier(
    estimators=[('rf', rf_model), ('log_reg', log_reg_model)],
    voting='soft'
)

# Train the ensemble model
ensemble_model.fit(X_train, y_train_encoded)

# Make predictions
y_pred = ensemble_model.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test_encoded, y_pred)
precision = precision_score(y_test_encoded, y_pred, average='weighted')
recall = recall_score(y_test_encoded, y_pred, average='weighted')
f1 = f1_score(y_test_encoded, y_pred, average='weighted')

# Calculate macro metrics
macro_precision = precision_score(y_test_encoded, y_pred, average='macro')
macro_recall = recall_score(y_test_encoded, y_pred, average='macro')
macro_f1 = f1_score(y_test_encoded, y_pred, average='macro')

# Store results in a dictionary for later use
ensemble_metrics = {
    'model_name': 'Ensemble Learning (Voting Classifier)',
    'accuracy': accuracy,
    'precision': precision,
    'recall': recall,
    'f1_score': f1,
    'macro_precision': macro_precision,
    'macro_recall': macro_recall,
    'macro_f1_score': macro_f1,
    'classification_report': classification_report(y_test_encoded, y_pred, target_names=label_encoder.classes_),
    'confusion_matrix': confusion_matrix(y_test_encoded, y_pred)
}

# Print metrics
print("Ensemble Learning Classifier Metrics:")
print(f"Accuracy: {accuracy:.2f}")
print(f"Weighted Precision: {precision:.2f}")
print(f"Weighted Recall: {recall:.2f}")
print(f"Weighted F1 Score: {f1:.2f}")
print(f"Macro Precision: {macro_precision:.2f}")
print(f"Macro Recall: {macro_recall:.2f}")
print(f"Macro F1 Score: {macro_f1:.2f}")
print("\nClassification Report:")
print(ensemble_metrics['classification_report'])

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(ensemble_metrics['confusion_matrix'], annot=True, fmt='d', cmap='Blues', 
            xticklabels=label_encoder.classes_, 
            yticklabels=label_encoder.classes_)
plt.title('Confusion Matrix for Ensemble Learning')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

# Plot metrics
metrics_names = ['Accuracy', 'Weighted Precision', 'Weighted Recall', 'Weighted F1-score', 'Macro Precision', 'Macro Recall', 'Macro F1-score']
metrics_values = [accuracy, precision, recall, f1, macro_precision, macro_recall, macro_f1]

plt.figure(figsize=(10, 6))
bars = plt.bar(metrics_names, metrics_values, color=['#98df8a', '#2ca02c', '#66c2a5', '#ff7f0e', '#ffbb78', '#ff7f0e', '#ffbb78'], edgecolor='black')

# Add labels on top of the bars
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval + 0.02, round(yval, 2), ha='center', va='bottom', fontsize=12)

# Set title and labels
plt.title('Ensemble Learning Classifier Metrics', fontsize=16)
plt.ylim(0, 1)
plt.ylabel('Score', fontsize=14)

# Improve layout and show the plot
plt.tight_layout()
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()


ValueError: zero-dimensional arrays cannot be concatenated