# California housing prices 
.
Versão com o PySpark do kernel [California housing prices](https://github.com/robson-rsp/datascience/blob/main/regression/california_housing_prices.ipynb). Este notebook foi escrito no ambiente do Databricks e exportado como arquivo IPython .ipynb. Os textos daqui são cópias dos originais do link.
---
A tarefa neste kernel é criar um modelo de Machine Learning que estime preços de imóveis no estado da Califórnia baseado em características, como:

*   latitude/longitude: Localização do imóvel
*   housing_median_age: Idade média dos imóveis naquela quadra
*   totalRooms: Número total de cômodos dentro de um quarteirão
*   totalBedrooms: Número total de quartos dentro de um quarteirão
*   population: Número total de pessoas que moram em uma quadra
*   households: Número total de habitações em uma quadra
*   medianIncome: Renda média das famílias de uma quadra (medida em dezenas de milhares de dólares americanos)
*   medianHouseValue: Valor médio dos imóveis de uma quadra (medido em dólares americanos)
*   oceanProximity: Localização do imóvel em relação ao oceano/mar

Obs. A média populacional desses quarteirões é de 600 até 3000 pessoas.

Fonte: [Kaggle](https://www.kaggle.com/datasets/camnugent/california-housing-prices)

In [0]:

housing = spark.read.format("csv") \
          .option("inferSchema", "True") \
          .option("header", "True") \
          .option("sep", ",") \
          .load("/FileStore/tables/housing.csv")

# EDA

In [0]:
housing.sample(fraction=0.01).toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.26,37.83,52.0,1656.0,420.0,718.0,382.0,2.6768,182300.0,NEAR BAY
1,-122.25,37.81,39.0,854.0,242.0,389.0,228.0,3.1250,237500.0,NEAR BAY
2,-122.19,37.80,48.0,1694.0,259.0,610.0,238.0,4.7440,257300.0,NEAR BAY
3,-122.28,37.87,46.0,3022.0,696.0,1293.0,675.0,2.5430,220700.0,NEAR BAY
4,-122.14,37.71,27.0,3094.0,866.0,1364.0,789.0,2.6101,181700.0,NEAR BAY
...,...,...,...,...,...,...,...,...,...,...
190,-119.03,36.08,19.0,2471.0,431.0,1040.0,426.0,3.2500,80600.0,INLAND
191,-119.34,34.39,27.0,669.0,131.0,314.0,106.0,2.4659,231300.0,NEAR OCEAN
192,-119.27,34.28,52.0,2239.0,420.0,941.0,397.0,4.1250,349000.0,NEAR OCEAN
193,-119.26,34.27,36.0,1972.0,382.0,1029.0,411.0,3.7337,209000.0,NEAR OCEAN


In [0]:
print(f'shape={housing.count(), len(housing.columns)}')

shape=(20640, 10)


In [0]:
housing.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").toPandas()

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,25%,-121.8,33.93,18.0,1447.0,296.0,787.0,280.0,2.5625,119600.0,
5,50%,-118.49,34.26,29.0,2127.0,435.0,1166.0,409.0,3.5347,179700.0,
6,75%,-118.01,37.71,37.0,3146.0,647.0,1724.0,605.0,4.7426,264700.0,
7,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Verificando a ocorrência de campos NaN.

In [0]:
for name in housing.columns:
    mask = housing[name].isNull()
    print(f'{name}: {housing.where(mask).count()}')

longitude: 0
latitude: 0
housing_median_age: 0
total_rooms: 0
total_bedrooms: 207
population: 0
households: 0
median_income: 0
median_house_value: 0
ocean_proximity: 0


# Data visualization

Os gráficos foram criados utilizando as ferramentas de visualização nativas do Databricks. O resultado não pode ser visto em um arquivo .ipynb

# Train/Test split

In [0]:
train_set, test_set = housing.randomSplit([0.8, 0.2])

# Feature engineering

In [0]:
from pyspark.ml            import Pipeline
from pyspark.ml.feature    import Imputer, StandardScaler, StringIndexer, VectorAssembler
from pyspark.sql.functions import *

Imports necessários para criar uma classe custom transformer.

In [0]:
from pyspark                 import keyword_only
from pyspark.ml              import Transformer
from pyspark.ml.param.shared import HasInputCols, HasOutputCol, Params
from pyspark.sql.functions   import udf

## Categorical features

**Etapa ##:** Atributos nominais: Codificar

In [0]:
stringindexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_proximity_')
#stringindexer_model = stringindexer.fit(train_set)
#train_set = stringindexer_model.transform(train_set)
#test_set  = stringindexer.transform(test_set)

## Numerical features

**Etapa ##:** NaN values: Imputar

In [0]:
imputer = Imputer(strategy='mean', inputCol='total_bedrooms', outputCol='total_bedrooms_')
#imputer_model = imputer.fit(train_set)
#train_set = imputer_model.transform(train_set)
#test_set  = imputer_model.transform(test_set)

**Etapa ##:** Os atributos 'total_rooms' e 'total_bedrooms' não fazem sentido se eu deixá-los da forma como estão. 'total_rooms' refere-se à quantidade total de cômodos que há em cada quadra. 'total_bedrooms' refere-se à quantidade total de quartos que há em cada quadra. Vou criar novos atributos a partir dos existentes. Vou fazer isso dentro de uma classe para usar a classe Pipeline.
1. **mean_bedrooms_households:** Quantidade média de quartos por habitação.
1. **mean_rooms_households:** Quantidade média de cômodos por habitação.
1. **mean_population_households:** Quantidade média de moradores por imóvel.

In [0]:
class MeanTransformer(Transformer, HasInputCols, HasOutputCol):
    @keyword_only
    def __init__(self, inputCols=None, outputCol=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
    
    @keyword_only
    def setParams(self, inputCols=None, outputCol=None):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def setInputCols(self, inputCols=None):
        self.setParams(inputCols=inputCols)

    def setOutputCol(self, outputCol):
        self.setParams(outputCol=outputCol)

    def _transform(self, dataframe):
        cols_in  = self.getInputCols()
        col_out = self.getOutputCol()
        dataframe = dataframe.withColumn(col_out, dataframe[cols_in[0]] / dataframe[cols_in[1]])
        return dataframe



meantransformer_bedrooms   = MeanTransformer(inputCols=['total_bedrooms_', 'households'], outputCol='mean_bedrooms_households')
meantransformer_rooms      = MeanTransformer(inputCols=['total_rooms', 'households'], outputCol='mean_rooms_households')
meantransformer_population = MeanTransformer(inputCols=['population', 'households'], outputCol='mean_population_households')

**Etapa ##:** Unir todas as colunas em uma única que será utilizada pelos modelos de Machine Learning do PySpark. Essa etapa dever ser feita antes da padronização dos atributos com StandardScaler.

In [0]:
cols = ['longitude', 'latitude', 'housing_median_age', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity_', 'total_bedrooms_', 'mean_bedrooms_households', 'mean_rooms_households', 'mean_population_households']

vectorassembler = VectorAssembler(inputCols=cols, outputCol='features')
#train_set = vectorassembler.transform(train_set)
#test_set  = vectorassembler.transform(test_set)

**Etapa ##:** Padronização dos atributos com StandardScaler.

In [0]:
standardscaler = StandardScaler(inputCol='features', outputCol='features_')
#standardscaler_model = standardscaler.fit(train_set)
#train_set = standardscaler_model.transform(train_set)
#test_set  = standardscaler_model.transform(test_set)

**Etapa final: Criação do pipeline de transformações.**

In [0]:
s = [stringindexer, imputer, meantransformer_bedrooms, meantransformer_rooms, meantransformer_population, vectorassembler, standardscaler]
pipe = Pipeline(stages=s)

pipe_model = pipe.fit(train_set)
train_set = pipe_model.transform(train_set)
test_set  = pipe_model.transform(test_set)

Selecionar X e y dos conjuntos de treino e teste. Vou renomear "X_" para "X"  e "median_house_value" para "y" assim os conjuntos ficarão de acordo com o padrão de nomenclatura.

In [0]:
train_set = train_set.select('features_', 'median_house_value')
test_set  = test_set.select('features_', 'median_house_value')

train_set = train_set.withColumnRenamed(existing='features_', new='features')
test_set  = test_set.withColumnRenamed(existing='features_', new='features')

train_set = train_set.withColumnRenamed(existing='median_house_value', new='label')
test_set  = test_set.withColumnRenamed(existing='median_house_value', new='label')

# Model training


In [0]:
from pyspark.ml.regression import GBTRegressor, LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning     import CrossValidator, ParamGridBuilder, TrainValidationSplit

## Baseline model
Como o PySpark não possui uma classe como DummyRegressor do Scikit Learn, vou criar o resultado que ela teria caso existisse. O padrão dela é prever sempre a média aritmética da label do conjunto de treinamento como sua previsão. Portanto é isso que farei, mas sem o uso de classes.

In [0]:
m = train_set.select(mean('label')).collect()[0][0] # média aritmética de y
test_set = test_set.withColumn('label_mean', lit(m))
test_set.show(5)

+--------------------+--------+------------------+
|            features|   label|        label_mean|
+--------------------+--------+------------------+
|[-62.114299111500...| 85800.0|206590.76259866424|
|[-62.079319216586...| 73200.0|206590.76259866424|
|[-62.059330705207...| 90100.0|206590.76259866424|
|[-62.049336449517...|128900.0|206590.76259866424|
|[-62.049336449517...| 81300.0|206590.76259866424|
+--------------------+--------+------------------+
only showing top 5 rows



In [0]:
re = RegressionEvaluator(labelCol='label', predictionCol='label_mean', metricName='mse')
print(f'mse: {re.evaluate(test_set)}')

mse: 13403085885.292728


## LinearRegression

In [0]:
lr = LinearRegression(featuresCol='features', labelCol='label', elasticNetParam=0.0)
lr_model = lr.fit(train_set)
y_pred = lr_model.transform(test_set)

re = RegressionEvaluator(labelCol='label', metricName='mse')
print(f'mse: {re.evaluate(y_pred)}')
y_pred.show(5)

mse: 6.368562421788888e-14
+--------------------+--------+------------------+------------------+
|            features|   label|        label_mean|        prediction|
+--------------------+--------+------------------+------------------+
|[-62.114299111500...| 85800.0|206590.76259866424| 85799.99999982974|
|[-62.079319216586...| 73200.0|206590.76259866424|  73199.9999999148|
|[-62.059330705207...| 90100.0|206590.76259866424| 90100.00000015456|
|[-62.049336449517...|128900.0|206590.76259866424|128900.00000014986|
|[-62.049336449517...| 81300.0|206590.76259866424| 81300.00000011758|
+--------------------+--------+------------------+------------------+
only showing top 5 rows



## RandomForestRegressor

In [0]:
rf = RandomForestRegressor(featuresCol='features', labelCol='label')
rf_model = rf.fit(train_set)
y_pred = rf_model.transform(test_set)

re = RegressionEvaluator(labelCol='label', metricName='mse')
print(f'mse: {re.evaluate(y_pred)}')
y_pred.show(5)

mse: 547733845.0808902
+--------------------+--------+------------------+------------------+
|            features|   label|        label_mean|        prediction|
+--------------------+--------+------------------+------------------+
|[-62.114299111500...| 85800.0|206590.76259866424| 88670.77434897746|
|[-62.079319216586...| 73200.0|206590.76259866424|  85596.8692974285|
|[-62.059330705207...| 90100.0|206590.76259866424|124801.03990763181|
|[-62.049336449517...|128900.0|206590.76259866424|134614.72171378962|
|[-62.049336449517...| 81300.0|206590.76259866424| 90471.62220277236|
+--------------------+--------+------------------+------------------+
only showing top 5 rows



## GBTRegressor

In [0]:
gb = GBTRegressor(featuresCol='features', labelCol='label')
gb_model = gb.fit(train_set)
y_pred = gb_model.transform(test_set)

re = RegressionEvaluator(labelCol='label', metricName='mse')
print(f'mse: {re.evaluate(y_pred)}')
y_pred.show(5)

mse: 32984045.136473138
+--------------------+--------+------------------+------------------+
|            features|   label|        label_mean|        prediction|
+--------------------+--------+------------------+------------------+
|[-62.114299111500...| 85800.0|206590.76259866424| 80768.99026902682|
|[-62.079319216586...| 73200.0|206590.76259866424| 70643.15084067232|
|[-62.059330705207...| 90100.0|206590.76259866424|  90629.8447474631|
|[-62.049336449517...|128900.0|206590.76259866424|130071.80141523496|
|[-62.049336449517...| 81300.0|206590.76259866424| 80768.99026902682|
+--------------------+--------+------------------+------------------+
only showing top 5 rows



### Hyperparameter tuning
Vou tentar melhorar um pouco o desempenho do modelo. Vou utilizar a técnica abaixo apenas para aprendizado e portfolio pois o desempenho da regressão linear é o suficiente.

In [0]:
gbt = GBTRegressor(featuresCol='features', labelCol='label')

paramGrid = ParamGridBuilder().addGrid(gbt.impurity, ['variance'])\
                              .addGrid(gbt.lossType, ['squared', 'absolute'])\
                              .addGrid(gbt.maxDepth, [5, 10, 20]).build()

crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='label', metricName='mse'),
                          numFolds=2)

#crossval_model = crossval.fit(train_set)

A classe CrossValidator é extremamente custosa mesmo em datasets pequenos como o usado neste notebook. Quem sabe seja porque estou usando a versão gratuíta do Databricks. Por isso vou usar a classe TrainValidationSplit que vai avaliar cada combinação de parâmetros apenas um vez não importando a quantidade de folds.

In [0]:
gbt = GBTRegressor(featuresCol='features', labelCol='label')

paramGrid = ParamGridBuilder().addGrid(gbt.impurity, ['variance'])\
                              .addGrid(gbt.lossType, ['squared', 'absolute']).build()

tvs = TrainValidationSplit(estimator=gbt,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.8)

tvs_model = tvs.fit(train_set)
y_pred = tvs_model.transform(test_set)

re = RegressionEvaluator(labelCol='label', metricName='mse')
print(f'mse: {re.evaluate(y_pred)}')
y_pred.show(5)

mse: 32984045.136473138
+--------------------+--------+------------------+------------------+
|            features|   label|        label_mean|        prediction|
+--------------------+--------+------------------+------------------+
|[-62.114299111500...| 85800.0|206590.76259866424| 80768.99026902682|
|[-62.079319216586...| 73200.0|206590.76259866424| 70643.15084067232|
|[-62.059330705207...| 90100.0|206590.76259866424|  90629.8447474631|
|[-62.049336449517...|128900.0|206590.76259866424|130071.80141523496|
|[-62.049336449517...| 81300.0|206590.76259866424| 80768.99026902682|
+--------------------+--------+------------------+------------------+
only showing top 5 rows



## Best model
O melhos modelo, de longe, é o LinearRegression.