# Preparação do ambiente Spark

In [None]:
!pip -qq install pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder\
    .master('local[*]')\
    .appName("Regressão com Spark")\
    .getOrCreate()

# Importação dos dados

A base de dados contém informações de 73.615 imóveis à venda no Rio de Janeiro. O arquivo é do tipo JSON as colunas estão aninhadas, como podemos verificar no schema:

In [None]:
!mkdir assets
! wget -qq -O ./assets/imoveis.json https://github.com/kamillafsilva/preditor_preco_imovel/blob/main/assets/imoveis.json?raw=true

In [None]:
raw = spark.read.option("multiline", "true").json('./assets/imoveis.json')

In [None]:
raw.show(5, truncate = False)

+--------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ident                     |listing                                                                                                                                                          |
+--------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{775564-BOJSMVON, Website}|{{Rio de Janeiro, {-22.909429, -43.413557}, Taquara, Zona Oeste}, {0, 0, 0, 1, 0, 62, 0, 0, 62}, {45000, {150, 0}}, {Outros, Residencial}}                       |
|{660895-AUENKNYY, Website}|{{Rio de Janeiro, {-22.869698, -43.509141}, Santíssimo, Zona Oeste}, {1, 2, 0, 1, 0, 0, 0, 0, 44}, {45000, {120, 0}}, {Apartamento, Residencial}}                |
|{751522-JESYFEQL, Website}|{{Rio de Janeiro,

In [None]:
raw.printSchema()

root
 |-- ident: struct (nullable = true)
 |    |-- customerID: string (nullable = true)
 |    |-- source: string (nullable = true)
 |-- listing: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- location: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- lon: double (nullable = true)
 |    |    |-- neighborhood: string (nullable = true)
 |    |    |-- zone: string (nullable = true)
 |    |-- features: struct (nullable = true)
 |    |    |-- bathrooms: long (nullable = true)
 |    |    |-- bedrooms: long (nullable = true)
 |    |    |-- floors: long (nullable = true)
 |    |    |-- parkingSpaces: long (nullable = true)
 |    |    |-- suites: long (nullable = true)
 |    |    |-- totalAreas: string (nullable = true)
 |    |    |-- unitFloor: long (nullable = true)
 |    |    |-- unitsOnTheFloor: long (nullable = true)
 |    |    |-- usableAreas: string (nullable =

Os campos de interesse para o modelo estão armazenados na `struct` ***listing***.

# Preparação dos dados

Como vimos acima, precisaremos extrair as informações de dentro da `struct` ***listing*** para trabalhar com essa base. As informações de endereço, por exemplo, podem ser extraídas com o comando `listing.address.*`:

In [None]:
from pyspark.sql import functions as f

In [None]:
raw = raw\
  .select('ident.customerID', 'listing.types.*', 'listing.features.*', 'listing.address.*', 'listing.prices.tax.condo', 'listing.prices.price')

Agora podemos analisar as colunas com mais facilidade

In [None]:
raw.show(5, truncate = False)

+---------------+-----------+-----------+---------+--------+------+-------------+------+----------+---------+---------------+-----------+--------------+------------------------+------------------------+----------+-----+-----+
|customerID     |unit       |usage      |bathrooms|bedrooms|floors|parkingSpaces|suites|totalAreas|unitFloor|unitsOnTheFloor|usableAreas|city          |location                |neighborhood            |zone      |condo|price|
+---------------+-----------+-----------+---------+--------+------+-------------+------+----------+---------+---------------+-----------+--------------+------------------------+------------------------+----------+-----+-----+
|775564-BOJSMVON|Outros     |Residencial|0        |0       |0     |1            |0     |62        |0        |0              |62         |Rio de Janeiro|{-22.909429, -43.413557}|Taquara                 |Zona Oeste|150  |45000|
|660895-AUENKNYY|Apartamento|Residencial|1        |2       |0     |1            |0     |0       

In [None]:
raw.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- usage: string (nullable = true)
 |-- bathrooms: long (nullable = true)
 |-- bedrooms: long (nullable = true)
 |-- floors: long (nullable = true)
 |-- parkingSpaces: long (nullable = true)
 |-- suites: long (nullable = true)
 |-- totalAreas: string (nullable = true)
 |-- unitFloor: long (nullable = true)
 |-- unitsOnTheFloor: long (nullable = true)
 |-- usableAreas: string (nullable = true)
 |-- city: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- condo: string (nullable = true)
 |-- price: string (nullable = true)



## Limpeza

Apenas o campo ***location*** que contem a latitude e longitude do imóvel ainda está como `struct`. Pensando na utilização final do modelo, não parece interessante exigir que o cliente forneça essa informação. Logo, podemos remover esse campo com o comando `drop`:  

In [None]:
raw  = raw.drop('location')

Somente 8 dos imóveis não são da cidade do Rio de Janeiro. Dado isso, podemos remover esse campo também:

In [None]:
raw\
  .select('city')\
  .groupBy('city')\
  .count()\
  .show()

+------------------+-----+
|              city|count|
+------------------+-----+
|      Cachoeirinha|    1|
|       São Gonçalo|    2|
|         Queimados|    3|
|São João de Meriti|    1|
|    Rio de Janeiro|73607|
|Armação dos Búzios|    1|
+------------------+-----+



In [None]:
raw  = raw.drop('city')

O campo ***neighborhood*** (vizinhança) possuí 158 categorias diferentes e o campo ***zone*** já fornece uma informação similar resumida em apenas 4 categorias (trataremos o valor faltante posteriormente). Optaremos então em utilizar somente a zona:

In [None]:
raw\
  .select('neighborhood')\
  .distinct()\
  .count()

158

In [None]:
raw\
  .select('zone')\
  .groupBy('zone')\
  .count()\
  .show()

+------------+-----+
|        zone|count|
+------------+-----+
|  Zona Norte|15191|
|  Zona Oeste|37116|
|Zona Central| 1921|
|    Zona Sul|19222|
|            |  165|
+------------+-----+



In [None]:
raw  = raw.drop('neighborhood')

O modelo será utilizado para clientes que buscam um imóvel para morar. No campo ***usage*** vemos que também temos imóveis comerciais nessa base. Podemos eliminar esses registros filtrando apenas os imóveis residenciais com o comando `where`:

In [None]:
raw\
  .select('usage')\
  .groupBy('usage')\
  .count()\
  .show()

+-----------+-----+
|      usage|count|
+-----------+-----+
|  Comercial| 4019|
|Residencial|69596|
+-----------+-----+



In [None]:
raw = raw\
  .where("usage == 'Residencial'")\
  .drop('usage')

Similarmente, o campo ***unit*** possuí uma categoria "Outros" além de "Apartamento" e "Casa" para i tipo do imóvel. Manteremos na base apenas as duas últimas categorias:

In [None]:
raw\
  .select('unit')\
  .groupBy('unit')\
  .count()\
  .show()

+-----------+-----+
|       unit|count|
+-----------+-----+
|     Outros| 1190|
|Apartamento|59106|
|       Casa| 9300|
+-----------+-----+



In [None]:
raw = raw\
  .where("unit != 'Outros'")

## Dados faltantes

Acima já identificamos que o campo ***zone*** possuí dados faltantes, mas outras informações também podem ter esse problema. Antes de fazer essa verificação, precisamos ajustar o tipo de algumas colunas. Os campos ***price***, ***condo***, ***iptu***, ***totalAreas*** e ***usableAreas*** estão como `string` mas são dados numéricos. Podemos converter esses dados aplicando a função `cast`:

In [None]:
from pyspark.sql.types import DoubleType, StringType

In [None]:
raw = raw\
  .withColumn('usableAreas', raw['usableAreas'].cast(DoubleType()))\
  .withColumn('totalAreas', raw['totalAreas'].cast(DoubleType()))\
  .withColumn('price', raw['price'].cast(DoubleType()))\
  .withColumn('condo', raw['condo'].cast(DoubleType()))

Feito o ajuste dos tipos, podemos contar a quantidade de dados faltantes na colunas numéricas utilizando as funções `isnull` e `isnan`:

In [None]:
num = [f.name for f in raw.schema.fields if not isinstance(f.dataType, StringType)] #campos numéricos

In [None]:
raw\
  .select([f.count(f.when(f.isnull(c) | f.isnan(c), 1)).alias(c) for c in num])\
  .show()

+---------+--------+------+-------------+------+----------+---------+---------------+-----------+-----+-----+
|bathrooms|bedrooms|floors|parkingSpaces|suites|totalAreas|unitFloor|unitsOnTheFloor|usableAreas|condo|price|
+---------+--------+------+-------------+------+----------+---------+---------------+-----------+-----+-----+
|        0|       0|     0|            0|     0|         0|        0|              0|          0| 5821|    0|
+---------+--------+------+-------------+------+----------+---------+---------------+-----------+-----+-----+



O campo ***condo*** apresentam valores faltantes. Para utilizar esses dados no modelo precisaremos imputá-los ou descartá-los na hora do ajuste. Outra verificação interessante é a contagem de valores zerados nos campos númericos:

In [None]:
raw\
  .select([f.count(f.when(f.col(c) == 0, 1)).alias(c) for c in num])\
  .show()

+---------+--------+------+-------------+------+----------+---------+---------------+-----------+-----+-----+
|bathrooms|bedrooms|floors|parkingSpaces|suites|totalAreas|unitFloor|unitsOnTheFloor|usableAreas|condo|price|
+---------+--------+------+-------------+------+----------+---------+---------------+-----------+-----+-----+
|        0|      40| 53534|        10478| 19701|      6357|    50651|          55421|          0| 5636|    0|
+---------+--------+------+-------------+------+----------+---------+---------------+-----------+-----+-----+



Identificamos 40 imóveis sem quartos e também que os campos ***floors***, ***unitFloor*** e ***unitsOnTheFloor*** possuem mais de 70% dos valores iguais a zero. Manteremos na base os registros que possuírem pelo menos um quarto ou uma suíte e removeremos as colunas ***floors***, ***unitFloor*** e ***unitsOnTheFloor*** .



In [None]:
raw = raw\
  .where('bedrooms > 0 or suites > 0')\
  .drop('floors', 'unitFloor', 'unitsOnTheFloor', 'totalAreas')

Para os campos categóricos verificamos a presença de valores nulos ou vazios (`f.col(c) == ''`):

In [None]:
cat = [f.name for f in raw.schema.fields if isinstance(f.dataType, StringType)] #campos categóricos

In [None]:
raw\
  .select([f.count(f.when(f.isnull(c) | (f.col(c) == ''), 1)).alias(c) for c in cat])\
  .show()

+----------+----+----+
|customerID|unit|zone|
+----------+----+----+
|         0|   0| 153|
+----------+----+----+



Identicamos então que os campos ***iptu***, ***condo*** e ***zone*** possuem dados faltantes. Dado que poucos imóveis estão sem zona, iremos eliminar esses registros:

In [None]:
raw = raw.where(f.col('zone') != '')

terminada esta etapa, salvamos a basa de dados processada:

In [None]:
raw = raw.withColumnRenamed('price', 'target')

In [None]:
raw.write.parquet(path = './assets/processed')

# Construção do modelo

## Pré-processamento dos dados

In [None]:
processed = spark.read.parquet('./assets/processed')

In [None]:
processed.show(5)

+---------------+-----------+---------+--------+-------------+------+-----------+------------+-----+-------+
|     customerID|       unit|bathrooms|bedrooms|parkingSpaces|suites|usableAreas|        zone|condo| target|
+---------------+-----------+---------+--------+-------------+------+-----------+------------+-----+-------+
|660895-AUENKNYY|Apartamento|        1|       2|            1|     0|       44.0|  Zona Oeste|120.0|45000.0|
|568886-ZIBFOMCC|Apartamento|        2|       3|            1|     1|       60.0|  Zona Oeste|400.0|50000.0|
|792086-NWNQTDYL|Apartamento|        1|       1|            1|     0|       33.0|  Zona Norte|  0.0|45336.0|
|951104-MACIAPIS|Apartamento|        2|       3|            1|     1|       70.0|Zona Central|350.0|45000.0|
|375665-GHMFEZXX|Apartamento|        2|       3|            1|     1|       70.0|Zona Central|350.0|50000.0|
+---------------+-----------+---------+--------+-------------+------+-----------+------------+-----+-------+
only showing top 5 

In [None]:
processed.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- bathrooms: long (nullable = true)
 |-- bedrooms: long (nullable = true)
 |-- parkingSpaces: long (nullable = true)
 |-- suites: long (nullable = true)
 |-- usableAreas: double (nullable = true)
 |-- zone: string (nullable = true)
 |-- condo: double (nullable = true)
 |-- target: double (nullable = true)



### Imputação simples

Para os campos númericos imputaremos os valores faltantes com a mediana por ser uma estatística mais robusta

In [None]:
from pyspark.ml.feature import Imputer

In [None]:
cols = ['condo']
imputer = Imputer(
    inputCols = cols,
    outputCols = ['{}_imputed'.format(a) for a in cols]
).setStrategy('median')

In [None]:
imputed = imputer \
          .fit(processed)\
          .transform(processed)\
          .drop(*cols)

In [None]:
num = [f.name for f in imputed.schema.fields if not isinstance(f.dataType, StringType)] #campos numéricos

cat = [f.name for f in imputed.schema.fields if isinstance(f.dataType, StringType)] #campos categóricos
cat.remove('customerID')

### Transformação logarítmica

A tabela abaixo mostra as estatísticas de resumo dos campos numéricos:

In [None]:
imputed\
  .select(*num)\
  .summary('min', 'mean', 'max')\
  .toPandas()

Unnamed: 0,summary,bathrooms,bedrooms,parkingSpaces,suites,usableAreas,target,condo_imputed
0,min,1.0,0.0,0.0,0.0,10.0,45000.0,0.0
1,mean,2.451725452253203,2.724733925587123,1.4407306418037351,1.1460990412525287,139.19273169730553,1240157.4305860966,3402.438590318703
2,max,22.0,30.0,144.0,13.0,1911.0,10000000.0,9900000.0


Esses campos apresentam assimetria a direta (i.e. possuem valores muito superiores a média). Temos, por exemplo, um condomínio de R\$ 9,9 milhões quando a média é R\$ 3402,44. Há casos inclusive em que o valor do condomínio é superior ao valor do imóvel:

In [None]:
imputed\
  .where('condo_imputed > target')\
  .show(5)

+---------------+-----------+---------+--------+-------------+------+-----------+----------+--------+-------------+
|     customerID|       unit|bathrooms|bedrooms|parkingSpaces|suites|usableAreas|      zone|  target|condo_imputed|
+---------------+-----------+---------+--------+-------------+------+-----------+----------+--------+-------------+
|557028-JWYITRUF|Apartamento|        1|       2|            1|     1|       43.0|Zona Oeste|120000.0|     260000.0|
|002068-FPTMQOYX|Apartamento|        1|       2|            1|     0|       55.0|Zona Norte|130000.0|     320000.0|
|500202-FYBDZFBP|Apartamento|        1|       2|            1|     0|       60.0|Zona Oeste|158000.0|     320000.0|
|151777-KDMYFGJE|Apartamento|        1|       1|            1|     0|       36.0|Zona Oeste|170000.0|     390000.0|
|406414-PYQFKQZT|Apartamento|        1|       1|            1|     0|       45.0|Zona Norte|179000.0|     416000.0|
+---------------+-----------+---------+--------+-------------+------+---

Vamos assumir aqui que o público alvo da aplicação final tem interesse em imóveis de até 2 milhões e condomínio máximo de 10 mil. Fazendo isso controlaremos o efeito negativo de campos com valores muito distantes da média:

In [None]:
imputed\
  .where('target <= 2000000 and condo_imputed <= 10000 and condo_imputed < target')\
  .count()

57095

In [None]:
imputed = imputed\
  .where('target <= 2000000 and condo_imputed <= 10000 and condo_imputed < target')

In [None]:
imputed\
  .select(*num)\
  .summary('min', 'mean', 'max')\
  .toPandas()

Unnamed: 0,summary,bathrooms,bedrooms,parkingSpaces,suites,usableAreas,target,condo_imputed
0,min,1.0,0.0,0.0,0.0,10.0,45000.0,0.0
1,mean,2.138365881425694,2.509554251685787,1.206515456694982,0.8886242227865838,104.5622558893073,742033.417917506,792.5891934495139
2,max,20.0,30.0,144.0,13.0,1650.0,2000000.0,8630.0


Feita essa restrição, aplicaremos a transformação `log` para suavizar a assimetria desses dados, inclusive na variável resposta (***target***)

In [None]:
transformed = imputed
for col in num:
  transformed = transformed.withColumn(col, f.log(imputed[col] + 1).alias(col))

Usaremos 70% dos imovéis como treino e o restante será usado para testar o modelo

In [None]:
treino, teste = transformed.randomSplit([0.7, 0.3], seed = 42)

### OneHot Encoding, Vetorização e Escalonamento

Nessa etapa fizemos a binarização dos campos categóricos usado o *one-hot-encoding* e transformamos todos os campos de entrada em um único vetor com o `VectorAssembler`. Esse ultimo passo é obrigatório para utilizar os modelos do `pyspark`

Os campos numéricos possuem diferentes escalas o que pode introduzir viés no modelo. Usaremos o método Min-Max para o escalonamento, fazendo com que todos esses campos variem entre 0 e 1.  

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

In [None]:
features = num.copy()
features.remove('target')

In [None]:
indexer = StringIndexer(inputCols=cat, outputCols=['{}_indexed'.format(a) for a in cat])
encoder = OneHotEncoder(inputCols=indexer.getOutputCols(), outputCols=['{}_onehot'.format(a) for a in cat])
assembler = VectorAssembler(inputCols = features + encoder.getOutputCols(), outputCol = 'features')
minmax = MinMaxScaler(inputCol='features', outputCol='features_minmax')

Essas transformações podem ser organizadas numa *pipeline* para aplicar todas as etapas de uma vez só

In [None]:
pipeline = Pipeline(stages=[indexer, encoder, assembler, minmax])
pipeline_fitted = pipeline.fit(treino)

In [None]:
treino_minmax = pipeline_fitted.transform(treino).select('features_minmax', 'target')

O novo campo ***features*** é um vetor contando todas os variáveis de entrada do modelo

## Treinando modelos Random Forest

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd

O desempenho dos modelos será avaliado com as métricas $R^{2}$ e $RSME$. A função `regression_report` retorna essas métricas e as predições do modelo.

In [None]:
def regression_report(model, data):
  label = model.getLabelCol()

  r2_evaluator = RegressionEvaluator(labelCol = label, predictionCol="prediction",metricName=  'r2')
  rmse_evaluator = RegressionEvaluator(labelCol = label, predictionCol="prediction",metricName=  'rmse')

  preds = model.transform(data)

  preds_inverse = preds
  for col in ['target', 'prediction']:
    preds_inverse = preds_inverse.withColumn(col, f.round(f.exp(preds[col]) - 1, 2).alias(col))

  return [preds_inverse, {'r2':r2_evaluator.evaluate(preds_inverse), 'rmse':rmse_evaluator.evaluate(preds_inverse)}]

In [None]:
rf = RandomForestRegressor(labelCol = 'target', featuresCol='features_minmax', seed = 42)
rf_fitted = rf.fit(treino_minmax)

In [None]:
rf_report = regression_report(rf_fitted, treino_minmax)[1]
print(rf_report)

{'r2': 0.6876206019273026, 'rmse': 259605.7937610618}


### Ajuste de hiperparâmetro com `hyperopt`

Com a biblioteca `hyperopt` conseguimos fazer uma busca estocástica pelos melhores hiperparâmetros do modelo Random Forest. A função `best_hyperparam` retorna a combinação dos parâmetros `maxDepth`, `maxBins` e `numTrees` que resulta no menor valor de $RMSE$ dentro de espaço fornecido para busca (`search_space`)

In [None]:
from hyperopt import tpe, hp, fmin, STATUS_OK,Trials
from hyperopt.early_stop import no_progress_loss

In [None]:
def objective_function(search_space):

    estimator = RandomForestRegressor(labelCol="target", featuresCol='features_minmax', **search_space)
    rmse_evaluator = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName=  'rmse')

    model = estimator.fit(treino_minmax)
    preds = model.transform(treino_minmax)
    rmse = rmse_evaluator.evaluate(preds)
    return {"loss": rmse, "status": STATUS_OK}


In [None]:
search_space = {"maxDepth": hp.quniform("maxDepth",5, 20, 1),
                "maxBins": hp.quniform("maxBins",10, 50, 1),
                "numTrees": hp.quniform("numTrees",10, 50, 1)}
trials = Trials()

In [None]:
best_hyperparam = fmin(
    fn = objective_function,
    space = search_space,
    algo = tpe.suggest,
    max_evals = 30,
    trials = trials,
    early_stop_fn = no_progress_loss(10)
    )

 67%|██████▋   | 20/30 [16:39<08:19, 50.00s/trial, best loss: 0.21161180267402033]


In [None]:
print("Best: {}".format(best_hyperparam))

Best: {'maxBins': 46.0, 'maxDepth': 19.0, 'numTrees': 48.0}


Agora podemos utilizar os valores dos hiperparâmetros encontrados para ajustar o novo modelo

In [None]:
rf_hopt = RandomForestRegressor(labelCol="target", featuresCol='features_minmax', **best_hyperparam)
rf_hopt_fitted = rf_hopt.fit(treino_minmax)

In [None]:
rf_hopt_report = regression_report(rf_hopt_fitted, treino_minmax)[1]
print(rf_hopt_report)

{'r2': 0.8891194656230331, 'rmse': 154668.13325665452}


O modelo otimizado teve um desempenho melhor, mas o erro continua alto

In [None]:
pd.DataFrame([rf_report, rf_hopt_report], index = ['rf', 'rf_hopt']).round(3)

Unnamed: 0,r2,rmse
rf,0.688,259605.794
rf_hopt,0.889,154668.133


O desempenho um pouco pior no conjunto teste é esperado, ainda assim o modelo otimizado foi superior

In [None]:
teste_minmax = pipeline_fitted.transform(teste).select('features_minmax', 'target')
rf_report_teste = regression_report(rf_fitted, teste_minmax)[1]
rf_hopt_report_teste = regression_report(rf_hopt_fitted, teste_minmax)[1]

In [None]:
pd.DataFrame([rf_report_teste, rf_hopt_report_teste], index = ['rf', 'rf_hopt']).round(2)

Unnamed: 0,r2,rmse
rf,0.68,264873.43
rf_hopt,0.79,215736.42


### Importância das variáveis

O atributo `featureImportances` contem a importância estimada de cada variável dentro do modelo

In [None]:
string_index = pipeline_fitted.stages[0].labelsArray
labels = []
for item in string_index:
  labels = labels + list(item)[:-1]

all_features = features + labels

As variáveis mais importantes foram a área útil do imóvel e o valor do condomínio. Entre as categóricas, a informação mais relevante foi a do imóvel estar localizado na Zona Sul

In [None]:
importances = rf_hopt_fitted.featureImportances
dict_importances = {}
for idx, importance in enumerate(importances):
    dict_importances[all_features[idx]] = round(importance, 2)

pd.DataFrame([dict_importances])

Unnamed: 0,bathrooms,bedrooms,parkingSpaces,suites,usableAreas,condo_imputed,Apartamento,Zona Oeste,Zona Norte,Zona Sul
0,0.06,0.08,0.05,0.08,0.31,0.26,0.01,0.01,0.02,0.11


## Exportando o modelo

Tanto o modelo quanto a pipeline de pré-processamento podem ser salvos para utilizarmos posteriormente

In [None]:
final_pipeline = Pipeline(stages= [indexer, encoder, assembler, minmax])
final_pipeline_fitted = final_pipeline.fit(treino)
final_pipeline_fitted.save('pipeline_fitted')

In [None]:
rf_hopt_fitted.save('rf_fitted')

In [None]:
#!zip -qq -r ./pipeline.zip ./pipeline_fitted
#!zip -qq -r ./model.zip ./rf_fitted

# Colocando o modelo em produção

Nessa seção apresentamos duas opções para que usuários tenham acesso ao modelo desenvolvido. A primeira é um aplicativo web utilizando a biblioteca Gradio e a segunda é uma API construída com a FastAPI. Aqui as duas soluções rodarão localmente mas ambas podem ser implementadas numa plataforma de nuvem, por exemplo.

## Importação do modelo

O primeiro passo é importar o modelo e a pipeline que foram construídos e salvos anteriormente

In [None]:
! wget -qq -O model.zip https://github.com/kamillafsilva/preditor_preco_imovel/blob/main/assets/model.zip?raw=true
! wget -qq -O pipeline.zip https://github.com/kamillafsilva/preditor_preco_imovel/blob/main/assets/pipeline.zip?raw=true

In [None]:
!unzip -qq model.zip
!unzip -qq pipeline.zip

## Criando um aplicativo web com o Gradio

In [None]:
!pip -qq install pyspark gradio

In [None]:
from pyspark.sql import SparkSession

from pyspark.sql import functions as f
from pyspark.sql.types import StringType

from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.ml import PipelineModel

import gradio as gr

Na construção do aplicativo precisamos passar como parâmetro uma função que recebe as caracteríticas do imóvel desejado pelo usuário e retorna o valor estimado desse imóvel.

In [None]:
def predict(unit, bathrooms, bedrooms, parkingSpaces, suites, usableAreas, zone, condo):

  spark = SparkSession.builder\
    .master('local[*]')\
    .appName("Deploy com Gradio")\
    .getOrCreate()

  loaded_pipeline = PipelineModel.load('pipeline_fitted')
  loaded_model = RandomForestRegressionModel.load('rf_fitted')

  input =  [(unit, bathrooms, bedrooms, parkingSpaces, suites, usableAreas, zone, condo, 0)]
  col_names = ['unit', 'bathrooms', 'bedrooms', 'parkingSpaces', 'suites', 'usableAreas', 'zone', 'condo_imputed', 'target']
  x = spark.createDataFrame(data = input, schema = col_names)

  num = [f.name for f in x.schema.fields if not isinstance(f.dataType, StringType)]

  x_transformed = x
  for col in num:
    x_transformed = x_transformed.withColumn(col, f.log(x[col] + 1).alias(col))

  x_scaled = loaded_pipeline.transform(x).select('features_minmax', 'target')
  pred = loaded_model.transform(x_scaled)

  inv_pred = pred
  inv_pred = pred.withColumn('prediction', (f.exp(pred['prediction']) - 1).alias('inv_prediction'))

  y = round(inv_pred.collect()[0][2], 0)

  spark.stop()

  return y

Além da função `predict`, também passamos para a classe `Interface` os dados de entrada do modelo (`inputs`) especificando o tipo de cada um. Por fim específicamos o tipo de retorno esperado (`outputs`), nesse caso, o retorno da função `predict` é um valor númerico (preço do imóvel)

In [None]:
demo = gr.Interface(title=  'Preditor de preço de imóveis',
    fn=predict,
    inputs=[gr.Dropdown(['Apartamento', 'Casa'], label='Tipo do imóvel'),
        gr.Slider(minimum=0, maximum= 50, step=1, label='Quantidade de banheiros'),
        gr.Slider(minimum=0, maximum= 50, step=1, label='Quantidade de quartos'),
        gr.Slider(minimum=0, maximum= 50, step=1, label='Quantidade de vagas de garagem'),
        gr.Slider(minimum=0, maximum= 50, step=1, label='Quantidade de suítes'),
        gr.Slider(minimum=0, maximum= 500, step=5, label='Área útil do imóvel'),
        gr.Dropdown(['Zona Oeste', 'Zona Norte', 'Zona Sul', 'Zona Central'], label='Zona de localização do imóvel'),
        gr.Slider(minimum=0, maximum= 10000, step=250, label='Valor do condomínio'),
    ],
    outputs=gr.Number(label='Valor estimado do imóvel')
)

O método `lauch` abre o aplicativo criado, nele os usuário podem informar as caracteríticas do imóvel e ver o valor estimado clicando no botão ***Submit***

In [None]:
demo.launch(share= True )

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://dc17d9457f2942472c.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)




## Criando uma API com a biblioteca FastAPI

In [None]:
!pip -qq install pyspark fastapi uvicorn nest-asyncio pyngrok

In [None]:
import nest_asyncio
from pyngrok import ngrok
import uvicorn

Para criação da API precisamos instanciar um objeto da classe **`FastAPI`** e sobrescrever um de seus métodos para criação de *endpoints*. O método `post` foi sobrescrito com uma nova função `predict` similar a utilizada no Gradio mas que recebe como parâmetro um objeto com a seguinte estrutura (exemplo):

```
{
  "unit": "Apartamento",
  "bathrooms": 0,
  "bedrooms": 0,
  "parkingSpaces": 0,
  "suites": 0,
  "usableAreas": 0,
  "zone": "Zona Oeste",
  "condo": 0
}
```

O arquivo **main.py** contem essa implementação da API






In [None]:
! wget -qq -O ./main.py https://github.com/kamillafsilva/preditor_preco_imovel/blob/main/main.py?raw=true

Para testar a API criada precisamos rodá-la num servidor web local. O pacote `uvicorn` já nos permite isso, mas no Colab precisaremos também do pacote `pyngrok` para expor esse servidor local na Internet. O uso do `pyngrok` requer um *token* de autenticação, obtido nesse [link](https://dashboard.ngrok.com/get-started/your-authtoken) após a criação de uma conta. A autenticação com o *token* gerado é feita com o código abaixo:

In [None]:
!ngrok config add-authtoken "YOUR_TOKEN"

Agora podemos iniciar o servidor da API e esperar que o `uvicorn` informe que aplicação está pronta ("*Application startup complete*"). Finalmente, a API criada pode ser acessada na ***FastAPI Interface url*** fornecida pelo `pyngrok`

In [None]:
ngrok_tunnel = ngrok.connect('8000')
print('FastAPI Interface url:', ngrok_tunnel.public_url + '/docs')
nest_asyncio.apply()

uvicorn.run('main:app', port=8000)