## 4. Transformação de Features de Baseline - Camada Gold


O dataset contém:

* `TRANSACTION_ID`: ID único da transação.
* `TX_DATETIME`: Data e hora da transação.
* `CUSTOMER_ID`: ID do cliente que fez a transação.
* `TERMINAL_ID`: ID do terminal (maquininha de cartão) onde a transação ocorreu.
* `TX_AMOUNT`: Valor da transação.
* `TX_FRAUD`: Label da transação → 1 se for fraude, 0 se for legítima.
* `TX_FRAUD_SCENARIO`:  Label para os tipos de fraudes 


Ou seja, a base é composta por transações simuladas, cada uma com informações mínimas para análise e detecção de fraudes.


Cenário de Fraude (coluna `TX_FRAUD_SCENARIO`)
* 1	Fraudes com valores acima de R$220 — detecção simples baseada em threshold.
* 2	Terminais comprometidos por 28 dias — exige análise temporal por terminal.
* 3	Cartões clonados por 14 dias com transações infladas — exige análise de padrão de gasto por cliente.

### imports


In [0]:
from pyspark.sql.functions import col, to_timestamp, date_format, hour, when, unix_timestamp, count, avg, desc, sum as spark_sum
from pyspark.sql.window import Window


In [0]:
transactions = spark.sql("""
                          SELECT * FROM workspace.fraud_detection.transaction_silver
                          """)


In [0]:
transactions.show()

In [0]:
transactions.printSchema()

### 4.1. Feature Engineering

> **Objetivo**: Criar variáveis (features) enriquecidas com base nas tabelas silver para preparar os dados para um modelo de detecção de fraudes.

#### 4.1.1. transformação de features a partir da coluna `TX_DATETIME`


1. `TX_DURING_WEEKEND`: 1 se a transação ocorreu no fim de semana (sábado ou domingo), 0 caso contrário.

2. `TX_DURING_NIGHT`: 1 se a transação ocorreu entre 00:00h e 06:00h, 0 caso contrário.

In [0]:
# Cria coluna indicando se é fim de semana (1 = sábado ou domingo, 0 = dia útil)
transactions = transactions.withColumn(
    "TX_DURING_WEEKEND",
    when(date_format(("TX_DATETIME"), "E").isin("Sat", "Sun"), 1).otherwise(0)
)

transactions  = transactions.withColumn(
     "TX_DURING_NIGHT",
     when(hour(col("TX_DATETIME")).between(0, 5), 1).otherwise(0)
)

In [0]:
display(transactions.select("TX_DATETIME", "TX_DURING_WEEKEND", "TX_DURING_NIGHT"))


Databricks data profile. Run in Databricks to view.

#### 4.1.2. transformação de features a partir da coluna `CUSTOMER_ID`

Perfeito! Vamos agora transformar o `CUSTOMER_ID` com base nas ideias do livro, que usa o conceito de **RFM (Recency, Frequency, Monetary value)** para descrever o comportamento dos clientes.

> Objetivo

Criar **6 novas features** com base no `CUSTOMER_ID` para **diferentes janelas de tempo (últimos 1, 7 e 30 dias)**:

---

> **Novas Features (2 tipos × 3 janelas)**

1. **Frequência (Frequency):**
   Número de transações feitas por um cliente nos últimos `n` dias.
   ➤ Nome da feature: `CUSTOMER_ID_NB_TX_nDAY_WINDOW`

2. **Valor monetário médio (Monetary value):**
   Valor médio das transações do cliente nos últimos `n` dias.
   ➤ Nome da feature: `CUSTOMER_ID_AVG_AMOUNT_nDAY_WINDOW`

Para `n = 1`, `7` e `30`.



In [0]:
def add_customer_spending_features(df, windows_days=[1, 7, 30]):
    # Adiciona coluna de timestamp em segundos (necessário para rangeBetween)
    df = df.withColumn("TX_TS", unix_timestamp("TX_DATETIME"))
    
    for n in windows_days:
        window_secs = n * 86400  # número de segundos na janela
        
        # Janela para o cliente ordenada por timestamp, olhando para trás N dias
        w = (
            Window.partitionBy("CUSTOMER_ID")
                  .orderBy(col("TX_TS"))
                  .rangeBetween(-window_secs, 0)
        )
        
        # Frequência de transações
        df = df.withColumn(
            f"CUSTOMER_ID_NB_TX_{n}DAY_WINDOW",
            count("*").over(w)
        )
        
        # Valor médio das transações
        df = df.withColumn(
            f"CUSTOMER_ID_AVG_AMOUNT_{n}DAY_WINDOW",
            avg("TX_AMOUNT").over(w)
        )
        
    return df

In [0]:
transactions = add_customer_spending_features(transactions)


In [0]:
display(transactions)

#### 4.1.3. transformação de features a partir da coluna `TERMINAL_ID`

Nesta etapa, geramos recursos históricos que capturam o nível de risco de cada terminal com base em atividades fraudulentas anteriores. Esses recursos simulam o conhecimento disponível em um cenário real, onde as informações sobre fraudes são atrasadas em 7 dias. Para cada terminal e cada transação, calculamos:

- O número de transações anteriores dentro de [t - (n + atraso), t - atraso]
- A taxa de fraude (fraudes / transações) dentro desse mesmo intervalo

Esses recursos são úteis para identificar terminais que podem ter sido comprometidos recentemente.

> **Objetivo:**

Criar 6 colunas para cada transação, com base no terminal:

| Coluna                              | O que representa                                             |
| ----------------------------------- | ------------------------------------------------------------ |
| `TERMINAL_ID_NB_TX_1DAY_WINDOW`     | Qtde de transações no terminal nos últimos 1 dia (com delay) |
| `TERMINAL_ID_RISK_1DAY_WINDOW`      | Fraudes / transações nos últimos 1 dia (com delay)           |
| `...7DAY_WINDOW`, `...30DAY_WINDOW` | Mesma lógica para 7 e 30 dias                                |




In [0]:
transactions.printSchema()

In [0]:
def add_terminal_risk_features(df, delay_days=7, window_days_list=[1, 7, 30]):
    # Adiciona coluna de timestamp em segundos
    df = df.withColumn("TX_TS", unix_timestamp("TX_DATETIME"))
    
    delay_sec = delay_days * 86400  # segundos de delay
    
    for window_days in window_days_list:
        window_sec = window_days * 86400
        total_range = delay_sec + window_sec
        
        # Cria a janela com range de delay + janela
        w_full = (
            Window.partitionBy("TERMINAL_ID")
            .orderBy("TX_TS")
            .rangeBetween(-total_range, -delay_sec - 1)  # garante a exclusão do delay
        )
        
        # Transações e fraudes na janela (excluindo o delay)
        nb_tx_col = f"TERMINAL_ID_NB_TX_{window_days}DAY_WINDOW"
        risk_col = f"TERMINAL_ID_RISK_{window_days}DAY_WINDOW"
        
        df = df.withColumn(nb_tx_col, count("*").over(w_full))
        df = df.withColumn(
            risk_col,
            (spark_sum("TX_FRAUD").over(w_full) / count("*").over(w_full))
        )
        
    risk_cols = [f"TERMINAL_ID_RISK_{d}DAY_WINDOW" for d in window_days_list]
    df = df.fillna({col: 0 for col in risk_cols})
   
    return df


In [0]:

subset = transactions
subset = add_terminal_risk_features(subset)


display(subset.select('TRANSACTION_ID', "TX_DATETIME", "TX_FRAUD", 'TERMINAL_ID_NB_TX_1DAY_WINDOW', 'TERMINAL_ID_RISK_1DAY_WINDOW', 'TERMINAL_ID_NB_TX_7DAY_WINDOW', 'TERMINAL_ID_RISK_7DAY_WINDOW', 'TERMINAL_ID_NB_TX_30DAY_WINDOW', 'TERMINAL_ID_RISK_30DAY_WINDOW'))

In [0]:
# Verificando linha para ver se bate os dados
display(subset.filter(col('TRANSACTION_ID') == 1754154))

In [0]:
# Aplicando no dataset completo
trasactions = add_terminal_risk_features(transactions)

In [0]:
# verificando as colunas
print(trasactions.columns)


In [0]:
display(trasactions)

In [0]:
transactions.write.format("delta").mode("overwrite").saveAsTable("fraud_detection.transactions_feature_engineering_gold")