
## Ingestão e Análise exploratória dos dados
Notebook PySpark para carregar, limpar e preparar o conjunto de dados para modelagem.

**Tempo de execução alvo:** Databricks Community Edition ou qualquer ambiente habilitado para Spark.
<br>**Saídas:** Volume `/Volumes/workspace/ifood_case/marketing_recommender/data/processed/sv_customer_offer_relationship.json`

## Setup

In [0]:
import os

import matplotlib.pyplot as plt
from pyspark.sql.functions import (
    avg,
    coalesce,
    col,
    count,
    when,
)
from pyspark.sql.functions import (
    sum as spark_sum,
)

## Load data

In [0]:
try:
    data_raw_path = (
        "/Workspace"
        + dbutils.notebook.entry_point.getDbutils()  # noqa: F821
        .notebook()
        .getContext()
        .notebookPath()
        .get()
        .split("/notebooks")[0]
        + "/data/raw"
    )
except Exception:
    data_raw_path = "../data/raw"

In [0]:
DATA_PATHS = {
    "offers": f"{data_raw_path}/offers.json",
    "customers": f"{data_raw_path}/profile.json",
    "transactions": f"{data_raw_path}/transactions.json",
}

try:
    datasets = {name: spark.read.json(path) for name, path in DATA_PATHS.items()}  # noqa: F821
except NameError:
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    datasets = {name: spark.read.json(path) for name, path in DATA_PATHS.items()}

offers, customers, transactions = (
    datasets["offers"],
    datasets["customers"],
    datasets["transactions"],
)

## Análise exploratória dos dados
### Ofertas
- Não há nulos
- Necessário explodir channels, no entanto não encontrei a coluna de canal nas transações, dessa forma não sei como poderia fazer algum filtro.
- Não há valores extremos
- Hipótese: quando offer type é information o valor de desconto é zero.

### Clientes
- Necessário transformar o registered on para data
- Somente clientes adultos, idades parecem ok
- Cerca de 12% de valores nulos nos limites de cartão e gênero, necessário tratamento.
- Clientes com 118 anos é bem improvável, possível problema de qualidade de dados.

### Transactions
- Necessário explodir a coluna value. Além disso, há duas colunas de offer id, precisaremos fazer um coalesce para ajustar.
- Vários nulos no offer id, será necessário tratar quando for mergeado os datasets


In [0]:
for dataset in [offers, customers, transactions]:
    dataset.limit(5).display()
    dataset.summary().display()
    total_rows = dataset.count()

    null_counts = (
        dataset.select(
            [
                spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
                for c in dataset.columns
            ]
        )
        .collect()[0]
        .asDict()
    )

    print(f"Total de linhas: {total_rows}\n")
    for column, null_count in null_counts.items():
        percentage = (null_count / total_rows) * 100
        print(f"{column}: {null_count} nulls ({percentage:.2f}%)")

- 2175 clientes parecem ter problemas de dados, com idade de 118 anos, sem as demais informações com exceção da data de registro, ideal seria comunicação com demais equipes para entender o que isso significa e porque ocorre.

In [0]:
display(customers.filter(col("age") == 118).summary())

Ajuste de nomes para deixar claro o que são os ids.

In [0]:
customers = customers.dropna(subset=["id"]).withColumnRenamed("id", "customer_id")
offers = offers.dropna(subset=["id"]).withColumnRenamed("id", "offer_id")
transactions = transactions.withColumnRenamed("account_id", "customer_id")

Explodindo a coluna de values para retirar as informações de offer id, reward e amount.

In [0]:
transactions = (
    transactions.withColumn(
        "offer_id", coalesce(col("value.offer id"), col("value.offer_id"))
    )
    .withColumn("reward", col("value.reward"))
    .withColumn("amount", col("value.amount"))
)

### Transactions
- 56% das transações foram sem ofertas
- 54% não há valor da transação
- cerca de 10% teve reward, transformar valores nulos em zero (sem reward)
- Não há data que o evento aconteceu, fazendo com que métricas usando janelas seja inviável.

In [0]:
transactions.limit(5).display()
transactions.summary().display()
total_rows = transactions.count()
null_counts = (
    transactions.select(
        [
            spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
            for c in transactions.columns
        ]
    )
    .collect()[0]
    .asDict()
)
print(f"Total de linhas: {total_rows}\n")
for column, null_count in null_counts.items():
    percentage = (null_count / total_rows) * 100
    print(f"{column}: {null_count} nulls ({percentage:.2f}%)")

Temos muito mais transaction do que os demais eventos.

In [0]:
transactions.groupBy("event").count().orderBy("count", ascending=False).show()

há amount somente quando evento é transaction e só há reward quando o evento é offer completed. O que queremos é maximizar esse valor total, de amount + reward, mostrando ofertas relevantes ao usuário.

In [0]:
transactions.groupBy("event").agg(
    spark_sum("amount").alias("total_amount"), spark_sum("reward").alias("total_reward")
).orderBy("total_amount", ascending=False).show()

Há dois offers sem nenhum reward, considerando uma otimização por reward, eles provavelmente não serão previstos.

In [0]:
transactions.groupby("offer_id").agg(
    spark_sum(col("reward")).alias("total_spent")
).show()

In [0]:
events = transactions.select(
    "customer_id", "event", "time_since_test_start", "offer_id", "amount", "reward"
)

- Gerando métricas gerais por cliente e oferta.
- Vamos preencher nulos usando none para offer_id e usando NI (Não identificiado) no gênero.
- Assumindo que rewards é o valor gasto quando o cliente aceita uma oferta e amount é o valor gasto sem oferta.

In [0]:
cust_offer_metrics = (
    events.fillna({"offer_id": "none"})
    .groupBy("customer_id", "offer_id")
    .agg(
        avg(
            when(col("amount").isNull(), 0.0).otherwise(col("amount"))
            + when(col("reward").isNull(), 0.0).otherwise(col("reward"))
        ).alias("avg_customer_offer_spent"),
        count("*").alias("total_rows_by_offer"),
        count(
            when((col("amount").isNotNull()) | (col("reward").isNotNull()), True)
        ).alias("completed_rows_by_offer"),
    )
    .withColumn("take_rate", col("completed_rows_by_offer") / col("total_rows_by_offer"))
    .withColumn("expected_spent", col("avg_customer_offer_spent") * col("take_rate"))
)

In [0]:
cust_metrics = events.groupBy("customer_id").agg(
            spark_sum(
            when(col("amount").isNull(), 0.0).otherwise(col("amount"))
            + when(col("reward").isNull(), 0.0).otherwise(col("reward"))
        ).alias("customer_total_spent"),
        count("*").alias("total_customer_interactions"),
        count(
            when((col("amount").isNotNull()) | (col("reward").isNotNull()), True)
        ).alias("total_customer_buy"),
        avg(
            when(col("amount").isNull(), 0.0).otherwise(col("amount"))
            + when(col("reward").isNull(), 0.0).otherwise(col("reward"))
        ).alias("avg_customer_spent"),
        count("*").alias("customer_interactions")
)

In [0]:
customers = customers.fillna({"gender": "NI"}).withColumn(
    "registered_year", col("registered_on").substr(1, 4)
).withColumn("registered_year", col("registered_year").cast("int")).join(cust_metrics, "customer_id")

- Juntando os datasets.
- As ofertas que ficaram nulas, não houve oferta, serão preenchidos como "none"
- Os dados parecem fazer sentido, sem outliers.

In [0]:
dataset = cust_offer_metrics.join(customers, on="customer_id", how="left")
dataset = dataset.join(offers, on="offer_id", how="left")

dataset.limit(5).display()
dataset.summary().display()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

def add_is_best_offer(df):
    """
    Prepara a variável target: identifica a melhor oferta por cliente
    """    
    window = Window.partitionBy("customer_id").orderBy(col("expected_spent").desc())
    
    df_with_rank = df.withColumn("rank", row_number().over(window))
    
    best_offers = df_with_rank.filter(col("rank") == 1).select(
        col("customer_id"),
        col("offer_id").alias("best_offer_id"),
        col("expected_spent").alias("max_expected_spent")
    )
    
    df_with_target = df.join(best_offers, on="customer_id", how="left")
    
    df_with_target = df_with_target.withColumn(
        "is_best_offer",
        (col("offer_id") == col("best_offer_id")).cast("int")
    ) 
    
    return df_with_target

dataset = add_is_best_offer(dataset)

In [0]:
display(dataset.filter(col("best_offer_id").isNull()))

## Visualizando distribuições

In [0]:
numeric_cols = [
    field.name
    for field in dataset.schema.fields
    if str(field.dataType)
    in ["DoubleType()", "IntegerType()", "LongType()", "FloatType()"]
]
if numeric_cols:
    sample_df = dataset.select(numeric_cols).toPandas()
    fig, axes = plt.subplots(len(numeric_cols), 1, figsize=(8, 4 * len(numeric_cols)))
    if len(numeric_cols) == 1:
        axes = [axes]
    for ax, col in zip(axes, numeric_cols):
        sample_df[col].hist(ax=ax, bins=30)
        ax.set_title(f"Distribution of {col}")
        ax.set_xlabel(col)
        ax.set_ylabel("Frequency")
    plt.tight_layout()
    plt.show()
else:
    print("No numeric columns found in the dataset.")

In [0]:
offer_expected = (
    dataset.groupBy("offer_id")
    .agg(avg("expected_spent").alias("avg_expected_spent"))
    .orderBy("avg_expected_spent", ascending=False)
)

offer_expected_pd = offer_expected.toPandas()

plt.figure(figsize=(10, 6))
plt.bar(offer_expected_pd["offer_id"], offer_expected_pd["avg_expected_spent"])
plt.xlabel("Offer ID")
plt.ylabel("Average Expected Output")
plt.title("Average Expected Output by Offer")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

Dataset altamente desbalanceado

In [0]:
dataset.select("customer_id", "best_offer_id").dropDuplicates().groupBy("best_offer_id").count().display()

In [0]:
OUTPUT_PATH = "/Volumes/workspace/ifood_case/marketing_recommender/data/processed/sv_customer_offer_relationship.json"

if "DATABRICKS_RUNTIME_VERSION" in os.environ:
    dataset.write.format("json").mode("overwrite").save(OUTPUT_PATH)
else:
    dataset.toPandas().to_json(
        "../data/processed/sv_customer_offer_relationship.json",
        orient="records",
        lines=True,
    )