The goal of this project is to **predict housing prices in Seattle** based on various property features. 
## Table of Contents

1. **Introduction**
2. **Load the Data**
3. **Cleaning and Preprocessing** 
4. **Explore the Data**  
5. **Feature Engineering**  
6. **Modeling**  
7. **Evaluation**  

## 1.Introduction
I use **supervised learning** for this task. Since the target variable (`price`) is numeric, this is a **regression problem**. I apply **Multiple Linear Regression (MLR)** to understand how features like bedrooms, bathrooms, square footage, lot size, year built, and location influence house prices.  

This notebook covers the full workflow: loading and exploring the data, visualizing key features, preparing data for modeling, building the regression model, and evaluating its performance.
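
For reference, the model behind MLR can be written as below. The symbols ($\beta_j$ for the coefficients, $x_j$ for the $p$ input features, $\varepsilon$ for the error term) are notation introduced here for illustration, not names used in the notebook code. With no regularization, the coefficients are chosen by least squares:

```latex
\[
\text{price} = \beta_0 + \sum_{j=1}^{p} \beta_j x_j + \varepsilon,
\qquad
\hat{\beta} = \arg\min_{\beta} \sum_{i=1}^{n} \Bigl( \text{price}_i - \beta_0 - \sum_{j=1}^{p} \beta_j x_{ij} \Bigr)^2
\]
```

Each coefficient $\beta_j$ is read as the expected change in price for a one-unit increase in feature $x_j$ with the other features held fixed.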

In [0]:
spark.sql('CREATE CATALOG IF NOT EXISTS svl_project')
spark.sql('USE CATALOG svl_project')

spark.sql('CREATE SCHEMA IF NOT EXISTS housePrice_pred')
spark.sql('USE SCHEMA housePrice_pred')

spark.sql('CREATE VOLUME IF NOT EXISTS input')
spark.sql('CREATE VOLUME IF NOT EXISTS output')

In [0]:
%sh
wget https://raw.githubusercontent.com/grzegorzgajda/spark-examples/master/spark-examples/data/house-data.csv -P /Volumes/svl_project/houseprice_pred/input/data/housing-data

In [0]:
userDir = "/Volumes/svl_project/houseprice_pred"
dbutils.fs.ls(f"{userDir}/input/data/housing-data")

## 2.Load the Data

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
schema = StructType([
  StructField('id', StringType(), True),
  StructField('date', StringType(), True),
  StructField('price', DoubleType(), True),
  StructField('bedrooms', IntegerType(), True),
  StructField('bathrooms', DoubleType(), True),
  StructField('sqft_living', IntegerType(), True),
  StructField('sqft_lot', IntegerType(), True),
  StructField('floors', DoubleType(), True),
  StructField('waterfront', IntegerType(), True),
  StructField('view', IntegerType(), True),
  StructField('condition', IntegerType(), True),
  StructField('grade', IntegerType(), True),
  StructField('sqft_above', IntegerType(), True),
  StructField('sqft_basement', IntegerType(), True),
  StructField('yr_built', IntegerType(), True),
  StructField('yr_renovated', IntegerType(), True),
  StructField('zipcode', StringType(), True),
  StructField('lat', DoubleType(), True),
  StructField('long', DoubleType(), True),
  StructField('sqft_living15', IntegerType(), True),
  StructField('sqft_lot15', IntegerType(), True)
])

In [0]:
from pyspark.sql import functions as F

data = (spark.read
          .option('header','True')
          .schema(schema)
          .csv(f"{userDir}/input/data/housing-data"))

display(data.sample(False,0.001,32))

In [0]:
data.printSchema()

## 3.Cleaning and Preprocessing

In [0]:
dataDF = data.withColumn(
    "date",
    F.to_date(F.col("date"), "yyyyMMdd'T'HHmmss")
)

display(dataDF)
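
The date pattern used in the cell above can be checked outside Spark. This is a minimal sketch using only Python's standard library, assuming a raw value shaped like `20141013T000000` (a made-up example, not a row quoted from the dataset):

```python
from datetime import datetime

# Hypothetical raw value in the shape the Spark pattern expects:
# 8 date digits, a literal 'T', then 6 time digits.
raw = "20141013T000000"

# strptime equivalent of Spark's "yyyyMMdd'T'HHmmss" pattern.
parsed = datetime.strptime(raw, "%Y%m%dT%H%M%S")

# to_date keeps only the calendar date, so the time part is dropped here too.
sale_date = parsed.date()
print(sale_date)  # 2014-10-13
```

If the raw strings did not match the pattern, Spark's `to_date` would typically yield nulls, which the null count in the next cell would surface.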

In [0]:
# Count null values in each column; the result shows no missing values
nCount = dataDF.select([
  F.count(F.when(F.col(c).isNull(), c)).alias(c)
  for c in dataDF.columns
])
display(nCount)

In [0]:
dataDF.createOrReplaceTempView('housePricetbl')

## 4.Explore the Data

In [0]:
%sql
SELECT COUNT(DISTINCT zipcode) FROM housePricetbl 

In [0]:
display(dataDF.select('price').summary())

In [0]:
%sql SELECT price FROM housePricetbl;

Databricks visualization. Run in Databricks to view.

In [0]:
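%sql 
-- Aggregate per zip code; the averaged lat/long gives one representative point per zip code
SELECT 
  zipcode,
  ROUND(AVG(lat), 4) AS avg_lat,
  ROUND(AVG(long), 4) AS avg_long,
  ROUND(AVG(price), 2) AS avg_price,
  COUNT(*) AS total_sales
FROM housePricetbl
GROUP BY zipcode
ORDER BY avg_price DESC
LIMIT 20;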

Databricks visualization. Run in Databricks to view.

In [0]:
%sql 
SELECT bedrooms, COUNT(*) AS num_houses, ROUND(AVG(price),2) AS avg_price
FROM housePricetbl
GROUP BY bedrooms
ORDER BY bedrooms;

Databricks visualization. Run in Databricks to view.

In [0]:
%sql 
SELECT FLOOR(sqft_lot / 500) * 500 AS sqft_bin,
       COUNT(*) AS num_houses,
       ROUND(AVG(price), 2) AS avg_price
FROM housePricetbl
GROUP BY sqft_bin 
ORDER BY sqft_bin

Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT FLOOR(yr_built/10)*10 AS decade,
       COUNT(*) AS num_houses,
       ROUND(AVG(price), 2) AS avg_price
FROM housePricetbl
GROUP BY decade
ORDER BY decade;

Databricks visualization. Run in Databricks to view.
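
The two binned queries above (lot size in steps of 500, year built in decades) rely on the same floor-and-multiply idea. Here is a plain-Python sketch of it, using made-up `(yr_built, price)` pairs and a hypothetical helper named `bin_floor`:

```python
import math
from collections import defaultdict


def bin_floor(value, width):
    """Return the lower edge of the fixed-width bin that contains value."""
    return math.floor(value / width) * width


# Hypothetical (yr_built, price) pairs, not taken from the dataset.
sales = [(1955, 300000.0), (1959, 340000.0), (1987, 450000.0), (2003, 620000.0)]

# Group prices by decade, mirroring GROUP BY FLOOR(yr_built/10)*10.
prices_by_decade = defaultdict(list)
for yr_built, price in sales:
    prices_by_decade[bin_floor(yr_built, 10)].append(price)

# For each decade: (number of houses, average price).
summary = {
    decade: (len(prices), round(sum(prices) / len(prices), 2))
    for decade, prices in sorted(prices_by_decade.items())
}
print(summary)
```

The same helper with `width=500` reproduces the lot-size bins.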

In [0]:
%sql 
SELECT 
  grade,
  COUNT(*) AS num_houses,
  ROUND(AVG(price), 2) AS avg_price
FROM housePricetbl
GROUP BY grade
ORDER BY grade;

Databricks visualization. Run in Databricks to view.

In [0]:
import seaborn as sns 
import matplotlib.pyplot as plt
import pandas as pd

# Fixed seed so the sampled pairplot is reproducible between runs
pd_DF = dataDF.sample(fraction=0.3, seed=42).toPandas()

sns.pairplot(pd_DF[['price', 'sqft_living', 'sqft_lot', 'sqft_above', 'sqft_basement', 'yr_built', 'sqft_living15', 'sqft_lot15']])
plt.show()

In [0]:
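# Turn off ANSI SQL mode so that operations such as division by zero return NULL instead of raising an error
spark.conf.set("spark.sql.ansi.enabled", "false")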

## 5.Feature Engineering 

In [0]:
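# Derive age, renovation, and size features.
# Ages are measured against the current date, so they shift with the year the notebook is run.
# price_per_sqft is computed from the target, so it must not be used as a model input.
dataDF_features = (dataDF 
    .withColumn('house_age', F.year(F.current_date()) - F.col('yr_built'))
    .withColumn('is_renovated', F.when(F.col('yr_renovated') > 0, 1).otherwise(0))
    .withColumn('yr_since_renovation', 
                F.when(F.col('yr_renovated') > 0, 
                       F.year(F.current_date()) - F.col('yr_renovated'))  
                .otherwise(0))
    .withColumn('price_per_sqft', F.col('price') / F.col('sqft_living'))
    .withColumn('total_sqft', F.col('sqft_living') + F.col('sqft_basement'))
)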

display(dataDF_features)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Convert to string type for encoding
dataDF_features = dataDF_features.withColumn('condition', F.col('condition').cast('string'))
dataDF_features = dataDF_features.withColumn('grade', F.col('grade').cast('string'))
dataDF_features = dataDF_features.withColumn('view', F.col('view').cast('string'))

# Create indexers and encoders
indexers = [
    StringIndexer(inputCol='zipcode', outputCol='zipcode_index', handleInvalid='keep'),
    StringIndexer(inputCol='condition', outputCol='condition_index', handleInvalid='keep'),
    StringIndexer(inputCol='grade', outputCol='grade_index', handleInvalid='keep'),
    StringIndexer(inputCol='view', outputCol='view_index', handleInvalid='keep')
]

encoders = [
    OneHotEncoder(inputCol='zipcode_index', outputCol='zipcode_encoded', dropLast=True),
    OneHotEncoder(inputCol='condition_index', outputCol='condition_encoded', dropLast=True),
    OneHotEncoder(inputCol='grade_index', outputCol='grade_encoded', dropLast=True),
    OneHotEncoder(inputCol='view_index', outputCol='view_encoded', dropLast=True)
]

# Apply transformations
pipeline = Pipeline(stages=indexers + encoders)
pipeline_model = pipeline.fit(dataDF_features)
dataDF_encoded = pipeline_model.transform(dataDF_features)
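
To make the indexing and encoding step above more concrete, here is a simplified plain-Python sketch of the idea: labels are numbered by descending frequency, then each index becomes a 0/1 vector with the last category dropped. The helpers `index_labels` and `one_hot` and the sample zip codes are hypothetical; Spark's own classes produce sparse vectors and also handle unseen labels, which this sketch does not model.

```python
from collections import Counter


def index_labels(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1 if i == index else 0 for i in range(size)]


# Hypothetical zip codes, not taken from the dataset.
zipcodes = ["98103", "98004", "98103", "98039", "98004", "98103"]

mapping = index_labels(zipcodes)
encoded = [one_hot(mapping[z], len(mapping)) for z in zipcodes]

print(mapping)     # {'98103': 0, '98004': 1, '98039': 2}
print(encoded[0])  # [1, 0]
print(encoded[3])  # [0, 0]
```

Dropping one category avoids a redundant column: the dropped label is identified by all the remaining dummies being zero.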

In [0]:
from pyspark.ml.feature import VectorAssembler

# Numeric features to keep
numeric_features = [
    'bedrooms', 
    'bathrooms', 
    'sqft_living', 
    'sqft_lot', 
    'floors', 
    'waterfront',
    'sqft_above', 
    'sqft_basement',
    'house_age', 
    'is_renovated',
    'lat', 
    'long', 
    'sqft_living15', 
    'sqft_lot15'
]

# dummy variables
categorical_features = [
    'zipcode_encoded',
    'condition_encoded', 
    'grade_encoded',
    'view_encoded'
]

# Combine all features
all_features = numeric_features + categorical_features

# Assemble features
assembler = VectorAssembler(
    inputCols=all_features,
    outputCol='features',
    handleInvalid='skip'
)

# final dataset with only features and price
df_final = assembler.transform(dataDF_encoded).select('features', 'price')
print(f"Total records: {df_final.count()}")
print(f"Number of features: {df_final.select('features').first()[0].size}")

display(df_final)

## 6.Modeling

In [0]:
(train_data, test_data) = df_final.randomSplit([0.8, 0.2], seed=42)
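
A note on what an 80/20 random split means in practice: each row is assigned to a side independently, so the resulting sizes are close to 80/20 rather than exact, and the seed makes the assignment repeatable. The sketch below illustrates that behavior in plain Python with a hypothetical `random_split` helper; it is not Spark's implementation.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row independently to train or test using a seeded generator."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row lands in train with probability train_fraction.
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


# Hypothetical row ids standing in for the rows of df_final.
rows = list(range(1000))
train, test = random_split(rows, 0.8, seed=42)

# Sizes are approximately, not exactly, 800 and 200.
print(len(train), len(test))
```

Re-running with the same seed gives the same partition, which keeps later metrics comparable between runs.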

In [0]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    featuresCol='features',
    labelCol='price',
    predictionCol='predicted_price',
    maxIter=100,
    regParam=0.0,
    elasticNetParam=0.0
)

lr_model = lr.fit(train_data)

# Display model parameters
print(f"\nIntercept: ${lr_model.intercept:,.2f}")
print(f"Coefficients: {lr_model.coefficients}")
print(f"Number of iterations: {lr_model.summary.totalIterations}")
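
With `regParam=0.0` the fit above is ordinary least squares. For intuition, the one-feature case has a closed form, sketched here in plain Python. The helper `fit_simple_ols` and the numbers are made up for illustration; the data is constructed to lie exactly on the line price = 50000 + 200 * sqft.

```python
def fit_simple_ols(xs, ys):
    """Return (intercept, slope) minimizing the sum of squared errors for one feature."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Slope = covariance(x, y) / variance(x); the 1/n factors cancel.
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept, slope


# Hypothetical data lying exactly on price = 50000 + 200 * sqft.
sqft = [1000, 1500, 2000, 2500]
price = [250000, 350000, 450000, 550000]

intercept, slope = fit_simple_ols(sqft, price)
print(f"Intercept: ${intercept:,.2f}, slope: ${slope:,.2f} per sqft")
```

The multi-feature model fitted by Spark generalizes this to one coefficient per vector slot, including one per one-hot encoded category.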

In [0]:
# Predictions on training data
train_predictions = lr_model.transform(train_data)
# Predictions on test data
test_predictions = lr_model.transform(test_data)

## 7.Evaluation

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_r2 = RegressionEvaluator(
    labelCol='price',
    predictionCol='predicted_price',
    metricName='r2'
)

evaluator_rmse = RegressionEvaluator(
    labelCol='price',
    predictionCol='predicted_price',
    metricName='rmse'
)

evaluator_mae = RegressionEvaluator(
    labelCol='price',
    predictionCol='predicted_price',
    metricName='mae'
)

# Calculate metrics for TRAINING set
train_r2 = evaluator_r2.evaluate(train_predictions)
train_rmse = evaluator_rmse.evaluate(train_predictions)
train_mae = evaluator_mae.evaluate(train_predictions)

# Calculate metrics for TEST set
test_r2 = evaluator_r2.evaluate(test_predictions)
test_rmse = evaluator_rmse.evaluate(test_predictions)
test_mae = evaluator_mae.evaluate(test_predictions)


print("\n TRAINING SET METRICS:")
print(f"  R² Score:  {train_r2:.4f}")
print(f"  RMSE:      ${train_rmse:,.2f}")
print(f"  MAE:       ${train_mae:,.2f}")

print("\n TEST SET METRICS:")
print(f"  R² Score:  {test_r2:.4f}")
print(f"  RMSE:      ${test_rmse:,.2f}")
print(f"  MAE:       ${test_mae:,.2f}") 

# Check for overfitting
print("\n OVERFITTING CHECK:")
r2_diff = train_r2 - test_r2
if r2_diff < 0.05:
    print(f"Good! Minimal overfitting (R² difference: {r2_diff:.4f})")
elif r2_diff < 0.10:
    print(f"Moderate overfitting (R² difference: {r2_diff:.4f})")
else:
    print(f"Significant overfitting (R² difference: {r2_diff:.4f})")
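
For readers who want to see what the three metrics measure, here is a small plain-Python sketch of their usual definitions. The function name `regression_metrics` and the sample prices are hypothetical; the notebook itself relies on `RegressionEvaluator`.

```python
import math


def regression_metrics(actual, predicted):
    """Compute R², RMSE, and MAE from paired actual and predicted values."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    mae = sum(abs(e) for e in errors) / n
    ss_res = sum(e * e for e in errors)
    rmse = math.sqrt(ss_res / n)
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    # R² is the share of variance in the actual values explained by the model.
    r2 = 1 - ss_res / ss_tot
    return {"r2": r2, "rmse": rmse, "mae": mae}


# Hypothetical prices, not model output.
actual = [300000.0, 450000.0, 600000.0, 750000.0]
predicted = [320000.0, 430000.0, 610000.0, 740000.0]

metrics = regression_metrics(actual, predicted)
print(f"R²:   {metrics['r2']:.4f}")
print(f"RMSE: ${metrics['rmse']:,.2f}")
print(f"MAE:  ${metrics['mae']:,.2f}")
```

RMSE penalizes large misses more heavily than MAE, so a wide gap between the two suggests a few predictions are far off.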




In [0]:
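print("\n TOP 10 MOST IMPORTANT FEATURES:")
print("(Based on absolute coefficient values; features are on different scales, so treat this as a rough guide)")

# Each one-hot encoded column expands into many vector slots, so the per-slot
# names are read from the metadata that VectorAssembler attaches to 'features'
# (this assumes the metadata is still present on the column).
attrs = train_data.schema['features'].metadata['ml_attr']['attrs']
slots = sorted((a for group in attrs.values() for a in group), key=lambda a: a['idx'])
feature_names = [a['name'] for a in slots]
coefficients = lr_model.coefficients.toArray()
feature_importance = list(zip(feature_names, coefficients))

# Sort by absolute value of coefficient
feature_importance_sorted = sorted(feature_importance, key=lambda x: abs(x[1]), reverse=True)

# Display top 10
for i, (feature, coef) in enumerate(feature_importance_sorted[:10], 1):
    print(f"{i:2d}. {feature:25s}: ${coef:>15,.2f}")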
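
Ranking by raw absolute coefficients favors features measured in small units. One common alternative, sketched below in plain Python, multiplies each coefficient by its feature's standard deviation, so the values read as "price change per one standard deviation". The coefficients and column values here are made up for illustration and are not output from the model above.

```python
from statistics import pstdev

# Hypothetical coefficients in dollars per unit of each feature.
coefficients = {"sqft_living": 200.0, "waterfront": 300000.0}

# Hypothetical feature columns, not taken from the dataset.
columns = {
    "sqft_living": [1000, 1800, 2600, 3400],
    "waterfront": [0, 0, 0, 1],
}

# Scale each coefficient by the spread of its feature.
scaled = {name: coefficients[name] * pstdev(values) for name, values in columns.items()}

raw_ranked = sorted(coefficients, key=lambda name: abs(coefficients[name]), reverse=True)
scaled_ranked = sorted(scaled, key=lambda name: abs(scaled[name]), reverse=True)

print("Raw ranking:   ", raw_ranked)
print("Scaled ranking:", scaled_ranked)
```

In this toy case the order flips: the binary flag has the larger raw coefficient, but the square-footage feature moves the price more across its typical range.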


In [0]:
# Calculate residuals
test_predictions_with_residuals = test_predictions.withColumn(
    'residual', F.col('price') - F.col('predicted_price')
).withColumn(
    'abs_error', F.abs(F.col('price') - F.col('predicted_price'))
).withColumn(
    'percent_error', (F.abs(F.col('price') - F.col('predicted_price')) / F.col('price')) * 100
)

# Summary statistics of residuals
print("\n RESIDUAL ANALYSIS:")
residual_stats = test_predictions_with_residuals.select('residual').summary()
display(residual_stats)