# **DataMart de Vendas em PySpark**

**Descrição:**  
Construção de um DataMart de vendas utilizando PySpark, com modelagem estrela (seção 1) e consultas analíticas (seção 2) no Colab.

**Autor:** Caio Silva  
**Data de Criação:** 27/04/2025  
**Última Atualização:** 27/04/2025  
**Stack:** PySpark, 3.11.12, Google Colab


# **Seção 1 - Transformação e modelagem de dados com PySpark**

In [19]:
# Instalação e importação de bibliotecas e funções usadas no projeto
!pip install pyspark
import os
from datetime import datetime
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, year, month, dayofmonth



In [4]:
# Upload dos dados brutos em CSV
uploaded = files.upload()

Saving dados_brutos.csv to dados_brutos.csv


In [5]:
# Criar a sessão Spark
spark = SparkSession.builder \
    .appName("DataMart_Vendas") \
    .getOrCreate()

In [6]:
# Ler o arquivo CSV
dados_brutos = spark.read.csv("/content/dados_brutos.csv", header=True, inferSchema=True)

# Printar os 10 primeiros registros
dados_brutos.show(10)


+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
| nome_cliente|        cidade|estado|nome_produto|categoria|fabricante|      data|qtd_vendida|valor_total|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
|Lucas Pereira|  Porto Alegre|    RS|  Detergente|  Limpeza|       Ypê|2024-01-26|          6|         90|
|Lucas Pereira|  Porto Alegre|    RS|      Feijão| Alimento|   Kicaldo|2024-01-14|         10|        240|
| Ana Oliveira|Rio de Janeiro|    RJ|Refrigerante|   Bebida| Coca-Cola|2024-01-15|          3|        150|
| Pedro Santos|      Curitiba|    PR|      Feijão| Alimento|   Kicaldo|2024-01-28|          4|        152|
| Pedro Santos|      Curitiba|    PR|       Arroz| Alimento|     Camil|2024-01-24|          3|         87|
| Pedro Santos|      Curitiba|    PR|Refrigerante|   Bebida| Coca-Cola|2024-01-10|          6|        192|
|   João Silva|     São Paulo|    SP|

In [7]:
# Montar tabela dimensão Cliente
dim_cliente = dados_brutos.select("nome_cliente").dropDuplicates() \
    .withColumn("cliente_id", monotonically_increasing_id())

# Montar tabela dimensão Localidade
dim_localidade = dados_brutos.select("cidade", "estado").dropDuplicates() \
    .withColumn("localidade_id", monotonically_increasing_id())

# Montar tabela dimensão Produto
dim_produto = dados_brutos.select("nome_produto", "categoria", "fabricante").dropDuplicates() \
    .withColumn("produto_id", monotonically_increasing_id())

# Montar tabela dimensão Tempo
dim_tempo = dados_brutos.select("data").dropDuplicates() \
    .withColumn("data_id", monotonically_increasing_id()) \
    .withColumn("ano", year("data")) \
    .withColumn("mes", month("data")) \
    .withColumn("dia", dayofmonth("data"))


In [8]:
# Join para adicionar os IDs nas vendas
fato = dados_brutos.join(dim_cliente, on="nome_cliente", how="left") \
    .join(dim_localidade, on=["cidade", "estado"], how="left") \
    .join(dim_produto, on=["nome_produto", "categoria", "fabricante"], how="left") \
    .join(dim_tempo, on="data", how="left")

# Selecionar apenas os campos da tabela fato
fato_vendas = fato.select(
    "cliente_id",
    "localidade_id",
    "produto_id",
    "data_id",
    "qtd_vendida",
    "valor_total"
)


In [9]:
# Montar pasta que irá receber o arquivos de resultado
os.makedirs("/content/resultado", exist_ok=True)

# Salvar os CSVs
dim_cliente.write.csv("/content/resultado/dim_cliente", header=True, mode="overwrite")
dim_localidade.write.csv("/content/resultado/dim_localidade", header=True, mode="overwrite")
dim_produto.write.csv("/content/resultado/dim_produto", header=True, mode="overwrite")
dim_tempo.write.csv("/content/resultado/dim_tempo", header=True, mode="overwrite")
fato_vendas.write.csv("/content/resultado/fato_vendas", header=True, mode="overwrite")


In [20]:
# Data do dia da execução
data_hoje = datetime.now().strftime('%Y-%m-%d')

# Definir o nome do arquivo .zip com a data do dia da execução do script
nome_zip = f"resultado_{data_hoje}.zip"

# Compactação dos resultados para disponibilização em .zip, caso prefira baixar tudo de uma vez
!zip -r {nome_zip} /content/resultado

# Baixar o arquivo zipado
files.download(f"/content/{nome_zip}")

  adding: content/resultado/ (stored 0%)
  adding: content/resultado/dim_produto/ (stored 0%)
  adding: content/resultado/dim_produto/._SUCCESS.crc (stored 0%)
  adding: content/resultado/dim_produto/part-00000-beec2fcb-d8ea-4129-abcd-87247cce22dc-c000.csv (deflated 18%)
  adding: content/resultado/dim_produto/_SUCCESS (stored 0%)
  adding: content/resultado/dim_produto/.part-00000-beec2fcb-d8ea-4129-abcd-87247cce22dc-c000.csv.crc (stored 0%)
  adding: content/resultado/dim_localidade/ (stored 0%)
  adding: content/resultado/dim_localidade/.part-00000-cecc329a-5392-4c31-9761-5508706480e9-c000.csv.crc (stored 0%)
  adding: content/resultado/dim_localidade/part-00000-cecc329a-5392-4c31-9761-5508706480e9-c000.csv (deflated 6%)
  adding: content/resultado/dim_localidade/._SUCCESS.crc (stored 0%)
  adding: content/resultado/dim_localidade/_SUCCESS (stored 0%)
  adding: content/resultado/dim_tempo/ (stored 0%)
  adding: content/resultado/dim_tempo/.part-00000-e0e6a5a4-144b-4b79-852e-3786a9c7

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

#**Seção 2 - Análise para as principais perguntas de negócio** *texto em itálico*

In [11]:
# Montar as tabelas como views temporárias para conseguirmos executar as querys
dim_cliente.createOrReplaceTempView("dim_cliente")
dim_localidade.createOrReplaceTempView("dim_localidade")
dim_produto.createOrReplaceTempView("dim_produto")
dim_tempo.createOrReplaceTempView("dim_tempo")
fato_vendas.createOrReplaceTempView("fato_vendas")


In [12]:
# Distribuição do total de vendas por estado
vendas_por_estado = spark.sql("""
    SELECT
        d.estado,
        SUM(f.valor_total) AS total_vendas
    FROM fato_vendas f
    JOIN dim_localidade d ON f.localidade_id = d.localidade_id
    GROUP BY d.estado
    ORDER BY total_vendas DESC
""")

vendas_por_estado.show()

+------+------------+
|estado|total_vendas|
+------+------------+
|    SP|        4646|
|    PR|        3113|
|    RS|        2991|
|    RJ|        2300|
|    MG|        1794|
+------+------------+



In [13]:
# Top 5 clientes da operação

top_clientes = spark.sql("""
    SELECT
        c.nome_cliente,
        SUM(f.valor_total) AS total_gasto
    FROM fato_vendas f
    JOIN dim_cliente c ON f.cliente_id = c.cliente_id
    GROUP BY c.nome_cliente
    ORDER BY total_gasto DESC
    LIMIT 5
""")

top_clientes.show()


+-------------+-----------+
| nome_cliente|total_gasto|
+-------------+-----------+
|   João Silva|       4646|
| Pedro Santos|       3113|
|Lucas Pereira|       2991|
| Ana Oliveira|       2300|
|  Maria Souza|       1794|
+-------------+-----------+



In [15]:
# Produtos com maior demanda
produtos_mais_vendidos = spark.sql("""
    SELECT
        p.nome_produto,
        SUM(f.qtd_vendida) AS total_vendido
    FROM fato_vendas f
    JOIN dim_produto p ON f.produto_id = p.produto_id
    GROUP BY p.nome_produto
    ORDER BY total_vendido DESC
""")

produtos_mais_vendidos.show()


+------------+-------------+
|nome_produto|total_vendido|
+------------+-------------+
|      Feijão|          142|
| Sabão em pó|          114|
|  Detergente|          112|
|Refrigerante|          101|
|       Arroz|           89|
+------------+-------------+



In [16]:
# Distribuição das vendas por ano e mês
vendas_ano_mes = spark.sql("""
    SELECT
        t.ano,
        t.mes,
        SUM(f.valor_total) AS total_vendas
    FROM fato_vendas f
    JOIN dim_tempo t ON f.data_id = t.data_id
    GROUP BY t.ano, t.mes
    ORDER BY t.ano, t.mes
""")

vendas_ano_mes.show()


+----+---+------------+
| ano|mes|total_vendas|
+----+---+------------+
|2024|  1|       14844|
+----+---+------------+



In [17]:
# Distribuição de receita a nível de fabricante
receita_por_fabricante = spark.sql("""
    SELECT
        p.fabricante,
        SUM(f.valor_total) AS total_receita
    FROM fato_vendas f
    JOIN dim_produto p ON f.produto_id = p.produto_id
    GROUP BY p.fabricante
    ORDER BY total_receita DESC
""")

receita_por_fabricante.show()


+----------+-------------+
|fabricante|total_receita|
+----------+-------------+
|   Kicaldo|         3704|
|       Ypê|         3000|
|       OMO|         2951|
|     Camil|         2647|
| Coca-Cola|         2542|
+----------+-------------+

