In [1]:
# ==============================================================================
# DISCLAIMER IMPORTANTE PARA AMBIENTES GOOGLE COLAB
# ==============================================================================
# Como o Google Colab fornece um ambiente de execução temporário, ele não vem
# com Java ou Spark pré-instalados. Portanto, este bloco de código DEVE SER
# EXECUTADO TODA VEZ que você iniciar ou reiniciar uma sessão (runtime) no Colab
# para instalar e configurar todas as dependências necessárias para o PySpark.
# ==============================================================================


# --- Bloco de Instalação e Configuração do Ambiente Spark ---

# 1. Instalação do Java
# O Apache Spark é executado sobre a Java Virtual Machine (JVM), então o Java é um pré-requisito obrigatório.
# !apt-get update -qq: Atualiza a lista de pacotes do sistema operacional (baseado em Debian/Ubuntu). O '-qq' torna a saída mais silenciosa.
!apt-get update -qq
# !apt-get install: Instala o OpenJDK 11 (uma versão de código aberto do Java) sem interação do usuário (-y).
!apt-get install -y openjdk-11-jdk-headless

# 2. Download do Spark
# Baixa os binários pré-compilados do Apache Spark a partir do site oficial de arquivamento da Apache.
# !wget -q: Baixa o arquivo da URL especificada. O '-q' (quiet) minimiza as mensagens de log durante o download.
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

# 3. Extração do Spark
# Descompacta o arquivo .tgz que foi baixado no passo anterior.
# !tar -xvzf:
#   x: eXtract (extrair)
#   v: verbose (mostra os arquivos sendo extraídos)
#   z: gZip (indica que o arquivo está compactado com gzip)
#   f: file (especifica o nome do arquivo a ser descompactado)
!tar -xvf spark-3.5.1-bin-hadoop3.tgz

# 4. Configuração das Variáveis de Ambiente
# Define as variáveis de ambiente para que o sistema operacional e o Python saibam onde encontrar as instalações do Java e do Spark.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

# 5. Instalação das bibliotecas Python para Spark
# Instala as bibliotecas Python necessárias para interagir com o Spark.
# !pip install -q: Instala os pacotes usando o gerenciador de pacotes do Python (pip) em modo silencioso.
#   pyspark==3.5.1: A biblioteca que fornece a API Python para o Spark. A versão é fixada para corresponder à versão do Spark baixada.
#   findspark: Uma biblioteca útil que ajuda o Python a localizar a instalação do Spark no sistema.
!pip install -q pyspark==3.5.1 findspark

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-11-jdk-headless is already the newest version (11.0.27+6~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 38 not upgraded.
spark-3.5.1-bin-hadoop3/
spark-3.5.1-bin-hadoop3/sbin/
spark-3.5.1-bin-hadoop3/sbin/spark-config.sh
spark-3.5.1-bin-hadoop3/sbin/stop-slave.sh
spark-3.5.1-bin-hadoop3/sbin/stop-mesos-dispatcher.sh
spark-3.5.1-bin-hadoop3/sbin/start-workers.sh
spark-3.5.1-bin-hadoop3/sbin/start-slaves.sh
spark-3.5.1-bin-hadoop3/sbin/start-all.sh
spark-3.5.1-bin-hadoop3/sbin/stop-all.sh
spark-3.5.1-bin-hadoop3/sbin/workers.sh
spark-3.5.1-bin-hadoop3/sbin/start-mesos-dispatcher.sh
spark-3.5.1-bin-hadoop3/sbin/spark-daemon.sh
spark-3.5.1-bin-hadoop3/sbin/decommission-worker.sh
s

In [2]:
from google.colab import drive, files
from pyspark.sql import SparkSession
import findspark
import requests
import gzip
import tarfile
import os
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession, Row, Window
from pyspark.sql.functions import lower,when, expr, row_number, col, count, sum, avg, trim, date_format,  explode, from_json, pandas_udf, lit, concat_ws, transform, flatten

import pandas as pd
import json
from transformers import pipeline

findspark.init()



In [3]:
spark = SparkSession.builder \
    .appName("IFood Test Case") \
    .getOrCreate()

spark

In [25]:
def sink_data_silver(df,silver_path):
  df.write.mode("overwrite").parquet(silver_path)


In [4]:
###################################################################################
# SEÇÃO DE CONFIGURAÇÃO: ORIGEM DOS DADOS
# ---------------------------------------------------------------------------------
# Define se os arquivos de dados serão lidos a partir do Google Drive ou de uma
# pasta local (uploaded).
###################################################################################

# Variável de controle para a fonte dos dados.
# Opções válidas:
#   "drive": Para carregar os arquivos diretamente de uma pasta no Google Drive.
#   "uploaded": Para carregar os arquivos de um diretório local.
data_source = "drive"

# Define o caminho base no Google Drive onde os arquivos de dados estão localizados.
# Esta variável só é utilizada se 'data_source' for definida como "drive".
# Exemplo de estrutura de pastas: drive/MyDrive/iFood/Data/Bronze/
drive_bronze_path = "drive/MyDrive/iFood/Data/Bronze"
upload_bronze_path = "./Data/Bronze"  ##Pode variar caso os .parquets nao sejam exatamente os criados no primeiro notebook.

if data_source == "drive":
    drive.mount('/content/drive')
    bronze_path = drive_bronze_path
else:
    bronze_path = upload_bronze_path


Mounted at /content/drive


In [None]:
### apenas para o fluxo usando upload manual...###########
######################################################

# Criar diretório local para salvar os arquivos
os.makedirs('Data', exist_ok=True)
# depois faca o upload dos zip nessa pasta, caso nao esteja usando o fluxo do google drive.


In [None]:
### apenas para o fluxo usando upload manual...###########
######################################################

# Define uma variável para armazenar o nome da pasta onde os arquivos .zip
# devem ser encontrados. O caminho "./Data" refere-se a uma pasta chamada "Data"
# localizada no mesmo nível do diretório de execução do notebook (/content/).
caminho_da_pasta_zip = "./Data"


# Imprime uma mensagem informativa para o usuário saber qual ação está sendo executada.
print(f"\nProcurando arquivos .zip em: {caminho_da_pasta_zip}")

# Bloco de verificação para garantir que a pasta de origem existe.
# Se a pasta não for encontrada, o script exibe um erro claro em vez de falhar.
if not os.path.isdir(caminho_da_pasta_zip):
    print(f"ERRO: O diretório '{caminho_da_pasta_zip}' não foi encontrado.")
    print("Por favor, verifique o caminho e tente novamente.")
else:
    # Se a pasta existe, esta linha obtém uma lista de todos os nomes de arquivos e subpastas dentro dela.
    arquivos_na_pasta = os.listdir(caminho_da_pasta_zip)

    # Inicializa uma variável de controle para rastrear se algum arquivo .zip foi processado.
    zip_encontrado = False

    # Inicia um loop para examinar cada item encontrado na pasta de origem.
    for nome_arquivo in arquivos_na_pasta:
        # Condição para filtrar e processar apenas os itens que são arquivos .zip.
        if nome_arquivo.endswith(".zip"):
            # Atualiza a variável de controle, indicando que pelo menos um zip foi encontrado.
            zip_encontrado = True
            # Constrói o caminho completo para o arquivo .zip, unindo o nome da pasta e o nome do arquivo.
            caminho_completo_zip = os.path.join(caminho_da_pasta_zip, nome_arquivo)

            print(f"  -> Descompactando '{nome_arquivo}'...")

            # Utiliza um comando de shell (!unzip) para extrair o conteúdo do arquivo.
            # A opção "-o" força a substituição de arquivos existentes sem pedir permissão.
            # A opção "-d /content/" especifica o diretório de destino para a extração.
            !unzip -o "{caminho_completo_zip}" -d /content/

    # Após o loop, se a variável de controle não mudou, significa que nenhum zip foi processado.
    if not zip_encontrado:
        print("Nenhum arquivo .zip foi encontrado na pasta especificada.")

# Mensagem final para indicar que todas as operações foram concluídas.
print("\nProcesso de descompactação concluído! ✅")


# Bloco final para verificação visual dos resultados.
# O comando de shell "!ls" lista o conteúdo do diretório especificado.
print("\nConteúdo extraído no diretório (/content/Data/Bronze/):")
!ls -l /content/Data/Bronze

In [5]:
# Caminhos para os dados de pedidos
orders_bronze_path = f"{bronze_path}/orders.parquet"

# Caminhos para os dados de consumidores
#consumers_bronze_path = f"{bronze_path}/consumers.parquet"
# Caminhos para os dados de restaurantes
#restaurants_bronze_path = f"{bronze_path}/restaurants.parquet"
# Caminhos para os dados de referencia do teste A/B
#ab_test_bronze_path = f"{bronze_path}/ab_test_ref.parquet"


In [6]:
dfp_orders = spark.read.parquet(orders_bronze_path)
#dfp_consumers = spark.read.parquet(consumers_bronze_path)
#dfp_restaurants = spark.read.parquet(restaurants_bronze_path)
#dfp_ab_test = spark.read.parquet(ab_test_bronze_path)

In [7]:
# --- Bloco de Verificação: Validação da Carga de Dados na Camada Bronze ---
#
# O objetivo deste bloco é realizar um teste de sanidade (sanity check) para
# garantir que a ingestão de dados para a camada Bronze ocorreu como esperado.
# Contamos o número de registros em cada DataFrame e comparamos com os valores
# conhecidos da fonte de dados original.
#
# Esta é uma etapa crucial de Data Quality (Qualidade de Dados).

# Contagem de registros no DataFrame de pedidos (orders).
# O valor esperado, conforme a fonte, é de aproximadamente 3.6 milhões de registros.
print("Contagem de registros em 'dfp_orders':")
print(dfp_orders.count())

# Contagem de registros no DataFrame de consumidores (consumers).
# O valor esperado é de aproximadamente 806 mil registros.
#print("\nContagem de registros em 'dfp_consumers':")
#print(dfp_consumers.count())

# Contagem de registros no DataFrame de restaurantes (restaurants).
# O valor esperado é de aproximadamente 7 mil registros.
#print("\nContagem de registros em 'dfp_restaurants':")
#print(dfp_restaurants.count())

# Contagem de registros no DataFrame de teste A/B (ab_test).
# O valor esperado é de aproximadamente 806 mil registros.
#print("\nContagem de registros em 'dfp_ab_test':")
#print(dfp_ab_test.count())

Contagem de registros em 'dfp_orders':
3670826


##EDA ORDERS DF####

In [8]:
###################################################################################
#1.Inspeção Inicial da Estrutura e Amostra dos Dados de Pedidos
###################################################################################

# O comando .printSchema() é usado para exibir a estrutura (schema) do DataFrame.
# Ele mostra uma lista de todas as colunas, seus respectivos tipos de dados
# (ex: String, Integer, Timestamp) e se a coluna pode conter valores nulos (nullable).
# É um passo fundamental para verificar se os dados foram carregados corretamente.
dfp_orders.printSchema()

# O comando .show() exibe as primeiras linhas do DataFrame em formato de tabela.
# - O primeiro argumento (5) especifica o número de linhas a serem mostradas.
# - O argumento 'truncate=False' é crucial para garantir que o conteúdo completo
#   de cada coluna seja exibido, sem cortes, permitindo uma visualização real dos dados.
dfp_orders.show(5, truncate=False)

root
 |-- cpf: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- delivery_address_city: string (nullable = true)
 |-- delivery_address_country: string (nullable = true)
 |-- delivery_address_district: string (nullable = true)
 |-- delivery_address_external_id: string (nullable = true)
 |-- delivery_address_latitude: string (nullable = true)
 |-- delivery_address_longitude: string (nullable = true)
 |-- delivery_address_state: string (nullable = true)
 |-- delivery_address_zip_code: string (nullable = true)
 |-- items: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- merchant_latitude: string (nullable = true)
 |-- merchant_longitude: string (nullable = true)
 |-- merchant_timezone: string (nullable = true)
 |-- order_created_at: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_scheduled: boolean (nullable = true)
 |-- order_scheduled_date: string (nullable = true)
 |--

In [None]:
###################################################
#2.Contagem de Valores Nulos
###################################################

print("Contagem de valores nulos por coluna:")

# Calcula e exibe a contagem de valores nulos para cada coluna do DataFrame.
dfp_orders.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in dfp_orders.columns]).show()

Null value counts after cleaning:
+---+-----------+-------------+---------------------+------------------------+-------------------------+----------------------------+-------------------------+--------------------------+----------------------+-------------------------+-----+-----------+-----------------+------------------+-----------------+----------------+--------+---------------+--------------------+------------------+---------------+
|cpf|customer_id|customer_name|delivery_address_city|delivery_address_country|delivery_address_district|delivery_address_external_id|delivery_address_latitude|delivery_address_longitude|delivery_address_state|delivery_address_zip_code|items|merchant_id|merchant_latitude|merchant_longitude|merchant_timezone|order_created_at|order_id|order_scheduled|order_scheduled_date|order_total_amount|origin_platform|
+---+-----------+-------------+---------------------+------------------------+-------------------------+----------------------------+-------------------

In [None]:
### 2.1. Análise de Nulos na Coluna 'customer_id' ###

# Filtra o DataFrame para selecionar e exibir apenas as linhas onde
# a coluna 'customer_id' contém valores nulos.
dfp_orders.filter(col("customer_id").isNull()).show()

+-----------+-----------+-------------+---------------------+------------------------+-------------------------+----------------------------+-------------------------+--------------------------+----------------------+-------------------------+--------------------+--------------------+-----------------+------------------+-----------------+--------------------+--------------------+---------------+--------------------+------------------+---------------+
|        cpf|customer_id|customer_name|delivery_address_city|delivery_address_country|delivery_address_district|delivery_address_external_id|delivery_address_latitude|delivery_address_longitude|delivery_address_state|delivery_address_zip_code|               items|         merchant_id|merchant_latitude|merchant_longitude|merchant_timezone|    order_created_at|            order_id|order_scheduled|order_scheduled_date|order_total_amount|origin_platform|
+-----------+-----------+-------------+---------------------+------------------------+----

In [9]:
### 2.1.1. Verificação Individual de CPFs ###

# Busca individualmente por pedidos de CPFs com customer_id NULL para inspeção visual.
dfp_orders.filter(col("cpf")=='78812348914').show()
dfp_orders.filter(col("cpf")=='05486225449').show()
dfp_orders.filter(col("cpf")=='07098352090').show()
dfp_orders.filter(col("cpf")=='75159020523').show()

+-----------+-----------+-------------+---------------------+------------------------+-------------------------+----------------------------+-------------------------+--------------------------+----------------------+-------------------------+--------------------+--------------------+-----------------+------------------+-----------------+--------------------+--------------------+---------------+--------------------+------------------+---------------+
|        cpf|customer_id|customer_name|delivery_address_city|delivery_address_country|delivery_address_district|delivery_address_external_id|delivery_address_latitude|delivery_address_longitude|delivery_address_state|delivery_address_zip_code|               items|         merchant_id|merchant_latitude|merchant_longitude|merchant_timezone|    order_created_at|            order_id|order_scheduled|order_scheduled_date|order_total_amount|origin_platform|
+-----------+-----------+-------------+---------------------+------------------------+----

### Conclusão e Estratégia de Tratamento para `customer_id` Nulo

A investigação sobre a qualidade dos dados revelou a existência de valores nulos na coluna `customer_id`. A quantificação e análise desses registros nos permite agora tomar uma decisão informada sobre como tratá-los.

#### Análise dos Resultados

A contagem final identificou um total de **8.505 pedidos** com o campo `customer_id` nulo.

A ausência do `customer_id` é considerada uma falha crítica para diversas análises de negócio, pois quebra a rastreabilidade do pedido até um cliente específico.

#### Decisão e Estratégia de Limpeza

Considerando que o volume de 8.505 registros representa uma fração relativamente pequena do nosso universo total de pedidos (que está na casa dos milhões), a imputação (preenchimento) desses valores seria uma tarefa complexa e com alto risco de introduzir imprecisões na base.

Portanto, a estratégia de limpeza mais segura e eficaz para garantir a integridade das análises subsequentes é a **remoção completa** destes registros.

> **Ação a ser tomada:** Todos os 8.505 registros onde `customer_id` é nulo serão filtrados e permanentemente removidos do conjunto de dados que seguirá para as próximas etapas da análise.


In [15]:
### 2.2. Remoção de Registros com 'customer_id' Nulo ###


# 1. Conta e exibe o total de registros ANTES da limpeza.
count_antes = dfp_orders.count()
print(f"Total de registros antes da remoção dos nulos: {count_antes}")

# 2. Aplica o filtro para manter apenas as linhas onde 'customer_id' NÃO é nulo.
# O resultado é salvo em um novo DataFrame final.
dfp_orders_nonull = dfp_orders.filter(col("customer_id").isNotNull())

# 3. Conta e exibe o total de registros APÓS a limpeza.
count_apos = dfp_orders_nonull.count()
print(f"Total de registros após a remoção dos nulos:  {count_apos}")

# 4. Exibe a quantidade de registros que foram removidos.
print(f"Total de registros removidos: {count_antes - count_apos}")

Total de registros antes da remoção dos nulos: 3670826
Total de registros após a remoção dos nulos:  3662321
Total de registros removidos: 8505


In [16]:
### 2.2. Verificação de Duplicidade de 'order_id' ###

# Agrupa os pedidos por 'order_id', conta as ocorrências e filtra os IDs que aparecem mais de uma vez.
duplicate_orders = dfp_orders_nonull.groupBy("order_id").count().where("count > 1")

# Conta e exibe o número total de 'order_id's distintos que possuem duplicatas.
print(f"Número de 'order_id's duplicados: {duplicate_orders.count()}")



# Ordena os 'order_id's duplicados pela contagem em ordem decrescente.
duplicate_orders = duplicate_orders.orderBy(F.desc("count"))
# Exibe a lista de 'order_id's duplicados e suas respectivas contagens.
duplicate_orders.show(truncate=False)

Número de 'order_id's duplicados: 1234906
+----------------------------------------------------------------+-----+
|order_id                                                        |count|
+----------------------------------------------------------------+-----+
|3f5f739ab7928da58fe12a2635815b20021d04703161b2ea4577df1631cc524f|2    |
|2e06efcb70b6c19a7dc62576198a469e206bc5548e5b950b149597036a1706f0|2    |
|418469a489ffe3749e1000e577186606adc6697198258383e055fe4ca883e6c7|2    |
|a644e1d10cefc25c37528153cd802bdb342b888042e0dee3c937e1c2c54ed8e8|2    |
|89695ff3738ee6bb2a5cc2405fd779ad258783a9d02811d890a089467c35d794|2    |
|74ce4b1e650faaa1ec9c7a669b3f51d06aee39e9f671276fc6852b9cfe884e84|2    |
|f1996541846dd110c3c369e286fedc43baef6aea41ebeffd748d364669b575f4|2    |
|dc307b8e96ba3b55119fbc9d643fbacc2c3d183020dce9138d5cfdc05418dac1|2    |
|fd5bb792b17e379de01167f2356cc5bcf6a19ba6039a96810e93e7333e416e86|2    |
|cdb0919002afea59f43b96bfc7bd98ca4209115226e0d11c327ef9231ccc0b6f|2    |
|7c50fe46

In [12]:
### 2.2.1. Inspeção de um Caso de Duplicidade ###

# Filtra e exibe todas as linhas de um 'order_id' específico,
# identificado como duplicado na etapa anterior.
dfp_orders_nonull.filter(col("order_id")=='89695ff3738ee6bb2a5cc2405fd779ad258783a9d02811d890a089467c35d794').show(truncate=False)

dfp_orders_nonull.filter(col("order_id")=='72f8d9cf8aad32d4bc15451527014e92cb2e3a998e68c54274e231b53a8f7d93').show(truncate=False)

dfp_orders_nonull.filter(col("order_id")=='a644e1d10cefc25c37528153cd802bdb342b888042e0dee3c937e1c2c54ed8e8').show(truncate=False)

dfp_orders_nonull.filter(col("order_id")=='63f2e3d0445a2bfbc69fb2ce1ae90b4755e3bcbbdbf3727574ea6c4fca251e8b').show(truncate=False)
##Todos os registros duplicados, estao duplicados por um fator de 2.


##ANALISE DA COLUNA order_created_at  para varios duplicados
#2018-12-06T22:00:40.000Z
#2019-01-05T22:00:40.000Z

#2018-12-09T22:58:10.000Z
#2019-01-08T22:58:10.000Z

#2018-12-29T21:24:59.000Z
#2019-01-28T21:24:59.000Z

# Extract the time portion from 'order_created_at'
# The format 'HH:mm:ss.SSSZ' will give you the time part with milliseconds and timezone
#dfp_orders_with_time = dfp_orders.withColumn(
#    "order_created_time",
#    date_format(col("order_created_at"), "HH:mm:ss.SSSZ")
#)

# Group by order_id and the extracted time, then count
#duplicate_orders_by_time = dfp_orders_with_time.groupBy("order_id", "order_created_time").count().where("count > 1")

#print(f"Number of duplicate order IDs considering creation timestamp: {duplicate_orders_by_time.count()}")



+-----------+----------------------------------------------------------------+-------------+---------------------+------------------------+-------------------------+----------------------------+-------------------------+--------------------------+----------------------+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Análise e Estratégia de Tratamento para Pedidos Duplicados

A investigação detalhada dos `order_id`s duplicados revelou um padrão consistente e anômalo em todos os casos, nos permitindo formar uma hipótese  sobre a origem do problema e definir uma estratégia de limpeza.

#### Achados da Investigação

A análise de casos específicos confirmou os seguintes pontos:

- **Identidade dos Pedidos**: Para um mesmo `order_id`, os registros duplicados são **idênticos** em todas as suas colunas (itens, valores, cliente, etc.).
- **Anomalia no Timestamp**: A única coluna que apresenta diferença é a `order_created_at`. Notavelmente, a data de criação difere apenas no **dia**, enquanto a hora, minuto, segundo e milissegundo (`hh:mm:ss.ms`) são **exatamente os mesmos**.
- **Escala do Problema**: Este padrão se repete de forma idêntica em todos os **1.237.852** registros duplicados que foram identificados.

#### Hipótese e Conclusão

A probabilidade de múltiplos clientes realizarem pedidos idênticos, em dias diferentes, mas no mesmo exato milissegundo, é praticamente zero.

A conclusão mais plausível é que a duplicidade foi gerada por uma **falha técnica sistêmica**. Uma hipótese forte é a de um processo de replicação ou ingestão de dados que foi re-executado em um dia subsequente, inserindo novamente um lote de pedidos e atualizando apenas a data do evento para o dia da nova execução.

#### Estratégia de Limpeza

Com base nesta conclusão, a abordagem de tratamento será a **deduplicação** dos registros. A regra para a limpeza será:

> Para cada `order_id` duplicado, vamos **manter apenas a primeira ocorrência**, ou seja, o registro com o `order_created_at` **mais antigo**, e remover todas as instâncias subsequentes.

Esta ação garantirá a unicidade da chave `order_id`, corrigindo a anomalia e preparando o conjunto de dados para análises futuras sem o risco de métricas inflacionadas. O código na próxima seção implementará esta lógica.

In [17]:
### 3.1. Execução da Remoção de Duplicatas ###

# Define uma especificação de janela (Window) para agrupar dados pelo 'order_id'.
# Os dados dentro de cada grupo são ordenados pela data de criação.
windowSpec = Window.partitionBy("order_id").orderBy("order_created_at")

# Adiciona uma coluna 'row_number' que numera os registros dentro de cada grupo de 'order_id'.
# O registro mais antigo de cada grupo receberá o número 1.
dfp_orders_ranked = dfp_orders_nonull.withColumn("row_number", row_number().over(windowSpec))

# Filtra o DataFrame para manter apenas o primeiro registro de cada grupo (o mais antigo).
# Em seguida, remove a coluna auxiliar 'row_number' para limpar o DataFrame.
dfp_orders_cleaned = dfp_orders_ranked.filter(col("row_number") == 1).drop("row_number")


Contagem de registros após a limpeza: 2427415


In [18]:
### 2.3. Sanity Check no DataFrame Final ###

print("--- Iniciando Sanity Check no DataFrame Final ---")

# 1. Verificação de Duplicatas de 'order_id'
# A contagem de 'order_id's que aparecem mais de uma vez deve ser 0.
duplicatas_final = dfp_orders_cleaned.groupBy("order_id").count().where("count > 1").count()
print(f"\nNúmero de 'order_id's duplicados encontrados: {duplicatas_final}")


# 2. Verificação de Valores Nulos
# Exibe a contagem de nulos para todas as colunas. Esperamos 0 para 'customer_id'.
print("\nContagem de valores nulos no DataFrame final:")
dfp_orders_cleaned.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in dfp_orders_cleaned.columns]).show()


# 3. Verificação da Contagem Total de Registros
# Exibe o número final de linhas no DataFrame limpo.
print("\nContagem total de registros:")
print(f"O DataFrame final possui {dfp_orders_cleaned.count()} linhas.")

print("\n--- Sanity Check Concluído ---")

Número de 'order_id's duplicados: 0


In [20]:
### 4.1. Parsing da Coluna 'items' e Preparação da String de Busca ###

# Define o schema (a estrutura) da coluna 'items', que é um JSON aninhado.
# Isso permite que o Spark entenda como ler os nomes dos itens principais e dos acompanhamentos ('garnishItems').
item_schema = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("garnishItems", ArrayType(
            StructType([
                StructField("name", StringType()),
                StructField("categoryName", StringType())
            ])
        ))
    ])
)

# Usa o schema definido para converter a string JSON 'items' em uma coluna estruturada ('items_array').
df_parsed = dfp_orders_cleaned.withColumn("items_array", from_json("items", item_schema))

# Realiza uma sequência de transformações para extrair todos os nomes de itens e acompanhamentos,
# e depois concatená-los em uma única string de busca ('search_string') para cada pedido.
df_names = df_parsed.withColumn("main_item_names", expr("transform(items_array, x -> x.name)")) \
    .withColumn("garnish_names_nested", expr("transform(items_array, x -> transform(x.garnishItems, g -> g.name))")) \
    .withColumn("flat_garnish_names", expr("flatten(garnish_names_nested)")) \
    .withColumn("all_names", expr("concat(main_item_names, flat_garnish_names)")) \
    .withColumn("search_string", lower(concat_ws(" ", col("all_names"))))


### 4.2. Aplicação das Regras de Classificação ###

# Cria a nova coluna 'order_category' usando uma série de condições (CASE WHEN).
# Cada condição ('when') usa uma expressão regular ('rlike') para buscar um conjunto de
# palavras-chave na 'search_string'. Se uma correspondência é encontrada, a categoria é atribuída.
# Se nenhuma das condições for satisfeita, o pedido é classificado como 'Outros'.
df_classified = df_names.withColumn(
    "order_category",
    when(lower(col("search_string")).rlike("pizza|pizzas|massa tradicional|massa fina|massa pan|massa grossa|1/2|1/3|brotinho|calzone|borda recheada|borda tradicional"), "Pizza")
    .when(lower(col("search_string")).rlike("lasanha|lasagna|canelone|espaguete|massa|penne|nhoque|gnocchi|talharim|ravióli|fettuccine|carbonara|bolonhesa|macarrão|risoto|polenta|cannelloni|capeletti|rondelli"), "Italiana")
    .when(lower(col("search_string")).rlike("pf|prato feito|marmita|marmitex|quentinha|executivo|almoço|prato do dia|comercial"), "Marmita")
    .when(lower(col("search_string")).rlike("feijoada|baião de dois|baião|tropeiro|moqueca|bobó|vatapá|caruru|barreado|vaca atolada|galinhada|sarapatel"), "Brasileira")
    .when(lower(col("search_string")).rlike("esfirra|esfiha|quibe|kibe|tabule|homus|kafta|árabe|beirute|coalhada|babaganoush|shawarma|falafel"), "Arabe")
    .when(lower(col("search_string")).rlike("açai|açaí|creme de açaí|açaí na tigela"), "Açai")
    .when(lower(col("search_string")).rlike("hambúrguer|hamburguer|burger|smash|xis|prensado|cheeseburger|cheese salada|cheese bacon|cheese tudo|x-tudo|x-salada|x-bacon|x-calabresa|x-frango|x-egg|x-coração|x-picanha|sanduiche|sanduíche|wrap|bauru|baguete|hot dog|cachorro quente|dogão|misto quente|americano|podrão"), "Lanches")
    .when(lower(col("search_string")).rlike("fit|saudável|fitness|low carb|detox|integral|grelhado|salada no pote|poke|wrap fit|suco verde"), "Saúdavel")
    .when(lower(col("search_string")).rlike("bolo|pudim|doce|mousse|torta|sobremesa|brigadeiro|banoffee|churros|brownie|cheesecake|palha italiana|red velvet"), "Doces e bolos")
    .when(lower(col("search_string")).rlike("sorvete|gelato|geladinho|sacolé|picolé|milkshake|milk shake|sundae|banana split|frozen yogurt"), "Sorvetes")
    .when(lower(col("search_string")).rlike("pastel|pastelão|pastel de feira"), "Pastel")
    .when(lower(col("search_string")).rlike("coxinha|risoles|joelho|bolinho de|enroladinho de|empada|salgado frito|salgado assado|panqueca|croquete|escondidinho"), "Salgados")
    .when(lower(col("search_string")).rlike("picanha|mignom|mignon|alcatra|maminha|ancho|chorizo|contra-filé|entrecot|carne de sol|bife|filé|costela|cupim|churrasco|espeto|parrilla|cordeiro"), "Carnes")
    .when(lower(col("search_string")).rlike("yakisoba|rolinho primavera|frango xadrez|lombo agridoce|chinesa|yakissoba|yakisoba|gyoza|guioza"), "Chinesa")
    .when(lower(col("search_string")).rlike("taco|burrito|quesadilla|nachos|guacamole|chilli|mexicana"), "Mexicana")
    .when(lower(col("search_string")).rlike("sopa|caldo|creme de|canja"), "Caldos e Sopas")
    .when(lower(col("search_string")).rlike("tapioca|crepe"), "Tapiocas e Crepes")
    .when(lower(col("search_string")).rlike("vegetariano|vegano|sem carne|veggie|plant based"), "Vegetariana")
    .when(lower(col("search_string")).rlike("pão|salgadinho|croissant|pão de queijo|rosquinha|padaria|confeitaria|sonho|folhado"), "Padaria")
    .when(lower(col("search_string")).rlike("sushi|sashimi|temaki|uramaki|niguiri|filadélfia|filadelfia|combinado|hot roll|roll|hot|holl|shimeji|japonês|japonesa|lamen|lámen|guioza|gyoza|sunomono|yakimeshi|combo|pecas|peças"), "Japonesa")
    .otherwise("Outros")
)

In [None]:
### 4.3. Inspeção Visual da Classificação ###

# Exibe as 20 primeiras linhas do DataFrame com a nova coluna 'order_category',
# permitindo uma verificação visual da lógica de classificação aplicada.
df_classified.show(20, truncate=False)
print(f"O DataFrame df_classified possui {df_classified.count()} linhas.")

In [None]:
### 4.4. Agregação e Análise de Frequência das Categorias ###

# 1. Calcula o número total de pedidos no DataFrame para usar como base no cálculo percentual.
total_orders = df_classified.count()

# 2. Agrupa os dados por 'order_category', conta os pedidos em cada grupo,
#    calcula o percentual correspondente e ordena o resultado da maior para a menor categoria.
category_summary_df = df_classified.groupBy("order_category") \
    .agg(F.count("*").alias("count")) \
    .withColumn("percentage", (F.col("count") / total_orders) * 100) \
    .orderBy(F.desc("count"))

# 3. Exibe a tabela final com o resumo numérico da distribuição das categorias.
print("--- Resumo da Distribuição de Categorias ---")
category_summary_df.show(truncate=False)


##resultado cacheado
#--- Summary of Category Distribution ---
#+-----------------+------+-------------------+
#|order_category   |count |percentage         |
#+-----------------+------+-------------------+
#|Pizza            |553652|22.756182351311605 |
#|Outros           |523706|21.52534305750904  |
#|Japonesa         |253403|10.41535996685538  |
#|Lanches          |223619|9.191179190570882  |
#|Carnes           |175192|7.200734574228907  |
#|Chinesa          |147408|6.058757717920537  |
#|Italiana         |122793|5.047032972814342  |
#|Arabe            |100459|4.129061798440921  |
#|Marmita          |76271 |3.1348875902496283 |
#|Saúdavel         |61734 |2.5373883979031424 |
#|Doces e bolos    |54133 |2.224972400033868  |
#|Açai             |33315 |1.3693117970023518 |
#|Brasileira       |20295 |0.8341642779577586 |
#|Sorvetes         |15151 |0.6227357957791575 |
#|Salgados         |13302 |0.5467382717612272 |
#|Pastel           |12768 |0.5247898251276011 |
#|Padaria          |12275 |0.5045265588534855 |
#|Mexicana         |11555 |0.4749331476620794 |
#|Tapiocas e Crepes|11057 |0.45446437158802355|
#|Caldos e Sopas   |7349  |0.3020583039522823 |
#+-----------------+------+-------------------+

In [21]:
# 1. Calculate the total number of orders to compute percentages
total_orders = df_classified.count()

# 2. Group by category, count, and calculate the percentage
category_summary_df = df_classified.groupBy("order_category") \
    .agg(F.count("*").alias("count")) \
    .withColumn("percentage", (F.col("count") / total_orders) * 100) \
    .orderBy(F.desc("count"))

In [22]:
# 3. Display the numerical summary table
print("--- Summary of Category Distribution ---")
category_summary_df.show(truncate=False)


--- Summary of Category Distribution ---
+-----------------+------+------------------+
|order_category   |count |percentage        |
+-----------------+------+------------------+
|Pizza            |552338|22.75416440946439 |
|Outros           |513775|21.165519698938994|
|Japonesa         |261545|10.774630625583184|
|Lanches          |223154|9.193071642055438 |
|Carnes           |174785|7.200458100489615 |
|Chinesa          |147189|6.063610878238785 |
|Italiana         |122487|5.045985132332131 |
|Arabe            |100196|4.127683152654161 |
|Marmita          |76141 |3.1367112751630852|
|Saúdavel         |61634 |2.5390796382159624|
|Doces e bolos    |53956 |2.2227760807278525|
|Açai             |33210 |1.3681220557671432|
|Brasileira       |20235 |0.8336028244037381|
|Sorvetes         |15091 |0.6216901518693754|
|Salgados         |13269 |0.5466308809989228|
|Pastel           |12751 |0.5252913078315822|
|Padaria          |12240 |0.5042401072746111|
|Mexicana         |11528 |0.47490849319

In [24]:
### 5.1. Seleção de Colunas e Criação do DataFrame Final ###

# Lista das colunas que serão mantidas na visão final.
# Incluímos as chaves, dados do pedido, informações de localização e a categoria que criamos.
colunas_finais = [
    "order_id",
    "customer_id",
    "merchant_id",
    "order_created_at",
    "order_total_amount",
    "delivery_address_city",
    "delivery_address_state",
    "delivery_address_latitude",
    "delivery_address_longitude",
    "origin_platform",
    "order_category"  # A coluna de categoria criada na etapa anterior
]

# Cria o DataFrame final ('df_visao_final') selecionando apenas as colunas da lista.
# O DataFrame de origem é o 'df_classified', que já contém todas as transformações.
df_visao_final = df_classified.select(colunas_finais)


# --- Verificação da Estrutura Final ---

# Exibe o schema do DataFrame final para confirmar que apenas as colunas desejadas permaneceram.
print("--- Schema do DataFrame Final ---")
df_visao_final.printSchema()

#Count final.
print("--- Count do DataFrame Final ---")
print(f"O  df_visao_final possui {df_visao_final.count()} linhas.")

# Mostra uma amostra do DataFrame final para uma última inspeção visual.
print("\n--- Amostra do DataFrame Final ---")
df_visao_final.show(10, truncate=False)

--- Schema do DataFrame Final ---
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- order_created_at: string (nullable = true)
 |-- order_total_amount: double (nullable = true)
 |-- delivery_address_city: string (nullable = true)
 |-- delivery_address_state: string (nullable = true)
 |-- delivery_address_latitude: string (nullable = true)
 |-- delivery_address_longitude: string (nullable = true)
 |-- origin_platform: string (nullable = true)
 |-- order_category: string (nullable = false)

--- Count do DataFrame Final ---
O  df_visao_final possui 2427415 linhas.

--- Amostra do DataFrame Final ---
+----------------------------------------------------------------+----------------------------------------------------------------+----------------------------------------------------------------+------------------------+------------------+---------------------+----------------------+-------------------------

In [28]:
drive_base_path = "drive/MyDrive/iFood/Data/"

if data_source == "drive":
    # Define o caminho para a camada 'Bronze' dentro da estrutura do Drive.
    orders_silver_path = f"{drive_base_path}/Silver/orders.parquet"

sink_data_silver(df_visao_final, orders_silver_path)


In [29]:
file_path = "Data/Silver/orders.parquet"
os.makedirs('Data/Silver', exist_ok=True)


df_visao_final.write.mode("overwrite").parquet(file_path)

if os.path.exists(file_path):
  print(f"Baixando: {file_path}")
  try:
    zip_file_name = f"{os.path.basename(file_path)}.zip"
    !zip -r "{zip_file_name}" "{file_path}"
    files.download(zip_file_name)
    print(f"Download de {file_path} concluído.")
  except Exception as e:
    print(f"Erro ao baixar {file_path}: {e}")
  else:
    print(f"Arquivo não encontrado, ignorando download: {file_path}")

print("Processo de download concluído.")

Baixando: Data/Silver/orders.parquet
  adding: Data/Silver/orders.parquet/ (stored 0%)
  adding: Data/Silver/orders.parquet/.part-00034-2ad50eb0-ccc6-4021-95cf-616207427958-c000.snappy.parquet.crc (deflated 0%)
  adding: Data/Silver/orders.parquet/part-00010-2ad50eb0-ccc6-4021-95cf-616207427958-c000.snappy.parquet (deflated 28%)
  adding: Data/Silver/orders.parquet/part-00020-2ad50eb0-ccc6-4021-95cf-616207427958-c000.snappy.parquet (deflated 27%)
  adding: Data/Silver/orders.parquet/part-00023-2ad50eb0-ccc6-4021-95cf-616207427958-c000.snappy.parquet (deflated 28%)
  adding: Data/Silver/orders.parquet/.part-00039-2ad50eb0-ccc6-4021-95cf-616207427958-c000.snappy.parquet.crc (deflated 0%)
  adding: Data/Silver/orders.parquet/.part-00013-2ad50eb0-ccc6-4021-95cf-616207427958-c000.snappy.parquet.crc (deflated 0%)
  adding: Data/Silver/orders.parquet/.part-00019-2ad50eb0-ccc6-4021-95cf-616207427958-c000.snappy.parquet.crc (deflated 0%)
  adding: Data/Silver/orders.parquet/.part-00005-2ad50eb0

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Download de Data/Silver/orders.parquet concluído.
Arquivo não encontrado, ignorando download: Data/Silver/orders.parquet
Processo de download concluído.
