In [2]:
!pip install pyspark pandas

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=c3903f6ec2391a470cd22392d8fcc26fedf3ec4333b82d11fea1a708b54603f6
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [5]:
# Import necessary libraries
from pyspark.sql import SparkSession


In [6]:
 # Initialize Spark session
spark = SparkSession.builder.appName("HousePricePrediction").getOrCreate()

In [7]:
# Step 1: Load  Data
# Sample dataset
data = [(1200, 3, 2, 300000),
(1500, 4, 3, 350000),
(800, 2, 1, 200000),
(2500, None, 3, 450000),
(1800, 4, 2, 400000),
(2500, 5, 3, 500000),
(None, 4, 3, 400000)
]


In [8]:
# Define schema
columns = ["SquareFootage", "Bedrooms", "Bathrooms", "Price"]



In [9]:
# Create DataFrame
df = spark.createDataFrame(data, schema=columns)
df.show()

+-------------+--------+---------+------+
|SquareFootage|Bedrooms|Bathrooms| Price|
+-------------+--------+---------+------+
|         1200|       3|        2|300000|
|         1500|       4|        3|350000|
|          800|       2|        1|200000|
|         2500|    NULL|        3|450000|
|         1800|       4|        2|400000|
|         2500|       5|        3|500000|
|         NULL|       4|        3|400000|
+-------------+--------+---------+------+



In [10]:
 # Step 2: Data Preprocessing
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [11]:
# Check for missing values
df.describe().show()


+-------+------------------+------------------+------------------+-----------------+
|summary|     SquareFootage|          Bedrooms|         Bathrooms|            Price|
+-------+------------------+------------------+------------------+-----------------+
|  count|                 6|                 6|                 7|                7|
|   mean|1716.6666666666667|3.6666666666666665|2.4285714285714284|371428.5714285714|
| stddev| 691.1343333004565|1.0327955589886444|0.7867957924694432| 99402.9797388005|
|    min|               800|                 2|                 1|           200000|
|    max|              2500|                 5|                 3|           500000|
+-------+------------------+------------------+------------------+-----------------+



In [12]:
 # Fill missing values (if any) with mean of the column
df = df.na.fill({
'SquareFootage': df.agg({'SquareFootage': 'mean'}).collect()[0][0],
'Bedrooms': df.agg({'Bedrooms': 'mean'}).collect()[0][0],
'Bathrooms': df.agg({'Bathrooms': 'mean'}).collect()[0][0]
})
df.show()

+-------------+--------+---------+------+
|SquareFootage|Bedrooms|Bathrooms| Price|
+-------------+--------+---------+------+
|         1200|       3|        2|300000|
|         1500|       4|        3|350000|
|          800|       2|        1|200000|
|         2500|       3|        3|450000|
|         1800|       4|        2|400000|
|         2500|       5|        3|500000|
|         1716|       4|        3|400000|
+-------------+--------+---------+------+



In [13]:
# Normalize the features using StandardScaler if needed:
# Assemble features into a vector
assembler = VectorAssembler(inputCols=["SquareFootage", "Bedrooms",
"Bathrooms"], outputCol="unscaled_features")
output = assembler.transform(df)


In [14]:
# Scale features
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features",
withStd=True, withMean=True)
scaler_model = scaler.fit(output)
scaled_output = scaler_model.transform(output)


In [15]:
 # Show scaled features
scaled_output.select("features", "Price").show()

+--------------------+------+
|            features| Price|
+--------------------+------+
|[-0.8187635537508...|300000|
|[-0.3432648084862...|350000|
|[-1.4527618807703...|200000|
|[1.24173100906247...|450000|
|[0.13223393677835...|400000|
|[1.24173100906247...|500000|
|[-9.0571189574193...|400000|
+--------------------+------+



In [23]:
# Split the data into training and test sets
train_data, test_data = scaled_output.randomSplit([0.8, 0.2])



In [24]:
# Step 3: Model Training
from pyspark.ml.regression import LinearRegression





In [25]:
# Initialize the linear regression model
lr = LinearRegression(labelCol="Price", featuresCol="features")


In [26]:
# Fit the model to the training data
model = lr.fit(train_data)

In [28]:
# Step 4: Model Evaluation
from pyspark.ml.evaluation import RegressionEvaluator


In [29]:
# Make predictions on the test data
predictions = model.transform(test_data)
predictions.select("features", "Price", "prediction").show()


+--------------------+------+-----------------+
|            features| Price|       prediction|
+--------------------+------+-----------------+
|[-9.0571189574193...|400000|457999.9999999798|
|[1.24173100906247...|450000|1049999.999999836|
|[1.24173100906247...|500000|649999.9999999648|
+--------------------+------+-----------------+



In [30]:
# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction",
metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


Root Mean Squared Error (RMSE): 358638.16491453256


In [31]:
# Make predictions on the test data
predictions = model.transform(test_data)


In [32]:
# Calculate R-squared
evaluator_r2 = RegressionEvaluator(labelCol="Price", predictionCol="prediction",
metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R-squared (R²): {r2}")


R-squared (R²): -76.17279999995809


In [33]:
# Calculate Mean Absolute Error (MAE)
evaluator_mae = RegressionEvaluator(
labelCol="Price", predictionCol="prediction", metricName="mae"
)
mae = evaluator_mae.evaluate(predictions)
print(f"Mean Absolute Error (MAE): {mae}")

Mean Absolute Error (MAE): 269333.3333332602


In [34]:
# Save the model for future use
model.write().overwrite().save("house_price_model")


In [36]:
# PREDICT Price for given data (SquareFootage:1600, Bedrooms:3, Bathrooms:2 ::
from pyspark.ml.regression import LinearRegressionModel


In [37]:
# Load the saved model
model = LinearRegressionModel.load("house_price_model")


In [38]:
# New data example (SquareFootage, Bedrooms, Bathrooms)
new_df = spark.createDataFrame([(1600, 3, 2)], schema=["SquareFootage",
"Bedrooms", "Bathrooms"])


In [40]:
# Assemble features into a single vector (no need to scale if the model was trained on unscaled data)
assembler = VectorAssembler(inputCols=["SquareFootage", "Bedrooms",
"Bathrooms"], outputCol="features")
new_vector_data = assembler.transform(new_df)


In [41]:
# Make prediction
new_predictions = model.transform(new_vector_data)
# Show the prediction
new_predictions.show()

+-------------+--------+---------+----------------+-------------------+
|SquareFootage|Bedrooms|Bathrooms|        features|         prediction|
+-------------+--------+---------+----------------+-------------------+
|         1600|       3|        2|[1600.0,3.0,2.0]|5.047918693312821E8|
+-------------+--------+---------+----------------+-------------------+

