# 01. Data Sourcing - PySpark

## 01.1. Importação das bibliotecas

### 01.1.1. Importando as bibliotecas nativas do Python

In [1]:
import sys

### 01.1.2. Importando as bibliotecas de terceiros

In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

### 01.1.3. Importando os módulos locais do projeto

In [3]:
sys.path.append('..')
import params.consts as consts

## 01.2. Fontes dos dados

### 01.2.1. Listando as fontes dos dados

| Fonte de Dados           | Disponibilidade  | Formato              |
|--------------------------|------------------|----------------------|
| Data Lakes               | -                | -                    |
| Data Warehouses          | -                | -                    |
| Clouds                   | -                | -                    |
| Bancos de Dados SQL      | -                | -                    |
| Bancos de Dados NoSQL    | -                | -                    |
| APIs                     | -                | -                    |
| ERPs                     | -                | -                    |
| CRMs                     | -                | -                    |
| Web Scraping             | -                | -                    |
| Sistemas Empresariais    | Ok               | Csv                  |
| Dados Locais             | Ok               | Csv                  |
| Outros                   | -                | -                    |

## 01.3. União entre tabelas

### 01.3.0. Iniciando a sessão Spark

In [4]:
spark = SparkSession.builder.appName('spark').getOrCreate()

### 01.3.1. Habilitando a exibição de todas as colunas

In [5]:
pd.set_option('display.max_columns', None)

### 01.3.2. Entendendo as tabelas e seus tipos de dados

In [6]:
dim_customers = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .option('sep', ',') \
    .load(consts.DIM_CUSTOMERS_RAW)

dim_calender = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .option('sep', ';') \
    .load(consts.DIM_CALENDER_RAW)

In [7]:
dim_customers.show(5)

+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+-------------+---------+--------+
|  ID|Year_Birth| Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|MntWines|MntFruits|MntMeatProducts|MntFishProducts|MntSweetProducts|MntGoldProds|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|AcceptedCmp3|AcceptedCmp4|AcceptedCmp5|AcceptedCmp1|AcceptedCmp2|Complain|Z_CostContact|Z_Revenue|Response|
+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+----------

In [8]:
dim_calender.show(5)

+----------+----+---+-----------+---+-------------+---------------+------+-----+---------+-----------------+--------+----------------+-------------+-------+-------------+
|      Data| Ano|Mês|Nome do Mês|Dia|Dia da Semana|       Dia Útil|Semana|   CW|Trimestre|Nome do Trimestre|Semestre|Nome do Semestre|Início do Mês|Mês/Ano|Mês/Ano Curto|
+----------+----+---+-----------+---+-------------+---------------+------+-----+---------+-----------------+--------+----------------+-------------+-------+-------------+
|2000-01-01|2000|  1|    Janeiro|  1|       Sábado|Final de Semana|     1|01/00|        1|           Trim 1|       1|      Semestre 1|   2000-01-01| jan/00|       jan/00|
|2000-01-02|2000|  1|    Janeiro|  2|      Domingo|Final de Semana|     2|02/00|        1|           Trim 1|       1|      Semestre 1|   2000-01-01| jan/00|       jan/00|
|2000-01-03|2000|  1|    Janeiro|  3|Segunda-Feira|       Dia Útil|     2|02/00|        1|           Trim 1|       1|      Semestre 1|   2000-01-

In [9]:
dim_customers.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: date (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- AcceptedCmp3: integer (nullable = true)
 |-- AcceptedCmp4: integer (nullable = true)
 |-- AcceptedCmp

In [10]:
dim_calender.printSchema()

root
 |-- Data: date (nullable = true)
 |-- Ano: integer (nullable = true)
 |-- Mês: integer (nullable = true)
 |-- Nome do Mês: string (nullable = true)
 |-- Dia: integer (nullable = true)
 |-- Dia da Semana: string (nullable = true)
 |-- Dia Útil: string (nullable = true)
 |-- Semana: integer (nullable = true)
 |-- CW: string (nullable = true)
 |-- Trimestre: integer (nullable = true)
 |-- Nome do Trimestre: string (nullable = true)
 |-- Semestre: integer (nullable = true)
 |-- Nome do Semestre: string (nullable = true)
 |-- Início do Mês: date (nullable = true)
 |-- Mês/Ano: string (nullable = true)
 |-- Mês/Ano Curto: string (nullable = true)



### 01.3.3. Unindo as tabelas

In [11]:
dim_customers = dim_customers.withColumn('Dt_Customer', col('Dt_Customer').cast('date'))

dim_calender = dim_calender.withColumn('Data', col('Data').cast('date'))

dataset_raw = dim_customers.join(

    dim_calender.select('Data', 'Mês', 'Trimestre'),

    dim_customers['Dt_Customer'] == dim_calender['Data'],

    how = 'left'

)

dataset_raw = dataset_raw.drop('Data')

dataset_raw = dataset_raw \
    .withColumnRenamed('Mês', 'Dt_Customer_Month') \
    .withColumnRenamed('Trimestre', 'Dt_Customer_Quarter')

In [12]:
dataset_raw.show(5)

+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+-------------+---------+--------+-----------------+-------------------+
|  ID|Year_Birth| Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|MntWines|MntFruits|MntMeatProducts|MntFishProducts|MntSweetProducts|MntGoldProds|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|AcceptedCmp3|AcceptedCmp4|AcceptedCmp5|AcceptedCmp1|AcceptedCmp2|Complain|Z_CostContact|Z_Revenue|Response|Dt_Customer_Month|Dt_Customer_Quarter|
+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+------

In [13]:
dataset_raw.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: date (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- AcceptedCmp3: integer (nullable = true)
 |-- AcceptedCmp4: integer (nullable = true)
 |-- AcceptedCmp

### 01.3.4. Salvando a tabela unificada

In [14]:
dataset_raw.write \
    .format('csv') \
    .mode('overwrite') \
    .option('header', 'true') \
    .option('sep', ',') \
    .save(consts.DATASET_RAW_PYSPARK)

## 01.4. Compactação dos arquivos

### 01.4.1. Compactando arquivos brutos em csv usando o formato gzip

In [15]:
df = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .option('sep', ',') \
    .load(consts.DATASET_RAW_PYSPARK)

df.write \
    .format('csv') \
    .mode('overwrite') \
    .option('header', 'true') \
    .option('sep', ',') \
    .option('compression', 'gzip') \
    .save(consts.DATASET_RAW_COMPRESSED_PYSPARK)