In [1]:
# Cell 1: Import necessary libraries and create Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import Imputer

# Create Spark session
spark = SparkSession.builder.appName("HousePricePrediction").getOrCreate()


23/08/07 11:05:47 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.69 instead (on interface en0)
23/08/07 11:05:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/07 11:05:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Cell 2: Read the data
df_train = spark.read.csv("data/train.csv", header=True, inferSchema=True)
df_test = spark.read.csv("data/test.csv", header=True, inferSchema=True)


In [3]:
# Cell 3: Drop unnecessary columns and handle missing values for "MSZoning" column
cols_to_drop = ['FireplaceQu', 'Fence', 'Alley', 'MiscFeature', 'PoolQC']
df_train_cleaned = df_train.drop(*cols_to_drop)
df_test_cleaned = df_test.drop(*cols_to_drop)

mszoning_mode = df_train_cleaned.select("MSZoning").groupBy("MSZoning").count().orderBy(col("count").desc()).first()["MSZoning"]
df_train_cleaned = df_train_cleaned.na.fill({"MSZoning": mszoning_mode})
df_test_cleaned = df_test_cleaned.na.fill({"MSZoning": mszoning_mode})


                                                                                

In [5]:
# Cell 4: Handle missing values for both categorical and numerical features
from pyspark.sql import functions as F

categorical_cols = [col_name for col_name, dtype in df_train_cleaned.dtypes if dtype == "string"]
for col in categorical_cols:
    mode_value = df_train_cleaned.select(col).groupBy(col).count().orderBy(F.col("count").desc()).first()[col]
    df_train_cleaned = df_train_cleaned.na.fill({col: mode_value})
    df_test_cleaned = df_test_cleaned.na.fill({col: mode_value})

numerical_cols = [col_name for col_name, dtype in df_train_cleaned.dtypes if dtype != "string" and col_name != "Id" and col_name != "SalePrice"]
for col in numerical_cols:
    df_train_cleaned = df_train_cleaned.withColumn(col, F.col(col).cast("double"))
    df_test_cleaned = df_test_cleaned.withColumn(col, F.col(col).cast("double"))

imputer = Imputer(inputCols=numerical_cols, outputCols=[f"{col}_imputed" for col in numerical_cols])
imputer_model = imputer.fit(df_train_cleaned)
df_train_cleaned = imputer_model.transform(df_train_cleaned)
df_test_cleaned = imputer_model.transform(df_test_cleaned)


23/08/07 11:07:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
# Cell 5: Convert columns to the correct data types (after filling missing values for categorical columns)
from pyspark.sql import functions as F

for col in df_train_cleaned.columns:
    if col != "Id" and col != "SalePrice":
        df_train_cleaned = df_train_cleaned.withColumn(col, F.col(col).cast("double"))
        df_test_cleaned = df_test_cleaned.withColumn(col, F.col(col).cast("double"))

In [8]:
# Cell 6: Drop columns with a high percentage of missing values
missing_threshold = 0.8
cols_to_drop = [col for col in df_train_cleaned.columns if (df_train_cleaned.select(col).na.drop().count() / df_train_cleaned.count()) < missing_threshold]
df_train_cleaned = df_train_cleaned.drop(*cols_to_drop)
df_test_cleaned = df_test_cleaned.drop(*cols_to_drop)


In [10]:
# Cell 7: Feature Engineering and Transformation
for col_name in ["BsmtFinSF1", "BsmtFinSF2", "BsmtUnfSF", "TotalBsmtSF", "BsmtFullBath", "BsmtHalfBath", "GarageCars", "GarageArea"]:
    df_train_cleaned = df_train_cleaned.withColumn(col_name, F.col(col_name).cast("double"))
    df_test_cleaned = df_test_cleaned.withColumn(col_name, F.col(col_name).cast("double"))

categorical_cols = [col_name for col_name, dtype in df_train_cleaned.dtypes if dtype == "string"]
numerical_cols = [col_name for col_name, dtype in df_train_cleaned.dtypes if dtype != "string" and col_name != "Id" and col_name != "SalePrice"]

assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
df_train_assembled = assembler.transform(df_train_cleaned)
df_test_assembled = assembler.transform(df_test_cleaned)

scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_numerical_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_train_assembled)
df_train_scaled = scaler_model.transform(df_train_assembled)
df_test_scaled = scaler_model.transform(df_test_assembled)

indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid='keep') for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=f"{indexer.getOutputCol()}_encoded") for indexer in indexers]

pipeline = Pipeline(stages=indexers + encoders)
df_train_encoded = pipeline.fit(df_train_scaled).transform(df_train_scaled)
df_test_encoded = pipeline.fit(df_test_scaled).transform(df_test_scaled)

assembler = VectorAssembler(inputCols=[f"{col}_encoded" for col in categorical_cols] + ["scaled_numerical_features"],
                            outputCol="features")
df_train_final = assembler.transform(df_train_encoded)
df_test_final = assembler.transform(df_test_encoded)

df_train_final = df_train_final.select("Id", "features", "SalePrice")
df_test_final = df_test_final.select("Id", "features")


23/08/07 11:21:08 ERROR Executor: Exception in task 0.0 in stage 790.0 (TID 534)
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (VectorAssembler$$Lambda$3943/0x0000000801b4dd48: (struct<MSSubClass:double,LotFrontage:double,LotArea:double,OverallQual:double,OverallCond:double,YearBuilt:double,YearRemodAdd:double,MasVnrArea:double,BsmtFinSF1:double,BsmtFinSF2:double,BsmtUnfSF:double,TotalBsmtSF:double,1stFlrSF:double,2ndFlrSF:double,LowQualFinSF:double,GrLivArea:double,BsmtFullBath:double,BsmtHalfBath:double,FullBath:double,HalfBath:double,BedroomAbvGr:double,KitchenAbvGr:double,TotRmsAbvGrd:double,Fireplaces:double,GarageYrBlt:double,GarageCars:double,GarageArea:double,WoodDeckSF:double,OpenPorchSF:double,EnclosedPorch:double,3SsnPorch:double,ScreenPorch:double,PoolArea:double,MiscVal:double,MoSold:double,YrSold:double,MSSubClass_imputed:double,LotArea_imputed:double,OverallQual_imputed:double,OverallCond_imputed:double,YearBuilt_imputed:do

Py4JJavaError: An error occurred while calling o2956.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 790.0 failed 1 times, most recent failure: Lost task 0.0 in stage 790.0 (TID 534) (192.168.1.69 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (VectorAssembler$$Lambda$3943/0x0000000801b4dd48: (struct<MSSubClass:double,LotFrontage:double,LotArea:double,OverallQual:double,OverallCond:double,YearBuilt:double,YearRemodAdd:double,MasVnrArea:double,BsmtFinSF1:double,BsmtFinSF2:double,BsmtUnfSF:double,TotalBsmtSF:double,1stFlrSF:double,2ndFlrSF:double,LowQualFinSF:double,GrLivArea:double,BsmtFullBath:double,BsmtHalfBath:double,FullBath:double,HalfBath:double,BedroomAbvGr:double,KitchenAbvGr:double,TotRmsAbvGrd:double,Fireplaces:double,GarageYrBlt:double,GarageCars:double,GarageArea:double,WoodDeckSF:double,OpenPorchSF:double,EnclosedPorch:double,3SsnPorch:double,ScreenPorch:double,PoolArea:double,MiscVal:double,MoSold:double,YrSold:double,MSSubClass_imputed:double,LotArea_imputed:double,OverallQual_imputed:double,OverallCond_imputed:double,YearBuilt_imputed:double,YearRemodAdd_imputed:double,BsmtFinSF1_imputed:double,BsmtFinSF2_imputed:double,BsmtUnfSF_imputed:double,TotalBsmtSF_imputed:double,1stFlrSF_imputed:double,2ndFlrSF_imputed:double,LowQualFinSF_imputed:double,GrLivArea_imputed:double,BsmtFullBath_imputed:double,BsmtHalfBath_imputed:double,FullBath_imputed:double,HalfBath_imputed:double,BedroomAbvGr_imputed:double,KitchenAbvGr_imputed:double,TotRmsAbvGrd_imputed:double,Fireplaces_imputed:double,GarageCars_imputed:double,GarageArea_imputed:double,WoodDeckSF_imputed:double,OpenPorchSF_imputed:double,EnclosedPorch_imputed:double,3SsnPorch_imputed:double,ScreenPorch_imputed:double,PoolArea_imputed:double,MiscVal_imputed:double,MoSold_imputed:double,YrSold_imputed:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:161)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:83)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (VectorAssembler$$Lambda$3943/0x0000000801b4dd48: (struct<MSSubClass:double,LotFrontage:double,LotArea:double,OverallQual:double,OverallCond:double,YearBuilt:double,YearRemodAdd:double,MasVnrArea:double,BsmtFinSF1:double,BsmtFinSF2:double,BsmtUnfSF:double,TotalBsmtSF:double,1stFlrSF:double,2ndFlrSF:double,LowQualFinSF:double,GrLivArea:double,BsmtFullBath:double,BsmtHalfBath:double,FullBath:double,HalfBath:double,BedroomAbvGr:double,KitchenAbvGr:double,TotRmsAbvGrd:double,Fireplaces:double,GarageYrBlt:double,GarageCars:double,GarageArea:double,WoodDeckSF:double,OpenPorchSF:double,EnclosedPorch:double,3SsnPorch:double,ScreenPorch:double,PoolArea:double,MiscVal:double,MoSold:double,YrSold:double,MSSubClass_imputed:double,LotArea_imputed:double,OverallQual_imputed:double,OverallCond_imputed:double,YearBuilt_imputed:double,YearRemodAdd_imputed:double,BsmtFinSF1_imputed:double,BsmtFinSF2_imputed:double,BsmtUnfSF_imputed:double,TotalBsmtSF_imputed:double,1stFlrSF_imputed:double,2ndFlrSF_imputed:double,LowQualFinSF_imputed:double,GrLivArea_imputed:double,BsmtFullBath_imputed:double,BsmtHalfBath_imputed:double,FullBath_imputed:double,HalfBath_imputed:double,BedroomAbvGr_imputed:double,KitchenAbvGr_imputed:double,TotRmsAbvGrd_imputed:double,Fireplaces_imputed:double,GarageCars_imputed:double,GarageArea_imputed:double,WoodDeckSF_imputed:double,OpenPorchSF_imputed:double,EnclosedPorch_imputed:double,3SsnPorch_imputed:double,ScreenPorch_imputed:double,PoolArea_imputed:double,MiscVal_imputed:double,MoSold_imputed:double,YrSold_imputed:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:161)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:83)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 26 more


In [None]:
# Cell 8: Model Training and Evaluation using Cross-Validation
# Split data into training and validation sets
train_data, validation_data = df_train_final.randomSplit([0.8, 0.2], seed=42)

# Initialize Linear Regression model
lr = LinearRegression(featuresCol='features', labelCol='SalePrice', maxIter=100, regParam=0.1)

# Set up the parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 0.5]).build()

# Initialize CrossValidator
evaluator = RegressionEvaluator(labelCol="SalePrice", predictionCol="prediction", metricName="rmse")
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Train the model using CrossValidator
cvModel = cv.fit(train_data)

# Make predictions on the validation set
validation_predictions = cvModel.transform(validation_data)

# Evaluate the model on the validation set
rmse = evaluator.evaluate(validation_predictions)
print(f"Root Mean Squared Error (RMSE) on validation data: {rmse:.2f}")


In [None]:
# Cell 9: Model Prediction on Test Data and Save Results
# Make predictions on the test set
test_predictions = cvModel.transform(df_test_final)

# Select the necessary columns for the final result
final_result = test_predictions.select("Id", "prediction").withColumnRenamed("prediction", "SalePrice")

# Save the predictions to a CSV file
final_result.coalesce(1).write.csv("predictions.csv", header=True, mode="overwrite")
