In [1]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.preprocessing import RobustScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.utils import class_weight
from imblearn.over_sampling import SMOTE
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.inspection import permutation_importance
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, History
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
import joblib
import os
import warnings
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
# (seeds NumPy's global RNG and TensorFlow's global seed; Python's built-in
# `random` module is not seeded here)
np.random.seed(42)
tf.random.set_seed(42)

# Mount Google Drive
# (Colab-only step: main() below reads the dataset from a path under
# /content/drive, so this must run before main())
from google.colab import drive
drive.mount('/content/drive')

# 1. Data Loading and Initial Inspection
def load_data(filepath):
    """Read a CSV into a DataFrame and print a short structural summary.

    Args:
        filepath: Location of the CSV; passed unchanged to ``pd.read_csv``.

    Returns:
        pandas.DataFrame: The loaded table, unmodified.
    """
    frame = pd.read_csv(filepath)

    # Report shape, column names, dtypes and per-column null counts, in that order.
    print("Dataset shape:", frame.shape)

    column_names = frame.columns.tolist()
    print("\nColumns:", column_names)

    dtype_summary = frame.dtypes
    print("\nData types:\n", dtype_summary)

    null_counts = frame.isnull().sum()
    print("\nMissing values:\n", null_counts)

    return frame

# 2. Enhanced Data Cleaning
def clean_data(data):
    """Drop unused columns, impute missing values, de-duplicate and cap outliers.

    Steps, in order:
      1. Drop the identifier/redundant columns listed in ``cols_to_drop``
         (only those actually present).
      2. Fill missing numeric values (int64/float64 columns) with the column
         median and missing object-typed values with the column mode.
      3. Remove duplicate rows.
      4. Clip every numeric column to ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``.

    Args:
        data: Input ``pandas.DataFrame``. It is not modified; ``drop`` returns
            a new frame that all later steps operate on.

    Returns:
        pandas.DataFrame: The cleaned frame.
    """
    # Drop unnecessary or redundant columns
    cols_to_drop = ['User ID', 'VO2 Max', 'Body Fat (%)', 'Workout Type', 'Mood Before Workout', 'Mood After Workout']
    data = data.drop(columns=[col for col in cols_to_drop if col in data.columns])

    # Handle missing values
    numerical_cols = data.select_dtypes(include=['int64', 'float64']).columns
    categorical_cols = data.select_dtypes(include=['object']).columns

    for col in numerical_cols:
        data[col] = data[col].fillna(data[col].median())

    for col in categorical_cols:
        # mode() is empty when the whole column is missing; indexing [0] would
        # raise in that case, so such a column is left as-is.
        modes = data[col].mode()
        if not modes.empty:
            data[col] = data[col].fillna(modes.iloc[0])

    # Remove duplicates
    data = data.drop_duplicates()

    # Handle outliers using IQR method
    def cap_outliers(series):
        """Clip a numeric series to the 1.5*IQR fences around its quartiles."""
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return series.clip(lower_bound, upper_bound)

    for col in numerical_cols:
        data[col] = cap_outliers(data[col])

    return data

# 3. Improved Feature Engineering
def engineer_features(data):
    """Derive health, efficiency and interaction features from the raw columns.

    Adds the columns ``BMI``, ``HR_Reserve``, ``Caloric_Efficiency``,
    ``Step_Efficiency``, ``Fitness_Level``, ``Age_Intensity`` and
    ``BMI_Intensity``, then replaces infinities with NaN and imputes missing
    values (median for int64/float64 columns, mode for object columns).

    Args:
        data: ``pandas.DataFrame`` containing at least 'Weight (kg)',
            'Height (cm)', 'Heart Rate (bpm)', 'Resting Heart Rate (bpm)',
            'Calories Burned', 'Workout Duration (mins)', 'Steps Taken',
            'Age' and 'Workout Intensity'. The frame is not modified.

    Returns:
        pandas.DataFrame: A new frame with the engineered columns appended.
    """
    # Work on a copy so the caller's frame does not silently gain columns.
    data = data.copy()

    # Basic health metrics
    data['BMI'] = data['Weight (kg)'] / ((data['Height (cm)']/100) ** 2)
    data['HR_Reserve'] = data['Heart Rate (bpm)'] - data['Resting Heart Rate (bpm)']

    # Activity efficiency metrics (the small epsilon avoids division by zero)
    data['Caloric_Efficiency'] = data['Calories Burned'] / (data['Workout Duration (mins)'] + 1e-6)
    data['Step_Efficiency'] = data['Steps Taken'] / (data['Workout Duration (mins)'] + 1e-6)

    # Fitness level approximation
    data['Fitness_Level'] = (data['Calories Burned'] / data['Weight (kg)']) * (data['Heart Rate (bpm)'] / data['Resting Heart Rate (bpm)'])

    # Interaction terms. Labels are normalised (trimmed, lower-cased) before
    # mapping so that 'High' / ' high ' score the same as 'high'; with an
    # exact-match map such labels would all become NaN.
    intensity_levels = {'low': 1, 'medium': 2, 'high': 3}
    intensity_score = (
        data['Workout Intensity'].astype(str).str.strip().str.lower().map(intensity_levels)
    )
    data['Age_Intensity'] = data['Age'] * intensity_score
    data['BMI_Intensity'] = data['BMI'] * intensity_score

    # Remove infinities and nulls
    data = data.replace([np.inf, -np.inf], np.nan)

    # Fill missing values: numerical with median, categorical with mode
    numerical_cols = data.select_dtypes(include=['int64', 'float64']).columns
    categorical_cols = data.select_dtypes(include=['object']).columns

    for col in numerical_cols:
        data[col] = data[col].fillna(data[col].median())

    for col in categorical_cols:
        # mode() is empty for an all-missing column; skip instead of raising.
        modes = data[col].mode()
        if not modes.empty:
            data[col] = data[col].fillna(modes.iloc[0])

    return data

# 4. Target Variable Engineering
def create_target_variable(data):
    """Label each row with a rule-based 'Fitness_Plan' and balance the classes.

    Rules are evaluated in order (first match wins, via ``np.select``):
      * 'cardio-focused'   : HR_Reserve > 40 and Caloric_Efficiency > 10
      * 'strength-focused' : BMI > 25 and HR_Reserve < 30
      * 'endurance'        : HR_Reserve in [20, 40] and Caloric_Efficiency in [5, 10]
      * 'balanced'         : everything else

    Every class is then down-sampled (``random_state=42``) to the size of the
    smallest class that actually occurs.

    Args:
        data: ``pandas.DataFrame`` with 'HR_Reserve', 'Caloric_Efficiency' and
            'BMI' columns. The frame is not modified.

    Returns:
        pandas.DataFrame: Balanced subset of the rows with an added
        'Fitness_Plan' column. Rows are grouped by plan, in order of each
        plan's first appearance. For empty input, the empty frame (with the
        new column) is returned.
    """
    # Copy first so the label column is not written into the caller's frame.
    data = data.copy()

    conditions = [
        # Cardio-focused: High heart rate, high calories burned relative to duration
        (data['HR_Reserve'] > 40) & (data['Caloric_Efficiency'] > 10),

        # Strength-focused: Lower cardio metrics but higher BMI
        (data['BMI'] > 25) & (data['HR_Reserve'] < 30),

        # Endurance: Moderate metrics across the board
        (data['HR_Reserve'].between(20, 40)) & (data['Caloric_Efficiency'].between(5, 10))
    ]

    choices = ['cardio-focused', 'strength-focused', 'endurance']
    data['Fitness_Plan'] = np.select(conditions, choices, default='balanced')

    # Nothing to balance (and nothing to concatenate) for an empty frame.
    if data.empty:
        return data

    # Ensure balanced classes by down-sampling to the rarest observed class
    min_class_size = data['Fitness_Plan'].value_counts().min()

    # Collect the per-class samples and concatenate once: avoids seeding the
    # result with an empty DataFrame and re-copying it on every iteration.
    class_samples = [
        data[data['Fitness_Plan'] == plan].sample(min_class_size, random_state=42)
        for plan in data['Fitness_Plan'].unique()
    ]

    return pd.concat(class_samples)

# 5. Advanced Feature Selection
def select_features(X, y):
    """Select a subset of the original feature columns.

    Two filters are applied:
      1. Correlation filter: among int64/float64 columns, drop any column whose
         absolute correlation with an earlier column exceeds 0.85.
      2. ANOVA F-test (``SelectKBest`` with ``f_classif``, k = min(15, n)) on the
         robust-scaled numeric columns plus one-hot encoded object columns.

    Selected one-hot columns are mapped back to the categorical column they
    came from, so the result contains original (un-encoded) columns only.

    Args:
        X: ``pandas.DataFrame`` of candidate features.
        y: Class labels aligned row-by-row with ``X``.

    Returns:
        pandas.DataFrame: ``X`` restricted to the selected original columns.
    """
    # Separate numerical and categorical columns
    numerical_cols = X.select_dtypes(include=['int64', 'float64']).columns
    categorical_cols = X.select_dtypes(include=['object']).columns

    # Correlation analysis on numerical columns only
    if len(numerical_cols) > 0:
        corr_matrix = X[numerical_cols].corr().abs()
        # Keep only the strict upper triangle so each pair is inspected once.
        upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
        to_drop = [column for column in upper.columns if any(upper[column] > 0.85)]
    else:
        to_drop = []

    # Drop highly correlated numerical features
    X = X.drop(columns=to_drop)

    # Update numerical columns after dropping
    numerical_cols = X.select_dtypes(include=['int64', 'float64']).columns

    # Preprocess data for ANOVA feature selection
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', RobustScaler(), numerical_cols),
            ('cat', OneHotEncoder(drop='first', handle_unknown='ignore'), categorical_cols)
        ])

    X_transformed = preprocessor.fit_transform(X)

    # ColumnTransformer may hand back a SciPy sparse matrix when the one-hot
    # block dominates; densify so it can be wrapped in a labelled DataFrame.
    if hasattr(X_transformed, 'toarray'):
        X_transformed = X_transformed.toarray()

    # Convert to DataFrame with appropriate column names. With no categorical
    # columns the encoder is never fitted, so it must not be queried for names.
    if len(categorical_cols) > 0:
        cat_feature_names = preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_cols)
    else:
        cat_feature_names = []
    all_feature_names = list(numerical_cols) + list(cat_feature_names)
    X_transformed_df = pd.DataFrame(X_transformed, columns=all_feature_names)

    # Fill NaN values to ensure compatibility with SelectKBest
    X_transformed_df = X_transformed_df.fillna(X_transformed_df.median())

    # Debug NaN values
    print("NaN counts after median fill:\n", X_transformed_df.isnull().sum())

    # Validate no NaN values (an all-NaN column has a NaN median, so the
    # median fill above cannot repair it)
    if X_transformed_df.isnull().sum().sum() > 0:
        print("Warning: NaN values detected after filling. Filling with 0 as a fallback.")
        X_transformed_df = X_transformed_df.fillna(0)

    # ANOVA feature selection
    selector = SelectKBest(f_classif, k=min(15, X_transformed_df.shape[1]))
    selector.fit(X_transformed_df, y)
    selected_features = X_transformed_df.columns[selector.get_support()]

    # Map selected features back to original features. One-hot names have the
    # form '<column>_<category>'; trying the longest column names first keeps
    # e.g. 'Mood After_x' from being attributed to a shorter column 'Mood'.
    cats_longest_first = sorted(categorical_cols, key=lambda name: len(str(name)), reverse=True)
    original_features = []
    for feature in selected_features:
        if feature in numerical_cols:
            source_col = feature
        else:
            source_col = next(
                (cat_col for cat_col in cats_longest_first if str(feature).startswith(f'{cat_col}_')),
                None,
            )
        if source_col is not None and source_col not in original_features:
            original_features.append(source_col)

    return X[original_features]

# 6. Model Building - Enhanced ANN
def build_ann_model(input_shape, num_classes):
    """Assemble and compile the feed-forward classifier.

    The network has three hidden stages (128, 64 and 32 ReLU units), each an
    L2-regularised Dense layer followed by batch normalisation and dropout
    (0.4, 0.3, 0.2), and ends in a softmax layer with ``num_classes`` units.

    Args:
        input_shape: Number of input features (an int; used as ``(input_shape,)``).
        num_classes: Number of output classes.

    Returns:
        A compiled Keras ``Sequential`` model (Adam, learning rate 1e-4,
        sparse categorical cross-entropy, accuracy metric).
    """
    # (units, dropout rate) for each hidden stage, in order
    hidden_stages = ((128, 0.4), (64, 0.3), (32, 0.2))

    net = Sequential()
    for stage_index, (units, drop_rate) in enumerate(hidden_stages):
        dense_kwargs = {'activation': 'relu', 'kernel_regularizer': l2(0.01)}
        if stage_index == 0:
            # Only the first layer declares the input dimensionality.
            dense_kwargs['input_shape'] = (input_shape,)
        net.add(Dense(units, **dense_kwargs))
        net.add(BatchNormalization())
        net.add(Dropout(drop_rate))

    net.add(Dense(num_classes, activation='softmax'))

    net.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return net

# 7. Model Building - Optimized XGBoost
def build_xgboost_model():
    """Return an unfitted ``XGBClassifier`` with the project's fixed hyperparameters.

    Returns:
        xgb.XGBClassifier: Configured with shallow trees (depth 4), 200
        estimators, row/column subsampling of 0.8, L1/L2 regularisation of 0.5
        and ``random_state=42``.
    """
    hyperparams = {
        # Tree shape and boosting schedule
        'max_depth': 4,
        'learning_rate': 0.1,
        'n_estimators': 200,
        # Stochastic subsampling
        'subsample': 0.8,
        'colsample_bytree': 0.8,
        # Regularisation
        'reg_alpha': 0.5,
        'reg_lambda': 0.5,
        'gamma': 0.1,
        'min_child_weight': 3,
        # Evaluation / bookkeeping
        'use_label_encoder': False,
        'eval_metric': 'mlogloss',
        'random_state': 42,
    }
    return xgb.XGBClassifier(**hyperparams)

# 8. Custom Cross-Validation for ANN
def cross_validate_ann(X, y, input_shape, num_classes, cv, epochs=100, batch_size=32):
    """Stratified k-fold cross-validation for the ANN.

    For every fold a fresh model is built with ``build_ann_model``, trained
    with early stopping, and scored (accuracy) on the held-out fold.

    Early stopping monitors an inner validation split carved out of the
    training fold. The held-out fold is used for scoring only; using it to pick
    the best epoch as well would bias the reported score upward.

    Args:
        X: Feature matrix (NumPy array, or a pandas object which is converted).
        y: Integer class labels aligned with ``X``.
        input_shape: Number of input features passed to ``build_ann_model``.
        num_classes: Number of classes passed to ``build_ann_model``.
        cv: Object exposing ``n_splits``; only the fold count is read from it.
            Shuffling and seeding are fixed (``shuffle=True, random_state=42``).
        epochs: Maximum training epochs per fold.
        batch_size: Mini-batch size.

    Returns:
        numpy.ndarray: One accuracy score per fold.
    """
    # Positional indexing below (X[train_idx]) needs arrays, not label-indexed
    # pandas objects.
    if hasattr(X, 'to_numpy'):
        X = X.to_numpy()
    if hasattr(y, 'to_numpy'):
        y = y.to_numpy()

    scores = []
    skf = StratifiedKFold(n_splits=cv.n_splits, shuffle=True, random_state=42)

    for train_idx, val_idx in skf.split(X, y):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        # Inner split used solely for early stopping. Stratify when possible;
        # fall back to a plain split if some class is too small to stratify.
        try:
            X_fit, X_stop, y_fit, y_stop = train_test_split(
                X_train, y_train, test_size=0.2, random_state=42, stratify=y_train)
        except ValueError:
            X_fit, X_stop, y_fit, y_stop = train_test_split(
                X_train, y_train, test_size=0.2, random_state=42)

        # Build and train ANN
        model = build_ann_model(input_shape, num_classes)
        early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
        model.fit(
            X_fit, y_fit,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_stop, y_stop),
            callbacks=[early_stopping],
            verbose=0
        )

        # Evaluate on the untouched validation fold
        y_pred = np.argmax(model.predict(X_val), axis=1)
        score = accuracy_score(y_val, y_pred)
        scores.append(score)

    return np.array(scores)

# 9. Model Evaluation
def evaluate_model(model, X_test, y_test, feature_names, model_name='Model', history=None):
    """Print test metrics for a fitted model and save diagnostic plots.

    Printed: accuracy and the classification report. Saved under ``figures/``
    (file names are prefixed with ``model_name.lower()``):
      * ``_confusion_matrix.png`` - always;
      * ``_feature_importance.png`` - only if the model exposes
        ``feature_importances_`` (top 10 features);
      * ``_training_history.png`` - only for Keras models when ``history`` is given.

    Args:
        model: Fitted estimator. Keras models are expected to output class
            probabilities (argmax is taken); other models must return labels
            from ``predict``.
        X_test: Test feature matrix.
        y_test: True test labels.
        feature_names: Names aligned with ``model.feature_importances_``.
        model_name: Label used in plot titles and output file names.
        history: Optional mapping with 'loss', 'val_loss', 'accuracy' and
            'val_accuracy' sequences (e.g. ``History.history``).

    Returns:
        None
    """
    # All plots are written under figures/; create it here so the function
    # does not depend on the caller having prepared the directory.
    os.makedirs('figures', exist_ok=True)

    # Handle ANN prediction differently: softmax probabilities -> class index
    if isinstance(model, tf.keras.Model):
        y_pred = np.argmax(model.predict(X_test), axis=1)
    else:
        y_pred = model.predict(X_test)

    print(f"\n{model_name} Performance:")
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print("\nClassification Report:\n", classification_report(y_test, y_pred))

    # Confusion matrix
    plt.figure(figsize=(8, 6))
    cm = confusion_matrix(y_test, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'{model_name} Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.savefig(f'figures/{model_name.lower()}_confusion_matrix.png')
    plt.close()

    # Feature importance for tree-based models
    if hasattr(model, 'feature_importances_'):
        feat_importances = pd.Series(model.feature_importances_, index=feature_names)
        plt.figure(figsize=(10, 6))
        feat_importances.nlargest(10).plot(kind='barh')
        plt.title(f'{model_name} Feature Importance')
        plt.savefig(f'figures/{model_name.lower()}_feature_importance.png')
        plt.close()

    # Plot training history for ANN
    if history and isinstance(model, tf.keras.Model):
        plt.figure(figsize=(12, 4))

        # Left panel: loss curves
        plt.subplot(1, 2, 1)
        plt.plot(history['loss'], label='Training Loss')
        plt.plot(history['val_loss'], label='Validation Loss')
        plt.title(f'{model_name} Training and Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()

        # Right panel: accuracy curves
        plt.subplot(1, 2, 2)
        plt.plot(history['accuracy'], label='Training Accuracy')
        plt.plot(history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} Training and Validation Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.savefig(f'figures/{model_name.lower()}_training_history.png')
        plt.close()

# Main Pipeline
def main():
    """Run the full pipeline: load, clean, engineer, label, select, train, evaluate, save.

    Side effects: creates ``figures/`` and writes plots into it; writes
    ``preprocessor.pkl``, ``label_encoder.pkl``, ``fitness_ann_model.h5`` and
    ``fitness_xgb_model.pkl`` to the working directory; prints progress and
    metrics.
    """
    # Create directories
    os.makedirs('figures', exist_ok=True)

    # Load dataset from Google Drive
    data_path = '/content/drive/My Drive/ML Project/fitness_data.csv'
    data = load_data(data_path)

    # Clean data
    cleaned_data = clean_data(data)

    # Feature engineering
    engineered_data = engineer_features(cleaned_data)

    # Create target variable
    final_data = create_target_variable(engineered_data)

    # Prepare features and target
    X = final_data.drop(columns=['Fitness_Plan'])
    y = final_data['Fitness_Plan']

    # Encode target variable
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    # Feature selection
    # NOTE(review): selection is fitted on all rows, before the train/test
    # split, so test rows influence which features are kept. Consider fitting
    # it on the training split only.
    X_selected = select_features(X, y_encoded)

    # Preprocessing pipeline
    numerical_cols = X_selected.select_dtypes(include=['int64', 'float64']).columns
    categorical_cols = X_selected.select_dtypes(include=['object']).columns

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', RobustScaler(), numerical_cols),
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols)
        ])

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X_selected, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded)

    # Apply preprocessing and get feature names
    X_train_processed = preprocessor.fit_transform(X_train)
    X_test_processed = preprocessor.transform(X_test)

    # Get feature names after preprocessing. If feature selection kept no
    # categorical column, the encoder is never fitted and must not be queried.
    if len(categorical_cols) > 0:
        cat_feature_names = preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_cols)
    else:
        cat_feature_names = []
    all_feature_names = list(numerical_cols) + list(cat_feature_names)

    # Convert sparse matrix to dense for ANN compatibility
    X_train_processed = X_train_processed.toarray() if hasattr(X_train_processed, 'toarray') else X_train_processed
    X_test_processed = X_test_processed.toarray() if hasattr(X_test_processed, 'toarray') else X_test_processed

    # Handle class imbalance
    smote = SMOTE(random_state=42)
    X_train_res, y_train_res = smote.fit_resample(X_train_processed, y_train)

    # Cross-validation for statistical measures
    # NOTE(review): CV runs on the SMOTE-resampled set, so synthetic
    # neighbours of a validation row can sit in the training folds; scores
    # are likely optimistic. Resampling inside each fold would avoid this.
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    # ANN Custom Cross-validation
    ann_cv_scores = cross_validate_ann(X_train_res, y_train_res, X_train_res.shape[1], len(np.unique(y_train_res)), cv)
    print("\nANN Cross-Validation Scores:", ann_cv_scores)
    print("ANN Mean CV Accuracy:", ann_cv_scores.mean())
    print("ANN Standard Deviation:", ann_cv_scores.std())

    # XGBoost Cross-validation
    xgb_model = build_xgboost_model()
    xgb_cv_scores = cross_val_score(xgb_model, X_train_res, y_train_res, cv=cv, scoring='accuracy')
    print("\nXGBoost Cross-Validation Scores:", xgb_cv_scores)
    print("XGBoost Mean CV Accuracy:", xgb_cv_scores.mean())
    print("XGBoost Standard Deviation:", xgb_cv_scores.std())

    # Box plot of cross-validation scores
    plt.figure(figsize=(10, 6))
    plt.boxplot([ann_cv_scores, xgb_cv_scores], labels=['ANN', 'XGBoost'])
    plt.title('Cross-Validation Accuracy Comparison')
    plt.ylabel('Accuracy')
    plt.savefig('figures/cv_accuracy_comparison.png')
    plt.close()

    # Build and train ANN.
    # Keras' validation_split takes the *last* fraction of the arrays without
    # shuffling, and the resampled arrays are not in random order (synthetic
    # rows are expected at the tail), so a shuffled, stratified hold-out is
    # carved out explicitly for early stopping instead.
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train_res, y_train_res, test_size=0.2, random_state=42, stratify=y_train_res)

    ann_model = build_ann_model(X_train_res.shape[1], len(np.unique(y_train_res)))
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    history = ann_model.fit(
        X_fit, y_fit,
        epochs=100,
        batch_size=32,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping],
        verbose=1)

    # Build and train XGBoost
    xgb_model = build_xgboost_model()
    xgb_model.fit(X_train_res, y_train_res)

    # Evaluate models
    evaluate_model(ann_model, X_test_processed, y_test, all_feature_names, 'ANN', history.history)
    evaluate_model(xgb_model, X_test_processed, y_test, all_feature_names, 'XGBoost')

    # Correlation heatmap of selected numeric features (if feasible)
    if X_selected.select_dtypes(include=['int64', 'float64']).shape[1] <= 20:  # Limit to avoid performance issues
        plt.figure(figsize=(12, 8))
        sns.heatmap(X_selected.select_dtypes(include=['int64', 'float64']).corr(), annot=True, cmap='coolwarm', vmin=-1, vmax=1)
        plt.title('Correlation Heatmap of Selected Features')
        plt.savefig('figures/feature_correlation_heatmap.png')
        plt.close()

    # Save models
    joblib.dump(preprocessor, 'preprocessor.pkl')
    joblib.dump(le, 'label_encoder.pkl')
    ann_model.save('fitness_ann_model.h5')
    joblib.dump(xgb_model, 'fitness_xgb_model.pkl')

# Run the pipeline only when this code executes as the top-level module,
# not when it is imported.
if __name__ == '__main__':
    main()

ModuleNotFoundError: No module named 'pandas'

In [4]:
from google.colab import files

# Download preprocessing and model files
for artifact_name in (
    'preprocessor.pkl',
    'label_encoder.pkl',
    'fitness_ann_model.h5',
    'fitness_xgb_model.pkl',
):
    files.download(artifact_name)

# Download all plots from the figures directory
import os
figures_dir = 'figures'
png_names = [entry for entry in os.listdir(figures_dir) if entry.endswith('.png')]
for png_name in png_names:
    files.download(os.path.join(figures_dir, png_name))

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>