# Предсказание стоимости жилья

## Подготовка данных

### Инициализация spark-сессии

Импортирую нужные модули для инициализации  spark-сессии, чтения и манипуляции данных, трансформации признаков, алгоритм логистической регрессии из pyspark.ml.classification и модули из pyspark.ml.evaluation для оценки качества модели:

In [1]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        
RANDOM_SEED = 2022

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local")\
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

### Прочитение содержимого файла

Прочитаю файл `/datasets/housing.csv.` методом load():

In [3]:
df_housing = spark.read.load('/datasets/housing.csv', format='csv', sep=',', inferSchema=True, header='true')
print('Схема таблицы "df_housing":')
df_housing.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

Схема таблицы "df_housing":
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

In [4]:
print(df_housing.show(10))

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [5]:
print(pd.DataFrame(df_housing.dtypes, columns=['column', 'type']).head(10)) 

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


Таблица содержит 10 столбцов с типами `double` и `string`

Исследую базовые описательные статистики таблицы:

In [6]:
df_housing.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Датасет содержит 20640 строк. Максимальная медианная стоимость дома - 500 тысяч долларов.

### Предобработка данных

#### Исследование пропусков

In [7]:
df_housing.show(100)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [8]:
columns = df_housing.columns

for column in columns:
    check_col = F.col(column).isNull()
    print(column, df_housing.filter(check_col).count())
    
from pyspark.sql.functions import col,sum
df_housing.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df_housing.columns)).show()

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



В данных есть 207 пропущенных значений в столбце `total_bedrooms`. Вероятнее всего данные пропуски относятся к квартирам-студиям, поэтому пропуски заполню нулями

In [9]:
df_housing = df_housing.na.fill(0)

#### Преобразование данных

Создаю несколько новых столбцов с признаками, которые могут быть подходящими для модели машинного обучения:

* `rooms_per_household` - отношение количества комнат total_rooms к количеству домовладений households.
* `population_in_household` - отношение количества жителей population к количеству домовладений households.
* `bedroom_index` - отношение количества спален total_bedrooms к общему количеству комнат total_rooms.

In [10]:
df_housing = df_housing.withColumn('rooms_per_household', F.col('total_rooms') / F.col('households')).withColumn('population_in_household', F.col('population') / F.col('households')).withColumn('bedroom_index', F.col('total_bedrooms') / F.col('total_rooms'))

df_housing.printSchema()

root
 |-- longitude: double (nullable = false)
 |-- latitude: double (nullable = false)
 |-- housing_median_age: double (nullable = false)
 |-- total_rooms: double (nullable = false)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = false)
 |-- households: double (nullable = false)
 |-- median_income: double (nullable = false)
 |-- median_house_value: double (nullable = false)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- population_in_household: double (nullable = true)
 |-- bedroom_index: double (nullable = true)



In [11]:
exclude = ['longitude', 'latitude']
selected_columns = [col for col in df_housing.columns if col not in exclude]
path = './california_housing_w_features'


# df_housing.select(selected_columns).write.format('csv').save(path,мусещк header=True)

In [12]:
X = df_housing

X_train, X_test = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)

Трансформирую категориальные признаки с помощью трансформера StringIndexer

In [13]:
from pyspark.ml.feature import OneHotEncoder

categorical_cols = ['ocean_proximity']
categorical_indice = ['ocean_proximity_idx']
categorical_codes = 'categorical_features'

numerical_cols = list(set(X.columns) - set(categorical_cols) - set(['median_house_value']))
# Ниже из сообщения оь ошибке обнаружилось, что в тестовой выборке встречается редкий таргет, которого нет в тренировочной выборке.
# По умолчанию при встрече такого значения индексер выдает ошибку. Чтобы этого избежать, я добавила отдельную категорию handleInvalid='keep', которая будет сигнализировать о том,
# что такого модель еще не видела, при этом модель будет выдавать хоть-какие-то предсказания для таких объектов (но не факт, что корректные).
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=categorical_indice,
                        handleInvalid='keep') 

indexer = indexer.fit(X_train)

X_train = indexer.transform(X_train)
X_test = indexer.transform(X_test)

                                                                                

In [14]:
print(numerical_cols)

['total_rooms', 'housing_median_age', 'bedroom_index', 'rooms_per_household', 'total_bedrooms', 'median_income', 'longitude', 'population', 'latitude', 'households', 'population_in_household']


Преобразую столбец с категориальными значениями техникой One hot encoding:

In [15]:
encoder = OneHotEncoder(inputCols=categorical_indice, outputCols=['categorical_features'], handleInvalid='keep') 
encoder = encoder.fit(X_train)

assembler_numerical = VectorAssembler(inputCols=numerical_cols, outputCol='numerical_features')
assembler_all = VectorAssembler(inputCols=numerical_cols + ['categorical_features'], outputCol='all_features')

In [16]:
X_train = encoder.transform(X_train)
X_test = encoder.transform(X_test)

Склеиваю фичи в один столбец

In [17]:
X_train = assembler_numerical.transform(X_train)
X_test = assembler_numerical.transform(X_test)

X_train = assembler_all.transform(X_train)
X_test = assembler_all.transform(X_test)

# Обучение моделей

In [18]:
from pyspark.ml.regression import LinearRegression

model_numerical = LinearRegression(featuresCol='numerical_features', labelCol='median_house_value')
model_numerical = model_numerical.fit(X_train)

model_all = LinearRegression(featuresCol='all_features', labelCol='median_house_value')
model_all = model_all.fit(X_train)

23/02/06 18:59:27 WARN Instrumentation: [1562b62b] regParam is zero, which might cause numerical instability and overfitting.
23/02/06 18:59:28 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/02/06 18:59:28 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/02/06 18:59:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/02/06 18:59:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
23/02/06 18:59:31 WARN Instrumentation: [f30f5735] regParam is zero, which might cause numerical instability and overfitting.
23/02/06 18:59:32 WARN Instrumentation: [f30f5735] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


Применение обученных моделей для тренировочных и тестовых датасетов:

In [19]:
predictions_all_X_train = model_all.transform(X_train)
predictions_numerical_X_train = model_numerical.transform(X_train)

predictions_all_X_test= model_all.transform(X_test)
predictions_numerical_X_test = model_numerical.transform(X_test)

Измеряю метрики RMSE, MAE и R2:

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

Создаю объекты для каждой метрики

In [21]:
rmse_eval = RegressionEvaluator(labelCol='median_house_value', metricName='rmse')
mae_eval = RegressionEvaluator(labelCol='median_house_value', metricName='mae')
r2_eval = RegressionEvaluator(labelCol='median_house_value', metricName='r2')

In [22]:
print('rmse_train_all:', rmse_eval.evaluate(predictions_all_X_train))
print('rmse_numerical_train:', rmse_eval.evaluate(predictions_numerical_X_train))
print()
print('rmse_test_all:', rmse_eval.evaluate(predictions_all_X_test))
print('rmse_numerical_test:', rmse_eval.evaluate(predictions_numerical_X_test))

rmse_train_all: 68194.86599560683


                                                                                

rmse_numerical_train: 68984.8717330931

rmse_test_all: 67815.0600878124
rmse_numerical_test: 68321.40692240147


При использовании численных данных модель ошибается на 68321 долларов. При использовании всего набора фичей модель ошибается немного меньше, на 67815 долларов.

In [23]:
print('mae_train_all:', mae_eval.evaluate(predictions_all_X_train))
print('mae_numerical_train:', mae_eval.evaluate(predictions_numerical_X_train))
print()
print('mae_test_all:', mae_eval.evaluate(predictions_all_X_test))
print('mae_numerical_test:', mae_eval.evaluate(predictions_numerical_X_test))

mae_train_all: 49229.73468125497
mae_numerical_train: 50169.34263196681

mae_test_all: 49378.95435612814
mae_numerical_test: 50102.47502720972


Среднее отклонение предсказания модели от таргетов на численных признаках - 50102 долларов.
Среднее отклонение предсказания модели от таргетов на всем наборе признаков - 49378 долларов.

In [24]:
print('r2_train_all:', r2_eval.evaluate(predictions_all_X_train))
print('r2_numerical_train:', r2_eval.evaluate(predictions_numerical_X_train))
print()
print('r2_test_all:', r2_eval.evaluate(predictions_all_X_test))
print('r2_numerical_test:', r2_eval.evaluate(predictions_numerical_X_test))

r2_train_all: 0.6491678548576989
r2_numerical_train: 0.6409923189206566

r2_test_all: 0.6603202157445335
r2_numerical_test: 0.6552287834349364


Самая ваджная метрика R2, потому что она показывает насколько лучше моя модель относительно среднего. Моя модель на тренировочных выборках лучше константной на 64%, на тестовых на 65%.

На всех фичах данных метрика лучше чем только на численных. Вероятно, использовать все фичи луше, чем только численные.

# Анализ результатов

Модель, которая построена на всех признаках, работает лучше, чем модель, построенная лишь на числовых признаках. Это означает, что категориальные признаки добавляют точности модели. Переобучения в данных нет, поскольку метрики на тренировочной и тестовой выборках отличаются слабо.