# Processamento de linguagem natural com RNN usando Pytorch

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import os
import re
import random

In [2]:
!wget http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar -xzf aclImdb_v1.tar.gz

--2025-01-30 01:07:48--  http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
Resolving ai.stanford.edu (ai.stanford.edu)... 171.64.68.10
Connecting to ai.stanford.edu (ai.stanford.edu)|171.64.68.10|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84125825 (80M) [application/x-gzip]
Saving to: ‘aclImdb_v1.tar.gz’


2025-01-30 01:07:51 (29.2 MB/s) - ‘aclImdb_v1.tar.gz’ saved [84125825/84125825]



In [3]:
SEED = 1234
BATCH_SIZE = 64
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1
N_EPOCHS = 5

In [4]:
random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

In [5]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [6]:
def preprocess_text(text):
  text = re.sub(r'[^\w\s]', '',text.lower().strip())
  return text.split()

In [7]:
class IMDBDataset(Dataset):
    def __init__(self, data_dir):
        self.samples = []
        for label in ['pos', 'neg']:
            dir_path = os.path.join(data_dir, label)
            if not os.path.exists(dir_path):
                raise FileNotFoundError(f"Diretório {dir_path} não encontrado.")
            for filename in os.listdir(dir_path):
                file_path = os.path.join(dir_path, filename)
                if os.path.isfile(file_path):
                    with open(file_path, 'r', encoding='utf-8') as file:
                        text = file.read()
                        self.samples.append((text, 1.0 if label == 'pos' else 0.0))
        print(f"{len(self.samples)} amostras carregadas para {data_dir}")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        text, label = self.samples[idx]
        tokens = preprocess_text(text)
        return tokens, torch.tensor(label, dtype=torch.float32)


In [8]:
def build_vocab(dataset):
    vocab = {"<pad>": 0, "<unk>": 1}
    idx = 2
    for i in range(len(dataset)):
        tokens, _ = dataset[i]
        for token in tokens:
            if token not in vocab:
                vocab[token] = idx
                idx += 1
    return vocab

In [9]:
def text_to_indices(tokens, vocab):
    return [vocab.get(token, vocab["<unk>"]) for token in tokens]

In [10]:
def collate_batch(batch):
    texts, labels = zip(*batch)
    indices_list = [torch.tensor(text_to_indices(tokens, vocab), dtype=torch.long) for tokens in texts]
    padded_texts = nn.utils.rnn.pad_sequence(indices_list, batch_first=True, padding_value=vocab["<pad>"])
    return padded_texts.to(device), torch.tensor(labels, dtype=torch.float32).to(device)

In [11]:
train_data_dir = "aclImdb/train"
test_data_dir =  "aclImdb/test"

train_dataset = IMDBDataset(train_data_dir)
valid_dataset = IMDBDataset(test_data_dir)

25000 amostras carregadas para aclImdb/train
25000 amostras carregadas para aclImdb/test


In [12]:
vocab = build_vocab(train_dataset)

In [13]:
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, collate_fn=collate_batch)


In [14]:
class SentimentRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, text):
        embedded = self.embedding(text)
        output, (hidden, cell) = self.rnn(embedded)
        hidden = hidden[-1]
        output = self.fc(hidden)
        return self.sigmoid(output)


In [15]:
INPUT_DIM = len(vocab)
model = SentimentRNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM).to(device)

In [16]:
optimizer = optim.Adam(model.parameters())
criterion = nn.BCELoss().to(device)

In [17]:
def train(model, dataloader, optimizer, criterion):
    model.train()
    epoch_loss = 0
    for text, labels in dataloader:
        optimizer.zero_grad()
        predictions = model(text).squeeze(1)
        loss = criterion(predictions, labels)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(dataloader)

In [18]:
def evaluate(model, dataloader, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for text, labels in dataloader:
            predictions = model(text).squeeze(1)
            loss = criterion(predictions, labels)
            epoch_loss += loss.item()
    return epoch_loss / len(dataloader)

In [19]:
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_dataloader, optimizer, criterion)
    valid_loss = evaluate(model, valid_dataloader, criterion)
    print(f'Epoch {epoch+1}/{N_EPOCHS} | Train Loss: {train_loss:.3f} | Val Loss: {valid_loss:.3f}')

Epoch 1/5 | Train Loss: 0.694 | Val Loss: 0.693
Epoch 2/5 | Train Loss: 0.693 | Val Loss: 0.693
Epoch 3/5 | Train Loss: 0.693 | Val Loss: 0.693
Epoch 4/5 | Train Loss: 0.692 | Val Loss: 0.693
Epoch 5/5 | Train Loss: 0.691 | Val Loss: 0.693
