## Predicting housing costs

In this project, we train a linear regression model on 1990 California housing data to predict the median house value in each housing estate. The model is trained on a training sample, predictions are made on a test sample, and model quality is evaluated with the RMSE, MAE, and R2 metrics.


The dataset columns contain the following data:
- longitude - longitude of the housing estate;
- latitude - latitude of the housing estate;
- housing_median_age - median age of the houses in the housing estate;
- total_rooms - total number of rooms in the houses of the housing estate;
- total_bedrooms - total number of bedrooms in the houses of the housing estate;
- population - number of people who live in the housing estate;
- households - number of households in the housing estate;
- median_income - median income of residents of the housing estate;
- median_house_value - median value of a house in the housing estate;
- ocean_proximity - proximity to the ocean.

Based on the data, we need to predict the median value of a house in the housing estate - median_house_value. 
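
For reference, the three metrics can be written out by hand. The notebook itself computes them with Spark's `RegressionEvaluator`. The sketch below is a plain-Python illustration of the usual textbook definitions, and the function name `regression_metrics` and the sample values are made up for this example.

```python
import math

def regression_metrics(y_true, y_pred):
    """Return (rmse, mae, r2) for two equal-length sequences of numbers."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)               # residual sum of squares
    rmse = math.sqrt(ss_res / n)                      # root mean squared error
    mae = sum(abs(e) for e in errors) / n             # mean absolute error
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot                        # coefficient of determination
    return rmse, mae, r2

# Made-up values, for illustration only
y_true = [100.0, 200.0, 300.0]
y_pred = [110.0, 190.0, 330.0]
rmse, mae, r2 = regression_metrics(y_true, y_pred)
print(rmse, mae, r2)
```

Because RMSE squares the errors before averaging, it is never smaller than MAE and reacts more strongly to a few large misses.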


# Data preparation

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col, sum, isnan,when,count


from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator, MulticlassClassificationEvaluator


In [2]:

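pyspark_version = pyspark.__version__
# Parse the major version from the full string (e.g. "3.3.2" -> 3)
major_version = int(pyspark_version.split('.')[0])
if major_version >= 3:
    from pyspark.ml.feature import OneHotEncoder
elif major_version == 2:
    # In Spark 2.x the multi-column encoder is called OneHotEncoderEstimator
    from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder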
        
RANDOM_SEED = 2022

spark = SparkSession.builder \
                    .master("local") \
                    .appName("Housing - Linear regression") \
                    .getOrCreate()



In [14]:
df2 = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True)

In [15]:
df2.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [16]:
null_counts = df2.agg(*[sum(col(c).isNull().cast("integer")).alias(c) for c in df2.columns])


In [17]:
null_counts.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



The total_bedrooms column has 207 missing values. Let's compute its approximate median and fill the gaps with it.

In [18]:
median_value = df2.approxQuantile("total_bedrooms", [0.5], 0.01)[0]
print(median_value)

433.0


In [19]:
df2 = df2.fillna({"total_bedrooms": median_value})
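
The cells above take the median from the whole dataset before it is split. A stricter variant would take the fill value from the training rows only and reuse it for the test rows. The plain-Python sketch below illustrates that idea on made-up numbers. The helper `fill_missing_with_median` is hypothetical and not part of the notebook.

```python
from statistics import median

def fill_missing_with_median(values, reference=None):
    """Replace None entries in `values` with the median of `reference`.

    If `reference` is omitted, the median is taken from `values` itself.
    Returns the filled list and the fill value that was used.
    """
    reference = values if reference is None else reference
    observed = [v for v in reference if v is not None]
    fill = median(observed)
    return [fill if v is None else v for v in values], fill

# Made-up values, for illustration only
train = [3.0, None, 5.0, 1.0, 9.0]
test = [None, 4.0]

train_filled, fill = fill_missing_with_median(train)
# The test rows reuse the statistic learned from the training rows
test_filled, _ = fill_missing_with_median(test, reference=train)
print(fill, train_filled, test_filled)
```

With only 207 gaps in one column the difference is likely negligible here, but the same pattern matters more when a larger share of values is missing.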


Let's create lists of the categorical and numerical columns, and name the target, for further processing.

In [20]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude', 'latitude', 'housing_median_age','total_rooms','total_bedrooms','population','households', 'median_income']
target = 'median_house_value' 

In [21]:
train_data, test_data = df2.randomSplit([.8,.2], seed=2077)
print(train_data.count(), test_data.count())

16630 4010
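
The split is 16630 to 4010 rather than exactly 80/20 because `randomSplit` assigns rows at random, so the weights are only met approximately. The sketch below is a conceptual plain-Python model of that behaviour, not Spark's actual implementation. The function `random_split` is hypothetical.

```python
import random

def random_split(rows, train_fraction, seed):
    """Send each row to train with probability `train_fraction`, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(20640))  # same row count as the dataset: 16630 + 4010
train, test = random_split(rows, 0.8, seed=2077)
share = len(train) / len(rows)
print(len(train), len(test), round(share, 3))
```

The exact counts depend on the random generator, so this sketch will not reproduce Spark's numbers, only the fact that the training share lands close to 0.8 rather than exactly on it.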


In [22]:
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols]) 

ind_fit = indexer.fit(train_data)
train_data = ind_fit.transform(train_data)
test_data = ind_fit.transform(test_data)

cols = [c for c in train_data.columns for i in categorical_cols if (c.startswith(i))]
cols = [c for c in test_data.columns for i in categorical_cols if (c.startswith(i))]

In [23]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])

enc_fit = encoder.fit(train_data)
train_data = enc_fit.transform(train_data)
test_data = enc_fit.transform(test_data)

cols = [c for c in train_data.columns for i in categorical_cols if (c.startswith(i))]
cols = [c for c in test_data.columns for i in categorical_cols if (c.startswith(i))]
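
To make the two steps above concrete: with its default settings `StringIndexer` gives the most frequent label index 0, and `OneHotEncoder` drops the last category, so a column with k categories becomes a vector of length k - 1. The sketch below imitates this in plain Python on made-up labels. The helper names are hypothetical, and the alphabetical tie-breaking is an assumption of this example.

```python
from collections import Counter

def fit_string_index(values):
    """Map labels to indices, most frequent first (ties: alphabetical, assumed)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, n_categories, drop_last=True):
    """One-hot encode `index`; with drop_last the last category is all zeros."""
    size = n_categories - 1 if drop_last else n_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

# Made-up sample labels, for illustration only
train_values = ["INLAND", "NEAR BAY", "INLAND", "NEAR OCEAN", "INLAND", "NEAR BAY"]
mapping = fit_string_index(train_values)   # learned from the training data only
encoded = [one_hot(mapping[v], len(mapping)) for v in train_values]
print(mapping)
print(encoded[0], encoded[1], encoded[3])
```

Dropping one category avoids a redundant column: the dropped label is already identified by all other positions being zero.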

In [24]:
categorical_assembler = \
        VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols], outputCol="categorical_features")


train_data = categorical_assembler.transform(train_data)
test_data = categorical_assembler.transform(test_data) 

In [25]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")

train_data = numerical_assembler.transform(train_data)
test_data = numerical_assembler.transform(test_data)


standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")

scal_fit = standardScaler.fit(train_data)
train_data = scal_fit.transform(train_data) 
test_data = scal_fit.transform(test_data)
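
Note that `StandardScaler` is used here with its defaults, which, as far as I know, divide each feature by its sample standard deviation without subtracting the mean (`withStd=True`, `withMean=False`). The plain-Python sketch below shows that behaviour for a single made-up column. The helpers `fit_scale` and `scale` are hypothetical.

```python
from statistics import stdev

def fit_scale(column):
    """Learn the scaling factor: the sample standard deviation (n - 1 in the denominator)."""
    return stdev(column)

def scale(column, std):
    """Divide by the learned standard deviation; values are not centered."""
    return [x / std for x in column]

# Made-up values, for illustration only
train_col = [2.0, 4.0, 6.0]
test_col = [8.0]

std = fit_scale(train_col)            # fitted on the training data only
train_scaled = scale(train_col, std)
test_scaled = scale(test_col, std)    # the test data reuses the training statistic
print(std, train_scaled, test_scaled)
```

As in the notebook, the statistic is fitted on the training sample and then applied unchanged to the test sample.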

                                                                                

In [26]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 


train_data = final_assembler.transform(train_data)
test_data = final_assembler.transform(test_data)

# Model training

In [27]:
lr = LinearRegression(labelCol=target, featuresCol='features', regParam=0.000000001)

model = lr.fit(train_data) 

23/04/13 08:40:52 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/04/13 08:40:52 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/04/13 08:40:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/04/13 08:40:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [28]:
predictions = model.transform(test_data)

predictedLabels = predictions.select(target, 'prediction')
predictedLabels.show(5) 

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          106700.0|218273.38635694515|
|           78300.0|127173.52934260014|
|           68400.0|131328.07167428266|
|           70000.0| 149621.5698887268|
|           82800.0| 172522.5111210551|
+------------------+------------------+
only showing top 5 rows



In [29]:
rmse_full = RegressionEvaluator(metricName="rmse", labelCol="median_house_value").evaluate(predictions)
mae_full = RegressionEvaluator(metricName="mae", labelCol="median_house_value", predictionCol="prediction").evaluate(predictions)
r2_full = RegressionEvaluator(metricName="r2", labelCol="median_house_value", predictionCol="prediction").evaluate(predictions)


In [30]:
lr = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled', regParam=0.000000001)

model = lr.fit(train_data) 

In [31]:
predictions = model.transform(test_data)

predictedLabels = predictions.select(target, 'prediction')
predictedLabels.show(5) 

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          106700.0|191388.06816450972|
|           78300.0| 78310.35648987861|
|           68400.0| 80660.87214292958|
|           70000.0|116823.29398790281|
|           82800.0|139235.69942980027|
+------------------+------------------+
only showing top 5 rows



In [32]:
rmse_num = RegressionEvaluator(metricName="rmse", labelCol="median_house_value").evaluate(predictions)
mae_num = RegressionEvaluator(metricName="mae", labelCol="median_house_value", predictionCol="prediction").evaluate(predictions)
r2_num = RegressionEvaluator(metricName="r2", labelCol="median_house_value", predictionCol="prediction").evaluate(predictions)



In [34]:
spark.stop()

In [35]:
print('      full_data', '          num_data')
print('RMSE:', rmse_full, '  ', rmse_num)
print('MAE: ', mae_full, ' ', mae_num)
print('R2:  ', r2_full, '', r2_num)

      full_data           num_data
RMSE: 68184.02564971607    69246.32171879576
MAE:  49547.44945789279   50754.75482656654
R2:   0.6411455796432266  0.629876687005651
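
The hand-spaced `print` calls above are easy to misalign. One possible alternative, sketched here with the metric values copied from the output above, is to format the table with f-string width specifiers. The function `format_table` is a hypothetical helper.

```python
# Metric values copied from the notebook output: (full_data, num_data)
metrics = {
    "RMSE": (68184.02564971607, 69246.32171879576),
    "MAE": (49547.44945789279, 50754.75482656654),
    "R2": (0.6411455796432266, 0.629876687005651),
}

def format_table(metrics):
    """Build a fixed-width table with one row per metric."""
    lines = [f"{'metric':<8}{'full_data':>14}{'num_data':>14}"]
    for name, (full, num) in metrics.items():
        lines.append(f"{name:<8}{full:>14.4f}{num:>14.4f}")
    return "\n".join(lines)

table = format_table(metrics)
print(table)
```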


# Analyzing the results


* The RMSE for the model trained on all features (full_data) is 68,184.03, so its typical prediction error is about 68,000; RMSE weights large errors more heavily than small ones. For the model trained on numerical features only (num_data), the RMSE is slightly higher at 69,246.32, an increase of about 1.6%.

* The MAE for the full feature set is 49,547.45, meaning that on average the predictions are off by about that amount. With numerical features only, the MAE is slightly higher at 50,754.75, an increase of about 2.4%.

* The R2 for the full feature set is 0.6411, meaning that the model explains about 64.1% of the variance in median_house_value on the test sample. With numerical features only, the R2 is slightly lower at 0.6299.

Overall, the two models perform about equally: adding the encoded ocean_proximity feature gives only a small improvement on all three metrics.
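
To quantify how close the two models are, the gap between them can be computed directly from the metric values printed above. This is a small arithmetic sketch, and `relative_change` is a hypothetical helper.

```python
def relative_change(baseline, other):
    """Relative difference of `other` with respect to `baseline`."""
    return (other - baseline) / baseline

# Values copied from the notebook output
rmse_full, rmse_num = 68184.02564971607, 69246.32171879576
mae_full, mae_num = 49547.44945789279, 50754.75482656654
r2_full, r2_num = 0.6411455796432266, 0.629876687005651

rmse_increase = relative_change(rmse_full, rmse_num)
mae_increase = relative_change(mae_full, mae_num)
r2_drop = r2_full - r2_num

print(f"RMSE: +{rmse_increase:.1%}, MAE: +{mae_increase:.1%}, R2: -{r2_drop:.4f}")
```

Dropping ocean_proximity raises both error metrics by only a few percent and lowers R2 by roughly 0.01, which supports the reading that the categorical feature helps only slightly.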

