####Camada Gold Delta - Criação das dimensões e fatos

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id

# Create a SparkSession with the required configurations for Delta Lake
spark = SparkSession.builder \
    .appName("Carga Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [0]:
#Definindo variáveis com os caminhos de armazenamento
silver_path = '/FileStore/lhdw/silver/vendas'
gold_path = '/FileStore/lhdw/gold/vendas'
gold_fato_path = '/FileStore/lhdw/gold/vendas/ft_vendas'

In [0]:
#Verificar a ultima data de venda carregada.
from pyspark.sql.functions import max
max_data_venda = spark.read.format("delta").load(gold_fato_path)\
    .select(max('DataVenda')).collect()[0][0]
print(max_data_venda)


####Importando dados da camada silver

In [0]:
#display(spark.read.parquet(silver_path))
from pyspark.sql.functions import lit

#Carregando dados da Silver maiores que a ultima data da Gold
df_silver = spark.read.parquet(silver_path)\
        .filter(f'Data > "{max_data_venda}"')


####Criando Dimensão DIM_PRODUTO

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

#Nome tabela destino
tbl_destino = "dim_produto"

#Nome da coluna sk
sk = 'sk_produto'

#Lendo Dim Produto para filtrar somente o que ainda não foi carregado.
df_dim_produto = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')\
                .select('IDProduto')

#Lendo Dim Produto para pegar a proxima SK.
max_sk_produto = spark.read.format("delta").load(f'{gold_path}/{tbl_destino}')\
    .select(max(sk)+1).collect()[0][0]

#Selecionando as colunas e excluindo os registros duplicados
df_dim_produto_new = df_silver.select('IDProduto','Produto','Categoria')\
                          .dropDuplicates()\
                          .orderBy('IDProduto')

#Filtrando somente os novos produtos
df_dim_produto_new = df_dim_produto_new.join(df_dim_produto,df_dim_produto_new.IDProduto == df_dim_produto.IDProduto,'anti')

#Adicionando a chave primária
df_dim_produto_new = df_dim_produto_new.withColumn('sk_produto', monotonically_increasing_id()+max_sk_produto)

#Criando a tabela gold DIM_PRODUTO no formato Delta
df_dim_produto_new.write.format('delta').mode('append').save(f"{gold_path}/{tbl_destino}")

#Carregando Dim Produto completa para carregar a FATO.
df_dim_produto = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')

####Criando dimensão DIM CATEGORIA

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

#Nome tabela destino
tbl_destino = "dim_categoria"

#Nome da coluna sk
sk = 'sk_categoria'

#Lendo Dim Categoria para filtrar somente o que ainda não foi carregado.
df_dim_categoria = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')\
                .select('Categoria')

#Lendo Dim Categoria para pegar a proxima SK.
max_sk_categoria = spark.read.format("delta").load(f'{gold_path}/{tbl_destino}')\
    .select(max(sk)+1).collect()[0][0]

#Selecionando as colunas e excluindo os registros duplicados
df_dim_categoria_new = df_silver.select('Categoria')\
                          .distinct()\
                          .orderBy('Categoria')

#Filtrando somente as novas categorias
df_dim_categoria_new = df_dim_categoria_new.join(df_dim_categoria,df_dim_categoria_new.Categoria == df_dim_categoria.Categoria,'anti')                          

#Adicionando a chave primária
df_dim_categoria_new = df_dim_categoria_new.withColumn('sk_categoria', monotonically_increasing_id()+max_sk_categoria)

#Criando a tabela gold DIM_CATEGORIA no formato Delta
df_dim_categoria_new.write.format('delta').mode('append').save(f"{gold_path}/{tbl_destino}")

#display(df_dim_categoria_new)

#Carregando DIM_CATEGORIA completa para carregar a FATO.
df_dim_categoria = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')


####Criando dimensão DIM SEGMENTO

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

#Nome tabela destino
tbl_destino = "dim_segmento"

#Nome da coluna sk
sk = 'sk_segmento'

#Lendo Dim Segmento para filtrar somente o que ainda não foi carregado.
df_dim_segmento = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')\
                .select('Segmento')

#Lendo Dim Segmento para pegar a proxima SK.
max_sk_segmento = spark.read.format("delta").load(f'{gold_path}/{tbl_destino}')\
    .select(max(sk)+1).collect()[0][0]

#Selecionando as colunas e excluindo os registros duplicados
df_dim_segmento_new = df_silver.select('Segmento')\
                          .distinct()\
                          .orderBy('Segmento')

#Filtrando somente os novos segmentos
df_dim_segmento_new = df_dim_segmento_new.join(df_dim_segmento,df_dim_segmento_new.Segmento == df_dim_segmento.Segmento,'anti')                          

#Adicionando a chave primária
df_dim_segmento_new = df_dim_segmento_new.withColumn('sk_segmento', monotonically_increasing_id()+max_sk_segmento)

#Criando a tabela gold DIM_SEGMENTO no formato Delta
df_dim_segmento_new.write.format('delta').mode('append').save(f"{gold_path}/{tbl_destino}")

#display(df_dim_segmento_new)

#Carregando DIM_SEGMENTO completa para carregar a FATO.
df_dim_segmento = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')

####Criando tabela dimensão DIM FABRICANTE

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

#Nome tabela destino
tbl_destino = "dim_fabricante"

#Nome da coluna sk
sk = 'sk_fabricante'

#Lendo Dim Fabricante para filtrar somente o que ainda não foi carregado.
df_dim_fabricante = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')\
                .select('IDFabricante')

#Lendo Dim Fabricante para pegar a proxima SK.
max_sk_fabricante = spark.read.format("delta").load(f'{gold_path}/{tbl_destino}')\
    .select(max(sk)+1).collect()[0][0]

#Selecionando as colunas e excluindo os registros duplicados
df_dim_fabricante_new = df_silver.select('IDFabricante','Fabricante')\
                          .dropDuplicates()\
                          .orderBy('IDFabricante')

#Filtrando somente os novos fabricantes
df_dim_fabricante_new = df_dim_fabricante_new.join(df_dim_fabricante,df_dim_fabricante_new.IDFabricante == df_dim_fabricante.IDFabricante,'anti')

#Adicionando a chave primária
df_dim_fabricante_new = df_dim_fabricante_new.withColumn('sk_fabricante', monotonically_increasing_id()+max_sk_fabricante)

#Criando a tabela gold DIM_FABRICANTE no formato Delta
df_dim_fabricante_new.write.format('delta').mode('append').save(f"{gold_path}/{tbl_destino}")

#display(df_dim_fabricante_new)

#Carregando DIM_FABRICANTE completa para carregar a FATO.
df_dim_fabricante = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')

####Criando dimensão DIM GEOGRAFIA

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

#Nome tabela destino
tbl_destino = "dim_geografia"

#Nome da coluna sk
sk = 'sk_geografia'

#Lendo Dim Geografia para filtrar somente o que ainda não foi carregado.
df_dim_geografia = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')

#Lendo Dim Geografia para pegar a proxima SK.
max_sk_geografia = spark.read.format("delta").load(f'{gold_path}/{tbl_destino}')\
    .select(max(sk)+1).collect()[0][0]

#Selecionando as colunas e excluindo os registros duplicados
df_dim_geografia_new = df_silver.select('Cidade','Estado','Regiao','Distrito','Pais','CodigoPostal')\
                          .dropDuplicates()

#Filtrando somente as novas Geografias
df_dim_geografia_new = df_dim_geografia_new.join(df_dim_geografia,\
          (df_dim_geografia_new.Cidade == df_dim_geografia.Cidade) &
          (df_dim_geografia_new.Estado == df_dim_geografia.Estado) &
          (df_dim_geografia_new.Regiao == df_dim_geografia.Regiao) &
          (df_dim_geografia_new.Distrito == df_dim_geografia.Distrito) &
          (df_dim_geografia_new.Pais == df_dim_geografia.Pais) &
          (df_dim_geografia_new.CodigoPostal == df_dim_geografia.CodigoPostal), 
          'anti')

#Adicionando a chave primária
df_dim_geografia_new = df_dim_geografia_new.withColumn('sk_geografia', monotonically_increasing_id()+max_sk_geografia)

#Criando a tabela gold DIM_GEOGRAFIA no formato Delta
df_dim_geografia_new.write.format('delta').mode('append').option("overwriteSchema", "true").save(f"{gold_path}/{tbl_destino}")

#display(df_dim_geografia_new)

#Carregando DIM_GEOGRAFIA completa para carregar a FATO.
df_dim_geografia = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')

####Criando dimensão DIM CLIENTE

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

#Nome tabela destino
tbl_destino = "dim_cliente"

#Nome da coluna sk
sk = 'sk_cliente'

#Lendo Gold Dim Cliente para filtrar somente o que ainda não foi carregado.
df_dim_cliente = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')\
                  .select('IDCliente')

#Lendo Dim Cliente para pegar a proxima SK.
max_sk_cliente = spark.read.format("delta").load(f'{gold_path}/{tbl_destino}')\
    .select(max(sk)+1).collect()[0][0]

#1-Extraindo clientes únicos para criar a dimensão clientes
df_dim_cliente_new = df_silver.select('IDCliente','Nome','Email','Cidade','Estado','Regiao','Distrito','Pais','CodigoPostal')\
                          .distinct()

#Filtrando somente os novos Clientes
df_dim_cliente_new = df_dim_cliente_new.join(df_dim_cliente,df_dim_cliente_new.IDCliente == df_dim_cliente.IDCliente,'anti')

#2-Fazendo join para buscar a SK_GEOGRAFIA da DIM_GEOGRAFIA
df_dim_cliente_completo = df_dim_cliente_new.alias('cli')\
        .join(df_dim_geografia.alias('geo'),
              (col('cli.Cidade')==col('geo.Cidade')) &
              (col('cli.Estado')==col('geo.Estado')) &
              (col('cli.Regiao')==col('geo.Regiao')) &
              (col('cli.Distrito') == col('geo.Distrito')) &
              (col('cli.Pais') == col('geo.Pais')) &
              (col('cli.CodigoPostal') == col('geo.CodigoPostal')),
              'left')\
        .select('cli.IDCliente','cli.Nome','cli.Email','geo.sk_geografia').orderBy('IDCliente')

#3-Criando a chave única de Clientes.
df_dim_cliente_completo = df_dim_cliente_completo.withColumn('sk_cliente',monotonically_increasing_id()+max_sk_cliente)

#4-#Criando a tabela gold DIM_CLIENTE no formato Delta
df_dim_cliente_completo.write.format('delta').mode('append').option("overwriteSchema", "true").save(f"{gold_path}/{tbl_destino}")

#display(df_dim_cliente_completo)

#Carregando DIM_CLIENTE completa para carregar a FATO.
df_dim_cliente = spark.read.format('delta').load(f'{gold_path}/{tbl_destino}')

In [0]:
#display(spark.read.format('Delta').load(f'{gold_path}/dim_cliente').count()) 
#display(spark.read.parquet(f'{silver_path}').select('IDCliente').distinct().count())

#223734
#223734

####Criando da tabela fato FT_VENDAS


In [0]:
from pyspark.sql.functions import broadcast,year, month

#Nome tabela destino
tbl_destino = 'ft_vendas'

# Juntar dados da Silver com tabelas de dimensões para obter as chaves substitutas
df_ft_vendas = df_silver.alias('s')\
    .join(broadcast(df_dim_produto.select('IDProduto','sk_produto')),'IDProduto')\
    .join(broadcast(df_dim_categoria.select('Categoria','sk_categoria')),'Categoria')\
    .join(broadcast(df_dim_segmento.select('Segmento','sk_segmento')),'Segmento')\
    .join(broadcast(df_dim_fabricante.select('IDFabricante','sk_fabricante')),'IDFabricante')\
    .join(broadcast(df_dim_cliente.select('IDCliente','sk_cliente')),'IDCliente')\
    .select(col('s.Data').alias('DataVenda'),
            'sk_produto',
            'sk_categoria',
            'sk_segmento',
            'sk_fabricante',
            'sk_cliente',
            'Unidades',
            col('s.PrecoUnitario'),
            col('s.CustoUnitario'),
            col('s.TotalVendas'))

df_ft_vendas.withColumn('Ano',year('DataVenda'))\
            .withColumn('Mes',month('DataVenda'))\
            .write.format('Delta')\
            .mode("append")\
            .option('MaxRecordsPerFile', 1000000)\
            .partitionBy('Ano','Mes')\
            .save(f'{gold_path}/{tbl_destino}')

#display(df_ft_vendas)


In [0]:
#dbutils.fs.rm('/FileStore/lhdw/gold/vendas/FT_VENDAS',recurse=True)

In [0]:
%sql
/* Validando a tabela delta criada*/
/* 
select * from delta.`/FileStore/lhdw/gold/vendas/dim_produto` 
select * from delta.`/FileStore/lhdw/gold/vendas/dim_categoria` 
select * from delta.`/FileStore/lhdw/gold/vendas/dim_segmento` 
select * from delta.`/FileStore/lhdw/gold/vendas/dim_fabricante` 
select * from delta.`/FileStore/lhdw/gold/vendas/dim_geografia`
select * from delta.`/FileStore/lhdw/gold/vendas/dim_cliente`
select * from delta.`/FileStore/lhdw/gold/vendas/ft_vendas`
select ano, sum(TotalVendas), count(*) from delta.`/FileStore/lhdw/gold/vendas/ft_vendas`
group by ano
*/

/*2013	12244978.910005424	124777
2012	11395367.170004882	116857
2011	10595198.130001815	112202*/


In [0]:
#Validando Silver
df_silver = spark.read.parquet('/FileStore/lhdw/silver/vendas')
df_silver = df_silver.withColumn('TotalVendas',df_silver.TotalVendas.cast('double'))
#display(df_silver)
display(df_silver.groupBy('Ano').agg(F.sum('TotalVendas').alias('vendas')))

####Fazendo Limpeza da Memória

In [0]:
import gc

# Coletar lixo para liberar memória
gc.collect()

# Limpar todos os dados em cache
spark.catalog.clearCache()