## Предсказание стоимости жилья

### Описание проекта

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

### Описание данных  


В колонках датасета содержатся следующие данные:  

`longitude` — широта;  
`latitude` — долгота;  
`housing_median_age` — медианный возраст жителей жилого массива;  
`total_rooms` — общее количество комнат в домах жилого массива;  
`total_bedrooms` — общее количество спален в домах жилого массива;  
`population` — количество человек, которые проживают в жилом массиве;  
`households` — количество домовладений в жилом массиве;  
`median_income` — медианный доход жителей жилого массива;  
`median_house_value` — медианная стоимость дома в жилом массиве;  
`ocean_proximity` — близость к океану.  

### План выполнения проекта  
1. Инициализируйте локальную Spark-сессию.
2. Прочитайте содержимое файла /datasets/housing.csv.
3. Выведите типы данных колонок датасета. Используйте методы pySpark.
4. Выполните предобработку данных:
    - исследуйте данные на наличие пропусков и заполните их, выбрав значения по своему усмотрению.
    - преобразуйте колонку с категориальными значениями техникой One hot encoding.
5. Постройте две модели линейной регрессии на разных наборах данных:
используя все данные из файла;
    - используя только числовые переменные,  
    - исключив категориальные.
    Для построения модели используйте оценщик LinearRegression из библиотеки MLlib.
6. Сравните результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2. Сделайте выводы.

## Подготовка данных

### Откроем и сохраним файл данных

In [1]:
# Импортируем нужные библиотеки
import pandas as pd 
import numpy as np
import seaborn as sns
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml import Pipeline

from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

RANDOM_SEED = 30112023

Инициализируем локальную Spark-сессию:

In [2]:
spark = SparkSession.builder \
                    .appName("California Housing") \
                    .getOrCreate()

Прочитаем и сохраним содержимое файла '\/datasets/housing.csv':

In [3]:
df_housing = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True) 

                                                                                

In [4]:
# выведем схемы датафрейма:
df_housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [5]:
# Посморим первые 10 строк датафремаЖ
df_housing.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [6]:
# Более детально изучим данные. Для этого вызовем метод describe:
df_housing.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Просмотрим значения, содержащиеся в категориальной переменной:

In [7]:
df_housing.groupBy('ocean_proximity').count().toPandas()

                                                                                

Unnamed: 0,ocean_proximity,count
0,ISLAND,5
1,NEAR OCEAN,2658
2,NEAR BAY,2290
3,<1H OCEAN,9136
4,INLAND,6551


***Вывод:***  
В датафрейме 10 столбцов и 20640 записей.  
В большинстве колонок хранятся количественные данные, кроме одной — ocean_proximity. Она хранит 5 категориальных значений. Пять жилых массивов - находятся на острове. Почти половина -9136- находится в часе от океана.
Медианный возраст жителя жилого массива 28,6. Медианная стоимость дома жилого массива: min - 14999, max - 500 001.

### Подготовка данных

Исследуем данные на наличие пропусков:

In [8]:
df_housing.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df_housing.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



Есть пропущенные значения в столбце `total_bedrooms`. Примерно 1%.

In [9]:
#Разделим наши наборы данных на тренировочные и тестовые.
#train, test = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)

In [10]:
# Выведем количество записей в выборках на экран при помощи метода count()
#print(train.count(), test.count())

In [11]:
# Заполним их медианными значениями:
imputer = Imputer(strategy='median', inputCols=['total_bedrooms'], outputCols=['total_bedrooms'])
#df_housing = imputer.fit(df_housing).transform(df_housing)
#train = imputer.fit(train)
#test = train.transform(test)

In [12]:
# Проверяем:
#df_housing.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df_housing.columns]).show()
#test.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in test.columns]).show()

In [13]:
# Проверим статистику:
#df_housing.describe().toPandas()
#test.describe().toPandas()

Среднее и стандартное отклонение почти не изменились.

***Вывод:*** найдены пропуски в данных и заполнили их медианными значениями.

### Преобразование категориальных признаков

Преобразуем колонку с категориальными значениями техникой One hot encoding.   

Разделим колонки на два типа: числовые и текстовые, которые представляют категориальные данные.

In [14]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                   'total_bedrooms', 'population', 'households', 'median_income']
target = 'median_house_value'

Трансформируем категориальные признаки в числовое представление с помощью трансформера StringIndexer:

In [15]:
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols]) 
#df_h = indexer.fit(df_housing).transform(df_housing)

In [16]:
# Проверяем:
#cols = [c for c in df_h.columns for i in categorical_cols if (c.startswith(i))]
#df_h.select(cols).show(5)

Выполним OHE-кодирование для категорий:

In [17]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])
#df_h = encoder.fit(df_h).transform(df_h)

In [18]:
# Проверяем:
#cols = [c for c in df_h.columns for i in categorical_cols if (c.startswith(i))]
#df_h.select(cols).show(3)

### Преобразование числовых признаков

Для числовых признаков применим StandardScaler, чтобы сильные выбросы не смещали предсказания модели.  

Соберём вектор числовых признаков в отдельный столбец.

In [19]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
#df_h = numerical_assembler.transform(df_h) 

Применим к столбцу с вектором числовых признаков StandardScaler.

In [20]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
#df_h = standardScaler.fit(df_h).transform(df_h)

In [21]:
#Проверяем:
#df_h.printSchema()

Cобираем трансформированные категорийные и числовые признаки с помощью VectorAssembler:

In [22]:
all_features = ['ocean_proximity_ohe','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol='features')
#df_h = final_assembler.transform(df_h)

In [23]:
#Проверяем:
#df_h.select('features').show(5)
#df_h.select('numerical_features_scaled').show(5)

***Вывод:*** 
Преобразовали числовые и категориальные признаки в вектора. Создали два набора данных: один включающий все признаки, второй только количественные.

## Обучение моделей

По условиям задачи необходимо сравнить метрики RMSE, MAE, R2 на следующих вариациях датасета:  
  - со всеми признаками;  
  - без категориального признака.

Разделим наши наборы данных на тренировочные и тестовые.

In [24]:
train, test = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)

In [25]:
# Выведем количество записей в выборках на экран при помощи метода count()
print(train.count(), test.count())

16538 4102


In [26]:
# Создаем список, куда будем вносить результаты
data_table = [
    ['features_used', 'RMSE', 'MAE', 'R2']
]

# Перебираем варианты используемых признаков
for col in ['features', 'numerical_features_scaled']:
    # обучаем модель
    lr = LinearRegression(featuresCol=col, labelCol=target, regParam=0.1)
    pipeline = Pipeline(stages=[imputer, indexer, encoder, numerical_assembler, standardScaler, final_assembler, lr])
    model = pipeline.fit(train)
    
    #запишем предсказания на тестовой выборке
    predictions = model.transform(test)
    
    #выделим предсказания и ответы в отдельную переменную
    results = predictions.select(['prediction', target])
    
    #соберем все в RDD
    results_collect = results.collect()
    results_list = [ (float(i[0]), float(i[1])) for i in results_collect]
    scoreAndLabels = spark.sparkContext.parallelize(results_list)
    
    #запишем метрики в отдельную переменную
    metrics = RegressionMetrics(scoreAndLabels)
    
    #добавим результаты в список
    data_table.append([col,
                       metrics.rootMeanSquaredError,
                       metrics.meanAbsoluteError,
                       metrics.r2])

23/11/30 10:48:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/11/30 10:48:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/11/30 10:48:18 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/11/30 10:48:18 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

***Вывод:***

Разделили данные на обучающие и тестовые выборки, обучили модели линейной регрессии

## Анализ результатов

Сравним результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2.

In [27]:
pd.DataFrame(data_table[1:], columns=data_table[0])

Unnamed: 0,features_used,RMSE,MAE,R2
0,features,68292.078678,50039.48638,0.654481
1,numerical_features_scaled,69442.435728,51446.486288,0.642743


***Вывод:***  
   - RMSE и MAE измеряют разницу между предсказанным значением медианной цены дома и его реальной ценой. Реальная цена по набору лежит в диапазоне от 15000 до 500000. Ошибки в случае с использованием всего набора данных чуть меньше. Но не существенно.  
   - Коэффициент детерминации, показывающий долю корректных предсказаний нашей модели выше у модели учитывающей все признаки.    
   - Модели показывают неплохой результат. На тестовом наборе данных лучший результат показала модель обученная на всех признаках, как на количественных так и на категориальном признаках.

In [28]:
spark.stop()

## Общий вывод

В предоставленном наборе данных (размером 20640 строк на 10 колонок) была проведена предобработка данных для сравнения результатов работы модели Линейной регресии на двух наборах данных по метрикам RMSE, MAE и R2:

1. Инициализировали локальную Spark-сессию.
2. Прочитали содержимое файла /datasets/housing.csv.   
3. Выведили типы данных колонок датасета. Используйте методы pySpark.
4. Выполнили предобработку данных:
    - исследовали данные на наличие пропусков и заполнили их медианным значение.
    - преобразули колонку с категориальными значениями техникой One hot encoding.
5. Построили две модели линейной регрессии на разных наборах данных:
    - используя все данные из файла;
    - используя только числовые переменные, исключив категориальные.
6. Сравнили результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2: 
    - RMSE и MAE измеряют разницу между предсказанным значением медианной цены дома и его реальной ценой. Реальная цена по набору лежит в диапазоне от 15000 до 500000. Ошибки в случае с использованием всего набора данных чуть меньше. Но не существенно.  
    - Коэффициент детерминации, показывающий долю корректных предсказаний нашей модели выше у модели учитывающей все признаки.    
    - Модели показывают неплохой результат. 
    - ***На тестовом наборе данных лучший результат показала модель обученная на всех признаках, как на количественных так и на категориальном признаках.***
