<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Подготовка-данных" data-toc-modified-id="Подготовка-данных-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Подготовка данных</a></span></li><li><span><a href="#Обучение-моделей" data-toc-modified-id="Обучение-моделей-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Обучение моделей</a></span></li><li><span><a href="#Анализ-результатов" data-toc-modified-id="Анализ-результатов-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Анализ результатов</a></span></li></ul></div>

# Предсказание стоимости жилья

В проекте нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Необходимо обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используются метрики RMSE, MAE и R2.

**Описание датасета:**<br>

`longitude` — широта;<br>
`latitude` — долгота;<br>
`housing_median_age` — медианный возраст жителей жилого массива;<br>
`total_rooms` — общее количество комнат в домах жилого массива;<br>
`total_bedrooms` — общее количество спален в домах жилого массива;<br>
`population` — количество человек, которые проживают в жилом массиве;<br>
`households` — количество домовладений в жилом массиве;<br>
`median_income` — медианный доход жителей жилого массива;<br>
`median_house_value` — медианная стоимость дома в жилом массиве;<br>
`ocean_proximity` — близость к океану.<br>

## Подготовка данных

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col,isnan,when,count
from pyspark.sql.functions import round

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

In [2]:
RANDOM_SEED = 12345

**Инициализируем локальную Spark-сессию.**

In [3]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Housing") \
                    .getOrCreate()

**Читаем содержимое файла.**

In [4]:
df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True)

                                                                                

In [5]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

**Типы данных колонок датасета.**

In [6]:
df.printSchema() 

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Исключим из датасета столбцы `longitude` и `latitude`.

Изменим форматы столбцов.

In [8]:
df.select('median_income').describe().toPandas()

                                                                                

Unnamed: 0,summary,median_income
0,count,20640.0
1,mean,3.8706710029070246
2,stddev,1.899821717945263
3,min,0.4999
4,max,15.0001


In [9]:
exclude = ['ocean_proximity', 'median_income']
selected_columns = [col for col in df.columns if col not in exclude]
for column in selected_columns:
    df = df.withColumn(column, F.col(column).cast(IntegerType()))
df.printSchema() 

root
 |-- longitude: integer (nullable = true)
 |-- latitude: integer (nullable = true)
 |-- housing_median_age: integer (nullable = true)
 |-- total_rooms: integer (nullable = true)
 |-- total_bedrooms: integer (nullable = true)
 |-- population: integer (nullable = true)
 |-- households: integer (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: integer (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Проверим дубликаты.

In [10]:
print('Количество записей в изначальном датасете:', df.count())
print('Количество уникальных записей:', df.distinct().count())

Количество записей в изначальном датасете: 20640




Количество уникальных записей: 20640


                                                                                

Дубликаты отсутсвуют.

Проверим пропуски.

In [11]:
columns = df.columns
for column in columns:
    check_col = F.col(column).isNull()
    print(column, df.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [12]:
columns = df.columns
for column in columns:
    check_col = df.filter(isnan(col(column)))
    print(column, check_col.count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Заменим пропуски на средние значения.

In [13]:
mean = np.round(df.select(F.mean('total_bedrooms')).collect()[0][0])
df = df.na.fill({'total_bedrooms': mean})
print('total_bedrooms', df.filter(F.col('total_bedrooms').isNull()).count())

total_bedrooms 0


Создадим столбцы с дополнительными признаками.

In [14]:
df = df.withColumn('rooms_per_household', round(F.col('total_rooms')/ F.col('households'),4))
df = df.withColumn('population_in_household', round(F.col('population') / F.col('households'),4))
df = df.withColumn('bedroom_index', round(F.col('total_bedrooms') / F.col('total_rooms'),4))

In [15]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-----------------------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|rooms_per_household|population_in_household|bedroom_index|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-----------------------+-------------+
|     -122|      37|                41|        880|           129|       322|       126|       8.3252|            452600|       NEAR BAY|             6.9841|                 2.5556|       0.1466|
|     -122|      37|                21|       7099|          1106|      2401|      1138|       8.3014|            358500|       NEAR BAY|             6.2381|                 2.1098|       0.1558|
|     -122|      37|

In [16]:
df.printSchema() 

root
 |-- longitude: integer (nullable = true)
 |-- latitude: integer (nullable = true)
 |-- housing_median_age: integer (nullable = true)
 |-- total_rooms: integer (nullable = true)
 |-- total_bedrooms: integer (nullable = true)
 |-- population: integer (nullable = true)
 |-- households: integer (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: integer (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- population_in_household: double (nullable = true)
 |-- bedroom_index: double (nullable = true)



Разделим призанки на категориальные, числовые и таргет.

In [17]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['housing_median_age',
                   'total_rooms', 
                   'total_bedrooms',
                   'population',
                   'households',
                   'median_income',
                   'rooms_per_household',
                   'population_in_household',
                   'bedroom_index'
                  ]
target = 'median_house_value' 

Разделим выборки.

In [18]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count()) 

[Stage 57:>                                                         (0 + 1) / 1]

16431 4209


                                                                                

 Трансформируем категориальный признак в числовое представление. 

In [19]:
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols]) 
ind = indexer.fit(train_data)

train_data = ind.transform(train_data)
test_data = ind.transform(test_data)

cols = [c for c in train_data.columns for i in categorical_cols if (c.startswith(i))]
train_data.select(cols).show(3) 

cols = [c for c in test_data.columns for i in categorical_cols if (c.startswith(i))]
test_data.select(cols).show(3) 

                                                                                

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 3 rows

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 3 rows



Создадим OHE-кодирование.

In [20]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])
enc = encoder.fit(train_data)

train_data = enc.transform(train_data)
test_data = enc.transform(test_data)

cols = [c for c in train_data.columns for i in categorical_cols if (c.startswith(i))]
train_data.select(cols).show(3) 

cols = [c for c in test_data.columns for i in categorical_cols if (c.startswith(i))]
test_data.select(cols).show(3) 

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



In [21]:
categorical_assembler = \
        VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
                                        outputCol="categorical_features")
train_data = categorical_assembler.transform(train_data)
test_data = categorical_assembler.transform(test_data)

Трансформируем числовые признаки.

In [22]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
train_data = numerical_assembler.transform(train_data)
test_data = numerical_assembler.transform(test_data)

standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
sclr = standardScaler.fit(train_data)
train_data = sclr.transform(train_data)
test_data = sclr.transform(test_data)

                                                                                

In [23]:
train_data.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'rooms_per_household',
 'population_in_household',
 'bedroom_index',
 'ocean_proximity_idx',
 'ocean_proximity_ohe',
 'categorical_features',
 'numerical_features',
 'numerical_features_scaled']

In [24]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
train_data = final_assembler.transform(train_data)
test_data = final_assembler.transform(test_data)

train_data.select(all_features).show(3) 
test_data.select(all_features).show(3) 

                                                                                

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [0.95175529883372...|
|       (4,[2],[1.0])|     [0.95175529883372...|
|       (4,[2],[1.0])|     [1.03106824040320...|
+--------------------+-------------------------+
only showing top 3 rows

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [1.34832000668110...|
|       (4,[2],[1.0])|     [1.82419765609797...|
|       (4,[2],[1.0])|     [1.82419765609797...|
+--------------------+-------------------------+
only showing top 3 rows



**Вывод**

- в датасете отсутсвуют  дубликаты
- 207 пропущенных значение в столбце `total_bedrooms` заменены на среднее значение
- удалены столбцы с широтой и долготой
- добавлены столбцы с дополнительными признаками: `rooms_per_household`, `population_in_household`, `bedroom_index`
- произвели трансформацию категориальных и числовых признаков

## Обучение моделей

Разделим выборки.

In [25]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count()) 

Обучим модель на всех признаках.

In [26]:
lr_all = LinearRegression(labelCol=target, featuresCol='features')
model_all= lr_all.fit(train_data) 

23/03/25 10:56:07 WARN Instrumentation: [89e0db79] regParam is zero, which might cause numerical instability and overfitting.
23/03/25 10:56:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/03/25 10:56:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/03/25 10:56:09 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/03/25 10:56:09 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

Обучим модель только на числовых признаках.

In [27]:
lr_num = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')
model_num = lr_num.fit(train_data) 


23/03/25 10:56:12 WARN Instrumentation: [7b668689] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

**Вывод**

- обучили две модели Линейной регрессии: на всех признаках (категориальные и числовые), и только на числовых.

## Анализ результатов

Получим предсказания для модели, обученной на всех признаках (числовых и категориальных).

In [28]:
predictions_all = model_all.transform(test_data)

Рассчитаем метрики.

In [29]:
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_all)
print("RMSE (все призанки) = %f" % rmse)

evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions_all)
print("MAE (все призанки) = %f" % mae)

evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions_all)
print("R2 (все призанки) = %f" % r2)

RMSE (все призанки) = 67049.601153
MAE (все призанки) = 48903.774295
R2 (все призанки) = 0.650634


Получим предсказания для модели, обученной только на числовых признаках.

In [30]:
predictions_num = model_num.transform(test_data)

Рассчитаем метрики.

In [31]:
evaluator = RegressionEvaluator(labelCol='median_house_value', predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_num)
print("RMSE (числовые признаки) = %f" % rmse)

evaluator = RegressionEvaluator(labelCol='median_house_value', predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions_num)
print("MAE (числовые признаки) = %f" % mae)

evaluator = RegressionEvaluator(labelCol='median_house_value', predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions_num)
print("R2 (числовые признаки)= %f" % r2)

RMSE (числовые признаки) = 72744.280699
MAE (числовые признаки) = 53644.787947
R2 (числовые признаки)= 0.588769


**Вывод**

- модель, обученная на всех признаках (категориальных и чесловых) показала лучшие результаты:<br>

    RMSE = 67049.601153<br>
    MAE = 48903.774295<br>
    R2 = 0.650634<br>

In [32]:
spark.stop()