In [1]:
import numpy as np
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
import random as rand
from pyspark.sql.window import Window

In [2]:
# Initialize Spark session
spark = SparkSession.builder.appName("randomforest").getOrCreate()

24/05/05 11:32:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
#random gen dataset
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
x_all_categorical_arr = np.random.randint(0, 2, (100, 10))
x_all_numerical_arr = np.random.rand(100, 10)
y_categorical_arr = np.random.randint(0, 2, 100)
#balanced_arr = np.concatenate([balanced_categorical_arr, balanced_numerical_arr], axis=1)
print(f"all_categorical_arr: {x_all_categorical_arr.shape=}")
print(f"all_numerical_arr: {x_all_numerical_arr.shape=}")
#print(f"balanced_arr: {balanced_arr.shape=}")

all_categorical_arr: x_all_categorical_arr.shape=(100, 10)
all_numerical_arr: x_all_numerical_arr.shape=(100, 10)


In [4]:
x_data = spark.createDataFrame(x_all_numerical_arr)

In [5]:
y_data = spark.createDataFrame(y_categorical_arr,['y'])

In [6]:
#Index
x_indexed=x_data.withColumn("id",monotonically_increasing_id())
y_indexed=y_data.withColumn("id",monotonically_increasing_id())

In [7]:
# DEVELOPMENT: create joined df for computation with one id
joined_df = x_indexed.join(y_indexed, "id").drop('id')
x_train = joined_df.drop('y')
y_train = joined_df.select('y')
print(x_train.columns)
print(y_train.columns)

['_1', '_2', '_3', '_4', '_5', '_6', '_7', '_8', '_9', '_10']
['y']


In [8]:
# bootstrap function definition
RANDOM_SEED = 42

# weighted bootstrap subdataset

#partition the dataframe dataset
joined_df = joined_df.repartition(10)
joined_df.cache()

DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint]

In [9]:
# Define entropy for classification evaluation crtieria
# receives a probably as input to calculate entropy

def class_entropy(df):
    #  entropy calculation for binary classification
    col_name = "y"
    counts = df.groupBy(col_name).count()
    total = df.count()
    return counts.withColumn("prob", F.col("count") / total).select(
        F.sum(-F.col("prob") * F.log2(F.col("prob"))).alias("entropy")
    ).first()["entropy"]

def prob(df):
    #  probably calcluation for binary classification
    count = np.count_nonzero(df)   
    if count == 0:
        return float(0)
    else:
        total = len(df)
        prob = np.divide(count,total)
        return float(prob)

class_entropy_udf = udf(class_entropy, ArrayType(DoubleType()))
prob_udf = udf(prob,FloatType())

In [10]:
def new_split_2(joined_df, feature_index):
    
    # Select relevant columns
    feature_col_name = joined_df.columns[feature_index]
    y_col_name = joined_df.columns[-1]
    split_data = joined_df.select(feature_col_name, y_col_name)\
        .withColumnRenamed(feature_col_name,"feature")\
        .withColumnRenamed(y_col_name,"y")
    
    # Calculate parent entropy
    parent_entropy = class_entropy(joined_df.select("y"))
    broadcast_parent_entropy = spark.sparkContext.broadcast(parent_entropy)
    parent_data_count = joined_df.count()
    broadcast_parent_count = spark.sparkContext.broadcast(parent_data_count)

    
    # Calculate potential splits and their Information Gain
    distinct_values = split_data.select("feature")\
        .withColumnRenamed("feature","split_value")\
        .distinct()\
        .sample(False, 0.1)
    
    joined = split_data.join(F.broadcast(distinct_values),\
                             split_data["feature"] <= distinct_values["split_value"], \
                             "left")

    # Calculate conditional entropy more efficiently
    conditional_counts = joined.groupBy("split_value").agg(
        F.count("y").alias("left_count"),
        F.sum("y").alias("left_sum")
    )
      
    # Calculate probabilities and entropy
    conditional_counts = conditional_counts.withColumn("left_prob", F.col("left_sum") / F.col("left_count"))
    conditional_counts = conditional_counts.withColumn(
        "left_entropy",
        -F.col("left_prob") * F.log2(F.col("left_prob"))\
        - (1 - F.col("left_prob")) * F.log2(1 - F.col("left_prob"))
    )


    # Calculate Information Gain
    info_gain = conditional_counts.withColumn(
        "info_gain",
        broadcast_parent_entropy.value - (F.col("left_entropy") * (F.col("left_count") / broadcast_parent_count.value))
    )
    
    # Get the best split
    best_split = info_gain.orderBy(F.desc("info_gain")).first()
    
    schema = StructType([
        StructField("feature", IntegerType(), True),
        StructField("split_value", FloatType(), True),
        StructField("info_gain", FloatType(), True),
    ])
    
    # Prepare output DataFrame
    if best_split is None or best_split["info_gain"] is None or best_split["split_value"] is None:
        result_df = spark.createDataFrame([(feature_index, float(0), float(0))], schema)
    else:
        result_df = spark.createDataFrame([(feature_index, float(best_split["split_value"]), best_split["info_gain"])], schema)
    
    
    
    
    return result_df

In [166]:
'''
TESTING: 
result_df = new_split_2(joined_df,8)
result_df.show()
'''


In [11]:
# Build tree from splitting

# each tree
# (i) for each feature: find_split
# (ii) Mapbypartition(find_split)

def feature_split(dataset, feature_array):
    
    ''' 
    Input: 
    partition: a pyspark dataframe partition to be called by foreachPartition,
    feature_array: a broadcasted feature array for the tree that is intiialized earlier on
    '''
    #define schema
    schema = StructType([
        StructField("feature", IntegerType(), True),
        StructField("split_value", FloatType(), True),
        StructField("info_gain", FloatType(), True),
    ])
    feature_df = spark.createDataFrame([], schema)
    
    # for each feature array, get a split and append the dataframe 
    for feature_index in feature_array:
        
        # find split
        feature_split = new_split_2(dataset, feature_index)
        
        #add feature  
        feature_df = feature_df.union(feature_split)
        
    return feature_df

                                                                                

DataFrame[feature: int, split_value: float, info_gain: float]

In [43]:
feature_split(joined_df,[0,1,2])

DataFrame[feature: int, split_value: float, info_gain: float]

In [12]:
def split_node(df,feature_array):

    #init
    y_label = df.columns[-1]
    node_parent = {}
    best_gain = -1
    best_feature= 0 
    best_split = 0
    
    #get first tree
    feature_df = feature_split(df,feature_array)
    feature_list = feature_df.rdd.collect()
    
    #init
    for feature in feature_list:
        
        #assign val
        feature_idx = feature[0]
        split_val = feature[1]
        gain = feature[2]
        
        if gain is None:
            gain = 0
        
        #check best
        if gain > best_gain:
            best_gain = gain
            best_feature = feature_idx
            best_split = split_val
        
    #generate split
    left_df = df.filter(col(joined_df.columns[best_feature]) <= best_split)
    right_df = df.filter(col(joined_df.columns[best_feature]) > best_split)
        
    #assign dictinary value of; key: feature, split_value, gain, and child
    node_parent['feature'] = best_feature
    node_parent['split_value'] = best_split
    node_parent['gain'] = best_gain
    
    
    return node_parent, left_df, right_df

In [46]:
'''
TESTING
split_node(joined_df,[0,1,2])
'''




({'feature': 0,
  'split_value': 0.031586144119501114,
  'gain': 0.9721625447273254},
 DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint],
 DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint])

In [13]:
def build_decision_tree(df, feature_array, max_depth, current_depth=0):
    
    """ Recursively build the decision tree. """
    #set y label
    label_col = df.columns[-1]
    distinct_count = df.select(label_col).distinct().count()
    
    if df is None or df.rdd.isEmpty():
        return df
    elif current_depth == max_depth:
        # Return the most common label
        most_common_label = df.groupBy(label_col).count().orderBy("count", ascending=False).first()[label_col]
        return df
        #return most_common_label 
    elif distinct_count == 1:
        return {"label": df.select(label_col).first()[0]}
    
    #split node
    node_parent, left_df, right_df = split_node(df,feature_array)
    
    
    # Cache potentially reused DataFrames
    if left_df is not None and not left_df.rdd.isEmpty():
        left_df.cache()
    if right_df is not None and not right_df.rdd.isEmpty():
        right_df.cache()

    
    #split if not empty    
    if left_df is None or left_df == []:
        right_subtree = build_decision_tree(right_df, feature_array, max_depth, current_depth + 1)
    elif right_df is None or right_df ==[]:
        left_subtree = build_decision_tree(left_df, feature_array, max_depth, current_depth + 1)
    elif (left_df is None) and (right_df is None):
        return {"feature": node_parent['feature'] , "threshold": node_parent['split_value'], "left": None, "right": None}
    else:
        left_subtree = build_decision_tree(left_df, feature_array, max_depth, current_depth + 1)
        right_subtree = build_decision_tree(right_df, feature_array, max_depth, current_depth + 1)

    # Clear cache after use
    if left_df is not None:
        left_df.unpersist()
    if right_df is not None:
        right_df.unpersist()
        
    
    # return node structure
    return {
        "feature": node_parent['feature'] , 
        "threshold": node_parent['split_value'], 
        "left": left_subtree, 
        "right": right_subtree
    }
    

In [174]:
#TESTING:
#tree = build_decision_tree(joined_df, [0,2,3], max_depth=5)
#tree


                                                                                

{'feature': 0,
 'threshold': 0.06707647442817688,
 'left': {'feature': 0,
  'threshold': 0.0,
  'left': DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint],
  'right': {'feature': 2,
   'threshold': 0.17803595960140228,
   'left': {'label': 0},
   'right': {'label': 1}}},
 'right': {'feature': 0,
  'threshold': 0.0,
  'left': DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint],
  'right': {'feature': 3,
   'threshold': 0.05708685517311096,
   'left': {'feature': 3,
    'threshold': 0.039426811039447784,
    'left': {'feature': 0,
     'threshold': 0.0,
     'left': DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint],
     'right': DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: doubl

In [38]:
def random_forest_train(df, num_trees, num_feature, max_depth):

    #init
    trees = {}
    
    # Pre-calculate the feature columns
    columns = df.columns
    feature_columns = df.columns[:-1]
    column_size = len(columns)
    
    #set as indices list
    column_indices = range(len(feature_columns))
    feature_array = [*column_indices]

    # def train_trees
    def train_tree(df, column_size, feature_array, num_feature, max_depth):
            
        #sample feature
        sampled = rand.sample(feature_array, num_feature)
            
        #add y column
        sampled.append(column_size-1)

        # Create a  DataFrame from the part of the partition
        selected_columns = [columns[i] for i in sampled]
        sampled_df = joined_df.select(selected_columns)
            
        # build information of new sample_trees
        sample_df_columns = sampled_df.columns
        sampled_df_features = sample_df_columns[:-1]
        sampled_columns = range(len(sampled_df_features))
        sampled_array = [*sampled_columns]
        
        tree = build_decision_tree(sampled_df, sampled_array, max_depth)
            
        return tree
    
    
    # Apply our function to each partition of the DataFrame
    for tree_idx in range(num_trees):
        tree = train_tree(df, column_size, feature_array, num_feature, max_depth)
        trees[tree_idx] = tree
        df.cache()

    # Unpersist the DataFrame as it's no longer needed
    df.unpersist()

    return trees

In [19]:
trees = random_forest_train(joined_df, num_trees=3,num_feature = 3, max_depth=5)

24/05/05 11:37:01 WARN CacheManager: Asked to cache already cached data.        
24/05/05 11:37:30 WARN CacheManager: Asked to cache already cached data.
24/05/05 11:38:08 WARN CacheManager: Asked to cache already cached data.


In [20]:
trees

{0: {'feature': 0,
  'threshold': 0.16362382471561432,
  'left': {'feature': 0,
   'threshold': 0.0,
   'left': DataFrame[_3: double, _8: double, _9: double, y: bigint],
   'right': {'feature': 0,
    'threshold': 0.0,
    'left': DataFrame[_3: double, _8: double, _9: double, y: bigint],
    'right': {'feature': 0,
     'threshold': 0.0,
     'left': DataFrame[_3: double, _8: double, _9: double, y: bigint],
     'right': {'feature': 0,
      'threshold': 0.0,
      'left': DataFrame[_3: double, _8: double, _9: double, y: bigint],
      'right': DataFrame[_3: double, _8: double, _9: double, y: bigint]}}}},
  'right': {'feature': 1,
   'threshold': 0.36239388585090637,
   'left': {'feature': 2,
    'threshold': 0.35923337936401367,
    'left': {'feature': 0,
     'threshold': 0.0,
     'left': DataFrame[_3: double, _8: double, _9: double, y: bigint],
     'right': {'feature': 0,
      'threshold': 0.0,
      'left': DataFrame[_3: double, _8: double, _9: double, y: bigint],
      'right':

In [40]:
import time

#measuring time for randomforest 
rf_start = time.time()
trees = random_forest_train(joined_df, num_trees=3 , num_feature = 3, max_depth=5)
rf_end = time.time()
rf_time = rf_end - rf_start

24/05/05 11:59:25 WARN CacheManager: Asked to cache already cached data.        
24/05/05 11:59:50 WARN CacheManager: Asked to cache already cached data.


In [41]:
print(rf_time)

75.26498413085938


In [42]:
# show values
trees

{0: {'feature': 2,
  'threshold': 0.03700763359665871,
  'left': {'feature': 0,
   'threshold': 0.0,
   'left': DataFrame[_6: double, _5: double, _1: double, y: bigint],
   'right': {'feature': 0,
    'threshold': 0.7704074382781982,
    'left': {'feature': 1,
     'threshold': 0.6638045310974121,
     'left': {'feature': 0,
      'threshold': 0.0,
      'left': DataFrame[_6: double, _5: double, _1: double, y: bigint],
      'right': DataFrame[_6: double, _5: double, _1: double, y: bigint]},
     'right': {'label': 0}},
    'right': DataFrame[_6: double, _5: double, _1: double, y: bigint]}},
  'right': {'feature': 1,
   'threshold': 0.03038705885410309,
   'left': {'label': 0},
   'right': {'feature': 0,
    'threshold': 0.02997359074652195,
    'left': {'label': 1},
    'right': {'feature': 1,
     'threshold': 0.03038705885410309,
     'left': DataFrame[_6: double, _5: double, _1: double, y: bigint],
     'right': {'feature': 1,
      'threshold': 0.018110183998942375,
      'left': 

In [43]:
#measuring time for one individual tree
tree_start = time.time()
tree = build_decision_tree(joined_df, [0,2,3], max_depth=5)
tree_end = time.time()
tree_time = tree_end - tree_start

                                                                                

In [44]:
print(tree_time)

47.100401878356934


In [45]:
tree    

{'feature': 2,
 'threshold': 0.10659302771091461,
 'left': {'feature': 0,
  'threshold': 0.23107479512691498,
  'left': {'feature': 2,
   'threshold': 0.05197128280997276,
   'left': {'label': 1},
   'right': {'label': 0}},
  'right': {'feature': 0,
   'threshold': 0.0,
   'left': DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint],
   'right': {'feature': 2,
    'threshold': 0.0420931912958622,
    'left': {'feature': 0,
     'threshold': 0.0,
     'left': DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint],
     'right': DataFrame[_1: double, _2: double, _3: double, _4: double, _5: double, _6: double, _7: double, _8: double, _9: double, _10: double, y: bigint]},
    'right': {'label': 1}}}},
 'right': {'feature': 3,
  'threshold': 0.03113408200442791,
  'left': {'feature': 0,
   'threshold': 0.0,
   'left':

In [None]:
'''
OLD VERSION OF new_split, please refer to new_split_2



def new_split(joined_df, feature_index):
    
    # Select relevant columns
    feature_col_name = joined_df.columns[feature_index]
    y_col_name = joined_df.columns[-1]
    split_data = joined_df.select(feature_col_name, y_col_name)\
        .withColumnRenamed(feature_col_name,"feature")\
        .withColumnRenamed(y_col_name,"y")
    
    # Calculate parent entropy
    parent_entropy = class_entropy(joined_df.select("y"))
    parent_data_count = joined_df.count()

    
    # Calculate potential splits and their Information Gain
    distinct_values = split_data.select("feature")\
        .withColumnRenamed("feature","split_value")\
        .distinct()\
        .sample(False, 0.1)
    
    # Cartesian join to get split mask
    splits_info = distinct_values.crossJoin(split_data)\
        .withColumn(
        "is_left", F.col("feature") <= F.col("split_value")
    )
    
    #aggregate list
    entropies = splits_info.groupBy("split_value", "is_left").agg(
        F.count("y").alias("count"),
        F.sum("y").alias("sum"),
    ).withColumn('prob', F.col('sum')/F.col('count'))
    entropies = entropies.withColumn("entropy",\
                                    -F.col("prob") * F.log2(F.col("prob")) \
                                    -(1-F.col("prob")) * F.log2((1-F.col("prob")))
                                    )
    # Calculate Information Gain for each split
    info_gain = entropies.groupBy("split_value").agg(
        (parent_entropy - F.sum(F.col("entropy") * (F.col("count") / parent_data_count))).alias("info_gain")
    )
    
    # Get the best split
    best_split = info_gain.orderBy(F.desc("info_gain")).first()
    
    schema = StructType([
        StructField("feature", IntegerType(), True),
        StructField("split_value", FloatType(), True),
        StructField("info_gain", FloatType(), True),
    ])
    
    # Prepare output DataFrame
    if best_split is None:
        result_df = spark.createDataFrame([(feature_index, float(0), float(0))], schema)
    else:
        result_df = spark.createDataFrame([(feature_index, float(best_split["split_value"]), best_split["info_gain"])], schema)
    
    return result_df
'''''