# Предсказание стоимости жилья

## Описание исследования

### Цель и задачи

**Цель:** построить модель прогнозирования медианной стоимости дома в жилом массиве.

**Задачи:**
* провести загрузку и обзор данных;
* провести подготовку данных (изменение типов, заполнение пропусков, трансформация);
* построить и обучить модели линейной регрессии и оценить их качество (метрики RMSE, MAE и R2).

### Описание данных

Файл housing.csv содержит данные о жилье в Калифорнии в 1990 году, которые включают следующие признаки:

* longitude — широта;
* latitude — долгота;
* housing_median_age — медианный возраст жителей жилого массива (примечание: вероятнее всего, имеется в виду медианный возраст домов, а не жителей);
* total_rooms — общее количество комнат в домах жилого массива;
* total_bedrooms — общее количество спален в домах жилого массива;
* population — количество человек, которые проживают в жилом массиве;
* households — количество домовладений в жилом массиве;
* median_income — медианный доход жителей жилого массива;
* median_house_value — медианная стоимость дома в жилом массиве (целевой признак);
* ocean_proximity — близость к океану.

## Импорт библиотек и запуск Spark-сессии

In [None]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

In [None]:
RANDOM_SEED = 42

In [None]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("California Housing") \
                    .getOrCreate()

## Загрузка и обзор данных

In [None]:
df_housing = spark.read.option('header', 'true').csv('...', inferSchema = True)

                                                                                

In [None]:
df_housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [None]:
df_housing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Признаки housing_median_age, total_rooms, total_bedrooms, population, households по сути являются целочисленными и могут быть преобразованы.

In [None]:
df_housing.summary().toPandas().set_index('summary').T \
          .astype({'mean': 'float', 'stddev': 'float'})

                                                                                

summary,count,mean,stddev,min,25%,50%,75%,max
longitude,20640,-119.569704,2.003532,-124.35,-121.8,-118.49,-118.01,-114.31
latitude,20640,35.631861,2.135952,32.54,33.93,34.26,37.71,41.95
housing_median_age,20640,28.639486,12.585558,1.0,18.0,29.0,37.0,52.0
total_rooms,20640,2635.763081,2181.615252,2.0,1447.0,2127.0,3146.0,39320.0
total_bedrooms,20433,537.870553,421.38507,1.0,296.0,435.0,647.0,6445.0
population,20640,1425.476744,1132.462122,3.0,787.0,1166.0,1724.0,35682.0
households,20640,499.53968,382.329753,1.0,280.0,409.0,605.0,6082.0
median_income,20640,3.870671,1.899822,0.4999,2.5625,3.5347,4.7426,15.0001
median_house_value,20640,206855.816909,115395.615874,14999.0,119600.0,179700.0,264700.0,500001.0
ocean_proximity,20640,,,<1H OCEAN,,,,NEAR OCEAN


Описательная статистика не показывает явных аномалий в значениях признаков (если под housing_median_age все-таки понимать медианный возраст домов).

Рассмотрим уникальные значения признака ocean_proximity.

In [None]:
df_housing.select('ocean_proximity').distinct().show()



+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



                                                                                

Всего 5 значений, неявных дубликатов нет.

## Подготовка данных

### Изменение типов данных

Изменим типы данных в housing_median_age, total_rooms, total_bedrooms, population и households на целочисленные.

In [None]:
columns = ['housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households']

for column in columns:
    df_housing = df_housing.withColumn(column, df_housing[column].cast(IntegerType()))

df_housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: integer (nullable = true)
 |-- total_rooms: integer (nullable = true)
 |-- total_bedrooms: integer (nullable = true)
 |-- population: integer (nullable = true)
 |-- households: integer (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Типы изменены.

### Обработка пропусков

Подсчитаем количество пропусков в каждом столбце.

In [None]:
columns = df_housing.columns

for column in columns:
    print(column, df_housing.filter(df_housing[column].isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Обнаружено 207 пропусков в total_bedrooms. Заменим их по следующей формуле:

total_bedrooms = total_rooms * avg(total_bedrooms / total_rooms)

In [None]:
df_housing = df_housing.withColumn('bedrooms_ratio', F.col('total_bedrooms') /  F.col('total_rooms'))
bedrooms_ratio_avg = df_housing.groupBy().avg('bedrooms_ratio').take(1)[0][0]
df_housing = df_housing.withColumn('suggested_total_bedrooms', bedrooms_ratio_avg * F.col('total_rooms'))
df_housing = df_housing.withColumn('total_bedrooms', F.coalesce('total_bedrooms', 'suggested_total_bedrooms').cast(IntegerType()))
df_housing = df_housing.drop('bedrooms_ratio', 'suggested_total_bedrooms')

In [None]:
columns = df_housing.columns

for column in columns:
    print(column, df_housing.filter(df_housing[column].isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Пропуски устранены.

### Добавление признаков

Добавим признаки:
* число комнат на одно домовладение;
* число жителей в одном домовладении;
* отношение числа спален к общему числу комнат.

In [None]:
df_housing = df_housing.withColumn('rooms_per_household', F.col('total_rooms')/F.col('households'))
df_housing = df_housing.withColumn('population_per_household', F.col('population')/F.col('households'))
df_housing = df_housing.withColumn('bedroom_index', F.col('total_bedrooms')/F.col('total_rooms'))

df_housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: integer (nullable = true)
 |-- total_rooms: integer (nullable = true)
 |-- total_bedrooms: integer (nullable = true)
 |-- population: integer (nullable = true)
 |-- households: integer (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- population_per_household: double (nullable = true)
 |-- bedroom_index: double (nullable = true)



### Трансформация признаков

Разделим данные на тренировочную (75%) и тестовую (25%) выборки.

In [None]:
train_data, test_data = df_housing.randomSplit([.75,.25], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

15500 5140


Сохраним исходный train_data (понадобится в будущем для пайплайна).

In [None]:
train_data_orig = train_data

Разделим столбцы на типы.

In [None]:
categorical_col = 'ocean_proximity'
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', \
                   'total_bedrooms', 'population', 'households', 'median_income', \
                   'rooms_per_household', 'population_per_household', 'bedroom_index']
target = 'median_house_value'

Трансформируем категориальные признаки с помощью трансформера StringIndexer

In [None]:
indexer = StringIndexer(inputCol=categorical_col,
                        outputCol=categorical_col+'_idx',
                        handleInvalid='skip'
                       )

train_data = indexer.fit(train_data).transform(train_data)

train_data.select(categorical_col+'_idx').show(3)

                                                                                

+-------------------+
|ocean_proximity_idx|
+-------------------+
|                2.0|
|                2.0|
|                2.0|
+-------------------+
only showing top 3 rows



Закодируем с помощью OneHotEncoder.

In [None]:
encoder = OneHotEncoder(inputCol=categorical_col+'_idx',
                        outputCol=categorical_col+'_ohe',
                        handleInvalid='keep',
                        dropLast=True
                       )
train_data = encoder.fit(train_data).transform(train_data)

train_data.select(categorical_col+'_ohe').show(3)

+-------------------+
|ocean_proximity_ohe|
+-------------------+
|      (5,[2],[1.0])|
|      (5,[2],[1.0])|
|      (5,[2],[1.0])|
+-------------------+
only showing top 3 rows



Проведем масштабирование числовых признаков.

In [None]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                      outputCol='numerical_features')
train_data = numerical_assembler.transform(train_data)

standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")
train_data = standardScaler.fit(train_data).transform(train_data)

                                                                                

In [None]:
train_data.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'rooms_per_household',
 'population_per_household',
 'bedroom_index',
 'ocean_proximity_idx',
 'ocean_proximity_ohe',
 'numerical_features',
 'numerical_features_scaled']

Cоберем все трансформированные признаки в вектора.

In [None]:
all_features = ['ocean_proximity_ohe','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features,
                                  outputCol='features_vector_all')
train_data = final_assembler.transform(train_data)

train_data.select('features_vector_all').show(3)

+--------------------+
| features_vector_all|
+--------------------+
|[0.0,0.0,1.0,0.0,...|
|[0.0,0.0,1.0,0.0,...|
|[0.0,0.0,1.0,0.0,...|
+--------------------+
only showing top 3 rows



                                                                                

## Обучение моделей

### Построение модели (все признаки)

Инициализируем модель.

In [None]:
lr_all= LinearRegression(labelCol=target, featuresCol='features_vector_all')

Сформируем пайплайн.

In [None]:
pipeline_all = Pipeline(stages=[indexer, encoder, numerical_assembler, standardScaler, final_assembler, lr_all])

Обучим модель на тренировочных данных.

In [None]:
train_data = train_data_orig

In [None]:
model_all = pipeline_all.fit(train_data)

24/10/18 06:29:07 WARN Instrumentation: [d4429d51] regParam is zero, which might cause numerical instability and overfitting.
24/10/18 06:29:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/10/18 06:29:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/10/18 06:29:08 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/10/18 06:29:08 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
24/10/18 06:29:08 WARN Instrumentation: [d4429d51] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

Рассчитаем прогноз по тестовым данным.

In [None]:
predictions = model_all.transform(test_data)

Сравним прогнозы модели с фактическими значениями.

In [None]:
predicted_values = predictions.select(target, 'prediction')
predicted_values.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|141070.95117979078|
|          106700.0|208419.22512170626|
|           73200.0| 114155.8161535575|
|           78300.0|118362.74095215462|
|           90100.0|187551.75832259795|
|           67000.0|147182.33354985062|
|           86400.0|177986.33904408012|
|           70500.0| 155867.5552616478|
|           60000.0|193487.78904044395|
|           75500.0|158737.61068140622|
|          109400.0| 158557.5952445669|
|           85100.0|173717.64104787633|
|           92700.0| 176060.9773799493|
|           80500.0| 179541.0197153655|
|           96000.0|  162475.556768307|
|           75500.0|131402.24326224625|
|           75000.0| 92704.42324697645|
|          100600.0|182298.19835487613|
|           74100.0|150981.18114304077|
|           66800.0|130784.45854256907|
+------------------+------------------+
only showing top 20 rows



Рассчитаем метрики RMSE, MAE и R2.

In [None]:
def compute_metrics(predicted_values):
    metrics = {}
    metrics['RMSE'] = RegressionEvaluator(predictionCol='prediction',
                                          labelCol=target,
                                          metricName='rmse'
                                         ).evaluate(predicted_values)

    metrics['MAE'] = RegressionEvaluator(predictionCol='prediction',
                                         labelCol=target,
                                         metricName='mae'
                                        ).evaluate(predicted_values)

    metrics['R2'] = RegressionEvaluator(predictionCol='prediction',
                                        labelCol=target,
                                        metricName='r2'
                                       ).evaluate(predicted_values)
    return metrics

metrics = compute_metrics(predicted_values)

df_metrics_all = pd.DataFrame.from_dict(metrics,
                                        orient='index',
                                        columns=['all_features']
                                       )
df_metrics_all

Unnamed: 0,all_features
RMSE,69507.472482
MAE,49590.334253
R2,0.644045


Модель показала невысокие метрики.

### Построение модели (только числовые признаки)

Инициализируем модель только с числовыми признаками

In [None]:
lr_numeric = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')

Обновим пайплайн.

In [None]:
pipeline_numerical = Pipeline(stages=[numerical_assembler, standardScaler, lr_numeric])

Обучим модель.

In [None]:
model_numerical = pipeline_numerical.fit(train_data)

24/10/18 06:29:15 WARN Instrumentation: [fc7c5cf1] regParam is zero, which might cause numerical instability and overfitting.


Сделаем прогноз а тестовой выборке.

In [None]:
predictions = model_numerical.transform(test_data)

In [None]:
predicted_values = predictions.select(target, 'prediction')
predicted_values.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0| 97215.64453976694|
|          106700.0| 183991.8275730987|
|           73200.0| 69567.91299050767|
|           78300.0| 75684.56591266766|
|           90100.0|158534.82029258646|
|           67000.0|118324.38211245881|
|           86400.0|151000.48688858794|
|           70500.0|126610.38510251511|
|           60000.0|172405.10137913935|
|           75500.0| 131163.3858633628|
|          109400.0|113407.67666679062|
|           85100.0|147449.01770643098|
|           92700.0|147113.70280819898|
|           80500.0|152153.31436240347|
|           96000.0|130122.41700290097|
|           75500.0| 98117.30022368627|
|           75000.0| 44761.40289472975|
|          100600.0| 152385.3749035853|
|           74100.0|121779.62134737289|
|           66800.0|103421.30984601425|
+------------------+------------------+
only showing top 20 rows



Рассчитаем метрики.

In [None]:
metrics = compute_metrics(predicted_values)

df_metrics_num = pd.DataFrame.from_dict(metrics,
                                        orient='index',
                                        columns=['num_features']
                                       )
df_metrics_num

Unnamed: 0,num_features
RMSE,70345.573274
MAE,50308.277758
R2,0.635409


Метрики данной модели ниже (хотя и несильно), чем у модели со всеми признаками.

### Подбор параметров модели

Выполним подбор гиперпараметров для модели со всеми признаками. Будем перебирать параметры:
* фактор регуляризации regParam;
* максимальное число итераций maxIter;
* фактор вида регуляризации elasticNetParam.

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr_all.regParam, [1, 0.5, 0.01]) \
    .addGrid(lr_all.maxIter, [200, 100, 10]) \
    .addGrid(lr_all.elasticNetParam, [1, 0.5, 0]) \
    .build()

Целевая метрика - RMSE.

In [None]:
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol=target,
                                metricName='rmse'
                               )

Проведем подбор параметров с кросс-валидацией.

In [None]:
crossval = CrossValidator(estimator=pipeline_all,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=RANDOM_SEED
                         )

In [None]:
cv_model = crossval.fit(train_data)

                                                                                

Выведем параметры лучшей модели.

In [None]:
cv_model.bestModel.stages[-1].extractParamMap()

{Param(parent='LinearRegression_5ccb1db14a61', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LinearRegression_5ccb1db14a61', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0,
 Param(parent='LinearRegression_5ccb1db14a61', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_5ccb1db14a61', name='featuresCol', doc='features column name.'): 'features_vector_all',
 Param(parent='LinearRegression_5ccb1db14a61', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_5ccb1db14a61', name='labelCol', doc='label column name.'): 'median_house_value',
 Param(parent='LinearRegression_5ccb1db14a61', name='loss', doc='The loss function to be optimized. Supported options: squaredError,

Модель использует L2-регуляризацию с фактором регуляризации 0.5.

Сделаем прогноз на тестовой выборке и рассчитаем метрики.

In [None]:
predictions = cv_model.transform(test_data)
predicted_values = predictions.select(target, 'prediction')

In [None]:
metrics = compute_metrics(predicted_values)

df_metrics_tuned = pd.DataFrame.from_dict(metrics,
                                          orient='index',
                                          columns=['tuned']
                                         )
df_metrics_tuned

Unnamed: 0,tuned
RMSE,69412.226681
MAE,49589.665063
R2,0.64502


Сравним метрики всех полученных моделей.

In [None]:
df_metrics_all.join([df_metrics_num, df_metrics_tuned])

Unnamed: 0,all_features,num_features,tuned
RMSE,69507.472482,70345.573274,69412.226681
MAE,49590.334253,50308.277758,49589.665063
R2,0.644045,0.635409,0.64502


У модели с подбором параметров все метрики незначительно лучше, чем у модели со всеми признаками. Подбор параметров не привел к значимому улучшению качества модели.

### Анализ коэффициентов

Выведем коэффициенты модели для признаков.

In [None]:
# Получаем словарь с индексами и именами
dict_all = predictions.schema['features_vector_all'].metadata['ml_attr']['attrs']

# Отдельно обрабатываем числовые признаки (т.к. по ним дополнительно проводилось масштабирование)
df_num = pd.DataFrame(dict_all['numeric']).sort_values('idx')

dict_num = predictions.schema['numerical_features'].metadata['ml_attr']['attrs']
df_num_orig = pd.DataFrame(dict_num['numeric']).sort_values('idx')
df_num['name'] = df_num_orig['name']

# Отдельно обрабатываем признак ocean_proximity после трансформации OHE
df_bin = pd.DataFrame(dict_all['binary']).sort_values('idx')

# Соединяем два датафрейма
coefficients = pd.concat([df_bin, df_num]).set_index('idx')

coefficients['coefficients'] = cv_model.bestModel.stages[-1].coefficients
coefficients

Unnamed: 0_level_0,name,coefficients
idx,Unnamed: 1_level_1,Unnamed: 2_level_1
0,ocean_proximity_ohe_<1H OCEAN,8719.469828
1,ocean_proximity_ohe_INLAND,-25130.757437
2,ocean_proximity_ohe_NEAR OCEAN,12948.922175
3,ocean_proximity_ohe_NEAR BAY,3696.068381
4,ocean_proximity_ohe_ISLAND,147742.037925
5,longitude,-57680.278237
6,latitude,-58835.403413
7,housing_median_age,13523.632578
8,total_rooms,9861.291652
9,total_bedrooms,2781.055176


Значительное положительное влияние на стоимость жилья оказывают:
* медианный доход (median_income);
* число домохозяйств в блоке (households);
* соотношение числа спален к общему числу комнат (bedroom_index).

Отрицательно влияют:
* широта и долгота (latitude и longitude) - чем севернее и восточнее, тем ниже стоимость;
* число жителей блока (population).

Кроме того, стоимость жилья возрастает по мере приближения к океану, а самая высокая - на островах (ISLAND).

Закроем Spark-сессию.

In [None]:
spark.stop()

## Анализ результатов и выводы

Выполненные задачи:
* проведена загрузка и обзор данных;
* типы данных в housing_median_age, total_rooms, total_bedrooms, population и households изменены на Int;
* пропуски в total_bedrooms заполнены исходя из среднего соотношения total_bedrooms и total_rooms;
* сформирован пайплайн для трансформации и векторизации признаков;
* построены и обучены две модели линейной регрессии: с полным набором признаков и только с числовыми признаками;
* лучшее качество показала модель с полным набором признаков, для нее был осуществлен подбор гиперпараметров с кросс-валидацией, который однако не привел к значимому улучшению качества;
* метрики модели с полным набором признаков и подобранными гиперпараметрами: RMSE=69412, MAE=49590, R2=0.645.

Выводы на основе полученной модели:
* на стоимость жилья в наибольшей степени влияют median_income, households, bedroom_index (положительно), а также latitude, longitude и population (отрицательно);
* стоимость жилья возрастает по мере приближения к океану.

Вместе с тем качество полученной модели невысокое, и полученные на ее основе выводы следует использовать с осторожностью.