### Camada Gold (Delta): Criação de Fatos e Dimensões

In [0]:
from pyspark.sql import SparkSession

# Create a SparkSession with the required configurations for Delta Lake
spark = SparkSession.builder \
    .appName("Carga Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [0]:
# Define os caminhos de armazenamento no Data Lake
silver_path = "/mnt/lhdw/silver/vendas"
gold_path = "/mnt/lhdw/gold/vendas_delta"
gold_fato_path = "/mnt/lhdw/gold/vendas_delta/fato_vendas"

### Ler dados Camada Silver
Filtrado pela maior data na tabela fato

In [0]:
from pyspark.sql.functions import date_sub, lit

# Ler a maior data de venda da tabela fato_vendas
max_data_venda = spark.read.format("delta").load(gold_fato_path) \
                          .selectExpr("max(DataVenda) as MaxDataVenda") \
                          .collect()[0]["MaxDataVenda"]

display(max_data_venda)

# Carregar dados da Silver filtrando pela DataVenda maior que a obtida acima
df_silver = spark.read.format("parquet").load(silver_path) \
                          .filter(f"Data > '{max_data_venda}'")

df_silver.count()                          

datetime.date(2011, 12, 31)Out[3]: 116895

### Criação da Dimensão Produto

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, current_timestamp
#Nome tabela destino

tb_destino = "dim_produto"

# Extrair produtos únicos para a dimensão Produto
dim_produto_df = df_silver.select(
    "IDProduto", "Produto", "Categoria").dropDuplicates()

# Adicionar chave substituta (surrogate keys)
dim_produto_df = dim_produto_df.withColumn("sk_produto", monotonically_increasing_id()+1) \
                               .withColumn("data_atualizacao", current_timestamp())


# Escrever DimProduto no formato Delta
dim_produto_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{gold_path}/{tb_destino}")
display(dim_produto_df)

IDProduto,Produto,Categoria,sk_produto,data_atualizacao
585,Maximus UC-50,Urban,1,2024-10-01T17:17:33.763+0000
555,Maximus UC-20,Mix,2,2024-10-01T17:17:33.763+0000
423,Maximus UM-28,Urban,3,2024-10-01T17:17:33.763+0000
681,Maximus UC-46,Urban,4,2024-10-01T17:17:33.763+0000
628,Maximus UC-93,Urban,5,2024-10-01T17:17:33.763+0000
415,Maximus UM-20,Urban,6,2024-10-01T17:17:33.763+0000
547,Maximus UC-12,Mix,7,2024-10-01T17:17:33.763+0000
653,Maximus UC-18,Urban,8,2024-10-01T17:17:33.763+0000
512,Maximus UR-01,Urban,9,2024-10-01T17:17:33.763+0000
520,Maximus UE-08,Urban,10,2024-10-01T17:17:33.763+0000


### Criação da Dimensão Categoria

In [0]:
from pyspark.sql.functions import monotonically_increasing_id
#Nome tabela destino

tb_destino = "dim_categoria"

# Extrair Categorias únicas para a dimensão Categoria
dim_categoria_df = df_silver.select(
    "Categoria").dropDuplicates()

# Adicionar chave substituta (surrogate keys)
dim_categoria_df = dim_categoria_df.withColumn("sk_categoria", monotonically_increasing_id()+1)\
                                   .withColumn("data_atualizacao", current_timestamp())

# Escrever DimCatgoria no formato Parquet, particionando por Categoria
dim_categoria_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{gold_path}/{tb_destino}")

### Criação da Dimensão Segmento

In [0]:
#Nome tabela destino

tb_destino = "dim_segmento"

# Extrair Segmentos únicos para a dimensão Segmentos
dim_segmento_df = df_silver.select(
   "Segmento").dropDuplicates()

# Adicionar chave substituta (surrogate keys)
dim_segmento_df = dim_segmento_df.withColumn("sk_segmento", monotonically_increasing_id()+1) \
                                 .withColumn("data_atualizacao", current_timestamp())


# Escrever DimSegmento no formato Parquet
dim_segmento_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{gold_path}/{tb_destino}")

### Criação da Dimensão Fabricante

In [0]:
#Nome tabela destino
tb_destino = "dim_fabricante"

# Extrair produtos únicos para a dimensão Fabricante    
dim_fabricante_df = df_silver.select(
    "IDFabricante", "Fabricante").dropDuplicates()

# Adicionar chave substituta (surrogate keys)
dim_fabricante_df = dim_fabricante_df.withColumn("sk_fabricante", monotonically_increasing_id()+1)\
                                      .withColumn("data_atualizacao", current_timestamp())


# Escrever DimFabricante no formato Delta
dim_fabricante_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{gold_path}/{tb_destino}")

### Criação da Dimensão Geografia

In [0]:
#Nome tabela destino
tb_destino = "dim_geografia"

# Extrair Geografia  únicos para a dimensão Geografia
dim_geografia_df = df_silver.select(
     "Cidade", "Estado", "Regiao", "Distrito", "Pais", "CodigoPostal"
).dropDuplicates()

# Adicionar chave substituta
dim_geografia_df = dim_geografia_df.withColumn("sk_geografia", monotonically_increasing_id()+1) \
                                   .withColumn("data_atualizacao", current_timestamp())


# Escrever DimGeografia no formato Parquet
dim_geografia_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{gold_path}/{tb_destino}")


### Criação da Dimensão Cliente

In [0]:
#Nome tabela destino
tb_destino = "dim_cliente"

from pyspark.sql.functions import col, monotonically_increasing_id
# Passo 1 - Extrair clientes únicos para a dimensão Cliente
dim_cliente_df = df_silver.select(
    "IDCliente", "Nome", "Email", "Cidade", "Estado", "Regiao", "Distrito", "Pais", "CodigoPostal"
).dropDuplicates()

# Passo 2 - Realizar o join para obter a SK_Geografia
dim_cliente_com_sk_df = dim_cliente_df.alias("cliente") \
    .join(dim_geografia_df.alias("geografia"), 
          (col("cliente.Cidade") == col("geografia.Cidade")) &
          (col("cliente.Estado") == col("geografia.Estado")) &
          (col("cliente.Regiao") == col("geografia.Regiao")) &
          (col("cliente.Distrito") == col("geografia.Distrito")) &
          (col("cliente.Pais") == col("geografia.Pais")) &
          (col("cliente.CodigoPostal") == col("geografia.CodigoPostal")), 
          "left") \
    .select("cliente.IDCliente", "cliente.Nome", "cliente.Email", "geografia.sk_geografia")

# Passo 3 - Adicionar chave substituta
dim_cliente_com_sk_df = dim_cliente_com_sk_df.withColumn("sk_cliente", monotonically_increasing_id()+1) \
                                             .withColumn("data_atualizacao", current_timestamp())


# Passo 4 - Selecionar colunas específicas
dim_cliente_com_sk_df = dim_cliente_com_sk_df.select("IDCliente", "Nome","Email", "sk_geografia", "sk_cliente","data_atualizacao")

# Passo 5 - Escrever DimCliente no formato Delta
dim_cliente_com_sk_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(f"{gold_path}/{tb_destino}")

### Criação de Tabela Fato

In [0]:
#Nome tabela destino
tb_destino = "fato_vendas"

from pyspark.sql.functions import broadcast,year, month
# Juntar dados da Silver com tabelas de dimensões para obter as chaves substitutas
fato_vendas_df = df_silver.alias("s") \
    .join(broadcast(dim_produto_df.select("IDProduto", "sk_produto").alias("dprod")), "IDProduto") \
    .join(broadcast(dim_categoria_df.select("Categoria", "sk_categoria").alias("dcat")), "Categoria") \
    .join(broadcast(dim_segmento_df.select("Segmento", "sk_segmento").alias("dseg")), "Segmento") \
    .join(broadcast(dim_fabricante_df.select("Fabricante", "sk_fabricante").alias("dfab")), "Fabricante") \
    .join(broadcast(dim_cliente_com_sk_df.select("IDCliente", "sk_cliente").alias("dcli")), "IDCliente") \
    .select(
        col("s.Data").alias("DataVenda"),
        "sk_produto",
        "sk_categoria",
        "sk_segmento",
        "sk_fabricante",
        "sk_cliente",
        "Unidades",
        col("s.PrecoUnitario"),
        col("s.CustoUnitario"),
        col("s.TotalVendas"),
        current_timestamp().alias("data_atualizacao")
    )

# Escrever tabela Fato no formato Delta, particionando por DataVenda (ano e mês)
fato_vendas_df.withColumn("Ano", year("DataVenda")) \
             .withColumn("Mes", month("DataVenda")) \
             .write.format("delta") \
             .mode("append")\
             .option("mergeSchema", "true")\
             .option("MaxRecordsPerFile", 1000000)\
             .partitionBy("Ano", "Mes")\
             .save(f"{gold_path}/{tb_destino}")

### Demonstração de informação total vendas por ano

In [0]:
from pyspark.sql.functions import sum, col
gold_path = "/mnt/lhdw/gold/vendas_delta/"
# Consulta da fato vendas por categoria ano a ano com a soma de total de vendas

resultado = spark.read.format("delta").load(f"{gold_path}/fato_vendas") \
    .groupBy("Ano") \
    .agg(sum("TotalVendas").alias("SomaTotalVendas")) \
    .orderBy(col("Ano"), col("SomaTotalVendas").desc())

display(resultado)

Ano,SomaTotalVendas
2011,10595198.130002877
2012,11399112.610002369


### Limpeza de Memória

In [0]:
import gc
# Coletar lixo após operações pesadas para liberar memória
gc.collect()

Out[12]: 302