# Predicting median house value in California housing blocks

**Data description**:

- longitude: longitude of the housing block;
- latitude: latitude of the housing block;
- housing_median_age: median age of the houses in the block;
- total_rooms: total number of rooms in the houses of the block;
- total_bedrooms: total number of bedrooms in the houses of the block;
- population: number of people living in the block;
- households: number of households in the block;
- median_income: median income of the block's residents;
- **median_house_value**: median house value in the block (**target**);
- ocean_proximity: proximity to the ocean.



## Imports

In [None]:
RANDOM_SEED = 42

In [None]:
!pip install pyspark
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



## Loading the data

In [None]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("RealEstateML") \
                    .getOrCreate()

In [None]:
df = pd.read_csv('https://code.s3.yandex.net/datasets/housing.csv')
df = spark.createDataFrame(df)

## Initial data exploration

In [None]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



## Data preprocessing

### NaN values

In [None]:
# isin() never matches NULL, so check NULL and NaN explicitly
for column, dtype in df.dtypes:
    missing = F.col(column).isNull()
    if dtype in ('double', 'float'):
        missing = missing | F.isnan(F.col(column))
    print(column, df.filter(missing).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [None]:
bedrooms_median = np.median(
    [
        row['total_bedrooms'] for row in df.filter(df.total_bedrooms.isNotNull() & ~F.isnan(df.total_bedrooms)).select("total_bedrooms").collect()
    ]
    )
bedrooms_median

435.0

In [None]:
# Replace NaN with the median
df = df.withColumn('total_bedrooms', F.when(df.total_bedrooms.isNull() | F.isnan(df.total_bedrooms), bedrooms_median).otherwise(df.total_bedrooms))

In [None]:
df.filter(F.col('total_bedrooms').isNull() | F.isnan('total_bedrooms')).count()

0
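
The imputation step above can be illustrated without Spark. The following is a minimal pure-Python sketch on made-up toy values (the helper name `impute_median` is hypothetical, not part of the notebook): the median is computed only over the observed entries and then written into every missing slot.

```python
import math
from statistics import median


def impute_median(values):
    """Replace None/NaN entries with the median of the observed values."""
    def is_missing(v):
        return v is None or (isinstance(v, float) and math.isnan(v))

    # The median must be computed on observed values only,
    # otherwise NaN would contaminate the result.
    observed = [v for v in values if not is_missing(v)]
    fill = median(observed)
    return [fill if is_missing(v) else v for v in values], fill


# Toy data, not taken from the housing dataset
bedrooms = [129.0, float("nan"), 1106.0, 190.0, None, 435.0]
filled, fill_value = impute_median(bedrooms)
print(fill_value, filled)
```

In Spark itself, `pyspark.ml.feature.Imputer` with `strategy="median"` is a possible alternative to collecting the column to the driver; note that it relies on an approximate quantile, so the fill value may differ slightly from `np.median`.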

## Modeling


### With the categorical column

In [None]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

16560 4080


In [None]:
test_data.select('ocean_proximity').distinct().show()

+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



In [None]:
train_data.select('ocean_proximity').distinct().show()

+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



**The unique values of the categorical column are the same in the train and test samples**

In [None]:
categorical_cols = ['ocean_proximity']
numerical_cols = ['longitude',
                  'latitude',
                  'housing_median_age',
                  'total_rooms',
                  'total_bedrooms',
                  'population',
                  'households',
                  'median_income']
target = 'median_house_value'

In [None]:
indexer = StringIndexer(inputCols=categorical_cols,
                        outputCols=[c+'_idx' for c in categorical_cols])

encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])

categorical_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
                                        outputCol="categorical_features")

numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                      outputCol="numerical_features")

standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled",
                                withMean=True)

final_assembler = VectorAssembler(inputCols=['categorical_features', 'numerical_features_scaled'],
                                  outputCol="features")
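
To make the categorical branch of the pipeline more concrete, here is a rough pure-Python sketch of what the `StringIndexer` and `OneHotEncoder` stages do with their default settings, as far as I understand them: labels are indexed by descending frequency, and the last category is dropped from the one-hot vector (`dropLast=True`). The helper names and the toy values are hypothetical.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Descending frequency; ties are broken alphabetically
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, size, drop_last=True):
    """Encode an index as a dense 0/1 vector, optionally dropping the last slot."""
    width = size - 1 if drop_last else size
    vec = [0.0] * width
    if index < width:
        vec[index] = 1.0
    return vec


# Toy column, not the real distribution of ocean_proximity
values = ["INLAND", "<1H OCEAN", "INLAND", "NEAR BAY", "<1H OCEAN", "INLAND"]
mapping = string_index(values)
encoded = [one_hot(mapping[v], len(mapping)) for v in values]
print(mapping)
print(encoded)
```

With the last category dropped, the least frequent label is represented by an all-zero vector, which avoids a redundant column in the linear model.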

In [None]:
lr = LinearRegression(labelCol=target, featuresCol='features')
rf = RandomForestRegressor(labelCol=target, featuresCol="features")

In [None]:
pipeline_lr = Pipeline(stages=[indexer, encoder, categorical_assembler,
                               numerical_assembler, standardScaler,
                               final_assembler, lr])

pipeline_rf = Pipeline(stages=[indexer, encoder, categorical_assembler,
                               numerical_assembler, standardScaler,
                               final_assembler, rf])

In [None]:
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 50]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

In [None]:
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")

crossval_lr = CrossValidator(estimator=pipeline_lr,
                             estimatorParamMaps=paramGrid_lr,
                             evaluator=evaluator,
                             numFolds=3)

crossval_rf = CrossValidator(estimator=pipeline_rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=evaluator,
                             numFolds=3)
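
As a back-of-the-envelope check of the training cost, the sketch below enumerates the linear regression grid in plain Python and counts the pipeline fits that 3-fold cross-validation implies. The variable names are illustrative; the random forest grid has the same 3 x 3 shape, so the counts are identical for it.

```python
from itertools import product

# Candidate values copied from the linear regression grid above
reg_params = [0.01, 0.1, 1.0]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 3

# Every combination of the two hyperparameters is one candidate
grid = [
    {"regParam": r, "elasticNetParam": e}
    for r, e in product(reg_params, elastic_net_params)
]

cv_fits = len(grid) * num_folds  # one fit per candidate per fold
total_fits = cv_fits + 1         # plus the final refit of the best candidate

print(len(grid), cv_fits, total_fits)
```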

In [None]:
cv_model_lr = crossval_lr.fit(train_data)
cv_model_rf = crossval_rf.fit(train_data)

# Predict on the test data
predictions_lr = cv_model_lr.transform(test_data)
predictions_rf = cv_model_rf.transform(test_data)

# Evaluate the models
rmse_lr = evaluator.evaluate(predictions_lr)
r2_lr = evaluator.evaluate(predictions_lr, {evaluator.metricName: "r2"})
mae_lr = evaluator.evaluate(predictions_lr, {evaluator.metricName: "mae"})

rmse_rf = evaluator.evaluate(predictions_rf)
r2_rf = evaluator.evaluate(predictions_rf, {evaluator.metricName: "r2"})
mae_rf = evaluator.evaluate(predictions_rf, {evaluator.metricName: "mae"})

In [None]:
print(f'LinearRegression - RMSE: {rmse_lr}, R^2: {r2_lr}, MAE: {mae_lr}')
print(f'RandomForest - RMSE: {rmse_rf}, R^2: {r2_rf}, MAE: {mae_rf}')

LinearRegression - RMSE: 70786.60253817277, R^2: 0.637843817946491, MAE: 50864.70418634916
RandomForest - RMSE: 51363.522028492414, R^2: 0.8093207838702373, MAE: 35280.34973507946
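
For reference, the three reported metrics follow the usual definitions. The sketch below computes them in plain Python on made-up numbers (the function name and the toy values are illustrative only), which may help when reading the figures above: RMSE and MAE are in the units of the target, while R^2 is dimensionless.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return RMSE, MAE and R^2 for two equally long sequences."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)                  # residual sum of squares
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)   # total sum of squares
    return {
        "rmse": math.sqrt(ss_res / n),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1 - ss_res / ss_tot,
    }


# Toy values, not taken from the housing dataset
y_true = [100.0, 200.0, 300.0, 400.0]
y_pred = [110.0, 190.0, 320.0, 380.0]
metrics = regression_metrics(y_true, y_pred)
print(metrics)
```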


In [None]:
best_lr_model = cv_model_lr.bestModel.stages[-1]
print("Best hyperparameters for Linear Regression:")
print(f" - regParam: {best_lr_model.getRegParam()}")
print(f" - elasticNetParam: {best_lr_model.getElasticNetParam()}")

best_rf_model = cv_model_rf.bestModel.stages[-1]
print("Best hyperparameters for Random Forest:")
print(f" - numTrees: {best_rf_model.getNumTrees}")
print(f" - maxDepth: {best_rf_model.getMaxDepth()}")

Best hyperparameters for Linear Regression:
 - regParam: 1.0
 - elasticNetParam: 0.0
Best hyperparameters for Random Forest:
 - numTrees: 50
 - maxDepth: 15
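
One thing worth noticing in this output: `regParam`, `numTrees` and `maxDepth` were all selected at the largest value offered by their grids. The small check below makes that explicit; it is only a sketch with values copied from the cells above, and whether a wider grid would actually improve the metrics is not tested in this notebook.

```python
# Grids and selected values copied from the cells above
grids = {
    "regParam": [0.01, 0.1, 1.0],
    "numTrees": [10, 20, 50],
    "maxDepth": [5, 10, 15],
}
best = {"regParam": 1.0, "numTrees": 50, "maxDepth": 15}

# A best value sitting on the upper edge of its grid may mean
# that the optimum lies outside the searched range.
at_upper_edge = {name: best[name] == max(values) for name, values in grids.items()}
print(at_upper_edge)
```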


### Without the categorical column

In [None]:
learn_df = df.drop('ocean_proximity')

In [None]:
train_data, test_data = learn_df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

16560 4080



In [None]:
numerical_cols = ['longitude',
                  'latitude',
                  'housing_median_age',
                  'total_rooms',
                  'total_bedrooms',
                  'population',
                  'households',
                  'median_income']
target = 'median_house_value'

In [None]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                      outputCol="numerical_features")

standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled",
                                withMean=True)

final_assembler = VectorAssembler(inputCols=['numerical_features_scaled'],
                                  outputCol="features")
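
The scaling stage can be sketched in plain Python as follows. With `withMean=True` and the default `withStd=True`, each numerical feature is centered and divided by its standard deviation; to my understanding Spark uses the corrected sample standard deviation (divisor n - 1), which is what `statistics.stdev` computes. The function name and the toy values are hypothetical.

```python
from statistics import mean, stdev


def standard_scale(column):
    """Center a column and scale it to unit sample standard deviation."""
    mu = mean(column)
    sigma = stdev(column)  # corrected sample std, divisor n - 1
    if sigma == 0:
        # A constant column carries no information; map it to zeros
        return [0.0 for _ in column]
    return [(x - mu) / sigma for x in column]


# Toy values, not taken from the housing dataset
incomes = [2.0, 4.0, 6.0, 8.0]
scaled = standard_scale(incomes)
print(scaled)
```

In the pipeline, the scaler's mean and standard deviation are learned from the training data only and then reused on the test data, because fitting happens inside `Pipeline.fit`.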

In [None]:
lr = LinearRegression(labelCol=target, featuresCol='features')
rf = RandomForestRegressor(labelCol=target, featuresCol="features")

In [None]:
pipeline_lr = Pipeline(stages=[numerical_assembler, standardScaler, final_assembler, lr])
pipeline_rf = Pipeline(stages=[numerical_assembler, standardScaler, final_assembler, rf])

In [None]:
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 50]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

In [None]:
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")

crossval_lr = CrossValidator(estimator=pipeline_lr,
                             estimatorParamMaps=paramGrid_lr,
                             evaluator=evaluator,
                             numFolds=3)

crossval_rf = CrossValidator(estimator=pipeline_rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=evaluator,
                             numFolds=3)

In [None]:
cv_model_lr = crossval_lr.fit(train_data)
cv_model_rf = crossval_rf.fit(train_data)

# Predict on the test data
predictions_lr = cv_model_lr.transform(test_data)
predictions_rf = cv_model_rf.transform(test_data)

# Evaluate the models
rmse_lr = evaluator.evaluate(predictions_lr)
r2_lr = evaluator.evaluate(predictions_lr, {evaluator.metricName: "r2"})
mae_lr = evaluator.evaluate(predictions_lr, {evaluator.metricName: "mae"})

rmse_rf = evaluator.evaluate(predictions_rf)
r2_rf = evaluator.evaluate(predictions_rf, {evaluator.metricName: "r2"})
mae_rf = evaluator.evaluate(predictions_rf, {evaluator.metricName: "mae"})

In [None]:
print(f'LinearRegression - RMSE: {rmse_lr}, R^2: {r2_lr}, MAE: {mae_lr}')
print(f'RandomForest - RMSE: {rmse_rf}, R^2: {r2_rf}, MAE: {mae_rf}')

LinearRegression - RMSE: 71797.610620775, R^2: 0.6274249667117286, MAE: 51804.79138750769
RandomForest - RMSE: 53179.38648617812, R^2: 0.7956002259092048, MAE: 36882.26410052106
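
To quantify how much the random forest loses without `ocean_proximity`, the sketch below compares the test metrics printed in the two experiments. The numbers are copied from the outputs above; the dictionary names are illustrative.

```python
# Random forest test metrics copied from the two runs in this notebook
with_cat = {"rmse": 51363.522028492414, "r2": 0.8093207838702373, "mae": 35280.34973507946}
without_cat = {"rmse": 53179.38648617812, "r2": 0.7956002259092048, "mae": 36882.26410052106}

relative_change = {}
for name in with_cat:
    delta = without_cat[name] - with_cat[name]
    # Change relative to the run that kept the categorical column, in percent
    relative_change[name] = delta / with_cat[name] * 100
    print(f"{name}: {delta:+.4f} ({relative_change[name]:+.2f}%)")
```

The errors grow by a few percent and R^2 drops by a similar relative amount, so the effect is consistent in direction but modest in size.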


In [None]:
best_lr_model = cv_model_lr.bestModel.stages[-1]
print("Best hyperparameters for Linear Regression:")
print(f" - regParam: {best_lr_model.getRegParam()}")
print(f" - elasticNetParam: {best_lr_model.getElasticNetParam()}")

best_rf_model = cv_model_rf.bestModel.stages[-1]
print("Best hyperparameters for Random Forest:")
print(f" - numTrees: {best_rf_model.getNumTrees}")
print(f" - maxDepth: {best_rf_model.getMaxDepth()}")

Best hyperparameters for Linear Regression:
 - regParam: 1.0
 - elasticNetParam: 1.0
Best hyperparameters for Random Forest:
 - numTrees: 50
 - maxDepth: 15


## Conclusions

**Dropping the ocean_proximity column made the test metrics slightly worse for both models (for the random forest, RMSE rose from about 51,364 to about 53,179 and R^2 fell from about 0.809 to about 0.796), which is consistent with the hypothesis that house value depends on this feature.**

**During preprocessing, the missing values in the total_bedrooms column were replaced with the median.**

**A feature pipeline was built: StringIndexer and OneHotEncoder for the categorical feature and StandardScaler for the numerical ones.**

**Random forest outperformed linear regression on the test set; cross-validation selected the following hyperparameters for it:**
- numTrees: 50
- maxDepth: 15

**Final model metrics on the test set:**
- **RMSE**: 51363.522028492414
- **R^2**: 0.8093207838702373
- **MAE**: 35280.34973507946