In [1]:
from pyspark.sql import SparkSession, dataframe, Row
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, FloatType
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql.functions import col,trim,ltrim,rtrim,when,regexp_replace,concat_ws, lit, sha2, upper

import os
import re 

In [2]:
# Spark session with Hive support.
# getOrCreate() returns the already-active session when the kernel provides one
# (the notebook previously relied on such a pre-existing `spark` global, with this
# cell commented out), so the notebook now also runs on a fresh kernel.
# No .master(...) is hard-coded: the deployment (spark-submit / kernel config) decides.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# CLIENTES

In [3]:
# Start of the data cleaning: load the raw customers table from Hive.
df_clientes = spark.table("desafio_curso.tbl_clientes")

In [4]:
# Remove the customer columns that are not used downstream.
df_clientes = df_clientes.drop(
    "business_unit",
    "customer_type",
    "phone",
    "regional_sales_mgr",
)

In [5]:
# Convert the numeric identifier columns (stored as strings) to integers.
df_clientes = (
    df_clientes
    .withColumn("address_number", col("address_number").cast(IntegerType()))
    .withColumn("customerkey", col("customerkey").cast(IntegerType()))
    .withColumn("division", col("division").cast(IntegerType()))
    .withColumn("region_code", col("region_code").cast(IntegerType()))
)

In [6]:
# Treat blank values in line_of_business: nulls, empty strings and
# whitespace-only strings all become "Não Informado".
# The regex is a raw string so "\s" is passed to the Java regex engine without
# triggering Python's invalid-escape warning; "\s*" (instead of "\s+") also
# catches the empty string, and coalesce() turns nulls into "" first.
df_clientes = df_clientes.withColumn(
    'line_of_business',
    regexp_replace(coalesce(col('line_of_business'), lit('')), r'^\s*$', 'Não Informado'),
)

In [7]:
# Keep only the columns used downstream (in this order) and upper-case the customer name.
df_clientes = df_clientes.select(
    *[col(nome) for nome in ("address_number", "business_family", "customerkey", "division",
                             "line_of_business", "region_code", "search_type", "dt_foto")],
    upper(col("customer")).alias("customer"),
)

In [8]:
# Remove fully duplicated rows (no subset: every column is compared).
df_clientes = df_clientes.dropDuplicates()

In [9]:
# Inspect the schema of the cleaned customers DataFrame.
df_clientes.printSchema()

root
 |-- address_number: integer (nullable = true)
 |-- business_family: string (nullable = true)
 |-- customerkey: integer (nullable = true)
 |-- division: integer (nullable = true)
 |-- line_of_business: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- search_type: string (nullable = true)
 |-- dt_foto: string (nullable = true)
 |-- customer: string (nullable = true)



In [10]:
# Preview the cleaned customers (first 20 rows, long values truncated).
df_clientes.show(n=20, truncate=True)

+--------------+---------------+-----------+--------+----------------+-----------+-----------+--------+--------------------+
|address_number|business_family|customerkey|division|line_of_business|region_code|search_type| dt_foto|            customer|
+--------------+---------------+-----------+--------+----------------+-----------+-----------+--------+--------------------+
|      10017828|             R1|   10017828|       2|   Não Informado|          1|          C|20230630|       LEE MEGASTORE|
|      10018352|             R3|   10018352|       1|   Não Informado|          5|          C|20230630|       JOHNSON STORE|
|      10020662|             R3|   10020662|       1|              M1|          5|          C|20230630|       NEMATRON SHOP|
|      10001784|             R2|   10001784|       1|   Não Informado|          5|          C|20230630|         ACACIA SHOP|
|      10018867|             R3|   10018867|       2|   Não Informado|          4|          C|20230630|    KARI SUPERMARKET|


In [11]:
# Expose the cleaned customers to Spark SQL as tb_clientes.
df_clientes.createOrReplaceTempView(name='tb_clientes')

# DIVISÃO

In [12]:
# Load the raw division table from Hive.
df_divisao = spark.table("desafio_curso.tbl_divisao")

In [13]:
# division is stored as a string; cast it to int so it joins with the customers table.
df_divisao = df_divisao.withColumn("division", col("division").cast("int"))

In [14]:
# Expose the divisions to Spark SQL as tb_divisao.
df_divisao.createOrReplaceTempView(name='tb_divisao')

# ENDEREÇO

In [15]:
# Load the raw address table from Hive.
df_endereco = spark.table("desafio_curso.tbl_endereco")

In [16]:
# Inspect the raw address schema (every column arrives as a string).
df_endereco.printSchema()

root
 |-- address_number: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- customer_address_1: string (nullable = true)
 |-- customer_address_2: string (nullable = true)
 |-- customer_address_3: string (nullable = true)
 |-- customer_address_4: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- dt_foto: string (nullable = true)



In [17]:
# Remove the free-text address lines and the zip code, which are not used downstream.
df_endereco = df_endereco.drop(
    "customer_address_1",
    "customer_address_2",
    "customer_address_3",
    "customer_address_4",
    "zip_code",
)

In [18]:
# address_number is stored as a string; cast it to int so it joins with the customers table.
df_endereco = df_endereco.withColumn("address_number", col("address_number").cast("int"))

In [19]:
# Treat blank values in the address table.
# Every string column has empty and whitespace-only values replaced by
# "Não Informado" (previously only `city` handled whitespace-only values).
# Non-string columns (address_number) pass through untouched, instead of being
# compared with "" through an implicit string->int cast. The regex is a raw
# string so "\s" does not trigger Python's invalid-escape warning.
df_endereco = df_endereco.select([
    regexp_replace(col(c), r'^\s*$', 'Não Informado').alias(c) if t == 'string' else col(c)
    for c, t in df_endereco.dtypes
])
# Remaining nulls in string columns -> "Não Informado".
df_endereco = df_endereco.na.fill("Não Informado")

In [20]:
# Expose the cleaned addresses to Spark SQL as tb_endereco.
df_endereco.createOrReplaceTempView(name='tb_endereco')

In [21]:
# Preview the cleaned addresses (first 20 rows, long values truncated).
df_endereco.show(n=20, truncate=True)

+--------------+----------------+-------+-------------+--------+
|address_number|            city|country|        state| dt_foto|
+--------------+----------------+-------+-------------+--------+
|      10000000|           Akron|     US|           OH|20230630|
|      10000453|   Não Informado|     UK|Não Informado|20230630|
|      10000455|Huntington Beach|     US|           CA|20230630|
|      10000456|        Edmonton|     CA|           AB|20230630|
|      10000458|         Saginaw|     US|           MI|20230630|
|      10000460|  Goodlettsville|     US|           TN|20230630|
|      10000461|    Boucherville|     CA|           QU|20230630|
|      10000462|       Hazelwood|     US|           MO|20230630|
|      10000466| North Highlands|     US|           CA|20230630|
|      10000469|        Montreal|     CA|           QU|20230630|
|      10000471|          Jeddah|     AU|Não Informado|20230630|
|      10000472|        Van Nuys|     US|           CA|20230630|
|      10000473|  Rancho 

# REGIÃO

In [22]:
# Load the raw region table from Hive.
df_regiao = spark.table("desafio_curso.tbl_regiao")

In [23]:
# Inspect the raw region schema (every column arrives as a string).
df_regiao.printSchema()

root
 |-- region_code: string (nullable = true)
 |-- region_name: string (nullable = true)
 |-- dt_foto: string (nullable = true)



In [24]:
# region_code is stored as a string; cast it to int so it joins with the customers table.
df_regiao = df_regiao.withColumn("region_code", col("region_code").cast("int"))

In [25]:
# Expose the regions to Spark SQL as tb_regiao.
df_regiao.createOrReplaceTempView(name='tb_regiao')

# VENDAS

In [26]:
# Load the raw sales table from Hive.
df_vendas = spark.table("desafio_curso.tbl_vendas")

In [27]:
# Inspect the raw sales schema (every column arrives as a string, including numbers and dates).
df_vendas.printSchema()

root
 |-- actual_delivery_date: string (nullable = true)
 |-- customerkey: string (nullable = true)
 |-- datekey: string (nullable = true)
 |-- discount_amount: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- invoice_number: string (nullable = true)
 |-- item_class: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item: string (nullable = true)
 |-- line_number: string (nullable = true)
 |-- list_price: string (nullable = true)
 |-- order_number: string (nullable = true)
 |-- promised_delivery_date: string (nullable = true)
 |-- sales_amount: string (nullable = true)
 |-- sales_amount_based_on_list_price: string (nullable = true)
 |-- sales_cost_amount: string (nullable = true)
 |-- sales_margin_amount: string (nullable = true)
 |-- sales_price: string (nullable = true)
 |-- sales_quantity: string (nullable = true)
 |-- sales_rep: string (nullable = true)
 |-- u_m: string (nullable = true)
 |-- dt_foto: string (nullable = true)



In [28]:
# Convert the raw string columns of the sales table to proper types.
# Monetary columns use a decimal comma in the source (e.g. "1234,56"), so the
# comma is swapped for a dot before casting to double.
df_vendas = (
    df_vendas
    .withColumn("customerkey", col("customerkey").cast(IntegerType()))
    .withColumn("discount_amount", regexp_replace("discount_amount", ',', '.').cast(DoubleType()))
    .withColumn("invoice_number", col("invoice_number").cast(IntegerType()))
    .withColumn("item_number", col("item_number").cast(IntegerType()))
    # Fixed: line_number used to be populated from item_number (copy-paste slip).
    .withColumn("line_number", col("line_number").cast(IntegerType()))
    .withColumn("list_price", regexp_replace("list_price", ',', '.').cast(DoubleType()))
    .withColumn("order_number", col("order_number").cast(IntegerType()))
    .withColumn("sales_amount", regexp_replace("sales_amount", ',', '.').cast(DoubleType()))
    .withColumn("sales_amount_based_on_list_price",
                regexp_replace("sales_amount_based_on_list_price", ',', '.').cast(DoubleType()))
    .withColumn("sales_cost_amount", regexp_replace("sales_cost_amount", ',', '.').cast(DoubleType()))
    .withColumn("sales_margin_amount", regexp_replace("sales_margin_amount", ',', '.').cast(DoubleType()))
    .withColumn("sales_price", regexp_replace("sales_price", ',', '.').cast(DoubleType()))
    .withColumn("sales_quantity", col("sales_quantity").cast(IntegerType()))
    .withColumn("sales_rep", col("sales_rep").cast(IntegerType()))
)

# Reorder the columns and normalise the dd/MM/yyyy date strings to
# "yyyy-MM-dd HH:mm:ss" strings (from_unixtime output; parsed into timestamps later).
# The year pattern is "yyyy" (four letters, the conventional 4-digit year) rather than "yyy".
df_vendas = df_vendas.select('discount_amount',
                             'invoice_number',
                             'item_class',
                             'item_number',
                             'item',
                             'line_number',
                             'list_price',
                             'order_number',
                             'sales_amount',
                             'sales_amount_based_on_list_price',
                             'sales_cost_amount',
                             'sales_margin_amount',
                             'sales_price',
                             'sales_quantity',
                             'sales_rep',
                             'u_m',
                             'customerkey',
                             'dt_foto',
                             from_unixtime(unix_timestamp('actual_delivery_date', 'dd/MM/yyyy')).alias('actual_delivery_date'),
                             from_unixtime(unix_timestamp('invoice_date', 'dd/MM/yyyy')).alias('invoice_date'),
                             from_unixtime(unix_timestamp('promised_delivery_date', 'dd/MM/yyyy')).alias('promised_delivery_date'),
                             from_unixtime(unix_timestamp('datekey', 'dd/MM/yyyy')).alias('datekey')
                            )


In [29]:
# Treat missing values in the sales table.
# 1) Parse the date columns into timestamps first. Previously they were filled with
#    the text "Não Informado" and then passed to to_timestamp, which yields null
#    normally but raises when spark.sql.ansi.enabled is true.
df_vendas = df_vendas.select([
    to_timestamp(col(c)).alias(c)
    if c in ('datekey', 'promised_delivery_date', 'invoice_date', 'actual_delivery_date')
    else col(c)
    for c in df_vendas.columns
])
# 2) Numeric nulls -> 0.
df_vendas = df_vendas.na.fill(value=0)
# 3) Empty strings -> null -> "Não Informado". Only string columns are compared
#    with "", so no implicit string->number/timestamp cast is involved.
df_vendas = df_vendas.select([
    when(col(c) == "", None).otherwise(col(c)).alias(c) if t == 'string' else col(c)
    for c, t in df_vendas.dtypes
])
df_vendas = df_vendas.na.fill("Não Informado")

In [30]:
# Check the typed sales schema after conversion and blank treatment.
df_vendas.printSchema()

root
 |-- discount_amount: double (nullable = true)
 |-- invoice_number: integer (nullable = true)
 |-- item_class: string (nullable = false)
 |-- item_number: integer (nullable = true)
 |-- item: string (nullable = false)
 |-- line_number: integer (nullable = true)
 |-- list_price: double (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- sales_amount: double (nullable = true)
 |-- sales_amount_based_on_list_price: double (nullable = true)
 |-- sales_cost_amount: double (nullable = true)
 |-- sales_margin_amount: double (nullable = true)
 |-- sales_price: double (nullable = true)
 |-- sales_quantity: integer (nullable = true)
 |-- sales_rep: integer (nullable = true)
 |-- u_m: string (nullable = false)
 |-- customerkey: integer (nullable = true)
 |-- dt_foto: string (nullable = false)
 |-- actual_delivery_date: timestamp (nullable = true)
 |-- invoice_date: timestamp (nullable = true)
 |-- promised_delivery_date: timestamp (nullable = true)
 |-- datekey: timestamp (nu

In [31]:
# Expose the cleaned sales to Spark SQL as tb_vendas.
df_vendas.createOrReplaceTempView(name='tb_vendas')

# CRIANDO TABELÃO COM OS DADOS

In [32]:
# Build the wide staging table ("tabelão") with all the data: sales joined with
# customers, region, division and address.
# - INNER JOINs: sales rows whose customer, region or division has no match are dropped.
# - LEFT JOIN on address: the address columns (including e.dt_foto) are null when
#   the customer's address_number has no match.
# - e.dt_foto is the only dt_foto column carried into the result.
query='''
SELECT    c.customerkey
          ,c.customer
          ,c.business_family
          ,c.division
          ,d.division_name
          ,c.line_of_business
          ,c.region_code
          ,r.region_name
          ,c.search_type
          ,v.datekey
          ,v.actual_delivery_date
          ,v.discount_amount
          ,v.invoice_date
          ,v.invoice_number
          ,v.item_class
          ,v.item_number
          ,v.item
          ,v.line_number
          ,v.list_price
          ,v.order_number
          ,v.promised_delivery_date
          ,v.sales_amount
          ,v.sales_amount_based_on_list_price
          ,v.sales_cost_amount
          ,v.sales_margin_amount
          ,v.sales_price
          ,v.sales_quantity
          ,v.sales_rep
          ,v.u_m
          ,e.address_number
          ,e.city
          ,e.country
          ,e.state
          ,e.dt_foto
FROM      tb_vendas v
          INNER JOIN tb_clientes c ON v.customerkey == c.customerkey
          INNER JOIN tb_regiao r ON c.region_code == r.region_code
          INNER JOIN tb_divisao d ON c.division == d.division
          LEFT JOIN tb_endereco e ON c.address_number == e.address_number
'''

In [33]:
# Build the staging DataFrame from the join query defined above.
df_stage = spark.sql(sqlQuery=query)

In [34]:
# Derive the calendar attributes used by the time dimension from invoice_date.
df_stage = (df_stage
            .withColumn('Ano', year(col('invoice_date')))
            .withColumn('Mes', month(col('invoice_date')))
            .withColumn('Dia', dayofmonth(col('invoice_date')))
            .withColumn('Trimestre', quarter(col('invoice_date')))
           )

# Empty strings -> null -> "Não Informado" (the fill also covers nulls from the
# address LEFT JOIN). Only string columns are compared with "", so timestamp and
# numeric columns are not pushed through an implicit cast from "" (which raises
# when spark.sql.ansi.enabled is true).
df_stage = df_stage.select([
    when(col(c) == "", None).otherwise(col(c)).alias(c) if t == 'string' else col(c)
    for c, t in df_stage.dtypes
])
df_stage = df_stage.na.fill("Não Informado")

In [35]:
# Check the staging table schema.
df_stage.printSchema()

root
 |-- customerkey: integer (nullable = true)
 |-- customer: string (nullable = false)
 |-- business_family: string (nullable = false)
 |-- division: integer (nullable = true)
 |-- division_name: string (nullable = false)
 |-- line_of_business: string (nullable = false)
 |-- region_code: integer (nullable = true)
 |-- region_name: string (nullable = false)
 |-- search_type: string (nullable = false)
 |-- datekey: timestamp (nullable = true)
 |-- actual_delivery_date: timestamp (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- invoice_date: timestamp (nullable = true)
 |-- invoice_number: integer (nullable = true)
 |-- item_class: string (nullable = false)
 |-- item_number: integer (nullable = true)
 |-- item: string (nullable = false)
 |-- line_number: integer (nullable = true)
 |-- list_price: double (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- promised_delivery_date: timestamp (nullable = true)
 |-- sales_amount: double (nullable = true)


In [36]:
# Generate the surrogate keys for the DW tables as SHA-256 hex digests of the
# natural key columns ('|'-joined where several columns are involved).
df_stage = (
    df_stage
    .withColumn('key_cliente', sha2(col("customerkey").cast("string"), 256))
    .withColumn('key_tempo',
                sha2(concat_ws('|', col('invoice_date'), col('Ano'), col('Mes'), col('Dia')), 256))
    .withColumn('key_localidade',
                sha2(concat_ws('|', col('division'), col('region_code'), col('address_number')), 256))
)

In [37]:
# Expose the keyed staging table to Spark SQL as tb_stage.
df_stage.createOrReplaceTempView(name='tb_stage')

In [39]:
# DIM_CLIENTES: distinct combinations of the customer key and its descriptive attributes.
dim_clientes = (
    spark.table('tb_stage')
    .select('key_cliente', 'business_family', 'customer', 'line_of_business', 'search_type')
    .distinct()
)

In [40]:
# Time dimension: distinct (key, invoice date, year, month, day, quarter) rows,
# with invoice_date truncated from timestamp to date.
dim_tempo = (
    spark.table('tb_stage')
    .select('key_tempo', 'invoice_date', 'Ano', 'Mes', 'Dia', 'Trimestre')
    .distinct()
    .withColumn('invoice_date', to_date('invoice_date'))
)

In [41]:
# Location dimension: distinct combinations of the location key with the
# division/region names and the address city, state and country.
dim_localidade = (
    spark.table('tb_stage')
    .select('key_localidade', 'division_name', 'region_name', 'country', 'state', 'city')
    .distinct()
)

In [69]:
# Fact table: one row per (customer, time, location) key combination with the
# aggregated sales measures, all exported as strings.
# - GROUP BY already yields one row per key combination, so the former
#   SELECT DISTINCT was redundant and has been dropped.
# - Monetary sums go through DECIMAL(38,2) before STRING: a DOUBLE cast to STRING
#   switches to scientific notation from 1e7 upward (e.g. "1.2345678E7"), which
#   the later "." -> "," swap would turn into an unreadable value.
# NOTE(review): total_amount holds the summed sales_margin_amount, not the gross
# amount; the column name is kept as-is for existing CSV consumers.
ft_vendas = spark.sql('''
    SELECT key_cliente
        ,key_tempo
        ,key_localidade
        ,count(distinct invoice_number) qty_vendas
        ,CAST(sum(sales_quantity) AS STRING) quantity
        ,CAST(CAST(round(sum(sales_amount),2) AS DECIMAL(38,2)) AS STRING) amount
        ,CAST(CAST(round(sum(sales_cost_amount),2) AS DECIMAL(38,2)) AS STRING) cost
        ,CAST(CAST(round(sum(sales_margin_amount),2) AS DECIMAL(38,2)) AS STRING) total_amount
    FROM tb_stage
    GROUP BY key_cliente
        ,key_tempo
        ,key_localidade
''')

In [71]:
# Use "," as the decimal separator in the exported monetary columns.
ft_vendas = (
    ft_vendas
    .withColumn("total_amount", regexp_replace("total_amount", r'\.', ','))
    .withColumn("amount", regexp_replace("amount", r'\.', ','))
    .withColumn("cost", regexp_replace("cost", r'\.', ','))
)

In [72]:
def criar_csv(df, name, hdfs_dir="/datalake/gold", local_dir="/input/desafio_curso/gold"):
    """Export ``df`` as a single ';'-separated CSV with header and copy it locally.

    The DataFrame is coalesced into one partition and written by Spark to
    ``<hdfs_dir>/<name>`` (overwriting any previous run). The resulting part file
    is then copied with ``hdfs dfs -get`` to ``<local_dir>/<name>.csv``.

    Args:
        df: Spark DataFrame to export.
        name: base name for the Spark output directory and the local CSV file.
        hdfs_dir: parent directory of the Spark output (default "/datalake/gold").
        local_dir: local directory that receives "<name>.csv"
            (default "/input/desafio_curso/gold").

    Raises:
        subprocess.CalledProcessError: if ``hdfs dfs -get`` exits with an error
            (previously the exit code of os.system was ignored silently).
    """
    import subprocess

    spark_output = hdfs_dir.rstrip("/") + "/" + name
    local_csv = os.path.join(local_dir, name + ".csv")

    (df.coalesce(1).write
        .format('csv')
        .option('header', True)
        .option('sep', ';')
        .mode('overwrite')
        .save(spark_output))

    # `hdfs dfs -get` refuses to overwrite an existing destination, so remove the
    # previous local export first. The former `hdfs dfs -rm <local_dir>/<name>/*`
    # targeted an HDFS directory instead of this local file.
    os.makedirs(local_dir, exist_ok=True)
    if os.path.exists(local_csv):
        os.remove(local_csv)

    # Argument list, no shell: the "*.csv" glob is expanded by HDFS rather than by
    # a local shell against the local filesystem.
    subprocess.run(["hdfs", "dfs", "-get", spark_output + "/*.csv", local_csv], check=True)
    
# Export the three dimensions and the fact table as CSV files.
criar_csv(df=dim_tempo, name='dim_tempo')
criar_csv(df=dim_localidade, name='dim_localidade')
criar_csv(df=dim_clientes, name='dim_clientes')
criar_csv(df=ft_vendas, name='ft_vendas')
