# Предсказание стоимости жилья с помощью PySpark

Нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году.  
И предсказать медианную стоимость дома в жилом массиве.
Метрики: `RMSE`, `MAE`, `R2`.

# Подготовка данных

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
# from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, Imputer
from pyspark.ml.regression import LinearRegression

from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.feature import OneHotEncoder    


In [2]:
RANDOM_SEED = 2022

In [3]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Practice project") \
                    .getOrCreate()

In [4]:
df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True) 

                                                                                

In [5]:
# Посмотрим на столбцы и их типы данных
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



В колонках датасета содержатся следующие данные:
* `'longitude'` — широта;
* `'latitude'` — долгота;
* `'housing_median_age'` — медианный возраст жителей жилого массива;
* `'total_rooms'` — общее количество комнат в домах жилого массива;
* `'total_bedrooms'` — общее количество спален в домах жилого массива;
* `'population'` — количество человек, которые проживают в жилом массиве;
* `'households'` — количество домовладений в жилом массиве;
* `'median_income'` — медианный доход жителей жилого массива;
* `'median_house_value'` — медианная стоимость дома в жилом массиве;
* `'ocean_proximity'` — близость к океану.

In [6]:
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [7]:
# Поиск пропусков в данных
for column in df.columns:
    check_col = F.col(column).isNull()
    print(column, df.filter(check_col == True).count())

                                                                                

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [8]:
# Можем заполнить пропуски "Missing" или -999
# df = df.na.fill("Missing", ["total_bedrooms"])

In [9]:
# Можем заполнить пропуски total_bedrooms медианным значением 

# imputer = Imputer(
#           inputCols= ["total_bedrooms"],
#           outputCols = ["total_bedrooms_imputed"]
#     ).setStrategy("median")


# df = imputer.fit(df).transform(df)

<!-- # # Создадим оконную функцию
# window = Window().partitionBy("households")

# # Заполним пропущенные значения средним по группам ""
# data_filled = data.withColumn("mean", F.mean("total_bedrooms").over(window)) \
#               .withColumn("total_bedrooms", F.when(F.col("total_bedrooms").isNull(),  F.col("mean")) \
#               .otherwise(F.col("total_bedrooms"))) \
#               .drop("mean") -->

In [10]:
# Попробуем заполнить по группе

# Создаем окно
window = Window().partitionBy(['ocean_proximity'])
# Создаем столбец со средним в окне
df = df.withColumn("total_bedrooms_mean_by_ocean_p",F.avg(F.col('total_bedrooms')).over(window))
# Создаем условие, что если значение в total_bedrooms isNull , 
# то берем значение из total_bedrooms_mean_by_ocean_p
# В противном случае оставляем как есть
df = df.withColumn("total_bedrooms_filled",
                   F.when(F.col('total_bedrooms').isNull(),F.col('total_bedrooms_mean_by_ocean_p'))
                   .otherwise(F.col('total_bedrooms')))

In [11]:
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------------+---------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_bedrooms_mean_by_ocean_p|total_bedrooms_filled|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------------+---------------------+
|  -118.32|   33.35|              27.0|     1675.0|         521.0|     744.0|     331.0|       2.1579|          450000.0|         ISLAND|                         420.4|                521.0|
|  -118.33|   33.34|              52.0|     2359.0|         591.0|    1100.0|     431.0|       2.8333|          414700.0|         ISLAND|                         420.4|                591.0|
|  -118.32|   33.33|              52.0|     2

                                                                                

In [12]:
# OHE
indexer = StringIndexer(inputCols=['ocean_proximity'], 
                        outputCols=['ocean_proximity_idx']) 
df = indexer.fit(df).transform(df)

encoder = OneHotEncoder(inputCols=['ocean_proximity_idx'],
                        outputCols=['ocean_proximity_ohe'])
df = encoder.fit(df).transform(df)

                                                                                

In [13]:
df.select('ocean_proximity_idx').distinct().show()



+-------------------+
|ocean_proximity_idx|
+-------------------+
|                0.0|
|                1.0|
|                4.0|
|                3.0|
|                2.0|
+-------------------+



                                                                                

In [14]:
df.select('ocean_proximity_ohe').distinct().show()



+-------------------+
|ocean_proximity_ohe|
+-------------------+
|      (4,[2],[1.0])|
|      (4,[0],[1.0])|
|          (4,[],[])|
|      (4,[3],[1.0])|
|      (4,[1],[1.0])|
+-------------------+



                                                                                

In [15]:
# Создадим список с числовыми признаками
num_cols = ['longitude', 'latitude', 'housing_median_age',
        'total_rooms', 'population', 'households', 
        'median_income',
        'total_bedrooms_filled']

target = 'median_house_value'

# cat_cols = ['ocean_proximity_ohe']

In [16]:
# Соберем числовые признаки в один вектор
numerical_assembler = VectorAssembler(inputCols=num_cols, outputCol="numerical_features")
df = numerical_assembler.transform(df) 

# Стандартизируем эти признаки
standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
df = standardScaler.fit(df).transform(df) 

                                                                                

In [17]:
# Объединим числовые признаки с категориальными
all_features = ['ocean_proximity_ohe','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
df = final_assembler.transform(df)

df.select(all_features).show(3) 

+-------------------+-------------------------+
|ocean_proximity_ohe|numerical_features_scaled|
+-------------------+-------------------------+
|          (4,[],[])|     [-59.055715770325...|
|          (4,[],[])|     [-59.060706956580...|
|          (4,[],[])|     [-59.055715770325...|
+-------------------+-------------------------+
only showing top 3 rows



In [18]:
# train, test split

train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count()) 



16530 4110


                                                                                

# Обучение моделей

In [19]:
# Обучим модель на всех данных. Числовых и категориальных
lr = LinearRegression(featuresCol = 'features', labelCol=target, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)

22/04/02 08:27:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/04/02 08:27:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [20]:
# Функция оценки модели
def model_eval(model,test_data,target_col_name):
    predictions = model.transform(test_data)
    predictedLabes = predictions.select(target_col_name, "prediction")
    evaluator = RegressionEvaluator(labelCol=target_col_name, predictionCol="prediction")
    rmse = evaluator.evaluate(predictedLabes,{evaluator.metricName: "rmse"})
    r2 = evaluator.evaluate(predictedLabes, {evaluator.metricName: "r2"})
    mae = evaluator.evaluate(predictedLabes, {evaluator.metricName: "mae"})
    return rmse,r2,mae

In [21]:
# Оценим модель и сохраним метрики в переменные для дальнейшего сравнения
rmse,r2,mae = model_eval(lr_model,test_data,target)
print(rmse,r2,mae)



67694.8674479923 0.6673098128867803 49534.91211901142


                                                                                

In [22]:
# Обучим другую модель, только на числовых признаках
lr2 = LinearRegression(featuresCol = 'numerical_features_scaled', labelCol=target, regParam=0.3, elasticNetParam=0.8)
lr2_model = lr2.fit(train_data)

                                                                                

In [23]:
# Оценим вторую модель и сохраним метрики
rmse2,r22,mae2 = model_eval(lr2_model,test_data,target)
print(rmse2,r22,mae2)



68501.72462810247 0.6593318609468255 50628.9271515048


                                                                                

In [24]:
# Построим сводную таблицу по метрикам
scores = {"RMSE":[rmse,rmse2],
         "r2":[r2,r22],
         "MAE":[mae,mae2],
         }
scores = pd.DataFrame(scores,index=['lr1','lr2']).T


# Анализ результатов

In [25]:
scores

Unnamed: 0,lr1,lr2
RMSE,67694.867448,68501.724628
r2,0.66731,0.659332
MAE,49534.912119,50628.927152


**RMSE** - это квадратный корень среднеквадратичной ошибки модели. Чем больше этот коэф., тем "дальше" предсказание модели находится от тестовых данных. При этом сам коэф. описывает те единицы, в которых и нужно предсказать регрессию. Т.е. если это денежные единицы, то коэф. 67694 говорит о денежных единицах.

**R2** - коэффициент детерминации. Чем ближе к 1, тем лучше предсказывает модель. Коэф. 0.66 у первой модели говорит о том, что среднеквадратичная ошибка модели на 44% меньше, чем MSE предсказания по среднему.

**MAE** - это средняя абсолютная ошибка (отклонение). В ней в отличии от MSE не происходит возведения в квадрат,а вместо этого беруться значения модуля разницы между предсказанным и фактическим значением. Тоже говорит о тех значениях, в которых и необходимо предсказать регрессию. Чем ближе к 0, тем лучше.

**Вывод**  
В нашем случае по всем метрикам первая модель, в которой использовались категориальные признаки, лучше чем вторая, в которой были только числовые.