# Inicialização

## Carregando dependências

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import col, to_date, datediff, current_date, explode, array_contains, sum
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce
import pandas as pd
import os
import sys
sys.path.append(os.path.dirname(os.getcwd()))

from pyspark.sql.window import Window


## Funções

In [0]:
def generate_histogram(
    df: pd.DataFrame(),
    column: str,
    title: str,
    eixo_x: str,
    hue: str=None
) -> None:
    '''
    Info:
        Funcao usada para gerar um histograma da coluna desejada.
    ----------
    Input:
        df: Dataset contendo os dados a serem analisado.
        column: Coluna a ser utilizada no grafico.
        title: Titulo da imagem.
        eixo_x: Titulo do eixo X da imagem.
        hue: Coluna para ser utilizada em caso de analise por diferentes segmentos.
    ----------
    Output:
        None.
    '''
    fig, ax = plt.subplots(figsize = [8, 6])
    if hue:
        sns.histplot(data = df, x = column, hue=hue)
    else:
        sns.histplot(data = df, x = column)

    # Ajustando o gráfico
    ax.set_title(title)
    ax.set_xlabel(column.capitalize())
    ax.set_ylabel('Frequência')
    ax.set_xlabel(eixo_x)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['left'].set_visible(False)
    ax.grid(axis='y', visible=True, alpha = 0.7)


## Carregando os dados

In [0]:
# df_offers = spark.read.format("json").load("ifood-case/data/raw/offers.json")
# df_customers = spark.read.format("json").load("ifood-case/data/raw/profile.json")
# df_transactions = spark.read.format("json").load("ifood-case/data/raw/transactions.json")

df_offers = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/killbjade@gmail.com/offers.json")
df_customers = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/killbjade@gmail.com/profile.json")
df_transactions = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/killbjade@gmail.com/transactions.json")

# Pré-processamento dos dados

## Base customers

In [0]:
display(df_customers)

In [0]:
# Quantidade de clientes unicos
print(f"Quantidade de amostras na base de dados: {df_customers.count()}")
print(f"Quantidade de IDs únicos na base de dados: {df_customers.select('id').distinct().count()}")

Não existem usuários duplicados na base de dados

### Age

In [0]:
# Análise da idade
# Hipótese: Idade 118 parece ser um erro no banco de dados - valor fora do normal acompanhado de valores nulos nos demais campos
df_plot_customers = df_customers.toPandas()

generate_histogram(
    df=df_plot_customers,
    column='age',
    title='Frequência das idades',
    eixo_x='Idade'
)


Pelo gráfio os registros com idade 118 parecem de fato um problema no banco de dados e serão removidos

In [0]:
# Filtrando a base de dados
df_customers_age_clean = df_customers.filter(col('age') < 118)

# Conferindo os valores nulos após o filtro
print(f'Número de amostras: {df_customers_age_clean.count()}')
print(f"Número de amostras com nulos: {df_customers_age_clean.filter(col('gender').isNull()).count()}")

In [0]:
# Analisando as idades por genero
generate_histogram(
    df=df_plot_customers,
    column='age',
    title='Frequência das idades',
    eixo_x='Idade',
    hue='gender'
)

In [0]:
# Extraindo o tempo em que os usuários estão registrados
df_customers_reg_date = (
    df_customers_age_clean
    .withColumn("registered_on", to_date("registered_on", "yyyyMMdd"))
    .withColumn("reg_age_in_days", datediff(current_date(), "registered_on"))
    .drop('registered_on')
)
display(df_customers_reg_date)

In [0]:
# Analisando os tempos de registro
df_plot_age_reg = df_customers_reg_date.toPandas()
generate_histogram(
    df=df_plot_age_reg,
    column='reg_age_in_days',
    title='Frequência dos tempos de registro',
    eixo_x='Tempo de registro (em dias)',
    hue='gender'
)

Em geral, as contas mais antigas são de usuários do gênero Masculino

In [0]:
# Analisando os limites do cartão
generate_histogram(
    df=df_plot_age_reg,
    column='credit_card_limit',
    title='Frequência dos limites de cartão',
    eixo_x='Limite do cartão',
    hue='gender'
)

Existem mais usuários com gênero Feminino que possuem maior limite no cartão

In [0]:
# Aplicando One-hot-encoding na coluna gender
unique_genders = [row.gender for row in df_customers_reg_date.select(F.col('gender')).distinct().collect()]
for gender in unique_genders:
    df_customers_reg_date = (
        df_customers_reg_date
        .withColumn(
            f'gender_{gender}', 
            F.when(F.col('gender') == gender, 1)
            .otherwise(0)
        )
    )

# Removendo a coluna de genero
df_customers_gender_oh = df_customers_reg_date.drop('gender')
display(df_customers_gender_oh)

## Base offers

In [0]:
display(df_offers)

### Channels

In [0]:
# Criando uma coluna com número total de canais pelos quais a oferta foi vinculada
df_offers = df_offers.withColumn("num_channels", F.size(F.col("channels")))

In [0]:
# Aplicando one-hot-encoding na coluna channels
# Obtendo os possívels valores do canais
unique_channels = [row.value for row in df_offers.select(explode(col("channels")).alias("value")).distinct().collect()]

# Criando as novas colunas
for channel in unique_channels:
    df_offers = df_offers.withColumn(
        f"channel_{channel}",
        array_contains(col("channels"), channel).cast("integer")
    )

# Excluindo a coluna original
df_offers_clean = df_offers.drop('channels')

display(df_offers_clean)

### Offer_type

In [0]:
# Criando manualmente One-Hot Encoding para o tipo de oferta
unique_offer_types = [row.offer_type for row in df_offers_clean.select(F.col('offer_type')).distinct().collect()]
for offer_type in unique_offer_types:
    df_offers_clean = (
        df_offers_clean
        .withColumn(
            f'offer_type_{offer_type}', 
            F.when(F.col('offer_type') == offer_type, 1)
            .otherwise(0)
        )
    )

# Excluindo a coluna original
df_offers_clean = df_offers_clean.drop('offer_type')

display(df_offers_clean)

## Base transactions

In [0]:
display(df_transactions)

### Value

In [0]:
# Extraindo as informações do campo values
df_transactions_extracted = (
    df_transactions
    .withColumn("amount", col("value.amount"))
    .withColumn("offer_id_original", col("value.offer id"))
    .withColumn("offer_id_null", col("value.offer_id"))
    .withColumn("reward", col("value.reward"))
    .drop('value')
)

display(df_transactions_extracted)

In [0]:
#Análise em um único cliente para entender o fluxo de transação
cliente = df_transactions_extracted.filter(col('account_id')=='0861b9ca31b741bb8b411b18f82d18f6')
display(cliente)

In [0]:
# Adicionando a informação de disconto utilizados no registro de transação
df_transactions_extracted = (
    df_transactions_extracted.filter(col('event') != 'offer completed').drop('reward', 'offer_id_null')
    .join(
        df_transactions_extracted.filter(col('event') == 'offer completed').select('account_id', 'reward', 'offer_id_null', 'time_since_test_start'),
        how = 'left',
        on = ['account_id', 'time_since_test_start']
    )
)

#Conferindo no mesmo cliente
display(df_transactions_extracted.filter(col('account_id')=='0861b9ca31b741bb8b411b18f82d18f6'))

In [0]:
# Foi observado que offer_id_null apresenta o id da oferta somente quando há transação completada. Desse modo, juntou-se as colunas de offer_id

# Cria a nova coluna 'offer_id' pegando o primeiro valor não nulo entre 'offer_id_original' e 'offer_id_null' e apaga coluna original
df_transactions_formatted = (
    df_transactions_extracted
    .withColumn("offer_id", coalesce("offer_id_original", "offer_id_null"))
    .drop("offer_id_original", "offer_id_null")
)

#Conferindo no mesmo cliente
# cliente = df_transactions_formatted.filter(col('account_id')=='0861b9ca31b741bb8b411b18f82d18f6')
display(cliente)

## Juntando as bases de dados

In [0]:
df_join = (
    df_transactions_formatted
    .join(
        df_offers_clean.withColumnRenamed('id', 'offer_id'),
        how='left',
        on='offer_id'
    )
    .join(
        df_customers_gender_oh.withColumnRenamed('id', 'account_id'),
        how='left',
        on='account_id'
    )
)

display(df_join.orderBy('account_id', 'time_since_test_start'))

In [0]:
# Removendo amostras com informações de conta nulos
df_join_clean = df_join.filter(col('age').isNotNull())
display(df_join_clean)

## Feature Engineering

Criação de variáveis que podem ser úteis na modelagem.


total_transactions_client = numero total de transacoes feitas pelo cliente ate a transação atual

total_spent_client = valor total gasto pelo cliente em transacoes ate ate a transação atual

offer_completion_rate_client = taxa de completude de ofertas do cliente (numero de ofertas completadas/numero de ofertas recebidas) até o evento

days_since_last_transaction = dias desde a última transacao do cliente ate o evento

offer_historical_completion_rate = taxa histórica de completude para essa oferta especifica

min_value_vs_avg_transaction_client = relacao entre o valor minimo da oferta e o valor médio de transação do cliente (total_spent_client/total_transactions_client)

In [0]:
# 1. Definição da Janela
# Particionando por 'account_id' para que a contagem seja por cliente.
# Ordenando por 'time_since_test_start' para garantir a ordem cronológica.
# 'rowsBetween(Window.unboundedPreceding, Window.currentRow)' define que a janela
# inclui todas as linhas desde o início da partição até a linha atual.

window_spec = Window.partitionBy("account_id").orderBy("time_since_test_start").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Criação das features descritas acima
df_feat_eng = (
    df_join_clean

    ### Calculos de features ao longo do tempo ###
    # Numero de transacoes feitas
    .withColumn(
        "hist_total_transactions",
        F.sum(F.when(F.col("event") == "transaction", 1).otherwise(0)).over(window_spec)
    )
    # Valor gasto com transacoes
    .withColumn(
        'hist_total_spent',
        F.sum(F.when(F.col("event") == "transaction", F.col("amount")).otherwise(0.0)).over(window_spec)
    )
    # Numero de cupons usados
    .withColumn(
        'hist_total_offer_used',
        F.sum(F.when(F.col("reward") > 0, 1).otherwise(0)).over(window_spec)
    )
    # Valor economizado com cupons
    .withColumn(
        'hist_total_offer_amount_used',
        F.sum(F.when(F.col("reward") > 0, F.col("amount")).otherwise(0.0)).over(window_spec)
    )
    # Numero de ofertas recebidas
    .withColumn(
        'hist_total_offer_received',
        F.sum(F.when(F.col("event") == "offer received", 1).otherwise(0.0)).over(window_spec)
    )
    # Dias desde a ultima transacao
    .withColumn(
        'last_transaction_time',
        F.last(F.when(F.col("event") == "transaction", F.col("time_since_test_start")), ignorenulls=True).over(window_spec)
    )
    .withColumn(
        'days_sice_last_transaction',
        coalesce(F.col('time_since_test_start') - F.col('last_transaction_time'), F.col('time_since_test_start'))
    )
    # Media de valor gasto em transacoes
    .withColumn(
        'hist_avg_amount_spent',
        F.col('hist_total_spent')/F.col('hist_total_transactions')
    )

    ### Calculos de taxas e relacoes ###
    # Taxa de uso de ofertas
    .withColumn(
        'offer_usage_rate',
        F.col('hist_total_offer_used')/F.col('hist_total_offer_received')
    )

    # Relacao entre transacoes e ofertas usados
    .withColumn(
        'trx_offer_usage_rate',
        F.col('hist_total_offer_used')/F.col('hist_total_transactions')
    )
    # Relacao entre dinheiro gasto e dinheiro economizado com ofertas
    .withColumn(
        'offer_money_saved_rate',
        F.col('hist_total_spent')/F.col('hist_total_offer_amount_used')
    )
    # Relacao entre o valor minimo da oferta e a media do valor gasto em transcoes
    .withColumn(
        'min_amount_offer_by_avg_spent_rate',
        F.col('min_value')/F.col('hist_avg_amount_spent')
    )

    # Removendo colunas
    .drop('last_transaction_time', 'reward')

    # Ordenando os dados
    .orderBy("account_id", "time_since_test_start")
)

# Limpandos nulos
for null_column in [c for c in df_feat_eng.columns if c != 'offer_id']:
    df_feat_eng = df_feat_eng.fillna({null_column: 0})

# Mostra o DataFrame com a nova coluna
display(df_feat_eng)

In [0]:
# Limpando o dataset
df_clean = (
    df_feat_eng
    .filter(F.col('event') == 'transaction')
    .drop(
        'event'
    )
)

display(df_clean)

## Salvando os dados processados

In [0]:
df_clean.write.mode("overwrite").json("dbfs:/FileStore/shared_uploads/killbjade@gmail.com/data_processed.json")
# df_clean.write.mode("overwrite").json("ifood-case/data/processed/data_processed.json")