# Trabalho Final Nb. 3 - Fundamentos de Big Data

CIMATEC 2025.05

Gustavo Campos

João Carneiro

Uziel Araújo

## Inicialização

In [68]:
# Importa bibliotecas
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [69]:
# Inicia sessao spark
sc = SparkSession.builder.appName("SparkSQL") \
  .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.29") \
  .getOrCreate()

## Carregamento

In [70]:
# Monta google drive para acesso ao dataset 'notafiscal.csv'
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [71]:
# Carrega dados salvos na raiz do drive em dataframe spark
path = "/content/drive/MyDrive/"
df = sc.read.csv(path + 'notafiscal.csv', inferSchema=True, header=True)

## Seleção

In [72]:
# Cria, salva e visualiza amostra da tabela de vendas
df_venda = df.select(
    'id_venda',
    'dtc_venda',
    'cod_cliente',
    'cod_cidade',
    'cod_produto',
    'cod_gerencia',
    'num_nota',
    'qtd_venda',
    'val_venda',
).distinct()

df_venda.toPandas().to_csv(path + 'notafiscal_venda.csv', index=False)

df_venda.show(5)

+--------+----------+-----------+----------+-----------+------------+--------+---------+---------+
|id_venda| dtc_venda|cod_cliente|cod_cidade|cod_produto|cod_gerencia|num_nota|qtd_venda|val_venda|
+--------+----------+-----------+----------+-----------+------------+--------+---------+---------+
|       9|2021-02-09|          6|         2|         20|           1|     416|        2|    350.0|
|     464|2021-10-21|          6|         1|          6|           1|    1007|        2|    550.0|
|     505|2021-12-23|          3|         5|         13|           2|     303|        1|      3.2|
|     513|2021-08-24|          2|         2|         14|           2|     777|        2|     25.0|
|    1179|2022-03-22|          1|         1|         11|           3|     456|        2|      0.5|
+--------+----------+-----------+----------+-----------+------------+--------+---------+---------+
only showing top 5 rows



## Transformação

### Vendas por Ano-Mês

In [73]:
# Carrega dados de cidade e combina com vendas
df_cidade = sc.read.csv(path + 'notafiscal_cidade.csv', inferSchema=True, header=True)
df_vendas_mes = df_venda.join(df_cidade, 'cod_cidade')

df_vendas_mes.show(5)

+----------+--------+----------+-----------+-----------+------------+--------+---------+---------+-----------+
|cod_cidade|id_venda| dtc_venda|cod_cliente|cod_produto|cod_gerencia|num_nota|qtd_venda|val_venda| nom_cidade|
+----------+--------+----------+-----------+-----------+------------+--------+---------+---------+-----------+
|         2|       9|2021-02-09|          6|         20|           1|     416|        2|    350.0|NOVA IORQUE|
|         1|     464|2021-10-21|          6|          6|           1|    1007|        2|    550.0|   SALVADOR|
|         5|     505|2021-12-23|          3|         13|           2|     303|        1|      3.2|  BUDAPESTE|
|         2|     513|2021-08-24|          2|         14|           2|     777|        2|     25.0|NOVA IORQUE|
|         1|    1179|2022-03-22|          1|         11|           3|     456|        2|      0.5|   SALVADOR|
+----------+--------+----------+-----------+-----------+------------+--------+---------+---------+-----------+
o

In [74]:
# Carrega dados de produto e combina com vendas_mes
df_produto = sc.read.csv(path + 'notafiscal_produto.csv', inferSchema=True, header=True)
df_vendas_mes = df_vendas_mes.join(df_produto, 'cod_produto')

df_vendas_mes.show(5)

+-----------+----------+--------+----------+-----------+------------+--------+---------+---------+-----------+----------------+-------------+----------------+
|cod_produto|cod_cidade|id_venda| dtc_venda|cod_cliente|cod_gerencia|num_nota|qtd_venda|val_venda| nom_cidade|cod_departamento|  nom_produto|nom_departamento|
+-----------+----------+--------+----------+-----------+------------+--------+---------+---------+-----------+----------------+-------------+----------------+
|         20|         2|       9|2021-02-09|          6|           1|     416|        2|    350.0|NOVA IORQUE|               6|    Geladeira| Eletrodomestico|
|          6|         1|     464|2021-10-21|          6|           1|    1007|        2|    550.0|   SALVADOR|               1|    Notebook |     Informática|
|         13|         5|     505|2021-12-23|          3|           2|     303|        1|      3.2|  BUDAPESTE|               3|       Estojo|       Papelaria|
|         14|         2|     513|2021-08-24|  

In [75]:
# Cria coluna ano_mes, val_venda_total e seleciona colunas de interesse
df_vendas_mes = df_vendas_mes \
    .withColumn('ano_mes', F.date_format(F.col('dtc_venda'), "yyyyMM")) \
    .withColumn('val_venda_total', F.col('val_venda') * F.col('qtd_venda')) \
    .select('id_venda', 'ano_mes', 'nom_cidade', 'nom_produto', 'val_venda_total')

df_vendas_mes.show(5)

+--------+-------+-----------+-------------+---------------+
|id_venda|ano_mes| nom_cidade|  nom_produto|val_venda_total|
+--------+-------+-----------+-------------+---------------+
|       9| 202102|NOVA IORQUE|    Geladeira|          700.0|
|     464| 202110|   SALVADOR|    Notebook |         1100.0|
|     505| 202112|  BUDAPESTE|       Estojo|            3.2|
|     513| 202108|NOVA IORQUE|Classificador|           50.0|
|    1179| 202203|   SALVADOR|      Caneta |            1.0|
+--------+-------+-----------+-------------+---------------+
only showing top 5 rows



In [76]:
# Agrupa e consolida tabela para obter soma total de vendas por ano-mes
df_vendas_mes = df_vendas_mes \
  .groupBy('ano_mes', 'nom_cidade', 'nom_produto') \
  .agg(F.round(F.sum('val_venda_total'), 2).alias('soma_val_vendas')) \
  .coalesce(1)

df_vendas_mes.show(20)

+-------+-----------+------------------+---------------+
|ano_mes| nom_cidade|       nom_produto|soma_val_vendas|
+-------+-----------+------------------+---------------+
| 202101|      BAGDÁ|  Telefone Sem Fio|         1750.0|
| 202107|       ROMA|  Telefone Sem Fio|         2625.0|
| 202205|  BUDAPESTE|         Geladeira|         5250.0|
| 202205|      BAGDÁ|              Mesa|         1600.0|
| 202211|  BUDAPESTE|     Classificador|          125.0|
| 202307|  BUDAPESTE|         Geladeira|         4200.0|
| 202312|  BUDAPESTE|                TV|           50.3|
| 202210|       ROMA|        Computador|       10629.95|
| 202102|      BAGDÁ|          Papel A4|            4.3|
| 202309|   SALVADOR|             Baton|          788.0|
| 202309|   SALVADOR|          Borracha|           10.6|
| 202105|      BAGDÁ|       Celular 4Gb|          317.4|
| 202204|   SALVADOR|           Teclado|        2736.72|
| 202109|   SALVADOR|      Celular  8GB|          61.25|
| 202104|       ROMA|      Celu

### Vendas por Ano

In [77]:
# Obtem a soma consolidada de vendas por ano
df_vendas_ano = df_venda \
    .withColumn('ano', F.date_format(F.col('dtc_venda'), "yyyy")) \
    .withColumn('val_venda_total', F.col('val_venda') * F.col('qtd_venda')) \
    .join(df_produto, 'cod_produto') \
    .select('ano', 'nom_produto', 'val_venda_total') \
    .groupBy('ano', 'nom_produto') \
    .agg(F.round(F.sum('val_venda_total'), 2).alias('soma_val_vendas')) \
    .coalesce(1)

df_vendas_ano.show(20)

+----+------------------+---------------+
| ano|       nom_produto|soma_val_vendas|
+----+------------------+---------------+
|2023|             Fogão|       505500.0|
|2023|            Estojo|          953.6|
|2023|          Papel A4|          731.0|
|2023|              Mesa|       505600.0|
|2022|Barbeador Elétrico|       382700.0|
|2023|           Caneta |          150.0|
|2023|              Sofá|        80400.0|
|2021|         Notebook |       171050.0|
|2023|Barbeador Elétrico|       368725.0|
|2022|           Teclado|      113231.79|
|2021|              Mesa|       480000.0|
|2022|        microondas|       602700.0|
|2021|             Baton|        31717.0|
|2023|             Baton|        28565.0|
|2021|           Monitor|       401700.0|
|2023|        Computador|      648426.95|
|2021|          Papel A4|         599.85|
|2023|        microondas|       592200.0|
|2021|         Geladeira|       105350.0|
|2021|           Teclado|      113231.79|
+----+------------------+---------

## Registro

Salva novos dataframes no 'path' (diretório raiz do google drive)

In [78]:
# Salva tabela vendas por ano_mes resultante como parquet
df_vendas_mes.write.parquet(path + 'df_vendas_ano_mes.parquet', mode='overwrite')