In [1]:
import pandas as pd
import numpy as np
import os
import openpyxl

from data_preprocessing import encode_categorical_features
from sampling import create_stratified_kfolds
from sklearn.model_selection import StratifiedKFold

from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, RFE, SelectFromModel, f_classif, mutual_info_classif, chi2, RFE

from sklearn.linear_model import LogisticRegression, Lasso
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB

from catboost import CatBoostClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score, \
    matthews_corrcoef, mean_squared_error, r2_score, roc_auc_score, roc_curve, auc
from math import sqrt

import xml.etree.ElementTree as ET

In [None]:
# notebook parameters
input_dataset_path = "data/heart_disease_health_indicators_BRFSS2015.csv"
target_col = "HeartDiseaseorAttack"
generate_new_folds = False
n_splits = 5
k_best_features = 10

In [None]:
def calculate_metrics(y_pred: np.array, y_test: pd.Series):
    """ Calculate model quality metrics based on 
        expected label values from testing dataset (y_test) and predicted values.
    """
    tn, fp, fn, tp = calculate_test_results_from_confusion_matrix(y_test, y_pred)
    model_precision = precision_score(y_test, y_pred)
    model_recall = recall_score(y_test, y_pred) # sensitivity
    model_specificity = specificity_score(tn, fp)
    model_acc = accuracy_score(y_test, y_pred)
    model_npv = calculate_npv(tn, fn)

    model_f1_score = f1_score(y_test, y_pred)
    model_mcc = matthews_corrcoef(y_test, y_pred)
    rmse = sqrt(mean_squared_error(y_test, y_pred))
    model_r2 = r2_score(y_test, y_pred)
    roc_auc = roc_auc_score(y_test, y_pred)
    fpr, tpr, thresholds = roc_curve(y_test, y_pred)

    model_scores = {
        "True_Negative": tn,
        "False_Positive": fp,
        "False_Negative": fn,
        "True_Positive": tp,
        "Precision_PPV": model_precision,
        "Sensitivity_TPR_Recall": model_recall,
        "Speciticity_TNR": model_specificity,
        "Accuracy": model_acc,
        "Negative_Predictive_Value_NPV": model_npv,
        "F1_Score": model_f1_score,
        "RMSE": rmse,
        "R_Squared": model_r2,
        "Matthews_Correlation_Coefficient_MCC": model_mcc,
        "Threshold_from_ROC_Curve": list(thresholds),
        "False_Positive_Rate_FPR": list(fpr),
        "ROC_AUC_score": roc_auc
    }

    return model_scores

def calculate_test_results_from_confusion_matrix(y_test: pd.DataFrame, y_pred: pd.DataFrame):
    """ Calculate the confusion matrix and extract TP, FP, TN, FN from that matrix """
    conf_matrix = confusion_matrix(y_test, y_pred)
    tn, fp, fn, tp = conf_matrix.ravel()

    return tn, fp, fn, tp

def specificity_score(tn: float, fp: float):
    return tn / (tn + fp)

def calculate_npv(tn: float, fn: float):
    return tn / (tn + fn)

In [None]:
def dict_to_xml(dictionary, root_name='root'):
    root = ET.Element(root_name)
    for key, value in dictionary.items():
        if isinstance(value, dict):
            root.append(dict_to_xml(value, key))
        else:
            element = ET.Element(key)
            element.text = str(value)
            root.append(element)
    return root


def save_dict_to_xml(dictionary, file_path, root_name='root'):
    root = dict_to_xml(dictionary, root_name)
    tree = ET.ElementTree(root)
    with open(file_path, 'wb') as file:
        tree.write(file)

### Loading dataset

In [None]:
heart_df = pd.read_csv(input_dataset_path)
heart_df[target_col] = heart_df[target_col].astype(int)
heart_df.head()

Unnamed: 0,HeartDiseaseorAttack,HighBP,HighChol,CholCheck,BMI,Smoker,Stroke,Diabetes,PhysActivity,Fruits,...,AnyHealthcare,NoDocbcCost,GenHlth,MentHlth,PhysHlth,DiffWalk,Sex,Age,Education,Income
0,0,1.0,1.0,1.0,40.0,1.0,0.0,0.0,0.0,0.0,...,1.0,0.0,5.0,18.0,15.0,1.0,0.0,9.0,4.0,3.0
1,0,0.0,0.0,0.0,25.0,1.0,0.0,0.0,1.0,0.0,...,0.0,1.0,3.0,0.0,0.0,0.0,0.0,7.0,6.0,1.0
2,0,1.0,1.0,1.0,28.0,0.0,0.0,0.0,0.0,1.0,...,1.0,1.0,5.0,30.0,30.0,1.0,0.0,9.0,4.0,8.0
3,0,1.0,0.0,1.0,27.0,0.0,0.0,0.0,1.0,1.0,...,1.0,0.0,2.0,0.0,0.0,0.0,0.0,11.0,3.0,6.0
4,0,1.0,1.0,1.0,24.0,0.0,0.0,0.0,1.0,1.0,...,1.0,0.0,2.0,3.0,0.0,0.0,0.0,11.0,5.0,4.0


In [None]:
# general dataset descriptors
print(f"Input dataset has {heart_df.shape[0]} rows and {heart_df.shape[1]} colums")
print(f"Input dataset consists of {heart_df.drop(columns=[target_col]).shape[1]} features and 1 target column")

print(f"Target values are: {heart_df[target_col].unique()}")
print(f"Input dataset contains {heart_df[heart_df.duplicated()].shape[0]} duplicated rows and {heart_df[heart_df.duplicated()==False].shape[0]} unique rows")

Input dataset has 253680 rows and 22 colums
Input dataset consists of 21 features and 1 target column
Target values are: [0 1]
Input dataset contains 23899 duplicated rows and 229781 unique rows


In [None]:
heart_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 253680 entries, 0 to 253679
Data columns (total 22 columns):
 #   Column                Non-Null Count   Dtype  
---  ------                --------------   -----  
 0   HeartDiseaseorAttack  253680 non-null  int32  
 1   HighBP                253680 non-null  float64
 2   HighChol              253680 non-null  float64
 3   CholCheck             253680 non-null  float64
 4   BMI                   253680 non-null  float64
 5   Smoker                253680 non-null  float64
 6   Stroke                253680 non-null  float64
 7   Diabetes              253680 non-null  float64
 8   PhysActivity          253680 non-null  float64
 9   Fruits                253680 non-null  float64
 10  Veggies               253680 non-null  float64
 11  HvyAlcoholConsump     253680 non-null  float64
 12  AnyHealthcare         253680 non-null  float64
 13  NoDocbcCost           253680 non-null  float64
 14  GenHlth               253680 non-null  float64
 15  

### Data preprocessing

In [None]:
# delete all duplicated values within the dataset
heart_df.drop_duplicates(inplace=True)

In [None]:
# encode categorical features using LabelEncoder and OneHotEncoding
heart_df = encode_categorical_features(heart_df, target_col)

Ordinal Categorical Features: []
Nominal Categorical Features: []


In [None]:
# divide a heart failure dataset into features and target value sets
x = heart_df.drop(columns=[target_col])
y = heart_df[target_col]

### Data stratified sampling

In [None]:
# if needed generare and save into CSV files new folds created by stratified data sampling
if generate_new_folds:
    create_stratified_kfolds(x_df=x, y_df=y, dataset=heart_df, n_splits=n_splits)

# Read folds that are available
# Create empty lists to store train and test DataFrames
train_datasets = []
test_datasets = []

for fold_num in range(1, n_splits+1):
    train_file_path = f"folds/fold_{fold_num}_train.csv"
    test_file_path = f"folds/fold_{fold_num}_test.csv"
    
    # Load the train and test fold data into DataFrames
    train_fold = pd.read_csv(train_file_path)
    test_fold = pd.read_csv(test_file_path)
    
    train_datasets.append(train_fold)
    test_datasets.append(test_fold)
print("Folds data were loaded successfully!")

Folds data were loaded successfully!


### Models definitions

In [None]:
models = {
    "Random_Forest": RandomForestClassifier(n_estimators=100, random_state=42),
    "Decision_Tree": DecisionTreeClassifier(random_state=11, criterion="entropy"),
    "Pruning_Decision_Tree": DecisionTreeClassifier(random_state=11, ccp_alpha=0.02, criterion="entropy"),
    "Logistic_Regression": LogisticRegression(random_state=0, C=10, penalty="l2"),
    "Support_Vector_Machine": SVC(kernel="linear", C=0.3),
    "K_Nearest_Neighbours": KNeighborsClassifier(leaf_size=1, n_neighbors=3),
    "Gaussian_Naive_Bayes": GaussianNB(),
    "XGBoost_Classifier": XGBClassifier(n_estimators=2, max_depth=2, learning_rate=1, objective="binary:logistic")
    # ,"CatBoost_Classifier": CatBoostClassifier(verbose=False)
}

### Model Training and Evaluation

In [None]:
# Create a directory for the folds if it doesn't exist
if not os.path.exists('model_results'):
    os.makedirs('model_results')

In [None]:
folds_results = {}

for fold_num, (train_data, test_data) in enumerate(zip(train_datasets, test_datasets),1):
    # Standardize features using StandardScaler
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(train_data.drop(target_col, axis=1))
    X_test_scaled = scaler.transform(test_data.drop(target_col, axis=1))

    # Feature Selection using SelectKBest with ANOVA F-statistic
    selector = SelectKBest(score_func=f_classif, k=k_best_features)
    X_train_selected = selector.fit_transform(X_train_scaled, train_data[target_col])
    X_test_selected = selector.transform(X_test_scaled)

    models_results_dict = {}
    # Model Building
    for model_name, model in models.items():
        # train model
        model.fit(X_train_selected, train_data[target_col])

        # Model Evaluation (e.g., accuracy score)
        y_pred = model.predict(X_test_selected)
        model_scores = calculate_metrics(y_test=test_data[target_col], y_pred=y_pred)
        models_results_dict[model_name] = model_scores

    # Add the selected feature names or indices to the dictionary
    models_results_dict["selected_features"] = list(x.columns[selector.get_support()])

    folds_results[f"fold_{fold_num}"] = models_results_dict

# fclassif_results_df = pd.DataFrame.from_dict(folds_results) 
# fclassif_results_df.to_csv(f"model_results/f_classif_feature_selection_{k_best_features}best.csv", index=False, header=True)

# fclassif_results_df = pd.DataFrame(folds_results)
# fclassif_results_df.to_excel(f"model_results/f_classif_feature_selection_{k_best_features}best.xlsx", header=True, index=False)

save_dict_to_xml(folds_results, f"model_results/f_classif_feature_selection_{k_best_features}best.xml")

  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
folds_results = {}

for fold_num, (train_data, test_data) in enumerate(zip(train_datasets, test_datasets),1):
    # Standardize features using StandardScaler
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(train_data.drop(target_col, axis=1))
    X_test_scaled = scaler.transform(test_data.drop(target_col, axis=1))

    # Feature Selection Mutual Information (MI): This score function measures the dependence between two random variables. 
    # It is a good choice when you want to capture both linear and non-linear relationships between features and the target variable.
    selector = SelectKBest(score_func=mutual_info_classif, k=k_best_features)
    X_train_selected = selector.fit_transform(X_train_scaled, train_data[target_col])
    X_test_selected = selector.transform(X_test_scaled)

    models_results_dict = {}
    # Model Building
    for model_name, model in models.items():
        # train model
        model.fit(X_train_selected, train_data[target_col])

        # Model Evaluation (e.g., accuracy score)
        y_pred = model.predict(X_test_selected)
        model_scores = calculate_metrics(y_test=test_data[target_col], y_pred=y_pred)
        models_results_dict[model_name] = model_scores

    # Add the selected feature names or indices to the dictionary
    models_results_dict["selected_features"] = list(x.columns[selector.get_support()])
    
    folds_results[f"fold_{fold_num}"] = models_results_dict

# mutual_info_results_df = pd.DataFrame.from_dict(folds_results) 
# mutual_info_results_df.to_csv(f"model_results/mutual_info_feature_selection_{k_best_features}best.csv", index=False, header=True)

# mutual_info_results_df = pd.DataFrame(folds_results)
# mutual_info_results_df.to_excel(f"model_results/mutual_info_feature_selection_{k_best_features}best.xlsx", header=True, index=False)

save_dict_to_xml(folds_results, f"model_results/mutual_info_feature_selection_{k_best_features}best.xml")

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
folds_results = {}

for fold_num, (train_data, test_data) in enumerate(zip(train_datasets, test_datasets),1):
    # Standardize features using StandardScaler
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(train_data.drop(target_col, axis=1))
    X_test_scaled = scaler.transform(test_data.drop(target_col, axis=1))

    # Feature Selection Recursive Feature Elimination (RFE): RFE is an iterative feature selection method that recursively removes the least significant features. 
    # It is particularly useful when a prior model (e.g., logistic regression, SVM) in mind.
    selector = RFE(estimator=LogisticRegression(), n_features_to_select=k_best_features)
    X_train_selected = selector.fit_transform(X_train_scaled, train_data[target_col])
    X_test_selected = selector.transform(X_test_scaled)

    models_results_dict = {}
    # Model Building
    for model_name, model in models.items():
        # train model
        model.fit(X_train_selected, train_data[target_col])

        # Model Evaluation (e.g., accuracy score)
        y_pred = model.predict(X_test_selected)
        model_scores = calculate_metrics(y_test=test_data[target_col], y_pred=y_pred)
        models_results_dict[model_name] = model_scores

    # Add the selected feature names or indices to the dictionary
    models_results_dict["selected_features"] = list(x.columns[selector.get_support()])
    
    folds_results[f"fold_{fold_num}"] = models_results_dict

# RFE_results_df = pd.DataFrame.from_dict(folds_results) 
# RFE_results_df.to_csv(f"model_results/RFE_feature_selection_{k_best_features}best.csv", index=False, header=True)

# RFE_results_df = pd.DataFrame(folds_results)
# RFE_results_df.to_excel(f"model_results/RFE_feature_selection_{k_best_features}best.xlsx", header=True, index=False)

save_dict_to_xml(folds_results, f"model_results/RFE_feature_selection_{k_best_features}best.xml")

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
folds_results = {}
for fold_num, (train_data, test_data) in enumerate(zip(train_datasets, test_datasets),1):
    # Standardize features using StandardScaler
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(train_data.drop(target_col, axis=1))
    X_test_scaled = scaler.transform(test_data.drop(target_col, axis=1))

    # Feature Selection Feature Importance from Tree-based Models: For ensemble tree-based models like Random Forest or XGBoost, you can use feature importances to select the most important features.
    selector = SelectFromModel(RandomForestClassifier(), max_features=k_best_features)
    X_train_selected = selector.fit_transform(X_train_scaled, train_data[target_col])
    X_test_selected = selector.transform(X_test_scaled)

    models_results_dict = {}
    # Model Building
    for model_name, model in models.items():
        # train model
        model.fit(X_train_selected, train_data[target_col])

        # Model Evaluation (e.g., accuracy score)
        y_pred = model.predict(X_test_selected)
        model_scores = calculate_metrics(y_test=test_data[target_col], y_pred=y_pred)
        models_results_dict[model_name] = model_scores

    # Add the selected feature names or indices to the dictionary
    models_results_dict["selected_features"] = list(x.columns[selector.get_support()])
    
    folds_results[f"fold_{fold_num}"] = models_results_dict

# tree_feature_importance_results_df = pd.DataFrame.from_dict(folds_results) 
# tree_feature_importance_results_df.to_csv(f"model_results/tree_feature_importance_feature_selection_{k_best_features}best.csv", index=False, header=True)

# tree_feature_importance_results_df = pd.DataFrame(folds_results)
# tree_feature_importance_results_df.to_excel(f"model_results/tree_feature_importance_feature_selection_{k_best_features}best.xlsx", header=True, index=False)

save_dict_to_xml(folds_results, f"model_results/tree_feature_importance_feature_selection_{k_best_features}best.xml")

In [None]:
# from sklearn.feature_selection import SelectFromModel
# from sklearn.linear_model import Lasso

# folds_results = {}
# for fold_num, (train_data, test_data) in enumerate(zip(train_datasets, test_datasets),1):
#     # Standardize features using StandardScaler
#     scaler = StandardScaler()
#     X_train_scaled = scaler.fit_transform(train_data.drop(target_col, axis=1))
#     X_test_scaled = scaler.transform(test_data.drop(target_col, axis=1))

#     # Feature Selection L1-based feature selection: L1 regularization methods like Lasso can be used for feature selection. Features with zero coefficients can be pruned.
#     selector = SelectFromModel(Lasso(), max_features=k_best_features)
#     X_train_selected = selector.fit_transform(X_train_scaled, train_data[target_col])
#     X_test_selected = selector.transform(X_test_scaled)

#     models_results_dict = {}
#     # Model Building
#     for model_name, model in models.items():
#         # train model
#         model.fit(X_train_selected, train_data[target_col])

#         # Model Evaluation (e.g., accuracy score)
#         y_pred = model.predict(X_test_selected)
#         model_scores = calculate_metrics(y_test=test_data[target_col], y_pred=y_pred)
#         models_results_dict[model_name] = model_scores

#     folds_results[f"fold_{fold_num}"] = models_results_dict

# lasso_results_df = pd.DataFrame.from_dict(folds_results) 
# lasso_results_df.to_csv("model_evaluation_results_lasso_feature_selection.csv", index=False, header=True)

# lasso_results_df = pd.DataFrame(folds_results)
# lasso_results_df.to_excel("model_evaluation_results_lasso_feature_selection.xlsx", header=True, index=False)


In [None]:
# folds_results = {}
# 
# for fold_num, (train_data, test_data) in enumerate(zip(train_datasets, test_datasets),1):
#     # Standardize features using StandardScaler
#     scaler = StandardScaler()
#     X_train_scaled = scaler.fit_transform(train_data.drop(target_col, axis=1))
#     X_test_scaled = scaler.transform(test_data.drop(target_col, axis=1))

#     # Feature Selection Chi-Squared (χ²): Chi-squared tests can be used for feature selection when dealing with categorical data. 
#     # It measures the dependency between variables and is particularly useful for feature selection in classification tasks with categorical features.
#     selector = SelectKBest(score_func=chi2, k=k_best_features)
#     X_train_selected = selector.fit_transform(X_train_scaled, train_data[target_col])
#     X_test_selected = selector.transform(X_test_scaled)

#     models_results_dict = {}
#     # Model Building
#     for model_name, model in models.items():
#         # train model
#         model.fit(X_train_selected, train_data[target_col])

#         # Model Evaluation (e.g., accuracy score)
#         y_pred = model.predict(X_test_selected)
#         model_scores = calculate_metrics(y_test=test_data[target_col], y_pred=y_pred)
#         models_results_dict[model_name] = model_scores
    
#     folds_results[f"fold_{fold_num}"] = models_results_dict

# chi2_results_df = pd.DataFrame.from_dict(folds_results) 
# chi2_results_df.to_csv("model_evaluation_results_chi2_feature_selection.csv", index=False, header=True)

# chi2_results_df = pd.DataFrame(folds_results)
# chi2_results_df.to_excel("model_evaluation_results_chi2_feature_selection.xlsx", header=True, index=False)