## Описание проекта

Необходимо обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Для оценки качества модели будем использовать метрики RMSE, MAE и R2.

## Подготовка данных

Инициализируем локальную Spark-сессию:

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Build a Spark session on the "local" master, or reuse one that already
# exists in this process.
spark = (
    SparkSession.builder
    .master("local")
    .appName("California Housing Prices - Linear regression")
    .getOrCreate()
)

Прочитаем содержимое файла:

In [2]:
# Read the housing CSV: comma-separated, first row is the header, and the
# column types are inferred from the data.
df = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema=True, header=True)
    .load('/datasets/housing.csv')
)
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Выведем типы данных колонок датасета, используя методы PySpark.

In [3]:
# df.dtypes is a list of (column name, Spark type name) pairs; wrap it in a
# pandas frame purely for a readable printout.
dtype_table = pd.DataFrame(df.dtypes, columns=['column', 'type'])
print(dtype_table)

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


In [4]:
# Preview the first five rows of the raw data.
df.show(n=5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

## Предобработка данных

Исследуем данные на наличие пропусков и заполним их средним значением:

In [5]:
# Count missing values (NULL, plus NaN for floating-point columns) per column.
#
# All counts are computed in a single aggregation, so the data is scanned once
# instead of once per column. isnan() is applied only to float/double columns:
# NaN cannot occur in other types, and this avoids an implicit string->double
# cast on the categorical column.
columns = df.columns
float_columns = {name for name, dtype in df.dtypes if dtype in ('float', 'double')}

missing_exprs = []
for column in columns:
    is_missing = F.col(column).isNull()
    if column in float_columns:
        is_missing = is_missing | F.isnan(column)
    # when() without otherwise() yields NULL for non-matching rows, and
    # count() skips NULLs, so this counts only the missing entries.
    missing_exprs.append(F.count(F.when(is_missing, 1)).alias(column))

missing_counts = df.select(missing_exprs).first()
for column in columns:
    print(column, missing_counts[column])

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [6]:
# Impute missing total_bedrooms values with the column mean.
#
# `subset` restricts the fill to total_bedrooms. Without it, na.fill() would
# write the total_bedrooms mean into NULLs of every numeric column, which is
# only harmless while total_bedrooms is the sole column with gaps.
#
# NOTE(review): the mean is computed over the whole dataset, i.e. before the
# train/test split performed later in the notebook, so test rows contribute to
# the imputed value. Consider computing it on the training split only.
mean_total_bedrooms = df.select(F.mean('total_bedrooms')).first()[0]
df = df.na.fill(mean_total_bedrooms, subset=['total_bedrooms'])

In [7]:
# Re-check missing values (NULL, plus NaN for floating-point columns) per
# column after imputation.
#
# All counts are computed in a single aggregation, so the data is scanned once
# instead of once per column. isnan() is applied only to float/double columns:
# NaN cannot occur in other types, and this avoids an implicit string->double
# cast on the categorical column.
columns = df.columns
float_columns = {name for name, dtype in df.dtypes if dtype in ('float', 'double')}

missing_exprs = []
for column in columns:
    is_missing = F.col(column).isNull()
    if column in float_columns:
        is_missing = is_missing | F.isnan(column)
    # when() without otherwise() yields NULL for non-matching rows, and
    # count() skips NULLs, so this counts only the missing entries.
    missing_exprs.append(F.count(F.when(is_missing, 1)).alias(column))

missing_counts = df.select(missing_exprs).first()
for column in columns:
    print(column, missing_counts[column])

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Пропущенные значения обработаны.

In [8]:
# Column roles used throughout the notebook.
categorical_cols = ['ocean_proximity']
numerical_cols = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
]
# Regression target.
target = 'median_house_value'

### Трансформация категориальных признаков

Преобразуем колонку с категориальными значениями:

In [9]:
# Map each ocean_proximity category to a numeric index in a new column.
indexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_proximity_idx')
indexer_model = indexer.fit(df)
df = indexer_model.transform(df)

In [10]:
# One-hot encode the category index into a sparse vector column.
encoder = OneHotEncoder(inputCol='ocean_proximity_idx',
                        outputCol='ocean_proximity_ohe')

# In Spark 3.x OneHotEncoder is an Estimator: it must be fitted, and only the
# resulting model can transform. In Spark 2.x it is a plain Transformer with
# no fit(). Support both so the notebook runs on either version.
if hasattr(encoder, 'fit'):
    df = encoder.fit(df).transform(df)
else:
    df = encoder.transform(df)

In [11]:
# Preview the first five rows, now including the index and one-hot columns.
df.show(n=5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|        

### Трансформация числовых признаков

In [12]:
# Pack all numeric feature columns into a single vector column.
numerical_assembler = VectorAssembler(
    inputCols=numerical_cols,
    outputCol="numerical_features",
)
df = numerical_assembler.transform(df)

In [13]:
# Scale the numeric vector to unit standard deviation without centering
# (these are StandardScaler's defaults, spelled out here for clarity).
#
# NOTE(review): the scaler is fitted on the full dataset, before the
# train/test split performed later in the notebook.
standardScaler = StandardScaler(
    inputCol='numerical_features',
    outputCol='numerical_features_scaled',
    withMean=False,
    withStd=True,
)
scaler_model = standardScaler.fit(df)
df = scaler_model.transform(df)

In [14]:
# List the columns after the numeric transformations.
column_names = df.columns
print(column_names)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity', 'ocean_proximity_idx', 'ocean_proximity_ohe', 'numerical_features', 'numerical_features_scaled']


In [15]:
# Combine the one-hot categorical vector and the scaled numeric vector into
# the single "features" column.
all_features = ['ocean_proximity_ohe', 'numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol="features")
df = final_assembler.transform(df)

# Quick look at the two input vectors for the first three rows.
df.select(*all_features).show(n=3)

+-------------------+-------------------------+
|ocean_proximity_ohe|numerical_features_scaled|
+-------------------+-------------------------+
|      (4,[3],[1.0])|     [-61.007269596069...|
|      (4,[3],[1.0])|     [-61.002278409814...|
|      (4,[3],[1.0])|     [-61.012260782324...|
+-------------------+-------------------------+
only showing top 3 rows



## Обучение моделей

**Разделение на выборки**

In [16]:
# Random 75% / 25% split with a fixed seed, then report the row counts.
split_weights = [.75, .25]
train, test = df.randomSplit(split_weights, seed=50)

train_rows = train.count()
test_rows = test.count()
print(train_rows, test_rows)

15483 5157


**1-я модель (на всех признаках)**

In [17]:
# Model 1: linear regression on the combined (categorical + numeric) vector.
lin_reg = LinearRegression(
    featuresCol='features',
    labelCol=target,
)
model = lin_reg.fit(train)

In [18]:
# Apply the fitted model to the test split; the result carries a `prediction`
# column alongside the original columns.
predictions = model.transform(test)

In [19]:
# Show actual vs. predicted house values side by side (first 20 rows).
predictedLabes = predictions.select(["median_house_value", "prediction"])
predictedLabes.show(n=20)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           85800.0|112775.02561120596|
|          106700.0|217000.51716408506|
|           78300.0|123949.82666503266|
|           58100.0|139845.54929901334|
|           68400.0|128588.37228447525|
|           69000.0|175334.48692065058|
|           74600.0| 101689.4989769524|
|          107000.0|189365.89968481427|
|           70200.0|168960.48869066592|
|           82800.0|170780.10405364633|
|           62500.0| 164765.5282365447|
|           75500.0| 157775.3736198349|
|           69500.0|110257.09505755082|
|           75000.0|103499.03062554495|
|          100600.0|190565.93900943873|
|           57500.0|138671.47686157282|
|           76800.0| 146524.8069010866|
|           66800.0| 131665.3384734667|
|          130600.0|216816.69767593034|
|          122400.0|124116.79892787198|
+------------------+------------------+
only showing top 20 rows



In [20]:
evaluation = RegressionEvaluator(labelCol='median_house_value', predictionCol='prediction')

# Score the same predictions three times, overriding only the metric name on
# each call.
rmse, mae, r2 = (
    evaluation.evaluate(predictions, {evaluation.metricName: metric})
    for metric in ('rmse', 'mae', 'r2')
)

print('RMSE: %.3f' % rmse)  # root mean squared error
print('MAE: %.3f' % mae)    # mean absolute error
print('R2: %.3f' % r2)      # coefficient of determination

RMSE: 66979.137
MAE: 49074.114
R2: 0.649


**2-я модель (только числовые признаки)**

In [21]:
# List the available columns before selecting the numeric-only feature set.
column_names = df.columns
print(column_names)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity', 'ocean_proximity_idx', 'ocean_proximity_ohe', 'numerical_features', 'numerical_features_scaled', 'features']


In [22]:
# Remove the categorical column, its derived encodings, and the combined
# feature vector from both splits; the scaled numeric vector remains.
categorical_related = (
    'ocean_proximity',
    'ocean_proximity_idx',
    'ocean_proximity_ohe',
    'features',
)
train = train.drop(*categorical_related)
test = test.drop(*categorical_related)

In [23]:
# Model 2: linear regression on the scaled numeric features only.
lin_reg = LinearRegression(
    featuresCol='numerical_features_scaled',
    labelCol=target,
)
model = lin_reg.fit(train)

In [24]:
# Apply the numeric-only model to the test split; the result carries a
# `prediction` column alongside the original columns.
predictions = model.transform(test)

In [25]:
# Show actual vs. predicted house values side by side (first 20 rows).
predictedLabes = predictions.select(["median_house_value", "prediction"])
predictedLabes.show(n=20)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           85800.0| 64264.05228924099|
|          106700.0| 191744.0279452675|
|           78300.0| 76829.47979936935|
|           58100.0|109128.55047097476|
|           68400.0| 79686.44044737285|
|           69000.0|144267.39697506418|
|           74600.0| 51850.39417120721|
|          107000.0|161088.68991208728|
|           70200.0|137826.49449833762|
|           82800.0|139192.40682131844|
|           62500.0| 134059.3510107263|
|           75500.0|127419.04430415668|
|           69500.0| 60488.17634099815|
|           75000.0| 50586.07096783817|
|          100600.0|158427.90378290508|
|           57500.0|104711.65859676572|
|           76800.0|115980.07596153207|
|           66800.0|101523.10843930952|
|          130600.0|187276.34619002696|
|          122400.0| 71282.29426170001|
+------------------+------------------+
only showing top 20 rows



In [26]:
evaluation = RegressionEvaluator(labelCol='median_house_value', predictionCol='prediction')

# Score the same predictions three times, overriding only the metric name on
# each call.
rmse, mae, r2 = (
    evaluation.evaluate(predictions, {evaluation.metricName: metric})
    for metric in ('rmse', 'mae', 'r2')
)

print('RMSE: %.3f' % rmse)  # root mean squared error
print('MAE: %.3f' % mae)    # mean absolute error
print('R2: %.3f' % r2)      # coefficient of determination

RMSE: 68063.775
MAE: 50367.215
R2: 0.637


## Анализ результатов

Модель, построенная с использованием только числовых признаков, показала результат немного хуже (RMSE 68063.775, MAE 50367.215, R2 0.637), чем модель, включающая категориальный признак (RMSE 66979.137, MAE 49074.114, R2 0.649).