
# Análise de Sentimentos no IMDb com Transformers

**Autores:** Sérgio Barreto (slbp) e Isaac Ferreira Silva (ifs5)

---

## 1) Definição do Problema

**Tarefa:** Classificar críticas de filmes do IMDb em **positivas** (1) ou **negativas** (0) usando modelos *Transformers*.  
**Objetivo desta etapa:** Definir claramente a aplicação, treinar um **baseline reprodutível** e apresentar **resultados parciais**.  
**Extensões incluídas neste notebook:**  
- **Pré-processamento** textual inicial;  
- **Treinamento baseline** (DistilBERT por padrão);  
- **Busca de hiperparâmetros com Optuna** para refinar *learning rate*, *epochs*, *batch size*, etc.;  
- **Relato conciso de resultados** (acurácia, F1 e matriz de confusão).



## 2) Dependências

In [None]:
!pip install -q datasets scikit-learn torch
!pip install -U transformers

## Carregando modelo já treinado

In [None]:
import zipfile
import os
from transformers import AutoTokenizer, AutoModelForSequenceClassification

zip_path = "/content/imdb_model.zip"
extract_dir = "./melhor_modelo_imdb"
# Sub-folder the loading cell reads the model from. Checking only the parent
# folder would skip extraction whenever the parent exists without the model
# inside it (e.g. created by another cell or by an interrupted extraction).
expected_model_dir = os.path.join(extract_dir, "imdb_model")

# 1) Extract the zip unless the model folder is already in place.
if not os.path.isdir(expected_model_dir):
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(extract_dir)
    print(f"Arquivos extra√≠dos em: {extract_dir}")
else:
    print(f"Pasta {extract_dir} j√° existe, pulando extra√ß√£o.")


In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Folder, inside the extracted archive, that holds the fine-tuned checkpoint.
model_dir = "./melhor_modelo_imdb/imdb_model"

# Restore the classifier and its tokenizer from the same folder so that the
# text preprocessing matches what the weights were trained with.
model = AutoModelForSequenceClassification.from_pretrained(model_dir)
tokenizer = AutoTokenizer.from_pretrained(model_dir)

print("Modelo e tokenizer carregados de:", model_dir)



## 3) Configuração do Experimento


In [None]:
from dataclasses import dataclass
from typing import Optional

@dataclass
class Config:
    """Central experiment settings read by the cells of this notebook."""
    # Model
    model_name: str = "distilbert-base-uncased"  # checkpoint passed to from_pretrained
    max_length: int = 256  # tokenizer truncation limit (in tokens)
    # Baseline training
    epochs: int = 3  # num_train_epochs of the baseline run
    lr: float = 2e-5  # learning_rate of the baseline run
    train_bs: int = 16  # per-device training batch size
    eval_bs: int = 16  # per-device evaluation batch size
    grad_accum_steps: int = 2  # gradient_accumulation_steps for every Trainer
    seed: int = 42  # seed for RNGs and dataset shuffles
    fp16: bool = True  # only takes effect when the device is "cuda"
    # Optional sub-sampling of the raw splits; None keeps the full split
    N_TRAIN: Optional[int] = None
    N_TEST: Optional[int]  = None
    # Hyperparameter search (Optuna)
    use_optuna: bool = True  # False skips the search and the re-training
    n_trials: int = 10  # number of Optuna trials
    N_TRAIN_HPO: int = 6000  # upper bound on training examples used per trial
    N_VAL_HPO: int = 2000  # upper bound on evaluation examples used per trial

# Single shared configuration instance; the bare name displays it in the notebook.
cfg = Config()
cfg



## 4) Importações, Ambiente e Semente


In [None]:
import re, random, numpy as np, torch
from datasets import load_dataset, Dataset
from transformers import (AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments,
                          DataCollatorWithPadding, EarlyStoppingCallback)
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report
import matplotlib.pyplot as plt

def set_seed(seed: int = 42):
    """Seed the Python, NumPy and PyTorch random generators with ``seed``.

    The CUDA generators are seeded as well when a CUDA device is available.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Fix all RNGs before any data or model work happens.
set_seed(cfg.seed)

# Prefer the GPU when one is visible; otherwise run on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device



## 5) Carregamento do Dataset IMDb


In [None]:
dataset = load_dataset("stanfordnlp/imdb")
train_raw, test_raw = dataset["train"], dataset["test"]

# Optional sub-sampling: shuffle with the experiment seed, then keep the first
# N rows. A value of None leaves the split untouched.
if cfg.N_TRAIN is not None:
    train_raw = train_raw.shuffle(seed=cfg.seed).select(range(cfg.N_TRAIN))
if cfg.N_TEST is not None:
    test_raw = test_raw.shuffle(seed=cfg.seed).select(range(cfg.N_TEST))

len(train_raw), len(test_raw)



## 6) Pré-processamento (Leve)

Transformers funcionam bem com texto quase bruto, mas aplicamos **limpezas leves** e **sanidade de tamanho**:
- remoção de tags HTML simples;
- normalização de espaços;
- *clipping* de tamanho por tokenização (feito na etapa de tokenização).


In [None]:
# Anything between "<" and the next ">" is treated as an HTML tag.
_html_tag = re.compile(r"<[^>]+>")
# One or more whitespace characters (spaces, tabs, newlines, ...).
_spaces = re.compile(r"\s+")


def clean_text(s: str) -> str:
    """Return ``s`` with HTML-like tags removed and whitespace normalised.

    Tags are replaced by a space, every run of whitespace is collapsed into a
    single space and the result is stripped at both ends.
    """
    without_tags = _html_tag.sub(" ", s)
    # Newlines and tabs are whitespace, so the \s+ collapse already turns them
    # into single spaces.
    return _spaces.sub(" ", without_tags).strip()

def apply_clean(ds):
    """Return a copy of ``ds`` whose "text" column went through clean_text."""
    def _clean_row(row):
        return {"text": clean_text(row["text"])}

    return ds.map(_clean_row, batched=False)


train_clean = apply_clean(train_raw)
test_clean = apply_clean(test_raw)

# Simple length statistics (in characters) over at most 2000 rows per split.
n_train_sample = min(2000, len(train_clean))
n_test_sample = min(2000, len(test_clean))
train_lens = [len(row["text"]) for row in train_clean.select(range(n_train_sample))]
test_lens = [len(row["text"]) for row in test_clean.select(range(n_test_sample))]

print("Exemplo limpo:", train_clean[0]["text"][:200], "...")
print("Tamanhos (amostra) - train/test:", (np.mean(train_lens), np.mean(test_lens)))



## 7) Tokenização


In [None]:
tokenizer = AutoTokenizer.from_pretrained(cfg.model_name)


def tokenize_fn(batch):
    """Tokenize a batch of texts, truncating to cfg.max_length without padding."""
    return tokenizer(
        batch["text"],
        max_length=cfg.max_length,
        truncation=True,
        padding=False,
    )


# The raw "text" column is dropped once it has been tokenized.
train_tok = train_clean.map(tokenize_fn, batched=True, remove_columns=["text"])
test_tok = test_clean.map(tokenize_fn, batched=True, remove_columns=["text"])

# Padding is left to the collator, which pads each batch when it is assembled.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)



## 8) Métricas e Modelo


In [None]:
def compute_metrics(eval_pred):
    """Compute accuracy and F1 from a ``(logits, labels)`` pair.

    The predicted class is the argmax of the logits along axis 1.
    """
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=1)
    scores = {}
    scores["accuracy"] = accuracy_score(labels, predicted)
    scores["f1"] = f1_score(labels, predicted)
    return scores

def model_init():
    """Build a fresh two-label classifier from the configured checkpoint."""
    fresh_model = AutoModelForSequenceClassification.from_pretrained(
        cfg.model_name,
        num_labels=2,
    )
    return fresh_model



## 9) Treinamento Baseline


In [None]:
# Half precision is requested in the config but only used when running on CUDA.
fp16_flag = cfg.fp16 and (device == "cuda")

# NOTE(review): evaluation, early stopping and best-checkpoint selection all use
# the test split (eval_dataset=test_tok below), so the reported test metrics are
# selected on the same data — consider a held-out validation split.
_baseline_settings = dict(
    output_dir="./results_baseline",
    logging_dir="./logs",
    report_to="none",
    # Evaluate and checkpoint once per epoch, then restore the best-F1 checkpoint.
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    # Optimisation settings taken from the experiment config.
    num_train_epochs=cfg.epochs,
    learning_rate=cfg.lr,
    per_device_train_batch_size=cfg.train_bs,
    per_device_eval_batch_size=cfg.eval_bs,
    gradient_accumulation_steps=cfg.grad_accum_steps,
    fp16=fp16_flag,
)
args_base = TrainingArguments(**_baseline_settings)

# Stop when F1 has not improved for two consecutive evaluations.
_baseline_callbacks = [EarlyStoppingCallback(early_stopping_patience=2)]

trainer_base = Trainer(
    model_init=model_init,
    args=args_base,
    train_dataset=train_tok,
    eval_dataset=test_tok,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    callbacks=_baseline_callbacks,
)

out_base = trainer_base.train()
metrics_base = trainer_base.evaluate()
metrics_base



## 10) Busca de Hiperparâmetros com Optuna

Para acelerar, usamos subconjuntos menores durante a busca (**HPO**).  
Depois, **re-treinamos** com os melhores hiperparâmetros.

In [None]:
!pip install optuna

In [None]:
# Stays None when the search is disabled; the re-training cell checks for that.
best_params = None
if cfg.use_optuna:
    # Smaller shuffled subsets keep each trial cheap.
    # NOTE(review): the validation subset is drawn from the test split, so the
    # search is tuned on data that is later used for the final report.
    hpo_train = train_clean.shuffle(seed=cfg.seed).select(range(min(cfg.N_TRAIN_HPO, len(train_clean))))
    hpo_val   = test_clean.shuffle(seed=cfg.seed).select(range(min(cfg.N_VAL_HPO, len(test_clean))))
    hpo_train_tok = hpo_train.map(tokenize_fn, batched=True, remove_columns=["text"])
    hpo_val_tok   = hpo_val.map(tokenize_fn, batched=True, remove_columns=["text"])

    def hp_space(trial):
        """Return the hyperparameters sampled for one Optuna trial."""
        return {
            "learning_rate": trial.suggest_float("learning_rate", 1e-5, 5e-5, log=True),
            "num_train_epochs": trial.suggest_int("num_train_epochs", 2, 5),
            "per_device_train_batch_size": trial.suggest_categorical("per_device_train_batch_size", [8, 16, 32]),
            "weight_decay": trial.suggest_float("weight_decay", 0.0, 0.2),
            "warmup_ratio": trial.suggest_float("warmup_ratio", 0.0, 0.2),
        }

    # No checkpoints are written during the search (save_strategy="no").
    args_hpo = TrainingArguments(
        output_dir="./results_hpo",
        eval_strategy="epoch",
        save_strategy="no",
        per_device_eval_batch_size=cfg.eval_bs,
        gradient_accumulation_steps=cfg.grad_accum_steps,
        logging_dir="./logs_hpo",
        report_to="none",
        fp16=fp16_flag
    )

    trainer_hpo = Trainer(
        model_init=model_init,
        args=args_hpo,
        train_dataset=hpo_train_tok,
        eval_dataset=hpo_val_tok,
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics
    )

    # No compute_objective is given, so the Trainer's default objective is used.
    # NOTE(review): the default is believed to sum the non-loss eval metrics
    # (accuracy + f1 here) — confirm against the installed transformers version.
    best_run = trainer_hpo.hyperparameter_search(
        direction="maximize",
        backend="optuna",
        hp_space=hp_space,
        n_trials=cfg.n_trials
    )

    best_params = best_run.hyperparameters
    # A bare expression inside an `if` block is not displayed by the notebook,
    # so the result is printed explicitly.
    print("Optuna best_params:", best_params)
else:
    print("HPO desativado; pulando Optuna.")



## 11) Re-Treinamento com Melhores Hiperparâmetros


In [None]:
# Both stay None when no hyperparameter search result is available.
metrics_best = None
trainer_best = None

if best_params is not None:
    # Tuned values, each falling back to the baseline setting (or 0.0) when the
    # search result does not contain the key.
    tuned = {
        "num_train_epochs": int(best_params.get("num_train_epochs", cfg.epochs)),
        "per_device_train_batch_size": int(best_params.get("per_device_train_batch_size", cfg.train_bs)),
        "learning_rate": float(best_params.get("learning_rate", cfg.lr)),
        "weight_decay": float(best_params.get("weight_decay", 0.0)),
        "warmup_ratio": float(best_params.get("warmup_ratio", 0.0)),
    }

    # Same evaluation/checkpoint policy as the baseline run.
    args_best = TrainingArguments(
        output_dir="./results_best",
        logging_dir="./logs_best",
        report_to="none",
        eval_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        greater_is_better=True,
        per_device_eval_batch_size=cfg.eval_bs,
        gradient_accumulation_steps=cfg.grad_accum_steps,
        fp16=fp16_flag,
        **tuned,
    )

    trainer_best = Trainer(
        model_init=model_init,
        args=args_best,
        train_dataset=train_tok,
        eval_dataset=test_tok,
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
    )

    out_best = trainer_best.train()
    metrics_best = trainer_best.evaluate()

metrics_best


### Salvar e carregar o modelo

In [None]:
# Save into the same sub-folder that the loading cells read from
# ("./melhor_modelo_imdb/imdb_model"). Saving into the parent folder would make
# the extraction cell skip the zip (the parent exists) while the loader still
# finds no model under "imdb_model".
model_save_path = "./melhor_modelo_imdb/imdb_model"

# trainer_best stays None when the Optuna search is disabled; fall back to the
# baseline trainer instead of failing with AttributeError.
trainer_to_save = trainer_best if trainer_best is not None else trainer_base

print(f"Salvando o melhor modelo em: {model_save_path}")

# Save the model weights.
trainer_to_save.save_model(model_save_path)

# The tokenizer is saved next to the weights so that preprocessing at load time
# is identical to the one used for training.
tokenizer.save_pretrained(model_save_path)

print("Modelo e tokenizador salvos com sucesso!")

In [None]:
import zipfile
import os
from transformers import AutoTokenizer, AutoModelForSequenceClassification

zip_path = "/content/imdb_model.zip"
extract_dir = "./melhor_modelo_imdb"
# Sub-folder the loading cell reads the model from. Checking only the parent
# folder would skip extraction whenever the parent exists without the model
# inside it (e.g. right after a save into the parent folder).
expected_model_dir = os.path.join(extract_dir, "imdb_model")

# 1) Extract the zip unless the model folder is already in place.
if not os.path.isdir(expected_model_dir):
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(extract_dir)
    print(f"Arquivos extra√≠dos em: {extract_dir}")
else:
    print(f"Pasta {extract_dir} j√° existe, pulando extra√ß√£o.")


In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Folder, inside the extracted archive, that holds the fine-tuned checkpoint.
model_dir = "./melhor_modelo_imdb/imdb_model"

# Note: this rebinds the notebook-level `tokenizer` and `model` names.
model = AutoModelForSequenceClassification.from_pretrained(model_dir)
tokenizer = AutoTokenizer.from_pretrained(model_dir)

print("Modelo e tokenizer carregados de:", model_dir)



## 12) Avaliação Final e Matriz de Confusão


In [None]:
# Manual comparison between the baseline and the Optuna-tuned model.
if metrics_best is None:
    print("Usando apenas o modelo baseline (Optuna n√£o executado).")
else:
    print("üìä Comparando baseline vs melhor modelo:")
    print(f"Baseline -> Acur√°cia: {metrics_base['eval_accuracy']:.4f}, F1: {metrics_base['eval_f1']:.4f}")
    print(f"Melhor (Optuna) -> Acur√°cia: {metrics_best['eval_accuracy']:.4f}, F1: {metrics_best['eval_f1']:.4f}")

    # The tuned model only "wins" on a strictly higher F1; ties go to the baseline.
    optuna_wins = metrics_best['eval_f1'] > metrics_base['eval_f1']
    if not optuna_wins:
        print("‚öôÔ∏è O baseline teve desempenho igual ou superior.")
    else:
        print("‚úÖ O modelo otimizado com Optuna teve melhor desempenho.")


In [None]:
# The report and the confusion matrix below are produced with the baseline trainer.
_evaluator = trainer_base

pred = _evaluator.predict(test_tok)
y_true, y_pred = pred.label_ids, pred.predictions.argmax(axis=1)

print("=== Relat√≥rio de Classifica√ß√£o (Teste) ===")
print(classification_report(y_true, y_pred, target_names=["negativo", "positivo"]))

cm = confusion_matrix(y_true, y_pred)

fig = plt.figure(figsize=(5,4))
plt.imshow(cm, interpolation='nearest')
plt.title("Matriz de Confus√£o (Baseline)")
plt.xlabel("Predito")
plt.ylabel("Verdadeiro")
plt.xticks([0,1], ["negativo", "positivo"])
plt.yticks([0,1], ["negativo", "positivo"])
# Write each count in the centre of its cell (x = column, y = row).
for row_idx, row in enumerate(cm):
    for col_idx, count in enumerate(row):
        plt.text(col_idx, row_idx, int(count), ha='center', va='center')
plt.tight_layout()
plt.show()



## 13) Resultados Parciais

O baseline com DistilBERT alcançou desempenho consistente (acurácia e F1 elevados).
Com Optuna, foram explorados hiperparâmetros-chave (learning rate, epochs, batch size, weight decay, warmup_ratio), e o melhor conjunto foi re-treinado no corpus completo, mantendo equilíbrio entre classes na matriz de confusão.
No entanto, o fine-tuning com Optuna não superou o baseline, possivelmente porque o modelo original já estava bem ajustado ao dataset IMDb — um corpus limpo, balanceado e de domínio estável, no qual pequenos ajustes de hiperparâmetros têm impacto marginal. Além disso, o espaço de busca limitado e o baixo número de trials reduziram a chance de encontrar combinações significativamente melhores, e variações estatísticas (como a semente aleatória e o particionamento dos dados) podem explicar diferenças sutis.
Esses resultados formam um baseline sólido e estável, servindo de ponto de partida confiável para as próximas etapas de robustez, interpretabilidade e ataques adversários.


## 14) Avaliação de Robustez (Stress Testing)


In [None]:
import os
os.environ["WANDB_DISABLED"] = "true"

import random
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer
from sklearn.metrics import accuracy_score

# 1. Initial setup: a fixed seed keeps the random perturbations below repeatable.
random.seed(42)
model_path = "./melhor_modelo_imdb/imdb_model"

print(f"Carregando modelo de: {model_path}...")

# Load the saved classifier together with the tokenizer stored next to it.
loaded_model = AutoModelForSequenceClassification.from_pretrained(model_path)
loaded_tokenizer = AutoTokenizer.from_pretrained(model_path)

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
loaded_model.to(device)

# Trainer used only for its predict() loop; no training arguments are given.
robustness_trainer = Trainer(model=loaded_model)

# 3. Perturbation functions
def perturb_typos(text, prob=0.05):
    """Swap adjacent characters of ``text`` at random.

    Positions are visited left to right; at each position except the last, the
    character is swapped with its right neighbour with probability ``prob``.
    Swaps act on the already-modified sequence, so a character can be carried
    more than one position to the right.
    """
    chars = list(text)
    last = len(chars) - 1
    pos = 0
    while pos < last:
        if random.random() < prob:
            chars[pos:pos + 2] = chars[pos + 1], chars[pos]
        pos += 1
    return "".join(chars)

def perturb_uppercase(text):
    """Return ``text`` converted to upper case.

    NOTE(review): if the loaded tokenizer lower-cases its input (the configured
    base checkpoint is "distilbert-base-uncased"), this perturbation presumably
    has no effect on the model input — confirm with the saved tokenizer config.
    """
    shouted = text.upper()
    return shouted

def perturb_spam_noise(text):
    """Append one randomly chosen spam-like suffix to ``text``.

    NOTE(review): the suffix is added at the end, so it may be cut off when the
    tokenized text exceeds the truncation length used downstream.
    """
    noises = [" http://bit.ly/fake", " <br> CLICK HERE", " #ad #promo"]
    picked = random.choice(noises)
    return " ".join((text, picked))

# Scenario name -> text transformation. "Original" comes first on purpose: the
# evaluation loop compares every later scenario against its accuracy.
scenarios = dict([
    ("Original", lambda x: x),
    ("Typos (5%)", perturb_typos),
    ("Caixa Alta (UPPER)", perturb_uppercase),
    ("Ru√≠do (Spam)", perturb_spam_noise),
])

# 4. Run the test
# The sample is capped at the dataset size so that a reduced test split
# (cfg.N_TEST below 1000) does not make select() fail on out-of-range indices.
test_subset = test_clean.shuffle(seed=42).select(range(min(1000, len(test_clean))))

print("\n=== Resultados de Robustez (Modelo Carregado) ===")
print(f"{'Cen√°rio':<25} | {'Acur√°cia':<10} | {'Diferen√ßa':<10}")
print("-" * 50)

# Scenario name -> accuracy; "Original" is stored first and used as reference.
results = {}

# Helper used inside the loop: tokenizes with the tokenizer loaded from disk.
def tokenize_for_test(batch):
    """Tokenize a batch of texts with truncation at 256 tokens and padding."""
    return loaded_tokenizer(batch["text"], truncation=True, padding=True, max_length=256)

for name, func in scenarios.items():
    # Apply the perturbation
    perturbed_ds = test_subset.map(lambda x: {"text": func(x["text"])}, batched=False)

    # Tokenize
    perturbed_tok = perturbed_ds.map(tokenize_for_test, batched=True, remove_columns=["text"])

    # Predict with the loaded model
    preds = robustness_trainer.predict(perturbed_tok)
    y_pred = preds.predictions.argmax(axis=1)
    y_true = preds.label_ids

    acc = accuracy_score(y_true, y_pred)
    results[name] = acc

    # Difference with respect to the unperturbed run (blank for "Original")
    diff = ""
    if name != "Original":
        delta = acc - results["Original"]
        diff = f"{delta:.2%}"

    print(f"{name:<25} | {acc:.4f}     | {diff}")

### 14.1) O que exatamente foi perturbado / quantidade de ruído

- **Typos (prob=0.05):** para cada par de caracteres adjacentes, há **5% de chance** de trocar a ordem (swap).  
  *Observação:* isso gera **múltiplas trocas por review**, proporcional ao tamanho do texto.
- **Caixa alta:** converte o texto inteiro para `UPPERCASE` (não altera tokens semânticos, mas muda superfície).
- **Ruído/Spam:** **adiciona 1 sufixo** ao final do texto, escolhido aleatoriamente entre:
  1) `http://bit.ly/fake`  2) `<br> CLICK HERE`  3) `#ad #promo`

A seguir, além de **acurácia**, vamos reportar:
- **flip rate** (quantos exemplos mudam de classe vs. original)
- **confiança (softmax)** média do modelo (geral e apenas nos acertos)
- **exemplos (antes/depois)** das transformações com predição + confiança


In [None]:
import torch
import torch.nn.functional as F
import pandas as pd
from sklearn.metrics import accuracy_score

def _ensure_str_list(texts):
    # pandas Series / numpy etc.
    if hasattr(texts, "tolist"):
        texts = texts.tolist()

    # single example vira lista
    if isinstance(texts, str):
        return [texts]
    if not isinstance(texts, (list, tuple)):
        texts = [texts]

    out = []
    for t in texts:
        if t is None:
            out.append("")
        elif isinstance(t, str):
            out.append(t)
        elif isinstance(t, (list, tuple)):
            out.append(" ".join(map(str, t)))
        else:
            out.append(str(t))
    return out

def _predict_subset(texts, labels):
    """Classify ``texts`` with the loaded model in a single forward pass.

    Returns ``(y_pred, conf, probs, labels)``: the argmax class, the highest
    softmax probability, the full probability matrix and ``labels`` as a list.
    """
    texts = _ensure_str_list(texts)
    if hasattr(labels, "__iter__"):
        labels = list(labels)
    else:
        labels = [labels]

    encoded = loaded_tokenizer(
        texts, truncation=True, padding=True, max_length=256, return_tensors="pt"
    )
    encoded = encoded.to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        scores = F.softmax(loaded_model(**encoded).logits, dim=1)

    probs = scores.detach().cpu().numpy()
    return probs.argmax(axis=1), probs.max(axis=1), probs, labels


In [None]:
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.metrics import accuracy_score

def safe_text(x):
    """Return ``x`` unchanged when it is a string, otherwise an empty string."""
    if isinstance(x, str):
        return x
    return ""

def tokenize_for_test(batch):
    """Tokenize a batch of texts with truncation at 256 tokens and padding."""
    return loaded_tokenizer(
        batch["text"],
        max_length=256,
        truncation=True,
        padding=True,
    )

def predict_on_dataset(ds_tok):
    """Run the robustness trainer on a tokenized dataset.

    Returns ``(y_true, y_pred, conf, probs)`` where ``probs`` is the softmax of
    the logits, ``y_pred`` its argmax and ``conf`` its maximum per row.
    """
    out = robustness_trainer.predict(ds_tok)
    probs = F.softmax(torch.tensor(out.predictions), dim=1).numpy()
    return out.label_ids, probs.argmax(axis=1), probs.max(axis=1), probs

# Evaluation subset, capped at the dataset size so that a reduced test split
# does not make select() fail on out-of-range indices.
test_subset = test_clean.shuffle(seed=42).select(range(min(1000, len(test_clean))))
texts_orig = [safe_text(t) for t in test_subset["text"]]

# Reference predictions on the unperturbed texts.
tok_orig = test_subset.map(lambda x: {"text": safe_text(x["text"])}, batched=False) \
                     .map(tokenize_for_test, batched=True, remove_columns=["text"])
y_true_orig, y_pred_orig, conf_orig, _ = predict_on_dataset(tok_orig)
acc_orig = accuracy_score(y_true_orig, y_pred_orig)
mean_conf_orig = float(conf_orig.mean())

rows = []
example_rows = []
# Rows shown as before/after examples (only indices that exist in the subset).
example_idx = [j for j in (0, 1, 2) if j < len(texts_orig)]

print("=== Resultados de Robustez (com confian√ßa + flips) ===")

for name, func in scenarios.items():
    perturbed_ds = test_subset.map(lambda x: {"text": safe_text(func(safe_text(x["text"])))}, batched=False)
    perturbed_tok = perturbed_ds.map(tokenize_for_test, batched=True, remove_columns=["text"])

    y_true, y_pred, conf, _ = predict_on_dataset(perturbed_tok)

    correct = y_pred == y_true
    acc = accuracy_score(y_true, y_pred)
    flip_rate = float((y_pred != y_pred_orig).mean())
    mean_conf = float(conf.mean())
    mean_conf_correct = float(conf[correct].mean()) if correct.any() else float("nan")

    rows.append({
        "cenario": name,
        "accuracy": acc,
        "acc_delta_vs_orig": acc - acc_orig,
        "flip_rate_vs_orig": flip_rate,
        "mean_conf": mean_conf,
        "mean_conf_delta_vs_orig": mean_conf - mean_conf_orig,
        "mean_conf_on_correct": mean_conf_correct,
    })

    # Before/after examples. The perturbed texts are read back from the dataset
    # the model actually scored: calling `func` again would draw new random
    # perturbations and show texts that do not match the reported predictions.
    texts_pert = [safe_text(t) for t in perturbed_ds["text"]]
    for j in example_idx:
        example_rows.append({
            "cenario": name,
            "orig_text": texts_orig[j][:220].replace("\n"," "),
            "pert_text": texts_pert[j][:220].replace("\n"," "),
            "y_true": int(y_true[j]),
            "pred": int(y_pred[j]),
            "conf": float(conf[j]),
            "pred_orig": int(y_pred_orig[j]),
            "conf_orig": float(conf_orig[j]),
        })

df_rob = pd.DataFrame(rows).sort_values("accuracy", ascending=False)
display(df_rob)

print("\nExemplos (antes/depois) + predi√ß√£o + confian√ßa:")
df_examples = pd.DataFrame(example_rows)
display(df_examples)


In [None]:
import matplotlib.pyplot as plt

plot_df = df_rob.copy()

# Show "Original" first, then the remaining scenarios by decreasing accuracy.
if (plot_df["cenario"] == "Original").any():
    plot_df["ord"] = (plot_df["cenario"] != "Original").astype(int)
    plot_df = plot_df.sort_values(["ord", "accuracy"], ascending=[True, False])

names = plot_df["cenario"].tolist()
values = plot_df["accuracy"].tolist()

# Reference accuracy: the "Original" row, or the best value when it is absent.
original_acc = float(plot_df.loc[plot_df["cenario"]=="Original","accuracy"].iloc[0]) \
    if (plot_df["cenario"]=="Original").any() else max(values)

drops = [(original_acc - v) * 100 for v in values]
# Signed change relative to the reference. A hard-coded "-" in front of the
# drop would print "(--x%)" whenever a scenario scores above the original.
deltas = [(v - original_acc) * 100 for v in values]

plt.figure(figsize=(10, 6))
bars = plt.bar(names, values)

plt.ylim(0, 1.1)
plt.title("Robustez do Modelo sob Diferentes Cen√°rios")
plt.ylabel("Acur√°cia")
plt.axhline(y=original_acc, linestyle='--', alpha=0.5, label='Performance Original')

# Annotate each bar with its accuracy and the signed change vs. the reference.
for bar, delta in zip(bars, deltas):
    h = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2.0, h, f'{h:.1%}\n({delta:+.1f}%)',
             ha='center', va='bottom', fontweight='bold')

plt.xticks(rotation=20, ha="right")
plt.legend()
plt.tight_layout()
plt.show()

## 14.2) Discussão dos resultados de robustez (com métricas objetivas)

**Configuração do stress test**
- Avaliação em **subset de 1000 reviews** do teste (para rapidez e reprodutibilidade).
- Typos: `prob=0.05` de **swap de caracteres adjacentes**.
- Caixa alta: `text.upper()`.
- Ruído/Spam: adiciona **1 sufixo** (entre 3 opções) ao final da review.

**O que reportar (mínimo para não ficar vago)**
- **Acurácia** por cenário e **Δ vs original**
- **Flip rate** vs original (percentual de exemplos cuja classe muda)
- **Confiança (softmax)** média do modelo:
  - geral
  - somente nos acertos

➡️ Use a tabela `df_rob` gerada acima como evidência principal e inclua 2–3 exemplos do `df_examples`.


## 15) Avaliação de Interpretabilidade

In [None]:
# Folder holding the saved model and tokenizer used by the following cells.
model_path = "./melhor_modelo_imdb/imdb_model"

In [None]:
!pip install -q shap

In [None]:
loaded_tokenizer = AutoTokenizer.from_pretrained(model_path)
loaded_model = AutoModelForSequenceClassification.from_pretrained(model_path)
# Reloading rebinds `loaded_model` to a fresh model that has not been moved to
# `device`. Later helpers send their inputs to `device`, so put the model there
# and keep it in inference mode.
loaded_model.to(device)
loaded_model.eval()

In [None]:
import numpy as np
import pandas as pd
import shap
import torch
from transformers import pipeline

N_SHAP = 30      # number of test reviews explained
MAX_LEN = 256    # truncation length passed to the pipeline
SEED = 42        # shuffle seed for picking the reviews

shap_subset = test_clean.shuffle(seed=SEED).select(range(min(N_SHAP, len(test_clean))))
texts_to_explain = [t if isinstance(t, str) else "" for t in shap_subset["text"]]

print(f"Rodando SHAP em {len(texts_to_explain)} textos...")

# Pipeline device index: first GPU when available, -1 for the CPU.
device_id = 0 if torch.cuda.is_available() else -1

# top_k=None returns the score of every label. It replaces the deprecated
# return_all_scores=True flag, which the library warns about.
sentiment_pipe = pipeline(
    "text-classification",
    model=loaded_model,
    tokenizer=loaded_tokenizer,
    top_k=None,
    device=device_id,
    truncation=True,
    max_length=MAX_LEN,
)

explainer = shap.Explainer(sentiment_pipe)
shap_values = explainer(texts_to_explain)

print("SHAP pronto. Gerando ranking global...")

def _token_list(x):
    # shap_values.data costuma vir tokenizado ou em string
    if isinstance(x, (list, np.ndarray)):
        return list(x)
    return str(x).split()

def _vals_1d(v):
    v = np.array(v)
    if v.ndim == 1:
        return v
    if v.ndim == 2:
        # usa a classe 1 (positivo) se existir; sen√£o pega a √∫ltima
        if v.shape[1] >= 2:
            return v[:, 1]
        return v[:, -1]
    return v.reshape(-1)

# Per-token accumulators: signed SHAP sum, absolute SHAP sum, occurrence count.
signed_sum, abs_sum, cnt = {}, {}, {}

for i in range(len(shap_values)):
    toks = _token_list(shap_values.data[i])
    vals = _vals_1d(shap_values.values[i])

    # Tokens and values are paired up to the shorter of the two sequences.
    m = min(len(toks), len(vals))
    for raw_tok, s in zip(toks[:m], vals[:m]):
        t = str(raw_tok).strip().lower()
        # Only purely alphabetic, non-empty tokens enter the ranking.
        if t and t.isalpha():
            signed_sum[t] = signed_sum.get(t, 0.0) + float(s)
            abs_sum[t] = abs_sum.get(t, 0.0) + float(abs(s))
            cnt[t] = cnt.get(t, 0) + 1

_ranked_tokens = list(cnt.keys())
df_shap_global = pd.DataFrame({
    "token": _ranked_tokens,
    "mean_shap": [signed_sum[t] / cnt[t] for t in _ranked_tokens],
    "mean_abs_shap": [abs_sum[t] / cnt[t] for t in _ranked_tokens],
    "n_occurrences": [cnt[t] for t in _ranked_tokens],
}).sort_values("mean_abs_shap", ascending=False)

# Tokens seen fewer than MIN_OCC times are left out of the filtered ranking.
MIN_OCC = 2
df_shap_global_filt = df_shap_global[df_shap_global["n_occurrences"] >= MIN_OCC].copy()

# "Considerable impact" = top 5% by mean_abs_shap (after the occurrence filter).
if len(df_shap_global_filt) == 0:
    threshold = None
    print("Aviso: ap√≥s filtro n_occurrences>=2, n√£o sobrou token suficiente. Aumente N_SHAP.")
else:
    threshold = df_shap_global_filt["mean_abs_shap"].quantile(0.95)
    df_shap_global_filt["is_considerable"] = df_shap_global_filt["mean_abs_shap"] >= threshold
    print(f"Threshold (top 5%) em mean_abs_shap com n>={MIN_OCC}: {threshold:.6f}")

print("\nTop tokens (global) ap√≥s filtro de ocorr√™ncia:")
display(df_shap_global_filt.head(20))

# Local view: the 15 tokens with the largest |SHAP| in the first explained text.
i = 0
toks0 = _token_list(shap_values.data[i])
vals0 = _vals_1d(shap_values.values[i])
m0 = min(len(toks0), len(vals0))
df_local0 = pd.DataFrame({"token": toks0[:m0], "shap": vals0[:m0]})
df_local0["abs_shap"] = df_local0["shap"].abs()
df_local0 = df_local0.sort_values("abs_shap", ascending=False).head(15)

print("\nExemplo local (top 15 tokens por |SHAP|) no texto 0:")
display(df_local0)

## 16) Ataques Adversários

Nesta seção avaliamos a vulnerabilidade do modelo a ataques adversários em PLN.
Diferente dos testes de robustez (ruído, typos e caixa alta), aqui criamos
entradas modificadas **intencionalmente** para tentar induzir o modelo a errar,
com pequenas alterações no texto.

O foco é responder:

- O modelo muda de decisão com perturbações bem pequenas?
- Qual a taxa de exemplos cuja predição muda após o ataque?
- Esses ataques são realistas do ponto de vista semântico?


### 16.1) Definições + evidências (exemplos, como escolhemos “tokens emocionais”)

- O dataset IMDb tem **rótulo no nível da review** (positivo/negativo). **Não existe label por palavra**.  
  Portanto, “palavra com carga emocional” aqui significa **tokens com polaridade forte** (alta valência), identificados **por heurística**.

No notebook original, os gatilhos foram **manuais** (ex.: *terrible/awful* vs *excellent/wonderful*; *great/fantastic/wonderful*).  
Para tornar isso “auditável”, abaixo nós:
1) mostramos **exemplos reais** onde a predição **vira** (antes/depois + confiança), e  
2) extraímos uma lista de tokens “fortes” automaticamente via **SHAP agregado** (quando disponível).


In [None]:
def adv_injection_opposite_sentiment(example):
    """Attack 1: append a sentence whose sentiment opposes the example's label.

    label == 1 (positive) -> negative wording is appended;
    any other label (0 = negative, also the default) -> positive wording.

    Returns a dict with the single key "text".

    NOTE(review): the sentence is appended at the end, so it may be cut off when
    the tokenized review exceeds the truncation length used downstream.
    """
    suffix_for_positive = " However, some people might say this movie is terrible and absolutely awful."
    suffix_for_other = " However, some people might say this movie is excellent and absolutely wonderful."

    review = example.get("text", "")
    is_positive = int(example.get("label", 0)) == 1
    suffix = suffix_for_positive if is_positive else suffix_for_other

    return {"text": review + " " + suffix}

def adv_trigger_neutral(example):
    """Attack 2: append a seemingly neutral sentence that contains polarised words.

    The same sentence is appended regardless of the label. Returns a dict with
    the single key "text".
    """
    trigger = (
        " This sentence is only for analysis and should not change the real opinion, "
        "but it mentions that the movie is great, fantastic and wonderful."
    )
    review = example.get("text", "")
    return {"text": " ".join((review, trigger))}


In [None]:
import torch
import torch.nn.functional as F
import pandas as pd
import numpy as np

def _ensure_str_list(texts):

    if isinstance(texts, str):
        return [texts]

    # datasets.Column geralmente tem to_pylist()
    if hasattr(texts, "to_pylist"):
        texts = texts.to_pylist()
    elif hasattr(texts, "tolist"):
        texts = texts.tolist()
    elif not isinstance(texts, (list, tuple)) and hasattr(texts, "__iter__"):
        texts = list(texts)

    out = []
    for t in texts:
        if t is None:
            out.append("")
        elif isinstance(t, str):
            out.append(t)
        elif isinstance(t, (list, tuple, np.ndarray)):
            out.append(" ".join(map(str, t)))
        else:
            out.append(str(t))
    return out


def _predict_texts(texts, labels, batch_size=32):
    """Classify ``texts`` with the loaded model, in mini-batches.

    Args:
        texts: a string or a collection of texts (normalised by _ensure_str_list).
        labels: passed through unchanged as the last element of the result.
        batch_size: number of texts per forward pass. Running everything as one
            batch makes peak memory grow with the number of texts.

    Returns:
        ``(y_pred, conf, probs, labels)``: argmax class per text, the highest
        softmax probability per text, the full probability matrix, and
        ``labels`` as received.
    """
    texts = _ensure_str_list(texts)
    # Inputs follow the model's own device, so the call still works when the
    # model was reloaded and no longer sits on the global `device`.
    model_device = loaded_model.device

    prob_chunks = []
    for start in range(0, len(texts), batch_size):
        tok = loaded_tokenizer(
            texts[start:start + batch_size],
            truncation=True, padding=True, max_length=256, return_tensors="pt"
        ).to(model_device)
        # Inference only: no gradients are needed.
        with torch.no_grad():
            logits = loaded_model(**tok).logits
        prob_chunks.append(F.softmax(logits, dim=1).detach().cpu().numpy())

    if prob_chunks:
        probs = np.concatenate(prob_chunks, axis=0)
    else:
        # Empty input: return empty arrays with the expected number of columns.
        probs = np.zeros((0, loaded_model.config.num_labels))

    y_pred = probs.argmax(axis=1)
    conf = probs.max(axis=1)
    return y_pred, conf, probs, labels

def apply_attack_safely(ds, attack_fn):
    """Map ``attack_fn`` over ``ds`` and guarantee a string "text" column.

    ``attack_fn`` may return a dict with a "text" key or a plain string; for any
    other return value the example's original text is kept. The resulting text
    is sanitised: ``None`` becomes "", lists/tuples/arrays are joined with
    spaces and other non-strings go through ``str``.
    """
    def _to_text(value):
        if value is None:
            return ""
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple, np.ndarray)):
            return " ".join(map(str, value))
        return str(value)

    def _wrap(ex):
        out = attack_fn(ex)
        if isinstance(out, dict) and "text" in out:
            candidate = out["text"]
        elif isinstance(out, str):
            candidate = out
        else:
            # The attack returned nothing usable: fall back to the original text.
            candidate = ex.get("text", "")
        return {"text": _to_text(candidate)}

    return ds.map(_wrap, batched=False)

# Work on a small subset to keep the report manageable.
N = 300
subset_adv = test_subset.select(range(min(N, len(test_subset))))

y_true_adv = np.array(subset_adv["label"])
texts = _ensure_str_list(subset_adv["text"])

# Reference predictions on the untouched texts; every attack is compared to these.
pred0, conf0, _, _ = _predict_texts(texts, y_true_adv)
acc0 = (pred0 == y_true_adv).mean()
print(f"Acur√°cia (original) no subset: {acc0:.4f}")

def run_attack(attack_name, attack_fn, k_examples=5):
    """Apply one attack to the adversarial subset and summarise its effect.

    Prints accuracy, accuracy change and flip rate against the original
    predictions, and returns a DataFrame with up to ``k_examples`` examples:
    the first flipped predictions or, when nothing flipped, the examples taken
    from the end of the ascending sort of ``conf0 - conf1``.

    Side effect: rebinds the global ``texts_adv`` to the attacked texts.
    """
    global texts_adv
    texts_adv = _ensure_str_list(apply_attack_safely(subset_adv, attack_fn)["text"])

    pred1, conf1, _, _ = _predict_texts(texts_adv, y_true_adv)
    acc1 = (pred1 == y_true_adv).mean()
    flip = pred1 != pred0
    flip_rate = flip.mean()
    print(f"\n[{attack_name}] acc={acc1:.4f} | Œîacc={acc1-acc0:+.4f} | flip_rate={flip_rate:.2%}")

    flipped = np.where(flip)[0]
    if len(flipped) > 0:
        chosen = flipped[:k_examples]
    else:
        chosen = np.argsort(conf0 - conf1)[-k_examples:]

    def _one_line(snippet):
        return snippet.replace("\n"," ")

    def _example_row(i):
        before, after = texts[i], texts_adv[i]
        return {
            "i": int(i),
            "y_true": int(y_true_adv[i]),
            "pred_orig": int(pred0[i]),
            "conf_orig": float(conf0[i]),
            "pred_adv": int(pred1[i]),
            "conf_adv": float(conf1[i]),
            "delta_conf": float(conf1[i] - conf0[i]),
            "orig_prefix": _one_line(before[:220]),
            "adv_prefix": _one_line(after[:220]),
            "orig_suffix": _one_line(before[-220:]),
            "adv_suffix": _one_line(after[-220:]),
            "len_orig": len(before),
            "len_adv": len(after),
        }

    return pd.DataFrame([_example_row(i) for i in chosen])

# Attack 1: opposite-sentiment sentence appended to each review.
df_flip1 = run_attack("OppositeSentimentInjection", adv_injection_opposite_sentiment)
display(df_flip1)

# Attack 2: "neutral" trigger sentence containing positive words.
# NOTE(review): run_attack rebinds the global `texts_adv` on every call, so
# after this second call `texts_adv` holds the attack-2 texts, not attack-1's.
df_flip2 = run_attack("NeutralTriggerWithSentimentWords", adv_trigger_neutral)
display(df_flip2)

In [None]:
import difflib

def show_insert(i, texts_adv_local):
    """Print the text inserted into example ``i`` by an attack.

    The original text (global ``texts``) is diffed against
    ``texts_adv_local[i]``; up to five inserted segments are printed, each
    stripped and limited to 400 characters.
    """
    original, attacked = texts[i], texts_adv_local[i]

    matcher = difflib.SequenceMatcher(None, original, attacked)
    # Keep only the spans that exist in the attacked text and not in the original.
    inserts = [
        attacked[j1:j2]
        for tag, _i1, _i2, j1, j2 in matcher.get_opcodes()
        if tag == "insert"
    ]

    print("Inser√ß√µes encontradas:")
    for k, ins in enumerate(inserts[:5], 1):
        print(f"{k}. {ins.strip()[:400]}")

# `texts_adv` is a global that run_attack overwrites on each call, so it holds
# the texts of the last attack that ran — not necessarily the attack that
# produced df_flip1. Rebuild the attack-1 texts (the attack is deterministic)
# so that the index and the texts belong to the same attack.
texts_adv_flip1 = _ensure_str_list(
    apply_attack_safely(subset_adv, adv_injection_opposite_sentiment)["text"]
)
if len(df_flip1) > 0:
    show_insert(int(df_flip1.iloc[0]["i"]), texts_adv_flip1)

### Gráficos e análises

In [None]:
import os
import matplotlib.pyplot as plt

# Output directory for every figure saved through savefig(); created if missing.
FIG_DIR = "figs"
os.makedirs(FIG_DIR, exist_ok=True)


def savefig(name):
    """Save the current matplotlib figure as ``FIG_DIR/name`` and print the path.

    ``tight_layout`` is applied first; the file is written at 200 dpi with a
    tight bounding box.
    """
    destination = os.path.join(FIG_DIR, name)
    plt.tight_layout()
    plt.savefig(destination, bbox_inches="tight", dpi=200)
    print("Salvo em:", destination)


#### Robustez

In [None]:
# Bar chart of accuracy per robustness scenario, saved as robustez_acuracia.png.
# Work on a copy so the helper column "ord" is not added to df_rob itself.
plot_df = df_rob.copy()

# When an "Original" row exists, pin it first (ord == 0) and order the other
# scenarios by descending accuracy.
if (plot_df["cenario"] == "Original").any():
    plot_df["ord"] = (plot_df["cenario"] != "Original").astype(int)
    plot_df = plot_df.sort_values(["ord", "accuracy"], ascending=[True, False])

# `names` and `plot_df` are read again by the flip-rate plot cell below.
names = plot_df["cenario"].tolist()
acc = plot_df["accuracy"].tolist()

plt.figure(figsize=(9,4))
plt.bar(names, acc)
plt.ylim(0, 1.05)
plt.ylabel("Acur√°cia")
plt.title("Robustez: Acur√°cia por Cen√°rio")
plt.xticks(rotation=20, ha="right")
# Save before show(): savefig writes whatever figure is current.
savefig("robustez_acuracia.png")
plt.show()


In [None]:
# Bar chart of the prediction flip rate per scenario; skipped when the column
# is absent. Relies on `plot_df` and `names` from the accuracy plot cell.
if "flip_rate_vs_orig" in plot_df.columns:
    plt.figure(figsize=(9,4))
    plt.bar(names, plot_df["flip_rate_vs_orig"].tolist())
    # 20% headroom above the tallest bar, with a 0.02 floor so an all-zero
    # column still produces a visible axis range.
    plt.ylim(0, max(0.02, float(plot_df["flip_rate_vs_orig"].max())*1.2))
    plt.ylabel("Flip rate vs Original")
    plt.title("Robustez: Taxa de Invers√£o de Predi√ß√£o")
    plt.xticks(rotation=20, ha="right")
    savefig("robustez_fliprate.png")
    plt.show()


#### Ataques Adversariais

In [None]:
import pandas as pd

# Accuracy and flip rate per adversarial attack.
# NOTE(review): these numbers are literals typed into the cell, not values read
# from df_flip1/df_flip2 or from run_attack's printed summary — they must be
# updated by hand if the attacks are re-run. Confirm they match the latest run.
df_adv_plot = pd.DataFrame([
    {"ataque":"Original", "accuracy": 0.9000, "flip_rate": 0.0},
    {"ataque":"OppositeSentimentInjection", "accuracy": 0.7833, "flip_rate": 0.1167},
    {"ataque":"NeutralTrigger", "accuracy": 0.8833, "flip_rate": 0.0567},
])

# Figure 1: accuracy per attack.
plt.figure(figsize=(9,4))
plt.bar(df_adv_plot["ataque"], df_adv_plot["accuracy"])
plt.ylim(0, 1.05)
plt.ylabel("Acur√°cia")
plt.title("Ataques Adversariais: Acur√°cia")
plt.xticks(rotation=20, ha="right")
savefig("adv_acuracia.png")
plt.show()

# Figure 2: flip rate per attack, with 20% headroom and a 0.02 floor on the y axis.
plt.figure(figsize=(9,4))
plt.bar(df_adv_plot["ataque"], df_adv_plot["flip_rate"])
plt.ylim(0, max(0.02, df_adv_plot["flip_rate"].max()*1.2))
plt.ylabel("Flip rate")
plt.title("Ataques Adversariais: Taxa de Invers√£o (flip rate)")
plt.xticks(rotation=20, ha="right")
savefig("adv_fliprate.png")
plt.show()


#### SHAP Global: top tokens por impacto

In [None]:
# Horizontal bar chart of the `topn` tokens with the highest mean |SHAP| in
# df_shap_global_filt, saved as shap_top_tokens.png.
topn = 15
top = df_shap_global_filt.sort_values("mean_abs_shap", ascending=False).head(topn).copy()
# Re-sort ascending so that barh draws the largest value at the top.
top = top.sort_values("mean_abs_shap", ascending=True)

plt.figure(figsize=(7,5))
plt.barh(top["token"], top["mean_abs_shap"])
plt.xlabel("mean(|SHAP|)")
plt.title(f"SHAP Global: Top {topn} Tokens por Impacto M√©dio")
savefig("shap_top_tokens.png")
plt.show()


In [None]:
import numpy as np
import random

# Use the same seed as the robustness experiment.
# NOTE(review): this seeds only Python's `random` module; if the perturbation
# draws from another RNG (e.g. NumPy) it is not covered — verify.
random.seed(42)

# Original texts and the typo perturbation registered under "Typos (5%)".
texts_orig = test_subset["text"]
typo_fn = scenarios["Typos (5%)"]

def count_char_diffs(a: str, b: str) -> int:
    """Count position-wise character mismatches plus the length difference.

    Characters are compared index by index over the shorter string's length and
    the absolute length difference is added on top. This is not an edit
    distance: an inserted or deleted character shifts everything after it, and
    each shifted position that no longer matches is counted.
    """
    mismatches = sum(ch_a != ch_b for ch_a, ch_b in zip(a, b))
    return mismatches + abs(len(a) - len(b))

# Generate the typo-perturbed version of every original text.
texts_typos = [typo_fn(t) for t in texts_orig]

# Per-example number of changed characters and its mean over the subset.
deltas = [count_char_diffs(o, p) for o, p in zip(texts_orig, texts_typos)]
mean_delta_chars = float(np.mean(deltas))

# Report mean, median, minimum and maximum of the per-example deltas.
print(f"Œî_chars m√©dio (Typos 5%): {mean_delta_chars:.2f} caracteres alterados por exemplo")
print(f"Mediana: {float(np.median(deltas)):.0f} | Min: {int(np.min(deltas))} | Max: {int(np.max(deltas))}")


In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.metrics import accuracy_score

def _ensure_str_list(texts):
    if isinstance(texts, str):
        return [texts]
    if hasattr(texts, "to_pylist"):
        texts = texts.to_pylist()
    elif hasattr(texts, "tolist"):
        texts = texts.tolist()
    elif not isinstance(texts, (list, tuple)) and hasattr(texts, "__iter__"):
        texts = list(texts)

    out = []
    for t in texts:
        if t is None:
            out.append("")
        elif isinstance(t, str):
            out.append(t)
        elif isinstance(t, (list, tuple, np.ndarray)):
            out.append(" ".join(map(str, t)))
        else:
            out.append(str(t))
    return out

def predict_with_conf(texts, y_true):
    """Classify ``texts`` with ``loaded_model`` and return labels and confidences.

    Args:
        texts: A text or collection of texts; normalised with
            ``_ensure_str_list`` before tokenization.
        y_true: Unused; accepted but ignored by this function.

    Returns:
        Tuple ``(y_pred, conf)`` of NumPy arrays: the arg-max class index and
        the highest softmax probability for each text.
    """
    batch = loaded_tokenizer(
        _ensure_str_list(texts),
        truncation=True,
        padding=True,
        max_length=256,
        return_tensors="pt",
    ).to(device)
    # Inference only: no gradients are needed.
    with torch.no_grad():
        outputs = loaded_model(**batch)
    probs = F.softmax(outputs.logits, dim=1).detach().cpu().numpy()
    return probs.argmax(axis=1), probs.max(axis=1)

# Subset used for the report: the first N examples of test_subset (or all of
# them when it is smaller).
N = 300
subset_rep = test_subset.select(range(min(N, len(test_subset))))
texts_orig = _ensure_str_list(subset_rep["text"])
y_true = np.array(subset_rep["label"])

# Baseline: predictions and accuracy on the unperturbed texts.
y_pred0, conf0 = predict_with_conf(texts_orig, y_true)
acc0 = accuracy_score(y_true, y_pred0)

rows = []
ex_rows = []

# Same example indices in every scenario.
# NOTE(review): indexing below assumes the subset has at least three examples.
example_idx = [0, 1, 2]

# One metrics row per scenario, plus one example row per index in example_idx.
for name, fn in scenarios.items():
    texts_pert = [fn(t) for t in texts_orig]
    y_pred, conf = predict_with_conf(texts_pert, y_true)

    acc = accuracy_score(y_true, y_pred)
    # Share of examples whose prediction differs from the baseline prediction.
    flip_rate = float((y_pred != y_pred0).mean())
    mean_conf = float(conf.mean())
    # Mean confidence restricted to correct predictions; NaN when none is correct.
    mean_conf_correct = float(conf[y_pred == y_true].mean()) if (y_pred == y_true).any() else float("nan")

    rows.append({
        "cenario": name,
        "accuracy": float(acc),
        "delta_acc_vs_orig": float(acc - acc0),
        "flip_rate_vs_orig": flip_rate,
        "mean_conf": mean_conf,
        "mean_conf_on_correct": mean_conf_correct,
    })

    for i in example_idx:
        ex_rows.append({
            "cenario": name,
            # First 140 characters, newlines flattened, for a compact table.
            "orig_prefix": texts_orig[i][:140].replace("\n", " "),
            "pert_prefix": texts_pert[i][:140].replace("\n", " "),
            "y_true": int(y_true[i]),
            "pred": int(y_pred[i]),
            "conf": float(conf[i]),
            "acerto": bool(y_pred[i] == y_true[i]),
        })

# Scenario-level metrics, sorted alphabetically by scenario name.
df_rob = pd.DataFrame(rows)
df_rob = df_rob.sort_values("cenario")
display(df_rob)

# Example-level rows, in scenario iteration order.
df_rob_examples = pd.DataFrame(ex_rows)
display(df_rob_examples)


In [None]:
import numpy as np
import random

# Fix Python's `random` seed before generating the perturbed texts.
random.seed(42)

# Original texts and the perturbation registered under the spam-noise scenario key.
texts_orig = test_subset["text"]
spam_fn = scenarios["Ru√≠do (Spam)"]

# Spam-perturbed version of every original text.
texts_spam = [spam_fn(t) for t in texts_orig]

def n_tokens(text: str, max_length: int = 512) -> int:
    """Return the number of tokens ``loaded_tokenizer`` produces for ``text``.

    Counts model tokens (more faithful than splitting on words). Truncation is
    enabled, so the result is capped at ``max_length`` and longer texts all
    report the cap.

    Args:
        text: Text to tokenize.
        max_length: Truncation limit passed to the tokenizer. Defaults to 512,
            the value this cell always used; pass 256 to count with the limit
            ``predict_with_conf`` uses when it tokenizes for prediction.

    Returns:
        Length of the encoded id sequence.
    """
    return len(loaded_tokenizer.encode(text, truncation=True, max_length=max_length))

# Token counts before and after the spam perturbation.
tok_orig = [n_tokens(t) for t in texts_orig]
tok_spam = [n_tokens(t) for t in texts_spam]

# Extra tokens per example (perturbed minus original). n_tokens truncates, so
# a text already at the truncation limit contributes a delta of 0.
delta_tokens = [s - o for o, s in zip(tok_orig, tok_spam)]
mean_delta_tokens = float(np.mean(delta_tokens))

# Report mean, median, minimum and maximum of the per-example deltas.
print(f"Œî_tokens m√©dio (Spam): {mean_delta_tokens:.2f} tokens extras por exemplo (tokeniza√ß√£o do modelo)")
print(f"Mediana: {float(np.median(delta_tokens)):.0f} | Min: {int(np.min(delta_tokens))} | Max: {int(np.max(delta_tokens))}")


In [None]:
import torch
import torch.nn.functional as F
import pandas as pd
import numpy as np

def predict_with_conf(text_list):
    """Classify ``text_list`` with ``loaded_model``.

    Args:
        text_list: Texts handed to ``loaded_tokenizer`` as-is (truncated and
            padded, ``max_length=256``).

    Returns:
        Tuple ``(pred, conf)`` of NumPy arrays: the arg-max class index and the
        highest softmax probability for each text.
    """
    encoded = loaded_tokenizer(
        text_list,
        truncation=True,
        padding=True,
        max_length=256,
        return_tensors="pt",
    ).to(device)
    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = loaded_model(**encoded).logits
    probabilities = F.softmax(logits, dim=1).detach().cpu().numpy()
    labels = probabilities.argmax(axis=1)
    confidences = probabilities.max(axis=1)
    return labels, confidences

# Build a small example table (three fixed examples x three scenarios) and
# print it as LaTeX table rows.
example_idx = [0, 1, 2]
rows = []

texts_orig = test_subset["text"]
y_true = np.array(test_subset["label"])

# Scenarios shown in the table; each name must be a key of `scenarios`.
sc_to_show = ["Typos (5%)", "Caixa Alta (UPPER)", "Ru√≠do (Spam)"]

for scen in sc_to_show:
    fn = scenarios[scen]
    orig_ex = [texts_orig[i] for i in example_idx]
    pert_ex = [fn(texts_orig[i]) for i in example_idx]

    pred, conf = predict_with_conf(pert_ex)
    # k indexes the three-element batch; i is the position in test_subset.
    for k, i in enumerate(example_idx):
        rows.append({
            "cenario": scen,
            "orig_prefix": orig_ex[k][:140].replace("\n", " "),
            "pert_prefix": pert_ex[k][:140].replace("\n", " "),
            "y_true": int(y_true[i]),
            "pred": int(pred[k]),
            "conf": float(conf[k]),
            "acerto": bool(pred[k] == y_true[i]),
        })

# Rebinds df_rob_examples, replacing any frame previously stored under that name.
df_rob_examples = pd.DataFrame(rows)
display(df_rob_examples)

# Emit one "a & b & ... \\" row per example.
# NOTE(review): the text prefixes are printed without escaping, so characters
# such as & or % in a review would break the LaTeX row — confirm before pasting.
for _, r in df_rob_examples.iterrows():
    acerto = "Sim" if r["acerto"] else "N√£o"
    print(f"{r['orig_prefix']} & {r['pert_prefix']} & {r['pred']} & {r['conf']:.3f} & {acerto} \\\\")


In [None]:
import difflib
import pandas as pd

def extract_insertions(orig: str, adv: str, max_len=180):
    """Summarise the text that was inserted into ``orig`` to obtain ``adv``.

    The two strings are diffed character by character; every span reported as
    a pure insertion is stripped, empty spans are dropped, and the rest are
    joined with ``" | "``.

    Args:
        orig: Original text.
        adv: Attacked text.
        max_len: Maximum length of the summary; longer summaries are cut to
            this length and suffixed with ``"..."``.

    Returns:
        The joined insertions, or ``""`` when no non-blank insertion exists.
    """
    # autojunk must be off: with the default heuristic, once `adv` has 200+
    # characters every frequent character is treated as "popular" and ignored
    # when seeding matches, so an insertion into a long review can be reported
    # as one big "replace" and would be missed here.
    matcher = difflib.SequenceMatcher(None, orig, adv, autojunk=False)
    chunks = (
        adv[j1:j2].strip()
        for tag, _i1, _i2, j1, j2 in matcher.get_opcodes()
        if tag == "insert"
    )
    summary = " | ".join(chunk for chunk in chunks if chunk)
    if len(summary) > max_len:
        return summary[:max_len] + "..."
    return summary

# Pick two examples from the first attack and one from the second, then build
# a compact table with the inserted text for each.
pick1 = df_flip1.head(2).copy()
pick2 = df_flip2.head(1).copy()
picked = pd.concat([pick1, pick2], ignore_index=True)

rows = []
for _, r in picked.iterrows():
    i = int(r["i"])
    orig = texts[i]
    # NOTE(review): `texts_adv` is the single global written by the most recent
    # run_attack call, yet `picked` mixes rows from df_flip1 and df_flip2. Rows
    # from the attack that did not run last are diffed against the other
    # attack's text — confirm and use per-attack texts if so.
    adv = texts_adv[i]
    ins = extract_insertions(orig, adv)

    rows.append({
        "i": i,
        "y_true": int(r["y_true"]),
        "pred0": int(r["pred_orig"]),
        "pred1": int(r["pred_adv"]),
        "conf0": float(r["conf_orig"]),
        "conf1": float(r["conf_adv"]),
        "dconf": float(r["delta_conf"]),
        "insercao": ins
    })

df_adv_examples = pd.DataFrame(rows)
display(df_adv_examples)

# Emit one LaTeX table row per example; only "&" is escaped in the inserted text.
for _, r in df_adv_examples.iterrows():
    arrow = f"{r['pred0']}\\rightarrow{r['pred1']}"
    ins = r["insercao"].replace("&", "\\&")
    print(f"{r['i']} & {r['y_true']} & {arrow} & {r['conf0']:.3f} & {r['conf1']:.3f} & {r['dconf']:+.3f} & \\textit{{{ins}}} \\\\")


In [None]:
# Stack both attacks' example frames, tagging each row with its attack name,
# and keep the first six rows. This rebinds df_adv_examples, replacing any
# frame previously stored under that name. The original row indices are kept
# (no ignore_index), so index labels can repeat across the two attacks.
df_adv_examples = pd.concat([
    df_flip1.assign(ataque="OppositeSentimentInjection"),
    df_flip2.assign(ataque="NeutralTriggerWithSentimentWords"),
]).head(6)

# Show the prediction/confidence columns together with the text suffixes.
display(df_adv_examples[[
    "ataque","i","y_true","pred_orig","pred_adv","conf_orig","conf_adv","delta_conf",
    "orig_suffix","adv_suffix"
]])


In [None]:
import matplotlib.pyplot as plt
import os

# Render the first 12 rows of df_local0 as a table image and save it to
# figs/shap_local_ex0.png. The path is written out literally here and
# plt.savefig is called directly, without the bbox_inches="tight" option.
os.makedirs("figs", exist_ok=True)

tmp = df_local0.head(12).copy()

fig, ax = plt.subplots(figsize=(7,3.5))
# Hide the axes so that only the table is drawn.
ax.axis("off")
table = ax.table(cellText=tmp.values, colLabels=tmp.columns, loc="center")
# Fixed 8pt font and slightly taller rows (height scale 1.2).
table.auto_set_font_size(False)
table.set_fontsize(8)
table.scale(1, 1.2)
plt.tight_layout()
plt.savefig("figs/shap_local_ex0.png", dpi=200)
plt.show()
