**1. Configurações Iniciais e Importações**

A partir deste notebook é feita a implementação da arquitetura Medallion com as camadas Bronze, Silver e Gold, utilizando Databricks e Delta Lake. Incluindo a criação de surrogate keys (chaves substitutas) para as dimensões e otimização da tabela fato na camada Gold.

**Explicações:**

- Importar bibliotecas e funções necessárias.
- Definir os caminhos de arquivo para as camadas Bronze, Silver e Gold.
- Configurar as definições do Spark para um desempenho ótimo, como partições de shuffle automático.

Observação: A função display não é recomendada em pipelines em produção porém inclui em vários pontos a exibição dos dados, para testar e melhor apresentar o case.
 

In [0]:
# Importar as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Iniciar a SparkSession com configurações otimizadas
spark = SparkSession.builder \
    .appName("Load Data Bronze") \
    .config("spark.sql.shuffle.partitions", "200")  \
    .config("spark.sql.files.maxPartitionBytes", "128MB") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Define um número fixo de partições para shuffle, melhorando o paralelismo                 
# Define o tamanho máximo de partições para evitar muitos arquivos pequenos        
# Usa o codec Snappy para compressão rápida, otimizando tempo de leitura e escrita    
# Habilita otimizações adaptativas, ajustando o número de partições dinamicamente com base no tamanho dos dados

# Define os caminhos de armazenamento no Data Lake
lz_path_in = "/mnt/lhdw/landingzone/vendas/processar"
lz_path_out = "/mnt/lhdw/landingzone/vendas/processado"
bronze_path = "/mnt/lhdw/bronze/vendas"



**Justificativa:**

- **spark.sql.shuffle.partitions**: Define o número de partições para operações que envolvem shuffle (como joins e agregações). Escolher um valor fixo, como 200, garante que o cluster trabalhe de forma paralela de maneira eficiente.

Um cálculo comum para o número de partições é o seguinte:

_`número de partições = número de núcleos de CPU * 2 ou 3`_

Isso ajuda a garantir que o Spark use todos os núcleos disponíveis.
- **spark.sql.files.maxPartitionByte**s: Define o tamanho máximo dos arquivos particionados para evitar a criação de muitos arquivos pequenos, o que prejudicaria a performance de leitura e escrita.
- **spark.sql.parquet.compression.codec**: Snappy é uma escolha comum para Parquet, pois oferece uma boa combinação de compressão rápida e descompressão eficiente.
- **spark.sql.adaptive.enabled**: A otimização adaptativa ajusta o plano de execução conforme o tamanho dos dados, melhorando o desempenho automaticamente.

%md
**2. Camada Bronze: Ingestão de Dados Brutos**

A camada Bronze armazena dados brutos com formato parquet, sem transformações significativas. Aqui vamos simplesmente gravar os dados brutos como parquet.

### Criando um Schema para dados brutos

In [0]:
# Definir o esquema dos dados brutos
schema_lz = StructType([
    StructField("IDProduto", IntegerType(), True),
    StructField("Data", DateType(), True),
    StructField("IDCliente", IntegerType(), True),
    StructField("IDCampanha", IntegerType(), True),
    StructField("Unidades", IntegerType(), True),
    StructField("Produto", StringType(), True),
    StructField("Categoria", StringType(), True),
    StructField("Segmento", StringType(), True),
    StructField("IDFabricante", IntegerType(), True),
    StructField("Fabricante", StringType(), True),
    StructField("CustoUnitario", DoubleType(), True),
    StructField("PrecoUnitario", DoubleType(), True),
    StructField("CodigoPostal", StringType(), True),
    StructField("EmailNome", StringType(), True),
    StructField("Cidade", StringType(), True),
    StructField("Estado", StringType(), True),
    StructField("Regiao", StringType(), True),
    StructField("Distrito", StringType(), True),
    StructField("Pais", StringType(), True)
])

# Leitura dos dados e adição da coluna nome do arquivo durante a leitura
df_vendas = spark.read.option("header", "true").schema(schema_lz).csv(lz_path_in) \
                      .withColumn("filename", regexp_extract(input_file_name(), "([^/]+)$", 0))

distinct_filenames = df_vendas.select("filename").distinct()

# Exibindo o DataFrame para verificar a leitura correta dos dados
display(df_vendas)

IDProduto,Data,IDCliente,IDCampanha,Unidades,Produto,Categoria,Segmento,IDFabricante,Fabricante,CustoUnitario,PrecoUnitario,CodigoPostal,EmailNome,Cidade,Estado,Regiao,Distrito,Pais,filename
449,2011-09-21,113077,21,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33180,"(Lysandra.Castaneda@xyza.com): Castaneda, Lysandra","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-10-10,234410,18,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33158,"(Dorothy.Rodriquez@xyza.com): Rodriquez, Dorothy","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-10-07,58091,20,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33186,"(Beau.Sutton@xyza.com): Sutton, Beau","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-10-08,114284,20,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33165,"(Boris.Leonard@xyza.com): Leonard, Boris","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-10-30,205070,18,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33179,"(Rafael.Fox@xyza.com): Fox, Rafael","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-11-22,139458,16,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33143,"(Martena.Guy@xyza.com): Guy, Martena","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-11-08,151912,16,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33155,"(Ina.Winters@xyza.com): Winters, Ina","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-08-17,176229,1,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33189,"(Kelly.Nicholson@xyza.com): Nicholson, Kelly","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-08-10,76694,3,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33176,"(Mufutau.Morton@xyza.com): Morton, Mufutau","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv
449,2011-09-23,146382,5,1,Maximus UM-54,Urban,Moderation,7,VanArsdel,74.7299175,102.36975,33175,"(Courtney.Marshall@xyza.com): Marshall, Courtney","Miami, FL, USA",FL,East,District #10,USA,dados_vendas_2011.csv


### Apresentando os arquivos lidos

In [0]:

display(distinct_filenames)

filename
dados_vendas_2011.csv


### Salvar os dados na camada Bronze

Os dados serão salvos de forma particionada **Ano e Mês**

In [0]:
# Escrever a tabela no formato Parquet, particionando por DataVenda (ano e mês)
df_vendas.withColumn("Ano", year("Data")) \
             .withColumn("Mes", month("Data")) \
             .write.mode("overwrite").partitionBy("Ano", "Mes").parquet(bronze_path)


**Justificativas:**

- Lê os dados brutos a partir de um arquivo CSV na landing zone e escreve esses dados no formato Parquet na camada Bronze.
- O Parquet é escolhido pelo seu suporte a colunas e sua eficiência tanto em termos de espaço quanto em desempenho de leitura e escrita.

### Mover os arquivos processados para pasta processado

Com o DataFrame particionado em memória, será persistido para a camada Bronze

In [0]:
from pyspark.sql import functions as F
# Unpersist the DataFrame to ensure it does not hold onto file references
distinct_filenames.unpersist()
# Mover os arquivos processados para o caminho lz_path_out
# Nota: A operação de mover arquivos diretamente não é suportada pelo DataFrame API do Spark.
# É necessário utilizar o dbutils.fs.mv para mover os arquivos manualmente após o processamento.


# Primeiro, verifique se há arquivos a serem movidos
if distinct_filenames.select("filename").distinct().count() > 0:
    filenames = distinct_filenames.select("filename").distinct().collect()

    for row in filenames:
        src_path = row.filename
        dbutils.fs.mv(lz_path_in + "/" + src_path, lz_path_out)


####Evidências

In [0]:
%fs ls /mnt/lhdw/landingzone/vendas/processar/

In [0]:
%fs ls /mnt/lhdw/landingzone/vendas/processado/

path,name,size,modificationTime
dbfs:/mnt/lhdw/landingzone/vendas/processado/dados_2012.csv,dados_2012.csv,22400712,1744398519000
dbfs:/mnt/lhdw/landingzone/vendas/processado/dados_vendas_2011.csv,dados_vendas_2011.csv,21493733,1744739084000
dbfs:/mnt/lhdw/landingzone/vendas/processado/dados_vendas_2012.csv,dados_vendas_2012.csv,22400712,1744398523000


In [0]:
%fs ls /mnt/lhdw/bronze/vendas/Ano=2011/Mes=10

path,name,size,modificationTime
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/_SUCCESS,_SUCCESS,0,1744739075000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_2166931184801473550,_committed_2166931184801473550,932,1744738578000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_2552357199212090461,_committed_2552357199212090461,622,1744738642000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_7067702745515975292,_committed_7067702745515975292,324,1744398497000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_7520773942416823161,_committed_7520773942416823161,624,1744739075000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_vacuum7987331722946362019,_committed_vacuum7987331722946362019,96,1744738585000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/_started_2166931184801473550,_started_2166931184801473550,0,1744738574000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/_started_7520773942416823161,_started_7520773942416823161,0,1744739068000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/part-00000-tid-7520773942416823161-fa47ec94-477a-4362-8c3f-43b95b0c24af-41-10.c000.snappy.parquet,part-00000-tid-7520773942416823161-fa47ec94-477a-4362-8c3f-43b95b0c24af-41-10.c000.snappy.parquet,66083,1744739074000
dbfs:/mnt/lhdw/bronze/vendas/Ano=2011/Mes=10/part-00001-tid-7520773942416823161-fa47ec94-477a-4362-8c3f-43b95b0c24af-42-10.c000.snappy.parquet,part-00001-tid-7520773942416823161-fa47ec94-477a-4362-8c3f-43b95b0c24af-42-10.c000.snappy.parquet,64669,1744739073000


### A opção de gravar dados no modo "append" 

Permite adicionar novos dados a um arquivo existente, sem substituir ou excluir os dados já presentes. 

No caso específico do código fornecido, a linha de código comentada `df_vendas.withColumn("Ano", year("Data")) \ .withColumn("Mes", month("Data")) \ .write.mode("append").partitionBy("Ano", "Mes").parquet(bronze_path)` indica que os dados do DataFrame `df_vendas` serão adicionados ao arquivo Parquet existente no caminho `bronze_path`, mantendo a estrutura de particionamento por ano e mês.

Essa opção é útil quando se deseja adicionar novos dados a um conjunto de dados já existente, como por exemplo, quando novas vendas são registradas e precisam ser incorporadas ao conjunto de dados de vendas existente.

In [0]:
#df_vendas.withColumn("Ano", year("Data")) \
#         .withColumn("Mes", month("Data")) \
#         .write.mode("append").partitionBy("Ano", "Mes").parquet(bronze_path)

### Gerenciar o uso de memória 
Em PySpark, é importante gerenciar o uso de memória eficientemente, especialmente quando se trabalha com grandes conjuntos de dados. Para isso, você pode usar alguns comandos específicos que ajudam a liberar memória, remover objetos em cache ou persistidos e forçar a coleta de lixo.



**Limpar cache:**
PySpark armazena dados em cache para melhorar o desempenho de operações repetidas. Para liberar esses dados, você pode usar o comando unpersist().

In [0]:
# Exemplo de como liberar o cache de um DataFrame

df_vendas.unpersist()

# O comando unpersist() remove o DataFrame do cache, liberando a memória associada. Ele é especialmente útil quando você já não precisa mais dos dados persistidos.

Out[33]: DataFrame[IDProduto: int, Data: date, IDCliente: int, IDCampanha: int, Unidades: int, Produto: string, Categoria: string, Segmento: string, IDFabricante: int, Fabricante: string, CustoUnitario: double, PrecoUnitario: double, CodigoPostal: string, EmailNome: string, Cidade: string, Estado: string, Regiao: string, Distrito: string, Pais: string, filename: string]

**Liberar variáveis manualmente:** Se você criou variáveis grandes que não são mais necessárias, você pode removê-las explicitamente.

In [0]:
del df_vendas

# O comando del remove o objeto da memória. Isso é útil quando você tem grandes DataFrames ou objetos Python que já não são necessários.

**Justificativa**
Como nesse case estou trabalhando com apenas um DataFrame no Databricks, com base nos tipos de limpeza existente decidi por usar o df.unpersist(), garantindo que ele seja removido do cache do Spark após o uso. Para uma limpeza adicional no ambiente Python, também decidi por remover a variável com del df, embora isso não libere memória no Spark diretamente.



####Boas Práticas de Limpeza de Cache
Evite manter em cache **DataFrames** desnecessários para otimizar o uso de memória e desempenho.

Outras formas de limpeza de cache:

- **Para uma limpeza rápida e geral**: Use o spark.catalog.clearCache() para limpar o cache de todos os objetos em cache no SparkSession atual, liberando uma quantidade significativa de memória quando múltiplos DataFrames estão sendo reutilizados.
- **Para liberar memória de DataFrames específicos**: Use df.unpersist(). O comando unpersist() remove o DataFrame do cache, liberando a memória associada. Ele é especialmente útil quando você já não precisa mais dos dados persistidos.
- **Para remover variáveis específicas**: Use del. O comando del remove o objeto da memória. Isso é útil quando você tem grandes DataFrames ou objetos Python que já não são necessários.
- **Para uma solução completa**: Reinicie o cluster.