In [1]:
# Manipulação e análise de dados
import pandas as pd
import numpy as np

# Funções utilitárias
from functools import reduce
from pathlib import Path
import json

# Visualização de dados
import matplotlib.pyplot as plt
import seaborn as sns

# Machine Learning - pré-processamento, clustering e métricas
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.metrics import silhouette_score

In [2]:
# 1) Ler o parquet já limpo
df = pd.read_parquet(
    r"C:\Users\A200068494\Downloads\ENTERPRISE_CHALLENGE_CLICKBUS\nextrip_ai\data_intermediate\purchases_clean.parquet"
)

In [3]:
TZ = "America/Sao_Paulo"
pd.options.display.float_format = "{:,.2f}".format

In [4]:
base = Path.cwd().parent  # ...\nextrip_ai

# 1)	Padronização de nomes e tipos

In [5]:
# 2) (Re)criar datetime_purchase (date + time, com timezone)
#    Suporta quando 'time_purchase' veio como objeto time/objeto string
date_s = pd.to_datetime(df["date_purchase"], errors="coerce").dt.strftime("%Y-%m-%d")
time_s = df["time_purchase"].astype(str).str.slice(0, 8)  # garante "HH:MM:SS"
dt = pd.to_datetime(date_s + " " + time_s, errors="coerce")  # timestamp naive
df["datetime_purchase"] = dt.dt.tz_localize(TZ, nonexistent="NaT", ambiguous="NaT")

In [6]:
# 3) Indicadores de volta (EDA: relaxado x estrito)
#    Relaxado: basta ter origem e destino de volta preenchidos
df["has_return_relaxed"] = (
    df["place_origin_return"].notna() & df["place_destination_return"].notna()
).astype("Int64")

#    Estrito: sua regra (origem/destino de volta + tickets>=2)
df["has_return_strict"] = (
    df["has_return_relaxed"].astype(bool) & (df["total_tickets_quantity_success"] >= 2)
).astype("Int64")

#    Defina qual será o "oficial" nas análises. Sugiro o relaxado como padrão:
df["has_return"] = df["has_return_relaxed"]

In [7]:
# 4) Padronizar a rota de ida (apenas estética/consistência)
df["route_out"] = (
    df["place_origin_departure"].astype(str).str.strip()
    + "->"
    + df["place_destination_departure"].astype(str).str.strip()
)

In [8]:
# 5) Garantir a chave (fk_contact, datetime_purchase) única por compra
df = df.sort_values(["fk_contact", "datetime_purchase", "gmv_success"]).drop_duplicates(
    subset=["fk_contact", "datetime_purchase"], keep="first"
)

In [9]:
# 6) Outlier P99 como FLAG (não remover do silver)
if "avg_price_per_ticket" not in df.columns:
    df["avg_price_per_ticket"] = (
        df["gmv_success"] / df["total_tickets_quantity_success"]
    )

p99 = df["avg_price_per_ticket"].quantile(0.99)
df["is_price_outlier_p99"] = (df["avg_price_per_ticket"] > p99).astype("Int64")

In [10]:
destino_prt = base / "data_intermediate" / "purchases_ready_trimmed.parquet"
destino_prt.parent.mkdir(parents=True, exist_ok=True)

df.to_parquet(destino_prt, index=False)

In [17]:
REF_FREQ = "D"  # "D" (diário) ou "W-MON" (semanal ancorada na segunda)
MIN_HISTORY_DAYS = 14  # mínimo de histórico antes de t
HORIZONS = [7, 30]  # janelas de rótulo em dias

In [22]:
# ---------- 1) CARREGAR E NORMALIZAR DATAS ----------
raw = pd.read_parquet(destino_prt)  # destino_prt já é um Path, então funciona

# Crie purchase_dt ANTES de selecionar colunas
raw["purchase_dt"] = _get_purchase_dt(raw, TZ)

# Agora sim, mantenha apenas o necessário
df = (
    raw[["fk_contact", "purchase_dt"]]
    .assign(fk_contact=lambda d: d["fk_contact"].astype(str).str.strip())
    .dropna(subset=["fk_contact", "purchase_dt"])
)
df["purchase_dt"] = pd.to_datetime(df["purchase_dt"], errors="coerce")
df = (
    df[df["purchase_dt"].notna()]
    .sort_values(["fk_contact", "purchase_dt"])
    .reset_index(drop=True)
)

obs_start = df["purchase_dt"].min()
obs_end = df["purchase_dt"].max()
print("Janela de observação:", obs_start.date(), "→", obs_end.date())

Janela de observação: 2013-09-12 → 2024-04-01


In [23]:
# ---------- 2) PREPARO POR CLIENTE (DIAS ÚNICOS + CONTAGEM) ----------
# Vamos trabalhar em nível de DIA (sem hora), contando quantas compras ocorreram no dia.
by_day = (
    df.groupby(["fk_contact", "purchase_dt"])
    .size()
    .rename("cnt_day")
    .reset_index()
    .sort_values(["fk_contact", "purchase_dt"])
)

# Para facilitar o streaming, separemos os grupos
groups = by_day.groupby("fk_contact")

In [25]:
from pathlib import Path

# Onde salvar as labels
OUT_DIR = (
    (base / "data_products" / "labels")
    if "base" in locals()
    else Path.cwd() / "data_products" / "labels"
)
OUT_DIR.mkdir(parents=True, exist_ok=True)

CSV_OUT = OUT_DIR / "label_definitions.csv"

In [26]:
# ---------- 3) ESCREVER CSV INICIAL (cabeçalho) ----------
final_cols = [
    "fk_contact",
    "ref_date",
    "y_buy_7d",
    "y_buy_30d",
    "days_until_next",
    "censored",
    "last_purchase_dt",
    "recency_days",
    "n_purchases_life",
]
# zera/reescreve o arquivo com cabeçalho
pd.DataFrame(columns=final_cols).to_csv(CSV_OUT, index=False)
print("Escrevendo em:", CSV_OUT.resolve())

Escrevendo em: C:\Users\A200068494\Downloads\ENTERPRISE_CHALLENGE_CLICKBUS\nextrip_ai\data_products\labels\label_definitions.csv


In [27]:
# ---------- 4) FUNÇÃO DE ROTULAGEM POR CLIENTE ----------
def build_labels_for_customer(
    cid,
    gdf,
    ref_freq=REF_FREQ,
    min_hist=MIN_HISTORY_DAYS,
    horizons=HORIZONS,
    obs_end=obs_end,
):
    """
    gdf: DataFrame com colunas ['fk_contact','purchase_dt','cnt_day'] para um cliente.
    Retorna um DataFrame com as colunas finais para esse cliente (pode ser vazio).
    """
    gdf = gdf.sort_values("purchase_dt")
    first_purchase = gdf["purchase_dt"].iloc[0]
    ref_start = first_purchase + pd.Timedelta(days=min_hist)
    if ref_start > obs_end:
        return pd.DataFrame(columns=final_cols)  # sem referências válidas

    # Série de referências do cliente
    ref_dates = pd.date_range(start=ref_start, end=obs_end, freq=ref_freq)
    if len(ref_dates) == 0:
        return pd.DataFrame(columns=final_cols)

    # Dias de compra únicos e contagem por dia
    days = gdf["purchase_dt"].to_numpy(dtype="datetime64[ns]")
    day_counts = gdf["cnt_day"].to_numpy(dtype=np.int64)
    cum_counts = day_counts.cumsum()  # acumulado até cada dia da sequência 'days'

    # Vetores numpy para as refs
    refs = ref_dates.to_numpy(dtype="datetime64[ns]")

    # Índice da próxima compra STRICTAMENTE > t  => posição de inserção à direita
    idx_next = np.searchsorted(days, refs, side="right")
    # Índice da última compra <= t
    idx_prev = idx_next - 1

    # Máscaras de validade
    valid_prev = idx_prev >= 0
    valid_next = idx_next < len(days)

    # last_purchase_dt
    last_dt = np.full(refs.shape, np.datetime64("NaT"), dtype="datetime64[ns]")
    last_dt[valid_prev] = days[idx_prev[valid_prev]]

    # n_purchases_life (acumulado até a última compra <= t)
    n_life = np.zeros(refs.shape, dtype="int64")
    n_life[valid_prev] = cum_counts[idx_prev[valid_prev]]

    # next_purchase_dt
    next_dt = np.full(refs.shape, np.datetime64("NaT"), dtype="datetime64[ns]")
    next_dt[valid_next] = days[idx_next[valid_next]]

    # days_until_next (em dias)
    # (timedelta64[ns] -> dias inteiros)
    delta_next = (next_dt - refs).astype("timedelta64[D]").astype("float")
    # recency_days = (t - last_purchase_dt)
    delta_rec = (refs - last_dt).astype("timedelta64[D]").astype("float")

    # Monta DataFrame do cliente
    out = pd.DataFrame(
        {
            "fk_contact": cid,
            "ref_date": ref_dates,
            "days_until_next": pd.Series(delta_next).astype("Int64"),  # NaN vira <NA>
            "last_purchase_dt": pd.to_datetime(last_dt),
            "recency_days": pd.Series(delta_rec).astype("Int64"),
            "n_purchases_life": pd.Series(n_life, dtype="Int64"),
        }
    )
    out["censored"] = out["days_until_next"].isna().astype("Int64")

    # Rótulos (t, t+h]
    for h in horizons:
        col = f"y_buy_{h}d"
        out[col] = (
            out["days_until_next"].notna()
            & (out["days_until_next"] > 0)
            & (out["days_until_next"] <= h)
        ).astype("Int64")

    return out[final_cols]

In [28]:
# ---------- 5) LOOP STREAMING POR CLIENTE (APPEND NO CSV) ----------
# Para dar feedback, conte clientes processados
n_clients = groups.ngroups
print("Clientes a processar:", n_clients)

batch = []  # buffer para juntar resultados de alguns clientes
BATCH_ROWS = 100_000  # flush a cada ~100k linhas (ajuste conforme memória)

processed = 0
total_rows = 0

for cid, gdf in groups:
    out_c = build_labels_for_customer(
        cid,
        gdf,
        ref_freq=REF_FREQ,
        min_hist=MIN_HISTORY_DAYS,
        horizons=HORIZONS,
        obs_end=obs_end,
    )
    if len(out_c) == 0:
        processed += 1
        continue

    batch.append(out_c)
    total_rows += len(out_c)

    # flush por tamanho
    if sum(len(b) for b in batch) >= BATCH_ROWS:
        pd.concat(batch, ignore_index=True).to_csv(
            CSV_OUT, mode="a", header=False, index=False
        )
        batch = []

    processed += 1
    if processed % 1000 == 0:
        print(
            f"... {processed}/{n_clients} clientes; {total_rows:,} linhas rótulo até agora"
        )

# flush final
if batch:
    pd.concat(batch, ignore_index=True).to_csv(
        CSV_OUT, mode="a", header=False, index=False
    )

print(f"\nConcluído! Linhas totais (aprox.): {total_rows:,}")
print("Arquivo:", CSV_OUT.resolve())

Clientes a processar: 576365
... 1000/576365 clientes; 1,295,256 linhas rótulo até agora
... 2000/576365 clientes; 2,549,079 linhas rótulo até agora
... 3000/576365 clientes; 3,753,243 linhas rótulo até agora
... 4000/576365 clientes; 5,035,276 linhas rótulo até agora
... 5000/576365 clientes; 6,288,911 linhas rótulo até agora
... 6000/576365 clientes; 7,624,781 linhas rótulo até agora
... 7000/576365 clientes; 8,937,009 linhas rótulo até agora
... 8000/576365 clientes; 10,252,727 linhas rótulo até agora
... 9000/576365 clientes; 11,515,449 linhas rótulo até agora
... 10000/576365 clientes; 12,786,764 linhas rótulo até agora
... 11000/576365 clientes; 14,035,682 linhas rótulo até agora
... 12000/576365 clientes; 15,265,589 linhas rótulo até agora
... 13000/576365 clientes; 16,550,998 linhas rótulo até agora
... 14000/576365 clientes; 17,866,534 linhas rótulo até agora
... 15000/576365 clientes; 19,164,255 linhas rótulo até agora
... 16000/576365 clientes; 20,509,747 linhas rótulo até a

In [29]:
# ---------- 6) SANIDADE RÁPIDA (opcional: lê uma amostra) ----------
# Atenção: ler tudo pode ser pesado. Aqui, apenas head.
sample = pd.read_csv(CSV_OUT, nrows=10, parse_dates=["ref_date", "last_purchase_dt"])
print("\nAmostra:")
print(sample)


Amostra:
                                          fk_contact   ref_date  y_buy_7d  \
0  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-23         0   
1  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-24         0   
2  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-25         0   
3  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-26         0   
4  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-27         0   
5  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-28         0   
6  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-29         0   
7  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-30         0   
8  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-01-31         0   
9  0000029b76ad3cf9d86ad430754fb1d4478069affda61e... 2021-02-01         0   

   y_buy_30d      days_until_next  censored last_purchase_dt  recency_days  \
0          0 -9223372036854775808         0       2021-01-09    