In [1]:
import findspark

import pandas as pd
from sqlalchemy import create_engine

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, expr, round, when, date_format, count
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, RobustScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

import warnings
warnings.filterwarnings('ignore')

https://sparkbyexamples.com/

#### Iniciando sessão spark

In [2]:
findspark.init()
spark = SparkSession.builder.master('local').appName('Projeto').getOrCreate()

#### Carregando dados do PostgreSQL

In [3]:
conexao = create_engine('postgresql://postgres:123456@localhost/king_county')
df_pandas = pd.read_sql_query('SELECT * FROM df_king', conexao)

#### Criando um Spark DataFrame

In [4]:
df_spark = spark.createDataFrame(df_pandas)

#### Avaliando os tipos de cada DataFrame

In [5]:
print(type(df_pandas))
print(type(df_spark))

<class 'pandas.core.frame.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


#### Renomeando colunas

In [6]:
novas_colunas =  ['id', 'data', 'preco', 'quartos', 'banheiros', 'm2_interior', 'm2_espaco_completo', 'numero_andares',
                 'vista_orla', 'qualidade_vista', 'qualidade_imovel', 'qualidade_design', 'm2_interior_acima_solo',
                 'm2_interior_abaixo_solo', 'ano_construcao', 'ano_ultima_renovacao', 'zipcode', 'lat', 'long',
                 'm2_interior_15_vizinhos', 'm2_espaco_completo_15_vizinhos']

df_spark = df_spark.toDF(*novas_colunas)

#### Formatando colunas de ano

In [7]:
df_spark = df_spark.withColumn('ano_construcao', col('ano_construcao').cast('Integer')) \
                               .withColumn('ano_ultima_renovacao', col('ano_ultima_renovacao').cast('Integer'))

In [8]:
df_spark.printSchema()

root
 |-- id: string (nullable = true)
 |-- data: date (nullable = true)
 |-- preco: double (nullable = true)
 |-- quartos: double (nullable = true)
 |-- banheiros: double (nullable = true)
 |-- m2_interior: double (nullable = true)
 |-- m2_espaco_completo: double (nullable = true)
 |-- numero_andares: double (nullable = true)
 |-- vista_orla: double (nullable = true)
 |-- qualidade_vista: double (nullable = true)
 |-- qualidade_imovel: double (nullable = true)
 |-- qualidade_design: double (nullable = true)
 |-- m2_interior_acima_solo: double (nullable = true)
 |-- m2_interior_abaixo_solo: double (nullable = true)
 |-- ano_construcao: integer (nullable = true)
 |-- ano_ultima_renovacao: integer (nullable = true)
 |-- zipcode: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- m2_interior_15_vizinhos: double (nullable = true)
 |-- m2_espaco_completo_15_vizinhos: double (nullable = true)



#### Removendo observações com mais de 8 quartos

In [9]:
print('Antes da realizar a filtragem!')
print(f'A quantidade de linhas no DataFrame é de: {df_spark.count()}')
print(f'A quantidade de colunas no DataFrame é de: {len(df_spark.columns)}')

Antes da realizar a filtragem!
A quantidade de linhas no DataFrame é de: 21613
A quantidade de colunas no DataFrame é de: 21


In [10]:
df_spark = df_spark.where(df_spark.quartos < 9)

In [11]:
print('Após realizar a filtragem!')
print(f'A quantidade de linhas no DataFrame é de: {df_spark.count()}')
print(f'A quantidade de colunas no DataFrame é de: {len(df_spark.columns)}')

Após realizar a filtragem!
A quantidade de linhas no DataFrame é de: 21602
A quantidade de colunas no DataFrame é de: 21


#### Criando a coluna de tamanho do imóvel

In [12]:
df_spark = df_spark.withColumn('tamanho_imovel_completo', round(expr('preco / m2_espaco_completo'), 2))

In [13]:
df_spark.select('tamanho_imovel_completo').show(4)

+-----------------------+
|tamanho_imovel_completo|
+-----------------------+
|                  39.27|
|                  74.29|
|                   18.0|
|                  120.8|
+-----------------------+
only showing top 4 rows



#### Transformando a coluna ano de construção

In [14]:
df_spark = df_spark.withColumn('ano_construcao', when(col('ano_construcao') <= 1925, 1)
                               .when((col('ano_construcao') > 1925) & (col('ano_construcao') <= 1950), 2)
                               .when((col('ano_construcao') > 1950) & (col('ano_construcao') <= 1975), 3)
                               .otherwise(4))

In [15]:
df_spark.select('ano_construcao').show(4)

+--------------+
|ano_construcao|
+--------------+
|             3|
|             3|
|             2|
|             3|
+--------------+
only showing top 4 rows



In [16]:
def obtendo_dummies(spark_dataframe, coluna):
    
    # Obtendo as categorias de uma coluna
    categorias = spark_dataframe.select(coluna).distinct().rdd.flatMap(lambda x : x).collect()
    categorias.sort()

    # Obtendo as variáveis dummies
    df_alterado = spark_dataframe.select('*')

    for categoria in categorias:
        funcao = udf(lambda item: 1 if item == categoria else 0, IntegerType())
        nova_coluna = coluna + '_' + str(categoria).replace('.', '_')
        df_alterado = df_alterado.withColumn(nova_coluna, funcao(col(coluna)))
        
    # Deletando coluna utilizada
    df_alterado = df_alterado.drop(coluna)
    
    return df_alterado

In [17]:
df_spark = obtendo_dummies(df_spark, 'ano_construcao')

In [18]:
df_spark.columns[-10:]

['zipcode',
 'lat',
 'long',
 'm2_interior_15_vizinhos',
 'm2_espaco_completo_15_vizinhos',
 'tamanho_imovel_completo',
 'ano_construcao_1',
 'ano_construcao_2',
 'ano_construcao_3',
 'ano_construcao_4']

#### Transformando coluna ano da última reforma

In [19]:
df_spark = df_spark.withColumn('ano_ultima_renovacao', when(col('ano_ultima_renovacao') <= 0, 0)
                               .when((col('ano_ultima_renovacao') > 0) & (col('ano_ultima_renovacao') <= 1970), 1)
                               .when((col('ano_ultima_renovacao') > 1970) & (col('ano_ultima_renovacao') <= 2000), 2)
                               .otherwise(3))

In [20]:
df_spark.select('ano_ultima_renovacao').show(4)

+--------------------+
|ano_ultima_renovacao|
+--------------------+
|                   0|
|                   2|
|                   0|
|                   0|
+--------------------+
only showing top 4 rows



In [21]:
df_spark = obtendo_dummies(df_spark, 'ano_ultima_renovacao')

In [22]:
df_spark.columns[-10:]

['m2_espaco_completo_15_vizinhos',
 'tamanho_imovel_completo',
 'ano_construcao_1',
 'ano_construcao_2',
 'ano_construcao_3',
 'ano_construcao_4',
 'ano_ultima_renovacao_0',
 'ano_ultima_renovacao_1',
 'ano_ultima_renovacao_2',
 'ano_ultima_renovacao_3']

#### Obtendo coluna com dia da semana

In [23]:
df_spark.select('data').show(2)

+----------+
|      data|
+----------+
|2014-10-13|
|2014-12-09|
+----------+
only showing top 2 rows



In [24]:
df_spark = df_spark.withColumn('dia_da_semana', date_format(col('data'), 'EEEE'))

In [25]:
df_spark.select(['data', 'dia_da_semana']).show(6)

+----------+-------------+
|      data|dia_da_semana|
+----------+-------------+
|2014-10-13|       Monday|
|2014-12-09|      Tuesday|
|2015-02-25|    Wednesday|
|2014-12-09|      Tuesday|
|2015-02-18|    Wednesday|
|2014-05-12|       Monday|
+----------+-------------+
only showing top 6 rows



#### Obtendo dummies das seguintes colunas

In [26]:
colunas = ['qualidade_imovel', 'qualidade_vista', 'dia_da_semana']

for item in colunas:
    df_spark = obtendo_dummies(df_spark, item)

In [27]:
df_spark.columns[-20:]

['ano_ultima_renovacao_1',
 'ano_ultima_renovacao_2',
 'ano_ultima_renovacao_3',
 'qualidade_imovel_1_0',
 'qualidade_imovel_2_0',
 'qualidade_imovel_3_0',
 'qualidade_imovel_4_0',
 'qualidade_imovel_5_0',
 'qualidade_vista_0_0',
 'qualidade_vista_1_0',
 'qualidade_vista_2_0',
 'qualidade_vista_3_0',
 'qualidade_vista_4_0',
 'dia_da_semana_Friday',
 'dia_da_semana_Monday',
 'dia_da_semana_Saturday',
 'dia_da_semana_Sunday',
 'dia_da_semana_Thursday',
 'dia_da_semana_Tuesday',
 'dia_da_semana_Wednesday']

#### Removendo colunas que não serão utilizadas na etapa de ML

In [28]:
df_spark = df_spark.drop('id', 'data', 'm2_interior_acima_solo', 'm2_interior_abaixo_solo',
                       'zipcode', 'lat', 'long', 'm2_espaco_completo_15_vizinhos')

In [29]:
df_spark.columns[:10]

['preco',
 'quartos',
 'banheiros',
 'm2_interior',
 'm2_espaco_completo',
 'numero_andares',
 'vista_orla',
 'qualidade_design',
 'm2_interior_15_vizinhos',
 'tamanho_imovel_completo']

#### Separando em treino e teste

In [30]:
train_df, test_df = df_spark.randomSplit([0.7, 0.3], seed = 8)

#### Verificando o tamanho dos dados de treino e teste

In [31]:
print(f'O tamanho dos dados de treino é: {train_df.count()}!')
print(f'O tamanho dos dados de teste é: {test_df.count()}!')

O tamanho dos dados de treino é: 15106!
O tamanho dos dados de teste é: 6496!


#### Aplicando normalização dos dados com RobustScaler

In [32]:
colunas_norm = ['quartos', 'banheiros', 'm2_interior', 'm2_espaco_completo',
                'numero_andares', 'vista_orla', 'qualidade_design', 'm2_interior_15_vizinhos', 'tamanho_imovel_completo']

In [33]:
def aplicando_robust_scaler(dataframe, lista_colunas):
    
    # Criando o VectorAssembler para transformar as colunas em vetores
    assembler = VectorAssembler(inputCols = lista_colunas, outputCol = "features")
    
    # Criando o RobustScaler para aplicar a escala robusta nas features
    scaler = RobustScaler(inputCol = "features", outputCol = "scaled_features", withScaling = True, withCentering = True)
    
    # Definindo a pipeline com as etapas de pré-processamento
    pipeline = Pipeline(stages = [assembler, scaler])
    
    # Treinando a pipeline com os dados de treinamento
    pipeline_model = pipeline.fit(dataframe)
    
    # Aplicando a pipeline no DataFrame de entrada
    dataframe = pipeline_model.transform(dataframe)
    
    # Dropando as colunas originais
    for coluna in lista_colunas:
        dataframe = dataframe.drop(coluna)
    
    dataframe = dataframe.drop('features')
    
    # Retornando o DataFrame transformado
    return dataframe

In [34]:
train_df = aplicando_robust_scaler(train_df, colunas_norm)

In [35]:
test_df = aplicando_robust_scaler(test_df, colunas_norm)

## Machine Learning

#### Transformando as colunas e o vetor assembler em um só vetor

In [36]:
def inserindo_assembler(dataframe):
    
    # Listando as colunas e excluindo a coluna target (preco)
    colunas = dataframe.columns
    colunas = [coluna for coluna in colunas if coluna != 'preco']
    
    #
    assembler = VectorAssembler(inputCols = colunas, outputCol = "features")
    
    #
    dataframe = assembler.transform(dataframe)
    
    return dataframe

In [37]:
train_df = inserindo_assembler(train_df)
test_df = inserindo_assembler(test_df)

In [38]:
lr = LinearRegression(fitIntercept = True, featuresCol = 'features', labelCol = 'preco')

In [40]:
model = lr.fit(train_df)

In [42]:
predictions = model.transform(test_df)

In [43]:
evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'preco', metricName = 'rmse')
rmse = evaluator.evaluate(predictions)
print(f'RMSE: {rmse:.3f}')

RMSE: 211951.355
