In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
# Create (or reuse) the Spark session used by the rest of this cell.
spark = SparkSession.builder.appName("Random Forest Regression").getOrCreate()

# Load the CSV with pandas first, then convert it to a Spark DataFrame.
df_pandas = pd.read_csv("kworb_popularity.csv")
df = spark.createDataFrame(df_pandas)

# Columns used by the model. The last entry is the regression label; every
# entry before it is an input feature.
selected_cols = [
    "danceability",
    "energy",
    "loudness",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "valence",
    "tempo",
    "duration_ms",
    "popularity",
]
feature_cols = selected_cols[:-1]
label_col = selected_cols[-1]

# Assemble the feature columns into a single vector column named "features",
# keeping only that vector and the label.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data_with_features = assembler.transform(df).select("features", label_col)

# Split into training and testing sets (80/20) with a fixed seed.
train_data, test_data = data_with_features.randomSplit([0.8, 0.2], seed=1234)

# Fit a random forest regression model with 20 trees.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol=label_col,
    numTrees=20,
    seed=1234,
)
model = rf.fit(train_data)

# Evaluate the model on the testing set using R-squared.
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="r2",
)
r_squared = evaluator.evaluate(predictions)
print("R-squared on test data = %g" % r_squared)

# Print the importance of each feature. The importances vector is paired
# positionally with the assembler's input columns, so iterate both together
# instead of indexing by position.
importances = model.featureImportances
feature_names = assembler.getInputCols()
for feature_name, importance in zip(feature_names, importances.toArray()):
    print("Feature %s: %.4f" % (feature_name, importance))

# Keep the textual description of the first tree in the forest for inspection.
single_tree = model.trees[0]
tree_structure = single_tree.toDebugString

# Function to make the tree's debug string more readable by substituting feature names (currently commented out)
# def pyspark_tree_to_dot(tree_structure, feature_names):
#     for i, feature_name in enumerate(feature_names):
#         tree_structure = tree_structure.replace(f'feature {i}', feature_name)

#     tree_structure = tree_structure.replace('(', '[').replace(')', ']')
#     tree_structure = tree_structure.replace('if', '->').replace('else', '->')

#     dot_format = "digraph Tree {\n"
#     dot_format += "node [shape=box] ;\n"
#     dot_format += tree_structure
#     dot_format += "}"

#     return dot_format

# # Convert tree_structure and print
# dot_format = pyspark_tree_to_dot(tree_structure, ["danceability", "energy", "loudness", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo", "duration"])
# print(dot_format)

R-squared on test data = -0.00626737
Feature danceability: 0.1749
Feature energy: 0.1577
Feature loudness: 0.0753
Feature speechiness: 0.0786
Feature acousticness: 0.0768
Feature instrumentalness: 0.0401
Feature liveness: 0.0734
Feature valence: 0.1195
Feature tempo: 0.1020
Feature duration_ms: 0.1017
digraph Tree {
node [shape=box] ;
DecisionTreeRegressionModel: uid=dtr_b88d3233f6a7, depth=5, numNodes=49, numFeatures=10
  If [valence <= 0.8885000000000001]
   If [acousticness <= 0.00249]
    If [valence <= 0.6745000000000001]
     If [instrumentalness <= 1.115E-4]
      If [energy <= 0.6635]
       Predict: 0.039038235294117644
      Else [energy > 0.6635]
       Predict: 0.018956538461538455
     Else [instrumentalness > 1.115E-4]
      If [energy <= 0.8245]
       Predict: 0.03407777777777778
      Else [energy > 0.8245]
       Predict: 0.12008666666666667
    Else [valence > 0.6745000000000001]
     If [instrumentalness <= 0.4285]
      If [speechiness <= 0.03195]
       Predict: 0