# Processamento de Dados
Este notebook pertence ao projeto: Case Téc­ni­co Data Sci­en­ce - iFo­od acessível em \
https://github.com/paulolaque/ifood-case.git

## Procedimentos iniciais

### Importação de bibliotecas

In [0]:
import os
import tarfile
import urllib.request
import sys

# Este notebook executa em pyspark, favor instalar caso necessário
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit
except ImportError:
    print("PySpark não está instalado. Instale com: pip install pyspark")
    sys.exit(1)

### Configuração de diretórios

In [0]:
# Detectar se está rodando no Databricks, pois o padrão de diretórios podem ser diferentes de uma máquina local
running_in_databricks = os.path.exists("/databricks/driver")

# Ajusta o caminho raiz do projeto
if running_in_databricks:
    project_root = "/dbfs" + os.getcwd().split("/notebooks")[0]  # raiz do workspace em DBFSpara Databricks
else:
    project_root = os.path.abspath("..")  # pasta acima de notebooks/

# Cria string de diretório
raw_data_path = os.path.join(project_root, "data", "raw")
tar_gz_path = os.path.join(raw_data_path, "ds-technical-evaluation-data.tar.gz")

# Cria a pasta se não existir
os.makedirs(raw_data_path, exist_ok=True)

### Configuração de sessão Spark 

In [0]:
# Cria SparkSession se necessário
spark = SparkSession.builder \
    .appName("Load JSON Files") \
    .getOrCreate()

## Carregamento e Leitura dos Dados

### Download e extração de arquivos

In [0]:

# URL de download
url = "https://data-architect-test-source.s3.sa-east-1.amazonaws.com/ds-technical-evaluation-data.tar.gz"

# Baixa o arquivo se não existir
if not os.path.exists(tar_gz_path):
    print("Baixando arquivo...")
    urllib.request.urlretrieve(url, tar_gz_path)
    print("Download completo.")
else:
    print("Arquivo já existe.")

# Extrai os arquivos
print("Extraindo arquivos...")
with tarfile.open(tar_gz_path, "r:gz") as tar:
    tar.extractall(path=raw_data_path)
print("Extração completa.")

# Lista arquivos
extracted_path = os.path.join(raw_data_path, "ds-technical-evaluation-data")
print("Arquivos extraídos:")
for root, dirs, files in os.walk(extracted_path):
    for file in files:
        print(os.path.join(root, file))

Arquivo já existe.
Extraindo arquivos...
Extração completa.
Arquivos extraídos:
/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/profile.json
/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/._transactions.json
/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/transactions.json
/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/._profile.json
/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/._offers.json
/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/offers.json


###  Conversão para DataFrames do Spark

In [0]:
for file in os.listdir(extracted_path):
    print(file)

from pyspark.sql.functions import lit

# Função para extrair em tabelas
def read_extracted_df(table_name: str):
    df = None  # Inicializa a variável df
    
    for file in os.listdir(extracted_path):
        # Verifica se o arquivo é o '.json'
        if table_name in file.lower() and file.endswith(".json"):
            # Detecta se precisa usar prefixo "file:/"
            if running_in_databricks:
                spark_path = "file:" + os.path.join(extracted_path, file)
            else:
                spark_path = os.path.join(extracted_path, file)

            print(f"Lendo {spark_path}")

            try:
                # Lê o arquivo JSON
                df = spark.read.json(spark_path)
                df = df.withColumn("source_file", lit(file))  # Adiciona a coluna do nome do arquivo
                break  # Para ao encontrar o arquivo ".json"
            except Exception as e:
                print(f"Erro ao ler {file}: {e}")
                
    return df  # Retorna o DataFrame


profile.json
._transactions.json
transactions.json
._profile.json
._offers.json
offers.json


In [0]:
df_profile = read_extracted_df('profile')
df_transactions=read_extracted_df('transactions')
df_offers=read_extracted_df('offers')

Lendo file:/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/profile.json
Lendo file:/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/._transactions.json
Erro ao ler ._transactions.json: Unable to infer schema for JSON. It must be specified manually.
Lendo file:/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/transactions.json
Lendo file:/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/._offers.json
Erro ao ler ._offers.json: Unable to infer schema for JSON. It must be specified manually.
Lendo file:/dbfs/databricks/driver/data/raw/ds-technical-evaluation-data/offers.json


## Transformação de Tabelas

### Desaninhamento de JSON

In [0]:
from pyspark.sql.functions import col

print('Antes:', df_transactions.count())

# Acesse diretamente os campos da coluna STRUCT 'value'
df_transactions = df_transactions.select("*", 
    col("value.amount").alias("amount"),
    col("value.offer_id").alias("offer_id"),
    col("value.offer id").alias("offer id"),
    col("value.reward").alias("reward")
)

print(f'Depois de desaninhar Json da coluna value: {df_transactions.count()}')

# Retirar coluna desaninhada
df_transactions=df_transactions.drop("value")

Antes: 306534
Depois de desaninhar Json da coluna value: 306534


Podemos ver que desaninhar não alterou o número de linhas

### Conversão de arrays em colunas

In [0]:
from pyspark.sql.functions import explode, col, collect_set, array, lit, when,array_contains

#  Explode os valores da lista para identificar os valores únicos
unique_channels = df_offers.select(explode(col("channels")).alias("channel")) \
                                  .distinct() \
                                  .rdd.flatMap(lambda row: row) \
                                  .collect()

#  Para cada valor possível, cria uma nova coluna binária (1 se presente, 0 se não)
for ch in unique_channels:
    df_offers = df_offers.withColumn(
        f"channel_{ch}",
        when(array_contains(col("channels"), ch), lit(1)).otherwise(lit(0))
    )
df_offers=df_offers.drop("channels")

## Manipulação e Limpeza dos Dados

### Remover linhas duplicadas

In [0]:
def verificar_duplicatas(df, nome_df="DataFrame"):
    total = df.count()
    distintos = df.distinct().count()
    duplicadas = total - distintos
    print(f"\n {nome_df}:")
    print(f" - Total de linhas: {total}")
    print(f" - Linhas distintas: {distintos}")
    print(f" - Linhas duplicadas: {duplicadas}")

verificar_duplicatas(df_transactions, "df_transactions")
verificar_duplicatas(df_offers, "df_offers")
verificar_duplicatas(df_profile, "df_profile")


 df_transactions:
 - Total de linhas: 306534
 - Linhas distintas: 306137
 - Linhas duplicadas: 397

 df_offers:
 - Total de linhas: 10
 - Linhas distintas: 10
 - Linhas duplicadas: 0

 df_profile:
 - Total de linhas: 17000
 - Linhas distintas: 17000
 - Linhas duplicadas: 0


In [0]:
df_transactions = df_transactions.dropDuplicates()
df_offers = df_offers.dropDuplicates()
df_profile = df_profile.dropDuplicates()

verificar_duplicatas(df_transactions, "df_transactions")
verificar_duplicatas(df_offers, "df_offers")
verificar_duplicatas(df_profile, "df_profile")



 df_transactions:
 - Total de linhas: 306137
 - Linhas distintas: 306137
 - Linhas duplicadas: 0

 df_offers:
 - Total de linhas: 10
 - Linhas distintas: 10
 - Linhas duplicadas: 0

 df_profile:
 - Total de linhas: 17000
 - Linhas distintas: 17000
 - Linhas duplicadas: 0


### Identificar colunas duplicadas

In [0]:
display(df_transactions.describe())

summary,account_id,event,time_since_test_start,source_file,amount,offer_id,offer id,reward
count,306137,306137,306137.0,306137,138953.0,33182,134002,33182.0
mean,2.565638242424101E31,,15.25770896690044,,12.777356156398175,,,4.902627930805859
stddev,0.0,,8.347840597971611,,30.25052863201714,,,2.8872005318851213
min,0009655768c64bdeb2e877511632db8f,offer completed,0.0,transactions.json,0.05,0b1e1539f2cc45b7b9fa7c272da2e1d7,0b1e1539f2cc45b7b9fa7c272da2e1d7,2.0
max,ffff82501cea40309d5fdd7edcca4a07,transaction,29.75,transactions.json,1062.28,fafdcd668e3743c1bb461111dcafc2a4,fafdcd668e3743c1bb461111dcafc2a4,10.0


In [0]:
# Como offer id e offer_id aparentemente são colunas que se complementam vamos testar se quando ambas são não nulasse temos divergência de valores

# Filtra linhas em que ambas as colunas são não nulas e diferentes
df_diff = df_transactions.filter(
    col("offer id").isNotNull() & 
    col("offer_id").isNotNull() & 
    (col("offer id") != col("offer_id"))
)

# Conta essas linhas
diff_count = df_diff.count()
print(f"Número de linhas com 'offer id' diferente de 'offer_id': {diff_count}")


Número de linhas com 'offer id' diferente de 'offer_id': 0


In [0]:
display(df_profile.describe())

summary,age,credit_card_limit,gender,id,registered_on,source_file
count,17000.0,14825.0,14825,17000,17000.0,17000
mean,62.53141176470588,65404.9915682968,,2.565638242424101E31,20167034.23411765,
stddev,26.7385799457672,21598.299410229534,,,11677.499960794426,
min,18.0,30000.0,F,0009655768c64bdeb2e877511632db8f,20130729.0,profile.json
max,118.0,120000.0,O,ffff82501cea40309d5fdd7edcca4a07,20180726.0,profile.json


In [0]:
display(df_offers.describe())

summary,discount_value,duration,id,min_value,offer_type,source_file,channel_mobile,channel_email,channel_social,channel_web
count,10.0,10.0,10,10.0,10,10,10.0,10.0,10.0,10.0
mean,4.2,6.5,,7.7,,,0.9,1.0,0.6,0.8
stddev,3.583914681524164,2.3213980461973533,,5.831904586934796,,,0.3162277660168379,0.0,0.5163977794943222,0.4216370213557839
min,0.0,3.0,0b1e1539f2cc45b7b9fa7c272da2e1d7,0.0,bogo,offers.json,0.0,1.0,0.0,0.0
max,10.0,10.0,fafdcd668e3743c1bb461111dcafc2a4,20.0,informational,offers.json,1.0,1.0,1.0,1.0


Apenas olhandos o mínimo e máximo de cada coluna podemos ver que não temos possíveis colunas duplicadas.
 Se fossem iguais poderiamos testar se realmente são iguais,mas não precisamos.

### Tratamento de valores nulos 

#### Verificação de valores nulos

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

def check_nulls(df, df_name="DataFrame"):
    print(f"\n>>> Verificando nulos em: {df_name}")
    nulls_df = df.select([
        spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns
    ])
    nulls_df.show()

dfs = {
    "df_profile": df_profile,
    "df_transactions": df_transactions,
    "df_offers": df_offers
}

for name, df in dfs.items():
    check_nulls(df, name)



>>> Verificando nulos em: df_profile
+---+-----------------+------+---+-------------+-----------+
|age|credit_card_limit|gender| id|registered_on|source_file|
+---+-----------------+------+---+-------------+-----------+
|  0|             2175|  2175|  0|            0|          0|
+---+-----------------+------+---+-------------+-----------+


>>> Verificando nulos em: df_transactions
+----------+-----+---------------------+-----------+------+--------+--------+------+
|account_id|event|time_since_test_start|source_file|amount|offer_id|offer id|reward|
+----------+-----+---------------------+-----------+------+--------+--------+------+
|         0|    0|                    0|          0|167184|  272955|  172135|272955|
+----------+-----+---------------------+-----------+------+--------+--------+------+


>>> Verificando nulos em: df_offers
+--------------+--------+---+---------+----------+-----------+--------------+-------------+--------------+-----------+
|discount_value|duration| id|mi

#### Imputação de valores nulos em numéricos

In [0]:
from pyspark.sql.functions import col

# Verificando a hipótese de que os valores nulos são na verdade zero nas colunas numéricas
print("Tipos das colunas:")
print("df_profile:")
df_profile.select("credit_card_limit").printSchema()

print("df_transactions:")
df_transactions.select("amount", "reward").printSchema()

# Contando valores zero
zero_credit_card_limit = df_profile.filter(col("credit_card_limit") == 0).count()
print(f"\nNúmero de registros com credit_card_limit = 0: {zero_credit_card_limit}")

zero_amount = df_transactions.filter(col("amount") == 0).count()
print(f"Número de registros com amount = 0: {zero_amount}")

zero_reward = df_transactions.filter(col("reward") == 0).count()
print(f"Número de registros com reward = 0: {zero_reward}")


Tipos das colunas:
df_profile:
root
 |-- credit_card_limit: double (nullable = true)

df_transactions:
root
 |-- amount: double (nullable = true)
 |-- reward: double (nullable = true)


Número de registros com credit_card_limit = 0: 0
Número de registros com amount = 0: 0
Número de registros com reward = 0: 0


In [0]:
# Como não temos valor zero para as três colunas numéricas com dados faltantes (reward,amount e credit_card_limit), faz sentido supor que quando são nulas são zero, logo vamos inputar como zero esses dados. Para não perder informação vamos criar colunas informando quando elas foram nulas.
from pyspark.sql.functions import col, when

# Para a tabela df_profile
df_profile = df_profile.withColumn(
    "credit_card_limit_is_null", 
    when(col("credit_card_limit").isNull(), 1).otherwise(0)
).fillna({"credit_card_limit": 0})  # Preenche os nulos de credit_card_limit com 0

# Para a tabela df_transactions
df_transactions = df_transactions.withColumn(
    "amount_is_null", 
    when(col("amount").isNull(), 1).otherwise(0)
).withColumn(
    "reward_is_null", 
    when(col("reward").isNull(), 1).otherwise(0)
).fillna({"amount": 0, "reward": 0})  # Preenche os nulos de amount e reward com 0


#### Imputação de valores nulos com coluna auxiliar

In [0]:
from pyspark.sql.functions import when, col

# Como vimos que 'offer id' e 'offer_id' se complementam e não contem divergência de dados vamos unificá-las
# Atualiza 'offer_id' com valor de 'offer id' se for null
df_transactions = df_transactions.withColumn(
    "offer_id",
    when(col("offer_id").isNull(), col("offer id")).otherwise(col("offer_id"))
)

# Remove a coluna com dados duplicados
df_transactions = df_transactions.drop("offer id")

### Remoção de colunas irrelevantes

In [0]:
#vamos também remover as colunas source que não adicionam valor a nossa análise por ter sempre o mesmo valor e dizer apenas o nome da tabela. Isso vai melhorar a organização da tabela para não ter colunas com nomes duplicados
df_transactions=df_transactions.drop("source_file")
df_offers=df_offers.drop("source_file")
df_profile=df_profile.drop("source_file")

### Remoção de Outliers

In [0]:
from pyspark.sql.functions import col

def remover_outliers(df, nome_df="df"):
    print(f"\nRemovendo outliers em: {nome_df}")
    numeric_cols = [f.name for f in df.schema.fields if str(f.dataType) in ('IntegerType', 'DoubleType', 'LongType', 'FloatType')]
    original_count = df.count()
    df_filtrado = df

    for col_name in numeric_cols:
        quantiles = df_filtrado.approxQuantile(col_name, [0.25, 0.75], 0.01)
        if len(quantiles) < 2:
            continue

        q1, q3 = quantiles
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr

        df_filtrado = df_filtrado.filter((col(col_name) >= lower) & (col(col_name) <= upper))

    final_count = df_filtrado.count()
    total_removed = original_count - final_count

    print(f"{total_removed} linhas removidas por outliers em {nome_df}")
    print(f"Total final de linhas: {final_count}")
    return df_filtrado

# Aplicando a função nos três datasets
df_transactions = remover_outliers(df_transactions, "df_transactions")
df_offers = remover_outliers(df_offers, "df_offers")
df_profile = remover_outliers(df_profile, "df_profile")


Removendo outliers em: df_transactions
0 linhas removidas por outliers em df_transactions
Total final de linhas: 306137

Removendo outliers em: df_offers
0 linhas removidas por outliers em df_offers
Total final de linhas: 10

Removendo outliers em: df_profile
0 linhas removidas por outliers em df_profile
Total final de linhas: 17000


## Preparação do Dataset Unificado

#### Junção de múltiplas tabelas (joins)

In [0]:
# Faz o LEFT JOIN entre transações e perfis e remover colunas de ids duplicados.
print('Total de linhas Antes:',df_transactions.count())
df_joined = df_transactions.join(df_profile, df_transactions.account_id == df_profile.id, how="left")
df_joined=df_joined.drop("id")
df_joined = df_joined.join(df_offers, df_joined.offer_id == df_offers.id, how="left")
df_joined=df_joined.drop("id")
print('Total de linhas depois:',df_joined.count())

Total de linhas Antes: 306137
Total de linhas depois: 306137


#### Checagem de integridade após merge

In [0]:
from pyspark.sql.functions import col, sum as spark_sum, isnan, when

def contar_nulos(df, nome_df="DataFrame"):
    print(f"\nValores nulos no {nome_df}:\n")
    display(df.select([
        spark_sum(
            when(col(c).isNull(), 1).otherwise(0)
        ).alias(c) for c in df.columns
    ]))

# Exemplo para df_joined
contar_nulos(df_joined, "df_joined")


Valores nulos no df_joined:



account_id,event,time_since_test_start,amount,offer_id,reward,amount_is_null,reward_is_null,age,credit_card_limit,gender,registered_on,credit_card_limit_is_null,discount_value,duration,min_value,offer_type,channel_mobile,channel_email,channel_social,channel_web
0,0,0,0,138953,0,0,0,0,0,33749,0,0,138953,138953,138953,138953,138953,138953,138953,138953


#### Ajuste de Nulos Pós Merge

In [0]:
# Como na tabela transactions haviam offer_id nulos vamos prencher esses valores com zero, ou seja não houve oferta e vamos criar uma coluna para identificar quando não houve oferta.
from pyspark.sql.functions import when, col

#  Criar coluna binária indicando se offer_id é nulo
df_joined = df_joined.withColumn("is_offer_id_null", when(col("offer_id").isNull(), 1).otherwise(0))

#  Lista de colunas para preencher nulos com zero
colunas_para_preencher = [
    "offer_id", "discount_value", "duration", "min_value",
    "channel_mobile", "channel_email", "channel_social", "channel_web"
]

#  Substituir nulos por 0 nessas colunas
for coluna in colunas_para_preencher:
    df_joined = df_joined.withColumn(coluna, when(col(coluna).isNull(), 0).otherwise(col(coluna)))


## Engenharia de Variáveis

### Transformar categoricas em numéricas 

In [0]:
from pyspark.sql.functions import lit, when

def create_binary_columns(df, col_name):
    categories = df.select(col_name).distinct().rdd.flatMap(lambda x: x).collect()
    for cat in categories:
        new_col = f"{col_name}_{cat}"
        df = df.withColumn(new_col, when(df[col_name] == cat, 1).otherwise(0))
    return df

# Aplica para cada coluna categórica
categorical_cols = ["event", "gender", "offer_type"]
for col_name in categorical_cols:
    df_joined = create_binary_columns(df_joined, col_name)
df_joined=df_joined.drop("event", "gender", "offer_type")

### Transformar Data em tempo

In [0]:
from pyspark.sql.functions import to_date, datediff, col, max as spark_max, lit

# Vamos criar a coluna de data para tempo para o modelo conseguir interpretar 

#  Converter 'registered_on' para data no formato yyyyMMdd
df_joined = df_joined.withColumn("registered_on_date", to_date(col("registered_on").cast("string"), "yyyyMMdd"))

#  Obter a data mais recente da coluna 'registered_on_date'
max_data = df_joined.agg(spark_max("registered_on_date").alias("max_date")).collect()[0]["max_date"]

# Criar nova coluna com diferença em dias até a data mais recente
df_joined = df_joined.withColumn("dias_desde_registro", datediff(lit(max_data), col("registered_on_date")))

### Extração de features

In [0]:
from pyspark.sql.functions import year, month

# Criar colunas de ano e mês
df_joined = df_joined \
    .withColumn("ano_registro", year(col("registered_on_date"))) \
    .withColumn("mes_registro", month(col("registered_on_date")))

df_joined=df_joined.drop("registered_on")


In [0]:
display(df_joined.describe())

summary,account_id,time_since_test_start,amount,offer_id,reward,amount_is_null,reward_is_null,age,credit_card_limit,credit_card_limit_is_null,discount_value,duration,min_value,channel_mobile,channel_email,channel_social,channel_web,is_offer_id_null,event_transaction,event_offer received,event_offer completed,event_offer viewed,gender_F,gender_None,gender_M,gender_O,offer_type_discount,offer_type_None,offer_type_informational,offer_type_bogo,dias_desde_registro,ano_registro,mes_registro
count,306137,306137.0,306137.0,306137,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0,306137.0
mean,2.565638242424101E31,15.25770896690044,5.79953409747922,0.0,0.5313928077952028,0.5461084416454072,0.8916106187752543,60.91181725828632,57236.52155734198,0.1102414931876904,2.4129589040200954,3.6087993284052566,4.283588720082839,0.5014519643166295,0.5461084416454072,0.3589471380460382,0.4405543923145519,0.4538915583545929,0.4538915583545929,0.249159689942738,0.1083893812247457,0.1885593704779233,0.3689197973456328,0.0,0.5078869917716577,0.0129517176950188,0.227574582621506,0.0,0.08514488611308,0.2333889729108209,574.9588811545158,2016.4759797084312,6.701823040011498
stddev,0.0,8.347840597971611,21.349947949433396,0.0,1.796204936781422,0.4978702856115855,0.3108720620171976,26.03628379759729,28413.982091099115,0.3131910387847271,3.32528198686135,3.6491835271217674,5.40011520963605,0.4999987084215869,0.4978702856115855,0.4796924449839558,0.4964544539172295,0.4978702856115855,0.4978702856115855,0.43252716671732,0.3108720620171976,0.3911588348436399,0.482512943844163,0.0,0.4999386080191423,0.1130664073139756,0.4192671775488134,0.0,0.2790976333263564,0.4229883505202933,421.5297568664519,1.1996947819853838,3.500130897619721
min,0009655768c64bdeb2e877511632db8f,0.0,0.0,0,0.0,0.0,0.0,18.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2013.0,1.0
max,ffff82501cea40309d5fdd7edcca4a07,29.75,1062.28,fafdcd668e3743c1bb461111dcafc2a4,10.0,1.0,1.0,118.0,120000.0,1.0,10.0,10.0,20.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,0.0,1.0,1.0,1823.0,2018.0,12.0


## Salvamento

### Salvar em Parquet

In [0]:
    # Caminho para salvar os dados processados
    processed_data_path = os.path.join(project_root, "data", "processed")

    # Cria a pasta se não existir
    os.makedirs(processed_data_path, exist_ok=True)

    # Caminho completo do arquivo Parquet
    parquet_output_path = os.path.join(processed_data_path, "unificado.parquet")

    # Salva como Parquet
    print(f"Salvando dados em: {parquet_output_path}")
    df_joined.write.mode("overwrite").parquet(parquet_output_path)
    print("Dados salvos com sucesso.")


Salvando dados em: /dbfs/databricks/driver/data/processed/unificado.parquet
Dados salvos com sucesso.
