In [102]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.datasets import SST2
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import GloVe
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

In [103]:
# Configure the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [104]:
# Set up tokenization
tokenizer = get_tokenizer('spacy', language='en_core_web_sm')

In [105]:
# Load GloVe embedded vectors
glove = GloVe(name='6B', dim=100)

In [106]:
# Text preprocessing and padding functions
def text_pipeline(text):
    if isinstance(text, str):
        return [glove.stoi[token] if token in glove.stoi else 0 for token in tokenizer(text)]
    else:
        return []

In [107]:
# Download dataset
train_iter = SST2(split='train')
valid_iter = SST2(split='dev')
test_iter = SST2(split='test')

In [108]:
# Batch processing function
def collate_batch(batch):
    label_list, text_list, length_list = [], [], []
    
    # Uncomment the following line to print the entire batch content for debugging
    # print(f"Batch content: {batch}")
    
    for data in batch:
        if len(data) != 2:
            # Print unexpected data formats (optional)
            # print(f"Unexpected batch format: {data}")
            continue
        
        _label, _text = data
        
        processed_text = text_pipeline(_text)
        if len(processed_text) == 0:
            continue
        
        label = torch.tensor(1.0 if _label == '1' else 0.0)  # Binary classification label
        processed_text = torch.tensor(processed_text).long()  # Convert to LongTensor
        
        # Optional: limit the amount of debugging output
        # print(f"Processed text (length): {len(processed_text)}, Label: {label}")

        label_list.append(label)
        text_list.append(processed_text)
        length_list.append(len(processed_text))

    if len(text_list) == 0:
        return None, None, None

    text_list = pad_sequence(text_list, batch_first=True)
    label_list = torch.tensor(label_list, dtype=torch.float32)
    length_list = torch.tensor(length_list)

    return text_list.to(device), label_list.to(device), length_list.to(device)

In [109]:
# Configure data loader
BATCH_SIZE = 64
train_loader = DataLoader(list(train_iter), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
valid_loader = DataLoader(list(valid_iter), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
test_loader = DataLoader(list(test_iter), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)

In [110]:
# LSTM model definition
class LSTMClassifier(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, output_dim, num_layers, dropout):
        super(LSTMClassifier, self).__init__()
        self.embedding = nn.Embedding.from_pretrained(glove.vectors)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, 
                            bidirectional=True, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)  # Bidirectional LSTM, so 2x
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, text, text_lengths):
        embedded = self.dropout(self.embedding(text))
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, (hidden, cell) = self.lstm(packed_embedded)
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1))
        return self.fc(hidden)

In [111]:
# Set hyperparameters for the model
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1  # Binary classification (positive or negative)
N_LAYERS = 2
DROPOUT = 0.5

In [112]:
# Instantiate the model
model = LSTMClassifier(EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, DROPOUT).to(device)

In [113]:
# Set up optimizer and loss function
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss().to(device)

In [114]:
# Training functions
def train(model, iterator, optimizer, criterion):
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for i, batch in enumerate(iterator):
        if batch is None:
            continue
        text, labels, text_lengths = batch
        if text is None:
            continue
        
        optimizer.zero_grad()
        predictions = model(text, text_lengths).squeeze(1)
        
        # Optional: limit the amount of output per batch
        if i % 10 == 0:  # Print only every 10th batch
            print(f"Batch {i}: Predictions shape: {predictions.shape}")

        loss = criterion(predictions, labels)
        rounded_preds = torch.round(torch.sigmoid(predictions))
        correct = (rounded_preds == labels).float()
        acc = correct.sum() / len(correct)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [115]:
# Functions for evaluation
def evaluate(model, iterator, criterion):
    epoch_loss = 0
    epoch_acc = 0
    model.eval()
    
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            if batch is None:
                continue
            text, labels, text_lengths = batch
            if text is None:
                continue
            
            predictions = model(text, text_lengths).squeeze(1)
            
            # Optional: limit the amount of output per batch
            if i % 10 == 0:  # Print only every 10th batch
                print(f"Eval Batch {i}: Predictions shape: {predictions.shape}")
            
            loss = criterion(predictions, labels)
            rounded_preds = torch.round(torch.sigmoid(predictions))
            correct = (rounded_preds == labels).float()
            acc = correct.sum() / len(correct)
            epoch_loss += loss.item()
            epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [116]:
# Perform training and evaluation
N_EPOCHS = 5

for epoch in range(N_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_loader, criterion)
    
    print(f'Epoch: {epoch+1}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

Epoch: 1
	Train Loss: 0.000 | Train Acc: 0.00%
	 Val. Loss: 0.000 |  Val. Acc: 0.00%
Epoch: 2
	Train Loss: 0.000 | Train Acc: 0.00%
	 Val. Loss: 0.000 |  Val. Acc: 0.00%
Epoch: 3
	Train Loss: 0.000 | Train Acc: 0.00%
	 Val. Loss: 0.000 |  Val. Acc: 0.00%
Epoch: 4
	Train Loss: 0.000 | Train Acc: 0.00%
	 Val. Loss: 0.000 |  Val. Acc: 0.00%
Epoch: 5
	Train Loss: 0.000 | Train Acc: 0.00%
	 Val. Loss: 0.000 |  Val. Acc: 0.00%


In [117]:
# Evaluate on test data
test_loss, test_acc = evaluate(model, test_loader, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

Test Loss: 0.000 | Test Acc: 0.00%
