## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

# Подготовка данных

In [46]:
#Импортируем все нужные модули из pyspark.ml.feature для трансформации признаков,
#алгоритм линейной регрессии из pyspark.ml.regression и модули из pyspark.ml.evaluation для оценки качества модели.

import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.sql import functions as F, Window
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import DataFrame
from pyspark.ml.regression import LinearRegression

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
        
RANDOM_SEED = 2022

spark = SparkSession.builder \
                    .master("local") \
                    .appName("HAUSING - Logistic regression") \
                    .getOrCreate()

df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True, nullValue="NA")
print("End")

End


In [47]:
#заполним пропущенные значения в столбце total_bedrooms на среднее значение  по столбцу
exclude = ["total_bedrooms"]
#mean = cases_df['total_bedrooms'].mean()
#df = df.fillna(mean)
mean = df.select(F.mean('total_bedrooms')).collect()[0][0]
df = df.na.fill({'total_bedrooms': mean})

In [48]:
# посмотрим на таблицу, преобразованную в таблицу Pandas
cases_df = df.toPandas()
cases_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20640 entries, 0 to 20639
Data columns (total 10 columns):
longitude             20640 non-null float64
latitude              20640 non-null float64
housing_median_age    20640 non-null float64
total_rooms           20640 non-null float64
total_bedrooms        20640 non-null float64
population            20640 non-null float64
households            20640 non-null float64
median_income         20640 non-null float64
median_house_value    20640 non-null float64
ocean_proximity       20640 non-null object
dtypes: float64(9), object(1)
memory usage: 1.6+ MB


In [49]:
#посмотрим на типы всех наших колонок
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [51]:
#Разделим колонки на два типа: числовые и текстовые, которые представляют категориальные данные,
#отдельно создадим переменную под столбец с целевым значением
categorical_col = 'ocean_proximity'
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 
                   'median_income']
target = "median_house_value" 

#создадим отдельный набор данных, используя только числовые переменные, исключив категориальные
df_num = df['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 
                   'median_income', 'median_house_value']

In [52]:
#В первую очередь трансформируем категориальные признаки с помощью трансформера StringIndexer. 
#(Он переводит текстовые категории в числовое представление).

indexer = StringIndexer(inputCol=categorical_col, 
                        outputCol=categorical_col+'_idx')
df = indexer.fit(df).transform(df)

#cols = [c for c in df.columns for i in categorical_col if (c.startswith(i))]
#df.select(cols).show(3)

In [53]:
#Дополнительно создадим OHE-кодирование для категорий.
encoder = OneHotEncoderEstimator(inputCols=[categorical_col+'_idx'],
                        outputCols=[categorical_col+'_ohe'])
df = encoder.fit(df).transform(df)

#cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
#df.select(cols).show(3)

In [54]:
 #объединеним категориальные признаки в один вектор, с которым ML-алгоритм умеет работать
categorical_assembler = \
        VectorAssembler(inputCols=[categorical_col +'_ohe'],
                                        outputCol="categorical_features")
df = categorical_assembler.transform(df)

In [55]:
 #объединеним числовые признаки в один вектор, с которым ML-алгоритм умеет работать
numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                                                            outputCol="numerical_features")
df = numerical_assembler.transform(df)
df_num = numerical_assembler.transform(df_num)

In [56]:
# проведем шкалирование значений — чтобы сильные выбросы не смещали предсказания модели
standardScaler = StandardScaler(inputCol='numerical_features',
                                                                outputCol="numerical_features_scaled")
df = standardScaler.fit(df).transform(df)
df_num = standardScaler.fit(df_num).transform(df_num)

In [57]:
#выведем название всех колонок для всего набора данных
print(df.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity', 'ocean_proximity_idx', 'ocean_proximity_ohe', 'categorical_features', 'numerical_features', 'numerical_features_scaled']


In [58]:
#выведем название всех колонок для набора данных с числовыми признаками
print(df_num.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'numerical_features', 'numerical_features_scaled']


In [59]:
#соберем трансформированные категорийные и числовые признаки
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
df = final_assembler.transform(df)

df.select(all_features).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|
|       (4,[3],[1.0])|     [-61.002278409814...|
|       (4,[3],[1.0])|     [-61.012260782324...|
+--------------------+-------------------------+
only showing top 3 rows



In [60]:
#собрать трансформированные числовые признаки
features_num = ['numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=features_num, 
                                  outputCol="features_num") 
df_num = final_assembler.transform(df_num)

df_num.select(features_num).show(3)

+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-61.007269596069...|
|     [-61.002278409814...|
|     [-61.012260782324...|
+-------------------------+
only showing top 3 rows



***Вывод***:

Заполнили пропущенные значения в столбце total_bedrooms на среднее значение  по столбцу, трансформировали категориальные признаки, создали OHE-кодирование для категорий, объединенили категориальные признаки в один вектор и числовые признаки в другой вектор, провели шкалирование значений числовых признаков, трансформировали категорийные и числовые признаки.


# Обучение моделей

In [61]:
#Разделяем  датасет на две части — выборку для обучения и выборку для тестирования качества модели
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

16437 4203


In [62]:
#Разделяем  датасет числовых признаков на две части — выборку для обучения и выборку для тестирования качества модели
train_data_num, test_data_num = df_num.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data_num.count(), test_data_num.count())

16437 4203


In [63]:
#Построим модель линейной регрессии на наборе данных, используя все данные из файла;

lr = LinearRegression(labelCol= 'median_house_value', featuresCol = 'features', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %s" % str(lr_model.intercept))

Coefficients: [17386.430795751403,-42907.646586252515,26250.999191614977,14845.127873281883,-23673.416384793836,-23520.779171806957,13928.424690788983,210.91685191627477,28791.435833964173,-49348.35198000037,25614.623162511383,72262.85193905383]
Intercept: -1000156.2856539406


In [64]:
#Построим модель линейной регрессии на наборе данных, используя только числовые переменные, исключив категориальные.
#Для построения модели используйте оценщик LinearRegression из библиотеки MLlib.
lr_num = LinearRegression(labelCol= target, featuresCol = 'features_num',  maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model_num = lr_num.fit(train_data_num)
print("Coefficients: %s" % str(lr_model_num.coefficients))
print("Intercept: %s" % str(lr_model_num.intercept))

Coefficients: [-68531.938070535,-72805.95765339056,15996.687794895974,-8348.40585674149,34076.863487206974,-53847.0966943031,34478.58288001748,76625.04909315362]
Intercept: -2871869.1386776133


In [65]:
#использую summury для статистики для числовых и строковых столбцов на тренировочной выборке
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print("MAE: %f" % trainingSummary.meanAbsoluteError)

RMSE: 69342.855671
r2: 0.639466
MAE: 50202.693350


In [66]:
#использую summury для статистики для числовых и строковых столбцов на тренировочной выборке только ля числовых переменных
trainingSummaryNum = lr_model_num.summary
print("RMSE: %f" % trainingSummaryNum.rootMeanSquaredError)
print("r2: %f" % trainingSummaryNum.r2)
print("MAE: %f" % trainingSummaryNum.meanAbsoluteError)

RMSE: 70172.882592
r2: 0.630783
MAE: 51124.357613


In [67]:
#Для построения модели используем оценщик LinearRegression из библиотеки MLlib evaluate
lr_predictions = lr_model.transform(test_data)
lr_predictions.select("prediction",target,"features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol=target,metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+------------------+--------------------+
|        prediction|median_house_value|            features|
+------------------+------------------+--------------------+
| 144464.2682678937|           85800.0|[0.0,0.0,1.0,0.0,...|
|207143.88575844374|          111400.0|[0.0,0.0,1.0,0.0,...|
| 157074.7809646111|           58100.0|[0.0,0.0,1.0,0.0,...|
| 150118.7533728031|           64600.0|[0.0,0.0,1.0,0.0,...|
|187107.79636004497|           82800.0|[0.0,0.0,1.0,0.0,...|
+------------------+------------------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.640376


In [68]:
#Для построения модели используем оценщик LinearRegression из библиотеки MLlib evaluate на числовых переменных
lr_predictions_num = lr_model_num.transform(test_data_num)
lr_predictions_num.select("prediction",target,"features_num").show(5)
lr_evaluator_num = RegressionEvaluator(predictionCol="prediction", \
                 labelCol=target,metricName="r2")
print("R Squared (R2) on test data num = %g" % lr_evaluator_num.evaluate(lr_predictions_num))

+------------------+------------------+--------------------+
|        prediction|median_house_value|        features_num|
+------------------+------------------+--------------------+
| 75114.75630027708|           85800.0|[-62.040445150874...|
|170158.36659766315|          111400.0|[-62.020480405854...|
|112067.09445788525|           58100.0|[-61.995524474579...|
|103033.32668885449|           64600.0|[-61.980550915813...|
|141942.49020201433|           82800.0|[-61.975559729558...|
+------------------+------------------+--------------------+
only showing top 5 rows

R Squared (R2) on test data num = 0.63094


***Вывод:***  

На тренировочной выборке для числовых переменных показатели RMSE и MAE выше чем на наборе данных с категориальными переменными, а показатель R2 ниже.

# Анализ результатов

In [69]:
#Считаем RMSE, R2, MAE на тестовой выборке
test_result = lr_model.evaluate(test_data)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)
print("R2 on test data = %g" % test_result.r2)
print("Root Mean Absolute Error (MAE) on test data = %g" % test_result.meanAbsoluteError)

Root Mean Squared Error (RMSE) on test data = 68981
R2 on test data = 0.640376
Root Mean Absolute Error (MAE) on test data = 49522.3


In [70]:
#Считаем RMSE, R2, MAE на тестовой выборке для числовых переменных
test_result_num = lr_model_num.evaluate(test_data_num)
print("Root Mean Squared Error (RMSE) on test data num = %g" % test_result_num.rootMeanSquaredError)
print("R2 on test data num = %g" % test_result_num.r2)
print("Root Mean Absolute Error (MAE) on test data num = %g" % test_result_num.meanAbsoluteError)

Root Mean Squared Error (RMSE) on test data num = 69880.1
R2 on test data num = 0.63094
Root Mean Absolute Error (MAE) on test data num = 50611.2


***Вывод***:  

На тестовой выборке для числовых переменных показатели RMSE и MAE выше чем на наборе данных с категориальными переменными, а показатель R2 ниже.