# California Housing Prices

### Описание проекта и условия задачи

На основе представленных данных необходимо предсказать медианную стоимость дома в жилом массиве — median_house_value. 

Обучим модель и сделаем предсказания на тестовой выборке. 
Для оценки качества модели будут использованы метрики RMSE, MAE и R2.

### Описание данных
**Признаки**
- longitude — широта
- latitude — долгота
- housing_median_age — медианный возраст жителей жилого массива
- total_rooms — общее количество комнат в домах жилого массива
- total_bedrooms — общее количество спален в домах жилого массива
- population — количество человек, которые проживают в жилом массиве
- households — количество домовладений в жилом массиве
- median_income — медианный доход жителей жилого массива
- ocean_proximity — близость к океану

**Целевая переменная**
- median_house_value — медианная стоимость дома в жилом массиве

### План работы

1. Инициализировать локальную Spark-сессию.
2. Загрузить и предварительно изучить данные и их типы.
3. Выполнить предобработку данных.
4. Подготовить данные к обучению.
5. Построить две модели линейной регрессии на разных наборах данных:
    - используя все данные из файла;
    - используя только числовые переменные, исключив категориальные.
6. Сравнить результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2.

# Подготовка данных

In [1]:
!pip install --upgrade pyspark -q

In [12]:
import pandas as pd 
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder as OHE_encoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator as OHE_encoder 
    
from pyspark.ml import Pipeline
    
RANDOM_SEED = 2023

spark = SparkSession.builder \
                    .master("local") \
                    .appName("Californication") \
                    .getOrCreate()

In [3]:
df_housing = spark.read.load('/datasets/housing.csv',
                                     format="csv",
                                     sep=",",
                                     inferSchema=True,
                                     header="true")
df_housing.printSchema()  

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

In [4]:
#типы колонок

print(pd.DataFrame(df_housing.dtypes, columns=['column', 'type']).head(10),'\n')

df_housing.show(10)

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string 

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          3585

In [5]:
# выведем статистику
df_housing.describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
longitude,20640,-119.56970445736148,2.003531723502584,-124.35,-114.31
latitude,20640,35.6318614341087,2.135952397457101,32.54,41.95
housing_median_age,20640,28.639486434108527,12.58555761211163,1.0,52.0
total_rooms,20640,2635.7630813953488,2181.6152515827944,2.0,39320.0
total_bedrooms,20433,537.8705525375618,421.38507007403115,1.0,6445.0
population,20640,1425.4767441860465,1132.46212176534,3.0,35682.0
households,20640,499.5396802325581,382.3297528316098,1.0,6082.0
median_income,20640,3.8706710029070246,1.899821717945263,0.4999,15.0001
median_house_value,20640,206855.81690891474,115395.61587441359,14999.0,500001.0


### Предобработка данных

Проверим наличие пропусков

In [6]:
columns = df_housing.columns

for column in columns:
    check_col = F.col(column).cast(FloatType())
    print(column, df_housing.filter(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


В колонке `total bedrooms` обнаружено 207 пропусков, устраним их с помощью медианных значений

In [7]:
window = Window().partitionBy('total_rooms')

# столбец с медианными значениями по окнам
df_housing = df_housing.withColumn('median_bedrooms',
                                   F.percentile_approx('total_bedrooms', 0.5).over(window))

# пропущенные значения заполняем значениями из оконной медианы
df_housing = df_housing.withColumn('total_bedrooms',
                                   F.coalesce('total_bedrooms', 'median_bedrooms'))

df_housing = df_housing.drop('median_bedrooms')

Проверка

In [8]:
for column in columns:
    check_col = F.col(column).cast(FloatType())
    print(column, df_housing.filter(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0


                                                                                

total_bedrooms 15
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [9]:
median_value = df_housing.approxQuantile('total_bedrooms', [0.5], 0)[0]
df_housing = df_housing.na.fill({'total_bedrooms': median_value})

Проверка

In [10]:
for column in columns:
    check_col = F.col(column).cast(FloatType())
    print(column, df_housing.filter(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


### Разработка модели

Имеем категориальный признак `ocean_proximity`, обработаем его с помощью OHE

In [13]:
stages = []

categoricalColumns = ['ocean_proximity']

for categoricalCol in categoricalColumns:
    #to binary vectors
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    
    encoder = OHE_encoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    
    stages += [stringIndexer, encoder]

####
numericCols = ['longitude',
               'latitude',
               'housing_median_age',
               'total_rooms',
               'total_bedrooms',
               'population',
               'households',
               'median_income']

assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols

#assemble all
assembler_all = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages += [assembler_all]

In [14]:
#split for train & test (75/25)

train_data, test_data = df_housing.randomSplit([0.75, 0.25], seed=RANDOM_SEED)

#train LinearRegression
lr = LinearRegression(labelCol="median_house_value", featuresCol="features")

stages += [lr]

pipeline_all = Pipeline(stages=stages)

# Обучаем модели

In [15]:
model = pipeline_all.fit(train_data)

predictions_all = model.transform(test_data)

predictedLabes = predictions_all.select("median_house_value", "prediction")

predictedLabes.show() 

                                                                                

23/03/19 21:29:41 WARN Instrumentation: [c3d0ea92] regParam is zero, which might cause numerical instability and overfitting.


[Stage 114:>                                                        (0 + 1) / 1]

23/03/19 21:29:41 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/03/19 21:29:41 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/03/19 21:29:42 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
23/03/19 21:29:42 WARN Instrumentation: [c3d0ea92] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/03/19 21:29:43 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed
23/03/19 21:29:43 ERROR LBFGS: Failure again! Giving up and returning. Maybe the objective is just poorly behaved?


                                                                                

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           78300.0|126053.06313147536|
|           67000.0| 152674.9062986672|
|           81300.0|151709.92211808916|
|           62500.0|166995.67592948908|
|          100600.0| 191275.1949282037|
|          104200.0| 200436.3732560724|
|           86400.0|193272.47017765278|
|           74100.0|157532.02935895463|
|          128100.0|221782.68750601867|
|          130600.0|217837.95310454862|
|           99600.0|186214.21014077123|
|           92800.0|210766.21543930937|
|           83000.0|177852.73969092593|
|           70500.0|173305.99998215633|
|           87500.0|127955.75183109986|
|           82100.0|158756.84895353578|
|           99600.0|151048.75713353418|
|          135600.0|173710.16976094898|
|           78800.0|133522.06435851054|
|          111300.0| 157173.7435074742|
+------------------+------------------+
only showing top 20 rows



In [16]:
eval = RegressionEvaluator(labelCol = 'median_house_value')
rmse = eval.evaluate(predictions_all, {eval.metricName:'rmse'})
mae = eval.evaluate(predictions_all, {eval.metricName:'mae'})
r2 = eval.evaluate(predictions_all,{eval.metricName:'r2'})

res_1 = f' RMSE = {rmse} MAE = {mae} R2 = {r2}'
print(res_1)

 RMSE = 69332.3524277725 MAE = 49875.32583897082 R2 = 0.6392867554397212


Используем шаги предыдущего пайплайна, заменив один шаг на `assembler_num`

In [17]:
assemblerInputs_num = numericCols

assembler_num = VectorAssembler(inputCols=assemblerInputs_num, outputCol="features")
 
stages[2] = assembler_num

In [18]:
pipeline_num = Pipeline(stages=stages)

In [19]:
model = pipeline_num.fit(train_data)

predictions_num = model.transform(test_data)

predictedLabes = predictions_num.select("median_house_value", "prediction")

predictedLabes.show() 

23/03/19 21:29:52 WARN Instrumentation: [349e6ef1] regParam is zero, which might cause numerical instability and overfitting.
+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           78300.0| 79334.96793112345|
|           67000.0|121238.25034267036|
|           81300.0|119519.95579058118|
|           62500.0|136193.06332345633|
|          100600.0|159166.55825704942|
|          104200.0| 169093.7908605039|
|           86400.0|161490.38352680532|
|           74100.0|125969.78347366117|
|          128100.0| 193290.0225374885|
|          130600.0|188195.51557182334|
|           99600.0| 154208.8718324909|
|           92800.0| 179152.6015313766|
|           83000.0|145063.37091207644|
|           70500.0|150527.99132204428|
|           87500.0| 93758.25452290755|
|           82100.0|123779.12561258627|
|           99600.0|114506.27777479915|
|          135600.0|151429.89097918058|
|           78800.0| 98065.3487992

In [20]:
eval = RegressionEvaluator(labelCol = 'median_house_value')
rmse = eval.evaluate(predictions_num, {eval.metricName:'rmse'})
mae = eval.evaluate(predictions_num, {eval.metricName:'mae'})
r2 = eval.evaluate(predictions_num,{eval.metricName:'r2'})

res_2 = f' RMSE = {rmse} MAE = {mae} R2 = {r2}'
print(res_2)


 RMSE = 70545.61288125544 MAE = 51140.82521716775 R2 = 0.6265519133168591


In [23]:
# spark.stop()

# Анализ результатов

In [22]:
print('Linear Regression with all features',res_1, '\n', 'Linear Regression with numeric features', res_2)

Linear Regression with all features  RMSE = 69332.3524277725 MAE = 49875.32583897082 R2 = 0.6392867554397212 
 Linear Regression with numeric features  RMSE = 70545.61288125544 MAE = 51140.82521716775 R2 = 0.6265519133168591


Модель, обученная на полной тренировочной выборке лучше, пусть и не столь значительно.
Значения метрик достаточно низкие, для повышения необходимо провести более детальную предобработку данных, рассчет корреляций и выбросов.

Итоговые лучшие метрики: RMSE = 69332 MAE = 49875 R2 = 0.6393 