In [1]:
import os
import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModel, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
import random
import numpy as np

In [2]:
#Mudanças principais:
#1400 SEM VALIDAÇÃO CRUZADA
#Modelo Bertimbau Large: Alterado o model_name para 'neuralmind/bert-large-portuguese-cased'.

#LR= 3e-5.

#Descongelamento das camadas: Parametrizamos o número de camadas finais do BERT a descongelar, via unfreeze_layers. Por exemplo, se definirmos unfreeze_layers=8, descongelamos as últimas 8 camadas.

#Outros otimizadores e LR Schedulers: Mantemos o AdamW como otimizador principal, mas agora adicionamos um scheduler (get_linear_schedule_with_warmup do transformers) para ajustar a taxa de aprendizado durante o treino. Caso queira testar outro otimizador, basta substituir a linha do optimizador. Também deixamos comentado outro exemplo (SGD) para referência.
#Para testar diferentes taxas de aprendizado, basta alterar learning_rate no código.
#Para testar diferentes números de camadas a descongelar, altere unfreeze_layers.

In [3]:
# Semente para reprodutibilidade
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

In [4]:
# Configurações gerais
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Usando dispositivo: {device}')

# Parametrizações a serem testadas
model_name = 'neuralmind/bert-large-portuguese-cased'  # Bertimbau Large
learning_rate = 3e-5  # Pode alterar para 5e-5 e comparar
unfreeze_layers = 4   # Quantas últimas camadas descongelar? Pode variar (4, 8, 12, etc.)
nclasses = 2
nepochs = 10
batch_size = 8
batch_status = 32
early_stop = 5
max_length = 360
write_path = 'modelbB2'

Usando dispositivo: cuda


In [5]:
# Carregar os dados
data = pd.read_csv("DATAFRAME1400.csv")

In [6]:
# Divisão dos dados (ex: 80% treino, 10% val, 10% teste)
train_data, test_data = train_test_split(data, test_size=0.10, random_state=seed, stratify=data['contra'])
train_data, val_data = train_test_split(train_data, test_size=0.125, random_state=seed, stratify=train_data['contra'])

print(f"Tamanho do Treino: {len(train_data)}")
print(f"Tamanho da Validação: {len(val_data)}")
print(f"Tamanho do Teste: {len(test_data)}")

Tamanho do Treino: 1177
Tamanho da Validação: 169
Tamanho do Teste: 150


In [7]:
# Dataset
class CustomDataset(Dataset):
    def __init__(self, data, tokenizer, max_length):
        self.data = data.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.iloc[idx]['text']
        label = self.data.iloc[idx]['contra']
        inputs = self.tokenizer(text, return_tensors='pt',
                                padding='max_length', truncation=True,
                                max_length=self.max_length)
        return {key: val.squeeze(0) for key, val in inputs.items()}, torch.tensor(label)



In [8]:
# Modelo
class CustomBERTModel(nn.Module):
    def __init__(self, model_name, nclasses, unfreeze_layers):
        super(CustomBERTModel, self).__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, nclasses)

        # Congelar tudo inicialmente
        for param in self.bert.parameters():
            param.requires_grad = False

        # Descongelar as últimas 'unfreeze_layers' camadas
        # A estrutura: self.bert.encoder.layer é uma lista de camadas
        # Se unfreeze_layers=4, descongela as últimas 4:
        if unfreeze_layers > 0:
            for param in self.bert.encoder.layer[-unfreeze_layers:].parameters():
                param.requires_grad = True

    def forward(self, input_ids, attention_mask, token_type_ids=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        pooled_output = outputs.pooler_output
        dropped_out = self.dropout(pooled_output)
        logits = self.classifier(dropped_out)
        return logits


In [9]:
# Inicializar o tokenizador e modelo
tokenizer = AutoTokenizer.from_pretrained(model_name, do_lower_case=False)
model = CustomBERTModel(model_name, nclasses, unfreeze_layers).to(device)



In [10]:
# Otimizador (AdamW)
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=learning_rate)

#  Exemplo de outro otimizador: optimizer = optim.SGD(filter(lambda p: p.requires_grad, model.parameters()), lr=learning_rate, momentum=0.9)


In [11]:
# Função de perda (como as classes estão balanceadas, pode usar peso 1:1 ou apenas None)
loss_fn = nn.CrossEntropyLoss()

In [12]:
# Datasets e Dataloaders
train_dataset = CustomDataset(train_data, tokenizer, max_length)
val_dataset = CustomDataset(val_data, tokenizer, max_length)
test_dataset = CustomDataset(test_data, tokenizer, max_length)

traindata = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valdata = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
testdata = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



In [13]:
# Número total de steps para o scheduler
total_steps = len(traindata) * nepochs

# Scheduler (Linear Warmup and Decay)
scheduler = get_linear_schedule_with_warmup(optimizer, 
                                            num_warmup_steps=int(0.1 * total_steps), 
                                            num_training_steps=total_steps)


In [14]:
# Número total de steps para o scheduler
total_steps = len(traindata) * nepochs

# Scheduler (Linear Warmup and Decay)
scheduler = get_linear_schedule_with_warmup(optimizer, 
                                            num_warmup_steps=int(0.1 * total_steps), 
                                            num_training_steps=total_steps)

def evaluate(model, dataloader):
    model.eval()
    y_real, y_pred = [], []
    with torch.no_grad():
        for batch_idx, (inputs, labels) in enumerate(dataloader):
            inputs = {key: val.to(device) for key, val in inputs.items()}
            labels = labels.to(device)
            logits = model(**inputs)
            pred_labels = torch.argmax(logits, 1)

            y_real.extend(labels.cpu().tolist())
            y_pred.extend(pred_labels.cpu().tolist())

    f1 = f1_score(y_real, y_pred, average='weighted')
    acc = accuracy_score(y_real, y_pred)
    return f1, acc, (y_real, y_pred)

if not os.path.exists(write_path):
    os.makedirs(write_path)

max_f1, repeat = 0, 0
for epoch in range(nepochs):
    model.train()
    losses = []
    for batch_idx, (inputs, labels) in enumerate(traindata):
        inputs = {key: val.to(device) for key, val in inputs.items()}
        labels = labels.to(device)

        logits = model(**inputs)
        loss = loss_fn(logits, labels)
        losses.append(float(loss))

        # Backprop
        loss.backward()
        optimizer.step()
        scheduler.step()  # Atualiza LR conforme o scheduler
        optimizer.zero_grad()

        if (batch_idx + 1) % batch_status == 0:
            print(f'Epoch: {epoch} [{batch_idx + 1}/{len(traindata)}]\tLoss: {loss:.6f}')

    # Avaliação no conjunto de validação
    f1_val, acc_val, _ = evaluate(model, valdata)
    print(f'Epoch {epoch} - Val F1: {f1_val:.4f}, Val Accuracy: {acc_val:.4f}')

    # Early Stopping baseado no Val F1
    if f1_val > max_f1:
        torch.save(model.state_dict(), os.path.join(write_path, 'best_model.pth'))
        max_f1 = f1_val
        repeat = 0
        print('Novo melhor modelo salvo.')
    else:
        repeat += 1

    if repeat == early_stop:
        print('Early stopping atingido.')
        break

# Avaliação no conjunto de teste
model.load_state_dict(torch.load(os.path.join(write_path, 'best_model.pth')))
f1_test, acc_test, (y_real, y_pred) = evaluate(model, testdata)
print("Desempenho no conjunto de teste:")
print(classification_report(y_real, y_pred, target_names=['0', '1']))
print(f"F1 (teste): {f1_test:.4f}, Accuracy (teste): {acc_test:.4f}")

Epoch: 0 [32/148]	Loss: 0.664921
Epoch: 0 [64/148]	Loss: 0.584619
Epoch: 0 [96/148]	Loss: 0.703176
Epoch: 0 [128/148]	Loss: 0.487228
Epoch 0 - Val F1: 0.8520, Val Accuracy: 0.8521
Novo melhor modelo salvo.
Epoch: 1 [32/148]	Loss: 0.527751
Epoch: 1 [64/148]	Loss: 0.537209
Epoch: 1 [96/148]	Loss: 0.446487
Epoch: 1 [128/148]	Loss: 0.470063
Epoch 1 - Val F1: 0.8446, Val Accuracy: 0.8462
Epoch: 2 [32/148]	Loss: 0.164029
Epoch: 2 [64/148]	Loss: 0.379126
Epoch: 2 [96/148]	Loss: 0.159345
Epoch: 2 [128/148]	Loss: 0.041814
Epoch 2 - Val F1: 0.8699, Val Accuracy: 0.8698
Novo melhor modelo salvo.
Epoch: 3 [32/148]	Loss: 0.087921
Epoch: 3 [64/148]	Loss: 0.139232
Epoch: 3 [96/148]	Loss: 0.047766
Epoch: 3 [128/148]	Loss: 0.070944
Epoch 3 - Val F1: 0.8994, Val Accuracy: 0.8994
Novo melhor modelo salvo.
Epoch: 4 [32/148]	Loss: 0.038919
Epoch: 4 [64/148]	Loss: 0.014150
Epoch: 4 [96/148]	Loss: 0.077177
Epoch: 4 [128/148]	Loss: 0.581271
Epoch 4 - Val F1: 0.9053, Val Accuracy: 0.9053
Novo melhor modelo sal

  model.load_state_dict(torch.load(os.path.join(write_path, 'best_model.pth')))


Desempenho no conjunto de teste:
              precision    recall  f1-score   support

           0       0.88      0.90      0.89        79
           1       0.88      0.86      0.87        71

    accuracy                           0.88       150
   macro avg       0.88      0.88      0.88       150
weighted avg       0.88      0.88      0.88       150

F1 (teste): 0.8799, Accuracy (teste): 0.8800
