<a href="https://colab.research.google.com/github/radiaated/MedicalInsurancePricePrediction/blob/main/Medical_Insurance_Price_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Medical Insurance Price prediction

In [306]:
# Import modules
import numpy as np
import pandas as pd

## ID3 Decision Tree

In [307]:
# Helper functions
def cov(values: pd.Series):
  """
  Compute the coefficient of variation (CoV) for a numeric pandas Series.

  The coefficient of variation is defined as the ratio of the standard
  deviation to the mean:

      CoV = std(values) / mean(values)

  Parameters
  ----------
  values : pd.Series
      A pandas Series containing numeric values.

  Returns
  -------
  float
      The coefficient of variation of the input values.
  """


  return np.std(values) / np.mean(values)

def stdr_xy(X: pd.Series, y: pd.Series):
  """
  Compute the standard deviation reduction (STDR) of target variable y
  after splitting by the categorical values in X.

  The function calculates:
      STDR = std(y) - sum_over_groups( (n_i / n) * std(y_i) )

  where:
      - y_i is the subset of y corresponding to each unique value in X
      - n_i is the size of each subset
      - n is the total number of samples

  Parameters
  ----------
  X : pd.Series
      A pandas Series containing categorical or discrete grouping values.
  y : pd.Series
      A pandas Series containing numeric target values aligned with X.

  Returns
  -------
  float
      The reduction in standard deviation of y after partitioning by X.
  """

  stdr = 0
  for val in X.unique():

    x_value = X[X == val]
    y_value = y.loc[x_value.index]

    stdr += (len(x_value) / len(X)) * np.std(y_value)

  stdr = np.std(y) - stdr

  return stdr


def encode_columns(columns: list):

  encoding = {}

  for i in range(len(columns)):

    encoding[i] = columns[i]

  return encoding



In [308]:
class DecisionNode():
  """
  A node in a decision tree structure.

  A DecisionNode can represent either:
  - An internal decision node that splits on a feature and contains branches, or
  - A leaf node that stores a predicted value.

  Parameters
  ----------
  feature_idx : int, optional
      The index of the feature used for splitting at this node.
      Should be None for leaf nodes.
  branches : dict, optional
      A dictionary mapping feature values to child DecisionNode objects.
      Used only for internal (non-leaf) nodes.
  leaf_value : float, optional
      The prediction value stored in the node if it is a leaf.
      Should be None for internal nodes.

  Attributes
  ----------
  feature_idx : int
      Feature index used for splitting at this node.
  branches : dict
      Mapping of feature values to child nodes.
  leaf_value : float
      Prediction value if the node is a leaf.
  """

  def __init__(self, feature_idx: int = None, branches: dict =None, leaf_value: float =None):

    self.feature_idx = feature_idx
    self.branches = branches
    self.leaf_value = leaf_value

In [309]:
class DecisionTree():
  """
  A simple Decision Tree implementation for regression tasks.

  The tree splits data based on the feature that maximizes a custom
  standard deviation reduction metric (stdr_xy). Splitting stops when:
  - The number of samples is less than or equal to min_samples
  - The maximum depth is reached
  - The covariance of the target values is below the splitting threshold

  Parameters
  ----------
  splitting_threhold : float
      Minimum covariance threshold required to continue splitting.
  max_depth : int
      Maximum depth allowed for the tree.
  min_samples : int
      Minimum number of samples required to split a node.
  """

  def __init__(self, max_depth: int, min_samples: int, splitting_threhold: float = None):
    """
    Initialize the Decision Tree with stopping criteria.

    Parameters
    ----------
    splitting_threhold : float
        Minimum covariance required to continue splitting.
    max_depth : int
        Maximum depth of the tree.
    min_samples : int
        Minimum number of samples required to split.
    """

    # Threshold for covariance stopping condition
    self.splitting_threhold = splitting_threhold

    # Maximum allowed depth of the tree
    self.max_depth = max_depth

    # Minimum number of samples required to perform a split
    self.min_samples = min_samples

    # Current depth of the tree
    self.depth = 0

    # Root node of the tree (DecisionNode)
    self.root: DecisionNode = None

    self.column_encoding = None


  def build(self, X_train: pd.DataFrame, y_train: pd.Series):
    """
    Recursively builds the decision tree.

    Parameters
    ----------
    X_train : pd.DataFrame
        Feature dataset.
    y_train : pd.Series
        Target values.

    Returns
    -------
    DecisionNode
        A node representing either a leaf or an internal split node.
    """

    # Stopping conditions:
    # 1. Too few samples
    # 2. Maximum depth reached
    # 3. Target covariance is below threshold

    if len(X_train) <= self.min_samples or self.depth >= self.max_depth or (self.splitting_threhold and cov(y_train) <= self.splitting_threhold):

      # Create a leaf node with the mean target value
      return DecisionNode(leaf_value = np.mean(y_train))

    # Track the best standard deviation reduction
    max_stdr = -1

    # Feature chosen for splitting
    split_feature = None

    # Iterate over all features to find the best split
    for col in X_train.columns:

      # Compute custom standard deviation reduction metric
      stdr = stdr_xy(X_train[col], y_train)

      # Update best feature if improvement found
      if stdr > max_stdr:

        max_stdr = stdr
        split_feature = col

    # Dictionary to store child branches
    branches = {}

    # Create a branch for each unique value of the selected feature
    for col_value in X_train[split_feature].unique():

      # Subset of X where feature equals the specific value
      X_col_value = X_train[X_train[split_feature] == col_value]

      # Corresponding target values
      y_col_value = y_train.loc[X_col_value.index]

      # Remove the splitting feature from the subset
      X_col_value = X_col_value.drop(split_feature, axis=1)

      # Recursively build subtree for this branch
      branches[col_value] = self.build(X_col_value, y_col_value)

    # Increase depth after building this level
    self.depth += 1

    # Return internal decision node
    return DecisionNode(feature_idx = split_feature, branches = branches)


  def fit(self, X_train: pd.DataFrame, y_train: pd.Series):
    """
    Train the Decision Tree on the provided dataset.

    Parameters
    ----------
    X_train : pd.DataFrame
        Training features.
    y_train : pd.Series
        Training target values.
    """

    self.column_encoding = encode_columns(X_train.columns)

    reverse_column_encoding = { val: key for key, val in self.column_encoding.items() }

    X_train = X_train.rename(columns=reverse_column_encoding)
    X_train.columns = X_train.columns.astype(int)

    # Build the tree starting from the root
    self.root = self.build(X_train, y_train)


  def __visualize_node(self, node: DecisionNode, depth = 0):
    """
    Recursively prints a visual representation of the tree.

    Parameters
    ----------
    node : DecisionNode
        Current node being visualized.
    depth : int
        Current depth (used for indentation).
    """

    # If this is a leaf node, print its value
    if node.leaf_value:
      print(node.leaf_value)
      return

    # Print feature used for splitting
    print("[ %s ]" % self.column_encoding[node.feature_idx])

    # Recursively print branches
    for feature_value, node in node.branches.items():

      # Indentation based on depth level
      print("\t" * depth + f" -- {feature_value} --> ", end="")

      self.__visualize_node(node, depth = depth + 1)


  def visualize(self):
    """
    Public method to print the full tree structure.
    """

    self.__visualize_node(self.root)


  def __traverse(self, node: DecisionNode, X: dict):
    """
    Recursively traverse the tree to make a prediction.

    Parameters
    ----------
    node : DecisionNode
        Current node in traversal.
    X : dict
        Single sample represented as a dictionary of feature-value pairs.

    Returns
    -------
    float
        Predicted value from the leaf node.
    """

    # If leaf node, return stored prediction
    if node.leaf_value:
      return node.leaf_value

    # Get the feature value for current split
    feature_value = X[node.feature_idx]

    # Move to the corresponding child node
    next_node = node.branches[feature_value]

    # Continue traversal
    return self.__traverse(next_node, X)


  def predict(self, X: dict):
    """
    Predict the output for a single sample.

    Parameters
    ----------
    X : dict
        Feature dictionary for one data sample.

    Returns
    -------
    float
        Predicted value.
    """

    # Encode the column names of input data to feature idx
    X_inp = X.copy()

    for key, value in self.column_encoding.items():

      X_inp[key] = X_inp.pop(value)

    # Return the predicted value
    return self.__traverse(self.root, X_inp)


## Fitting a dummy dataset on the decision tree

Refer to:
LINK!!!!

In [310]:
# Dummy data
data = {
    "Outlook": ["Rainy", "Rainy", "Overcast", "Sunny", "Sunny", "Sunny", "Overcast", "Rainy", "Rainy", "Sunny", "Rainy", "Overcast", "Overcast", "Sunny"],
    "Temp": ["Hot", "Hot", "Hot", "Mild", "Cool", "Cool", "Cool", "Mild", "Cool", "Mild", "Mild", "Mild", "Hot", "Mild"],
    "Humidity": ["High", "High", "High", "High", "Normal", "Normal", "Normal", "High", "Normal", "Normal", "Normal", "High", "Normal", "High"],
    "Windy": ["False", "True", "False", "False", "False", "True", "True", "False", "False", "False", "True", "True", "False", "True"],
    "Windy": ["False", "True", "False", "False", "False", "True", "True", "False", "False", "False", "True", "True", "False", "True"],
    "HoursPlayed": [25,30,46,45,52,23,43,35,38,46,48,52,44,30],
}

df = pd.DataFrame(data)

# Fitting decision tree
tree = DecisionTree(max_depth=4, min_samples=3, splitting_threhold=0.1)
tree.fit(df.drop("HoursPlayed", axis=1), df["HoursPlayed"])

tree.visualize()

[ Outlook ]
 -- Rainy --> [ Temp ]
	 -- Hot --> 27.5
	 -- Mild --> 41.5
	 -- Cool --> 38.0
 -- Overcast --> 46.25
 -- Sunny --> [ Windy ]
	 -- False --> 47.666666666666664
	 -- True --> 26.5


## Gradient Boosting

The differentiable loss function used is `1 / 2 (observed - predicted)^2`.
Therefore all the prediction functions are made to accomodate the given loss fucntion for easier calculations.

In [311]:
class GradientBoosting():

  def __init__(self, learners_count: int, learning_rate: float, max_depth: int, min_samples: int, splitting_threhold: float = None):

    self.learners_count = learners_count
    self.learning_rate = learning_rate
    self.splitting_threhold = splitting_threhold
    self.max_depth = max_depth
    self.min_samples = min_samples
    self.learners = None
    self.f_c = None

  def predict(self, X):

    pred = self.f_c

    for i in range(self.learners_count):

      pred += self.learning_rate * self.learners[i].predict(X)


    return pred


  def fit(self, X_train: pd.DataFrame, y_train: pd.Series):


    self.f_c = f_x = np.mean(y_train)


    for i in range(self.learners_count):

      residual = y_train - f_x

      d_t = DecisionTree(splitting_threhold = self.splitting_threhold, max_depth = self.max_depth, min_samples = self.min_samples)

      d_t.fit(X_train, residual)

      if self.learners:

        self.learners.append(d_t)

      else:
        self.learners = [d_t]

      f_x += self.learning_rate * pd.Series([d_t.predict(row.to_dict()) for _, row in X_train.iterrows()])

  def predict(self, X: dict):

    y_pred = self.f_c + sum(learner.predict(X) for learner in self.learners)

    return y_pred


### Testing on Dummy Data

In [312]:
df


Unnamed: 0,Outlook,Temp,Humidity,Windy,HoursPlayed
0,Rainy,Hot,High,False,25
1,Rainy,Hot,High,True,30
2,Overcast,Hot,High,False,46
3,Sunny,Mild,High,False,45
4,Sunny,Cool,Normal,False,52
5,Sunny,Cool,Normal,True,23
6,Overcast,Cool,Normal,True,43
7,Rainy,Mild,High,False,35
8,Rainy,Cool,Normal,False,38
9,Sunny,Mild,Normal,False,46


In [313]:
X = df.drop("HoursPlayed", axis=1)
y = df["HoursPlayed"]

In [314]:
# Fitting the dummy data on Gradient Boosting model
gb = GradientBoosting(learners_count=2, learning_rate=0.1, max_depth=4, min_samples=2, splitting_threhold = 0.1)

gb.fit(X, y)

### Visualizing Learner Trees from GB

In [315]:
for tree in gb.learners:

  tree.visualize()

[ Outlook ]
 -- Rainy --> -4.5857142857142845
 -- Overcast --> [ Temp ]
	 -- Hot --> 5.214285714285715
	 -- Cool --> 3.2142857142857153
	 -- Mild --> 12.214285714285715
 -- Sunny --> -0.5857142857142847
[ Outlook ]
 -- Rainy --> -4.127142857142853
 -- Overcast --> [ Temp ]
	 -- Hot --> 4.692857142857143
	 -- Cool --> 2.892857142857146
	 -- Mild --> 10.992857142857147
 -- Sunny --> -0.5271428571428587


### Predicting and comparing the result from the model

In [316]:
y_pred = pd.Series([gb.predict(item.to_dict()) for _, item in X.iterrows()], name="Prediction")
pd.concat([y, y_pred], axis=1)

Unnamed: 0,HoursPlayed,Prediction
0,25,31.072857
1,30,31.072857
2,46,49.692857
3,45,38.672857
4,52,38.672857
5,23,38.672857
6,43,45.892857
7,35,31.072857
8,38,31.072857
9,46,38.672857
