# ETL RAW -> SILVER

### O objetivo desse arquivo é realizar a extração, transformação e carregamento dos dados brutos

- A estratégia utilizada no desenvolvimento desses scripts se deu a partir de uma analise lógica realizada no analytcs da raw e seguira os seguintes passos:
    
    0. **Configurações**
    1. **Limpeza das Linhas Onde Dados Essenciais estão Nulos**
    2. **Limpeza dos Imovéis Irregulares**
    3. **Limpeza das Irregularidades Lógicas**
    4. **Criação de Novas Colunas para Melhor Utilização dos Dados**
    5. **Carregar Todos os Dados Tratados dentro do Postgres**

### 0. Configurações

In [1]:
import pandas as pd
import pyspark.sql as psql
from pyspark.sql import SparkSession

import psycopg2
import os

In [2]:
RAW_DATA = '../Data Layer/raw/us-realestate-data.csv'

DB_CONFIG = {
    'host': 'localhost',
    'port': 5434,
    'database': os.getenv('POSTGRES_DB'),
    'user': os.getenv('POSTGRES_USER'),
    'password': os.getenv('POSTGRES_PASSWORD')
}

In [3]:
# Criação da SparkSession e do Schema do Dataset

spark = SparkSession.builder.appName("etl-rawtosilver").getOrCreate()

# No schema inicial todos tem permissão para serem nulos na intenção de evitar mudanças no dado bruto

schema = psql.types.StructType([
    psql.types.StructField("brokered_by", psql.types.FloatType(), True),
    psql.types.StructField("status", psql.types.StringType(), True),
    psql.types.StructField("price", psql.types.FloatType(), True),
    psql.types.StructField("bed", psql.types.IntegerType(), True),
    psql.types.StructField("bath", psql.types.IntegerType(), True),
    psql.types.StructField("acre_lot", psql.types.FloatType(), True),
    psql.types.StructField("street", psql.types.FloatType(), True),
    psql.types.StructField("city", psql.types.StringType(), True),
    psql.types.StructField("state", psql.types.StringType(), True),
    psql.types.StructField("zip_code", psql.types.StringType(), True),
    psql.types.StructField("house_size", psql.types.FloatType(), True),
    psql.types.StructField("prev_sold_date", psql.types.DateType(), True)
])

df = spark.read.csv(RAW_DATA, header=True, schema=schema)

total = df.count()

total

2226382

In [4]:
total = df.count()

### 1. Limpeza das Linhas Onde Dados Essenciais estão Nulos

In [5]:
df_clean = df.na.drop(subset=['price', 'zip_code', 'city', 'state', 'status'])
aux = df_clean.count()

print(50*'=')
print(f"Linhas Removidas: {total - aux}")
print(50*'=')

total = aux

Linhas Removidas: 3143


### 2. Limpeza dos Imovéis Irregulares

In [6]:
print(20*'=' + 'Limpando Imoveis Sem Tamanho' + '='*20)
df_clean = df_clean.filter(df_clean.house_size.isNotNull() | df_clean.acre_lot.isNotNull())
aux = df_clean.count()
print(f"Linhas Removidas: {total - aux}")
total = aux

print(20*'=' + 'Limpando Imoveis Sem Comôdo e Lotes' + '='*13)
df_clean = df_clean.filter(
    (df_clean.house_size.isNotNull() & df_clean.bed.isNotNull() & df_clean.bath.isNotNull()) | 
    (df_clean.acre_lot.isNotNull() & df_clean.house_size.isNull()))
aux = df_clean.count()
print(f"Linhas Removidas: {total - aux}")
total = aux

print(20*'=' + len('Limpando Imoveis Sem Tamanho')*'=' + '='*20)

Linhas Removidas: 49487
Linhas Removidas: 51436


### 3. Limpeza das Irregularidades Lógicas

#### 3.1 Áreas Irregulares

In [7]:
print(20*'=' + 'Limpando Outliers Relacionados a Área' + '='*20)
df_clean = df_clean.filter(
    ((df_clean.acre_lot > 0.02) & (df_clean.acre_lot <= 50.0) & df.house_size.isNull()) |
    ((df.house_size >= 300) & (df.house_size <= 60000))
)
aux = df_clean.count()
print(f'Linhas Removidas: {total - aux}')
total = aux

print(20*'=' + len('Limpando Outliers Relacionados a Área')*'=' + '='*20)

Linhas Removidas: 29819


#### 3.2 Preços Irregulares

In [8]:
print(20*'=' + 'Limpando Outliers Relacionados ao Preço' + '='*20)
df_clean = df_clean.filter((df_clean.price >= 20000) & (df_clean.price <= 3e8))
aux = df_clean.count()
print(f'Linhas Removidas: {total - aux}')
total = aux

print(20*'=' + 'Limpando Outliers Preço por sqft' + '='*27)
df_clean = df_clean.filter(
    ((psql.functions.col("house_size").isNotNull()) & ((psql.functions.col("price") / psql.functions.col("house_size")) >= 150)) |
    (psql.functions.col("acre_lot").isNotNull() & psql.functions.col("house_size").isNull())
)
aux = df_clean.count()
print(f'Linhas Removidas: {total - aux}')
total = aux

print(20*'=' + len('Limpando Outliers Relacionados ao Preço')*'=' + '='*20)

Linhas Removidas: 57135
Linhas Removidas: 450281


#### 3.3 Outliers em Banheiros e Quartos

In [9]:
print(20*'=' + 'Limpando Outliers Banheiros e Quartos' + '='*20)
df_clean = df_clean.filter((df.house_size.isNotNull()) & (((df.bed * 100) + (df.bath * 20)) < df.house_size) | (df.house_size.isNull() & df.acre_lot.isNotNull()))
aux = df_clean.count()
print(f'Linhas Removidas: {total - aux}')
total = aux

print(20*'=' + len('Limpando Outliers Banheiros e Quartos')*'=' + '='*20)

Linhas Removidas: 161


#### 3.4 Lotes Vazios com Numero de Banheiros / Quartos

In [10]:
print(20*'=' + 'Limpando Lotes Vazios com bath' + '='*20)
df_clean = df_clean.withColumn(
    "bath",
    psql.functions.when(psql.functions.col("house_size").isNull(), None).otherwise(psql.functions.col("bath"))
)
aux = df_clean.count()
print(f'Linhas Modificadas: {aux}')

print(20*'=' + 'Limpando Lotes Vazios com bed' + '='*21)
df_clean = df_clean.withColumn(
    "bed",
    psql.functions.when(psql.functions.col("house_size").isNull(), None).otherwise(psql.functions.col("bed"))
)
aux = df_clean.count()
print(f'Linhas Modificadas: {aux}')

print(20*'=' + len('Limpando Lotes Vazios com bath')*'=' + '='*20)

Linhas Modificadas: 1584920
Linhas Modificadas: 1584920


### 4. Criação e Apagando Colunas para Melhor Utilização dos Dados

In [11]:
print(20*'=' + 'Criando Coluna price_per_sqft' + '='*20)
df_clean = df_clean.withColumn(
    'price_per_sqft', 
    psql.functions.when(df.house_size.isNull(), None)
    .otherwise(psql.functions.round(psql.functions.col('price') / psql.functions.col('house_size'), 2))
)



In [12]:
print(20*'=' + 'Criando Coluna price_per_acre' + '='*20)
df_clean = df_clean.withColumn(
    'price_per_acre', 
    psql.functions.when((df.acre_lot.isNull()) | (df.acre_lot == 0), None)
    .otherwise(psql.functions.round(psql.functions.col('price') / psql.functions.col('acre_lot'), 2))
)



In [13]:
print(20*'=' + 'Criando Coluna rooms' + '='*20)
df_clean = df_clean.withColumn(
    'rooms', 
    psql.functions.when(df_clean.house_size.isNull(), None)
    .otherwise(df_clean.bed + df_clean.bath)
)



In [14]:
print(20*'=' + 'Removendo Colunas Desnecessárias' + '='*20)
df_clean = df_clean.drop('street')



### 5. Carregar no Banco de Dados

#### 5.1 Estabelecendo Conexão

In [15]:
db = psycopg2.connect(**DB_CONFIG)
cur = db.cursor()

#### 5.2 Cria One Big Table

In [16]:
tmap = {
    "string": "TEXT",
    "int": "INTEGER",
    "float": "REAL",
    "double": "DOUBLE PRECISION",
    "date": "DATE"
}

sparksql = []
sparksql.append('id SERIAL PRIMARY KEY')

for coluna in df_clean.dtypes:
    tipo = tmap.get(coluna[1])
    sparksql.append(f'{coluna[0]} {tipo}')

create_table = f"CREATE TABLE IF NOT EXISTS one_big_table ({', '.join(sparksql)});"

cur.execute(create_table)

db.commit()

#### 5.3 Popula a One Big Table

In [None]:
from psycopg2.extras import execute_batch

BATCH_SIZE = df_clean.count() // 4
cols = df_clean.columns

buffer = []

sql = f"INSERT INTO one_big_table ({', '.join(cols)}) VALUES ({', '.join(['%s'] * len(cols))})"

for linha in df_clean.toLocalIterator():
    buffer.append(tuple(linha[col] for col in cols))

    if len(buffer) >= BATCH_SIZE:
        execute_batch(
            cur,
            sql,
            buffer,
            page_size=5000
        )
        buffer = []

if buffer:
    execute_batch(
        cur,
        sql,
        buffer,
        page_size=5000
    )

db.commit()

In [18]:
cur.execute('SELECT COUNT(*) FROM one_big_table;')
total_banco = cur.fetchone()[0]

print(total_banco)

1584920


In [19]:
df_clean.count()

1584920