In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, count

In [2]:
# spark = SparkSession.builder \
#     .appName('used-car-price') \
#     .config("spark.executor.instances", "4")\
#     .config("spark.executor.memory", "8g")\
#     .config("spark.driver.memory", "16g") \
#     .config("spark.executor.memoryOverhead", "2g") \
#     .getOrCreate()
spark = SparkSession.builder.master('local[*]').appName('used-car-price').config('spark.executor.memory', '16g').getOrCreate()

24/09/11 15:14:21 WARN Utils: Your hostname, langchain resolves to a loopback address: 127.0.1.1; using 192.168.0.103 instead (on interface wlp3s0)
24/09/11 15:14:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/11 15:14:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
scaled_training_data = spark.read.csv("../data/scaled_training_data", header=True, inferSchema=True)
scaled_test_data = spark.read.csv("../data/scaled_test_data_new", header=True, inferSchema=True)


                                                                                

In [4]:
scaled_training_data


DataFrame[scaled_feature_1: double, scaled_feature_2: double, scaled_feature_3: double, scaled_feature_4: double, scaled_feature_5: double, scaled_feature_6: double, scaled_feature_7: double, scaled_feature_8: double, scaled_feature_9: double, scaled_feature_10: double, scaled_feature_11: double, price: double]

In [5]:
scaled_test_data


DataFrame[scaled_feature_1: double, scaled_feature_2: double, scaled_feature_3: double, scaled_feature_4: double, scaled_feature_5: double, scaled_feature_6: double, scaled_feature_7: double, scaled_feature_8: double, scaled_feature_9: double, scaled_feature_10: double, scaled_feature_11: double, price: double]

In [6]:
from pyspark.ml.regression import RandomForestRegressionModel,DecisionTreeRegressor,GBTRegressionModel
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


In [7]:
from pyspark.ml.feature import VectorAssembler

feature_columns = [f"scaled_feature_{i+1}" for i in range(11)]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
training_data = assembler.transform(scaled_training_data).select("features", "price")
test_data = assembler.transform(scaled_test_data).select("features")
training_data.show(5)


+--------------------+-------+
|            features|  price|
+--------------------+-------+
|[0.29978248695051...|49999.0|
|[0.77221815175127...|45000.0|
|[1.03079405059283...|16900.0|
|[-0.0093420838450...|10500.0|
|[-1.0553107573545...|44500.0|
+--------------------+-------+
only showing top 5 rows



In [8]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="price", featuresCol="features")
rf_model = rf.fit(training_data)
rf_predictions = rf_model.transform(test_data)
rf_predictions.show(5)




+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[-1.6171786879200...|58841.518121184316|
|[-1.6599506411118...| 50125.44700489645|
|[-1.6268995863727...| 60601.88236911197|
|[-1.6618948208024...| 63445.89083541832|
|[-1.6618948208024...| 61819.85506575326|
+--------------------+------------------+
only showing top 5 rows



In [9]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(labelCol="price", featuresCol="features")
dt_model = dt.fit(training_data)
dt_predictions = dt_model.transform(test_data)
dt_predictions.show(5)


+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[-1.6171786879200...|53449.365853658535|
|[-1.6599506411118...| 50325.88205128205|
|[-1.6268995863727...| 68576.18616846936|
|[-1.6618948208024...|53449.365853658535|
|[-1.6618948208024...| 68576.18616846936|
+--------------------+------------------+
only showing top 5 rows



In [10]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="price", featuresCol="features", maxIter=10)
gbt_model = gbt.fit(training_data)
gbt_predictions = gbt_model.transform(test_data)

gbt_predictions.show(5)


+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[-1.6171786879200...|45477.980364308954|
|[-1.6599506411118...|54742.515395193324|
|[-1.6268995863727...| 59444.67769330832|
|[-1.6618948208024...| 54003.59270398596|
|[-1.6618948208024...| 67485.83463625018|
+--------------------+------------------+
only showing top 5 rows



In [11]:
test_data_with_price = assembler.transform(scaled_test_data).select("features", "price")
rf_predictions = rf_model.transform(test_data_with_price)
rf_predictions_with_price = rf_predictions.select("prediction", "price")
rf_predictions_with_price.show(5)
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rf_rmse = evaluator.evaluate(rf_predictions_with_price)
print(f"Random Forest RMSE: {rf_rmse}")


+------------------+--------+
|        prediction|   price|
+------------------+--------+
|58841.518121184316| 22500.0|
| 50125.44700489645|154900.0|
| 60601.88236911197| 26000.0|
| 63445.89083541832| 84900.0|
| 61819.85506575326| 50000.0|
+------------------+--------+
only showing top 5 rows

Random Forest RMSE: 74976.39805961156


In [12]:
test_data_with_price = assembler.transform(scaled_test_data).select("features", "price")
dt_predictions = dt_model.transform(test_data_with_price)
dt_predictions_with_price = dt_predictions.select("prediction", "price")
dt_predictions_with_price.show(5)

evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
dt_rmse = evaluator.evaluate(dt_predictions_with_price)
print(f"Decision tree RMSE: {dt_rmse}")


+------------------+--------+
|        prediction|   price|
+------------------+--------+
|53449.365853658535| 22500.0|
| 50325.88205128205|154900.0|
| 68576.18616846936| 26000.0|
|53449.365853658535| 84900.0|
| 68576.18616846936| 50000.0|
+------------------+--------+
only showing top 5 rows

Decision tree RMSE: 75233.98195910703


In [13]:
test_data_with_price = assembler.transform(scaled_test_data).select("features", "price")
gbt_predictions = gbt_model.transform(test_data_with_price)
gbt_predictions_with_price = gbt_predictions.select("prediction", "price")
gbt_predictions_with_price.show(5)

evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
gbt_rmse = evaluator.evaluate(gbt_predictions_with_price)
print(f"Decision tree RMSE: {gbt_rmse}")


+------------------+--------+
|        prediction|   price|
+------------------+--------+
|45477.980364308954| 22500.0|
|54742.515395193324|154900.0|
| 59444.67769330832| 26000.0|
| 54003.59270398596| 84900.0|
| 67485.83463625018| 50000.0|
+------------------+--------+
only showing top 5 rows

Decision tree RMSE: 74667.15440574866


In [14]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="price", featuresCol="features")

paramGrid_rf = (ParamGridBuilder()
                .addGrid(rf.numTrees, [10, 20, 30])      
                .addGrid(rf.maxDepth, [5, 10, 15])         
                .build())

rf_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

crossval_rf = CrossValidator(estimator=rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=rf_evaluator,
                             numFolds=5)  # 5-fold cross-validation

rf_cv_model = crossval_rf.fit(training_data)

rf_predictions = rf_cv_model.transform(test_data)

rf_predictions.show(5)

rmse_rf = rf_evaluator.evaluate(rf_predictions)
print(f"Random Forest RMSE: {rmse_rf}")


24/09/11 15:14:48 WARN DAGScheduler: Broadcasting large task binary with size 1543.5 KiB
24/09/11 15:14:51 WARN DAGScheduler: Broadcasting large task binary with size 1543.5 KiB
24/09/11 15:14:52 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
24/09/11 15:14:53 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
24/09/11 15:14:55 WARN DAGScheduler: Broadcasting large task binary with size 1123.9 KiB
24/09/11 15:14:56 WARN DAGScheduler: Broadcasting large task binary with size 7.8 MiB
24/09/11 15:15:02 ERROR Executor: Exception in task 7.0 in stage 221.0 (TID 3228)
java.lang.OutOfMemoryError: Java heap space
24/09/11 15:15:02 ERROR Executor: Exception in task 3.0 in stage 221.0 (TID 3224)
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.impl.Ran

ConnectionRefusedError: [Errno 111] Connection refused