# Random forst algorithmen based on regression trees - implementation

In [134]:
import numpy as np
import random
import pandas as pd

## Regression trees - implementation

To dos:  
Splitting:  
- option for minimum sample size for split
- option for minimum information gain for split
- spit by minimum SSR of all possible splits
- candidates for the root will be decided via min SSR

Evaluation:  
- average of values in note level

### Helper functions

This function evaluates the effectiveness of a splitting candidate by calculating a specified metric for two subsets of data based on the given method.

In [135]:
def evaluate(subset1, subset2, method):
    if method == "min. MSR":
        predictor1 = subset1.mean()
        predictor2 = subset2.mean()
        
        SSR1 = ((subset1 - predictor1) ** 2).sum() 
        SSR2 = ((subset2 - predictor2) ** 2).sum()
        MSR = (SSR1 + SSR2)/ (len(subset1) + len(subset2))
        return MSR
    else:
        print("Error! ", method, " is not a method!")

This function identifies the best splitting candidate for the first column of a dataset by evaluating the effectiveness of splits on the second column, based on a specified evaluation method and optimization criterion (e.g., minimizing MSR).

In [136]:
def set_optimize_best_evaluation(method):
    if (method == "min. MSR"):
        optimize = "min"
        best_evaluation = float("inf")

    elif (method == "???"):
        optimize = "max"
        best_evaluation = float("-inf")
        
    else:
        print("ERROR! ", method, " is not a implemented method!")
        method = "min. MSR"
        optimize = "min"
        best_evaluation = float("inf")
    
    return optimize, best_evaluation

In [137]:
def RT_univariate_split(independent_variable, dependent_variable, method = "min. MSR"):
    
    #print("DEBUG: type(independent_variable) is \n", type(independent_variable), "\n")

    if isinstance(independent_variable, pd.DataFrame):
        independent_variable = independent_variable.iloc[:, 0]
        #print("DEBUG: type(independent_variable) is \n", type(independent_variable), "\n")
    
    candidates = [
        x for x in independent_variable.unique() if x != independent_variable.min() and x != independent_variable.max()
    ]
    ##print("DEBUG: candidates is \n", candidates, "\n")
    

    optimize, best_evaluation = set_optimize_best_evaluation(method)

    best_candidate = None
    
    for candidate in candidates:
        ##print("DEBUG: candidate set to \n", candidate, "\n")

        subset1 = dependent_variable[independent_variable < candidate]
        subset2 = dependent_variable[independent_variable >= candidate]
        #print("DEBUG: subset1 set to \n", subset1, "\n")
        #print("DEBUG: subset2 set to \n", subset2, "\n")
        #print("DEBUG: type(subset1) set to \n", type(subset1), "\n")
        #print("DEBUG: type(subset2) set to \n", type(subset2), "\n")

        subset1 = subset1.iloc[:, 0]
        subset2 = subset2.iloc[:, 0]
        #print("DEBUG: subset1 set to \n", subset1, "\n")
        #print("DEBUG: subset2 set to \n", subset2, "\n")
        #print("DEBUG: type(subset1) set to \n", type(subset1), "\n")
        #print("DEBUG: type(subset2) set to \n", type(subset2), "\n")

        candidate_evaluation = evaluate(subset1, subset2, method)
        ##print("DEBUG: candidate_evaluation set to \n", candidate_evaluation, "\n")

        if optimize == "min":

            ##print("DEBUG: best_evaluation is \n", best_evaluation, "\n")
            if candidate_evaluation < best_evaluation:
                best_evaluation = candidate_evaluation
                best_candidate = candidate
                ##print("DEBUG: best_candidate set to \n", best_candidate, "\n")
        else:
            if candidate_evaluation > best_evaluation:
                best_evaluation = candidate_evaluation
                best_candidate = candidate
                ##print("DEBUG: best_candidate set to \n", best_candidate, "\n")

    return best_candidate, best_evaluation

In [138]:
def RT_split(independent_variables, dependent_variable, method = "min. MSR", random_feature_count = None):
    
    optimize, best_evaluation = set_optimize_best_evaluation(method)
    best_column = None
    best_candidate = None

    split_results = {}

    feature_columns = independent_variables.columns
    ##print("DEBUG: feature_columns set to \n", feature_columns, "\n")
    if random_feature_count:
        if random_feature_count >= len(feature_columns):
            #print("Warning! Not enough features! Deactivate random feature selection.")
            random_feature_count = None
        elif not isinstance(random_feature_count, int) or random_feature_count < 1:
            #print("DEBUG: isinstance(random_feature_count, int) is set to \n", isinstance(random_feature_count, int), "\n")
            #print("DEBUG: random_feature_count is set to \n", random_feature_count, "\n")
            #print("Warning! random_feature_count must be int and >= 1! Deactivate random feature selection.")
            random_feature_count = None

    if random_feature_count:
        feature_columns = random.sample(list(feature_columns), random_feature_count)
        #print("DEBUG: feature_columns set to \n", feature_columns, "\n")

    for col in feature_columns:
        
        candidate, evaluation = RT_univariate_split(independent_variables[col], dependent_variable, method)
        split_results[col] = {"candidate": candidate, "evaluation": evaluation}
        ##print("DEBUG: split_results[", col, "] set to \n", split_results[col], "\n")

        if optimize == "min":
            for col, result in split_results.items():
                if result["evaluation"] < best_evaluation:
                    best_evaluation = result["evaluation"]
                    best_column = col
                    best_candidate = result["candidate"]
        else:
            for col, result in split_results.items():
                if result["evaluation"] > best_evaluation:
                    best_evaluation = result["evaluation"]
                    best_column = col
                    best_candidate = result["candidate"]

    return best_column, best_candidate, best_evaluation

In [139]:
def subset_data(data, dependent_column):
    dependent_variable = data[[dependent_column]]
    independent_variables = data.drop(columns=[dependent_column])
    return independent_variables, dependent_variable

In [150]:
def regression_tree(independent_variables, dependent_variable, min_evaluation_gain=0.1, 
                    method="min. MSR", max_depth=20, random_feature_count=None, depth=0, min_samples_split=0):
    # Initialize
    optimize, best_evaluation = set_optimize_best_evaluation(method)
    evaluation_gain = min_evaluation_gain
    results = {}
    num_splits = 0

    # Stop recursion if max depth is exceeded
    if depth > max_depth:
        return results

    while evaluation_gain >= min_evaluation_gain:
        # Determine the best split
        column, candidate, evaluation = RT_split(independent_variables, dependent_variable, method, random_feature_count)
        
        if column is None or candidate is None:
            # Stop if no valid split is found
            break

        # Calculate evaluation gain
        if optimize == "min":
            evaluation_gain = best_evaluation / evaluation - 1
            best_evaluation = evaluation
        else:
            evaluation_gain = evaluation / best_evaluation - 1
            best_evaluation = evaluation

        # Store results for the current split
        results[num_splits] = {
            "split_column": column,
            "split_value": candidate,
            "evaluation_score": evaluation,
            "evaluation_gain": evaluation_gain,
            "depth": depth
        }
        print("DEBUG: results[", num_splits, "] set to \n", results[num_splits], "\n")

        # Split the data into subsets
        indep_subset1 = independent_variables[independent_variables[column] < candidate]
        indep_subset2 = independent_variables[independent_variables[column] >= candidate]
        dep_subset1 = dependent_variable[independent_variables[column] < candidate]
        dep_subset2 = dependent_variable[independent_variables[column] >= candidate]

        # Stop splitting if subset size is less than min_samples_split
        if len(indep_subset1) < min_samples_split or len(indep_subset2) < min_samples_split:
            break

        num_splits += 1

        # Recursive call for the left subtree
        left_results = regression_tree(
            independent_variables=indep_subset1,
            dependent_variable=dep_subset1,
            min_evaluation_gain=min_evaluation_gain,
            method=method,
            max_depth=max_depth,
            random_feature_count=random_feature_count,
            depth=depth + 1,
            min_samples_split=min_samples_split
        )

        # Recursive call for the right subtree
        right_results = regression_tree(
            independent_variables=indep_subset2,
            dependent_variable=dep_subset2,
            min_evaluation_gain=min_evaluation_gain,
            method=method,
            max_depth=max_depth,
            random_feature_count=random_feature_count,
            depth=depth + 1,
            min_samples_split=min_samples_split
        )

        # Combine results
        results.update(left_results)
        results.update(right_results)

        

    return results

### Tree developement

In [151]:
data = pd.DataFrame({
    "Bedrooms": [2, 3, 4, 5, 3, 4, 2, 3],
    "Square Footage": [1500, 1800, 2200, 2000, 2500, 2300, 1600, 1900],
    "Age": [20, 15, 25, 10, 5, 8, 30, 12],
    "Price": [250, 300, 300, 320, 500, 450, 350, 350]
})

data2 = pd.DataFrame({
    "Bedrooms": [2, 3, 4, 3, 5, 4, 2, 3],
    "Price": [250, 300, 400, 320, 500, 450, 200, 350]
})

independent_variables, dependent_variable = subset_data(data, "Price")
#print(independent_variables)

#print("Result of evaluate function:", evaluate(subset1, subset2, method="min. MSR"))
#print(RT_univariate_split(independent_variables, dependent_variable))
#print(RT_split(independent_variables, dependent_variable, random_feature_count = 2))
print(regression_tree(independent_variables, dependent_variable, min_evaluation_gain=0.1))


DEBUG: results[ 0 ] set to 
 {'split_column': 'Square Footage', 'split_value': 2300, 'evaluation_score': 1041.6666666666667, 'evaluation_gain': inf, 'depth': 0} 

DEBUG: results[ 0 ] set to 
 {'split_column': 'Square Footage', 'split_value': 1600, 'evaluation_score': 420.0, 'evaluation_gain': inf, 'depth': 1} 

DEBUG: results[ 0 ] set to 
 {'split_column': 'Bedrooms', 'split_value': 3, 'evaluation_score': 335.0, 'evaluation_gain': inf, 'depth': 2} 

DEBUG: results[ 0 ] set to 
 {'split_column': 'Age', 'split_value': 15, 'evaluation_score': 112.5, 'evaluation_gain': inf, 'depth': 3} 

DEBUG: results[ 1 ] set to 
 {'split_column': 'Age', 'split_value': 15, 'evaluation_score': 112.5, 'evaluation_gain': 0.0, 'depth': 3} 

DEBUG: results[ 1 ] set to 
 {'split_column': 'Bedrooms', 'split_value': 3, 'evaluation_score': 335.0, 'evaluation_gain': 0.0, 'depth': 2} 

DEBUG: results[ 0 ] set to 
 {'split_column': 'Age', 'split_value': 15, 'evaluation_score': 112.5, 'evaluation_gain': inf, 'depth':