# Tarefa A
Na tarefa um, o objetivo vai ser: Dado um título e uma mensagem, queremos classificar qual a inclinação política de determinada pessoa.

Vamos separar a tarefa em alguns passos:

1. Criando ambiente virtual e importando bibliotecas
2. Importar os dados
3. Tokenizar os dados
4. Usar o encoder para obter a representação vetorial das features
5. Treinar o modelo de rede neural para classificação
6. Métricas de avaliação
7. Testes empíricos

# Criando ambiente virtual e importando bibliotecas

Para isso, rode os seguintes comandos:

> python3 -m venv .venv
> pip install -r requiriments.txt

# 2) Importar os dados

Os dados estão em arquivos em formato de json. Vamos criar uma função que pega esses dados e importa como um map.

In [1]:
def import_json(path):
    from json import loads
    from os import listdir

    names = list(
        filter(
            lambda x: x if not x.startswith(".") else None,
            listdir(path)
        )
    )

    data = []
    for name in names:
        file_name = f"{path}/{name}"
        text = "".join(open(file_name).readlines())
        json = loads(text)
        data.append({
            "title":json["title"],
            "content":json["content"],
            "label":json["label"],
        })
    return data

# 3) Tokeniza os dados

Queremos tokenizar os dados que acabamos de importar, por isso, vamos criar uma função que vai fazer essa tokenização por nós.

In [8]:
from transformers import AutoTokenizer

MODEL_NAME = "microsoft/deberta-v3-base"
MAX_LENGTH_TOKENS = 256
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)

def tokenize(string):
    return tokenizer(
        string,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=MAX_LENGTH_TOKENS,
    )



# 4) Usar o encoder para obter a representação vetorial das features

Antes de tudo, vamos escolher o dispositivo que usaremos para fazer os cálculos:

In [3]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = torch.device("cpu")

Antes de iniciar o processo de formatação dos dados, vamos criar uma função que pega a representação dos tokens e encodifica na representação do nosso modelo. 

Vamos carregar o nosso modelo e associar a um dispositivo. Além disso, vamos colocar o modelo em modo de inferência, não de treinamento (para o Autocad não guardar o grafo computacional do gradiente).

In [4]:
from transformers import AutoModel

model_encode = AutoModel.from_pretrained(MODEL_NAME)
model_encode.to(device)

for param in model_encode.parameters():
    param.requires_grad = False
model_encode.eval()

DebertaV2Model(
  (embeddings): DebertaV2Embeddings(
    (word_embeddings): Embedding(128100, 768, padding_idx=0)
    (LayerNorm): LayerNorm((768,), eps=1e-07, elementwise_affine=True)
    (dropout): StableDropout()
  )
  (encoder): DebertaV2Encoder(
    (layer): ModuleList(
      (0-11): 12 x DebertaV2Layer(
        (attention): DebertaV2Attention(
          (self): DisentangledSelfAttention(
            (query_proj): Linear(in_features=768, out_features=768, bias=True)
            (key_proj): Linear(in_features=768, out_features=768, bias=True)
            (value_proj): Linear(in_features=768, out_features=768, bias=True)
            (pos_dropout): StableDropout()
            (dropout): StableDropout()
          )
          (output): DebertaV2SelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (LayerNorm): LayerNorm((768,), eps=1e-07, elementwise_affine=True)
            (dropout): StableDropout()
          )
        )
        (intermedia

Vamos agora criar o código para encodificar os tokens com a saída contextual da última camada do modelo.

In [5]:
def encode(tokens):
    tokens = {k: v.to(model_encode.device) for k, v in tokens.items()}
    return model_encode(**tokens).last_hidden_state

Note que o DeBERTa tem uma representação vetorial relativa a quantidade de tokens. Precisamos juntar tudo isso em um único vetor. Vamos fazer isso utilizando uma técnica de pooling que utiliza média. 

In [6]:
def mean_pooling(last_hidden_state, attention_mask):
    mask = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float().to(device)
    summed = (last_hidden_state * mask).sum(1)
    counts = mask.sum(1).clamp(min=1e-9)
    return (summed / counts).squeeze(1)

Vamos criar agora o código que faz a leitura de todos os dados dos arquivos json, aplica a tokenização e pega a representação vetorial contextual de cada par de título e mensagem. Vamos colocar tudo em um único map com também a label. Essa etapa inteira é para a normalização dos dados para o treinamento. As separações de dataset de treinamento, desenvolvimento e teste serão feitos no treinamento.

In [11]:
NORMALIZED_SUFIX_FILE = "obj"
DATA_PATH = "../data/"

In [32]:
paths = [
    "dev_json",
    "train_json"
]
c = 0
for path in paths:
    data = import_json(path)
    with torch.no_grad():
        for d in data:
            tokenized_title = tokenize(d["title"])
            tokenized_content = tokenize(d["content"])
            
            title = mean_pooling(
                encode(tokenized_title),
                tokenized_title["attention_mask"]
            ).squeeze()
            
            content = mean_pooling(
                encode(tokenized_content),
                tokenized_content["attention_mask"]
            ).squeeze()

            example = {
                "title":title,
                "content":content,
                "label":torch.tensor(d["label"], dtype=torch.long)
            }

            torch.save(
                example,
                DATA_PATH + NORMALIZED_SUFIX_FILE + str(c)
            )
            c += 1

# 5) Treinar o modelo de rede neural para classificação

Vamos inicialmente preparar o dataset para o treinamento.

In [None]:
import os

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, files, device):
        self.device = device
        self.files = files

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        path = self.files[idx]
        example = torch.load(
            self.files[idx], 
            map_location=self.device
        )
        return example["title"], example["content"], example["label"]


files = [os.path.join(DATA_PATH, f) for f in os.listdir(DATA_PATH)]

train_files = files[:len(files)//11 * 9]
dev_files = files[len(files)//11 * 9:len(files)//11 * 10]
test_files = files[len(files)//11 * 10:]
del files

train_set = MyDataset(train_files, device)
test_set = MyDataset(test_files, device)
dev_set = MyDataset(dev_files, device)

BATCH_SIZE = 256
train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=BATCH_SIZE, 
    shuffle=True
)

dev_loader = torch.utils.data.DataLoader(
    dev_set,
    batch_size=BATCH_SIZE,
    shuffle=False
)

test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size=BATCH_SIZE,
    shuffle=False
)

with open("json_model_data.json", "r") as f:
    from json import load
    modelB_file = load(f)
modelB_set = MyDataset(dev_files, device)
modelB_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size=BATCH_SIZE,
    shuffle=False
)

Em seguida, vamos criar o modelo. Ele vai receber o vetor de contexto do título e da mensagem e devolver os logits da última camada (para utilizar, posteriormente, o softmax).

In [13]:
class ClassifierModel(torch.nn.Module):
    def __init__(self, hidden=768):
        super().__init__()
        
        dropout= 0.2
        input_dim = hidden * 2
        self.dropout = torch.nn.Dropout(dropout)
        self.classifier = torch.nn.Sequential(
            torch.nn.Linear(input_dim, hidden),
            torch.nn.GELU(),
            torch.nn.LayerNorm(hidden),
            torch.nn.Dropout(dropout),
            torch.nn.Linear(hidden, hidden // 2),
            torch.nn.GELU(),
            torch.nn.Linear(hidden // 2, 3)
        )

    def forward(self, title_emb, message_emb):
        x = torch.cat([title_emb, message_emb], dim=-1)
        x = self.dropout(x)
        logits = self.classifier(x)
        return logits

classifier = ClassifierModel().to(device)

Já temos uma classe que gerencia o dataset e pega os dados já transformados. Já temos o modelo de inferência. Resta agora preparar alguns aspectos de arquitetura para o treinamento. Precisamos definir:

1. Função de Custo
2. Otimizador
3. Escalonador

In [91]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(classifier.parameters(), lr=2e-3, weight_decay=1e-2)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

Agora, resta treinar o modelo. Vamos preparar um treinamento em batch com earling stop:

In [17]:
NUM_EPOCHS = 50

In [92]:
import numpy as np

all_preds = []
all_labels = []
patience = 5
best_val_loss = float('inf')
patience_counter = 0
for epoch in range(NUM_EPOCHS):
    classifier.train()
    epoch_loss = 0
    epoch_preds = []
    epoch_labels = []
    
    for title, content, label in train_loader:
        title, content, label = title.to(device), content.to(device), label.to(device)
        optimizer.zero_grad()

        logits = classifier(title, content)
        loss = criterion(logits, label)
        
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        
        preds = torch.argmax(logits, dim=1).cpu().numpy()
        labels_np = label.cpu().numpy()
        epoch_preds.extend(preds)
        epoch_labels.extend(labels_np)
    
    accuracy = np.mean(np.array(epoch_preds) == np.array(epoch_labels))
    precision = np.mean([np.mean(np.array(epoch_preds)[np.array(epoch_labels) == i] == i) for i in range(3) if np.sum(np.array(epoch_labels) == i) > 0])
    recall = np.mean([np.mean(np.array(epoch_preds)[np.array(epoch_labels) == i] == i) for i in range(3) if np.sum(np.array(epoch_labels) == i) > 0])
    
    avg_loss = epoch_loss / len(train_loader)
    
    classifier.eval()
    val_loss = 0
    val_preds = []
    val_labels = []
    
    with torch.no_grad():
        for title, content, label in dev_loader:
            title, content, label = title.to(device), content.to(device), label.to(device)
            logits = classifier(title, content)
            loss = criterion(logits, label)
            
            val_loss += loss.item()
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            labels_np = label.cpu().numpy()
            val_preds.extend(preds)
            val_labels.extend(labels_np)
    
    avg_val_loss = val_loss / len(dev_loader)
    val_accuracy = np.mean(np.array(val_preds) == np.array(val_labels))
    
    print(f"epoch {epoch} — train_loss {avg_loss:.4f} — accuracy {accuracy:.4f} — precision {precision:.4f} — recall {recall:.4f} — val_loss {avg_val_loss:.4f} — val_acc {val_accuracy:.4f}")
    
    # Early stopping
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0
        torch.save(classifier.state_dict(), "classifier_model.pt")
        print(f"  ✓ Melhor modelo salvo (val_loss: {avg_val_loss:.4f})")
    else:
        patience_counter += 1
        print(f"  → Sem melhora ({patience_counter}/{patience})")
        if patience_counter >= patience:
            print(f"\n⛔ Early stopping acionado após {epoch + 1} epochs!")
            break
    
    scheduler.step()

print("\nModelo final carregado: classifier_model.pt")

epoch 0 — train_loss 0.9241 — accuracy 0.5702 — precision 0.5533 — recall 0.5533 — val_loss 0.7912 — val_acc 0.6421
  ✓ Melhor modelo salvo (val_loss: 0.7912)
epoch 1 — train_loss 0.8054 — accuracy 0.6421 — precision 0.6296 — recall 0.6296 — val_loss 0.8727 — val_acc 0.6085
  → Sem melhora (1/5)
epoch 2 — train_loss 0.7712 — accuracy 0.6585 — precision 0.6464 — recall 0.6464 — val_loss 0.7159 — val_acc 0.6874
  ✓ Melhor modelo salvo (val_loss: 0.7159)
epoch 3 — train_loss 0.7347 — accuracy 0.6785 — precision 0.6689 — recall 0.6689 — val_loss 0.7244 — val_acc 0.6982
  → Sem melhora (1/5)
epoch 4 — train_loss 0.7081 — accuracy 0.6946 — precision 0.6852 — recall 0.6852 — val_loss 0.9275 — val_acc 0.6257
  → Sem melhora (2/5)
epoch 5 — train_loss 0.7018 — accuracy 0.7056 — precision 0.6964 — recall 0.6964 — val_loss 0.7117 — val_acc 0.6913
  ✓ Melhor modelo salvo (val_loss: 0.7117)
epoch 6 — train_loss 0.6847 — accuracy 0.7091 — precision 0.6994 — recall 0.6994 — val_loss 0.6745 — val_acc 

In [None]:
# 6) Métricas de avaliação

Vamos agora utilizar métricas de avaliação do modelo.

In [15]:
classifier.load_state_dict(torch.load("classifier_model.pt"))

<All keys matched successfully>

In [None]:
import numpy as np
num_epochs = len(test_set) // BATCH_SIZE

all_preds = []
all_labels = []
best_val_loss = float('inf')
for epoch in range(NUM_EPOCHS):
    classifier.eval()
    val_loss = 0
    val_preds = []
    val_labels = []
    
    with torch.no_grad():
        for title, content, label in test_loader:
            title, content, label = title.to(device), content.to(device), label.to(device)
            logits = classifier(title, content)
            
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            labels_np = label.cpu().numpy()
            
            all_preds.extend(preds)
            all_labels.extend(labels_np)

test_accuracy = np.mean(np.array(all_preds) == np.array(all_labels))
test_precision = np.mean([np.mean(np.array(all_preds)[np.array(all_labels) == i] == i) for i in range(3) if np.sum(np.array(all_labels) == i) > 0])
test_recall = np.mean([np.mean(np.array(all_preds)[np.array(all_labels) == i] == i) for i in range(3) if np.sum(np.array(all_labels) == i) > 0])

In [38]:
print(f"accuracy {test_accuracy:.4f} — precision {test_precision:.4f} — recall {test_recall:.4f}")

accuracy 0.7276 — precision 0.7198 — recall 0.7198


Vamos tentar verificar agora se tem alguma diferença de performance relacionada ao alinhamento político. Para fazer isso vamos utilizar macroaveraging.

In [None]:
# y é o valor predito e x é o valor real
confuse_matrix = np.zeros([3,3])
for i in range(len(all_labels)):
    confuse_matrix[all_preds[i]][all_labels[i]] += 1 

def precision(matrix, i): return matrix[i][i] / (matrix[0][i] + matrix[1][i] + matrix[2][i])
def recall(matrix, i): return matrix[i][i] / (matrix[i][0] + matrix[i][1] + matrix[i][2])
def fmeasure(p, r, b): return (b**2 + 1) * p*r / (b**2 * p + r)

for i in range(3):
    p = precision(confuse_matrix, i)
    r = recall(confuse_matrix, i)
    b = 1
    print(f"{i} -> \nprecision ({p})")
    print(f'recall ({r})')
    print(f'f-measure ({fmeasure(p, r, b)})')
    print()

0 -> 
precision (0.6528925619834711)
recall (0.7022222222222222)
f-measure (0.676659528907923)

1 -> 
precision (0.731457800511509)
recall (0.7606382978723404)
f-measure (0.7457627118644068)

2 -> 
precision (0.7750281214848144)
recall (0.7165886635465418)
f-measure (0.7446636044312347)



In [None]:
# 7) Testes empíricos

Agora podemos brincar com o modelo! Vamos fazer, inicialmente, uma função que dá a resposta:

In [93]:
label_map = [
    "esquerda",
    "centro",
    "direita"
]

def predict(content, title, return_probs=False, label_map=None):
    tokenized_title = tokenize(title)
    tokenized_content = tokenize(content)

    title_emb = mean_pooling(
        encode(tokenized_title),
        tokenized_title["attention_mask"]
    )
    content_emb = mean_pooling(
        encode(tokenized_content),
        tokenized_content["attention_mask"]
    )

    with torch.no_grad():
        logits = classifier(title_emb, content_emb)
        probs = torch.softmax(logits, dim=1).cpu().numpy()
        pred_idx = int(probs.argmax(axis=1)[0])

    return label_map[pred_idx]

In [94]:
content = "News\n\nThe Number Of Desperate Immigrants Who Die While Trying To Get Into The US Keeps Rising"
title = "The questions that 800,000 people are waiting for Trump and Jeff Sessions to answer about DACA"
print(predict(content, title, label_map=label_map))

esquerda


# Parte dedicada a testar o modelo B

In [None]:
with open("json_model_data.json", "r") as f:
    from json import load
    modelB_file = load(f)
modelB_set = MyDataset(dev_files, device)
modelB_loader = torch.utils.data.DataLoader(
    modelB_set,
    batch_size=BATCH_SIZE,
    shuffle=False
)

In [None]:
import numpy as np

def eval(loader):
    num_epochs = len(loader) // BATCH_SIZE
    all_preds = []
    all_labels = []
    best_val_loss = float('inf')
    for epoch in range(NUM_EPOCHS):
        classifier.eval()
        val_loss = 0
        val_preds = []
        val_labels = []
        
        with torch.no_grad():
            for title, content, label in loader:
                title, content, label = title.to(device), content.to(device), label.to(device)
                logits = classifier(title, content)
                
                preds = torch.argmax(logits, dim=1).cpu().numpy()
                labels_np = label.cpu().numpy()
                
                all_preds.extend(preds)
                all_labels.extend(labels_np)

    test_accuracy = np.mean(np.array(all_preds) == np.array(all_labels))
    test_precision = np.mean([np.mean(np.array(all_preds)[np.array(all_labels) == i] == i) for i in range(3) if np.sum(np.array(all_labels) == i) > 0])
    test_recall = np.mean([np.mean(np.array(all_preds)[np.array(all_labels) == i] == i) for i in range(3) if np.sum(np.array(all_labels) == i) > 0])
    confuse_matrix = np.zeros([3,3])
    for i in range(len(all_labels)):
        confuse_matrix[all_preds[i]][all_labels[i]] += 1 
    for i in range(3):
        p = precision(confuse_matrix, i)
        r = recall(confuse_matrix, i)
        b = 1
        print(f"{i} -> \nprecision ({p})")
        print(f'recall ({r})')
        print(f'f-measure ({fmeasure(p, r, b)})')
        print()