In [None]:
import numpy as np
import pandas as pd
import matplotlib
from sklearn.tree import DecisionTreeRegressor, plot_tree
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from xgboost import XGBRegressor
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import copy
import os
import re
import shutil

from sklearn.tree import export_graphviz
import graphviz

%matplotlib inline
matplotlib.use('module://ipykernel.pylab.backend_inline')



In [None]:
# Setting parameters

n_samples = None  # if None, all data points are loaded; otherwise (for debugging purposes) the number of rows to read
remove_na = True  # if True, drops every data point with at least one missing value; used for the non-imputed data set

gender_column = "Sex"  # name of the column where gender or sex is given
gender = None  # value of gender_column used to subset the data set; if None, all data points are considered
iqr_coefficient = None  # if None, no standard outlier removal is performed; otherwise the multiplier k in Q1 - k*(Q3-Q1) and Q3 + k*(Q3-Q1)

outcome = "Diabetes_012"

home_directory = os.path.expanduser("~")
working_dir = f"{home_directory}/PRIME/example_data"  # working directory where the input and output files are read/written

# https://archive.ics.uci.edu/dataset/891/cdc+diabetes+health+indicators
# https://www.kaggle.com/datasets/alexteboul/diabetes-health-indicators-dataset?resource=download
input_file = f"{working_dir}/diabetes_012_health_indicators_BRFSS2015.csv"

# The output directory name is derived from the outcome name and, when a gender
# subset is requested, from the selected gender value.
outcome_dir = outcome.replace(',', '_').replace('/', '_').replace(' ', '')

output_dir = f"{working_dir}/tree_{outcome_dir}"

if gender is None:
    output_dir = f"{output_dir}/"
else:
    # Use the selected gender *value* rather than the column name, so that runs
    # for different subsets do not write into the same directory.
    output_dir = f"{output_dir}_gender_{gender}/"
# exist_ok avoids the check-then-create race and makes re-running the cell safe.
os.makedirs(output_dir, exist_ok=True)


predictors = ["HighBP", "HighChol", "CholCheck", "BMI", "Smoker", "Stroke",
              "HeartDiseaseorAttack", "MentHlth", "PhysActivity", "DiffWalk", "Fruits", "Veggies", "HvyAlcoholConsump",
              "AnyHealthcare", "NoDocbcCost", "Sex", "Age", "Education", "Income"]

# tree building algorithm specific parameters
m_samples_split = 500
m_samples_leaf = 250
m_depth = 6

# used in train and test split and in tree building
RANDOM_STATE = 17


In [None]:
# function for removing outliers via iqr approach
def remove_outliers_iqr(df_, iqr_cf,  outliers):
    """Drop rows lying outside the IQR fences of the given columns.

    Parameters
    ----------
    df_ : pandas.DataFrame
        Input data; it is not modified, a filtered frame is returned instead.
    iqr_cf : float or None
        Multiplier applied to the inter-quartile range. When None, no
        filtering is done and ``df_`` is returned as is.
    outliers : iterable of str
        Column names to which the filter is applied.

    Returns
    -------
    pandas.DataFrame
        Rows whose value lies within ``[Q1 - iqr_cf*IQR, Q3 + iqr_cf*IQR]``
        for every listed column.

    Notes
    -----
    All fences are computed on the unfiltered frame first; rows are removed
    only afterwards, column by column. Row counts and the processed column
    names are printed as progress information.
    """
    print(f"# data points before removing outliers: {len(df_)}")
    if iqr_cf is None:
        return df_

    # Pass 1: derive (low, high) fences per column from the untouched data.
    fences = {}
    for column in outliers:
        print(f"{column}")
        first_quartile = df_[column].quantile(0.25)
        third_quartile = df_[column].quantile(0.75)
        spread = third_quartile - first_quartile
        fences[column] = (first_quartile - iqr_cf * spread,
                          third_quartile + iqr_cf * spread)

    # Pass 2: keep only the rows inside every fence (bounds are inclusive).
    kept = df_
    for column, (low, high) in fences.items():
        inside = (kept[column] >= low) & (kept[column] <= high)
        kept = kept[inside]

    print(f"# data points after removing outliers: {len(kept)}")
    return kept

In [None]:
# CSV data import and subsetting
df = pd.read_csv(input_file, nrows=n_samples)

# Keep only the modelling columns. The gender column is loaded whenever a gender
# subset is requested, even if it is not (or no longer) listed in `predictors`,
# so that this cell can be re-executed without a KeyError.
selected_columns = predictors + [outcome]
if gender is not None and gender_column not in selected_columns:
    selected_columns = [gender_column] + selected_columns
df = df[selected_columns]
print(f"data set {len(df)} rows")
if remove_na:
    df = df.dropna(axis="rows")
    print(f"cleaned check up db has {len(df)} rows")

if gender is not None:
    # Restrict to the requested gender; the column is constant afterwards, so drop it.
    df = df[df[gender_column].eq(gender)].drop(columns=[gender_column])
    # Rebind instead of list.remove(): removing in place raises ValueError when
    # the cell is run a second time.
    predictors = [name for name in predictors if name != gender_column]

# The IQR filter is applied to every remaining column, the outcome included;
# it is a no-op while iqr_coefficient is None.
df = remove_outliers_iqr(df, iqr_coefficient, df.columns)

In [None]:
# Model inputs: every remaining column except the outcome.
# The comparison must be `!=` on the whole name; `in outcome` would be a
# substring test against the outcome string.
features = [column for column in df.columns if column != outcome]

In [None]:
# Show the feature columns that are passed to the train/validation split below.
print(features)

In [None]:
# train and test split
X_train, X_val, y_train, y_val = train_test_split(
    df[features], df[outcome], train_size=0.75, random_state=RANDOM_STATE
)
print(f'train samples: {len(X_train)}')
print(f'validation samples: {len(X_val)}')

# Summary statistics of the outcome on the *training* split only, as the "tr"
# labels below state. `median_` is later used as the constant baseline in R1,
# so it must not be influenced by the validation rows.
min_ = y_train.min()
max_ = y_train.max()
median_ = y_train.median()
mean_ = y_train.mean()
print(f"tr min {outcome}: {min_}")
print(f"tr max {outcome}: {max_}")
print(f"tr median {outcome}: {median_}")
print(f"tr mean {outcome}: {mean_}")

In [None]:
# function to compute R1
def r1(y_pred, y_true, median_):
    """Return the R1 score: 1 - sum|y_true - y_pred| / sum|y_true - median_|.

    The score is 0 when the predictions have the same total absolute error as
    the constant prediction ``median_`` and 1 when the predictions match
    ``y_true`` exactly. There is no guard for a zero baseline error.

    Parameters
    ----------
    y_pred : array-like
        Predicted values.
    y_true : array-like supporting arithmetic (e.g. pandas.Series, ndarray)
        Observed values.
    median_ : float
        Constant baseline prediction.
    """
    def total_abs_deviation(reference):
        # Sum of absolute differences between the observations and `reference`.
        return np.abs(y_true - reference).sum()

    model_error = total_abs_deviation(y_pred)
    baseline_error = total_abs_deviation(median_)
    return 1 - model_error / baseline_error

# Sanity check for r1: predicting the median for every point is exactly the
# baseline, so the score must be 0.
y_tr = pd.Series([1,2,3])
m_ = y_tr.median()
print(f"test median {m_}")
y_pr = pd.Series([m_, m_, m_])  # constant prediction equal to the median
test = r1(y_pr, y_tr, m_)
print(f"test r1 function: {test}")
# Fail loudly instead of relying on someone reading the printed value.
assert np.isclose(test, 0.0), f"r1 of the median baseline should be 0, got {test}"

In [None]:
# Tree building and evaluation

specific_name = f"{m_samples_split}_{m_samples_leaf}_{m_depth}_{RANDOM_STATE}"

regressor = DecisionTreeRegressor(min_samples_leaf=m_samples_leaf,
                                  min_samples_split=m_samples_split,
                                  max_depth=m_depth,
                                  criterion="friedman_mse",
                                  random_state=RANDOM_STATE)

# fit() returns the estimator itself, so both names refer to the same fitted tree.
regression_tree_model = regressor.fit(X_train, y_train)

# Predict once per split and reuse the result for every metric.
pred_train = regression_tree_model.predict(X_train)
pred_val = regression_tree_model.predict(X_val)

# scikit-learn metrics take (y_true, y_pred). MAE and MSE are symmetric, but
# r2_score is not: with the arguments swapped it normalises by the variance of
# the predictions instead of the variance of the observations.
print(f"MAE train:\n\t: {mean_absolute_error(y_train, pred_train):.4f}")
print(f"MAE validation:\n\t: {mean_absolute_error(y_val, pred_val):.4f}")
print(f"RMSE train:\n\t: {np.sqrt(mean_squared_error(y_train, pred_train)):.4f}")
print(f"RMSE validation:\n\t: {np.sqrt(mean_squared_error(y_val, pred_val)):.4f}")
# r1 is the notebook's own helper and takes (y_pred, y_true, median_).
print(f"R1 train:\n\t: {r1(pred_train, y_train, median_):.4f}")
print(f"R1 validation:\n\t: {r1(pred_val, y_val, median_):.4f}")
print(f"R2 train:\n\t: {r2_score(y_train, pred_train):.4f}")
print(f"R2 validation:\n\t: {r2_score(y_val, pred_val):.4f}")


# Retrieve the feature importances
importances = regression_tree_model.feature_importances_

# Keep only the features with a non-zero importance.
important_features = []
important_importances = []
for feature_name, importance in zip(features, importances):
    if importance > 0:
        important_features.append(feature_name)
        important_importances.append(importance)

# Build the table in one go instead of concatenating one row per iteration.
df_importances = pd.DataFrame({'feature': important_features,
                               'importance': important_importances})

print(df_importances)
# The file name repeats the split/leaf values already contained in
# specific_name; it is kept unchanged so existing outputs keep their names.
df_importances.to_csv(
    os.path.join(output_dir, f"feature_importance_{m_samples_split}_{m_samples_leaf}_{specific_name}.csv"),
    sep=",",
)
# Visualize the feature importances
plt.figure(figsize=(10, 10))
plt.barh(important_features, important_importances, align='center')
plt.xlabel("Feature Importance")
plt.ylabel("Feature")
plt.title(f"Feature Importances in DecisionTreeRegressor {specific_name}")
file_path = os.path.join(output_dir, f'feature_importances_{specific_name}.png')
plt.savefig(file_path)
plt.show()

In [None]:
# Tree visualisation

# Colour scale: each node is coloured by its stored value (`tree_.value`),
# scaled between the smallest and the largest value found in the tree and
# mapped onto a green -> yellow -> red gradient.
# NOTE(review): indexing `colors` by node id below assumes one value per node
# (single-output regressor) — confirm if the model type changes.
values = regressor.tree_.value.flatten()
norm = plt.Normalize(values.min(), values.max())
cmap = mcolors.LinearSegmentedColormap.from_list("GreenYellowRed", ["green", "yellow", "red"])
# Reverse the colour list above (red, yellow, green) to invert the scale.
colors = cmap(norm(values))

# DOT source produced by scikit-learn, with its own default fill colours.
dot_data_1 = export_graphviz(
    regressor,
    out_file=None,
    filled=True,
    rounded=True,
    special_characters=True,
    feature_names=features,
    proportion=True,
)

hex_color_pattern = r'fillcolor="#[0-9a-fA-F]{6}"'


def _recolor_node_line(dot_line):
    """Swap the fill colour of a node line for the custom colour of that node.

    Lines without a `fillcolor` attribute, or whose first token is not a
    numeric node id, are returned unchanged.
    """
    if 'fillcolor' not in dot_line:
        return dot_line
    first_token = dot_line.split()[0]
    if not first_token.isdigit():
        return dot_line
    node_hex = mcolors.to_hex(colors[int(first_token)])
    return re.sub(hex_color_pattern, f'fillcolor="{node_hex}"', dot_line)


# Rewrite the DOT source line by line and stitch it back together.
dot_lines = dot_data_1.splitlines()
new_dot_lines = [_recolor_node_line(dot_line) for dot_line in dot_lines]
new_dot_data = "\n".join(new_dot_lines)

# Render with Graphviz: once next to the notebook, once into the output
# directory, then open the result in the default viewer.
graph = graphviz.Source(new_dot_data)
graph.render(f"{outcome}_regression_tree_{specific_name}", format='png')
graph.render(f"{output_dir}/{outcome}_regression_tree_{specific_name}", format='png')
graph.view()

In [None]:
# function for evaluating statistics for the node in a tree, given node's path

def _summarise_child(child_values, interval):
    """Return the summary statistics of one child's outcome values (`interval` is [low, high] or None)."""
    if interval is None:
        pct_within = None
    elif len(child_values) == 0:
        # No rows reached this child: the share is undefined.
        pct_within = float("nan")
    else:
        n_within = (child_values.ge(interval[0]) & child_values.le(interval[1])).sum()
        pct_within = (n_within * 100) / len(child_values)
    return {
        "median": child_values.median(),
        "q1": child_values.quantile(0.25),
        "q3": child_values.quantile(0.75),
        "mean": child_values.mean(),
        "std": child_values.std(),
        "percentage_within_interval": pct_within,
    }


def eval_node(X, y, logical_path, out_, interval_left, interval_right):
    """Compute outcome statistics for both children of a tree node.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature columns.
    y : pandas.Series
        Outcome values; joined to ``X`` column-wise, so its name must be ``out_``.
    logical_path : list of [feature, operator, threshold]
        Path from the root to the node. For every predicate except the last,
        the operator ``"le"`` keeps rows with ``feature <= threshold`` and any
        other operator keeps rows with ``feature > threshold``. The last
        predicate is the split of the node itself: its operator is ignored and
        the rows are divided into a left (``<=``) and a right (``>``) child.
    out_ : str
        Name of the outcome column.
    interval_left, interval_right : [low, high] or None
        Closed interval for the left / right child. When given, the percentage
        of that child's outcome values inside the interval is reported.

    Returns
    -------
    dict
        Keys ``median``, ``q1``, ``q3``, ``mean``, ``std`` and
        ``percentage_within_interval``; every value is a two-element list
        ``[left, right]``. The percentage is None when no interval was given
        and NaN when the child is empty.

    Raises
    ------
    IndexError
        If ``logical_path`` is empty.
    """
    node_df = pd.concat([X, y], axis=1)
    print(len(logical_path))

    # Walk down to the node: apply every predicate except the last one.
    for predicate in logical_path[:-1]:
        print(predicate)
        column = node_df[predicate[0]]
        if predicate[1] == "le":
            node_df = node_df[column.le(predicate[2])]
        else:
            node_df = node_df[column.gt(predicate[2])]

    # The last predicate is the node's own split: evaluate both sides of it.
    predicate = logical_path[-1]
    print(predicate)
    split_column = node_df[predicate[0]]
    left_values = node_df[split_column.le(predicate[2])][out_]
    right_values = node_df[split_column.gt(predicate[2])][out_]

    left_stats = _summarise_child(left_values, interval_left)
    # The right child is checked against its own interval, not the left one.
    right_stats = _summarise_child(right_values, interval_right)

    return {name: [left_stats[name], right_stats[name]] for name in left_stats}

In [None]:
# example of usage of eval_node function
Walk_path = [
    ['HighBP', "gt", 0.5],
    ["BMI", "gt", 31.5],
    ["HighChol", "gt", 0.5],
    ['DiffWalk', "gt", 0.5],
]

# Statistics of the two children of the node on the training split.
tr_node_stat = eval_node(X_train, y_train, Walk_path, outcome, None, None)
print(tr_node_stat)

# Reference intervals: training mean +/- one standard deviation per child
# (position 0 is the left child, position 1 the right child).
tr_means, tr_stds = tr_node_stat["mean"], tr_node_stat["std"]
interval_left_waist, interval_right_waist = (
    [tr_means[side] - tr_stds[side], tr_means[side] + tr_stds[side]]
    for side in (0, 1)
)

# Same node on the validation split, now also reporting the share of outcome
# values that fall inside the training intervals.
val_node_stat = eval_node(X_val, y_val, Walk_path, outcome,
                          interval_left_waist, interval_right_waist)
print(val_node_stat)