
In [38]:
!pip install pyspark pandas



In [39]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("HousePricePrediction").getOrCreate()

**Sample dataset**

SquareFootage: The area of the house in square feet.

Bedrooms: The number of bedrooms in the house.

Bathrooms: The number of bathrooms in the house.

Price: The price of the house.

In [40]:
# Step 1: Load Data

# Sample dataset
data = [
    (1200, 3, 2, 300000),
    (1500, 4, 3, 350000),
    (800, 2, 1, 200000),
    (2500, None, 3, 450000),
    (1800, 4, 2, 400000),
    (2500, 5, 3, 500000),
    (None, 4, 3, 400000)
]

# Define column names (Spark infers the column types from the data)
columns = ["SquareFootage", "Bedrooms", "Bathrooms", "Price"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)
df.show()


+-------------+--------+---------+------+
|SquareFootage|Bedrooms|Bathrooms| Price|
+-------------+--------+---------+------+
|         1200|       3|        2|300000|
|         1500|       4|        3|350000|
|          800|       2|        1|200000|
|         2500|    NULL|        3|450000|
|         1800|       4|        2|400000|
|         2500|       5|        3|500000|
|         NULL|       4|        3|400000|
+-------------+--------+---------+------+



In [64]:
# Step 2: Data Preprocessing
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Inspect summary statistics (a count below the number of rows indicates missing values)
df.describe().show()

# Fill missing values with the column mean, computed in a single pass.
# The feature columns are integer-typed, so the filled values end up as whole numbers.
feature_cols = ["SquareFootage", "Bedrooms", "Bathrooms"]
means = df.select([F.mean(c).alias(c) for c in feature_cols]).first().asDict()
df = df.na.fill(means)
df.show()

# Assemble the feature columns into a single vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="unscaled_features")
output = assembler.transform(df)

# Standardize the features to zero mean and unit standard deviation
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features", withStd=True, withMean=True)
scaler_model = scaler.fit(output)
scaled_output = scaler_model.transform(output)

# Show scaled features
scaled_output.select("features", "Price").show()


+-------+------------------+------------------+------------------+-----------------+
|summary|     SquareFootage|          Bedrooms|         Bathrooms|            Price|
+-------+------------------+------------------+------------------+-----------------+
|  count|                 7|                 7|                 7|                7|
|   mean|1716.5714285714287|3.5714285714285716|2.4285714285714284|371428.5714285714|
| stddev| 630.9164913422528|0.9759000729485332|0.7867957924694432| 99402.9797388005|
|    min|               800|                 2|                 1|           200000|
|    max|              2500|                 5|                 3|           500000|
+-------+------------------+------------------+------------------+-----------------+

+-------------+--------+---------+------+
|SquareFootage|Bedrooms|Bathrooms| Price|
+-------------+--------+---------+------+
|         1200|       3|        2|300000|
|         1500|       4|        3|350000|
|          800|       2|
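
The imputation and scaling steps can be reproduced by hand for a single column. The sketch below uses plain Python on the SquareFootage values from the sample dataset. It assumes the fill value is truncated to an integer (which is consistent with the mean of 1716.57 reported by `describe()` above) and that standardization divides by the sample standard deviation. The variable names are illustrative and not part of the notebook.

```python
from statistics import mean, stdev

# SquareFootage column from the sample dataset; None marks the missing value.
square_footage = [1200, 1500, 800, 2500, 1800, 2500, None]

# Mean imputation: average the known values, then truncate to an integer
# (assumption: mirrors the fill value being cast to the integer column type).
known = [v for v in square_footage if v is not None]
fill_value = int(mean(known))
filled = [fill_value if v is None else v for v in square_footage]

# Standardization: subtract the mean and divide by the sample standard
# deviation (n - 1 in the denominator).
mu = mean(filled)
sigma = stdev(filled)
scaled = [(v - mu) / sigma for v in filled]

print(fill_value)                     # 1716
print(round(mu, 4), round(sigma, 4))  # 1716.5714 630.9165
print(round(scaled[1], 4))            # -0.3433 (the 1500 sq ft house)
print(round(scaled[3], 4))            # 1.2417 (a 2500 sq ft house)
```

The last two values agree with the leading entries of the feature vectors printed in the Step 4 output (`-0.3432648084862...` and `1.24173100906247...`).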

In [65]:
# Step 3: Model Training
from pyspark.ml.regression import LinearRegression

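# Split the data into training and test sets
# (a fixed seed makes the split reproducible; with only seven rows the test set is very small)
train_data, test_data = scaled_output.randomSplit([0.8, 0.2], seed=42)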

# Initialize the linear regression model
lr = LinearRegression(labelCol="Price", featuresCol="features")

# Fit the model to the training data
model = lr.fit(train_data)

# Model Evaluation

RMSE, R², and MAE are commonly used metrics in regression analysis, each offering a different perspective on model performance. Here's a comparison of these metrics:

1. Root Mean Squared Error (RMSE)
* Definition: RMSE measures the square root of the average of the squared differences between predicted and actual values.
* Scale: It has the same units as the target variable.
* Sensitivity: RMSE is sensitive to outliers due to the squaring of errors, which can disproportionately increase the metric for models with large errors.
* Interpretation: A lower RMSE indicates better model performance, meaning the predictions are closer to the actual values. It's often preferred when large errors are particularly undesirable.

2. R-squared (R²)
* Definition: R² is a statistical measure that represents the proportion of the variance in the dependent variable that is predictable from the independent variables.
* Scale: R² is at most 1. On the training data of a least-squares fit with an intercept it lies between 0 and 1; on held-out data it can be negative, which means the model predicts worse than always returning the mean of the actual values. An R² of 1 indicates that the model explains all the variance in the data.
* Sensitivity: Because R² is computed from squared errors, outliers affect it much as they affect RMSE. It is unitless, so it does not convey the magnitude of the error directly.
* Interpretation: A higher R² value indicates a better fit of the model to the data. However, a high R² doesn't necessarily mean the model is good; it could also indicate overfitting, especially in complex models.

3. Mean Absolute Error (MAE)
* Definition: MAE measures the average of the absolute differences between predicted and actual values.
* Scale: Like RMSE, MAE is in the same units as the target variable.
* Sensitivity: MAE is less sensitive to outliers compared to RMSE because it does not square the errors.
* Interpretation: A lower MAE indicates better model performance, reflecting the average error magnitude. It is often preferred when you want a straightforward interpretation of average prediction errors.
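
Written as formulas, the three definitions above take the following standard forms. The notation is introduced here for illustration: $y_i$ is the actual value, $\hat{y}_i$ the predicted value, $\bar{y}$ the mean of the actual values, and $n$ the number of observations.

$$
\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}
\qquad
\mathrm{MAE} = \frac{1}{n}\sum_{i=1}^{n}\left|y_i - \hat{y}_i\right|
\qquad
R^2 = 1 - \frac{\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}{\sum_{i=1}^{n}\left(y_i - \bar{y}\right)^2}
$$

The fraction in $R^2$ compares the model's squared error with that of a baseline that always predicts $\bar{y}$, so $R^2$ becomes negative whenever the model does worse than that baseline.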


**Summary**
* RMSE is useful when large errors are particularly undesirable.
* R² provides an overall measure of how well the model explains the data.
* MAE gives a straightforward average error, useful for general understanding.


Depending on the context of your regression problem, you might choose one metric over another or use a combination to get a comprehensive view of model performance.
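
As a worked example, the three metrics can be computed by hand in plain Python. The actual and predicted prices below are taken from the three test rows shown in this notebook's Step 4 output, with the predictions rounded to whole numbers. The variable names are illustrative. This is a sketch of the arithmetic, not of how `RegressionEvaluator` is implemented.

```python
from math import sqrt

# Actual and predicted prices from the three test rows in the Step 4 output
# (predictions rounded to whole numbers).
actual = [350000, 450000, 500000]
predicted = [400000, 300000, 500000]

n = len(actual)
errors = [a - p for a, p in zip(actual, predicted)]

# RMSE: square root of the mean squared error
rmse = sqrt(sum(e ** 2 for e in errors) / n)

# MAE: mean of the absolute errors
mae = sum(abs(e) for e in errors) / n

# R²: 1 minus the ratio of the model's squared error to the squared error
# of a baseline that always predicts the mean of the actual values
mean_actual = sum(actual) / n
ss_res = sum(e ** 2 for e in errors)
ss_tot = sum((a - mean_actual) ** 2 for a in actual)
r2 = 1 - ss_res / ss_tot

print(f"RMSE: {rmse:.2f}")  # RMSE: 91287.09
print(f"MAE:  {mae:.2f}")   # MAE:  66666.67
print(f"R2:   {r2:.4f}")    # R2:   -1.1429
```

The single error of 150000 dominates the RMSE, while the MAE averages it with the two smaller errors. R² is negative because the squared error of these predictions exceeds that of always predicting the mean test price.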

In [63]:
# Step 4: Model Evaluation
from pyspark.ml.evaluation import RegressionEvaluator

# Make predictions on the test data
predictions = model.transform(test_data)
predictions.select("features", "Price", "prediction").show()

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


# Make predictions on the test data
predictions = model.transform(test_data)

# Calculate R-squared
evaluator_r2 = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R-squared (R²): {r2}")

# Calculate Mean Absolute Error (MAE)
evaluator_mae = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="mae"
)
mae = evaluator_mae.evaluate(predictions)
print(f"Mean Absolute Error (MAE): {mae}")

+--------------------+------+------------------+
|            features| Price|        prediction|
+--------------------+------+------------------+
|[-0.3432648084862...|350000| 400000.0000000065|
|[1.24173100906247...|450000|  299999.999999953|
|[1.24173100906247...|500000|499999.99999999173|
+--------------------+------+------------------+

Root Mean Squared Error (RMSE): 91287.09291755459
R-squared (R²): -1.1428571428584062
Mean Absolute Error (MAE): 66666.66666668725


In [66]:
# Save the model
model.write().overwrite().save("house_price_model")

In [71]:
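# Predict the price for new data (SquareFootage: 1600, Bedrooms: 3, Bathrooms: 2)

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel

# Load the saved model
model = LinearRegressionModel.load("house_price_model")

# New data example (SquareFootage, Bedrooms, Bathrooms)
new_df = spark.createDataFrame([(1600, 3, 2)], schema=["SquareFootage", "Bedrooms", "Bathrooms"])

# Assemble the raw features, then apply the scaler fitted in Step 2:
# the model was trained on standardized features, so new data must be scaled the same way
assembler = VectorAssembler(inputCols=["SquareFootage", "Bedrooms", "Bathrooms"], outputCol="unscaled_features")
new_vector_data = scaler_model.transform(assembler.transform(new_df))

# Make prediction
new_predictions = model.transform(new_vector_data)

# Show the prediction
new_predictions.show()

If the scaling step is skipped and the raw vector is assembled directly into "features", the model receives values such as 1600.0 where it expects standardized inputs. The output below comes from such a run: the predicted price is about 1.2 × 10^8, far outside the 200000 to 500000 range of the training prices.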

+-------------+--------+---------+----------------+--------------------+
|SquareFootage|Bedrooms|Bathrooms|        features|          prediction|
+-------------+--------+---------+----------------+--------------------+
|         1600|       3|        2|[1600.0,3.0,2.0]|1.2157012553426452E8|
+-------------+--------+---------+----------------+--------------------+

