## Предсказание стоимости жилья

В проекте мы обучим модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. для этого:
 - Обучим модель и сделаем предсказания на тестовой выборке. 
 - Для оценки качества модели возьмем метрики RMSE, MAE и R2.

In [1]:
import pandas as pd #инициализирую сессию
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator

In [2]:
pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        
RANDOM_SEED = 2023

spark = SparkSession.builder \
                    .master("local") \
                    .appName("EDA California Housing") \
                    .getOrCreate()

df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True) #подгружаю файл

                                                                                

# Подготовка данных

In [3]:
print(pd.DataFrame(df.dtypes, columns=['column', 'type']).head(10)) #типы данных по колонкам в датафрейме

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


In [4]:
df.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [5]:
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [6]:
for column in df.columns:
    check_col = F.col(column).cast("float").isNull()
    print(column, df.filter(check_col).count()) #определение столбца с пропусками

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 20640


In [7]:
df = df.na.fill({"total_bedrooms" : 0})

In [8]:
df.show(291) #удостоверилась, что удалила нормально

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [9]:
categorical_cols = ['ocean_proximity']
target = "median_house_value" #делим столбцы для своего удобства

In [10]:
def standart(data_fit, data_tr):
    numerical_cols = list(data_fit.columns)
    numerical_cols.remove(target)
    numerical_cols.remove(categorical_cols[0])
    
    indexer = StringIndexer(inputCols=categorical_cols, outputCols=[c+'_idx' for c in categorical_cols])
    indexer_model = indexer.fit(data_fit)
    
    data_fit = indexer_model.transform(data_fit)
    data_tr = indexer_model.transform(data_tr)
    
    encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols], outputCols=[c+'_ohe' for c in categorical_cols])
    encoder_model = encoder.fit(data_fit)
    data_fit = encoder_model.transform(data_fit)
    data_tr = encoder_model.transform(data_tr)
    
    categorical_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
                                        outputCol="categorical_features")
    data_fit = categorical_assembler.transform(data_fit)
    data_tr = categorical_assembler.transform(data_tr)
    
    numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
    data_fit = numerical_assembler.transform(data_fit) 
    data_tr = numerical_assembler.transform(data_tr)
    
    standardScaler = StandardScaler(inputCol='numerical_features',outputCol="numerical_features_scaled")
    standardScaler_model = standardScaler.fit(data_fit)
    data_fit = standardScaler_model.transform(data_fit) #стандартизируем количественные величины
    data_tr = standardScaler_model.transform(data_tr)
    
    all_features = ['categorical_features','numerical_features_scaled']

    final_assembler = VectorAssembler(inputCols=all_features, 
                                      outputCol="features") 
    data_fit = final_assembler.transform(data_fit)
    data_tr = final_assembler.transform(data_tr)

    data_tr.select(all_features).show() #образуем из полученных признаков общий вектор
    
    return(data_tr)

# Обучение моделей

In [11]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED) #разделяем весь датасет на обущающую и тестовую выборки

In [12]:
train_data_fit = train_data
train_data = standart(train_data_fit, train_data)
test_data = standart(train_data_fit, test_data)

                                                                                

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-62.029642565664...|
|       (4,[2],[1.0])|     [-62.004701012562...|
|       (4,[2],[1.0])|     [-62.004701012562...|
|       (4,[2],[1.0])|     [-61.989736080700...|
|       (4,[2],[1.0])|     [-61.984747770080...|
|       (4,[2],[1.0])|     [-61.979759459459...|
|       (4,[2],[1.0])|     [-61.969782838218...|
|       (4,[2],[1.0])|     [-61.969782838218...|
|       (4,[2],[1.0])|     [-61.969782838218...|
|       (4,[2],[1.0])|     [-61.959806216977...|
|       (4,[2],[1.0])|     [-61.959806216977...|
|       (4,[2],[1.0])|     [-61.959806216977...|
|       (4,[2],[1.0])|     [-61.949829595737...|
|       (4,[2],[1.0])|     [-61.949829595737...|
|       (4,[2],[1.0])|     [-61.949829595737...|
|       (4,[2],[1.0])|     [-61.949829595737...|
|       (4,[2],[1.0])|     [-61.944841285116...|
|       (4,[2],[1.0]

In [13]:
print((train_data.count(), len(train_data.columns)))
print((test_data.count(), len(test_data.columns)))

(16519, 16)
(4121, 16)


In [14]:
lr = LinearRegression (labelCol=target, featuresCol='features') #Обучение на общем датасете

model = lr.fit(train_data) 

23/02/17 12:06:55 WARN Instrumentation: [cafdc5cc] regParam is zero, which might cause numerical instability and overfitting.
23/02/17 12:06:55 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/02/17 12:06:55 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/02/17 12:06:56 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/02/17 12:06:56 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [26]:
predictions = model.transform(test_data)
predictedLabes = predictions.select(target, "prediction")
predictedLabes.show() #получаем предсказания

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           78300.0|128127.55003829929|
|           67000.0| 152992.8721853674|
|           62500.0|167626.92525713542|
|          100600.0|193373.44964291784|
|          104200.0|201764.08576896158|
|           74100.0|  158735.689384931|
|          128100.0|222798.12250347738|
|          130600.0| 218097.6177249367|
|           92800.0|212735.80136220925|
|           83000.0| 179386.3353830569|
|           70500.0|173927.19349396438|
|           87500.0|129545.66611654102|
|           82100.0| 160425.6266830603|
|           99600.0| 152996.4984056668|
|          135600.0|173073.79626375646|
|           78800.0| 133831.4129515693|
|          111300.0|158267.37542068283|
|          119700.0|  174728.509569498|
|          153100.0| 72169.30734250252|
|           98100.0|164615.48319842736|
+------------------+------------------+
only showing top 20 rows



In [16]:
evaluation = RegressionEvaluator(labelCol=target, predictionCol="prediction")

# Root Mean Square Error
rmse = evaluation.evaluate(predictedLabes, {evaluation.metricName: "rmse"})
print("RMSE: %.3f" % rmse)

# Mean Absolute Error
mae = evaluation.evaluate(predictedLabes, {evaluation.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluation.evaluate(predictedLabes, {evaluation.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 69623.683
MAE: 49846.656
r2: 0.642


Обучение на данных используя только числовые переменные, исключив категориальные

In [17]:
train_data_1, test_data_1 = df.drop('categorical_features').randomSplit([.8,.2], seed=RANDOM_SEED) 
#разделяем датасет на обущающую и тестовую выборки

In [18]:
train_data_1_fit = train_data_1
train_data_1 = standart(train_data_1_fit, train_data_1)
test_data_1 = standart(train_data_1_fit, test_data_1)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-62.029642565664...|
|       (4,[2],[1.0])|     [-62.004701012562...|
|       (4,[2],[1.0])|     [-62.004701012562...|
|       (4,[2],[1.0])|     [-61.989736080700...|
|       (4,[2],[1.0])|     [-61.984747770080...|
|       (4,[2],[1.0])|     [-61.979759459459...|
|       (4,[2],[1.0])|     [-61.969782838218...|
|       (4,[2],[1.0])|     [-61.969782838218...|
|       (4,[2],[1.0])|     [-61.969782838218...|
|       (4,[2],[1.0])|     [-61.959806216977...|
|       (4,[2],[1.0])|     [-61.959806216977...|
|       (4,[2],[1.0])|     [-61.959806216977...|
|       (4,[2],[1.0])|     [-61.949829595737...|
|       (4,[2],[1.0])|     [-61.949829595737...|
|       (4,[2],[1.0])|     [-61.949829595737...|
|       (4,[2],[1.0])|     [-61.949829595737...|
|       (4,[2],[1.0])|     [-61.944841285116...|
|       (4,[2],[1.0]

In [19]:
lr_1 =  LinearRegression(labelCol= target, featuresCol='numerical_features_scaled') #Обучение на датасете

model_1 = lr_1.fit(train_data_1)

23/02/17 11:50:33 WARN Instrumentation: [91821538] regParam is zero, which might cause numerical instability and overfitting.


In [20]:
predictions_1 = model_1.transform(test_data_1)
predictedLabes_1 = predictions_1.select(target, "prediction")
predictedLabes_1.show() #получаем предсказания

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           78300.0| 80182.56233405648|
|           67000.0|120825.19001007453|
|           62500.0| 136376.8517781715|
|          100600.0|160668.03664235352|
|          104200.0| 169935.9386526933|
|           74100.0|126719.22105304385|
|          128100.0|193795.49440216227|
|          130600.0|187737.25901922444|
|           92800.0|180677.34426013194|
|           83000.0|145937.66108072642|
|           70500.0|151182.07990591135|
|           87500.0|  94791.5110046519|
|           82100.0|124666.87647333089|
|           99600.0|115557.01968534291|
|          135600.0|150204.96260606963|
|           78800.0| 97275.50987857953|
|          111300.0|119739.66134422971|
|          119700.0| 139781.9428509432|
|          153100.0| 35009.80922097247|
|           98100.0|124245.77735258965|
+------------------+------------------+
only showing top 20 rows



In [21]:
evaluation_1 = RegressionEvaluator(labelCol=target, predictionCol="prediction")

# Root Mean Square Error
rmse_1 = evaluation_1.evaluate(predictedLabes_1, {evaluation_1.metricName: "rmse"})
print("RMSE: %.3f" % rmse_1)

# Mean Absolute Error
mae_1 = evaluation_1.evaluate(predictedLabes_1, {evaluation_1.metricName: "mae"})
print("MAE: %.3f" % mae_1)

# r2 - coefficient of determination
r2_1 = evaluation_1.evaluate(predictedLabes_1, {evaluation_1.metricName: "r2"})
print("r2: %.3f" %r2_1)

RMSE: 70779.864
MAE: 51088.182
r2: 0.630


In [22]:
spark.stop()

# Анализ результатов

**Вывод:** По результатам работы двух моделей, с немного отличными друг от друга признаками, можно сказать, что модель без категориальных данных имеет точность выше, чем модель с этими данными.