In [1]:
import pandas as pd
import numpy as np
import itertools
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score

In [2]:
dataset = pd.read_csv('/content/sleep_deprivation_dataset_detailed.csv', nrows=10)
dataset.head()

Unnamed: 0,Participant_ID,Sleep_Hours,Sleep_Quality_Score,Daytime_Sleepiness,Stroop_Task_Reaction_Time,N_Back_Accuracy,Emotion_Regulation_Score,PVT_Reaction_Time,Age,Gender,BMI,Caffeine_Intake,Physical_Activity_Level,Stress_Level
0,P1,5.25,15,12,1.6,64.2,12,365.85,35,Female,30.53,2,1,33
1,P2,8.7,12,14,2.54,65.27,21,288.95,20,Male,27.28,3,8,37
2,P3,7.39,17,10,3.4,74.28,35,325.93,18,Male,30.0,1,2,32
3,P4,6.59,14,3,3.54,72.42,25,276.86,18,Male,34.47,5,0,23
4,P5,3.94,20,12,3.09,99.72,60,383.45,36,Male,29.7,3,4,14


In [3]:
def entropy(y):
  if isinstance(y, pd.Series):
    a = y.value_counts()/y.shape[0]
    entropy = np.sum(-a*np.log2(a+1e-9))
    return(entropy)
  else:
    raise('Object must be a Pandas Series.')

entropy(dataset['Sleep_Quality_Score'])

2.6464393345721504

In [4]:
def variance(y):
  if(len(y) == 1):
    return 0
  else:
    return y.var()

def information_gain(y, mask, func=entropy):
  a = sum(mask)
  b = mask.shape[0] - a

  if(a == 0 or b ==0):
    gain = 0
  else:
    if y.dtypes != 'O':
      gain = variance(y) - (a/(a+b)* variance(y[mask])) - (b/(a+b)*variance(y[~mask]))
    else:
      gain = func(y)-a/(a+b)*func(y[mask])-b/(a+b)*func(y[~mask])

  return gain

In [5]:
information_gain(dataset['Physical_Activity_Level'], dataset['Sleep_Quality_Score'] == 1)

0

To calculate the best split of a numeric variable, first, all possible values that the variable is taking must be obtained. Once we have the options, for each option we will calculate the Information Gain using as a filter if the value is less than that value. Obviously, the first possible data will be drop, because the split will include all values.

In case we have categorical variables, the idea is the same, only that in this case we will have to calculate the Information Gain for all possible combinations of that variable, excluding the option that includes all the options (since it would not be doing any split). This is quite computationally costly if we have a high number of categories, that decision tree algorithms usually only accept categorical variables with less than 20 categories.

So, once we have all the splits, we will stick with the split that generates the highest Information Gain.

In [6]:
def categorical_options(a):
  #Creates all possible combinations from a Pandas Series.

  a = a.unique()

  opciones = []
  for L in range(0, len(a)+1):
      for subset in itertools.combinations(a, L):
          subset = list(subset)
          opciones.append(subset)

  return opciones[1:-1]

def max_information_gain_split(x, y, func=entropy):

  split_value = []
  gain = []

  numeric_variable = True if x.dtypes != 'O' else False

  if numeric_variable:
    options = x.sort_values().unique()[1:]
  else:
    options = categorical_options(x)

  # Calculate gain for all values
  for val in options:
    mask =   x < val if numeric_variable else x.isin(val)
    val_gain = information_gain(y, mask, func)
    # Append results
    gain.append(val_gain)
    split_value.append(val)

  if len(gain) == 0:
    return(None,None,None,False)

  else:
    best_gain = max(gain)
    best_gain_index = gain.index(best_gain)
    best_split = split_value[best_gain_index]
    return(best_gain,best_split,numeric_variable, True)

age_gain, age_split, _, _ = max_information_gain_split(dataset['Age'], dataset['Physical_Activity_Level'],)

print(
  "The best split for Age is when the variable is less than ",
  age_split,"\nInformation Gain for that split is:", age_gain)

The best split for Age is when the variable is less than  29 
Information Gain for that split is: 0.8666666666666663


best split will be the one that generates the highest Information Gain. To know which one is it, we simply have to calculate the Information Gain for each of the predictor variables of the model.

In [7]:
dataset.drop('Physical_Activity_Level', axis= 1).apply(max_information_gain_split, y = dataset['Physical_Activity_Level'])

Unnamed: 0,Participant_ID,Sleep_Hours,Sleep_Quality_Score,Daytime_Sleepiness,Stroop_Task_Reaction_Time,N_Back_Accuracy,Emotion_Regulation_Score,PVT_Reaction_Time,Age,Gender,BMI,Caffeine_Intake,Stress_Level
0,5.026667,2.558333,4.133333,1.358333,3.3,2.186667,1.358333,1.506667,0.866667,-0.352381,1.358333,2.466667,2.558333
1,"[P1, P3, P4, P8]",8.7,14,21,3.28,72.42,66,325.93,29,[Male],34.47,3,37
2,False,True,True,True,True,True,True,True,True,False,True,True,True
3,True,True,True,True,True,True,True,True,True,True,True,True,True


For Training

In [8]:
def get_best_split(y, dataset):
  masks = dataset.drop(y, axis= 1).apply(max_information_gain_split, y = dataset[y])
  if sum(masks.loc[3,:]) == 0:
    return(None, None, None, None)

  else:
    # Get only masks that can be splitted
    masks = masks.loc[:,masks.loc[3,:]]

    # Get the results for split with highest IG
    split_variable = masks.iloc[0].astype(np.float32).idxmax()
    #split_valid = masks[split_variable][]
    split_value = masks[split_variable][1]
    split_gain = masks[split_variable][0]
    split_numeric = masks[split_variable][2]

    return(split_variable, split_value, split_ig, split_numeric)


def make_split(variable, value, dataset, is_numeric):
  if is_numeric:
    dataset_1 = dataset[dataset[variable] < value]
    dataset_2 = dataset[(dataset[variable] < value) == False]
  else:
    dataset_1 = dataset[dataset[variable].isin(value)]
    dataset_2 = dataset[(dataset[variable].isin(value)) == False]

  return(dataset_1,dataset_2)


def make_prediction(dataset, target_factor):
  if target_factor:
    pred = dataset.value_counts().idxmax()
  else:
    pred = dataset.mean()

  return pred

In [9]:
def train_tree(dataset,y, target_factor, max_depth = None,min_samples_split = None, min_information_gain = 1e-20, counter=0, max_categories = 20):

  if counter==0:
    types = dataset.dtypes
    check_columns = types[types == "object"].index
    for column in check_columns:
      var_length = len(dataset[column].value_counts())
      if var_length > max_categories:
        raise ValueError('The variable ' + column + ' has '+ str(var_length) + ' unique values, which is more than the accepted ones: ' +  str(max_categories))

  if max_depth == None:
    depth_cond = True

  else:
    if counter < max_depth:
      depth_cond = True

    else:
      depth_cond = False

  if min_samples_split == None:
    sample_cond = True

  else:
    if dataset.shape[0] > min_samples_split:
      sample_cond = True

    else:
      sample_cond = False

  if depth_cond & sample_cond:
    var,val,gain,var_type = get_best_split(y, dataset)

    if gain is not None and gain >= min_information_gain:
      counter += 1
      left,right = make_split(var, val, dataset,var_type)

      # Instantiate sub-tree
      split_type = "<=" if var_type else "in"
      question =   "{} {}  {}".format(var,split_type,val)
      # question = "\n" + counter*" " + "|->" + var + " " + split_type + " " + str(val)
      subtree = {question: []}

      # Find answers (recursion)
      yes_answer = train_tree(left,y, target_factor, max_depth,min_samples_split,min_information_gain, counter)

      no_answer = train_tree(right,y, target_factor, max_depth,min_samples_split,min_information_gain, counter)

      if yes_answer == no_answer:
        subtree = yes_answer

      else:
        subtree[question].append(yes_answer)
        subtree[question].append(no_answer)

    else:
      pred = make_prediction(dataset[y],target_factor)
      return pred

  else:
    pred = make_prediction(dataset[y],target_factor)
    return pred

  return subtree

max_depth = 5
min_samples_split = 20
min_information_gain  = 1e-5

decision = train_tree(dataset,'Physical_Activity_Level',True, max_depth,min_samples_split,min_information_gain)

decision

4

In [10]:
def classify_data(observation, tree):
  question = list(tree.keys())[0]

  if question.split()[1] == '<=':
    if observation[question.split()[0]] <= float(question.split()[2]):
      answer = tree[question][0]
    else:
      answer = tree[question][1]
  else:
    if observation[question.split()[0]] in (question.split()[2]):
      answer = tree[question][0]
    else:
      answer = tree[question][1]

  if not isinstance(answer, dict):
    return answer
  else:
    residual_tree = answer
    return classify_data(observation, answer)

TASKS:


1.   Implement decision tree algorithm on a dataset of your choice.
2.   Instead of Entropy, use GINI INDEX and observe the performance for any difference(s).
3.   Employ SciKit Learn implementation of decision tree and observe for any difference(s). Try different types of trees.

In [11]:
def gini(y):
  if isinstance(y, pd.Series):
    a = y.value_counts()/y.shape[0]
    gini = 1-np.sum(a**2)
    return(gini)
  else:
    raise('Object must be a Pandas Series.')

gini(dataset['Sleep_Quality_Score'])

0.82

In [12]:
gini_index = gini(dataset['Physical_Activity_Level'])
print(f"Gini index for 'Physical_Activity_Level': {gini_index}")

Gini index for 'Physical_Activity_Level': 0.82


In [13]:
def information_gain(y, mask, func=gini):
  a = sum(mask)
  b = mask.shape[0] - a

  if(a == 0 or b ==0):
    gain = 0
  else:
    if y.dtypes != 'O':
      gain = variance(y) - (a/(a+b)* variance(y[mask])) - (b/(a+b)*variance(y[~mask]))
    else:
      gain = func(y)-a/(a+b)*func(y[mask])-b/(a+b)*func(y[~mask])

  return gain

In [14]:
def max_information_gain_split(x, y, func=gini):

  split_value = []
  gain = []

  numeric_variable = True if x.dtypes != 'O' else False

  if numeric_variable:
    options = x.sort_values().unique()[1:]
  else:
    options = categorical_options(x)

  # Calculate gain for all values
  for val in options:
    mask =   x < val if numeric_variable else x.isin(val)
    val_gain = information_gain(y, mask, func)
    # Append results
    gain.append(val_gain)
    split_value.append(val)

  if len(gain) == 0:
    return(None,None,None,False)

  else:
    best_gain = max(gain)
    best_gain_index = gain.index(best_gain)
    best_split = split_value[best_gain_index]
    return(best_gain,best_split,numeric_variable, True)

age_gain, age_split, _, _ = max_information_gain_split(dataset['Age'], dataset['Physical_Activity_Level'], func=gini)

print(
  "The best split for Age is when the variable is less than ",
  age_split,"\nInformation Gain for that split is:", age_gain)

The best split for Age is when the variable is less than  29 
Information Gain for that split is: 0.8666666666666663


In [15]:
dataset['Sleep_Deprived'] = dataset['Sleep_Hours'].apply(lambda x: 1 if x <= 6 else 0)

In [16]:
X = dataset.drop(columns=['Sleep_Quality_Score', 'Emotion_Regulation_Score', 'Caffeine_Intake','Physical_Activity_Level',	'Stress_Level'])
y = dataset['Sleep_Deprived']

In [17]:
categorical_features = X.select_dtypes(include=['object']).columns

In [18]:
X = pd.get_dummies(X, columns=categorical_features, drop_first=True)

In [19]:
# Split dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

In [20]:
logistic = LogisticRegression()
logistic.fit(X_train,y_train)

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


In [21]:
# Gini index performance evaluation
gini_tree = DecisionTreeClassifier(criterion='gini', random_state=0)
gini_tree.fit(X_train, y_train)
gini_predictions = gini_tree.predict(X_test)
gini_accuracy = accuracy_score(y_test, gini_predictions)
print(f"Accuracy of decision tree (Gini): {gini_accuracy}")
print(classification_report(y_test, gini_predictions))


# Entropy performance evaluation
entropy_tree = DecisionTreeClassifier(criterion='entropy', random_state=0)
entropy_tree.fit(X_train, y_train)
entropy_predictions = entropy_tree.predict(X_test)
entropy_accuracy = accuracy_score(y_test, entropy_predictions)
print(f"Accuracy of decision tree (Entropy): {entropy_accuracy}")
print(classification_report(y_test, entropy_predictions))


# Logistic Regression model performance evaluation
logistic_predictions = logistic.predict(X_test)
logistic_accuracy = accuracy_score(y_test, logistic_predictions)
print(f"Accuracy of Logistic Regression: {logistic_accuracy}")
print(classification_report(y_test, logistic_predictions))


Accuracy of decision tree (Gini): 1.0
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2

    accuracy                           1.00         2
   macro avg       1.00      1.00      1.00         2
weighted avg       1.00      1.00      1.00         2

Accuracy of decision tree (Entropy): 1.0
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2

    accuracy                           1.00         2
   macro avg       1.00      1.00      1.00         2
weighted avg       1.00      1.00      1.00         2

Accuracy of Logistic Regression: 1.0
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2

    accuracy                           1.00         2
   macro avg       1.00      1.00      1.00         2
weighted avg       1.00      1.00      1.00         2



In [22]:
for max_depth in [3, 5, 10]:
    for min_samples_split in [2, 10, 20]:
        clf = DecisionTreeClassifier(criterion='gini', max_depth=max_depth, min_samples_split=min_samples_split)
        clf.fit(X_train, y_train)
        y_pred = clf.predict(X_test)
        print(f'Model with max_depth={max_depth}, min_samples_split={min_samples_split}')
        print(classification_report(y_test, y_pred))

Model with max_depth=3, min_samples_split=2
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2

    accuracy                           1.00         2
   macro avg       1.00      1.00      1.00         2
weighted avg       1.00      1.00      1.00         2

Model with max_depth=3, min_samples_split=10
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2

    accuracy                           1.00         2
   macro avg       1.00      1.00      1.00         2
weighted avg       1.00      1.00      1.00         2

Model with max_depth=3, min_samples_split=20
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2

    accuracy                           1.00         2
   macro avg       1.00      1.00      1.00         2
weighted avg       1.00      1.00      1.00         2

Model with max_depth=5, min_samples_split=2
   