# 📊 Notebook 1 — Processamento e Preparação de Dados

Este notebook tem como objetivo realizar a **exploração, limpeza e preparação dos dados brutos** do case técnico proposto, utilizando **PySpark** como engine principal de processamento distribuído.

---

## 🎯 Objetivo

- Carregar os dados brutos fornecidos (`customers.json.gz`, `offers.json.gz`, `transactions.json.gz`)
- Realizar uma análise exploratória inicial (**EDA**) para entender a estrutura, qualidade e distribuição dos dados
- Tratar dados faltantes, tipos e formatos
- Preparar um conjunto de dados unificado e otimizado para análise e modelagem futura
- Salvar os dados tratados em formato **Parquet**, que é mais eficiente e leve para uso com Spark

---

## 🔁 Etapas executadas neste notebook

1. Importação de bibliotecas e configuração do ambiente PySpark
2. Leitura dos arquivos `.json.gz` diretamente com Spark
3. Exploração e validação de schemas e estatísticas dos dados
4. Tratamento de dados ausentes, inconsistentes ou inválidos
5. Conversão para formatos otimizados (`.parquet`)
6. Exportação dos dados tratados para `data/processed/`

---

## 🗂️ Estrutura esperada dos dados

- `data/raw/` → Arquivos `.json.gz` (compactados)
- `data/processed/` → Arquivos `.parquet` tratados e otimizados

---

## ⚙️ Tecnologias utilizadas

- Python 3.11
- PySpark
- JupyterLab
- Pandas (suporte auxiliar para análise exploratória)


## 1. 📦 Importação de bibliotecas e configuração do PySpark

Nesta etapa, vamos:

- Importar as bibliotecas necessárias para manipulação e análise dos dados
- Inicializar a sessão do PySpark (`SparkSession`), que será usada para leitura, transformação e gravação dos dados
- Configurar parâmetros básicos de execução, como nome da aplicação e quantidade de memória (caso necessário)

In [1]:
import sys
import os

# Obtém o caminho absoluto do diretório 'src'
src_path = os.path.abspath("../")
# Adiciona 'src' ao sys.path
if src_path not in sys.path:
    sys.path.append(src_path)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count
from pathlib import Path

In [2]:
sys.path

['/usr/local/lib/python311.zip',
 '/usr/local/lib/python3.11',
 '/usr/local/lib/python3.11/lib-dynload',
 '',
 '/usr/local/lib/python3.11/site-packages',
 '/app']

In [3]:
# Inicialização do SparkSession
spark = SparkSession.builder \
    .appName("iFood - Data Processing") \
    .getOrCreate()

#Testa se Spark está funcionando
spark.sparkContext.setLogLevel("WARN")
print("✅ SparkSession iniciada com sucesso!")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/25 23:09:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ SparkSession iniciada com sucesso!


## 2.Leitura dos dados brutos (.json.gz)

Nesta etapa, vamos:

- Ler os três conjuntos de dados fornecidos:
  - `customers.json.gz`
  - `offers.json.gz`
  - `transactions.json.gz`
- Utilizar o PySpark para carregar os arquivos diretamente do formato `.gz`, que é suportado nativamente
- Exibir uma prévia de cada dataset e verificar seu schema para garantir que os dados foram carregados corretamente


In [4]:
# Lê os arquivos JSON compactados com PySpark
customers_df = spark.read.json("/app/data/raw/profile.json.gz")
offers_df = spark.read.json("/app/data/raw/offers.json.gz")
transactions_df = spark.read.json("/app/data/raw/transactions.json.gz")

                                                                                

In [5]:
# Mostra uma prévia dos dados
print("🏷️ Offers:")
offers_df.show(5, truncate=False)

print("👤 Customers:")
customers_df.show(5, truncate=False)

print("💳 Transactions:")
transactions_df.show(5, truncate=False)

🏷️ Offers:
+----------------------------+--------------+--------+--------------------------------+---------+-------------+
|channels                    |discount_value|duration|id                              |min_value|offer_type   |
+----------------------------+--------------+--------+--------------------------------+---------+-------------+
|[email, mobile, social]     |10            |7.0     |ae264e3637204a6fb9bb56bc8210ddfd|10       |bogo         |
|[web, email, mobile, social]|10            |5.0     |4d5c57ea9a6940dd891ad53e9dbe8da0|10       |bogo         |
|[web, email, mobile]        |0             |4.0     |3f207df678b143eea3cee63160fa8bed|0        |informational|
|[web, email, mobile]        |5             |7.0     |9b98b8c7a33c4b65b9aebfe6a799e6d9|5        |bogo         |
|[web, email]                |5             |10.0    |0b1e1539f2cc45b7b9fa7c272da2e1d7|20       |discount     |
+----------------------------+--------------+--------+--------------------------------+------

[Stage 5:>                                                          (0 + 1) / 1]

+--------------------------------+--------------+---------------------+----------------------------------------------------+
|account_id                      |event         |time_since_test_start|value                                               |
+--------------------------------+--------------+---------------------+----------------------------------------------------+
|78afa995795e4d85b5d9ceeca43f5fef|offer received|0.0                  |{NULL, 9b98b8c7a33c4b65b9aebfe6a799e6d9, NULL, NULL}|
|a03223e636434f42ac4c3df47e8bac43|offer received|0.0                  |{NULL, 0b1e1539f2cc45b7b9fa7c272da2e1d7, NULL, NULL}|
|e2127556f4f64592b11af22de27a7932|offer received|0.0                  |{NULL, 2906b810c7d4411798c6938adc9daaa5, NULL, NULL}|
|8ec6ce2a7e7949b1bf142def7d0e0586|offer received|0.0                  |{NULL, fafdcd668e3743c1bb461111dcafc2a4, NULL, NULL}|
|68617ca6246f4fbc85e91a2a49552598|offer received|0.0                  |{NULL, 4d5c57ea9a6940dd891ad53e9dbe8da0, NULL, NULL}|


                                                                                

## 3. 🔍 Exploração e Validação dos Dados (EDA)

Nesta etapa, vamos:

- Explorar os schemas das tabelas para verificar tipos de dados
- Observar estatísticas descritivas básicas
- Contar valores nulos e valores únicos
- Identificar possíveis problemas de qualidade (ex: campos vazios, inconsistentes)
- Verificar distribuições de colunas importantes

In [6]:
from src.eda.data_diagnostics import isna_sum, value_counts

### 3.1 Check and Fix NaN Values ``customers``
- age: **0 nulos (0.00%)**
- credit_card_limit: **2175 nulos (12.79%)**
- gender: **2175 nulos (12.79%)**
- id: **0 nulos (0.00%)**
- registered_on: **0 nulos (0.00%)**

In [7]:
isna_sum(customers_df, "customers")


📘 Schema de customers:
root
 |-- age: long (nullable = true)
 |-- credit_card_limit: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- registered_on: string (nullable = true)


🔢 Total de linhas: 17000

📊 Nulos por coluna (valores e %):
– age: 0 nulos (0.00%)
– credit_card_limit: 2175 nulos (12.79%)
– gender: 2175 nulos (12.79%)
– id: 0 nulos (0.00%)
– registered_on: 0 nulos (0.00%)

🔎 Amostra de customers:
+---+-----------------+------+--------------------------------+-------------+
|age|credit_card_limit|gender|id                              |registered_on|
+---+-----------------+------+--------------------------------+-------------+
|118|NULL             |NULL  |68be06ca386d4c31939f3a4f0e3dd783|20170212     |
|55 |112000.0         |F     |0610b486422d4921ae7d2bf64640c50b|20170715     |
|118|NULL             |NULL  |38fe809add3b4fcf9315a9694bb96ff5|20180712     |
|75 |100000.0         |F     |78afa995795e4d85b5d9ceeca43f5fef|20170

- Limpeza na Tabela customers
1. Coluna gender: Colocar o gender "O" para "NULL"
2. Coluna credit_card_limit: Usar a mediana para substituir os dados nulos

In [8]:
value_counts(customers_df, "gender")


📊 Distribuição da coluna: gender (total: 17000 registros)
+------+-----+------------------+
|gender|count|percent           |
+------+-----+------------------+
|M     |8484 |49.90588235294118 |
|F     |6129 |36.05294117647059 |
|NULL  |2175 |12.794117647058822|
|O     |212  |1.2470588235294118|
+------+-----+------------------+



In [9]:
value_counts(customers_df, "credit_card_limit")


📊 Distribuição da coluna: credit_card_limit (total: 17000 registros)
+-----------------+-----+------------------+
|credit_card_limit|count|percent           |
+-----------------+-----+------------------+
|NULL             |2175 |12.794117647058822|
|73000.0          |314  |1.8470588235294116|
|72000.0          |297  |1.7470588235294116|
|71000.0          |294  |1.7294117647058824|
|57000.0          |288  |1.6941176470588233|
|74000.0          |282  |1.6588235294117646|
|53000.0          |282  |1.6588235294117646|
|52000.0          |281  |1.6529411764705884|
|56000.0          |281  |1.6529411764705884|
|54000.0          |272  |1.6               |
|70000.0          |270  |1.588235294117647 |
|51000.0          |268  |1.576470588235294 |
|61000.0          |258  |1.5176470588235296|
|64000.0          |258  |1.5176470588235296|
|55000.0          |254  |1.4941176470588236|
|50000.0          |253  |1.4882352941176469|
|60000.0          |251  |1.4764705882352942|
|75000.0          |243  |1.429

25/03/25 23:16:42 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-de9a4af0-70c8-4952-8666-986a8408eeb0. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/blockmgr-de9a4af0-70c8-4952-8666-986a8408eeb0
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:174)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:109)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:90)
	at org.apache.spark.util.SparkFileUtils.deleteRecursively(SparkFileUtils.scala:121)
	at org.apache.spark.util.SparkFileUtils.deleteRecursively$(SparkFileUtils.scala:120)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1126)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:368)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:364)
	at scala.collection.IndexedSeqOptimize

### 3.2 Check NaN Values ``offers``

In [None]:
#isna_sum(offers_df, "offers")

In [None]:
#check_schema_and_nulls(transactions_df, "transactions")