#### 1.Preparando ambiente

In [3]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.functions import when, col, sum, count, isnan, round, isnull, to_timestamp, unix_timestamp
from pyspark.sql.functions import regexp_replace, concat_ws, sha2, rtrim
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType, DecimalType, TimestampType, FloatType
from pyspark.sql import HiveContext

import os
import re

from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import when

from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import year, month, dayofmonth, quarter 

from pyspark.sql.functions import col, to_date, unix_timestamp

# Build (or reuse) a Spark session running on the local master with Hive
# support enabled; every spark.sql call in this notebook goes through it.
spark = (
    SparkSession.builder
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
)

In [38]:
def salvar_df(df, file):
    """Write ``df`` as one ';'-delimited CSV in the gold area and fetch it from HDFS.

    The DataFrame is coalesced to a single partition and written (overwrite
    mode, with header) to ``/datalake/gold/<file>/``. Two shell commands are
    then run in order: an ``hdfs dfs -rm`` and an ``hdfs dfs -get`` that copies
    the generated ``part-*`` file to ``/input/big_data_BI/gold/<file>.csv``.

    Args:
        df: DataFrame to export.
        file: Base name used for the output directory and the ``.csv`` file.

    Returns:
        None. The ``-get`` command line is printed before anything is written.
        A shell command that exits with a non-zero status is reported with a
        printed warning; the function still runs the remaining command, so the
        export stays best-effort as before.
    """
    # NOTE(review): the erase command targets /input/desafio_curso/gold/ while
    # the -get command writes to /input/big_data_BI/gold/ - confirm which
    # directory is the intended one.
    output = "/input/desafio_curso/gold/" + file
    erase = "hdfs dfs -rm " + output + "/*"
    rename = "hdfs dfs -get /datalake/gold/" + file + "/part-* /input/big_data_BI/gold/" + file + ".csv"
    print(rename)

    writer = (
        df.coalesce(1).write
        .format("csv")
        .option("header", True)
        .option("delimiter", ";")
        .mode("overwrite")
    )
    writer.save("/datalake/gold/" + file + "/")

    # Surface failures instead of discarding the exit status of os.system.
    for command in (erase, rename):
        status = os.system(command)
        if status != 0:
            print("WARNING: command exited with status " + str(status) + ": " + command)

#### 2.Criando os DFs

In [5]:
# Sales: distinct rows of desafio_curso.tbl_vendas
vendas_query = """
                    SELECT DISTINCT * 
                    FROM desafio_curso.tbl_vendas
                    """
df_vendas = spark.sql(vendas_query)

In [6]:
# Customers: distinct rows of desafio_curso.tbl_clientes
clientes_query = """
                    SELECT DISTINCT * 
                    FROM desafio_curso.tbl_clientes
                    """
df_clientes = spark.sql(clientes_query)

In [7]:
# Addresses: every row of desafio_curso.tbl_endereco
# NOTE(review): the output displayed for this cell shows the CSV header
# ('Address Number', 'City', ...) stored as a data row - confirm whether the
# table should skip its header line.
endereco_query = """
                    SELECT * 
                    FROM desafio_curso.tbl_endereco
                    """
df_endereco = spark.sql(endereco_query)
# df_endereco.show(5)

+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+------------+
|address_number|                city|country|  customer_address_1|  customer_address_2|  customer_address_3|  customer_address_4|state|    zip_code|
+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+------------+
|Address Number|                City|Country|  Customer Address 1|  Customer Address 2|  Customer Address 3|  Customer Address 4|State|    Zip Code|
|      10000000|               Akron|     US|         PO Box 6258|                 ...|                 ...|                 ...|   OH|       44312|
|      10000453|                 ...|     UK|                 ...|                 ...|                 ...|                 ...|     |            |
|      10000455|    Huntington Beach|     US|   7392 Count Circle|                 ...|                 ..

In [8]:
# Regions: every row of desafio_curso.tbl_regiao
# NOTE(review): the output displayed for this cell shows the CSV header
# ('Region Code', 'Region Name') stored as a data row - confirm whether the
# table should skip its header line.
regiao_query = """
                    SELECT * 
                    FROM desafio_curso.tbl_regiao
                    """
df_regiao = spark.sql(regiao_query)
# df_regiao.show(5)

+-----------+-----------+
|region_code|region_name|
+-----------+-----------+
|Region Code|Region Name|
|          0|     Canada|
|          1|    Western|
|          2|   Southern|
|          3|  Northeast|
+-----------+-----------+
only showing top 5 rows



In [9]:
# Divisions: every row of desafio_curso.tbl_divisao
divisao_query = """
                    SELECT * 
                    FROM desafio_curso.tbl_divisao
                    """
df_divisao = spark.sql(divisao_query)
# df_divisao.show(5)

#### 3.Criando a df_stage

In [10]:
# Temporary views for SALES and CUSTOMERS, so they can be joined with SQL
for view_name, frame in (("tbl_vendas", df_vendas), ("tbl_clientes", df_clientes)):
    frame.createOrReplaceTempView(view_name)

In [11]:
# SALES + CUSTOMERS: attach the customer attributes to every sale.
# customerkey arrives once through v.*, and c.customerkey is not selected, so
# there is no duplicated key column to remove. The former
# drop("c.customerkey") matched no column name and was a no-op.
df_stage = spark.sql("""
                    SELECT DISTINCT v.*, 
                        c.address_number, 
                        c.business_family, 
                        c.business_unit,
                        c.customer,
                        c.customer_type, 
                        c.division, 
                        c.line_of_business, 
                        c.phone, 
                        c.region_code, 
                        c.regional_sales_mgr, 
                        c.search_type
                    FROM tbl_vendas AS v
                    INNER JOIN tbl_clientes AS c
                    ON v.customerkey = c.customerkey
                    """)
# df_stage.show(5)

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+--------------+---------------+-------------+-------------------+-------------+--------+----------------+------------+-----------+------------------+-----------+
|actual_delivery_date|customerkey|   datekey|discount_amount|invoice_date|invoice_number|item_class|item_number|                item|line_number|list_price|order_number|promised_delivery_date|sales_amount|sales_amount_based_on_list_price|sales_cost_amount|sales_margin_amount|sales_price|sales_quantity|sales_rep|u_m|address_number|business_family|business_unit|           customer|customer_type|division|line_of_business|       phone|region_code|regional_sales_mgr|search_type|
+--------------------+-----------+--------

In [12]:
# Temporary views for ADDRESS and for the current stage DataFrame
for view_name, frame in (("tbl_endereco", df_endereco), ("df_stage", df_stage)):
    frame.createOrReplaceTempView(view_name)

In [13]:
# STAGE + ADDRESS: attach the address attributes through address_number.
# Only s.address_number is selected (via s.*); e.address_number is never part
# of the result, so there is no duplicated column to remove. The former
# drop("e.address_number") matched no column name and was a no-op.
df_stage = spark.sql("""
                    SELECT DISTINCT s.*, 
                        e.city, 
                        e.country, 
                        e.customer_address_1, 
                        e.customer_address_2, 
                        e.customer_address_3, 
                        e.customer_address_4, 
                        e.state, 
                        e.zip_code
                    FROM df_stage AS s
                    LEFT JOIN tbl_endereco AS e
                    ON s.address_number = e.address_number
                    """)
# df_stage.show(5)

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+--------------+---------------+-------------+-------------------+-------------+--------+----------------+------------+-----------+------------------+-----------+---------+-------+--------------------+--------------------+--------------------+--------------------+-----+--------+
|actual_delivery_date|customerkey|   datekey|discount_amount|invoice_date|invoice_number|item_class|item_number|                item|line_number|list_price|order_number|promised_delivery_date|sales_amount|sales_amount_based_on_list_price|sales_cost_amount|sales_margin_amount|sales_price|sales_quantity|sales_rep|u_m|address_number|business_family|business_unit|           customer|customer_type|division

In [14]:
# Temporary views for REGION and for the current stage DataFrame
for view_name, frame in (("tbl_regiao", df_regiao), ("df_stage", df_stage)):
    frame.createOrReplaceTempView(view_name)

In [15]:
# STAGE + REGION: attach region_name through region_code.
# r.region_code is not selected, so the result carries a single region_code
# column (from s.*). The former drop("r.region_code") matched no column name
# and was a no-op.
df_stage = spark.sql("""
                    SELECT DISTINCT s.*, r.region_name
                    FROM df_stage AS s
                    LEFT JOIN tbl_regiao AS r
                    ON s.region_code = r.region_code
                    """)
# df_stage.show(5)

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+--------------+---------------+-------------+-------------------+-------------+--------+----------------+------------+-----------+------------------+-----------+---------+-------+--------------------+--------------------+--------------------+--------------------+-----+--------+-------------+
|actual_delivery_date|customerkey|   datekey|discount_amount|invoice_date|invoice_number|item_class|item_number|                item|line_number|list_price|order_number|promised_delivery_date|sales_amount|sales_amount_based_on_list_price|sales_cost_amount|sales_margin_amount|sales_price|sales_quantity|sales_rep|u_m|address_number|business_family|business_unit|           customer|customer

In [16]:
# Temporary views for DIVISION and for the current stage DataFrame
for view_name, frame in (("tbl_divisao", df_divisao), ("df_stage", df_stage)):
    frame.createOrReplaceTempView(view_name)

In [17]:
# STAGE + DIVISION: attach division_name through the division code.
# d.division is not selected, so the result carries a single division column
# (from s.*). The former statement assigned drop("d.division") to the
# misspelled name df_stag; the drop matched no column name, so it was removed.
df_stage = spark.sql("""
                    SELECT DISTINCT s.*, 
                        d.division_name
                    FROM df_stage AS s
                    LEFT JOIN tbl_divisao AS d
                    ON s.division = d.division
                    """)
# df_stage.show(5)

+--------------------+-----------+----------+---------------+------------+--------------+----------+-----------+--------------------+-----------+----------+------------+----------------------+------------+--------------------------------+-----------------+-------------------+-----------+--------------+---------+---+--------------+---------------+-------------+-------------------+-------------+--------+----------------+------------+-----------+------------------+-----------+---------+-------+--------------------+--------------------+--------------------+--------------------+-----+--------+-------------+-------------+
|actual_delivery_date|customerkey|   datekey|discount_amount|invoice_date|invoice_number|item_class|item_number|                item|line_number|list_price|order_number|promised_delivery_date|sales_amount|sales_amount_based_on_list_price|sales_cost_amount|sales_margin_amount|sales_price|sales_quantity|sales_rep|u_m|address_number|business_family|business_unit|           cus

#### 4.Aplicando regras

In [18]:
# Inspect the column types of the joined stage before the type conversions
df_stage.printSchema()

root
 |-- actual_delivery_date: string (nullable = true)
 |-- customerkey: string (nullable = true)
 |-- datekey: string (nullable = true)
 |-- discount_amount: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- invoice_number: string (nullable = true)
 |-- item_class: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- line_number: string (nullable = true)
 |-- list_price: string (nullable = true)
 |-- order_number: string (nullable = true)
 |-- promised_delivery_date: string (nullable = true)
 |-- sales_amount: string (nullable = true)
 |-- sales_amount_based_on_list_price: string (nullable = true)
 |-- sales_cost_amount: string (nullable = true)
 |-- sales_margin_amount: string (nullable = true)
 |-- sales_price: string (nullable = true)
 |-- sales_quantity: string (nullable = true)
 |-- sales_rep: string (nullable = true)
 |-- u_m: string (nullable = true)
 |-- address_number: string (nullable = true)
 

In [20]:
# Second name for the joined stage; the following cells rebind df_stage2 only
df_stage2 = df_stage

In [21]:
# Type conversion of the stage columns, followed by the fill rules stated in
# sections 4.1 and 4.2 of this notebook (they had no implementing code).

# Date columns, parsed into timestamps
data_columns = ['actual_delivery_date',
                'datekey',
                'invoice_date',
                'promised_delivery_date']

# NOTE(review): a later cell passes "dd/MM/yyyy" for invoice_date - confirm
# that the source values really are formatted as yyyy-MM-dd.
date_format = "yyyy-MM-dd"

for c in data_columns:
    df_stage2 = df_stage2.withColumn(c, to_timestamp(col(c), date_format).cast(TimestampType()))

# String columns
# sales_quantity, sales_rep, business_unit and division are also listed in
# integer_columns below and end up as integers.
string_columns = ['customerkey',
                   'invoice_number',
                   'item_class',
                   'item_number',
                   'item',
                   'line_number',
                   'order_number',
                   'sales_quantity',
                   'sales_rep',
                   'u_m',
                   'address_number',
                   'business_family',
                   'business_unit',
                   'customer',
                   'customer_type',
                   'division',
                   'line_of_business',
                   'phone',
                   'regional_sales_mgr',
                   'search_type',
                   'city',
                   'country',
                   'customer_address_1',
                   'customer_address_2',
                   'customer_address_3',
                   'customer_address_4',
                   'state',
                   'zip_code',
                   'region_code',
                   'region_name',
                   'division_name']

for c in string_columns:
    df_stage2 = df_stage2.withColumn(c, col(c).cast(StringType()))

# Decimal columns
# NOTE(review): FloatType is single precision; consider DoubleType or
# DecimalType for monetary values.
decimal_columns = ['discount_amount',
                   'list_price',
                   'sales_amount',
                   'sales_amount_based_on_list_price',
                   'sales_cost_amount',
                   'sales_margin_amount',
                   'sales_price']

for c in decimal_columns:
    df_stage2 = df_stage2.withColumn(c, col(c).cast(FloatType()))

# Integer columns
integer_columns = ['sales_quantity',
                   'sales_rep',
                   'business_unit',
                   'division']

for c in integer_columns:
    df_stage2 = df_stage2.withColumn(c, col(c).cast(IntegerType()))

# Rule 4.1: text fields that are null, empty or whitespace-only receive
# 'Não informado'. Columns that were re-cast to integer are excluded here.
text_columns = [name for name in string_columns if name not in integer_columns]

for c in text_columns:
    df_stage2 = df_stage2.withColumn(
        c,
        when(col(c).isNull() | (rtrim(col(c)) == ""), "Não informado").otherwise(col(c)))

# Rule 4.2: null decimal/integer fields receive 0. Empty source strings are
# already null at this point because the numeric casts above yield null.
df_stage2 = df_stage2.fillna(0, subset=decimal_columns + integer_columns)

# Check the resulting schema
df_stage2.printSchema()


root
 |-- actual_delivery_date: timestamp (nullable = true)
 |-- customerkey: string (nullable = true)
 |-- datekey: timestamp (nullable = true)
 |-- discount_amount: float (nullable = true)
 |-- invoice_date: timestamp (nullable = true)
 |-- invoice_number: string (nullable = true)
 |-- item_class: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- line_number: string (nullable = true)
 |-- list_price: float (nullable = true)
 |-- order_number: string (nullable = true)
 |-- promised_delivery_date: timestamp (nullable = true)
 |-- sales_amount: float (nullable = true)
 |-- sales_amount_based_on_list_price: float (nullable = true)
 |-- sales_cost_amount: float (nullable = true)
 |-- sales_margin_amount: float (nullable = true)
 |-- sales_price: float (nullable = true)
 |-- sales_quantity: integer (nullable = true)
 |-- sales_rep: integer (nullable = true)
 |-- u_m: string (nullable = true)
 |-- address_number: string (nullable = 

##### 4.1.Campos strings vazios deverão ser preenchidos com 'Não informado'.

##### 4.2.Campos decimais ou inteiros nulos ou vazios deverão ser preenchidos por 0.

#### 5.Gravar as informações em tabelas dm em formato csv delimitado por ';'.

In [22]:
# Derive the date, day, month, year and quarter columns for the time dimension.
# invoice_date was converted to a timestamp by the type-conversion cell, so a
# plain to_date() is enough. The former "dd/MM/yyyy" pattern is not applied to
# timestamp input and contradicted the "yyyy-MM-dd" pattern used when parsing.
df_stage2 = (
    df_stage2
    .withColumn("data", to_date("invoice_date"))
    .withColumn("dia", dayofmonth("data"))
    .withColumn("mes", month("data"))
    .withColumn("ano", year("data"))
    .withColumn("trimestre", quarter("data"))
    .drop("invoice_date")  # replaced by the derived columns above
)

In [23]:
# Key creation - PK_LOCALIDADE: SHA-256 of the location attributes
# concatenated without a separator, in the order listed below.
localidade_fields = [
    "customer_address_1",
    "customer_address_2",
    "customer_address_3",
    "customer_address_4",
    "address_number",
    "region_code",
    "region_name",
    "city",
    "country",
    "state",
    "zip_code",
]
df_stage2 = df_stage2.withColumn(
    "PK_LOCALIDADE",
    sha2(concat_ws("", *[df_stage2[name] for name in localidade_fields]), 256),
)

In [24]:
# Key creation - PK_TEMPO: SHA-256 of the calendar columns concatenated
# without a separator, in the order listed below.
tempo_fields = ["data", "dia", "mes", "ano", "trimestre"]
df_stage2 = df_stage2.withColumn(
    "PK_TEMPO",
    sha2(concat_ws("", *[df_stage2[name] for name in tempo_fields]), 256),
)

In [27]:
# Key creation - PK_CLIENTES: SHA-256 of the customer attributes concatenated
# without a separator, in the order listed below.
clientes_fields = ["customerkey", "customer", "customer_type", "phone", "line_of_business"]
df_stage2 = df_stage2.withColumn(
    "PK_CLIENTES",
    sha2(concat_ws("", *[df_stage2[name] for name in clientes_fields]), 256),
)

In [28]:
# Temporary view "stage", queried by the fact and dimension cells below
df_stage2.createOrReplaceTempView("stage")

In [29]:
# Sales fact table: one row per customer / time / location key combination.
# VALOR_DE_VENDA is now the SUM of sales_price; the former COUNT(sales_price)
# returned the number of rows with a price, not a sale value.
# NOTE(review): confirm that sales_price (rather than sales_amount) is the
# intended measure.
ft_vendas = spark.sql("""
                    SELECT PK_CLIENTES,
                            PK_TEMPO, 
                            PK_LOCALIDADE, 
                            SUM(sales_price) AS VALOR_DE_VENDA 
                    FROM stage 
                    GROUP BY PK_CLIENTES, PK_TEMPO, PK_LOCALIDADE
                    """)

In [31]:
# Customer dimension: distinct customer attributes with their PK_CLIENTES.
# NOTE(review): this rebinds df_clientes, which earlier held the raw customers
# table - re-running the earlier temp-view cell afterwards would register this
# dimension instead.
dim_clientes_query = """
                        SELECT DISTINCT PK_CLIENTES,
                                        customerkey,
                                        customer,
                                        customer_type,
                                        phone,
                                        line_of_business
                        FROM STAGE
                        """
df_clientes = spark.sql(dim_clientes_query)

In [32]:
# Location dimension: distinct location attributes with their PK_LOCALIDADE
dim_localidade_query = """
                        SELECT DISTINCT PK_LOCALIDADE,
                                        customer_address_1,
                                        customer_address_2,
                                        customer_address_3,
                                        customer_address_4,
                                        address_number,
                                        region_code,
                                        region_name,
                                        city,
                                        country,
                                        state,
                                        zip_code
                        FROM STAGE
                        """
df_localidade = spark.sql(dim_localidade_query)

In [33]:
# Time dimension: distinct calendar columns with their PK_TEMPO
dim_tempo_query = """
                    SELECT DISTINCT PK_TEMPO,
                                    data,
                                    dia,
                                    mes,
                                    ano,
                                    trimestre
                    FROM STAGE
                    """
df_tempo = spark.sql(dim_tempo_query)

#### 6.Enviando para a GOLD

In [39]:
# FT_VENDAS: export the sales fact table through salvar_df
salvar_df(ft_vendas, 'ft_vendas')

hdfs dfs -get /datalake/gold/ft_vendas/part-* /input/big_data_BI/gold/ft_vendas.csv


In [40]:
# DIM_CLIENTES: export the customer dimension through salvar_df
salvar_df(df_clientes, 'dim_clientes')

hdfs dfs -get /datalake/gold/dim_clientes/part-* /input/big_data_BI/gold/dim_clientes.csv


In [41]:
# DIM_TEMPO: export the time dimension through salvar_df
salvar_df(df_tempo, 'dim_tempo')

hdfs dfs -get /datalake/gold/dim_tempo/part-* /input/big_data_BI/gold/dim_tempo.csv


In [None]:
# DIM_LOCALIDADE: export the location dimension through salvar_df
salvar_df(df_localidade, 'dim_localidade')

hdfs dfs -get /datalake/gold/dim_localidade/part-* /input/big_data_BI/gold/dim_localidade.csv
