In [1]:
# импортирую библиотеки
import pandas as pd 
import numpy as np


import pyspark
from pyspark.sql import Row, SparkSession
from pyspark.sql.window import Window 
from pyspark.sql.types import *
import pyspark.sql.functions as F, round
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

# Подготовка данных

In [2]:
# инициализирую Spark-Session
spark = SparkSession.builder.master("local").appName("Prediction the cost of housing").getOrCreate()

In [3]:
# считываю файл 
df = spark.read.load('/datasets/housing.csv', format='csv', sep=',', inferSchema=True, header='true')

                                                                                

In [4]:
# просматриваю тип данных
print(pd.DataFrame(df.dtypes, columns=['column', 'type']).head(10))
df.show()

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500

In [5]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [6]:
# просматриваю информацию 
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Уже можно сказать, что есть пропуски в столбце total_defrooms, но нужно проверить с  помощью spark

In [7]:
# проверяю пропуски
columns = df.columns

for column in columns:
    nan_col = F.col(column).isNull()
    print(column, df.filter(nan_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [8]:
# просматриваю пропуски
df.filter(df.total_bedrooms.isNull()).toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.16,37.77,47.0,1256.0,,570.0,218.0,4.3750,161900.0,NEAR BAY
1,-122.17,37.75,38.0,992.0,,732.0,259.0,1.6196,85100.0,NEAR BAY
2,-122.28,37.78,29.0,5154.0,,3741.0,1273.0,2.5762,173400.0,NEAR BAY
3,-122.24,37.75,45.0,891.0,,384.0,146.0,4.9489,247100.0,NEAR BAY
4,-122.10,37.69,41.0,746.0,,387.0,161.0,3.9063,178400.0,NEAR BAY
...,...,...,...,...,...,...,...,...,...,...
202,-119.19,34.20,18.0,3620.0,,3171.0,779.0,3.3409,220500.0,NEAR OCEAN
203,-119.18,34.19,19.0,2393.0,,1938.0,762.0,1.6953,167400.0,NEAR OCEAN
204,-118.88,34.17,15.0,4260.0,,1701.0,669.0,5.1033,410700.0,<1H OCEAN
205,-118.75,34.29,17.0,5512.0,,2734.0,814.0,6.6073,258100.0,<1H OCEAN


In [9]:
# с помощью оконной функции и na.fill заменяю пропуски
window = Window().partitionBy(['total_rooms'])

df = df.withColumn("total_bedrooms_mean_by_rooms",F.avg(F.col('total_bedrooms')).over(window))

df = df.withColumn("total_bedrooms_filled",
                   F.when(F.col('total_bedrooms').isNull(),F.col('total_bedrooms_mean_by_rooms'))
                   .otherwise(F.col('total_bedrooms')))

df = df.withColumn("total_bedrooms_filled", round("total_bedrooms_filled"))

mean = df.select(round(F.avg('total_bedrooms_filled'))).collect()[0][0]

df = df.na.fill({'total_bedrooms_filled': mean})

                                                                                

In [10]:
# просматриваю как заменились пропуски
df.filter(df.total_bedrooms.isNull()).toPandas()

                                                                                

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,total_bedrooms_mean_by_rooms,total_bedrooms_filled
0,-120.98,37.66,10.0,934.0,,401.0,255.0,0.9336,127100.0,INLAND,222.750000,223.0
1,-120.32,37.29,38.0,576.0,,478.0,112.0,2.3382,59600.0,INLAND,131.500000,132.0
2,-122.42,40.44,16.0,994.0,,495.0,181.0,2.1875,76400.0,INLAND,269.750000,270.0
3,-122.14,37.67,37.0,3342.0,,1635.0,557.0,4.7933,186900.0,NEAR BAY,666.000000,666.0
4,-118.38,34.05,49.0,702.0,,458.0,187.0,4.8958,333600.0,<1H OCEAN,185.500000,186.0
...,...,...,...,...,...,...,...,...,...,...,...,...
202,-119.45,36.61,24.0,1302.0,,693.0,243.0,3.7917,90500.0,INLAND,281.833333,282.0
203,-119.69,36.83,32.0,1098.0,,726.0,224.0,1.4913,54600.0,INLAND,275.000000,275.0
204,-122.44,37.80,52.0,3830.0,,1310.0,963.0,3.4801,500001.0,NEAR BAY,791.000000,791.0
205,-117.14,32.71,52.0,500.0,,480.0,108.0,1.8696,91100.0,NEAR OCEAN,127.333333,127.0


In [11]:
# создаю новые столбцы
df = df.withColumn('rooms_per_household', (F.col('total_rooms') / F.col('households')).cast(DoubleType()))
df = df.withColumn('population_in_household', (F.col('population') / F.col('households')).cast(DoubleType()))
df = df.withColumn('bedroom_index', (F.col('total_bedrooms_filled') / F.col('total_rooms')).cast(DoubleType()))

- Отношение количества комнат `total_rooms` к количеству домовладений `households`. Назвал колонку `rooms_per_household`.
- Отношение количества жителей `population` к количеству домовладений `households`. Назвал колонку `population_in_household`.
- Отношение количества спален `total_bedrooms` к общему количеству комнат `total_rooms`. Назвал колонку `bedroom_index`.

In [12]:
# перевожу категориальную фичу из строки в числа
indexer = StringIndexer(inputCols=['ocean_proximity'], 
                        outputCols=['ocean_proximity_idx']) 
df = indexer.fit(df).transform(df)

                                                                                

In [13]:
# произвожу OHE
encoder = OneHotEncoder(inputCols=['ocean_proximity_idx'],
                        outputCols=['ocean_proximity_ohe'])
df = encoder.fit(df).transform(df)

In [14]:
df.toPandas()

                                                                                

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,total_bedrooms_mean_by_rooms,total_bedrooms_filled,rooms_per_household,population_in_household,bedroom_index,ocean_proximity_idx,ocean_proximity_ohe
0,-118.17,34.10,37.0,299.0,89.0,318.0,92.0,1.3125,145800.0,<1H OCEAN,82.333333,89.0,3.250000,3.456522,0.297659,0.0,"(1.0, 0.0, 0.0, 0.0)"
1,-118.26,34.03,49.0,299.0,90.0,287.0,68.0,1.2096,100000.0,<1H OCEAN,82.333333,90.0,4.397059,4.220588,0.301003,0.0,"(1.0, 0.0, 0.0, 0.0)"
2,-117.94,33.78,40.0,299.0,68.0,163.0,70.0,3.0125,166100.0,<1H OCEAN,82.333333,68.0,4.271429,2.328571,0.227425,0.0,"(1.0, 0.0, 0.0, 0.0)"
3,-118.44,33.99,44.0,305.0,72.0,156.0,70.0,5.9641,275000.0,<1H OCEAN,72.000000,72.0,4.357143,2.228571,0.236066,0.0,"(1.0, 0.0, 0.0, 0.0)"
4,-118.98,35.35,21.0,496.0,131.0,511.0,124.0,1.7614,33200.0,INLAND,131.000000,131.0,4.000000,4.120968,0.264113,1.0,"(0.0, 1.0, 0.0, 0.0)"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
20635,-117.15,34.83,30.0,5370.0,1062.0,2778.0,944.0,3.0990,66800.0,INLAND,1062.000000,1062.0,5.688559,2.942797,0.197765,1.0,"(0.0, 1.0, 0.0, 0.0)"
20636,-122.44,37.75,21.0,5457.0,1247.0,2304.0,1180.0,4.5469,409700.0,NEAR BAY,1247.000000,1247.0,4.624576,1.952542,0.228514,3.0,"(0.0, 0.0, 0.0, 1.0)"
20637,-121.64,37.14,14.0,5487.0,1024.0,2823.0,979.0,4.1750,229800.0,<1H OCEAN,1024.000000,1024.0,5.604699,2.883555,0.186623,0.0,"(1.0, 0.0, 0.0, 0.0)"
20638,-118.00,33.97,30.0,6540.0,991.0,3124.0,953.0,6.0663,372600.0,<1H OCEAN,999.500000,991.0,6.862539,3.278069,0.151529,0.0,"(1.0, 0.0, 0.0, 0.0)"


### Подготовка фичей для первой модели(все признаки)

In [15]:
# переменная с числовыми признаками
numeric_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                'population', 'households', 'median_income',
                'total_bedrooms_filled', 'rooms_per_household', 'population_in_household', 
                'bedroom_index']

# переменная с категориальным признаком
categorical_assembler =  VectorAssembler(inputCols=['ocean_proximity_ohe'],
                                        outputCol="categorical_features")
df_one = categorical_assembler.transform(df) 

numerical_assembler = VectorAssembler(inputCols=numeric_cols,
                                      outputCol="numerical_features")
df_one = numerical_assembler.transform(df_one)

standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")
df_one = standardScaler.fit(df_one).transform(df_one) 

                                                                                

### Подготовка фичей для второй модели(без категориального признака)

In [16]:
# переменная с числовыми признаками
numeric_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                'population', 'households', 'median_income',
                'total_bedrooms_filled', 'rooms_per_household', 'population_in_household', 
                'bedroom_index']

numerical_assembler = VectorAssembler(inputCols=numeric_cols,
                                      outputCol="numerical_features")

df_two = numerical_assembler.transform(df) 

standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")
df_two = standardScaler.fit(df_two).transform(df_two) 


                                                                                

<div style="border-radius: 15px; box-shadow: 4px 4px 4px; border: 3px solid indigo; padding: 15px"> 
<h2 align="center"> Вывод <a class="tocSkip"> </h2>
В данной главе были просмотрены данные, найдены пропуски и заполнены по группам. Также преобразована категориальная колонка ocean_proximity.

# Обучение моделей

### Первая модель(все признаки)

In [17]:
# создаю выборку для обучения модели
features_one = ['numerical_features_scaled', 'categorical_features']

target = 'median_house_value'


final_assembler_one = VectorAssembler(inputCols=features_one, 
                                      outputCol="features") 

df_one = final_assembler_one.transform(df_one)

In [18]:
df_one.select(features_one).show(3) 

+-------------------------+--------------------+
|numerical_features_scaled|categorical_features|
+-------------------------+--------------------+
|     [-58.980847976499...|       (4,[0],[1.0])|
|     [-59.025768652795...|       (4,[0],[1.0])|
|     [-58.866050692632...|       (4,[0],[1.0])|
+-------------------------+--------------------+
only showing top 3 rows



In [19]:
# задаю random_seed и делю выборку на 80 и 20
RANDOM_SEED = 777

train_data_one, test_data_one = df_one.randomSplit([.8,.2], seed=RANDOM_SEED)

In [20]:
# просматриваю деление, имеем 16470 строк для обучения и 4170 для проверки
print(train_data_one.count(), test_data_one.count())



16470 4170


                                                                                

In [21]:
# обучаю линейную регрессию
lr = LinearRegression(labelCol=target, featuresCol='features', regParam=0.5)

model = lr.fit(train_data_one) 

22/04/03 22:17:16 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/04/03 22:17:16 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/04/03 22:17:23 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/04/03 22:17:23 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [22]:
# просматриваю предсказание модели
predictions = model.transform(test_data_one)

predictedLabes = predictions.select(["prediction", target])#КОД РЕВЬЮЕРА
predictedLabes.show() 

+------------------+------------------+
|        prediction|median_house_value|
+------------------+------------------+
|233932.49250473315|          269200.0|
|254201.83882431872|          339700.0|
| 279649.5826158263|          158200.0|
|108882.67673560698|           83900.0|
|117890.55080193141|           53200.0|
| 328052.8794147703|          275000.0|
|198426.59207819356|          217200.0|
|  278392.689591439|          381200.0|
|205380.47557643335|          234100.0|
|136117.64086359367|          133600.0|
|121790.64851125749|          117600.0|
|129629.72803935921|          145800.0|
| 176125.5761008705|          119600.0|
| 294664.4295156887|          278700.0|
| 194080.7707334836|          336800.0|
| 359621.4017744935|          356200.0|
|129355.82289745053|           94300.0|
| 97721.06995718041|           59600.0|
|261486.72256571753|          286700.0|
|251735.83740368485|          203400.0|
+------------------+------------------+
only showing top 20 rows



### Вторая модель(без категориального признака)

In [23]:
# создаю выборку для обучения модели
features_two = ['numerical_features_scaled']

target = 'median_house_value'


final_assembler_one = VectorAssembler(inputCols=features_two, 
                                      outputCol="features") 

df_two = final_assembler_one.transform(df_two)

In [24]:
df_two.select(features_two).show(3) 

+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-58.980847976499...|
|     [-59.025768652795...|
|     [-58.866050692632...|
+-------------------------+
only showing top 3 rows



In [25]:
# задаю random_seed и делю выборку на 80 и 20
RANDOM_SEED = 777

train_data_two, test_data_two = df_two.randomSplit([.8,.2], seed=RANDOM_SEED)

In [26]:
# просматриваю деление, имеем 16470 строк для обучения и 4170 для проверки
print(train_data_two.count(), test_data_two.count())



16470 4170


                                                                                

In [27]:
# обучаю линейную регрессию
lr = LinearRegression(labelCol=target, featuresCol='features', regParam=0.5)

model = lr.fit(train_data_two) 

                                                                                

In [28]:
# просматриваю предсказание модели
predictions_two = model.transform(test_data_two)

predictedLabes_two = predictions_two.select([ "prediction", target])
predictedLabes_two.show() 

+------------------+------------------+
|        prediction|median_house_value|
+------------------+------------------+
|220385.32900370192|          269200.0|
| 249675.6015689373|          339700.0|
| 291075.6828733203|          158200.0|
|113806.13667949196|           83900.0|
|138133.33572148904|           53200.0|
| 329935.1741443067|          275000.0|
| 193038.2443308453|          217200.0|
| 279106.5935451449|          381200.0|
| 201838.4837657502|          234100.0|
|130081.83532831818|          133600.0|
| 114181.7679948546|          117600.0|
|121687.84564504214|          145800.0|
|194204.93574217614|          119600.0|
|286418.12433721963|          278700.0|
|193030.93915740633|          336800.0|
| 353642.6565334243|          356200.0|
|138431.70487036603|           94300.0|
|106866.35075569432|           59600.0|
|257124.24472554075|          286700.0|
| 242526.9504041029|          203400.0|
+------------------+------------------+
only showing top 20 rows



<div style="border-radius: 15px; box-shadow: 4px 4px 4px; border: 3px solid indigo; padding: 15px"> 
<h2 align="center"> Вывод <a class="tocSkip"> </h2>
Были преобразованы признакми в 2: числовые и категориальные, а также обучены 2 модели линейной регрессии: 1 - со всеми данными, 2 - без категориальной фичи.

# Анализ результатов

### Первая модель(все признаки)

In [29]:
# создаю RDD, чтобы посмотреть результаты метрик
predicted_collect = predictedLabes.collect()
predict_list = [ (float(i[0]), float(i[1])) for i in predicted_collect]
predictAndLabels = spark.sparkContext.parallelize(predict_list)

metrics = RegressionMetrics(predictAndLabels)

                                                                                

In [30]:
# просматриваю метрики, а именно: MAE, R2, RMSE
print("RMSE = %s" % metrics.rootMeanSquaredError)

print("R-squared = %s" % metrics.r2)

print("MAE = %s" % metrics.meanAbsoluteError)

RMSE = 69203.58846424552
R-squared = 0.6516668766192801
MAE = 49522.72714131923


### Вторая модель(без категориального признака)

In [31]:
# создаю RDD, чтобы посмотреть результаты метрик
predicted_collect = predictedLabes_two.collect()
predict_list = [ (float(i[0]), float(i[1])) for i in predicted_collect]
predictAndLabels_two = spark.sparkContext.parallelize(predict_list)

metrics = RegressionMetrics(predictAndLabels_two)

                                                                                

In [32]:
# просматриваю метрики, а именно: MAE, R2, RMSE
print("RMSE = %s" % metrics.rootMeanSquaredError)

print("R-squared = %s" % metrics.r2)

print("MAE = %s" % metrics.meanAbsoluteError)

RMSE = 69700.69633780871
R-squared = 0.6466445629994498
MAE = 50130.32250520992


<div style="border-radius: 15px; box-shadow: 4px 4px 4px; border: 3px solid indigo; padding: 15px"> 
<h2 align="center"> Вывод <a class="tocSkip"> </h2>
Исходя из различных метрик регрессии можно сказать, что обе модели предсказали одинаково, как ни странно :)