# Análise Completa - Case Ifood: Teste A/B Estratégia de Cupons

Notebook único para orquestrar as tarefas de execução de *setup*, **ETL** e análise dos dados, integrando os diferentes módulos do repositório de origem:

- Clona/atualiza o repositório do projeto, com as dependências, no Colab
- Instala dependências e faz o **download** dos dados brutos
- Sobe Spark e executa o **ETL** (orders/consumers/restaurants + mapa A/B)
- Mantém `orders_silver` e `users_silver` em memória
- Realiza a análise exploratória dos dados


## Configuração do Ambiente e Preparação dos Dados

### Configuração de Ambiente e Download de Dados Brutos

In [None]:
import os, sys, subprocess
from pathlib import Path

GITHUB_USER = "silvaniacorreia"
REPO_NAME   = "ifood-case-cupons"
REPO_URL    = f"https://github.com/{GITHUB_USER}/{REPO_NAME}.git"

def run(cmd):
    print(">", " ".join(cmd))
    subprocess.check_call(cmd)

# clonar/atualizar repositório
ROOT = Path("/content")
PROJECT_DIR = ROOT / REPO_NAME
if not PROJECT_DIR.exists():
    run(["git", "clone", REPO_URL, str(PROJECT_DIR)])
else:
    os.chdir(PROJECT_DIR)
    run(["git", "fetch", "--all"])
    run(["git", "reset", "--hard", "origin/main"])
    run(["git", "checkout", "main"])
    run(["git", "pull", "origin", "main"])
os.chdir(PROJECT_DIR)

# dependências + doewnload bases de dados
run([sys.executable, "-m", "pip", "install", "-r", "requirements.txt", "--no-cache-dir"])
run([sys.executable, "scripts/download_data.py"])

# sys.path
if str(PROJECT_DIR) not in sys.path:
    sys.path.insert(0, str(PROJECT_DIR))
print("✔️ Bootstrap concluído. Projeto:", PROJECT_DIR)

### Iniciando o Spark

In [None]:
from src.utils import load_settings, get_spark

s = load_settings()  
extra = dict(getattr(s.runtime.spark, "conf", {}) or {})
extra.setdefault("spark.sql.execution.arrow.pyspark.enabled", "true")

spark = get_spark(
    app_name=s.runtime.spark.app_name,
    shuffle_partitions=s.runtime.spark.shuffle_partitions,
    extra_conf=extra,
)
print("✔️ Spark ativo - versão:", spark.version)
spark.range(5).show()

### Análises Pré-Flight

In [None]:
# Checagens dados brutos
from src.checks import preflight
from pprint import pprint

rep = preflight(s.data.raw_dir, strict=False)
print("Pré-flight (resumo):")
pprint({
    "raw_dir": rep["raw_dir"],
    "orders_format_guess": rep["orders_format_guess"],
    "files": {k: {kk: vv for kk, vv in v.items() if kk in ("exists","size_bytes","gzip_ok","tar_ok")} for k, v in rep["files"].items()},
    "ab_csv_candidates": rep["ab_csv_candidates"][:3],
})


### ETL (Extração, Transformação e Carga)

In [None]:
from src import etl, checks
from pyspark.sql import functions as F
import os

def _get_exp_window(s):
    """
    Lê a janela do experimento a partir das configurações. Caso não exista, utiliza inferência automática.

    Parâmetros:
        s: Objeto de configurações carregado.

    Retorna:
        Tuple[str, str, bool]: Data de início, data de fim e flag de inferência automática.
    """
    win = getattr(s.analysis, "experiment_window", None)
    if isinstance(win, dict):
        start = win.get("start")
        end   = win.get("end")
    else:
        start = None
        end   = None
    auto = bool(getattr(s.analysis, "auto_infer_window", True))
    return start, end, auto

start, end, auto = _get_exp_window(s)

# Leitura dos dados brutos
orders, consumers, restaurants, abmap = etl.load_raw(spark, s.data.raw_dir)
checks.profile_loaded(orders, consumers, restaurants, abmap, n=5)

# Limpeza e conformidade dos dados
df = etl.clean_and_conform(
    orders, consumers, restaurants, abmap,
    business_tz=getattr(s.analysis, "business_tz", "America/Sao_Paulo"),
    treat_is_target_null_as_control=getattr(s.analysis, "treat_is_target_null_as_control", False),
    experiment_start=start,
    experiment_end=end,
    auto_infer_window=auto,
    use_quantile_window=True,     
    verbose=True
)

# Ajustes finais e agregações para análise
orders_silver = etl.build_orders_silver(df)
orders_silver = etl.enrich_orders_for_analysis(orders_silver)
users_silver  = etl.build_user_aggregates(orders_silver, start, end)

# Cálculo de recência com base no último timestamp observado
ref_ts = orders_silver.agg(F.max("event_ts_utc")).first()[0]
users_silver = users_silver.withColumn("recency", F.datediff(F.lit(ref_ts), F.col("last_order")))

# Salvar resultados em formato Parquet (opcional)
SAVE_PARQUET = False
if SAVE_PARQUET:
    (
        orders_silver
        .write
        .mode("overwrite")
        .partitionBy("event_date_brt")
        .parquet(f"{s.data.processed_dir}/orders_silver.parquet")
    )
    users_silver.write.mode("overwrite").parquet(f"{s.data.processed_dir}/users_silver.parquet")

# Contagem de linhas para validação
print("orders_silver:", orders_silver.count(), "linhas")
print("users_silver :", users_silver.count(), "linhas")

# Exibição de amostras para validação
print("Aviso: toPandas falhou, mostrando via Spark .show()")
orders_silver.show(5, truncate=False)
users_silver.show(5, truncate=False)

### Checagem dos Dados

Foram investigadas duplicatas semânticas na fato (IDs diferentes com mesmo cliente/loja/tempo/valor). Como apenas 1 caso foi encontrado, o que gera efeito desprezível, não foi aplicada a deduplicação adicional.

In [None]:
def check_post_etl(
    orders_silver,
    users_silver,
    *,
    light: bool = True,
    key_cols: list[str] | None = None,
    sample_frac: float = 0.001,
    preview_rows: int = 5,
    use_pandas_preview: bool = False,
    check_semantic_dups: bool = True,
):
    """
    Executa checagens leves pós-ETL para registro no Colab.
    - light=True: nulos apenas em colunas-chave e previews por sample.
    - light=False: nulos em todas as colunas (lento).
    - check_semantic_dups: investiga duplicatas semânticas na fato (lento moderado).
    """
    if key_cols is None:
        key_cols = [
            "order_id", "customer_id", "merchant_id",
            "event_ts_utc", "order_total_amount",
            "is_target", "price_range", "language", "active",
            "delivery_time_imputed", "minimum_order_value_imputed",
        ]
    key_cols = [c for c in key_cols if c in orders_silver.columns]

    print("Faixa de datas (UTC) em orders_silver:")
    orders_silver.agg(
        F.min("event_ts_utc").alias("min_utc"),
        F.max("event_ts_utc").alias("max_utc"),
    ).show(truncate=False)

    print("Split A/B (users):")
    users_silver.groupBy("is_target").count().orderBy("is_target").show()

    # Nulos em orders_silver
    def nulls_by_col(df, cols):
        exprs = [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in cols]
        return df.select(exprs)

    if light:
        print(f"Nulos (colunas-chave): {key_cols}")
        nulls_by_col(orders_silver, key_cols).show(truncate=False)
    else:
        print("Nulos (todas as colunas) — operação pesada:")
        nulls_by_col(orders_silver, orders_silver.columns).show(truncate=False)

    # Duplicatas semânticas (order_ids diferentes com mesmo cliente/restaurante/ts/valor)
    if check_semantic_dups:
        print("\nPossíveis duplicatas sistêmicas (mesmo cliente/restaurante/ts/valor, order_id distinto):")
        dups = (
            orders_silver
            .groupBy("customer_id", "merchant_id", "event_ts_utc", "order_total_amount")
            .agg(
                F.countDistinct("order_id").alias("n_orders"),
                F.collect_set("order_id").alias("order_ids"),
            )
            .filter(F.col("n_orders") > 1)
        )
        total_dups = dups.count()
        print(f"Total de combinações com múltiplos order_id: {total_dups}")
        if total_dups > 0:
            dups.select("customer_id","merchant_id","event_ts_utc","order_total_amount","n_orders","order_ids")\
                .orderBy(F.col("n_orders").desc())\
                .show(10, truncate=False)

    # Previews rápidos
    print("\nPreview orders_silver (sample leve):")
    orders_preview_cols = [c for c in [
        "price_range","order_id","customer_id","merchant_id",
        "event_ts_utc","order_total_amount","origin_platform",
        "is_target","language","active"
    ] if c in orders_silver.columns]
    preview_df = orders_silver.sample(False, sample_frac, seed=42).select(*orders_preview_cols)
    if preview_df.rdd.isEmpty():
        preview_df = orders_silver.select(*orders_preview_cols).limit(preview_rows)
    preview_df.show(preview_rows, truncate=False)

    print("\nPreview users_silver (primeiras linhas):")
    users_preview_cols = [c for c in [
        "customer_id","last_order","frequency","monetary","is_target","recency"
    ] if c in users_silver.columns]
    users_silver.select(*users_preview_cols).show(preview_rows, truncate=False)

    if use_pandas_preview:
        try:
            from IPython.display import display
            display(orders_silver.limit(preview_rows).toPandas())
            display(users_silver.limit(preview_rows).toPandas())
        except Exception:
            pass

check_post_etl(orders_silver, users_silver, light=True, check_semantic_dups=True)


## A/B de cupons

### Visualizações para exploração

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from src.analysis_ab import (
    compute_ab_summary,
    compute_robust_metrics_spark,
    collect_user_level_for_tests,
    run_ab_tests,
    run_nonparam_tests,
    financial_viability
)
from src.viz_ab import (
    plot_group_bars, 
    plot_ab_box, 
    plot_ab_hist_overlay, 
    save_table_csv
)

settings = load_settings("config/settings.yaml")

# Amostragem 
SAMPLE_FRAC = 0.15    
MAX_ROWS    = 100_000  
CLIP_P      = 0.01     

users_pdf = collect_user_level_for_tests(users_silver, sample_frac=0.15, seed=42)

if len(users_pdf) > MAX_ROWS:
    users_pdf = users_pdf.sample(n=MAX_ROWS, random_state=42)

if "aov_user" not in users_pdf.columns:
    users_pdf["aov_user"] = users_pdf["monetary"] / users_pdf["frequency"].replace({0: np.nan})

# Recorte de cauda para visual 
q_low  = users_pdf[["monetary","frequency","aov_user"]].quantile(CLIP_P)
q_high = users_pdf[["monetary","frequency","aov_user"]].quantile(1-CLIP_P)
for col in ["monetary","frequency","aov_user"]:
    users_pdf[col] = users_pdf[col].clip(q_low[col], q_high[col])

users_pdf["grupo"] = users_pdf["is_target"].map({0:"Controle", 1:"Tratamento"}).astype("category")

# Boxplots
sns.set_theme()
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
sns.boxplot(x="grupo", y="monetary", data=users_pdf, ax=axes[0], showfliers=False)
axes[0].set_title("GMV por Usuário (amostra, caudas recortadas)")
sns.boxplot(x="grupo", y="frequency", data=users_pdf, ax=axes[1], showfliers=False)
axes[1].set_title("Pedidos por Usuário (amostra)")
sns.boxplot(x="grupo", y="aov_user", data=users_pdf, ax=axes[2], showfliers=False)
axes[2].set_title("AOV por Usuário (amostra)")
plt.tight_layout(); plt.show()

# Barras de médias (amostra)
metrics_means = (users_pdf
                 .groupby("grupo")[["monetary","frequency","aov_user"]]
                 .mean()
                 .reset_index())
ax = metrics_means.plot(x="grupo", kind="bar", figsize=(10,6))
plt.title("Médias por Grupo (GMV, Pedidos, AOV) — amostra")
plt.ylabel("Valor médio (amostra)")
plt.tight_layout(); plt.show()

# Histogramas
plt.figure(figsize=(10,6))
sns.histplot(users_pdf.loc[users_pdf["grupo"]=="Controle","frequency"],
             bins=30, stat="density", alpha=0.5, label="Controle")
sns.histplot(users_pdf.loc[users_pdf["grupo"]=="Tratamento","frequency"],
             bins=30, stat="density", alpha=0.5, label="Tratamento")
plt.legend(); plt.title("Distribuição de Pedidos por Usuário — amostra (caudas recortadas)")
plt.tight_layout(); plt.show()


### Métricas por grupo

Premissas:
* Valor do cupom: R$ 10,00
    *  Pago integralmente pelo iFood
* Taxa de resgate: 30%
* Take rate: 23%

Dada a distribuição assimétrica dos dados evidenciada pelos gráficos, com muitos outliers, optou-se por utilizar métricas robustas (medianas, p95, heavy users) para a análise de impacto. Métricas robustas ajudam a evitar decisões enviesadas por outliers, garantindo que sejam interpretadas à luz do comportamento da maioria dos usuários. Métricas baseadas em médias também são apresentadas para comparação, mas com cautela, pois podem ser influenciadas por valores extremos. O relatório final incluirá somente métricas robustas e testes estatísticos apropriados.

In [None]:
out_ab = "outputs/ab"

# Resumo por grupo (descrição em Spark)
ab_summary_spark = compute_ab_summary(users_silver)
ab_summary_spark.show(truncate=False)
ab_summary_pdf = ab_summary_spark.toPandas()
save_table_csv(ab_summary_pdf, out_ab, "ab_means")

# Métricas robustas (mediana, p95, heavy)
robust_spark = compute_robust_metrics_spark(users_silver, heavy_threshold=3)
robust_pdf = robust_spark.toPandas()
save_table_csv(robust_pdf, out_ab, "ab_robust")
display(robust_pdf)



### Testes de significância

In [None]:
# Teste paramétrico
ttest_out = run_ab_tests(users_pdf) 

# Teste não-paramétrico
mw_out = run_nonparam_tests(users_pdf)  

print("Welch t-test:", ttest_out)
print("Mann–Whitney:", mw_out)

### Viabilidade financeira

In [None]:
res_fin = financial_viability(
    users_silver,
    take_rate=s.finance.take_rate,
    coupon_cost=s.finance.coupon_cost_default,
    redemption_rate=0.30, 
)

print("Viabilidade financeira:")
display(res_fin)

### Visualizações para relatório

In [None]:
from src.viz_ab import plot_group_bars, plot_ab_box, plot_ab_hist_overlay

# barras com médias
plot_group_bars(
    ab_summary_pdf.rename(columns={"aov":"aov_user"}),
    metrics=["gmv_user","pedidos_user","aov_user"],
    labels_map={"gmv_user":"GMV/usuário","pedidos_user":"Pedidos/usuário","aov_user":"AOV"},
    outdir="outputs/ab", fname="bars_means", title="Médias por grupo"
)

# barras com medianas
plot_group_bars(
    robust_pdf.rename(columns={
        "median_gmv_user":"GMV mediano",
        "median_pedidos_user":"Pedidos medianos",
        "median_aov_user":"AOV mediano"
    })[["is_target","GMV mediano","Pedidos medianos","AOV mediano"]],
    metrics=["GMV mediano","Pedidos medianos","AOV mediano"],
    outdir="outputs/ab", fname="bars_medians", title="Métricas robustas por grupo"
)

# distribuições com users_pdf (amostra)
for metric in ["monetary","frequency","aov_user"]:
    plot_ab_box(users_pdf, metric, outdir="outputs/ab", fname=f"box_{metric}", clip_p=0.01)
    plot_ab_hist_overlay(users_pdf, metric, outdir="outputs/ab", fname=f"hist_{metric}", clip_p=0.01)


## Análise de Segmentos

### Construção de segmentos

In [None]:
from src.analysis_segments import build_rfm_buckets, ab_metrics_by_segment
from src.viz_segments import to_pandas_spark, save_table_csv

# RFM
users_with_rfm = build_rfm_buckets(users_silver)

sample_ids_spark = spark.createDataFrame(
    users_pdf[["customer_id"]].drop_duplicates()
)
rfm_small = (
    users_with_rfm
    .join(sample_ids_spark, "customer_id", "inner")
    .select("customer_id","rfm_segment")
    .toPandas()
)
users_pdf = users_pdf.merge(rfm_small, on="customer_id", how="left")

# Heavy / New / Platform / RFM
ab_heavy = ab_metrics_by_segment(users_silver, segment_col="heavy_user")
ab_new   = ab_metrics_by_segment(users_silver, segment_col="is_new_customer")
ab_plat  = ab_metrics_by_segment(users_silver, segment_col="origin_platform")
ab_rfm   = ab_metrics_by_segment(users_with_rfm, segment_col="rfm_segment", top_k_segments=10)

ab_heavy_pd = to_pandas_spark(ab_heavy)
ab_new_pd   = to_pandas_spark(ab_new)
ab_plat_pd  = to_pandas_spark(ab_plat)
ab_rfm_pd   = to_pandas_spark(ab_rfm)

outdir = "outputs/segments"
save_table_csv(ab_heavy_pd, outdir, "ab_heavy_summary")
save_table_csv(ab_new_pd,   outdir, "ab_new_summary")
save_table_csv(ab_plat_pd,  outdir, "ab_platform_summary")
save_table_csv(ab_rfm_pd,   outdir, "ab_rfm_summary")
ab_heavy_pd, ab_new_pd.head(), ab_plat_pd.head(), ab_rfm_pd.head()


### Testes e métricas robustas por segmento

In [None]:
from src.analysis_segments import robust_metrics_by_segment, nonparam_tests_by_segment, finance_by_segment

SEG_COLS_PANDAS = [c for c in ["heavy_user","is_new_customer","origin_platform","rfm_segment"] if c in users_pdf.columns]
SEG_COLS_SPARK  = [c for c in ["heavy_user","origin_platform","rfm_segment"] if c in users_silver.columns]  

# tabelas robustas (medianas/p95/heavy rate) (amostra)
robust_tables = {
    seg: robust_metrics_by_segment(users_pdf, segment_col=seg, heavy_threshold=3)
    for seg in SEG_COLS_PANDAS
}

# testes não-paramétricos (Mann–Whitney) (amostra)
mw_tests = {
    seg: nonparam_tests_by_segment(users_pdf, segment_col=seg)
    for seg in SEG_COLS_PANDAS
}

# financeiro por segmento EM SPARK (100% da base)
finance_tables = {
    seg: finance_by_segment(
        users_silver, segment_col=seg,
        take_rate=0.23, coupon_cost=10.0, redemption_rate=0.30
    )
    for seg in SEG_COLS_SPARK
}

# Imprimir testes
mw_tests.get("heavy_user"), list(finance_tables.get("heavy_user", {}).items())[:2]
mw_tests.get("origin_platform"), list(finance_tables.get("origin_platform", {}).items())[:2]
mw_tests.get("rfm_segment"), list(finance_tables.get("rfm_segment", {}).items())[:2]
print("Teste Mann-Whitney:", mw_tests.get("heavy_user"))
print("Teste Mann-Whitney:", mw_tests.get("origin_platform"))
print("Teste Mann-Whitney:", mw_tests.get("rfm_segment"))

# salvar tabelas 
outdir = "outputs/tables_segments"
os.makedirs(outdir, exist_ok=True)

# robust_* (um CSV por segmento)
for seg, df in robust_tables.items():
    df.to_csv(f"{outdir}/robust_{seg}.csv", index=False)

# finance_* (dict -> DataFrame por segmento)
for seg, d in finance_tables.items():
    if d:  # pode estar vazio se faltar um dos grupos no segmento
        pd.DataFrame.from_dict(d, orient="index").reset_index(names=[seg]).to_csv(
            f"{outdir}/finance_{seg}.csv", index=False
        )


### Visualização de Segmentos

In [None]:
from pathlib import Path
import pandas as pd
from src.viz_segments import plot_bars_from_robust, plot_rate_by_segment

figdir = "outputs/figs_segments"
Path(figdir).mkdir(parents=True, exist_ok=True)

# DFs robustos (prioriza o que já está em memória; se não houver, lê dos CSVs)
def get_robust_df(key, fallback_path):
    if "robust_tables" in globals() and key in robust_tables:
        return robust_tables[key]
    return pd.read_csv(fallback_path)

robust_heavy_pd = get_robust_df("heavy_user",      "outputs/tables_segments/robust_heavy_user.csv")
robust_plat_pd  = get_robust_df("origin_platform", "outputs/tables_segments/robust_origin_platform.csv")
robust_rfm_pd   = get_robust_df("rfm_segment",     "outputs/tables_segments/robust_rfm_segment.csv")

# Barras de MEDIANAS por segmento
plot_bars_from_robust(
    robust_heavy_pd, "segment", which="median",
    title="Medianas por segmento (Heavy vs Não-heavy)",
    outdir=figdir, fname="bars_heavy_medianas"
)

plot_bars_from_robust(
    robust_plat_pd, "segment", which="median",
    title="Medianas por segmento (Plataforma)",
    outdir=figdir, fname="bars_platform_medianas"
)

# Barras de P95 
plot_bars_from_robust(
    robust_heavy_pd, "segment", which="p95",
    title="p95 por segmento (Heavy vs Não-heavy)",
    outdir=figdir, fname="bars_heavy_p95"
)

# % de heavy users (≥3) por segmento
plot_rate_by_segment(
    robust_plat_pd, "segment",
    title="% de heavy users (≥3) por plataforma",
    outdir=figdir, fname="bars_platform_heavy_rate"
)


### Break-even do cupom 


In [None]:
import os

# Parâmetros 
try:
    TAKE_RATE = float(s.finance.take_rate)
    COUPON_COST = float(s.finance.coupon_cost_default)
except Exception:
    TAKE_RATE = 0.23
    COUPON_COST = 10.0

REDEMPTION = 0.30  
OUTDIR = "outputs/tables_segments"
os.makedirs(OUTDIR, exist_ok=True)

def break_even_table_spark(
    users_silver,
    *,
    take_rate: float,
    coupon_cost: float,
    redemption_rate: float,
    segment_col: str | None = None,
    id_col: str = "customer_id",
    group_col: str = "is_target",
    monetary_col: str = "monetary",
) -> pd.DataFrame:
    """
    Calcula, no Spark, o uplift de GMV por usuário tratado e compara com o break-even.
    Se segment_col=None, retorna 1 linha (overall). Caso contrário, 1 linha por segmento.
    """
    needed = [id_col, group_col, monetary_col]
    if segment_col:
        needed.append(segment_col)
    missing = [c for c in needed if c not in users_silver.columns]
    if missing:
        raise KeyError(f"Colunas ausentes no users_silver: {missing}")

    by = [segment_col, group_col] if segment_col else [group_col]
    ab = (
        users_silver
        .groupBy(*by)
        .agg(
            F.countDistinct(F.col(id_col)).alias("usuarios"),
            F.avg(F.col(monetary_col)).alias("gmv_user")
        )
    )

    cond_ctrl = (F.col(group_col) == 0)
    cond_trat = (F.col(group_col) == 1)

    if segment_col:
        ctrl = ab.filter(cond_ctrl).select(
            F.col(segment_col).alias("segment"),
            F.col("usuarios").alias("usuarios_ctrl"),
            F.col("gmv_user").alias("gmv_ctrl")
        )
        trat = ab.filter(cond_trat).select(
            F.col(segment_col).alias("segment"),
            F.col("usuarios").alias("usuarios_trat"),
            F.col("gmv_user").alias("gmv_trat")
        )
        joined = trat.join(ctrl, on="segment", how="inner")
    else:
        # cria uma chave única "ALL"
        ctrl = ab.filter(cond_ctrl).select(
            F.lit("ALL").alias("segment"),
            F.col("usuarios").alias("usuarios_ctrl"),
            F.col("gmv_user").alias("gmv_ctrl")
        )
        trat = ab.filter(cond_trat).select(
            F.lit("ALL").alias("segment"),
            F.col("usuarios").alias("usuarios_trat"),
            F.col("gmv_user").alias("gmv_trat")
        )
        joined = trat.join(ctrl, on="segment", how="inner")

    uplift_needed = (coupon_cost * redemption_rate) / take_rate if take_rate > 0 else None

    joined = (
        joined
        .withColumn("uplift_gmv_user", F.col("gmv_trat") - F.col("gmv_ctrl"))
        .withColumn("uplift_needed", F.lit(float(uplift_needed) if uplift_needed is not None else None))
        .withColumn("gap_uplift", F.col("uplift_gmv_user") - F.col("uplift_needed"))
        .withColumn("receita_total", F.lit(take_rate) * F.col("uplift_gmv_user") * F.col("usuarios_trat"))
        .withColumn("custo_total", F.lit(coupon_cost * redemption_rate) * F.col("usuarios_trat"))
        .withColumn("lucro_total", F.col("receita_total") - F.col("custo_total"))
        .withColumn("lucro_por_usuario", F.when(F.col("usuarios_trat") > 0, F.col("lucro_total")/F.col("usuarios_trat")).otherwise(F.lit(0.0)))
        .withColumn("roi_percent", F.when(F.col("custo_total") > 0, F.col("lucro_total")/F.col("custo_total")).otherwise(F.lit(None)))
        .withColumn("status", F.when(F.col("lucro_por_usuario") >= 0, F.lit("OK")).otherwise(F.lit("NEG")))
    )

    cols = [
        "segment","usuarios_trat","gmv_ctrl","gmv_trat","uplift_gmv_user",
        "uplift_needed","gap_uplift","receita_total","custo_total","lucro_total",
        "lucro_por_usuario","roi_percent","status"
    ]
    pdf = joined.select(*cols).toPandas()

    for c in ["gmv_ctrl","gmv_trat","uplift_gmv_user","uplift_needed","gap_uplift","lucro_por_usuario"]:
        if c in pdf.columns:
            pdf[c] = pdf[c].astype(float).round(2)
    for c in ["receita_total","custo_total","lucro_total"]:
        if c in pdf.columns:
            pdf[c] = pdf[c].astype(float).round(0)
    if "roi_percent" in pdf.columns:
        pdf["roi_percent"] = pdf["roi_percent"].astype(float).round(3)

    return pdf

# Tabelas de break-even (overall e por segmento)
be_overall = break_even_table_spark(
    users_silver,
    take_rate=TAKE_RATE, coupon_cost=COUPON_COST, redemption_rate=REDEMPTION,
    segment_col=None
)
be_heavy = break_even_table_spark(
    users_silver,
    take_rate=TAKE_RATE, coupon_cost=COUPON_COST, redemption_rate=REDEMPTION,
    segment_col="heavy_user" if "heavy_user" in users_silver.columns else None
)
be_platform = break_even_table_spark(
    users_silver,
    take_rate=TAKE_RATE, coupon_cost=COUPON_COST, redemption_rate=REDEMPTION,
    segment_col="origin_platform" if "origin_platform" in users_silver.columns else None
)

# Salva CSVs
be_overall.to_csv(f"{OUTDIR}/break_even_overall.csv", index=False)
if not be_heavy.empty:
    be_heavy.to_csv(f"{OUTDIR}/break_even_heavy_user.csv", index=False)
if not be_platform.empty:
    be_platform.to_csv(f"{OUTDIR}/break_even_origin_platform.csv", index=False)

display(be_overall)
display(be_heavy)
display(be_platform)
