## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

В колонках датасета содержатся следующие данные:

    longitude — широта;
    latitude — долгота;
    housing_median_age — медианный возраст жителей жилого массива;
    total_rooms — общее количество комнат в домах жилого массива;
    total_bedrooms — общее количество спален в домах жилого массива;
    population — количество человек, которые проживают в жилом массиве;
    households — количество домовладений в жилом массиве;
    median_income — медианный доход жителей жилого массива;
    median_house_value — медианная стоимость дома в жилом массиве;
    ocean_proximity — близость к океану.

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — median_house_value. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.
Инструкция по выполнению проекта

   1. Инициализируйте локальную Spark-сессию.
   2. Прочитайте содержимое файла /datasets/housing.csv.
   3. Выведите типы данных колонок датасета. Используйте методы pySpark.
   4. Выполните предобработку данных:
     
       * Исследуйте данные на наличие пропусков и заполните их, выбрав значения по своему усмотрению.
       * Преобразуйте колонку с категориальными значениями техникой One hot encoding.
   5. Постройте две модели линейной регрессии на разных наборах данных: 
            - используя все данные из файла;
            - используя только числовые переменные, исключив категориальные.
Для построения модели используйте оценщик LinearRegression из библиотеки MLlib.
Сравните результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2. Сделайте выводы.

### Подготовка данных

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        
RANDOM_SEED = 2022



In [2]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("California 1990 housing project") \
                    .getOrCreate()

df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True) 

                                                                                

Спарк-сессия создана, датасет загружен. Теперь выведем типы данных колонок датасета с помощью методов pySpark. Почти все колонки имеют числовой тип данных double, и только 'ocean_proximity' - строковые значения.

In [3]:
df.printSchema()
print(pd.DataFrame(df.dtypes, columns=['column', 'type']).head(10))

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


Теперь проверим данные на пропуски.

In [4]:
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [5]:
columns = df.columns

for column in columns:
    check_col = F.col(column).isNull()
    print(column, df.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


<font color='blue'><b>Комментарий ревьюера: </b></font> ✔️\
<font color='green'>Обнаружены пропуски!</font>

Обнаружили 207 пропусков в столбце total_bedrooms. Заполним их медианным значением по датасету.

In [6]:
median = df.approxQuantile('total_bedrooms', [0.5], 0)[0]
df = df.na.fill({'total_bedrooms': median})

In [7]:
for column in columns:
    check_col = F.col(column).isNull()
    print(column, df.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Перейдем к кодированию. У нас есть одна колонка с категориальными значениями - ocean_proximity. Проведем для нее OneHot encoding. Но сначала нам понадобится трансформировать ее с помощью трансформера StringIndexer.

In [8]:
categorical_cols = ['ocean_proximity']
numerical_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
target = "median_house_value"


Сразу создадим датафрейм, в котором не будет категориальных признаков.

In [9]:
df_num = df.drop('ocean_proximity')

print(df_num)

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]


In [10]:
train, test = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train.count(), test.count())

                                                                                

16418 4222


In [11]:
train_num, test_num = df_num.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_num.count(), test_num.count())

16418 4222


In [12]:
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols], handleInvalid="keep") 
indexxer = indexer.fit(train)

                                                                                

In [13]:
train = indexxer.transform(train)

In [14]:
test = indexxer.transform(test)

In [15]:
cols = [c for c in train.columns for i in categorical_cols if (c.startswith(i))]
train.select(cols).show(5)

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 5 rows



In [16]:
cols = [c for c in test.columns for i in categorical_cols if (c.startswith(i))]
test.select(cols).show(5)

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 5 rows



In [17]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols], 
                        outputCols=[c+'_ohe' for c in categorical_cols])
encoder_model = encoder.fit(train)

In [18]:
train = encoder_model.transform(train)
test = encoder_model.transform(test)

In [19]:
cols = [c for c in test.columns for i in categorical_cols if (c.startswith(i))]
test.select(cols).show(3)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



In [20]:
cat_assembler = \
        VectorAssembler(inputCols=["ocean_proximity_ohe"],
                                       outputCol="categorical_features")
train = cat_assembler.transform(train)

In [21]:
test_assembler = \
        VectorAssembler(inputCols=["ocean_proximity_ohe"],
                                       outputCol="categorical_features")
test = test_assembler.transform(test)

Шаг с categorical_assembler наверное даже не нужен, так как у нас лишь один категориальный признак.

Перейдем к подготовке числовых признаков. Начиная с этого шага будем дублировать шаги для обоих датафреймов сразу.

In [22]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
train = numerical_assembler.transform(train)
train_num = numerical_assembler.transform(train_num)

In [23]:
test = numerical_assembler.transform(test)
test_num = numerical_assembler.transform(test_num)

In [24]:
StandardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
scaler = StandardScaler.fit(train)
scaler_num = StandardScaler.fit(train_num)


                                                                                

In [25]:
train = scaler.transform(train)
train_num = scaler_num.transform(train_num)

In [26]:
test = scaler.transform(test)
test_num = scaler_num.transform(test_num)

In [27]:
print(train.columns)
print(train_num.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity', 'ocean_proximity_idx', 'ocean_proximity_ohe', 'categorical_features', 'numerical_features', 'numerical_features_scaled']
['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'numerical_features', 'numerical_features_scaled']


In [29]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
train = final_assembler.transform(train)

train.select(all_features).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-61.952887791441...|
|       (4,[2],[1.0])|     [-61.927977100733...|
|       (4,[2],[1.0])|     [-61.913030686308...|
+--------------------+-------------------------+
only showing top 3 rows



In [30]:
test = final_assembler.transform(test)

In [31]:
test.select(all_features).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-61.927977100733...|
|       (4,[2],[1.0])|     [-61.893102133741...|
|       (4,[2],[1.0])|     [-61.883137857458...|
+--------------------+-------------------------+
only showing top 3 rows



In [32]:
features_num = ['numerical_features_scaled']

final_assembler_num = VectorAssembler(inputCols=features_num, 
                                  outputCol="features") 
train_num = final_assembler_num.transform(train_num)

train_num.select(features_num).show(3)

+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-61.952887791441...|
|     [-61.927977100733...|
|     [-61.913030686308...|
+-------------------------+
only showing top 3 rows



In [33]:
test_num = final_assembler_num.transform(test_num)

### Обучение моделей

Создадим модель линейной регрессии.

In [34]:
lr = LinearRegression(labelCol=target, featuresCol='features', maxIter=10, regParam=0.3, elasticNetParam=0.8)

lrModel = lr.fit(train)

24/03/24 18:37:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/03/24 18:37:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [35]:
lrModelNum = lr.fit(train_num)

Проверим показатели модели на тренировочных данных.

In [36]:
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 69339.545699
r2: 0.637291


In [37]:
trainingSummaryNum = lrModelNum.summary
print("RMSE: %f" % trainingSummaryNum.rootMeanSquaredError)
print("r2: %f" % trainingSummaryNum.r2)

RMSE: 69927.877248
r2: 0.631110


In [39]:
predictions = lrModel.transform(test)
predictions.show()
predictedLabes = predictions.select('median_house_value', 'prediction')
predictedLabes.show(11)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+--------------------+-------------------------+--------------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|categorical_features|  numerical_features|numerical_features_scaled|            features|        prediction|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+--------------------+-------------------------+--------------------+------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|     

In [40]:
predictions_num = lrModelNum.transform(test_num)

predictedLabesNum = predictions_num.select('median_house_value', 'prediction')
predictedLabesNum.show(10)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|101298.92550544953|
|           50800.0|185022.49009660957|
|           58100.0|109505.56520016352|
|           68400.0| 78896.52190991072|
|           72200.0|129260.54787339922|
|           67000.0|119501.86958910758|
|           81300.0|117718.90509561636|
|           70500.0| 133138.6735589006|
|           60000.0|113262.82711809129|
|          109400.0|118010.21779444395|
+------------------+------------------+
only showing top 10 rows



Создадим оценщика регрессии для вычисления требуемых метрик.

In [41]:
evaluator = RegressionEvaluator(
labelCol="median_house_value", metricName="rmse")
rmse = evaluator.evaluate(predictions)
rmse_num = evaluator.evaluate(predictions_num)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("Root Mean Squared Error (RMSE) on test_data_num = %g" % rmse_num)

Root Mean Squared Error (RMSE) on test data = 68985.5
Root Mean Squared Error (RMSE) on test_data_num = 69179.1


In [42]:
evaluator = RegressionEvaluator(
labelCol="median_house_value", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
mae_num = evaluator.evaluate(predictions_num)

print("MAE on test data = %g" % mae)
print("MAE on test_data_num = %g" % mae_num)

MAE on test data = 50088.6
MAE on test_data_num = 50873.8


In [43]:
evaluator = RegressionEvaluator(
labelCol="median_house_value", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
r2_num = evaluator.evaluate(predictions_num)
print("r2 on test data = %g" % r2)
print("r2 on test_data_num = %g" % r2_num)

r2 on test data = 0.648494
r2 on test_data_num = 0.646518


In [44]:
spark.stop()

### Анализ результатов

Root Mean Squared Error (RMSE) on test data = 68985.5


Root Mean Squared Error (RMSE) on test_data_num = 69179.1


Чем меньше среднеквадратичное отклонение, тем лучше модель регрессии. Соответственно, лучше справляется модель, учитывающая категориальный признак.

MAE on test data = 50088.6

MAE on test_data_num = 50873.8

Среднее абсолютное отклонение тоже ниже у модели, учитывающей категориальный признак.


r2 on test data = 0.648494

r2 on test_data_num = 0.646518

Значения метрики R2 в целом удовлетворительные, но на всех признаках сразу модель показывает результат на 0,02 лучше, чем только на числовых признаках.

Вывод, который можно сделать: при прогнозе стоимости недвижимости такой признак, как удаленность от океана (выраженный категориально), играет существенную роль, и при его удалении из модели ее качество падает.