# Configuração do Ambiente

Importação do PySpark e das demais bibliotecas necessárias para o processamento dos dados.


In [1]:
import warnings
warnings.filterwarnings('ignore')

# Core libraries
from pathlib import Path
import sys

# PySpark and Utils
try:
    sys.path.append(str(Path.cwd().parent / "src"))
    from pyspark.sql.functions import *
    import pyspark.sql.functions as F
    from pyspark.sql.types import *
    from pyspark.sql.window import Window
    from utils import create_spark_session, load_json_data, display_data_info_spark
except ImportError as e:
    print(f"Import PySpark Error : {e}")    
    raise



# Sessão Spark e configuração dos paths

In [2]:
# Spark session is created by the project helper imported from src/utils.
spark = create_spark_session()

# All data paths are anchored on the project root, taken as the parent of the
# notebook's working directory.
project_root = Path.cwd().parent
data_dir = project_root / "data"
data_raw_path = data_dir / "raw"
data_processed_path = data_dir / "processed"

for path_label, path_value in (
    ("project_root", project_root),
    ("data_raw_path", data_raw_path),
    ("data_processed_path", data_processed_path),
):
    print(f"{path_label}: {path_value}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/13 08:35:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/13 08:35:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


project_root: /Users/viniciusromano/vinicius/ifood-case
data_raw_path: /Users/viniciusromano/vinicius/ifood-case/data/raw
data_processed_path: /Users/viniciusromano/vinicius/ifood-case/data/processed


# Carregamento dos Dados

Carregamento dos três datasets principais:
- **offers.json**: Metadados das ofertas (tipos, valores, canais)
- **profile.json**: Perfis dos clientes (~17k registros)
- **transactions.json**: Eventos e transações (~300k registros)


In [3]:
# Read the three raw JSON sources with the project loader. The captured output
# suggests the loader prints each path it reads; verify in src/utils.
raw_files = {
    "offers": "offers.json",
    "profile": "profile.json",
    "transactions": "transactions.json",
}
# dicts keep insertion order, so the files are loaded in the same order as listed.
raw_frames = {name: load_json_data(data_raw_path / file_name, spark) for name, file_name in raw_files.items()}

offers_df_raw = raw_frames["offers"]
profile_df_raw = raw_frames["profile"]
transactions_df_raw = raw_frames["transactions"]


Carregando: /Users/viniciusromano/vinicius/ifood-case/data/raw/offers.json
Carregando: /Users/viniciusromano/vinicius/ifood-case/data/raw/profile.json
Carregando: /Users/viniciusromano/vinicius/ifood-case/data/raw/transactions.json


In [4]:
# Print row count, columns, schema and a sample for each raw source.
for frame_to_inspect, frame_label in (
    (offers_df_raw, "OFFERS"),
    (profile_df_raw, "PROFILE"),
    (transactions_df_raw, "TRANSACTIONS"),
):
    display_data_info_spark(frame_to_inspect, frame_label)


INFORMATIONS - OFFERS
📏 Row Count: (10)
🔍 Columns: ['channels', 'discount_value', 'duration', 'id', 'min_value', 'offer_type']
root
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- discount_value: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- id: string (nullable = true)
 |-- min_value: long (nullable = true)
 |-- offer_type: string (nullable = true)

First 3 rows:
+----------------------------+--------------+--------+--------------------------------+---------+-------------+
|channels                    |discount_value|duration|id                              |min_value|offer_type   |
+----------------------------+--------------+--------+--------------------------------+---------+-------------+
|[email, mobile, social]     |10            |7.0     |ae264e3637204a6fb9bb56bc8210ddfd|10       |bogo         |
|[web, email, mobile, social]|10            |5.0     |4d5c57ea9a6940dd891ad53e9dbe8da0|10       |bogo         |
|[web,

# Transformação dos Dados - "Offers"
- "One-hot encoding" dos tipos de oferta e dos canais

In [5]:
# Use the same key name as the transactions table so the later join is on "offer_id".
offers_df = offers_df_raw.withColumnRenamed("id", "offer_id")

# Indicator column -> offer_type value it flags.
offer_type_indicators = {
    "offer_type_discount": "discount",
    "offer_type_bogo": "bogo",
    "offer_type_informational": "informational",
}
# Indicator column -> channel that must be present in the `channels` array.
channel_indicators = {
    "channel_web": "web",
    "channel_email": "email",
    "channel_mobile": "mobile",
    "channel_social": "social",
}

for indicator_col, type_value in offer_type_indicators.items():
    offers_df = offers_df.withColumn(indicator_col, when(col("offer_type") == type_value, 1).otherwise(0))

for indicator_col, channel_value in channel_indicators.items():
    offers_df = offers_df.withColumn(indicator_col, when(array_contains(col("channels"), channel_value), 1).otherwise(0))

# The indicator columns replace the original categorical/array columns.
offers_df = offers_df.drop("channels", "offer_type")

display_data_info_spark(offers_df, "OFFERS")


INFORMATIONS - OFFERS
📏 Row Count: (10)
🔍 Columns: ['discount_value', 'duration', 'offer_id', 'min_value', 'offer_type_discount', 'offer_type_bogo', 'offer_type_informational', 'channel_web', 'channel_email', 'channel_mobile', 'channel_social']
root
 |-- discount_value: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- offer_id: string (nullable = true)
 |-- min_value: long (nullable = true)
 |-- offer_type_discount: integer (nullable = false)
 |-- offer_type_bogo: integer (nullable = false)
 |-- offer_type_informational: integer (nullable = false)
 |-- channel_web: integer (nullable = false)
 |-- channel_email: integer (nullable = false)
 |-- channel_mobile: integer (nullable = false)
 |-- channel_social: integer (nullable = false)

First 3 rows:
+--------------+--------+--------------------------------+---------+-------------------+---------------+------------------------+-----------+-------------+--------------+--------------+
|discount_value|duration|offer_id    

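# Transformação dos Dados - "Profile"

- "One-hot encoding" de gênero → colunas gender_F, gender_M, gender_O.
- Converte registered_on para data e calcula days_since_registration (dias entre o cadastro e a data de execução); a coluna registered_on é descartada.
- Trata o valor inválido de idade 118 como ausente (118 → null).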

In [6]:


# Use the same key name as the transactions table so the later join is on "profile_id".
profile_df = profile_df_raw.withColumnRenamed("id", "profile_id")

# One-hot encode gender. A null (or any other) gender yields 0 in all three columns.
profile_df = profile_df \
       .withColumn("gender_F", when(col("gender") == "F", 1).otherwise(0)) \
       .withColumn("gender_M", when(col("gender") == "M", 1).otherwise(0)) \
       .withColumn("gender_O", when(col("gender") == "O", 1).otherwise(0)) \
       .drop("gender")

# Parse registered_on (format yyyyMMdd) and replace it with tenure in days.
# NOTE(review): current_date() ties this feature to the day the notebook runs, so
# re-running on another day changes the output. Consider a fixed reference date
# (e.g. the latest registration date in the data) for reproducibility.
profile_df = profile_df \
    .withColumn("registered_on", to_date("registered_on", "yyyyMMdd")) \
    .withColumn("days_since_registration", datediff(current_date(), "registered_on")) \
    .drop("registered_on")

# Age 118 is an invalid sentinel value; replace it with null.
profile_df = profile_df \
    .withColumn("age", when(col("age") == 118, lit(None)).otherwise(col("age")))

# This frame is the customer profile table, so label it accordingly (was "OFFERS").
display_data_info_spark(profile_df, "PROFILE")



INFORMATIONS - OFFERS
📏 Row Count: (17000)
🔍 Columns: ['age', 'credit_card_limit', 'profile_id', 'gender_F', 'gender_M', 'gender_O', 'days_since_registration']
root
 |-- age: long (nullable = true)
 |-- credit_card_limit: double (nullable = true)
 |-- profile_id: string (nullable = true)
 |-- gender_F: integer (nullable = false)
 |-- gender_M: integer (nullable = false)
 |-- gender_O: integer (nullable = false)
 |-- days_since_registration: integer (nullable = true)

First 3 rows:
+----+-----------------+--------------------------------+--------+--------+--------+-----------------------+
|age |credit_card_limit|profile_id                      |gender_F|gender_M|gender_O|days_since_registration|
+----+-----------------+--------------------------------+--------+--------+--------+-----------------------+
|NULL|NULL             |68be06ca386d4c31939f3a4f0e3dd783|0       |0       |0       |3104                   |
|55  |112000.0         |0610b486422d4921ae7d2bf64640c50b|1       |0       |0 

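# Transformação dos Dados - "Transactions"
- Adicionada a variável `funnel_step`, que indica a etapa do funil de cada evento (oferta recebida → visualizada → concluída → transação).
- Unificação das colunas "offer_id" e "offer id" em uma única coluna `offer_id`.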


In [7]:
# Align the customer key with the profile table and flatten the nested `value`
# struct into top-level columns.
transactions_df = (
    transactions_df_raw
    .withColumnRenamed("account_id", "profile_id")
    .select("*", col("value.*"))
    # The payload carries the offer key under two spellings ("offer_id" and
    # "offer id"); merge them into one column and drop the leftovers.
    .withColumn("offer_id", coalesce(col("offer_id"), col("offer id")))
    .drop("value", "offer id")
)

# Funnel position of each event. Used later as a secondary ordering key so that,
# at the same instant, received < viewed < completed < transaction.
# Events not listed here get a null funnel_step.
funnel_position = (
    when(col("event") == "offer received", 0)
    .when(col("event") == "offer viewed", 1)
    .when(col("event") == "offer completed", 2)
    .when(col("event") == "transaction", 3)
)
transactions_df = transactions_df.withColumn("funnel_step", funnel_position)


display_data_info_spark(transactions_df, "PROCESSED_TRANSACTIONS")



INFORMATIONS - PROCESSED_TRANSACTIONS
📏 Row Count: (306534)
🔍 Columns: ['profile_id', 'event', 'time_since_test_start', 'amount', 'offer_id', 'reward', 'funnel_step']
root
 |-- profile_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- time_since_test_start: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- offer_id: string (nullable = true)
 |-- reward: double (nullable = true)
 |-- funnel_step: integer (nullable = true)

First 3 rows:
+--------------------------------+--------------+---------------------+------+--------------------------------+------+-----------+
|profile_id                      |event         |time_since_test_start|amount|offer_id                        |reward|funnel_step|
+--------------------------------+--------------+---------------------+------+--------------------------------+------+-----------+
|78afa995795e4d85b5d9ceeca43f5fef|offer received|0.0                  |NULL  |9b98b8c7a33c4b65b9aebfe6a799e6d9|NULL  |0        

In [8]:
# Attach customer attributes and offer metadata to every event. Left joins keep
# events with no matching profile/offer (e.g. rows whose offer_id is null).
processed_transactions_df = (
    transactions_df
    .join(profile_df, on="profile_id", how="left")
    .join(offers_df, on="offer_id", how="left")
)

display_data_info_spark(processed_transactions_df, "PROCESSED_TRANSACTIONS")


INFORMATIONS - PROCESSED_TRANSACTIONS
📏 Row Count: (306534)
🔍 Columns: ['offer_id', 'profile_id', 'event', 'time_since_test_start', 'amount', 'reward', 'funnel_step', 'age', 'credit_card_limit', 'gender_F', 'gender_M', 'gender_O', 'days_since_registration', 'discount_value', 'duration', 'min_value', 'offer_type_discount', 'offer_type_bogo', 'offer_type_informational', 'channel_web', 'channel_email', 'channel_mobile', 'channel_social']
root
 |-- offer_id: string (nullable = true)
 |-- profile_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- time_since_test_start: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- reward: double (nullable = true)
 |-- funnel_step: integer (nullable = true)
 |-- age: long (nullable = true)
 |-- credit_card_limit: double (nullable = true)
 |-- gender_F: integer (nullable = true)
 |-- gender_M: integer (nullable = true)
 |-- gender_O: integer (nullable = true)
 |-- days_since_registration: integer (nullable = true)
 |

# Criação de "Windows" para criar novas variáveis para inferência causal
- Particionar por cliente
- Ordenação pelo tempo desde o início do teste (`time_since_test_start`), pela etapa do funil e pelo `offer_id`, para processar os eventos na ordem da linha do tempo.

In [9]:
# Ordering shared by every per-customer timeline window: time first, then funnel
# stage, then offer_id as a last tie-breaker.
# NOTE(review): rows with a null offer_id at the same time and stage remain tied.
timeline_order = ("time_since_test_start", "funnel_step", "offer_id")
customer_timeline = Window.partitionBy("profile_id").orderBy(*timeline_order)

# Ordered, no explicit frame: used with lag()/lead() to read neighbouring rows.
window_spec_lead_lag = customer_timeline

# Running frame from the customer's first row to the current row (cumulative sums).
window_spec_cumulative = customer_timeline.rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Same running frame, used with last(..., ignorenulls=True) to forward-fill values.
window_ffill = customer_timeline.rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Unordered window covering all rows of one offer for one customer.
window_spec_offer = Window.partitionBy("profile_id", "offer_id")

# Criação de variáveis auxiliares
- **Criação de coluna intermediária `is_offer_driven_transaction`**  
  - Marca transações (`event = "transaction"`) que ocorrem no mesmo instante (`time_since_test_start`) e logo após (`lag`) um `offer completed`.

- **Criação de flag `offer_generated_transaction`**  
  - Marca eventos `offer completed` que são seguidos (`lead`) por uma transação no mesmo instante de tempo.

- **Propagação do valor `amount`**  
  - Para linhas `offer completed` com `offer_generated_transaction = 1`, copia o `amount` da transação vinculada (próxima linha).

In [10]:
# Link offer completions to the purchase recorded at the same instant.
#   is_offer_driven_transaction = 1 on a "transaction" row whose previous row is
#                                 an "offer completed" at the same time.
#   offer_generated_transaction = 1 on an "offer completed" row whose next row is
#                                 a "transaction" at the same time.
# On those completed rows, `amount` is replaced by the linked transaction's amount.
df_intermediate = (
    processed_transactions_df
    .withColumn("prev_event", F.lag("event", 1).over(window_spec_lead_lag))
    .withColumn("prev_time", F.lag("time_since_test_start", 1).over(window_spec_lead_lag))
    .withColumn(
        "is_offer_driven_transaction",
        F.when(
            (F.col("event") == "transaction")
            & (F.col("prev_event") == "offer completed")
            & (F.col("time_since_test_start") == F.col("prev_time")),
            1,
        ).otherwise(0),
    )
    .withColumn("next_event", F.lead("event", 1).over(window_spec_lead_lag))
    .withColumn("next_time", F.lead("time_since_test_start", 1).over(window_spec_lead_lag))
    .withColumn(
        "offer_generated_transaction",
        F.when(
            (F.col("event") == "offer completed")
            & (F.col("next_event") == "transaction")
            & (F.col("time_since_test_start") == F.col("next_time")),
            1,
        ).otherwise(0),
    )
    # Read the next row's amount before `amount` is overwritten below.
    .withColumn("next_amount", F.lead("amount", 1).over(window_spec_lead_lag))
    # offer_generated_transaction = 1 already implies event == "offer completed".
    .withColumn(
        "amount",
        F.when(F.col("offer_generated_transaction") == 1, F.col("next_amount"))
         .otherwise(F.col("amount")),
    )
)



# Lógica para Contagem de Sequências (Streaks)

- **Criação de `transaction_type`**  
  - Define `1` para transações geradas por oferta, `0` para transações orgânicas e `null` para não-transações.

- **Identificação de grupos de sequência (`streak_group`)**  
  - Marca mudanças com `streak_start_flag` e acumula para formar o identificador do grupo.

- **Contagem de transações por grupo (`streak_counter`)**  
  - Conta o número de transações dentro de cada grupo, ordenadas por tempo e outras chaves de ordenação.

- **Definição de contadores temporários**  
  - `offer_streak_temp`: contador de sequência para ofertas (zera para transações orgânicas).  
  - `organic_streak_temp`: contador de sequência para orgânicas (zera para transações de oferta).

- **Propagação dos contadores**  
  - Preenche (`ffill`) valores para linhas não-transação, fazendo eventos como `offer completed` herdarem o estado da última transação.

In [11]:

# transaction_type: classifies each row for streak tracking.
#   1    -> transaction flagged as offer-driven (is_offer_driven_transaction = 1)
#   0    -> any other transaction (organic)
#   null -> non-transaction event (e.g. offer received / viewed / completed)
df_with_streaks = df_intermediate.withColumn(
    "transaction_type",
    F.when(F.col("event") == "transaction",
        F.when(F.col("is_offer_driven_transaction") == 1, 1).otherwise(0)
    ).otherwise(None)
)

# streak_group: id of the current run of consecutive same-type transactions.

# Carry the latest non-null transaction_type forward onto every row; then take the
# carried value seen on the previous row.
df_with_streaks = df_with_streaks.withColumn("last_valid_transaction_type", F.last("transaction_type", ignorenulls=True).over(window_ffill))
df_with_streaks = df_with_streaks.withColumn("prev_last_valid_transaction_type", F.lag("last_valid_transaction_type", 1).over(window_spec_lead_lag))
# 1 where the carried type differs from the previous row. While the previous value
# is still null (up to and including the first transaction row) the comparison is
# null and the flag is 0, so the first streak shares group 0 with earlier rows.
df_with_streaks = df_with_streaks.withColumn(
    "streak_start_flag",
    F.when(F.col("last_valid_transaction_type") != F.col("prev_last_valid_transaction_type"), 1).otherwise(0)
)
# Running sum of change flags -> streak id within each customer.
df_with_streaks = df_with_streaks.withColumn("streak_group", F.sum("streak_start_flag").over(window_spec_cumulative))

# streak_counter: running count of transaction rows inside the current streak.

# NOTE(review): this window has orderBy but no explicit frame, so Spark's default
# RANGE (unboundedPreceding, currentRow) frame applies. Rows tied on all ordering
# keys (e.g. two transactions at the same time with null offer_id) are peers and
# receive the same count, unlike the ROWS frames used elsewhere. Confirm intended.
window_spec_streak = Window.partitionBy("profile_id", "streak_group").orderBy("time_since_test_start", "funnel_step", "offer_id")
df_with_streaks = df_with_streaks.withColumn(
    "streak_counter",
    F.sum(F.when(F.col("transaction_type").isNotNull(), 1).otherwise(0)).over(window_spec_streak)
)

# offer_streak_temp and organic_streak_temp: per-transaction streak values.
# The matching type takes streak_counter, the opposite type is reset to 0, and
# non-transaction rows stay null (filled in the next step).

df_with_streaks = df_with_streaks.withColumn(
    "offer_streak_temp",
    F.when(F.col("transaction_type") == 1, F.col("streak_counter"))
     .when(F.col("transaction_type") == 0, 0)
     .otherwise(None)
).withColumn(
    "organic_streak_temp",
    F.when(F.col("transaction_type") == 0, F.col("streak_counter"))
     .when(F.col("transaction_type") == 1, 0)
     .otherwise(None)
)

# offer_streak_counter and organic_streak_counter: forward-fill the temp values so
# non-transaction rows inherit the state of the latest transaction; rows before
# the customer's first transaction get 0.

df_with_streaks = df_with_streaks.withColumn(
    "offer_streak_counter",
    F.coalesce(F.last("offer_streak_temp", ignorenulls=True).over(window_ffill), F.lit(0))
).withColumn(
    "organic_streak_counter",
    F.coalesce(F.last("organic_streak_temp", ignorenulls=True).over(window_ffill), F.lit(0))
)

# Criação das Colunas Finais (Cumulativas e de Cálculo)

- **Contadores cumulativos**  
  - `cumulative_transactions`: total de transações.  
  - `cumulative_offer_driven_transactions`: total de transações geradas por oferta.  
  - `cumulative_organic_transactions`: total de transações orgânicas (`transações - por oferta`).  
  - `cumulative_offers_received`: total de ofertas recebidas.  
  - `cumulative_offers_viewed`: total de ofertas visualizadas.

- **Cálculos relacionados a ofertas**  
  - `offer_received_time`: tempo da primeira ocorrência de `offer received` no grupo da oferta.  
  - `days_to_complete_offer`: diferença de tempo entre `offer received` e `offer completed`.

- **Valores financeiros cumulativos**  
  - `cumulative_amount`: soma acumulada de `amount` (incluindo valores propagados para `offer completed`).  
  - `cumulative_reward`: soma acumulada de `reward`.  
  - `reward_to_spend_ratio`: proporção entre recompensa acumulada e valor gasto acumulado.

In [12]:
# Running, customer-level features over each customer's ordered timeline
# (window_spec_cumulative: from the customer's first row up to the current row).

# Spend bookkeeping. On "offer completed" rows with offer_generated_transaction = 1,
# `amount` holds a copy of the linked transaction's amount (the next row). Summing
# `amount` over every row would count that purchase twice from the transaction
# row onward. Instead, sum only the non-copied amounts and add the copy on the
# completed row itself, where the linked transaction is not yet in the running sum.
propagated_amount = F.when(F.col("offer_generated_transaction") == 1, F.col("amount"))
own_amount = F.when(F.col("offer_generated_transaction") == 1, F.lit(None)).otherwise(F.col("amount"))
spend_to_date = F.sum(own_amount).over(window_spec_cumulative)
# Null while the customer has no spend at all yet; otherwise the non-null sum.
cumulative_amount_expr = F.coalesce(spend_to_date + propagated_amount, spend_to_date, propagated_amount)

final_df = df_with_streaks.withColumn(
    "cumulative_transactions", F.sum(F.when(F.col("event") == "transaction", 1).otherwise(0)).over(window_spec_cumulative)
).withColumn(
    "cumulative_offer_driven_transactions", F.sum("is_offer_driven_transaction").over(window_spec_cumulative)
).withColumn(
    "cumulative_organic_transactions", F.col("cumulative_transactions") - F.col("cumulative_offer_driven_transactions")
).withColumn(
    "cumulative_offers_received", F.sum(F.when(F.col("event") == "offer received", 1).otherwise(0)).over(window_spec_cumulative)
).withColumn(
    "cumulative_offers_viewed", F.sum(F.when(F.col("event") == "offer viewed", 1).otherwise(0)).over(window_spec_cumulative)
).withColumn(
    # Earliest receipt time of this offer for this customer. min() is deterministic;
    # first() over this unordered window depended on arbitrary row order.
    # NOTE(review): if the same offer was received more than once, every
    # completion is measured from the earliest receipt.
    "offer_received_time", F.min(F.when(F.col("event") == "offer received", F.col("time_since_test_start"))).over(window_spec_offer)
).withColumn(
    "days_to_complete_offer", F.when(F.col("event") == "offer completed", F.col("time_since_test_start") - F.col("offer_received_time")).otherwise(None)
).withColumn(
    "cumulative_amount", cumulative_amount_expr
).withColumn(
    "cumulative_reward", F.sum("reward").over(window_spec_cumulative)
).withColumn(
    "reward_to_spend_ratio", F.when((F.col("cumulative_amount").isNotNull()) & (F.col("cumulative_amount") > 0), F.col("cumulative_reward") / F.col("cumulative_amount")).otherwise(0.0)
)

In [13]:
# Preview the first 30 events in timeline order. The sort applies only to this
# preview: final_df itself is left unsorted. The next cells regroup it with window
# functions anyway, and a global orderBy kept in final_df's plan may add an extra
# full sort to every downstream job that reuses it.
final_df.orderBy("profile_id", "time_since_test_start", "funnel_step", "offer_id").show(30, truncate=False)

25/08/13 08:36:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 39:>                                                         (0 + 8) / 8]

+--------------------------------+--------------------------------+---------------+---------------------+------+------+-----------+----+-----------------+--------+--------+--------+-----------------------+--------------+--------+---------+-------------------+---------------+------------------------+-----------+-------------+--------------+--------------+---------------+---------+---------------------------+---------------+---------+---------------------------+-----------+----------------+---------------------------+--------------------------------+-----------------+------------+--------------+-----------------+-------------------+--------------------+----------------------+-----------------------+------------------------------------+-------------------------------+--------------------------+------------------------+-------------------+----------------------+------------------+-----------------+---------------------+
|offer_id                        |profile_id                      |eve

                                                                                

# Seleção do Estado Final de Cada Oferta

- **Filtragem inicial**  
  - Mantém apenas registros com `offer_id` válido, removendo transações não associadas a ofertas.

- **Definição de janela (`window_spec_last_event`)**  
  - Particiona por `profile_id` e `offer_id`.  
  - Ordena de forma descendente por `funnel_step` e `time_since_test_start`.

- **Atribuição de ranking (`rank`)**  
  - Usa `row_number()` para numerar eventos dentro de cada grupo.  

- **Seleção do estado final da oferta**  
  - Filtra para manter apenas linhas com `rank = 1`, resultando em uma única entrada por `(profile_id, offer_id)`.

In [14]:
# Keep only offer-related rows; events without an offer_id are dropped.
final_df = final_df.filter(F.col("offer_id").isNotNull())

# For each (customer, offer) pair, rank rows by furthest funnel stage first and,
# within the same stage, by most recent time.
window_spec_last_event = (
    Window.partitionBy("profile_id", "offer_id")
    .orderBy(F.desc("funnel_step"), F.desc("time_since_test_start"))
)

final_df_ranked = final_df.withColumn("rank", F.row_number().over(window_spec_last_event))

# One row per (profile_id, offer_id); repeated receipts of the same offer collapse here.
offers_processed = final_df_ranked.filter(F.col("rank") == 1).drop("rank")

# Exibição e Limpeza do Resultado Final

- **Remoção de colunas auxiliares**  
  - Exclui variáveis intermediárias e de controle, como `event`, `prev_event`, `streak_group`, entre outras.

- **Ajuste de formatação**  
  - Arredonda `cumulative_amount` para 2 casas decimais.  
  - Arredonda `reward_to_spend_ratio` para 4 casas decimais.

- **Resultado**  
  - DataFrame final possui uma linha por `(profile_id, offer_id)`.  
  - Contém apenas o último estado da oferta, sem registros de transações orgânicas.

In [17]:
# Helper and bookkeeping columns that were only needed to derive the features.
columns_to_drop = [
    "event", "reward",
    "prev_event", "prev_time", "next_event", "next_time", "next_amount",
    "transaction_type", "streak_group", "streak_counter",
    "last_valid_transaction_type", "prev_last_valid_transaction_type", "streak_start_flag",
    "is_offer_driven_transaction", "offer_streak_temp", "organic_streak_temp",
]

offers_processed = (
    offers_processed
    .drop(*columns_to_drop)
    # Round for readability: 2 decimals for the amount, 4 for the ratio.
    .withColumn("cumulative_amount", F.round(F.col("cumulative_amount"), 2))
    .withColumn("reward_to_spend_ratio", F.round(F.col("reward_to_spend_ratio"), 4))
)

offers_processed.count()

                                                                                

63288

In [16]:
# Persist the per-(profile_id, offer_id) table as Parquet under the processed-data
# directory configured at the top of the notebook, rather than a path relative
# to the current working directory.
offers_processed.write.mode("overwrite").parquet(str(data_processed_path / "offers_processed.parquet"))

                                                                                