# Предсказание стоимости жилья в жилом массиве

Требуется обучить модель линейной регрессии на данных о жилье в одном регионе в 1990 году. На основе этих данных нужно предсказать медианную стоимость дома в жилом массиве. Для оценки качества модели используйте метрики RMSE, MAE и R2.

## Инициализируем локальную Spark-сессию.
Для этого загружаем все необходимые модули.

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        
RANDOM_SEED = 2022

spark = SparkSession.builder \
                    .master("local") \
                    .appName("Housing - Linear regression") \
                    .getOrCreate()

## Загружаем и читаем датасет.

In [2]:
try:
    df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True)
except:
    df = spark.read.option('header', 'true').csv('/housing_cost/housing.csv', inferSchema = True)

                                                                                

In [3]:
df.toPandas().head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


In [4]:
total_rows = df.count()
print("Количество строк:", total_rows)

Количество строк: 20640


In [5]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



В колонках датасета содержатся следующие данные:
* `longitude` — широта;
* `latitude` — долгота;
* `housing_median_age` — медианный возраст жителей жилого массива;
* `total_rooms` — общее количество комнат в домах жилого массива;
* `total_bedrooms` — общее количество спален в домах жилого массива;
* `population` — количество человек, которые проживают в жилом массиве;
* `households` — количество домовладений в жилом массиве;
* `median_income` — медианный доход жителей жилого массива;
* `median_house_value` — медианная стоимость дома в жилом массиве;
* `ocean_proximity` — близость к океану.

Все колонки кроме одной имеют числовой тип данных, только колонка `ocean_proximity` относится к категориальному типу данных.

## Подготовка данных

### Проверяем датафрейм на наличие пропусков

In [6]:
missing_values = df.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in df.columns])

missing_values.toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,0,0,0,0,207,0,0,0,0,0


Пропуски присутствуют только в колонке `total_bedrooms` — общее количество спален в домах жилого массива.

In [7]:
df_missing_rows = df.filter(F.col('total_bedrooms').isNull())

df_missing_rows.toPandas().head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.16,37.77,47.0,1256.0,,570.0,218.0,4.375,161900.0,NEAR BAY
1,-122.17,37.75,38.0,992.0,,732.0,259.0,1.6196,85100.0,NEAR BAY
2,-122.28,37.78,29.0,5154.0,,3741.0,1273.0,2.5762,173400.0,NEAR BAY
3,-122.24,37.75,45.0,891.0,,384.0,146.0,4.9489,247100.0,NEAR BAY
4,-122.1,37.69,41.0,746.0,,387.0,161.0,3.9063,178400.0,NEAR BAY


Заполним пропуски с помощью функции, которая вычисляет среднее значение результата деления значений в столбце `total_rooms` на значения столбца `total_bedrooms` в заполненных строках, затем запоняет пропуск произведением этого среднего значения и значения столбца `total_rooms` в этой строке, округлённом до целого значения. 

In [8]:
def fill_missing_total_bedrooms(df):
    avg_ratio = df.select((F.col('total_rooms') / F.col('total_bedrooms')).alias('ratio')).agg({'ratio': 'avg'}).first()[0]
    df_filled = df.withColumn('total_bedrooms', F.round(F.when(F.col('total_bedrooms').isNull(), F.col('total_rooms') / avg_ratio).otherwise(F.col('total_bedrooms'))))
    
    return df_filled

In [9]:
df = fill_missing_total_bedrooms(df)

Проверяем, что пропуски заполнены.

In [10]:
missing_values = df.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in df.columns])

missing_values.toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,0,0,0,0,0,0,0,0,0,0


Разделяем признаки по типу, отдельно выделяем целевой признак.

In [11]:
categorical_cols = 'ocean_proximity'
numerical_cols = [c for c in df.columns if c != 'median_house_value' and c != 'ocean_proximity']
target = 'median_house_value'

Преобразуем колонку `ocean_proximity`, содержащую текстовые значения из категориального типа в числовой техникой One hot encoding, перед этим трансформируем категориальные признаки с помощью трансформера StringIndexer. 

In [12]:
stringIndexer = StringIndexer(inputCol=categorical_cols, outputCol=categorical_cols+'_idx') 

df = stringIndexer.fit(df).transform(df)

cols = [c for c in df.columns if c.startswith(categorical_cols)]
df.select(cols).show(3)

                                                                                

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
+---------------+-------------------+
only showing top 3 rows



In [13]:
encoder = OneHotEncoder(inputCol='ocean_proximity_idx', outputCol=categorical_cols+'_ohe')
df = encoder.fit(df).transform(df)

cols = [c for c in df.columns if c.startswith(categorical_cols)]
df.select(cols).show(3)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



Объединяем признаки в один вектор с помощью VectorAssembler:

In [14]:
categorical_assembler = \
        VectorAssembler(inputCols=['ocean_proximity_ohe'], outputCol="categorical_features")
df = categorical_assembler.transform(df)

In [15]:
pd.DataFrame(df.take(3),
             columns=df.columns)

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_idx,ocean_proximity_ohe,categorical_features
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)"
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)"
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)"


Трансформируем числовые признаки.

In [16]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
df = numerical_assembler.transform(df)

In [17]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
df = standardScaler.fit(df).transform(df)

                                                                                

In [18]:
df.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double'),
 ('ocean_proximity', 'string'),
 ('ocean_proximity_idx', 'double'),
 ('ocean_proximity_ohe', 'vector'),
 ('categorical_features', 'vector'),
 ('numerical_features', 'vector'),
 ('numerical_features_scaled', 'vector')]

Объединяем трансформированные категорийные и числовые признаки с помощью VectorAssembler.

In [19]:
all_features = ['categorical_features', 'numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol="features")
df = final_assembler.transform(df)

df.select(F.col('categorical_features'), F.col('numerical_features_scaled')).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|
|       (4,[3],[1.0])|     [-61.002278409814...|
|       (4,[3],[1.0])|     [-61.012260782324...|
+--------------------+-------------------------+
only showing top 3 rows



## Обучение моделей

### Строим модель регрессии на всех данных из файла

#### Разделим данные на выборки

In [20]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

                                                                                

16418 4222


                                                                                

Обучаем модель линейной регрессии.

In [21]:
lr_1 = LinearRegression(labelCol=target, featuresCol='features', maxIter=10, regParam=0.1)

model_1 = lr_1.fit(train_data)

24/01/18 13:35:33 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/01/18 13:35:33 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/01/18 13:35:33 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/01/18 13:35:33 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

Сохраним трансформированную таблицу с колонкой предсказания первой модели.

In [22]:
predictions_1 = model_1.transform(test_data)

predictedLabes_1 = predictions_1.select("median_house_value", "prediction")
predictedLabes_1.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0| 152790.2983901268|
|           50800.0| 215669.5624681404|
|           58100.0| 143288.1240519723|
|           68400.0| 132090.6853544712|
|           72200.0|163040.97519626655|
|           67000.0|154688.04109336343|
|           81300.0|151648.88854959887|
|           70500.0| 163063.6838059891|
|           60000.0| 144079.7293307637|
|          109400.0|170361.78454652987|
|           74100.0|151317.40605343832|
|           74700.0|167066.03992267605|
|           90000.0|208698.04859960778|
|          104200.0|199437.58549395436|
|           74100.0| 156257.0650897557|
|           67500.0|147619.40096757188|
|          103100.0| 47774.62647579424|
|           92500.0|165485.98441965878|
|          128100.0|221370.14637060184|
|           99600.0|186255.79779619025|
+------------------+------------------+
only showing top 20 rows



                                                                                

Оценим качество алгоритмов регрессии, используем трансформер RegressionEvaluator, который работает с DataFrame API и имеет набор метрик для подсчёта.

In [23]:
evaluator = RegressionEvaluator(labelCol='median_house_value', predictionCol='prediction')

rmse_1 = evaluator.evaluate(predictions_1, {evaluator.metricName: 'rmse'})
mae_1 = evaluator.evaluate(predictions_1, {evaluator.metricName: 'mae'})
r2_1 = evaluator.evaluate(predictions_1, {evaluator.metricName: 'r2'})

print("RMSE:", rmse_1)
print("MAE:", mae_1)
print("R2:", r2_1)

                                                                                

RMSE: 68486.87949972406
MAE: 49813.172169881334
R2: 0.6535567081263631


                                                                                

### Строим модель регрессии только на числовых признаках

In [24]:
lr_2 = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled', maxIter=10, regParam=0.1)

model_2 = lr_2.fit(train_data)

In [25]:
predictions_2 = model_2.transform(test_data)

predictedLabes_2 = predictions_2.select("median_house_value", "prediction")
predictedLabes_2.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|101895.56527095195|
|           50800.0|184435.35130482772|
|           58100.0|110634.92538774107|
|           68400.0| 80724.77041470027|
|           72200.0|129195.53439829359|
|           67000.0|121117.32195687527|
|           81300.0| 117143.8093772633|
|           70500.0|129406.32919594925|
|           60000.0|111918.56237223698|
|          109400.0|117923.20420971466|
|           74100.0|118874.63206190336|
|           74700.0|133892.75410843687|
|           90000.0|175110.77771594562|
|          104200.0|165771.86226405064|
|           74100.0| 122400.7697408311|
|           67500.0|113396.20200956007|
|          103100.0| -7203.68965132162|
|           92500.0|139822.45041208668|
|          128100.0|190792.16844343068|
|           99600.0|151971.89274652116|
+------------------+------------------+
only showing top 20 rows



In [26]:
evaluator = RegressionEvaluator(labelCol='median_house_value', predictionCol='prediction')

rmse_2 = evaluator.evaluate(predictions_2, {evaluator.metricName: 'rmse'})
mae_2 = evaluator.evaluate(predictions_2, {evaluator.metricName: 'mae'})
r2_2 = evaluator.evaluate(predictions_2, {evaluator.metricName: 'r2'})

print("RMSE:", rmse_2)
print("MAE:", mae_2)
print("R2:", r2_2)

                                                                                

RMSE: 69219.86034761296
MAE: 50777.544532438216
R2: 0.6461014064294923


## Анализ результатов

Сведём полученные данные в одной таблице.

In [27]:
metrics = pd.DataFrame(data=[('all_features', round(rmse_1, 3), round(mae_1, 3), round(r2_1, 3)),
                            ('numerical_features', round(rmse_2, 3), round(mae_2, 3), round(r2_2, 3))],
                            columns=['model', 'RMSE', 'MAE', 'R2'])
metrics

Unnamed: 0,model,RMSE,MAE,R2
0,all_features,68486.879,49813.172,0.654
1,numerical_features,69219.86,50777.545,0.646


Остановливаем SparkSession.

In [28]:
spark.stop()

Метрики RMSE и MAE должны стремиться к минимальному значению, а метрика R2 напротив, должна стремится к единице. 

Как видно из таблицы модель, обученная на всех данных имеет лучшие метрики качества, чем модель, обученная на только на числовых признаках. 

Поэтому можно сделать вывод, что модель лучше обучается на всех доступных признаках.