In [None]:
!pip install datasets

In [2]:
from torch.optim import Adam
from torch.optim.lr_scheduler import LambdaLR
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch
from tokenizers import ByteLevelBPETokenizer
from datasets import load_dataset
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from torch.amp import autocast, GradScaler

import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [3]:
class SST2Dataset(Dataset):
    def __init__(self, data, tokenizer, max_len=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sentence = self.data[idx]['sentence']
        label = self.data[idx]['label']
        encoded = self.tokenizer.encode(sentence).ids
        encoded = encoded[:self.max_len] + [0] * (self.max_len - len(encoded)) if len(encoded) < self.max_len else encoded[:self.max_len]
        return {"input": torch.tensor(encoded), "label": torch.tensor(label)}


In [4]:
class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, hidden_dim, inner_dim, num_layers, num_classes, max_seq_len=512, dropout=0.1):
        super(TransformerClassifier, self).__init__()

        # Embeddings 
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Parameter(torch.zeros(1, max_seq_len, embed_size))
        self.dropout = nn.Dropout(dropout)

        # Layers
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=embed_size,
                nhead=num_heads,
                dim_feedforward=inner_dim,
                activation='gelu',  
                dropout=dropout,
                batch_first=True
            ) for _ in range(num_layers)
        ])
        self.transformer_encoder = nn.TransformerEncoder(
            nn.ModuleList(self.layers), num_layers=num_layers
        )

        self.fc = nn.Linear(embed_size, num_classes)

    def forward(self, texts):
        seq_len = texts.size(1)
        # Add embeddings
        embeddings = self.embedding(texts) + self.position_embedding[:, :seq_len, :]
        x = self.dropout(embeddings)

        # Transformer Layers
        for layer in self.layers:
            x = layer(x)

        # Pooling 
        x = x.mean(dim=1)  
        return self.fc(x)


In [5]:
# Scheduler
def lr_lambda(current_step: int, warmup_steps: int, total_steps: int):
    if current_step < warmup_steps:
        return current_step / warmup_steps
    # Convert the calculation 
    return max(0.0, 0.5 * (1 + torch.cos(torch.tensor(torch.pi * (current_step - warmup_steps) / (total_steps - warmup_steps), device=device))))

In [6]:
def pretrain(model, data_loader, optimizer, scheduler, epochs=100):
    model.train()
    scaler = GradScaler()  # Mixed precision

    for epoch in range(epochs):
        total_loss = 0
        for batch in data_loader:
            inputs, labels = batch["input"].to(device), batch["label"].to(device)
            optimizer.zero_grad()

            # Forward pass with autocast for mixed precision
            with autocast('cuda'):
                predictions = model(inputs)
                loss = nn.CrossEntropyLoss()(predictions, labels)

            # Backward pass with GradScaler
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            scheduler.step()
            total_loss += loss.item()

        print(f"Epoch {epoch + 1}, Loss: {total_loss / len(data_loader)}")

In [7]:
def fine_tune(model, train_loader, epochs=3):
    optimizer = Adam(model.parameters(), lr=6.25e-5)
    total_steps = len(train_loader) * epochs
    warmup_steps = int(0.002 * total_steps)
    scheduler = LambdaLR(optimizer, lr_lambda=lambda step: lr_lambda(step, warmup_steps=warmup_steps, total_steps=total_steps))

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch in train_loader:
            inputs, labels = batch["input"].to(
                device), batch["label"].to(device)
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = nn.CrossEntropyLoss()(outputs, labels)

            # Backward pass
            loss.backward()
            optimizer.step()
            scheduler.step()
            total_loss += loss.item()
        print(f"Fine-tuning Epoch {epoch + 1}, Loss: {total_loss / len(train_loader)}")

In [19]:
def evaluate(model, loader):
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for batch in loader:
            inputs, labels = batch["input"].to(device), batch["label"].to(device)       
            outputs = model(inputs)
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return accuracy_score(all_labels, all_preds)

In [12]:
ds = load_dataset("stanfordnlp/sst2")

train_list = list(ds['train'])

train_data, temp_data = train_test_split(train_list[:7000], test_size=0.40, random_state=42)
val_data, test_data = train_test_split(temp_data, test_size=0.50, random_state=42)

tokenizer = ByteLevelBPETokenizer()
tokenizer.train_from_iterator( (example["sentence"] for example in train_data),
    vocab_size=40000,
    min_frequency=2,
    special_tokens=["<s>", "<pad>", "</s>", "<unk>", "<mask>"],
)

# Parameters
vocab_size = len(tokenizer.get_vocab())
embed_size = 768
num_heads = 12
inner_dim = 3072
num_layers = 12
num_classes = 2  
max_seq_len = 512

model = TransformerClassifier(vocab_size, embed_size, num_heads, inner_dim, inner_dim, num_layers, num_classes, max_seq_len).to(device)

# Define Optimizer
optimizer = Adam(model.parameters(), lr=2.5e-4, weight_decay=0.01)
scheduler = LambdaLR(optimizer, lr_lambda=lambda step: lr_lambda(step, warmup_steps=2000, total_steps=50000))



In [13]:
# Set loader
train_dataset = SST2Dataset(train_data, tokenizer)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

In [15]:
# Pre train
print("Starting pre-training...")
torch.cuda.empty_cache()
pretrain(model, train_loader, optimizer, scheduler, epochs=100)

Iniciando o pré-treinamento...
Epoch 1, Loss: 0.712254726525509
Epoch 2, Loss: 0.7084860368208452
Epoch 3, Loss: 0.7006478743119673
Epoch 4, Loss: 0.7058527975371389
Epoch 5, Loss: 0.7060796853267786
Epoch 6, Loss: 0.7016678434429746
Epoch 7, Loss: 0.702401132294626
Epoch 8, Loss: 0.7057624585700758
Epoch 9, Loss: 0.7044334411621094
Epoch 10, Loss: 0.7020141139174952
Epoch 11, Loss: 0.7005141865123402
Epoch 12, Loss: 0.7119928995768229
Epoch 13, Loss: 0.70325163638953
Epoch 14, Loss: 0.7000496026241418
Epoch 15, Loss: 0.694074688535748
Epoch 16, Loss: 0.695856845740116
Epoch 17, Loss: 0.6971529758337772
Epoch 18, Loss: 0.696790463996656
Epoch 19, Loss: 0.6942576206091678
Epoch 20, Loss: 0.6884179548783735
Epoch 21, Loss: 0.689452431418679
Epoch 22, Loss: 0.6873941710501006
Epoch 23, Loss: 0.6875828829678622
Epoch 24, Loss: 0.687968398585464
Epoch 25, Loss: 0.6876784815932765
Epoch 26, Loss: 0.6870047829367898
Epoch 27, Loss: 0.6869183164654356
Epoch 28, Loss: 0.6872158628521543
Epoch 2

In [16]:
# Fine-tuning
val_dataset = SST2Dataset(val_data, tokenizer)
val_loader = DataLoader(val_dataset, batch_size=10, shuffle=True)

print("Starting fine-tuning...")
fine_tune(model, val_loader, epochs=3)

Iniciando o fine-tuning...
Fine-tuning Epoch 1, Loss: 0.6876651614904403
Fine-tuning Epoch 2, Loss: 0.6876285629613059
Fine-tuning Epoch 3, Loss: 0.6875717661210469


In [20]:
# Evaluating
test_dataset = SST2Dataset(test_data, tokenizer)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=True)

accuracy = evaluate(model, test_loader)


print(f"Accuracy on the test set: {accuracy * 100:.2f}%")

Acurácia no conjunto de teste: 54.79%
