<a href="https://colab.research.google.com/github/murillo-borges/ifood-ab-campaign-case/blob/main/Ifood_case.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Preparação do ambiente e ETL

### Database Pedidos

In [None]:
# 1 - Instalação e configuração do ambiente
print("⚙️ Instalando Java e PySpark...")
!apt-get install openjdk-11-jdk -y
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
print("✅ Ambiente configurado!\n")

# 2 - Iniciando a SparkSession
print("🚀 Iniciando sessão Spark...")
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("iFood Order Case").getOrCreate()
print("✅ SparkSession criada!\n")

# 3 - Baixando o arquivo order.json.gz
print("📥 Baixando o arquivo order.json.gz...")
import urllib.request
url = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/order.json.gz"
local_path = "order.json.gz"
urllib.request.urlretrieve(url, local_path)
print("✅ Download concluído!\n")

# 4 - Lendo o arquivo JSON compactado com PySpark
print("📊 Lendo arquivo JSON com PySpark...")
df_order = spark.read.json(local_path, multiLine=False)
print("✅ Dados carregados!\n")

# 5 - Exibindo as primeiras linhas do DataFrame
print("🔍 Primeiras linhas do DataFrame:")
df_order.show(5, truncate=False)

# 6 - Exibindo o schema do DataFrame
print("\n🧠 Estrutura do DataFrame:")
df_order.printSchema()

⚙️ Instalando Java e PySpark...
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jre
  x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jdk
  openjdk-11-jre x11-utils
0 upgraded, 10 newly installed, 0 to remove and 35 not upgraded.
Need to get 6,920 kB of archives.
After this operation, 16.9 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-core all 2.37-2build1 [1,041 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-extra all 2.37-2build1 [2,041 kB]
Get:3 http

In [None]:
from pyspark.sql.functions import explode, col, from_json
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType, LongType

# Define the schema for the 'items' column
items_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True), # Changed to LongType based on data example
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True), # Changed to LongType based on data example
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ]), True), True),
    StructField("integrationId", StringType(), True),
    StructField("totalAddition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("totalDiscount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True)
]))

# Parse the JSON string in 'items' column and then explode it
df_exploded = df_order.withColumn("parsed_items", from_json(col("items"), items_schema)) \
                      .withColumn("item", explode("parsed_items"))

# Show the schema of the new DataFrame with exploded items
print("🧠 Estrutura do DataFrame com itens explodidos:")
df_exploded.printSchema()

# Show the first few rows of the new DataFrame
print("\n🔍 Primeiras linhas do DataFrame com itens explodidos:")
df_exploded.select("order_id", "item.*").show(5, truncate=False)

🧠 Estrutura do DataFrame com itens explodidos:
root
 |-- cpf: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- delivery_address_city: string (nullable = true)
 |-- delivery_address_country: string (nullable = true)
 |-- delivery_address_district: string (nullable = true)
 |-- delivery_address_external_id: string (nullable = true)
 |-- delivery_address_latitude: string (nullable = true)
 |-- delivery_address_longitude: string (nullable = true)
 |-- delivery_address_state: string (nullable = true)
 |-- delivery_address_zip_code: string (nullable = true)
 |-- items: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- merchant_latitude: string (nullable = true)
 |-- merchant_longitude: string (nullable = true)
 |-- merchant_timezone: string (nullable = true)
 |-- order_created_at: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_scheduled: boolean (nullable = true)
 |-- orde

In [None]:
df_order.show(1, truncate=False)

+-----------+----------------------------------------------------------------+-------------+---------------------+------------------------+-------------------------+----------------------------+-------------------------+--------------------------+----------------------+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.sql.functions import col, from_json, explode
from pyspark.sql.types import *

# 1 - Definindo o schema do array de itens
item_schema = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("externalId", StringType()),
        StructField("quantity", DoubleType()),
        StructField("integrationId", StringType()),
        StructField("unitPrice", StructType([
            StructField("value", StringType()),
            StructField("currency", StringType())
        ])),
        StructField("totalValue", StructType([
            StructField("value", StringType()),
            StructField("currency", StringType())
        ]))
    ])
)

# 2 - Convertendo a string JSON para array de structs
df_order_fixed = df_order.withColumn("items_array", from_json(col("items"), item_schema))

# 3 - Aplicando explode na nova coluna
df_exploded = df_order_fixed.withColumn("item", explode("items_array"))

# 4 - Selecionando os campos desejados
df_items_detalhados = df_exploded.select(
    "order_id",
    "customer_id",
    "order_created_at",
    "order_total_amount",
    col("item.name").alias("item_name"),
    col("item.externalId").alias("item_id"),
    col("item.quantity").alias("item_quantity"),
    col("item.unitPrice.value").cast("float").alias("item_unit_price"),
    col("item.totalValue.value").cast("float").alias("item_total_price"),
    col("item.integrationId").alias("item_integration_id")
)

# 5 - Exibindo resultado
df_items_detalhados.show(5, truncate=False)

+----------------------------------------------------------------+----------------------------------------------------------------+------------------------+------------------+----------------------------------------+--------------------------------+-------------+---------------+----------------+-------------------+
|order_id                                                        |customer_id                                                     |order_created_at        |order_total_amount|item_name                               |item_id                         |item_quantity|item_unit_price|item_total_price|item_integration_id|
+----------------------------------------------------------------+----------------------------------------------------------------+------------------------+------------------+----------------------------------------+--------------------------------+-------------+---------------+----------------+-------------------+
|33e0612d62e5eb42aba15b58413137e441fbe906de2febd6

### Database Usuários

In [None]:
# 1 - Baixando o arquivo CSV compactado (.gz) da internet
print("📥 Baixando o arquivo consumer.csv.gz...")
import urllib.request

url = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/consumer.csv.gz"
local_path = "consumer.csv.gz"
urllib.request.urlretrieve(url, local_path)
print("✅ Download concluído!\n")

# 2 - Lendo o arquivo CSV com PySpark
print("📊 Lendo o arquivo CSV com PySpark...")
df_consumer = spark.read.option("header", True).option("inferSchema", True).csv(local_path)
print("✅ Arquivo carregado em um DataFrame Spark!\n")

# 3 - Exibindo as primeiras linhas do DataFrame
print("🔍 Primeiras linhas do dataset:")
df_consumer.show(5, truncate=False)

# 4 - Exibindo o schema do DataFrame
print("\n🧠 Estrutura geral do dataset:")
df_consumer.printSchema()

📥 Baixando o arquivo consumer.csv.gz...
✅ Download concluído!

📊 Lendo o arquivo CSV com PySpark...
✅ Arquivo carregado em um DataFrame Spark!

🔍 Primeiras linhas do dataset:
+----------------------------------------------------------------+--------+-----------------------+------+-------------+-------------------+---------------------+
|customer_id                                                     |language|created_at             |active|customer_name|customer_phone_area|customer_phone_number|
+----------------------------------------------------------------+--------+-----------------------+------+-------------+-------------------+---------------------+
|e8cc60860e09c0bb19610b06ced69c973eb83982cfc98e397ce65cba92f70928|pt-br   |2018-04-05 14:49:18.165|true  |NUNO         |46                 |816135924            |
|a2834a38a9876cf74e016524dd2e8c1f010ee12b2b684d58c40ab11eef19b6eb|pt-br   |2018-01-14 21:40:02.141|true  |ADRIELLY     |59                 |231330577            |
|41e105172

### Database Merchants - Restaurantes

In [None]:
# 1 - Baixando o arquivo CSV compactado (.gz) da internet
print("📥 Baixando o arquivo restaurant.csv.gz...")
import urllib.request

url = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/restaurant.csv.gz"
local_path = "restaurant.csv.gz"
urllib.request.urlretrieve(url, local_path)
print("✅ Download concluído!\n")

# 2 - Lendo o arquivo CSV com PySpark
print("📊 Lendo o arquivo CSV com PySpark...")
df_restaurant = spark.read.option("header", True).option("inferSchema", True).csv(local_path)
print("✅ Arquivo carregado em um DataFrame Spark!\n")

# 3 - Exibindo as primeiras linhas do DataFrame
print("🔍 Primeiras linhas do dataset:")
df_restaurant.show(5, truncate=False)

# 4 - Exibindo o schema do DataFrame
print("\n🧠 Estrutura geral do dataset:")
df_restaurant.printSchema()

📥 Baixando o arquivo restaurant.csv.gz...
✅ Download concluído!

📊 Lendo o arquivo CSV com PySpark...
✅ Arquivo carregado em um DataFrame Spark!

🔍 Primeiras linhas do dataset:
+----------------------------------------------------------------+-----------------------+-------+-----------+--------------+------------+-------------+-------------------+-----------------+--------------+--------------+----------------+
|id                                                              |created_at             |enabled|price_range|average_ticket|takeout_time|delivery_time|minimum_order_value|merchant_zip_code|merchant_city |merchant_state|merchant_country|
+----------------------------------------------------------------+-----------------------+-------+-----------+--------------+------------+-------------+-------------------+-----------------+--------------+--------------+----------------+
|d19ff6fca6288939bff073ad0a119d25c0365c407e9e5dd999e7a3e53c6d5d76|2017-01-23 12:52:30.91 |false  |3          

### Database Marcação de usuários que participaram do teste A/B

In [None]:
# 1 - Instalar Java e PySpark no Google Colab
!apt-get install openjdk-11-jdk -y
!pip install pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

# 2 - Iniciar SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("AB Test Case").getOrCreate()
print("✅ Spark iniciado!\n")

# 3 - Baixar o arquivo .tar.gz
import urllib.request
import tarfile

print("📥 Baixando o arquivo ab_test_ref.tar.gz...")
url = "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/ab_test_ref.tar.gz"
local_tar_path = "ab_test_ref.tar.gz"
urllib.request.urlretrieve(url, local_tar_path)
print("✅ Download concluído!\n")

# 4 - Extrair o conteúdo
print("🗂 Extraindo arquivos...")
extract_path = "ab_test_ref_extracted"
os.makedirs(extract_path, exist_ok=True)
with tarfile.open(local_tar_path, "r:gz") as tar:
    tar.extractall(path=extract_path)
print("✅ Extração concluída!\n")

# 5 - Caminho direto para o arquivo extraído
csv_path = os.path.join(extract_path, "ab_test_ref.csv")
print(f"📄 Lendo arquivo: {csv_path}")

# 6 - Definir schema do arquivo
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("is_target", StringType(), True),
])

# 7 - Leitura do CSV com PySpark
df_ab_test = spark.read.option("header", True)\
                       .option("sep", ",")\
                       .schema(schema)\
                       .csv(csv_path)

# 8 - Visualizar os dados
print("🔍 Primeiras linhas do DataFrame:")
df_ab_test.show(5, truncate=False)

print("\n🧠 Estrutura do DataFrame:")
df_ab_test.printSchema()


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-11-jdk is already the newest version (11.0.27+6~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.
✅ Spark iniciado!

📥 Baixando o arquivo ab_test_ref.tar.gz...
✅ Download concluído!

🗂 Extraindo arquivos...
✅ Extração concluída!

📄 Lendo arquivo: ab_test_ref_extracted/ab_test_ref.csv
🔍 Primeiras linhas do DataFrame:
+----------------------------------------------------------------+---------+
|customer_id                                                     |is_target|
+----------------------------------------------------------------+---------+
|755e1fa18f25caec5edffb188b13fd844b2af8cf5adedcf77c028f36cb9382ea|target   |
|b821aa8372b8e5b82cdc283742757df8c45eecdd72adf411716e710525d4edf1|control  |
|d425d6ee4c9d4e211b71da8fc60bf6c5336b2ea9af9cc007f5297541ec40b63b|control  |
|6a7089eea0a5dc294fbccd4fa24d0d84a90c1cc12e829c8b535718bbc651ab02|target   |
|da

**Explanation of the fix:**

1.  **`items_schema`**: We define a `StructType` that matches the expected structure of each item within the `items` array. This includes nested `StructType` for fields like `addition`, `discount`, `unitPrice`, `totalValue`, `totalAddition`, and `totalDiscount`, and an `ArrayType` for `garnishItems`. I've used `StringType` for currency values and prices based on the example data in the output of cell `Bh7E-EBFhrMs`, and `LongType` for `sequence`.
2.  **`from_json(col("items"), items_schema)`**: This function parses the JSON string in the `items` column according to the `items_schema` we defined. The result is a new column named `parsed_items` which is of type `ArrayType(StructType(...))`.
3.  **`explode("parsed_items")`**: Now that `parsed_items` is an array type, we can successfully apply the `explode` function to it. This creates a new row for each item in the array.
4.  **`df_exploded.select("order_id", "item.*").show(5, truncate=False)`**: This line selects the original `order_id` and all the fields from the exploded `item` column to show the result of the explosion.

This corrected code will successfully extract the individual items from the `items` column and create a new row for each item, allowing you to analyze the items within each order.

## O Desafio – Análise do teste A/B (ETAPA 1)

✅ CÓDIGO PySpark: Análise A/B (impacto da campanha)


In [None]:
from pyspark.sql.functions import col, countDistinct, count, sum, avg, when
from pyspark.sql.window import Window


In [None]:
# 1 - Juntando pedidos com grupo de teste
df_ab_orders = df_order.join(df_ab_test, on="customer_id", how="inner")

In [None]:
# 2 - Calculando indicadores agregados por grupo (target vs control)
df_metrics = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    count("order_id").alias("qtd_pedidos"),
    avg("order_total_amount").alias("ticket_medio"),
    sum("order_total_amount").alias("valor_total_pedidos")
)

In [None]:
# 3 - Calculando pedidos por usuário (frequência média)
df_metrics = df_metrics.withColumn(
    "pedidos_por_usuario",
    col("qtd_pedidos") / col("qtd_usuarios")
)

# 4 - Exibindo resultados
df_metrics.show(truncate=False)

+---------+------------+-----------+------------------+--------------------+-------------------+
|is_target|qtd_usuarios|qtd_pedidos|ticket_medio      |valor_total_pedidos |pedidos_por_usuario|
+---------+------------+-----------+------------------+--------------------+-------------------+
|control  |360542      |1525576    |47.897890947419995|7.30718728800012E7  |4.231340592774212  |
|target   |445924      |2136745    |47.73970213572941 |1.0200756984000914E8|4.79172459881056   |
+---------+------------+-----------+------------------+--------------------+-------------------+



### a) Definindo os indicadores relevantes para mensurar o sucesso da
campanha e analisar se ela teve impacto significativo dentro do
período avaliado:

In [None]:
from pyspark.sql.functions import col, countDistinct, count, sum, avg
import pandas as pd

# 1 - Juntando pedidos com grupo de teste
df_ab_orders = df_order.join(df_ab_test, on="customer_id", how="inner")

# 2 - Calculando indicadores agregados por grupo (target vs control)
df_metrics = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    count("order_id").alias("qtd_pedidos"),
    avg("order_total_amount").alias("ticket_medio"),
    sum("order_total_amount").alias("valor_total_pedidos")
)

# 3 - Calculando pedidos por usuário
df_metrics = df_metrics.withColumn(
    "pedidos_por_usuario",
    col("qtd_pedidos") / col("qtd_usuarios")
)

# 4 - Convertendo para Pandas para aplicar a formatação personalizada
df_metrics_pd = df_metrics.toPandas()

# 5 - Formatando os valores para estilo brasileiro
df_metrics_formatado = pd.DataFrame()
#df_metrics_formatado["Grupo"] = df_metrics_pd["is_target"]

df_metrics_formatado["qtd_usuarios"] = df_metrics_pd["qtd_usuarios"].apply(lambda x: f"{x:,.0f}".replace(",", "."))
df_metrics_formatado["qtd_pedidos"] = df_metrics_pd["qtd_pedidos"].apply(lambda x: f"{x:,.0f}".replace(",", "."))
df_metrics_formatado["ticket_medio"] = df_metrics_pd["ticket_medio"].apply(lambda x: f"R$ {x:,.2f}".replace(",", "X").replace(".", ",").replace("X", "."))
df_metrics_formatado["valor_total_pedidos"] = df_metrics_pd["valor_total_pedidos"].apply(lambda x: f"R$ {x:,.2f}".replace(",", "X").replace(".", ",").replace("X", "."))
df_metrics_formatado["pedidos_por_usuario"] = df_metrics_pd["pedidos_por_usuario"].apply(lambda x: f"{x:.1f}".replace(".", ","))

# 6 - Exibindo resultado
print(df_metrics_formatado.to_string(index=False))

qtd_usuarios qtd_pedidos ticket_medio valor_total_pedidos pedidos_por_usuario
     360.542   1.525.576     R$ 47,90    R$ 73.071.872,88                 4,2
     445.924   2.136.745     R$ 47,74   R$ 102.007.569,84                 4,8


✅ 1. Join dos dados de pedidos com teste A/B e restaurantes


In [None]:
# Base principal de análise
df_ab_orders = df_order.join(df_ab_test, on="customer_id", how="inner")
df_ab_orders = df_ab_orders.join(df_restaurant, df_ab_orders["merchant_id"] == df_restaurant["id"], "inner")

📊 2. KPIs principais por grupo (usuários e pedidos)

In [None]:
from pyspark.sql.functions import col, countDistinct, count, sum, avg

df_metrics = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    count("order_id").alias("qtd_pedidos"),
    sum("order_total_amount").alias("valor_total_pedidos"),
    avg("order_total_amount").alias("ticket_medio")
).withColumn(
    "pedidos_por_usuario", col("qtd_pedidos") / col("qtd_usuarios")
)

🔁 3. Retenção de usuários (usuários com >1 pedido)


In [None]:
from pyspark.sql.functions import when, round

df_freq = df_ab_orders.groupBy("is_target", "customer_id").agg(count("order_id").alias("qtd_pedidos"))

df_retencao = df_freq.groupBy("is_target").agg(
    count("*").alias("total_usuarios"),
    count(when(col("qtd_pedidos") > 1, True)).alias("usuarios_retidos")
).withColumn(
    "taxa_retencao", round((col("usuarios_retidos") / col("total_usuarios")) * 100, 2)
).select("is_target", "taxa_retencao")

📈 4. Incremento percentual do ticket médio


In [None]:
from pyspark.sql.functions import first

df_ticket_medio = df_ab_orders.groupBy("is_target").agg(round(avg("order_total_amount"), 2).alias("ticket_medio"))

df_incremento = df_ticket_medio.groupBy().pivot("is_target").agg(first("ticket_medio")).withColumn(
    "incremento_percentual", round(((col("target") - col("control")) / col("control")) * 100, 2)
).select("incremento_percentual")

🔝 5. Heavy users (usuários que estão no top 20% de pedidos)

In [None]:
df_user_freq = df_ab_orders.groupBy("customer_id").agg(count("order_id").alias("qtd_pedidos"))
percentil_80 = df_user_freq.approxQuantile("qtd_pedidos", [0.80], 0.01)[0]

df_user_freq = df_user_freq.withColumn(
    "heavy_user", when(col("qtd_pedidos") >= percentil_80, 1).otherwise(0)
)

df_heavy = df_user_freq.join(df_ab_test, on="customer_id", how="inner")

df_heavy_summary = df_heavy.groupBy("is_target").agg(
    count(when(col("heavy_user") == 1, True)).alias("qtd_heavy_users"),
    count("*").alias("total_usuarios")
).withColumn(
    "percentual_heavy_users", round((col("qtd_heavy_users") / col("total_usuarios")) * 100, 2)
).select("is_target", "percentual_heavy_users")

🕒 6. Tempo médio de entrega por grupo

In [None]:
df_delivery_time = df_ab_orders.groupBy("is_target").agg(
    round(avg("delivery_time"), 2).alias("tempo_medio_entrega")
)

🍽️ 7. Total de restaurantes por grupo (com base em pedidos)


In [None]:
df_qtd_rest = df_ab_orders.groupBy("is_target").agg(
    countDistinct("merchant_id").alias("qtd_restaurantes")
)

📦 8. Média de pedidos por restaurante

In [None]:
df_avg_pedidos_rest = df_ab_orders.groupBy("is_target", "merchant_id").agg(
    count("order_id").alias("qtd_pedidos_rest")
).groupBy("is_target").agg(
    round(avg("qtd_pedidos_rest"), 2).alias("media_pedidos_por_restaurante")
)

🧾 10. Juntando todos os KPIs em uma tabela final

In [77]:
# Juntando tudo com PySpark
df_final = df_metrics \
    .join(df_retencao, on="is_target", how="left") \
    .join(df_heavy_summary, on="is_target", how="left") \
    .join(df_delivery_time, on="is_target", how="left") \
    .join(df_qtd_rest, on="is_target", how="left") \
    .join(df_avg_pedidos_rest, on="is_target", how="left")

df_final.show(truncate=False)

+---------+------------+-----------+--------------------+------------------+-------------------+--------------+----------------+-------------+----------------------+-------------------+----------------+-----------------------------+
|is_target|qtd_usuarios|qtd_pedidos|valor_total_pedidos |ticket_medio      |pedidos_por_usuario|total_usuarios|usuarios_retidos|taxa_retencao|percentual_heavy_users|tempo_medio_entrega|qtd_restaurantes|media_pedidos_por_restaurante|
+---------+------------+-----------+--------------------+------------------+-------------------+--------------+----------------+-------------+----------------------+-------------------+----------------+-----------------------------+
|control  |360542      |1525576    |7.307187305451663E7 |47.897891061813134|4.231340592774212  |360542        |269341          |74.7         |21.53                 |22.63              |7196            |212.0                        |
|target   |445924      |2136745    |1.0200757006332143E8|47.73970224

#### 📊 Análise Comparativa — Grupo Controle vs Grupo Target

| Indicador                    | Controle  | Target    | Diferença | Interpretação                               |
| ---------------------------- | --------- | --------- | --------- | ------------------------------------------- |
| **Usuários (qtd\_usuarios)** | 360.542   | 445.924   | +85.382   | Target teve uma base maior.                 |
| **Pedidos (qtd\_pedidos)**   | 1.525.576 | 2.136.745 | +611.169  | Target pediu mais.                          |
| **Ticket médio**             | R\$ 47,90 | R\$ 47,74 | -R\$ 0,16 | Leve queda no valor por pedido no target.   |
| **Pedidos por usuário**      | 4,23      | 4,79      | +13,2%    | Mais engajamento no target.                 |
| **Retenção de usuários (%)** | 74,7%     | 79,51%    | +4,8 p.p. | A campanha reteve melhor os usuários.       |
| **Heavy Users (%)**          | 21,53%    | 25,87%    | +4,3 p.p. | Mais usuários muito ativos no target.       |
| **Tempo médio de entrega**   | 22,63 min | 22,66 min | ≈         | Não há impacto logístico relevante.         |
| **Pedidos por restaurante**  | 212,0     | 295,66    | +39,4%    | Restaurantes no grupo target venderam mais. |


**Análise de Engajamento e Produto:**

Observando as métricas de comportamento, a campanha foi um sucesso absoluto.


*   Frequência de Pedidos: Os usuários do grupo target fizeram, em média, 4,79 pedidos, um aumento de 13,2% em relação aos 4,23 do grupo control. Isso mostra que o incentivo funcionou para criar o hábito de pedir mais vezes.

*   Taxa de Retenção: A retenção do grupo target foi de 79,51%, quase 5 pontos percentuais maior que a do grupo control. Este é um ganho massivo e o indicador mais forte de que a campanha gerou lealdade.


*   "Heavy Users": O percentual de usuários de alta frequência subiu de 21,53% para 25,87%. A campanha foi eficaz em converter usuários e engajar a base.


Conclusão de Produto: Sem dúvida, a campanha foi um sucesso em engajar a base de usuários, aumentar a frequência e reter mais clientes.

### b) Análise da viabilidade financeira dessa iniciativa
como alavanca de crescimento

#### 💰 Análise da Viabilidade Financeira


**Premissas Adotadas:**

*   Custo da Campanha: R$ 10,00 por cada pedido realizado no grupo target.
*   Take Rate (Receita do iFood): Manteremos a premissa de 20% sobre o valor total dos pedidos (GMV).


**1 - Custo da Campanha**

In [82]:
from pyspark.sql.functions import col

# --- Premissas ---
# Ajuste o valor do custo do cupom conforme a sua necessidade.
CUSTO_CUPOM_POR_PEDIDO = 10.00

# --- Cálculo Direto ---
# O código filtra o DataFrame para o grupo 'target', conta o número de pedidos
# e multiplica pelo custo unitário do cupom para obter o valor total.
custo_total_campanha = df_ab_orders.filter(col("is_target") == "target").count() * CUSTO_CUPOM_POR_PEDIDO

# --- Resultado ---
print(f"Custo Total: R$ {custo_total_campanha:,.2f}")

Custo Total: R$ 21,367,450.00


**2 - Receita Incremental**

In [83]:
from pyspark.sql.functions import col, sum, countDistinct

# --- Premissas ---
# Ajuste o Take Rate (comissão do iFood estimada).
TAKE_RATE = 0.20

# --- 1. Agregação com PySpark ---
# Agrupamos os dados para obter os totais de usuários e GMV para cada grupo.
summary_df = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    sum("order_total_amount").alias("gmv_total")
)

# --- 2. Extração dos Dados Agregados ---
# Coletamos o pequeno DataFrame de resumo para o Python para facilitar os cálculos.
# O .collect() aqui é seguro e eficiente, pois o resultado tem apenas 2 linhas.
metrics = {row["is_target"]: row for row in summary_df.collect()}

control_metrics = metrics.get("control")
target_metrics = metrics.get("target")

# --- 3. Cálculo Financeiro ---
# Verificamos se ambos os grupos existem antes de calcular
if control_metrics and target_metrics:
    # Comportamento base: Qual foi o GMV por usuário no grupo de controle?
    gmv_por_usuario_control = control_metrics["gmv_total"] / control_metrics["qtd_usuarios"]

    # Projeção: Qual seria o GMV do grupo Target se eles fossem como o Controle?
    gmv_baseline_target = gmv_por_usuario_control * target_metrics["qtd_usuarios"]

    # Receita projetada (Baseline) vs. Receita Real
    receita_baseline_target = gmv_baseline_target * TAKE_RATE
    receita_real_target = target_metrics["gmv_total"] * TAKE_RATE

    # A Receita Incremental é a diferença entre o que aconteceu e o que teria acontecido.
    receita_incremental = receita_real_target - receita_baseline_target
else:
    receita_incremental = 0  # Define como 0 se um dos grupos não existir

# --- 4. Resultado ---
print("  CÁLCULO DA RECEITA INCREMENTAL DA CAMPANHA")
print(f"Take Rate (premissa): {TAKE_RATE:.0%}")
print("---------------------------------------------")
print(f"Receita Incremental Gerada: R$ {receita_incremental:,.2f}")

  CÁLCULO DA RECEITA INCREMENTAL DA CAMPANHA
Take Rate (premissa): 20%
---------------------------------------------
Receita Incremental Gerada: R$ 2,326,226.30


**3 - Resultado Líquido**

In [84]:
from pyspark.sql.functions import col, sum, count, countDistinct

# --- 1. Premissas da Análise ---
# Ajuste estes valores conforme sua necessidade.
TAKE_RATE = 0.20  # 20% de comissão para o iFood
CUSTO_CUPOM_POR_PEDIDO = 10.00  # R$ 10,00 por pedido no grupo Target

# --- 2. Agregação Única com PySpark ---
# Em uma única passagem, calculamos todos os totais necessários para ambos os grupos.
summary_df = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    count("order_id").alias("qtd_pedidos"),
    sum("order_total_amount").alias("gmv_total")
)

# --- 3. Extração dos Dados para o Modelo Financeiro ---
# Coletamos o pequeno DataFrame de resumo para o Python para facilitar os cálculos.
metrics = {row["is_target"]: row for row in summary_df.collect()}

control_metrics = metrics.get("control")
target_metrics = metrics.get("target")

# --- 4. Cálculo dos Componentes e do Resultado Líquido ---
# Inicializamos as variáveis para evitar erros se um grupo não existir
receita_incremental = 0
custo_total_campanha = 0

if control_metrics and target_metrics:
    # --- Cálculo do Custo da Campanha ---
    custo_total_campanha = target_metrics["qtd_pedidos"] * CUSTO_CUPOM_POR_PEDIDO

    # --- Cálculo da Receita Incremental ---
    gmv_por_usuario_control = control_metrics["gmv_total"] / control_metrics["qtd_usuarios"]
    gmv_baseline_target = gmv_por_usuario_control * target_metrics["qtd_usuarios"]
    receita_baseline_target = gmv_baseline_target * TAKE_RATE
    receita_real_target = target_metrics["gmv_total"] * TAKE_RATE
    receita_incremental = receita_real_target - receita_baseline_target

# --- Cálculo Final ---
resultado_liquido = receita_incremental - custo_total_campanha


# --- 5. Exibição do Resultado ---
print("COMPONENTES DA ANÁLISE:")
print(f"  (+) Receita Incremental Gerada: R$ {receita_incremental:,.2f}")
print(f"  (-) Custo Total da Campanha:    R$ {custo_total_campanha:,.2f}")
print("--------------------------------------------------")
print(f"  📊 Resultado Líquido Final:    R$ {resultado_liquido:,.2f}")

COMPONENTES DA ANÁLISE:
  (+) Receita Incremental Gerada: R$ 2,326,226.30
  (-) Custo Total da Campanha:    R$ 21,367,450.00
--------------------------------------------------
  📊 Resultado Líquido Final:    R$ -19,041,223.70


**4 - ROI: Retorno sobre Investimento**

In [85]:
from pyspark.sql.functions import col, sum, count, countDistinct

# --- 1. Premissas da Análise ---
# Ajuste estes valores conforme sua necessidade.
TAKE_RATE = 0.20  # 20% de comissão para o iFood
CUSTO_CUPOM_POR_PEDIDO = 10.00  # R$ 10,00 por pedido no grupo Target

# --- 2. Agregação Única com PySpark ---
# Em uma única passagem, calculamos todos os totais necessários para ambos os grupos.
summary_df = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    count("order_id").alias("qtd_pedidos"),
    sum("order_total_amount").alias("gmv_total")
)

# --- 3. Extração dos Dados para o Modelo Financeiro ---
# Coletamos o pequeno DataFrame de resumo para o Python para facilitar os cálculos.
metrics = {row["is_target"]: row for row in summary_df.collect()}

control_metrics = metrics.get("control")
target_metrics = metrics.get("target")

# --- 4. Cálculo dos Componentes Financeiros ---
# Inicializamos as variáveis para evitar erros se um grupo não existir
receita_incremental = 0
custo_total_campanha = 0
resultado_liquido = 0
roi = 0

if control_metrics and target_metrics:
    # --- Custo da Campanha ---
    custo_total_campanha = target_metrics["qtd_pedidos"] * CUSTO_CUPOM_POR_PEDIDO

    # --- Receita Incremental ---
    gmv_por_usuario_control = control_metrics["gmv_total"] / control_metrics["qtd_usuarios"]
    gmv_baseline_target = gmv_por_usuario_control * target_metrics["qtd_usuarios"]
    receita_baseline_target = gmv_baseline_target * TAKE_RATE
    receita_real_target = target_metrics["gmv_total"] * TAKE_RATE
    receita_incremental = receita_real_target - receita_baseline_target

    # --- Resultado Líquido ---
    resultado_liquido = receita_incremental - custo_total_campanha

    # --- Cálculo Final do ROI ---
    # Verificamos se o custo é maior que zero para evitar divisão por zero
    if custo_total_campanha > 0:
        roi = resultado_liquido / custo_total_campanha

# --- 5. Exibição do Relatório Financeiro Completo ---
print("PREMISSAS ADOTADAS:")
print(f"  - Take Rate (Comissão iFood): {TAKE_RATE:.0%}")
print(f"  - Custo do Cupom por Pedido (Target): R$ {CUSTO_CUPOM_POR_PEDIDO:.2f}\n")

print("ANÁLISE DE RESULTADOS:")
print(f"  - Receita Incremental Gerada:       R$ {receita_incremental:,.2f}")
print(f"  - Custo Total da Campanha:          R$ {custo_total_campanha:,.2f}")
print("-------------------------------------------------------")
print(f"  📊 Resultado Líquido Final:          R$ {resultado_liquido:,.2f}")
print("-------------------------------------------------------")
print(f"  📈 ROI (Retorno sobre Investimento): {roi:.2%}")
print("="*55)

PREMISSAS ADOTADAS:
  - Take Rate (Comissão iFood): 20%
  - Custo do Cupom por Pedido (Target): R$ 10.00

ANÁLISE DE RESULTADOS:
  - Receita Incremental Gerada:       R$ 2,326,226.30
  - Custo Total da Campanha:          R$ 21,367,450.00
-------------------------------------------------------
  📊 Resultado Líquido Final:          R$ -19,041,223.70
-------------------------------------------------------
  📈 ROI (Retorno sobre Investimento): -89.11%


**📉 Conclusão: A Campanha Teve Retorno?**


Não. A campanha **não teve retorno financeiro positivo.**

**Apesar de gerar:**

*   Maior engajamento dos usuários (mais pedidos por usuário)
*   Maior taxa de retenção
*   Mais heavy users
*   Aumento nas vendas dos restaurantes




👉 O custo da campanha (desconto) foi quase 10x maior que a receita adicional gerada, gerando **prejuízo** de mais de R$ 19 milhões e um ROI negativo de -89,11%.


💡 Insight
A estratégia melhorou comportamento e engajamento, mas não foi eficiente financeiramente.

Se o cupom fosse menor (ex: R$5) ou vinculado a pedidos acima de um valor mínimo de R\$ 60.00, poderia manter os efeitos positivos com menor custo.

### c) Recomendações de oportunidades de melhoria nessa ação e nova proposta de teste A/B para validar essas hipóte-ses.

### 📊 **Resumo da campanha atual**

A campanha com cupom aumentou o engajamento:

* Retenção subiu para **79,5%** (vs 74,7%)
* Mais **pedidos por usuário** (4,79 vs 4,23)
* Mais **heavy users** (25,8% vs 21,5%)

❌ **Problema**: o custo foi alto demais
💸 **Resultado líquido**: –R\$ 19 milhões
📉 **ROI**: –89.11%

---

## 🚀 **Oportunidades de melhoria**

1. **Focar em usuários inativos ou novos**, não em todos.
2. **Reduzir o valor do cupom** ou exigir **valor mínimo de compra**.
3. **Estimular o aumento do ticket médio** com frete grátis.

---

## 🧪 **Nova proposta de teste A/B**

| Grupo    | Estratégia                          |
| -------- | ----------------------------------- |
| Controle | Sem cupom                           |
| Teste A  | R\$10 só para usuários inativos  |
| Teste B  | R\$5 de desconto em pedidos > R\$40     |
| Teste C  | Frete Grátis para pedidos acima de R$ 65,00         |

**Métricas para analisar**: Receita incremental, Resultado Líquido, ROI, Retenção, Reativação, Ticket médio.


## O Desafio – Análise por Segmentação (ETAPA 2)

### a) Definindo as segmentações:

In [97]:
df_ab_orders

DataFrame[customer_id: string, cpf: string, customer_name: string, delivery_address_city: string, delivery_address_country: string, delivery_address_district: string, delivery_address_external_id: string, delivery_address_latitude: string, delivery_address_longitude: string, delivery_address_state: string, delivery_address_zip_code: string, items: string, merchant_id: string, merchant_latitude: string, merchant_longitude: string, merchant_timezone: string, order_created_at: string, order_id: string, order_scheduled: boolean, order_scheduled_date: string, order_total_amount: float, origin_platform: string, is_target: string]

In [96]:
from pyspark.sql import SparkSession

# Cria ou reinicia a sessão Spark
spark = SparkSession.builder \
    .appName("SegmentacaoUsuarios") \
    .config("spark.sql.execution.arrow.enabled", "true") \
    .getOrCreate()

In [98]:
from pyspark.sql.functions import col, count, sum, avg, when, round

# 1. Calcular métrica por usuário
df_user_metrics = df_ab_orders.groupBy("customer_id", "is_target").agg(
    count("order_id").alias("qtd_pedidos"),
    round(sum("order_total_amount") / count("order_id"), 2).alias("ticket_medio")
)

# 2. Calcular tercis
qtd_tercis = df_user_metrics.approxQuantile("qtd_pedidos", [0.33, 0.66], 0.01)
ticket_tercis = df_user_metrics.approxQuantile("ticket_medio", [0.33, 0.66], 0.01)

# 3. Segmentar por frequência
df_user_metrics = df_user_metrics.withColumn(
    "segmento_frequencia",
    when(col("qtd_pedidos") <= qtd_tercis[0], "Baixa frequência")
    .when(col("qtd_pedidos") <= qtd_tercis[1], "Média frequência")
    .otherwise("Alta frequência")
)

# 4. Segmentar por ticket médio
df_user_metrics = df_user_metrics.withColumn(
    "segmento_ticket",
    when(col("ticket_medio") <= ticket_tercis[0], "Ticket baixo")
    .when(col("ticket_medio") <= ticket_tercis[1], "Ticket médio")
    .otherwise("Ticket alto")
)

# 5. Contar usuários por grupo e segmentos
df_segmentos_final = df_user_metrics.groupBy("is_target", "segmento_frequencia", "segmento_ticket").agg(
    count("customer_id").alias("qtd_usuarios")
).orderBy("is_target", "segmento_frequencia", "segmento_ticket")

df_segmentos_final.show(truncate=False)

Py4JJavaError: An error occurred while calling o2440685.approxQuantile.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:829)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:829)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2707)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.$anonfun$apply$1(CoalesceShufflePartitions.scala:61)
	at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:58)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:34)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$optimizeQueryStage$2(AdaptiveSparkPlanExec.scala:169)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.optimizeQueryStage(AdaptiveSparkPlanExec.scala:168)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:588)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:538)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:277)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:417)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:207)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:206)
	at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3849)
	at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3847)
	at org.apache.spark.sql.execution.stat.StatFunctions$.multipleApproxQuantiles(StatFunctions.scala:100)
	at org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:104)
	at org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:115)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [86]:
from pyspark.sql.functions import col, when, count, avg

# 1 - Agrupar usuários com seus KPIs individuais
df_user_stats = df_ab_orders.groupBy("customer_id", "is_target").agg(
    count("order_id").alias("qtd_pedidos"),
    avg("order_total_amount").alias("ticket_medio_usuario")
)

# 2 - Criar segmentos por frequência
df_user_segmentado = df_user_stats.withColumn(
    "segmento_frequencia",
    when(col("qtd_pedidos") <= 2, "Baixa frequência")
    .when((col("qtd_pedidos") >= 3) & (col("qtd_pedidos") <= 5), "Média frequência")
    .otherwise("Alta frequência")
)

# 3 - Criar segmentos por ticket médio
df_user_segmentado = df_user_segmentado.withColumn(
    "segmento_ticket",
    when(col("ticket_medio_usuario") <= 35, "Ticket baixo")
    .when((col("ticket_medio_usuario") > 35) & (col("ticket_medio_usuario") <= 60), "Ticket médio")
    .otherwise("Ticket alto")
)

# 4 - Quantidade de usuários por grupo e segmento
df_resultado_segmentado = df_user_segmentado.groupBy("is_target", "segmento_frequencia", "segmento_ticket").agg(
    count("customer_id").alias("qtd_usuarios")
).orderBy("is_target", "segmento_frequencia", "segmento_ticket")

# Mostrar resultado
df_resultado_segmentado.show(truncate=False)


+---------+-------------------+---------------+------------+
|is_target|segmento_frequencia|segmento_ticket|qtd_usuarios|
+---------+-------------------+---------------+------------+
|control  |Alta frequência    |Ticket alto    |16599       |
|control  |Alta frequência    |Ticket baixo   |25757       |
|control  |Alta frequência    |Ticket médio   |35265       |
|control  |Baixa frequência   |Ticket alto    |43860       |
|control  |Baixa frequência   |Ticket baixo   |84266       |
|control  |Baixa frequência   |Ticket médio   |78443       |
|control  |Média frequência   |Ticket alto    |16410       |
|control  |Média frequência   |Ticket baixo   |26954       |
|control  |Média frequência   |Ticket médio   |32988       |
|target   |Alta frequência    |Ticket alto    |24703       |
|target   |Alta frequência    |Ticket baixo   |38549       |
|target   |Alta frequência    |Ticket médio   |52090       |
|target   |Baixa frequência   |Ticket alto    |45679       |
|target   |Baixa frequên

In [101]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, when, percent_rank, round as spark_round
from pyspark.sql.window import Window

# ==============================================================================
# 1. SETUP E SIMULAÇÃO DE DADOS (SUBSTITUA PELO SEU DATAFRAME)
# ==============================================================================
spark = SparkSession.builder.appName("SegmentABTest_NoDate").getOrCreate()

# ==============================================================================
# 2. CRIAÇÃO DA BASE DE ANÁLISE POR USUÁRIO (FEATURE STORE)
# ==============================================================================
# (Item b) - Parte 1: Agregamos os dados para ter uma visão única por cliente.
print("--- Passo 1: Criando a base de análise por usuário... ---")
df_user_features = df_ab_orders.groupBy("customer_id", "is_target").agg(
    count("order_id").alias("total_pedidos"),
    avg("order_total_amount").alias("ticket_medio_usuario"),
    sum("order_total_amount").alias("gasto_total_usuario")
)

# ==============================================================================
# 3. APLICAÇÃO DOS CRITÉRIOS DE SEGMENTAÇÃO
# ==============================================================================
# (Item a e b) - Parte 2: Adicionamos as "etiquetas" de segmento a cada usuário.
print("--- Passo 2: Aplicando os critérios de segmentação... ---")

# --- Segmentação por Frequência ---
window_freq = Window.partitionBy("is_target").orderBy("total_pedidos")
df_user_features = df_user_features.withColumn("rank_freq", percent_rank().over(window_freq))
df_user_features = df_user_features.withColumn("segmento_frequencia",
    when(col("rank_freq") >= 0.8, "1. Heavy User (Top 20%)")
    .when((col("rank_freq") >= 0.3) & (col("rank_freq") < 0.8), "2. Casual (30-80%)")
    .otherwise("3. Leve (Bottom 30%)")
)

# --- Segmentação por Valor (Ticket Médio) ---
window_valor = Window.partitionBy("is_target").orderBy("ticket_medio_usuario")
df_user_features = df_user_features.withColumn("rank_valor", percent_rank().over(window_valor))
df_user_features = df_user_features.withColumn("segmento_valor",
    when(col("rank_valor") >= 0.5, "1. Alto Valor")
    .otherwise("2. Baixo Valor")
)

print("\n--- DataFrame de Usuários com Segmentos (Amostra) ---")
df_user_features.select(
    "customer_id", "is_target", "total_pedidos", "ticket_medio_usuario",
    "segmento_frequencia", "segmento_valor"
).show(10, truncate=False)


# ==============================================================================
# 4. ANÁLISE CRUZADA FINAL: KPIs POR SEGMENTO
# ==============================================================================
# (Item c) - Agrupamos por grupo de teste E por segmento para gerar os relatórios.
print("--- Passo 3: Gerando as análises cruzadas por segmento... ---")


print("\n\n" + "="*70)
print("          (c) ANÁLISE DE RESULTADOS POR SEGMENTO DE FREQUÊNCIA")
print("="*70)
df_analise_frequencia = df_user_features.groupBy("is_target", "segmento_frequencia").agg(
    count("customer_id").alias("qtd_usuarios"),
    spark_round(avg("total_pedidos"), 2).alias("pedidos_por_usuario"),
    spark_round(avg("ticket_medio_usuario"), 2).alias("ticket_medio_segmento")
).orderBy("segmento_frequencia", "is_target")
df_analise_frequencia.show(truncate=False)


print("\n" + "="*70)
print("          (c) ANÁLISE DE RESULTADOS POR SEGMENTO DE VALOR (TICKET MÉDIO)")
print("="*70)
df_analise_valor = df_user_features.groupBy("is_target", "segmento_valor").agg(
    count("customer_id").alias("qtd_usuarios"),
    spark_round(avg("total_pedidos"), 2).alias("pedidos_por_usuario"),
    spark_round(avg("ticket_medio_usuario"), 2).alias("ticket_medio_segmento")
).orderBy("segmento_valor", "is_target")
df_analise_valor.show(truncate=False)


--- Passo 1: Criando a base de análise por usuário... ---
--- Passo 2: Aplicando os critérios de segmentação... ---

--- DataFrame de Usuários com Segmentos (Amostra) ---


Py4JJavaError: An error occurred while calling o2455895.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:829)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:829)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2707)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.$anonfun$apply$1(CoalesceShufflePartitions.scala:61)
	at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:58)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:34)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$optimizeQueryStage$2(AdaptiveSparkPlanExec.scala:169)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.optimizeQueryStage(AdaptiveSparkPlanExec.scala:168)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:588)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:538)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:534)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:277)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:417)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


## O Desafio – Próximos Passos Ifood (ETAPA 3)

## Gráficos para o relatório final

## Testes (Apagar)

In [None]:
from pyspark.sql.functions import col, to_date

df_order_distinct = df_order.select(to_date(col("order_created_at")).alias("order_date")).distinct().orderBy("order_date")
df_order_distinct.show()

+----------+
|order_date|
+----------+
|2018-12-03|
|2018-12-04|
|2018-12-05|
|2018-12-06|
|2018-12-07|
|2018-12-08|
|2018-12-09|
|2018-12-10|
|2018-12-11|
|2018-12-12|
|2018-12-13|
|2018-12-14|
|2018-12-15|
|2018-12-16|
|2018-12-17|
|2018-12-18|
|2018-12-19|
|2018-12-20|
|2018-12-21|
|2018-12-22|
+----------+
only showing top 20 rows



In [None]:
df_order_distinct.count()

60

In [None]:
from pyspark.sql.functions import col, countDistinct, count, when, round, avg, first

# 1 - Calculando taxa de ativação (% usuários que fizeram ao menos 1 pedido)
df_ativacao = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    count("order_id").alias("qtd_pedidos")
).withColumn(
    "taxa_ativacao", round((col("qtd_pedidos") / col("qtd_usuarios")) * 100, 2)
)

# 2 - Calculando taxa de retenção (% usuários com mais de 1 pedido)
df_pedidos_usuario = df_ab_orders.groupBy("is_target", "customer_id").agg(
    count("order_id").alias("qtd_pedidos")
)

df_retencao = df_pedidos_usuario.groupBy("is_target").agg(
    count("customer_id").alias("total_usuarios"),
    count(when(col("qtd_pedidos") > 1, True)).alias("usuarios_retidos")
).withColumn(
    "taxa_retencao", round((col("usuarios_retidos") / col("total_usuarios")) * 100, 2)
)

# 3 - Calculando incremento percentual no ticket médio
df_ticket_medio = df_ab_orders.groupBy("is_target").agg(
    round(avg("order_total_amount"), 2).alias("ticket_medio")
)

df_incremento = df_ticket_medio.groupBy().pivot("is_target").agg(first("ticket_medio")).withColumn(
    "incremento_percentual", round(((col("target") - col("control")) / col("control")) * 100, 2)
)

# Exibindo os resultados
df_ativacao.show()
df_retencao.show()
df_incremento.show()

+---------+------------+-----------+-------------+
|is_target|qtd_usuarios|qtd_pedidos|taxa_ativacao|
+---------+------------+-----------+-------------+
|  control|      360542|    1525576|       423.13|
|   target|      445924|    2136745|       479.17|
+---------+------------+-----------+-------------+

+---------+--------------+----------------+-------------+
|is_target|total_usuarios|usuarios_retidos|taxa_retencao|
+---------+--------------+----------------+-------------+
|  control|        360542|          269341|         74.7|
|   target|        445924|          354538|        79.51|
+---------+--------------+----------------+-------------+

+-------+------+---------------------+
|control|target|incremento_percentual|
+-------+------+---------------------+
|   47.9| 47.74|                -0.33|
+-------+------+---------------------+



In [None]:
from pyspark.sql.functions import col, countDistinct, count, sum, avg, when, round, first
import pandas as pd

# 1 - Juntando pedidos com grupo de teste
df_ab_orders = df_order.join(df_ab_test, on="customer_id", how="inner")

# 2 - Calculando métricas principais
df_metrics = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    count("order_id").alias("qtd_pedidos"),
    avg("order_total_amount").alias("ticket_medio"),
    sum("order_total_amount").alias("valor_total_pedidos")
).withColumn(
    "pedidos_por_usuario",
    col("qtd_pedidos") / col("qtd_usuarios")
)

# 3 - Taxa de Retenção
df_pedidos_usuario = df_ab_orders.groupBy("is_target", "customer_id").agg(
    count("order_id").alias("qtd_pedidos")
)

df_retencao = df_pedidos_usuario.groupBy("is_target").agg(
    count("customer_id").alias("total_usuarios"),
    count(when(col("qtd_pedidos") > 1, True)).alias("usuarios_retidos")
).withColumn(
    "taxa_retencao", round((col("usuarios_retidos") / col("total_usuarios")) * 100, 2)
)

# 4 - Incremento Percentual no Ticket Médio
df_ticket_medio = df_ab_orders.groupBy("is_target").agg(
    round(avg("order_total_amount"), 2).alias("ticket_medio")
)

df_incremento = df_ticket_medio.groupBy().pivot("is_target").agg(first("ticket_medio")).withColumn(
    "incremento_percentual", round(((col("target") - col("control")) / col("control")) * 100, 2)
)

# 5 - Convertendo para Pandas
df_metrics_pd = df_metrics.toPandas()
df_retencao_pd = df_retencao.select("is_target", "taxa_retencao").toPandas()
df_incremento_pd = df_incremento.select("incremento_percentual").toPandas()

# 6 - Mesclando tudo
df_final = df_metrics_pd.merge(df_retencao_pd, on="is_target", how="left")

# 7 - Formatando os valores
df_formatado = pd.DataFrame()
#df_formatado["Grupo"] = df_final["is_target"]
df_formatado["qtd_usuarios"] = df_final["qtd_usuarios"].apply(lambda x: f"{x:,.0f}".replace(",", "."))
df_formatado["qtd_pedidos"] = df_final["qtd_pedidos"].apply(lambda x: f"{x:,.0f}".replace(",", "."))
df_formatado["ticket_medio"] = df_final["ticket_medio"].apply(lambda x: f"R$ {x:,.2f}".replace(",", "X").replace(".", ",").replace("X", "."))
df_formatado["valor_total_pedidos"] = df_final["valor_total_pedidos"].apply(lambda x: f"R$ {x:,.2f}".replace(",", "X").replace(".", ",").replace("X", "."))
df_formatado["pedidos_por_usuario"] = df_final["pedidos_por_usuario"].apply(lambda x: f"{x:.1f}".replace(".", ","))
df_formatado["taxa_retencao"] = df_final["taxa_retencao"].apply(lambda x: f"{x:.2f}%".replace(".", ","))
df_formatado["incremento_percentual"] = df_incremento_pd["incremento_percentual"].apply(lambda x: f"{x:.2f}%".replace(".", ","))

# 8 - Exibindo resultado
print(df_formatado.to_string(index=False))

qtd_usuarios qtd_pedidos ticket_medio valor_total_pedidos pedidos_por_usuario taxa_retencao incremento_percentual
     360.542   1.525.576     R$ 47,90    R$ 73.071.872,88                 4,2        74,70%                -0,33%
     445.924   2.136.745     R$ 47,74   R$ 102.007.569,84                 4,8        79,51%                   NaN


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg

# PASSO 1: Contar o número de pedidos por cliente
# Agrupamos pelo identificador do cliente e contamos quantos pedidos ele tem.
print("\nPASSO 1: Agregando para contar pedidos por cliente...")
df_user_counts = df_ab_orders.groupBy("customer_id").agg(
    count("order_id").alias("order_count")
)
df_user_counts.show(5)


PASSO 1: Agregando para contar pedidos por cliente...
+--------------------+-----------+
|         customer_id|order_count|
+--------------------+-----------+
|0001226e517517758...|          4|
|00021cd56b6d6c980...|          5|
|00021f6dc15d10418...|          3|
|00022b8c0c7af061f...|          6|
|00029b26fb2121119...|          1|
+--------------------+-----------+
only showing top 5 rows



In [None]:
# PASSO 2: Calcular os percentis para definir o que é um "Heavy User"
# Usamos o DataFrame agregado do passo anterior para esta análise.
print("\nPASSO 2: Calculando os percentis sobre a contagem de pedidos...")
percentiles_to_check = [0.80, 0.90, 0.95]  # Top 20%, 10% e 5%
# O terceiro argumento (0.01) é a margem de erro permitida, para performance. 0.0 para exatidão.
quantiles = df_user_counts.approxQuantile("order_count", percentiles_to_check, 0.01)

p80_threshold = quantiles[0]
p90_threshold = quantiles[1]
p95_threshold = quantiles[2]

print(f"\nO limite para estar no TOP 20% (percentil 80) é: {p80_threshold} pedidos.")
print(f"O limite para estar no TOP 10% (percentil 90) é: {p90_threshold} pedidos.")
print(f"O limite para estar no TOP 5% (percentil 95) é: {p95_threshold} pedidos.")


PASSO 2: Calculando os percentis sobre a contagem de pedidos...

O limite para estar no TOP 20% (percentil 80) é: 6.0 pedidos.
O limite para estar no TOP 10% (percentil 90) é: 10.0 pedidos.
O limite para estar no TOP 5% (percentil 95) é: 14.0 pedidos.


In [None]:
# PASSO 3: Escolher um limite, filtrar os Heavy Users e calcular a nova média
# Vamos usar o Top 20% (percentil 80) como nosso critério para Heavy User.
# Sinta-se à vontade para trocar p80_threshold por p90_threshold ou outro valor.
heavy_user_threshold = p80_threshold

print(f"\nDefinindo 'Heavy Users' como clientes com mais de {heavy_user_threshold} pedidos...")

df_heavy_users = df_user_counts.filter(col("order_count") > heavy_user_threshold)

# Finalmente, calculamos a média de pedidos apenas para este grupo filtrado.
heavy_user_avg_df = df_heavy_users.agg(
    avg("order_count").alias("media_pedidos_heavy_users")
)


print("\n--- RESULTADO FINAL ---")
print("Média de pedidos para o grupo de Heavy Users:")
heavy_user_avg_df.show()


Definindo 'Heavy Users' como clientes com mais de 6.0 pedidos...

--- RESULTADO FINAL ---
Média de pedidos para o grupo de Heavy Users:
+-------------------------+
|media_pedidos_heavy_users|
+-------------------------+
|       13.258736224903803|
+-------------------------+



In [None]:
from pyspark.sql.functions import col, count, when, lit

# 1 - Calculando a quantidade de pedidos por usuário
df_pedidos_usuario = df_ab_orders.groupBy("customer_id").agg(
    count("order_id").alias("qtd_pedidos")
)

# 2 - Calculando o valor do percentil 80 (limiar dos heavy users)
percentil_80 = df_pedidos_usuario.approxQuantile("qtd_pedidos", [0.80], 0.01)[0]

# 3 - Classificando os usuários como heavy users (acima ou igual ao percentil 80)
df_pedidos_usuario = df_pedidos_usuario.withColumn(
    "heavy_user", when(col("qtd_pedidos") >= percentil_80, lit(1)).otherwise(lit(0))
)

# 4 - Juntando com a classificação A/B
df_heavy = df_pedidos_usuario.join(df_ab_test, on="customer_id", how="inner")

# 5 - Contando heavy users por grupo (target e control)
df_heavy_count = df_heavy.groupBy("is_target").agg(
    count(when(col("heavy_user") == 1, True)).alias("qtd_heavy_users"),
    count("*").alias("total_usuarios")
).withColumn(
    "percentual_heavy_users", round((col("qtd_heavy_users") / col("total_usuarios")) * 100, 2)
)

# 6 - Convertendo para Pandas
df_heavy_pd = df_heavy_count.toPandas()

# 7 - Formatando resultados
df_heavy_pd["qtd_heavy_users"] = df_heavy_pd["qtd_heavy_users"].apply(lambda x: f"{x:,}".replace(",", "."))
df_heavy_pd["total_usuarios"] = df_heavy_pd["total_usuarios"].apply(lambda x: f"{x:,}".replace(",", "."))
df_heavy_pd["percentual_heavy_users"] = df_heavy_pd["percentual_heavy_users"].apply(lambda x: f"{x:.2f}%".replace(".", ","))

# 8 - Exibindo
print("🔝 Percentil 80 (limiar de pedidos):", percentil_80)
print(df_heavy_pd.to_string(index=False))

🔝 Percentil 80 (limiar de pedidos): 6.0
is_target qtd_heavy_users total_usuarios percentual_heavy_users
  control          77.621        360.542                 21,53%
   target         115.342        445.924                 25,87%


In [None]:
from pyspark.sql.functions import col, count, when, lit, avg, round

# 1 - Calculando a quantidade de pedidos por usuário
df_pedidos_usuario = df_ab_orders.groupBy("customer_id").agg(
    count("order_id").alias("qtd_pedidos")
)

# 2 - Calculando o valor do percentil 80 (limiar dos heavy users)
percentil_80 = df_pedidos_usuario.approxQuantile("qtd_pedidos", [0.80], 0.01)[0]

# 3 - Classificando os usuários como heavy users (acima ou igual ao percentil 80)
df_pedidos_usuario = df_pedidos_usuario.withColumn(
    "heavy_user", when(col("qtd_pedidos") >= percentil_80, lit(1)).otherwise(lit(0))
)

# 4 - Juntando com a classificação A/B
df_heavy = df_pedidos_usuario.join(df_ab_test, on="customer_id", how="inner")

# 5 - Contando heavy users por grupo (target e control)
df_heavy_count = df_heavy.groupBy("is_target").agg(
    count(when(col("heavy_user") == 1, True)).alias("qtd_heavy_users"),
    count("*").alias("total_usuarios")
).withColumn(
    "percentual_heavy_users", round((col("qtd_heavy_users") / col("total_usuarios")) * 100, 2)
)

# 6 - Calculando média de pedidos dos heavy users
media_pedidos_heavy_users = df_heavy.filter(col("heavy_user") == 1).agg(
    round(avg("qtd_pedidos"), 2).alias("media_pedidos_heavy_users")
).collect()[0]["media_pedidos_heavy_users"]

# 7 - Convertendo para Pandas
df_heavy_pd = df_heavy_count.toPandas()

# 8 - Formatando resultados
df_heavy_pd["qtd_heavy_users"] = df_heavy_pd["qtd_heavy_users"].apply(lambda x: f"{x:,}".replace(",", "."))
df_heavy_pd["total_usuarios"] = df_heavy_pd["total_usuarios"].apply(lambda x: f"{x:,}".replace(",", "."))
df_heavy_pd["percentual_heavy_users"] = df_heavy_pd["percentual_heavy_users"].apply(lambda x: f"{x:.2f}%".replace(".", ","))

# 9 - Exibindo
print(f"📊 Média de pedidos dos heavy users: {media_pedidos_heavy_users}")
print(f"🔝 Limite para estar no TOP 20% (percentil 80): {percentil_80} pedidos")
print(df_heavy_pd.to_string(index=False))

📊 Média de pedidos dos heavy users: 11.75
🔝 Limite para estar no TOP 20% (percentil 80): 6.0 pedidos
is_target qtd_heavy_users total_usuarios percentual_heavy_users
  control          77.621        360.542                 21,53%
   target         115.342        445.924                 25,87%


In [None]:
df_restaurant.show(3)

+--------------------+--------------------+-------+-----------+--------------+------------+-------------+-------------------+-----------------+--------------+--------------+----------------+
|                  id|          created_at|enabled|price_range|average_ticket|takeout_time|delivery_time|minimum_order_value|merchant_zip_code| merchant_city|merchant_state|merchant_country|
+--------------------+--------------------+-------+-----------+--------------+------------+-------------+-------------------+-----------------+--------------+--------------+----------------+
|d19ff6fca6288939b...|2017-01-23 12:52:...|  false|          3|          60.0|           0|           50|               30.0|            14025|RIBEIRAO PRETO|            SP|              BR|
|631df0985fdbbaf27...|2017-01-20 13:14:...|   true|          3|          60.0|           0|            0|               30.0|            50180|     SAO PAULO|            SP|              BR|
|135c5c4ae4c1ec1fd...|2017-01-23 12:46:...|  

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, count, sum, avg, when, round, first, lit
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("iFood_AB_KPIs").getOrCreate()

# Tipagem
df_order = df_order.withColumn("order_total_amount", col("order_total_amount").cast("float"))
df_restaurant = df_restaurant.withColumn("delivery_time", col("delivery_time").cast("float"))

# Base principal
df_ab_orders = df_order.join(df_ab_test, on="customer_id", how="inner")

# KPIs principais
df_metrics = df_ab_orders.groupBy("is_target").agg(
    countDistinct("customer_id").alias("qtd_usuarios"),
    count("order_id").alias("qtd_pedidos"),
    sum("order_total_amount").alias("valor_total_pedidos"),
    avg("order_total_amount").alias("ticket_medio")
).withColumn("pedidos_por_usuario", col("qtd_pedidos") / col("qtd_usuarios"))

# Retenção
df_retent = df_ab_orders.groupBy("is_target", "customer_id").agg(count("order_id").alias("qtd_pedidos"))
df_retencao = df_retent.groupBy("is_target").agg(
    count("customer_id").alias("total_usuarios"),
    count(when(col("qtd_pedidos") > 1, True)).alias("usuarios_retidos")
).withColumn("taxa_retencao", round(col("usuarios_retidos") / col("total_usuarios") * 100, 2))

# Incremento percentual
df_ticket_medio = df_ab_orders.groupBy("is_target").agg(round(avg("order_total_amount"), 2).alias("ticket_medio"))
df_incremento = df_ticket_medio.groupBy().pivot("is_target").agg(first("ticket_medio")).withColumn(
    "incremento_percentual", round((col("target") - col("control")) / col("control") * 100, 2)
)

# Heavy users
df_user_pedidos = df_ab_orders.groupBy("customer_id", "is_target").agg(count("order_id").alias("qtd_pedidos"))
percentil_80 = df_user_pedidos.approxQuantile("qtd_pedidos", [0.80], 0.01)[0]
df_heavy_users = df_user_pedidos.withColumn("heavy_user", when(col("qtd_pedidos") >= percentil_80, 1).otherwise(0))
df_heavy_summary = df_heavy_users.groupBy("is_target").agg(
    count(when(col("heavy_user") == 1, True)).alias("qtd_heavy_users"),
    count("*").alias("total_users")
).withColumn("percentual_heavy_users", round(col("qtd_heavy_users") / col("total_users") * 100, 2))

# Join com restaurantes
df_ab_restaurant = df_ab_orders.join(df_restaurant, df_ab_orders["merchant_id"] == df_restaurant["id"], "inner")

# Tempo médio de entrega
df_delivery_time = df_ab_restaurant.groupBy("is_target").agg(
    round(avg("delivery_time"), 2).alias("tempo_medio_entrega")
)

# Restaurantes por grupo
df_qtd_rest = df_ab_restaurant.groupBy("is_target").agg(countDistinct("merchant_id").alias("qtd_restaurantes"))

# Média de pedidos por restaurante
df_pedidos_por_rest = df_ab_restaurant.groupBy("is_target", "merchant_id").agg(count("order_id").alias("pedidos"))
df_avg_pedidos_rest = df_pedidos_por_rest.groupBy("is_target").agg(
    round(avg("pedidos"), 2).alias("media_pedidos_por_restaurante")
)

# Pareto 80/20 dos restaurantes
df_total_pedidos = df_ab_restaurant.groupBy("merchant_id").agg(count("*").alias("total_pedidos"))
total_geral = df_total_pedidos.agg(sum("total_pedidos")).first()[0]
df_pareto = df_total_pedidos.orderBy(col("total_pedidos").desc())
df_pareto = df_pareto.withColumn("acumulado", F.sum("total_pedidos").over(Window.orderBy(col("total_pedidos").desc())))
df_pareto = df_pareto.withColumn("acumulado_perc", col("acumulado") / lit(total_geral))
top_20_restaurantes = df_pareto.filter(col("acumulado_perc") <= 0.8).count()

# Exibir resultados principais
print("🔝 Número de restaurantes que fazem 80% dos pedidos:", top_20_restaurantes)
df_metrics.show()
df_retencao.show()
df_incremento.show()
df_heavy_summary.show()
df_delivery_time.show()
df_qtd_rest.show()
df_avg_pedidos_rest.show()

🔝 Número de restaurantes que fazem 80% dos pedidos: 2629
+---------+------------+-----------+--------------------+------------------+-------------------+
|is_target|qtd_usuarios|qtd_pedidos| valor_total_pedidos|      ticket_medio|pedidos_por_usuario|
+---------+------------+-----------+--------------------+------------------+-------------------+
|  control|      360542|    1525576| 7.307187305451663E7|47.897891061813134|  4.231340592774212|
|   target|      445924|    2136745|1.0200757006332143E8| 47.73970224023991|   4.79172459881056|
+---------+------------+-----------+--------------------+------------------+-------------------+

+---------+--------------+----------------+-------------+
|is_target|total_usuarios|usuarios_retidos|taxa_retencao|
+---------+--------------+----------------+-------------+
|  control|        360542|          269341|         74.7|
|   target|        445924|          354538|        79.51|
+---------+--------------+----------------+-------------+

+-------+--