
# LSTM — Classificação de Cliente a partir de Sequências de Engajamento

**Objetivo:** prever `classe_cliente` (ex.: *Alto Valor Ticket Médio*, *Familiar Mensalista*, etc.)
a partir de uma **sequência curta** (T=8 períodos) com variáveis de **engajamento em WhatsApp** e **consumo**.

Este notebook lê `dataset-buonopreco-registro_de_clientes.csv` do seu repositório **AI-Lab**
ou via **raw GitHub** e gera uma **sequência sintética** por cliente (8 períodos).
Em seguida, treina uma **LSTM** para **classificação multiclasse**.


In [None]:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import random

SEED = 123
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

plt.rcParams.update({"figure.figsize": (7,4)})



## 1) Leitura do dataset (local ou raw GitHub)


In [None]:

CANDIDATES = [
    './AI-Lab/datasets/dataset-buonopreco-registro_de_clientes.csv',
    '../AI-Lab/datasets/dataset-buonopreco-registro_de_clientes.csv',
    '../../AI-Lab/datasets/dataset-buonopreco-registro_de_clientes.csv',
    '../datasets/dataset-buonopreco-registro_de_clientes.csv',
    '../../datasets/dataset-buonopreco-registro_de_clientes.csv',
    'https://raw.githubusercontent.com/sousamaf/AI-Lab/main/datasets/dataset-buonopreco-registro_de_clientes.csv',
    'https://raw.githubusercontent.com/sousamaf/AI-Lab/master/datasets/dataset-buonopreco-registro_de_clientes.csv',
]

df = None
last_err = None
for p in CANDIDATES:
    try:
        df = pd.read_csv(p)
        print("Carregado de:", p)
        break
    except Exception as e:
        last_err = e

if df is None:
    raise RuntimeError(f"Falha ao carregar. Último erro: {last_err}")

# Tipagem e categorias
num_cols = ['idade','frequencia_mensal','gasto_medio','pct_compras_com_desconto',
            'whatsapp_envios','whatsapp_respostas','taxa_resposta_whatsapp']
for c in num_cols:
    df[c] = pd.to_numeric(df[c], errors='coerce')

df['categoria_preferida'] = df['categoria_preferida'].astype('category')
df['classe_cliente'] = df['classe_cliente'].astype('category')

class_names = list(df['classe_cliente'].cat.categories)
class_to_idx = {c:i for i,c in enumerate(class_names)}
df['classe_idx'] = df['classe_cliente'].map(class_to_idx)

cat_names = list(df['categoria_preferida'].cat.categories)
cat_to_idx = {c:i for i,c in enumerate(cat_names)}
df['categoria_idx'] = df['categoria_preferida'].map(cat_to_idx)

df.head()



## 2) Geração de sequência sintética (T=8) por cliente

- Variáveis por passo: `whatsapp_envios`, `whatsapp_respostas`, `pct_compras_com_desconto`, `gasto_medio`.
- A dinâmica é calibrada pela **classe** e **categoria** do cliente para induzir padrões distintos.


In [None]:

T = 8

def gen_sequence(row, T=8):
    # Bases por cliente
    env = float(row['whatsapp_envios'])
    rsp = float(row['whatsapp_respostas'])
    pct = float(row['pct_compras_com_desconto'])/100.0
    gmd = float(row['gasto_medio'])
    rr  = float(row['taxa_resposta_whatsapp'])
    cat = int(row['categoria_idx'])
    cls = int(row['classe_idx'])

    # Calibração por classe (induzir padrões)
    # Alto valor: respostas e gasto tendem a ser maiores
    if 'Alto Valor' in row['classe_cliente']:
        base_env, base_rsp, base_pct, base_g = env+1.0, rsp+0.7, pct*0.9, gmd*1.05
    elif 'Familiar' in row['classe_cliente']:
        base_env, base_rsp, base_pct, base_g = env, rsp+0.3, pct*1.1, gmd
    else:
        base_env, base_rsp, base_pct, base_g = env, rsp, pct, gmd

    seq = []
    e, r, p, g = base_env, base_rsp, base_pct, base_g
    for t in range(T):
        # leve sazonalidade e ruído
        e = max(0.0, 0.6*e + 0.4*base_env + 0.5*np.sin(2*np.pi*t/6) + np.random.normal(0, 0.5))
        r = np.clip(0.7*r + 0.3*base_rsp + 0.1*np.sin(2*np.pi*t/5) + np.random.normal(0, 0.2), 0, e+1e-3)
        p = np.clip(0.8*p + 0.2*base_pct + np.random.normal(0, 0.02), 0, 1)
        g = max(5.0, 0.7*g + 10.0*e + 30.0*(r/(e+1e-3)) - 15.0*p + (cat%3)*3.0 + np.random.normal(0, 8.0))
        seq.append([e, r, p*100.0, g])

    return np.array(seq, dtype=np.float32), cls

X, y = [], []
for _, row in df.iterrows():
    s, cls = gen_sequence(row, T=T)
    X.append(s)
    y.append(cls)

X = np.stack(X)  # (N, T, 4)
y = np.array(y, dtype=np.int64)

print("X shape:", X.shape, " y shape:", y.shape, " num classes:", len(class_names))



## 3) Split treino/val e DataLoader


In [None]:

idx = np.arange(len(X))
np.random.shuffle(idx)
split = int(0.8*len(idx))
tr_idx, va_idx = idx[:split], idx[split:]

Xtr, ytr = X[tr_idx], y[tr_idx]
Xva, yva = X[va_idx], y[va_idx]

class SeqClsDS(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X)   # (N, T, F)
        self.y = torch.from_numpy(y)   # (N,)
    def __len__(self): return self.X.shape[0]
    def __getitem__(self, i): return self.X[i], self.y[i]

from torch.utils.data import Dataset, DataLoader
train_dl = DataLoader(SeqClsDS(Xtr, ytr), batch_size=64, shuffle=True)
val_dl   = DataLoader(SeqClsDS(Xva, yva), batch_size=128, shuffle=False)
Xtr.shape, Xva.shape



## 4) Modelo LSTM para classificação


In [None]:

class LSTMClassifier(nn.Module):
    def __init__(self, input_size=4, hidden=64, layers=1, num_classes=3, dropout=0.0):
        super().__init__()
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden, num_layers=layers, batch_first=True, dropout=dropout if layers>1 else 0.0)
        self.fc = nn.Linear(hidden, num_classes)
    def forward(self, x):
        out, (h, c) = self.lstm(x)     # (B, T, H)
        logits = self.fc(out[:, -1, :])
        return logits

device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.mps.is_available() else "cpu")

num_classes = len(class_names)
model = LSTMClassifier(input_size=4, hidden=64, layers=1, num_classes=num_classes).to(device)

opt = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

def accuracy(logits, y):
    pred = logits.argmax(dim=1)
    return (pred == y).float().mean().item()

def run_epoch(dl, train=True):
    model.train() if train else model.eval()
    total_loss, total_acc, count = 0.0, 0.0, 0
    for xb, yb in dl:
        xb, yb = xb.to(device), yb.to(device)
        with torch.set_grad_enabled(train):
            logits = model(xb)
            loss = loss_fn(logits, yb)
            if train:
                opt.zero_grad()
                loss.backward()
                opt.step()
        total_loss += float(loss.item()) * xb.size(0)
        total_acc  += accuracy(logits, yb) * xb.size(0)
        count += xb.size(0)
    return total_loss/max(1,count), total_acc/max(1,count)

tr_hist, va_hist = [], []
EPOCHS = 25
for ep in range(EPOCHS):
    tl, ta = run_epoch(train_dl, True)
    vl, va = run_epoch(val_dl, False)
    tr_hist.append((tl, ta)); va_hist.append((vl, va))
    if (ep+1) % 5 == 0 or ep == 1:
        print(f"ép {ep+1:02d} | loss_t {tl:.3f} acc_t {ta:.3f} | loss_v {vl:.3f} acc_v {va:.3f}")



## 5) Curvas de treino/validação


In [None]:

tr_loss = [x[0] for x in tr_hist]; tr_acc = [x[1] for x in tr_hist]
va_loss = [x[0] for x in va_hist]; va_acc = [x[1] for x in va_hist]

plt.figure()
plt.plot(tr_loss, label='treino')
plt.plot(va_loss, label='validação')
plt.xlabel('época'); plt.ylabel('loss'); plt.title('Loss'); plt.legend(); plt.show()

plt.figure()
plt.plot(tr_acc, label='treino')
plt.plot(va_acc, label='validação')
plt.xlabel('época'); plt.ylabel('acurácia'); plt.title('Acurácia'); plt.legend(); plt.show()



## 6) Matriz de confusão (validação)


In [None]:

from sklearn.metrics import confusion_matrix
import numpy as np

# Coletar predições em val
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for xb, yb in val_dl:
        logits = model(xb.to(device)).cpu().numpy()
        y_hat = np.argmax(logits, axis=1)
        y_pred.extend(y_hat.tolist())
        y_true.extend(yb.numpy().tolist())

cm = confusion_matrix(y_true, y_pred, labels=list(range(num_classes)))
cm



## 7) Observações didáticas
- A tarefa mostra **dependência temporal** entre estímulos (envios), **respostas** e consumo (`gasto_medio`).  
- A LSTM aprende **padrões de engajamento** característicos de cada classe.  
- Para uso real: substitua a simulação por **histórico real** (semanas/meses), normalize variáveis e avalie métricas mais robustas (macro-F1, ROC por classe, etc.).
