# Предсказание стоимости жилья

## Описание проекта

В проекте нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучим модель и сделаем предсказания на тестовой выборке. Для оценки качества модели используем метрики RMSE, MAE и R2.

В колонках датасета содержатся следующие данные:
- `longitude` — широта;
- `latitude` — долгота;
- `housing_median_age` — медианный возраст жителей жилого массива;
- `total_rooms` — общее количество комнат в домах жилого массива;
- `total_bedrooms` — общее количество спален в домах жилого массива;
- `population` — количество человек, которые проживают в жилом массиве;
- `households` — количество домовладений в жилом массиве;
- `median_income` — медианный доход жителей жилого массива;
- `median_house_value` — медианная стоимость дома в жилом массиве;
- `ocean_proximity` — близость к океану.

## Подготовка данных

In [1]:
import pandas as pd

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

Инициализируем локальную Spark-сессию и прочитаем содержимое файла.

In [2]:
RANDOM_SEED = 33

spark = SparkSession.builder \
                    .master("local") \
                    .appName("Californication - Linear regression") \
                    .getOrCreate()

df = spark.read.load('/datasets/housing.csv', format='csv', sep=',', inferSchema=True, header='true')

                                                                                

In [3]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [4]:
df.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [5]:
pd.DataFrame(df.dtypes, columns=['column', 'type'])

Unnamed: 0,column,type
0,longitude,double
1,latitude,double
2,housing_median_age,double
3,total_rooms,double
4,total_bedrooms,double
5,population,double
6,households,double
7,median_income,double
8,median_house_value,double
9,ocean_proximity,string


In [6]:
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Удалим дубликаты и проверим на пропуски.

In [7]:
df = df.dropDuplicates()

In [8]:
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



                                                                                

`total_bedrooms` имеет 207 пропусков, заменим их медианой.

In [9]:
tb_median = df.approxQuantile('total_bedrooms', [0.5], 0)[0]
tb_median

                                                                                

435.0

In [10]:
df = df.na.fill(value=tb_median, subset=["total_bedrooms"])

In [11]:
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



                                                                                

### Разделение на выборки

In [12]:
train_data, test_data = df.randomSplit([.8, .2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())



16433 4207


                                                                                

### Трансформация категорийных признаков

Трансформируем категориальные признаки с помощью трансформера StringIndexer. Он переводит текстовые категории в числовое представление.

In [13]:
cat_cols = ['ocean_proximity']
num_cols = ['longitude','latitude', 'housing_median_age', 'total_rooms', 
            'total_bedrooms', 'population', 'households', 'median_income']
target = ['median_house_value']

In [14]:
indexer = StringIndexer(inputCols=cat_cols, 
                        outputCols=[c+'_idx' for c in cat_cols]) 
model = indexer.fit(train_data)
train_data = model.transform(train_data)
test_data = model.transform(test_data)

cols = [c for c in train_data.columns for i in cat_cols if (c.startswith(i))]
train_data.select(cols).show(3)

                                                                                

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|      <1H OCEAN|                0.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 3 rows



Дополнительно можно сделать OHE-кодирование (One hot encoder) для категорий.

In [15]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in cat_cols],
                        outputCols=[c+'_ohe' for c in cat_cols])
model = encoder.fit(train_data)
train_data = model.transform(train_data)
test_data = model.transform(test_data)

cols = [c for c in train_data.columns for i in cat_cols if (c.startswith(i))]
train_data.select(cols).show(3)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|      <1H OCEAN|                0.0|      (4,[0],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



Финальный шаг преобразований — объединение признаков в один вектор, с которым ML-алгоритм умеет работать.

In [16]:
cat_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in cat_cols],
                                outputCol='categorical_features')

train_data = cat_assembler.transform(train_data)
test_data = cat_assembler.transform(test_data)

In [17]:
train_data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- categorical_features: vector (nullable = true)



### Трансформация числовых признаков

Произведем стандартизацию (шкалирование) значений — чтобы сильные выбросы не смещали предсказания модели.

In [18]:
num_assembler = VectorAssembler(inputCols=num_cols, outputCol='numerical_features')
train_data = num_assembler.transform(train_data)
test_data = num_assembler.transform(test_data)

In [19]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled')
model = standardScaler.fit(train_data)

train_data = model.transform(train_data)
test_data = model.transform(test_data)

                                                                                

In [20]:
train_data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- categorical_features: vector (nullable = true)
 |-- numerical_features: vector (nullable = true)
 |-- numerical_features_scaled: vector (nullable = true)



Финальный шаг — собрать трансформированные категорийные и числовые признаки.

In [21]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol='features') 
train_data = final_assembler.transform(train_data)
test_data = final_assembler.transform(test_data)

train_data.select(all_features).show(3)
train_data.select('features').show(3)

                                                                                

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-61.832365530924...|
|       (4,[0],[1.0])|     [-61.109617203476...|
|       (4,[2],[1.0])|     [-61.054788020015...|
+--------------------+-------------------------+
only showing top 3 rows

+--------------------+
|            features|
+--------------------+
|[0.0,0.0,1.0,0.0,...|
|[1.0,0.0,0.0,0.0,...|
|[0.0,0.0,1.0,0.0,...|
+--------------------+
only showing top 3 rows



In [22]:
pd.DataFrame(train_data.dtypes, columns=['column', 'type'])

Unnamed: 0,column,type
0,longitude,double
1,latitude,double
2,housing_median_age,double
3,total_rooms,double
4,total_bedrooms,double
5,population,double
6,households,double
7,median_income,double
8,median_house_value,double
9,ocean_proximity,string


## Обучение моделей

Обучим и сравним модель используя все данные из файла и используя только числовые переменные, исключив категориальные.

In [23]:
lr = LinearRegression(labelCol='median_house_value', featuresCol='features', maxIter=10)
model = lr.fit(train_data)

22/12/09 11:19:20 WARN Instrumentation: [c7c7f93b] regParam is zero, which might cause numerical instability and overfitting.
22/12/09 11:19:21 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/09 11:19:21 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/12/09 11:19:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/12/09 11:19:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [24]:
predictions = model.transform(test_data)

predictedLabes = predictions.select("median_house_value", "prediction")
predictedLabes.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           67900.0|122595.89795868425|
|          500001.0|451341.20104264654|
|          112500.0| 161072.1015965338|
|          133400.0|182138.22120002192|
|          137500.0|104880.58984675026|
|          132400.0|190358.92072507925|
|          154900.0|26496.076108738314|
|          500001.0| 341111.8871703581|
|          429100.0| 281352.9477849901|
|          139500.0|134220.97702931752|
|          127100.0|154525.86268744105|
|          196900.0|211390.94220880372|
|          214100.0|232627.49282042542|
|          343100.0|236803.72161215823|
|           98300.0| 98084.07690468617|
|          344200.0|298058.54352872726|
|          291500.0|202325.25385992136|
|          147900.0|237835.77072464302|
|          344100.0|343457.10986782657|
|          274600.0| 289394.3243488739|
+------------------+------------------+
only showing top 20 rows



In [25]:
rmse = RegressionEvaluator(labelCol='median_house_value', metricName='rmse').evaluate(predictions)
mae = RegressionEvaluator(labelCol='median_house_value', metricName='mae').evaluate(predictions)
r2 = RegressionEvaluator(labelCol='median_house_value', metricName='r2').evaluate(predictions)

                                                                                

In [26]:
result = pd.DataFrame(data = {'features': ['all'], 'rmse': [rmse], 'mae': [mae], 'r2': [r2]})

In [27]:
lr = LinearRegression(labelCol='median_house_value', featuresCol='numerical_features_scaled', maxIter=10)
model = lr.fit(train_data)

predictions = model.transform(test_data)
predictedLabes = predictions.select("median_house_value", "prediction")

rmse = RegressionEvaluator(labelCol='median_house_value', metricName='rmse').evaluate(predictions)
mae = RegressionEvaluator(labelCol='median_house_value', metricName='mae').evaluate(predictions)
r2 = RegressionEvaluator(labelCol='median_house_value', metricName='r2').evaluate(predictions)

22/12/09 11:20:02 WARN Instrumentation: [02d32d8e] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [28]:
result = result.append({'features': 'num', 'rmse': rmse, 'mae': mae, 'r2': r2}, ignore_index=True)

In [29]:
# остановим Spark сессию
spark.stop()

## Анализ результатов

In [30]:
result

Unnamed: 0,features,rmse,mae,r2
0,all,69208.659866,50187.564635,0.638066
1,num,70099.775327,51023.627846,0.628686


Результаты с категориальным признаком и без почти одинаковые. Возможно у него маленькая корреляция с таргет признаком.