In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DLD_project_bagging').getOrCreate()

In [2]:
from google.colab import drive
drive.mount('/content/drive_new')

Mounted at /content/drive_new


In [3]:
df = spark.read.format("parquet").load("/content/drive_new/MyDrive/cleaned_dataset")

In [5]:
trainDF, testDF = df.randomSplit([.8, .2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

There are 737972 rows in the training set, and 184535 in the test set


**VectorAssembler, OneHotEncoder, StringIndexer, StandartScaler for Multiply Entries**

In [6]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer, StandardScaler

categorical_cols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
index_output_cols = [x + "Index" for x in categorical_cols]
ohe_output_cols = [x + "OHE" for x in categorical_cols]

string_indexer = StringIndexer(inputCols = categorical_cols, outputCols = index_output_cols, handleInvalid = "skip")
ohe_encoder = OneHotEncoder(inputCols = index_output_cols, outputCols = ohe_output_cols)

numeric_cols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") and (field != "actual_worth"))]

assembler_inputs = ohe_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols = assembler_inputs, outputCol = "features")

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

lr_with_scaler = LinearRegression(labelCol = "actual_worth", featuresCol = "scaled_features")
pipeline_with_scaler = Pipeline(stages = [string_indexer, ohe_encoder, vec_assembler, scaler, lr_with_scaler])
lr_model_with_scaler = pipeline_with_scaler.fit(trainDF)
lr_prediction_with_scaler = lr_model_with_scaler.transform(testDF)

In [None]:
lr_prediction_with_scaler.select("actual_worth", "prediction").show(10)

+------------+-------------------+
|actual_worth|         prediction|
+------------+-------------------+
|    262493.0|  -362426.625497828|
|    390000.0|-175553.64861687622|
|    312500.0|-153460.60358589678|
|    650000.0|  394106.2402005645|
|    500000.0| 402905.47590058204|
|   1280000.0| 402905.47590058204|
|    925000.0|  608769.6089686223|
|    250000.0|  638205.9010586096|
|   1150000.0|  734364.4552192347|
|    750000.0|  748481.2146516371|
+------------+-------------------+
only showing top 10 rows



In [9]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
regression_evaluator = RegressionEvaluator(predictionCol = "prediction", labelCol = "actual_worth", metricName = "rmse")
rmse_scaler = regression_evaluator.evaluate(lr_prediction_with_scaler)
print(f"RMSE is {rmse_scaler:.1f}")

r2_scaler = regression_evaluator.setMetricName("r2").evaluate(lr_prediction_with_scaler)
print(f"R2 is {r2_scaler}")

RMSE is 672571.9
R2 is 0.6201786575789652


**Decision tree**

Tree-based methods can naturally handle categorical variables. In spark.ml, pass the categorical columns to the StringIndexer, and the decision tree can take care of the rest.

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(labelCol="actual_worth", featuresCol="features")

assembler_inputs_tree = index_output_cols + numeric_cols
vec_assembler_tree = VectorAssembler(inputCols=assembler_inputs_tree, outputCol="features")

stages_tree = [string_indexer, vec_assembler_tree, dt]
pipeline_tr = Pipeline(stages=stages_tree)

dt.setMaxBins(135)

pipeline_model_tr = pipeline_tr.fit(trainDF)
pred_tree = pipeline_model_tr.transform(testDF)

In [None]:
pred_tree.select("features", "actual_worth", "prediction").show(20)

+--------------------+------------+------------------+
|            features|actual_worth|        prediction|
+--------------------+------------+------------------+
|[61.0,1.0,0.0,0.0...|    262493.0|  657908.733273964|
|[61.0,1.0,0.0,0.0...|    390000.0|  994848.609723248|
|[61.0,1.0,0.0,0.0...|    312500.0|  994848.609723248|
|[61.0,1.0,0.0,1.0...|    650000.0|1241403.9938946737|
|[61.0,1.0,0.0,1.0...|    500000.0|1241403.9938946737|
|[61.0,1.0,0.0,1.0...|   1280000.0|1241403.9938946737|
|[61.0,1.0,0.0,1.0...|    925000.0|1241403.9938946737|
|[61.0,1.0,0.0,1.0...|    250000.0|1241403.9938946737|
|[61.0,1.0,0.0,1.0...|   1150000.0|1241403.9938946737|
|[61.0,1.0,0.0,1.0...|    750000.0|1241403.9938946737|
|[61.0,1.0,0.0,1.0...|    962500.0|1241403.9938946737|
|[61.0,1.0,0.0,1.0...|   1500000.0|1241403.9938946737|
|[61.0,1.0,0.0,2.0...|    400000.0|  1397628.28283934|
|[61.0,1.0,0.0,2.0...|    800000.0|  1397628.28283934|
|[61.0,1.0,0.0,2.0...|   1500000.0|  1397628.28283934|
|[61.0,1.0

In [None]:
regression_evaluator_dt = RegressionEvaluator(predictionCol="prediction", labelCol="actual_worth", metricName="rmse")

rmse_dt = regression_evaluator_dt.evaluate(pred_tree)
print(f"RMSE is {rmse_dt:.1f}")

r2_dt = regression_evaluator_dt.setMetricName("r2").evaluate(pred_tree)
print(f"R2 is {r2_dt}")

RMSE is 684472.9
R2 is 0.6066181118967662


**Random Forest Regressor**

In [None]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="actual_worth", featuresCol="features", numTrees=100)
pipeline_rf = Pipeline(stages=[string_indexer, ohe_encoder, vec_assembler, rf])
rf_model = pipeline_rf.fit(trainDF)
rf_predictions = rf_model.transform(testDF)

In [None]:
rf_predictions.select("actual_worth", "prediction").show(20)

+------------+------------------+
|actual_worth|        prediction|
+------------+------------------+
|    262493.0| 1027883.501410307|
|    390000.0|1479625.4069014462|
|    312500.0|1479625.4069014462|
|    650000.0| 1626333.095486657|
|    500000.0| 1626333.095486657|
|   1280000.0| 1626333.095486657|
|    925000.0|1819357.1611668752|
|    250000.0|1840634.2916982493|
|   1150000.0|1840634.2916982493|
|    750000.0|1855998.2875859875|
|    962500.0|1855998.2875859875|
|   1500000.0|1855998.2875859875|
|    400000.0| 2149348.458401745|
|    800000.0| 2149348.458401745|
|   1500000.0| 2149348.458401745|
|   1400000.0| 2149348.458401745|
|    350000.0| 2149348.458401745|
|   2200000.0| 2149348.458401745|
|   1830000.0| 2149348.458401745|
|   1250000.0| 2149348.458401745|
+------------+------------------+
only showing top 20 rows



In [None]:
regression_evaluator_rf = RegressionEvaluator(predictionCol="prediction", labelCol="actual_worth", metricName="rmse")

rmse_rf = regression_evaluator_rf.evaluate(rf_predictions)
print(f"RMSE is {rmse_rf:.1f}")

r2_rf = regression_evaluator_dt.setMetricName("r2").evaluate(rf_predictions)
print(f"R2 is {r2_rf}")

RMSE is 769806.1
R2 is 0.5024180935312009


**Gradient Boosting Regression**

In [8]:
from pyspark.ml.regression import GBTRegressor

In [None]:
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

gbt = GBTRegressor(featuresCol="scaled_features", labelCol="actual_worth", maxIter=20, maxDepth=5)
pipeline_gbt = Pipeline(stages = [string_indexer, ohe_encoder, vec_assembler, scaler, gbt])

gbt_model = pipeline_gbt.fit(trainDF)
gbt_prediction = gbt_model.transform(testDF)
gbt_prediction.select("actual_worth", "prediction").show(10)

+------------+------------------+
|actual_worth|        prediction|
+------------+------------------+
|    262493.0| 797174.6433476824|
|    390000.0|1240169.7688490453|
|    312500.0|1240169.7688490453|
|    650000.0|1271423.8822804296|
|    500000.0|1271423.8822804296|
|   1280000.0|1271423.8822804296|
|    925000.0| 1851776.029673512|
|    250000.0| 1851776.029673512|
|   1150000.0| 1851776.029673512|
|    750000.0| 1851776.029673512|
+------------+------------------+
only showing top 10 rows



In [None]:
rmse_gbt = regression_evaluator.setMetricName("rmse").evaluate(gbt_prediction)
print(f"RMSE is {rmse_gbt:.1f}")

r2_gbt = regression_evaluator.setMetricName("r2").evaluate(gbt_prediction)
print(f"R2 is {r2_gbt}")

RMSE is 674426.2
R2 is 0.6180814123261409


**Implementing Bagging**

In [10]:
import random

def generate_random_df(df):
    # number of rows in initial dataframe
    num_rows = df.count()
    # generate sample from 100% to 200% of initial dataframe
    # allow replacement => number of rows in sample can be smaller
    fraction = random.uniform(1.0, 2.0)
    sampled_df = df.sample(withReplacement=True, fraction=fraction)
    # limit sample => number of rows in sample is equal to initial dataframe
    sampled_df = sampled_df.limit(num_rows)
    return sampled_df

In [11]:
number_of_bags = 5
samples = []

for _ in range(number_of_bags):
  samples.append(generate_random_df(trainDF))

In [12]:
# create function to prepare data for training
def prepare_data(df, label_name):
  categorical_cols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
  index_output_cols = [x + "Index" for x in categorical_cols]
  ohe_output_cols = [x + "OHE" for x in categorical_cols]

  string_indexer = StringIndexer(inputCols = categorical_cols, outputCols = index_output_cols, handleInvalid = "skip")
  ohe_encoder = OneHotEncoder(inputCols = index_output_cols, outputCols = ohe_output_cols)

  numeric_cols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") and (field != "actual_worth"))]

  assembler_inputs = ohe_output_cols + numeric_cols
  vec_assembler = VectorAssembler(inputCols = assembler_inputs, outputCol = "features")

  return string_indexer, ohe_encoder, vec_assembler

In [13]:
string_indexer, ohe_encoder, vec_assembler = prepare_data(samples[0], "actual_worth")

In [None]:
models = []
predictions = []

for sample in samples:
  scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

  gbt = GBTRegressor(featuresCol="scaled_features", labelCol="actual_worth", maxIter=20, maxDepth=5)
  pipeline_gbt = Pipeline(stages = [string_indexer, ohe_encoder, vec_assembler, scaler, gbt])

  gbt_model_bag = pipeline_gbt.fit(sample)
  models.append(gbt_model_bag)

  gbt_prediction_bag = gbt_model_bag.transform(testDF)
  predictions.append(gbt_prediction_bag)

In [None]:
for prediction in predictions:
  prediction.select("actual_worth", "prediction").show(5)

+------------+------------------+
|actual_worth|        prediction|
+------------+------------------+
|    262493.0| 718399.2525931613|
|    390000.0|1187989.8307684031|
|    312500.0|1187989.8307684031|
|    650000.0|1598735.4994814058|
|    500000.0|1598735.4994814058|
+------------+------------------+
only showing top 5 rows

+------------+------------------+
|actual_worth|        prediction|
+------------+------------------+
|    262493.0| 676661.2566635377|
|    390000.0|1115005.2208664608|
|    312500.0|1115005.2208664608|
|    650000.0| 1702674.880747555|
|    500000.0| 1702674.880747555|
+------------+------------------+
only showing top 5 rows

+------------+------------------+
|actual_worth|        prediction|
+------------+------------------+
|    262493.0| 785871.4006877741|
|    390000.0|1164686.3368624006|
|    312500.0|1164686.3368624006|
|    650000.0|1724867.9027933925|
|    500000.0|1724867.9027933925|
+------------+------------------+
only showing top 5 rows



Calculate average prediction

In [None]:
from pyspark.sql import functions as F

cumm_predictions = []

# rename column "prediction" to proceed join operation
for i in range(len(predictions)):
  df = predictions[i].withColumnRenamed("prediction", f'prediction_{i+1}').withColumn("row_id", F.monotonically_increasing_id())

  # remove column "actual_worth" in all dataframes except the first for join operation
  if i > 0:
    df = df.drop("actual_worth")
  cumm_predictions.append(df)

# join all predictions from all bags
predictions_joined = cumm_predictions[0].alias('df1')
for i in range(1, len(cumm_predictions)):
  predictions_joined = predictions_joined.join(cumm_predictions[i].alias(f'df{i+1}'), on="row_id")

# create a list of columns with predictions
prediction_columns = [f'df{i+1}.prediction_{i+1}' for i in range(len(cumm_predictions))]

# calculate average value for a particular item
average_expression = sum(F.col(col) for col in prediction_columns) / len(prediction_columns)

prediction_avg = predictions_joined.withColumn("average_prediction", average_expression)

In [None]:
prediction_avg.select("actual_worth", "average_prediction").show(5)

+------------+------------------+
|actual_worth|average_prediction|
+------------+------------------+
|    262493.0| 758735.2904626172|
|    390000.0|1181200.9567657362|
|    312500.0|1181200.9567657362|
|    650000.0|1427096.5135852583|
|    500000.0|1566624.8321848188|
+------------+------------------+
only showing top 5 rows



In [None]:
regression_evaluator_bag = RegressionEvaluator(predictionCol="average_prediction", labelCol="actual_worth", metricName="rmse")

rmse_bag = regression_evaluator_bag.evaluate(prediction_avg)
print(f"RMSE is {rmse_bag:.1f}")

r2_bag = regression_evaluator_bag.setMetricName("r2").evaluate(prediction_avg)
print(f"R2 is {r2_bag}")

RMSE is 650903.4
R2 is 0.6227564928113317
