In [41]:
#Require pyspark implementation on server

import numpy as np
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils import *

In [361]:
#random gen dataset
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
x_all_categorical_arr = np.random.randint(0, 2, (100, 10))
x_all_numerical_arr = np.random.rand(100, 10)
y_categorical_arr = np.random.randint(0, 2, 100)
#balanced_arr = np.concatenate([balanced_categorical_arr, balanced_numerical_arr], axis=1)
print(f"all_categorical_arr: {x_all_categorical_arr.shape=}")
print(f"all_numerical_arr: {x_all_numerical_arr.shape=}")
#print(f"balanced_arr: {balanced_arr.shape=}")

all_categorical_arr: x_all_categorical_arr.shape=(100, 10)
all_numerical_arr: x_all_numerical_arr.shape=(100, 10)


In [362]:
#Generate Spark DataFrame
x_data = spark.createDataFrame(x_all_numerical_arr)
y_data = spark.createDataFrame(y_categorical_arr,['y'])

#Index
x_indexed=x_data.withColumn("id",monotonically_increasing_id())
y_indexed=y_data.withColumn("id",monotonically_increasing_id())

In [365]:
# create joined df for computation with one id
joined_df = x_indexed.join(y_indexed, "id").drop('id')
x_train = joined_df.drop('y')
y_train = joined_df.select('y')
print(x_train.columns)
print(y_train.columns)

['_1', '_2', '_3', '_4', '_5', '_6', '_7', '_8', '_9', '_10']
['y']


In [366]:
#partition the dataframe dataset
joined_df.repartition(10).rdd.getNumPartitions()

10

In [386]:
# Define node split algorithsm in RDD
def new_split(joined_df, feature_index):

    # Select relevant columns
    feature_col_name = joined_df.columns[feature_index]
    y_col_name = joined_df.columns[-1]
    split_data = joined_df.select(feature_col_name, y_col_name)\
        .withColumnRenamed(feature_col_name,"split_value")\
        .withColumnRenamed(y_col_name,"y")

    # Calculate parent entropy
    parent_entropy = class_entropy(joined_df.select("y"))
    parent_data_count = joined_df.count()

    # Calculate potential splits and their Information Gain
    distinct_values = split_data.select("split_value")\
        .withColumnRenamed("split_value","feature")\
        .distinct().orderBy("feature")

    # Cartesian join to get split mask
    splits_info = distinct_values.crossJoin(split_data)\
        .withColumn(
        "is_left", F.col("feature") <= F.col("split_value")
    )

    #aggregate list
    entropies = splits_info.groupBy("split_value", "is_left").agg(
        F.count("y").alias("count"),
        F.sum("y").alias("sum"),
        prob_udf(F.collect_list("y")).alias("prob")
    )
    entropies = entropies.withColumn("entropy",\
                                    -F.col("prob") * F.log2(F.col("prob")) \
                                    -(1-F.col("prob")) * F.log2((1-F.col("prob")))
                                    )
    # Calculate Information Gain for each split
    info_gain = entropies.groupBy("split_value").agg(
        (parent_entropy - F.sum(F.col("entropy") * (F.col("count") / parent_data_count))).alias("info_gain")
    )

    # Get the best split
    best_split = info_gain.orderBy(F.desc("info_gain")).first()

    schema = StructType([
        StructField("feature", IntegerType(), True),
        StructField("split_value", FloatType(), True),
        StructField("info_gain", FloatType(), True),
    ])

    # Prepare output DataFrame
    result_df = spark.createDataFrame([(feature_index, float(best_split["split_value"]), best_split["info_gain"])], schema)

    return result_df

In [388]:
#test parallelized single node splitting using entropy and information gain
test = new_split(joined_df,0).show()

+-------+-----------+---------+
|feature|split_value|info_gain|
+-------+-----------+---------+
|      0| 0.46267977|     NULL|
+-------+-----------+---------+



In [408]:
# Recursively grow the tree

def grow_tree(df,feature_array, depth=0, max_depth=3):

    y_label = df.columns[-1]

    if depth == max_depth or class_entropy(df.select(y_label)) == 0:
        # Return the most common label
        return df.groupBy(y_label).count().orderBy(y_label, ascending=False).first()['label']

    best_feature = None
    best_gain = 0

    #get features
    feature_df = feature_split(df,feature_array)
    feature_list = feature_df.collect()

    for feature in feature_list:  # Assume feature columns
        gain = feature[2]
        if gain > best_gain:
            best_gain = gain
            best_feature = feature[0]

    if best_feature is None:
        return df.groupBy(y_label).count().orderBy('count', ascending=False).first()[y_label]


    # Recursive split
    left_df = df.filter(col(best_feature) <= 0.5)  # Assume binary split
    right_df = df.filter(col(best_feature) > 0.5)

    left_tree = grow_tree(left_df, depth + 1, max_depth)
    right_tree = grow_tree(right_df, depth + 1, max_depth)
    return {best_feature: {'left': left_tree, 'right': right_tree}}

In [320]:
#random forest training
import random

def random_forest_train(df, num_trees, max_depth=3):
    trees = []
    num_features = len(df.first()['features'])
    for _ in range(num_trees):
        sampled_df = df.sample(withReplacement=True, fraction=1.0)
        feature_indices = random.sample(range(num_features), k=int(log(num_features, 2) + 1))
        feature_cols = [f"features[{i}]" for i in feature_indices]
        tree = grow_tree(sampled_df, 0, max_depth)
        trees.append(tree)
    return trees