In [0]:
# Importação de bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Lendo as tabelas
vendas_bronze = spark.read.format("delta").load("/Volumes/workspace/default/dadosvendas/bronze/vendas")
vendedores_bronze = spark.read.format("delta").load("/Volumes/workspace/default/dadosvendas/bronze/vendedores")
produtos_bronze = spark.read.format("delta").load("/Volumes/workspace/default/dadosvendas/bronze/produtos")

print("vendas")
vendas_bronze.show(3)
print("vendedores")
vendedores_bronze.show(3)
print("produtos")
produtos_bronze.show(3)

vendas
+--------+-----------+----------+----------+----------+-----------+-----------+------------+
|id_venda|id_vendedor|id_produto|data_venda|quantidade|valor_total|canal_venda|status_venda|
+--------+-----------+----------+----------+----------+-----------+-----------+------------+
|    1001|        186|       106|2024-01-15|         4|    3565.63|Loja Física|   concluido|
|    1002|        163|       102|2024-02-16|         2|    2494.31|   Telefone| processando|
|    1003|        164|       108|2024-03-01|         3|    4122.18|Loja Online| processando|
+--------+-----------+----------+----------+----------+-----------+-----------+------------+
only showing top 3 rows
vendedores
+---+--------------+--------+-------------+-------+-----------------+-----------+
| id|          nome|  regiao|data_cadastro| status|nivel_experiencia|meta_mensal|
+---+--------------+--------+-------------+-------+-----------------+-----------+
|  1| Beatriz Ramos| Sudeste|   2023-06-25|inativo|          

In [0]:
# tratamento

#removendo duplicatas de vendedores
window_spec = Window.partitionBy("id").orderBy(desc("data_cadastro"))
vendedores_clean = vendedores_bronze.withColumn("rank", row_number().over(window_spec)) \
                                  .filter(col("rank") == 1) \
                                  .drop("rank")

# tratando valores nulos (nulo vira 1)
vendas_clean = vendas_bronze.fillna({"quantidade": 1}) \
                           .filter(col("id_vendedor").isNotNull() & 
                                  col("id_produto").isNotNull())



In [0]:
#joins das tabelas pra criação da camada Silver

vendas_enriquecidas = vendas_clean.alias("v") \
    .join(vendedores_clean.alias("vd"), col("v.id_vendedor") == col("vd.id"), "left") \
    .join(produtos_bronze.alias("p"), col("v.id_produto") == col("p.id"), "left") \
    .select(
        col("v.id_venda"),
        col("v.id_vendedor"),
        col("vd.nome").alias("nome_vendedor"),
        col("vd.regiao").alias("regiao_vendedor"),
        col("v.id_produto"),
        col("p.nome").alias("nome_produto"),
        col("p.categoria"),
        col("v.data_venda"),
        col("v.quantidade"),
        col("v.valor_total"),
        col("v.canal_venda"),
        col("v.status_venda"),
        (col("v.valor_total") / col("v.quantidade")).alias("preco_unitario"),
        when(col("v.status_venda") == "concluido", col("v.valor_total")).otherwise(0).alias("valor_liquido")
    )
vendas_enriquecidas.display()


id_venda,id_vendedor,nome_vendedor,regiao_vendedor,id_produto,nome_produto,categoria,data_venda,quantidade,valor_total,canal_venda,status_venda,preco_unitario,valor_liquido
1001,186,Ana Oliveira,Norte,106,"Monitor 24""""",Eletrônicos,2024-01-15,4,3565.63,Loja Física,concluido,891.4075,3565.63
1002,163,Ana Oliveira,Sul,102,Smartphone Premium,Eletrônicos,2024-02-16,2,2494.31,Telefone,processando,1247.155,0.0
1003,164,João Silva,Norte,108,Mouse Sem Fio,Eletrônicos,2024-03-01,3,4122.18,Loja Online,processando,1374.0600000000002,0.0
1004,43,Camila Ferreira,Sudeste,103,Mesa Escritório,Móveis,2024-02-27,4,1409.91,Telefone,cancelado,352.4775,0.0
1005,174,Maria Santos,Sudeste,110,Webcam HD,Eletrônicos,2024-02-23,5,1495.46,Telefone,cancelado,299.092,0.0
1006,41,Felipe Gomes,Norte,101,Notebook Gamer,Eletrônicos,2024-01-13,4,3434.42,Loja Online,processando,858.605,0.0
1007,44,Ricardo Alves,Norte,104,Cadeira Ergonômica,Móveis,2024-01-03,1,2301.53,Telefone,processando,2301.53,0.0
1008,86,Pedro Costa,Sul,110,Webcam HD,Eletrônicos,2024-01-22,5,594.95,Loja Física,cancelado,118.99,0.0
1009,111,Fernanda Costa,Sul,103,Mesa Escritório,Móveis,2024-01-24,5,2720.19,Loja Online,cancelado,544.038,0.0
1010,55,Lucas Rocha,Nordeste,101,Notebook Gamer,Eletrônicos,2024-01-04,1,805.14,Loja Online,cancelado,805.14,0.0


In [0]:
# SIMULANDO UM Upsert-merge
# novos dados fictícios
novos_vendedores_data = [
    (1, "João Silva ATUALIZADO", "Sudeste", "2023-01-15", "ativo", "Senior", 75000.00),  
    (999, "Novo Vendedor MERGE", "Nordeste", "2024-03-01", "ativo", "Pleno", 45000.00)   
]
novos_vendedores_df = spark.createDataFrame(novos_vendedores_data, 
                                          ["id", "nome", "regiao", "data_cadastro", "status", "nivel_experiencia", "meta_mensal"])

In [0]:
# SIMULANDO UM Upserte-merge
from delta.tables import DeltaTable

# carregando a tabela delta como deltatable 
delta_table = DeltaTable.forPath(spark, "/Volumes/workspace/default/dadosvendas/bronze/vendedores")

# executando o merge (upsert)
merge_result = delta_table.alias("target").merge(
    novos_vendedores_df.alias("source"),
    "target.id = source.id") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

print(f"Resultado: {merge_result}")

Resultado: DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]


In [0]:
# Verificando o resultado do Merge
vendedores_atualizados = spark.read.format("delta").load("/Volumes/workspace/default/dadosvendas/bronze/vendedores")
vendedores_atualizados.filter(col("id").isin(1, 999)).show()

+---+--------------------+--------+-------------+------+-----------------+-----------+
| id|                nome|  regiao|data_cadastro|status|nivel_experiencia|meta_mensal|
+---+--------------------+--------+-------------+------+-----------------+-----------+
|  1|João Silva ATUALI...| Sudeste|   2023-01-15| ativo|           Senior|    75000.0|
|999| Novo Vendedor MERGE|Nordeste|   2024-03-01| ativo|            Pleno|    45000.0|
+---+--------------------+--------+-------------+------+-----------------+-----------+



In [0]:
# KPIs de performance por vendedor
kpi_vendedores = vendas_enriquecidas.filter(col("status_venda") == "concluido") \
    .groupBy("id_vendedor", "nome_vendedor", "regiao_vendedor") \
    .agg(
        sum("valor_total").alias("total_vendas"),
        avg("valor_total").alias("ticket_medio"),
        count("*").alias("qtd_vendas"),
        sum("quantidade").alias("total_itens_vendidos")
    )

# Pivot para transformar os canais de venda em colunas, agregando o total de vendas por categoria e região
vendas_por_canal = vendas_enriquecidas.filter(col("status_venda") == "concluido") \
    .groupBy("categoria", "regiao_vendedor") \
    .pivot("canal_venda") \
    .agg(sum("valor_total").alias("total_vendas"))

# verificando
kpi_vendedores.display()

id_vendedor,nome_vendedor,regiao_vendedor,total_vendas,ticket_medio,qtd_vendas,total_itens_vendidos
1,João Silva ATUALIZADO,Sudeste,2019.82,2019.82,1,3
2,Felipe Gomes,Sul,2565.9,2565.9,1,3
4,Ricardo Alves,Nordeste,2920.1,2920.1,1,5
7,Beatriz Ramos,Sudeste,1038.61,1038.61,1,2
13,Maria Santos,Sul,858.42,858.42,1,4
15,João Silva,Sul,5184.66,2592.33,2,7
19,Ana Oliveira,Nordeste,677.37,677.37,1,1
20,Pedro Costa,Norte,3394.69,3394.69,1,2
21,Renata Martins,Sul,3031.63,3031.63,1,5
28,Pedro Costa,Nordeste,1312.74,1312.74,1,4


In [0]:
# salvando os dados tratados (Silver)
vendas_enriquecidas.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/default/dadosvendas/silver/vendas_enriquecidas")

# Salvar KPIs (Gold) (formato parquet)
kpi_vendedores.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/Volumes/workspace/default/dadosvendas/gold/kpi_vendedores")

# Salvar dados consolidados (Gold) (formato parquet) (particionando por regiao_vendedor)
vendas_enriquecidas.write \
    .format("parquet") \
    .mode("overwrite") \
    .partitionBy("regiao_vendedor") \
    .save("/Volumes/workspace/default/dadosvendas/gold/vendas_consolidadas")

In [0]:
# Consultas do resultado

#carregando os dados
vendas_gold = spark.read.format("parquet").load("/Volumes/workspace/default/dadosvendas/gold/vendas_consolidadas")
#view temporaria pra consultas sql
vendas_gold.createOrReplaceTempView("vendas_gold")

# todos os dados
consulta_completa = spark.sql("""
SELECT * 
FROM vendas_gold 
WHERE status_venda = 'concluido'
LIMIT 10
""")
consulta_completa.display()


# retornando dados do vendedor, produto e venda junto
consulta_desnormalizada = spark.sql("""
SELECT 
    id_venda,
    nome_vendedor,
    regiao_vendedor,
    nome_produto, 
    categoria,
    data_venda,
    valor_total,
    canal_venda,
    preco_unitario
FROM vendas_gold
WHERE regiao_vendedor = 'Sudeste' 
AND status_venda = 'concluido'
LIMIT 8
""")
consulta_desnormalizada.display()





id_venda,id_vendedor,nome_vendedor,id_produto,nome_produto,categoria,data_venda,quantidade,valor_total,canal_venda,status_venda,preco_unitario,valor_liquido,regiao_vendedor
1188,82,Felipe Gomes,107,Teclado Mecânico,Eletrônicos,2024-01-16,1,2146.24,Loja Física,concluido,2146.24,2146.24,Sudeste
1174,145,Rodrigo Santos,105,Tablet Android,Eletrônicos,2024-02-16,3,3722.09,Telefone,concluido,1240.6966666666667,3722.09,Sudeste
1173,48,Maria Santos,105,Tablet Android,Eletrônicos,2024-02-24,5,3926.23,Telefone,concluido,785.246,3926.23,Sudeste
1170,120,Maria Santos,102,Smartphone Premium,Eletrônicos,2024-01-16,1,3461.64,Loja Online,concluido,3461.64,3461.64,Sudeste
1165,35,André Souza,101,Notebook Gamer,Eletrônicos,2024-01-13,4,3768.19,Telefone,concluido,942.0475,3768.19,Sudeste
1140,145,Rodrigo Santos,110,Webcam HD,Eletrônicos,2024-02-04,3,355.39,Loja Física,concluido,118.46333333333332,355.39,Sudeste
1125,142,Rodrigo Santos,108,Mouse Sem Fio,Eletrônicos,2024-01-22,5,4630.55,Loja Física,concluido,926.11,4630.55,Sudeste
1124,7,Beatriz Ramos,105,Tablet Android,Eletrônicos,2024-02-12,2,1038.61,Telefone,concluido,519.305,1038.61,Sudeste
1094,100,Patrícia Lima,103,Mesa Escritório,Móveis,2024-01-08,1,1577.17,Loja Online,concluido,1577.17,1577.17,Sudeste
1059,96,Pedro Costa,107,Teclado Mecânico,Eletrônicos,2024-02-26,2,3353.02,Telefone,concluido,1676.51,3353.02,Sudeste


id_venda,nome_vendedor,regiao_vendedor,nome_produto,categoria,data_venda,valor_total,canal_venda,preco_unitario
1188,Felipe Gomes,Sudeste,Teclado Mecânico,Eletrônicos,2024-01-16,2146.24,Loja Física,2146.24
1174,Rodrigo Santos,Sudeste,Tablet Android,Eletrônicos,2024-02-16,3722.09,Telefone,1240.6966666666667
1173,Maria Santos,Sudeste,Tablet Android,Eletrônicos,2024-02-24,3926.23,Telefone,785.246
1170,Maria Santos,Sudeste,Smartphone Premium,Eletrônicos,2024-01-16,3461.64,Loja Online,3461.64
1165,André Souza,Sudeste,Notebook Gamer,Eletrônicos,2024-01-13,3768.19,Telefone,942.0475
1140,Rodrigo Santos,Sudeste,Webcam HD,Eletrônicos,2024-02-04,355.39,Loja Física,118.46333333333332
1125,Rodrigo Santos,Sudeste,Mouse Sem Fio,Eletrônicos,2024-01-22,4630.55,Loja Física,926.11
1124,Beatriz Ramos,Sudeste,Tablet Android,Eletrônicos,2024-02-12,1038.61,Telefone,519.305
