<a href="https://colab.research.google.com/github/jeacfontes/futebol_2025/blob/main/ifood_abtest_data_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# Instalação de Pacotes
!pip install -r requirements.txt

Collecting numpy==1.26.4 (from -r requirements.txt (line 3))
  Downloading numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting scipy==1.15.2 (from -r requirements.txt (line 4))
  Downloading scipy-1.15.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.0/62.0 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting matplotlib==3.10.1 (from -r requirements.txt (line 5))
  Downloading matplotlib-3.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting statsmodels==0.14.2 (from -r requirements.txt (line 7))
  Downloading statsmodels-0.14.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.2 kB)
Collecting tqdm==4.66.4 (from -r requirements.txt (line 8))
  Downloa

In [1]:
    # 2. Configuração do Ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

In [2]:
# 3. Inicialização da SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("iFood AB Test - Ingestão") \
    .getOrCreate()

In [3]:
# 4. Download dos Arquivos
import requests
from tqdm import tqdm

In [4]:
# URLs dos arquivos
data_urls = {
    "order.json.gz": "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/order.json.gz",
    "consumer.csv.gz": "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/consumer.csv.gz",
    "restaurant.csv.gz": "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/restaurant.csv.gz",
    "ab_test_ref.tar.gz": "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/ab_test_ref.tar.gz"
}

os.makedirs("data", exist_ok=True)

for name, url in data_urls.items():
    path = f"data/{name}"
    if not os.path.exists(path):
        print(f"🔽 Baixando: {name}")
        r = requests.get(url, stream=True)
        with open(path, "wb") as f:
            for chunk in tqdm(r.iter_content(chunk_size=8192), desc=f"⬇️ {name}"):
                f.write(chunk)
        print(f"✅ Download concluído: {name}")
    else:
        print(f"✅ Já existe: {name}")

✅ Já existe: order.json.gz
✅ Já existe: consumer.csv.gz
✅ Já existe: restaurant.csv.gz
✅ Já existe: ab_test_ref.tar.gz


In [5]:
# 5. Inspeção e extração do arquivo TAR
import tarfile
with tarfile.open("data/ab_test_ref.tar.gz", "r:gz") as tar:
    members = tar.getnames()
    print(f"📦 Arquivos dentro de data/ab_test_ref.tar.gz:")
    for name in members:
        print("—", name)
    # Ignora arquivos ocultos (ex: criados por sistemas macOS)
    safe_members = [m for m in members if not m.startswith("._")]
    tar.extractall(path="data", members=[m for m in tar if m.name in safe_members])
    print("✅ Extração concluída")

📦 Arquivos dentro de data/ab_test_ref.tar.gz:
— ._ab_test_ref.csv
— ab_test_ref.csv
✅ Extração concluída


In [6]:
# 6. Leitura com PySpark
# Acompanhando o carregamento
import time
start = time.time()
df_orders = spark.read.json("data/order.json.gz")
print(f"⏱️ Tempo para carregar orders: {round(time.time() - start, 2)}s")
start = time.time()
df_users = spark.read.option("header", True).csv("data/consumer.csv.gz")
print(f"⏱️ Tempo para carregar users: {round(time.time() - start, 2)}s")
start = time.time()
df_restaurants = spark.read.option("header", True).csv("data/restaurant.csv.gz")
print(f"⏱️ Tempo para carregar restaurants: {round(time.time() - start, 2)}s")
start = time.time()
df_abtest = spark.read.option("header", True).csv("data/ab_test_ref.csv")
print(f"⏱️ Tempo para carregar abtest: {round(time.time() - start, 2)}s")

⏱️ Tempo para carregar orders: 137.14s
⏱️ Tempo para carregar users: 1.57s
⏱️ Tempo para carregar restaurants: 0.79s
⏱️ Tempo para carregar abtest: 0.71s


In [7]:
# Exibe uma amostra de cada DataFrame
print("Amostra de Pedidos:"); df_orders.show(3)
print("Amostra de Usuários:"); df_users.show(3)
print("Amostra de Restaurantes:"); df_restaurants.show(3)
print("Amostra de Teste A/B:"); df_abtest.show(3)

Amostra de Pedidos:
+-----------+--------------------+-------------+---------------------+------------------------+-------------------------+----------------------------+-------------------------+--------------------------+----------------------+-------------------------+--------------------+--------------------+-----------------+------------------+-----------------+--------------------+--------------------+---------------+--------------------+------------------+---------------+
|        cpf|         customer_id|customer_name|delivery_address_city|delivery_address_country|delivery_address_district|delivery_address_external_id|delivery_address_latitude|delivery_address_longitude|delivery_address_state|delivery_address_zip_code|               items|         merchant_id|merchant_latitude|merchant_longitude|merchant_timezone|    order_created_at|            order_id|order_scheduled|order_scheduled_date|order_total_amount|origin_platform|
+-----------+--------------------+-------------+----

In [8]:
# 8. Pré-processamento básico
# Remove registros inválidos de pedidos sem cliente, ID de pedido ou valor total — essenciais para análise.
df_orders = df_orders.dropna(subset=["customer_id", "order_id", "order_total_amount"])
df_abtest = df_abtest.filter(df_abtest.is_target.isNotNull())


In [9]:
# 9. Join dos dados principais
df_joined = df_orders.join(df_abtest, on="customer_id", how="inner")

In [10]:
# 10. Exporta amostra para Pandas para análise inicial (reduzido para 50.000 registros)
# Reinicie o ambiente e reinstale numpy se encontrar erro de incompatibilidade (dtype size)
# !pip install --force-reinstall numpy==1.26.4 --no-cache-dir && os.kill(os.getpid(), 9)

try:
    start = time.time()
    sample_df = df_joined.limit(50000).toPandas()
    print(f"⏱️ Tempo para converter para Pandas: {round(time.time() - start, 2)}s")
except ValueError as e:
    print("❌ Erro ao converter para Pandas. Pode ser uma incompatibilidade do NumPy.\n", str(e))
    raise

os.makedirs("trusted_data", exist_ok=True)
sample_output_path = "trusted_data/sample_orders.parquet"
sample_df.to_parquet(sample_output_path, index=False)
print(f"✅ Arquivo de amostra salvo como {sample_output_path}")

⏱️ Tempo para converter para Pandas: 219.62s
✅ Arquivo de amostra salvo como trusted_data/sample_orders.parquet


In [15]:
# 11. Salvando os DataFrames completos como .parquet para análises finais
output_path_orders = "trusted_data/processed_orders.parquet"
df_joined.write.mode("overwrite").parquet(output_path_orders)
print(f"✅ Arquivo de pedidos salvo como {output_path_orders}")

output_path_users = "trusted_data/processed_users.parquet"
df_users.write.mode("overwrite").parquet(output_path_users)
print(f"✅ Arquivo de usuários salvo como {output_path_users}")

output_path_restaurants = "trusted_data/processed_restaurants.parquet"
df_restaurants.write.mode("overwrite").parquet(output_path_restaurants)
print(f"✅ Arquivo de restaurantes salvo como {output_path_restaurants}")

output_path_abtest = "trusted_data/processed_abtest.parquet"
df_abtest.write.mode("overwrite").parquet(output_path_abtest)
print(f"✅ Arquivo de teste A/B salvo como {output_path_abtest}")

✅ Arquivo de pedidos salvo como trusted_data/processed_orders.parquet
✅ Arquivo de usuários salvo como trusted_data/processed_users.parquet
✅ Arquivo de restaurantes salvo como trusted_data/processed_restaurants.parquet
✅ Arquivo de teste A/B salvo como trusted_data/processed_abtest.parquet


In [16]:
# 12. Explode os itens dos pedidos em uma tabela separada
from pyspark.sql.functions import explode, from_json, col
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType, IntegerType

item_schema = ArrayType(StructType([
    StructField("external_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("total_price", DoubleType(), True)
]))

orders_with_items = df_orders.withColumn("items_array", from_json(col("items"), item_schema))

df_item_orders = orders_with_items.select(
    "order_id",
    "customer_id",
    explode("items_array").alias("item")
)

df_item_orders = df_item_orders.select(
    "order_id",
    "customer_id",
    "item.name",
    "item.price",
    "item.quantity",
    "item.total_price",
    "item.external_id"
)

output_path_items = "trusted_data/processed_item_orders.parquet"
df_item_orders.write.mode("overwrite").parquet(output_path_items)
print(f"✅ Arquivo de itens de pedidos salvo como {output_path_items}")

✅ Arquivo de itens de pedidos salvo como trusted_data/processed_item_orders.parquet
