In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
import torch.nn as nn
import torch.nn.functional as F
from nltk.corpus import stopwords 
from collections import Counter
import string
import re
import seaborn as sns
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

In [2]:
is_cuda = torch.cuda.is_available()

# If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
if is_cuda:
    device = torch.device("cuda")
    print("GPU is available")
else:
    device = torch.device("cpu")
    print("GPU not available, CPU used")

GPU not available, CPU used


In [3]:
base_csv = './data/IMDB Dataset.csv'
df = pd.read_csv(base_csv)
df.head()

Unnamed: 0,review,sentiment
0,One of the other reviewers has mentioned that ...,positive
1,A wonderful little production. <br /><br />The...,positive
2,I thought this was a wonderful way to spend ti...,positive
3,Basically there's a family where a little boy ...,negative
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive


In [5]:
from transformers import BertTokenizer, BertModel
import torch
# Load pretrained tokenizer
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Load pretrained model
model = BertModel.from_pretrained("bert-base-uncased")

# Access the embedding matrix
embedding_matrix = model.embeddings.word_embeddings.weight
print("Embedding matrix shape:", embedding_matrix.shape)  # Shape: (vocab_size, embedding_dim)

# Tokenize the text column
def tokenize_texts(texts, tokenizer):
    return tokenizer(
        list(texts),  # Pass a list of strings
        padding="max_length",  # Pad to max_length
        truncation=True,       # Truncate to max_length
        return_tensors="pt"    # Return PyTorch tensors
    )

# Tokenize the `text` column
tokenized_data = tokenize_texts(df["review"], tokenizer)
label = df['sentiment'].apply(lambda x: 1 if x =='positive' else 0)

# Print tokenized outputs
print(tokenized_data)

Embedding matrix shape: torch.Size([30522, 768])
{'input_ids': tensor([[ 101, 2028, 1997,  ...,    0,    0,    0],
        [ 101, 1037, 6919,  ...,    0,    0,    0],
        [ 101, 1045, 2245,  ...,    0,    0,    0],
        ...,
        [ 101, 1045, 2572,  ...,    0,    0,    0],
        [ 101, 1045, 1005,  ...,    0,    0,    0],
        [ 101, 2053, 2028,  ...,    0,    0,    0]]), 'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        ...,
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]])}


In [6]:
input_ids = tokenized_data["input_ids"]
attention_mask = tokenized_data["attention_mask"]

# Split the data into train and test sets
train_input_ids, test_input_ids, train_attention_mask, test_attention_mask, train_labels, test_labels = train_test_split(
    input_ids, attention_mask, label, test_size=0.2, random_state=42  # 20% for testing
)

print("Input IDs:", input_ids)
print("Attention Mask:", attention_mask)

Input IDs: tensor([[ 101, 2028, 1997,  ...,    0,    0,    0],
        [ 101, 1037, 6919,  ...,    0,    0,    0],
        [ 101, 1045, 2245,  ...,    0,    0,    0],
        ...,
        [ 101, 1045, 2572,  ...,    0,    0,    0],
        [ 101, 1045, 1005,  ...,    0,    0,    0],
        [ 101, 2053, 2028,  ...,    0,    0,    0]])
Attention Mask: tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]])


In [33]:
# create Tensor datasets
train_data = TensorDataset(train_input_ids, torch.from_numpy(train_labels.values))
valid_data = TensorDataset(test_input_ids, torch.from_numpy(test_labels.values))

# dataloaders
batch_size = 128

# make sure to SHUFFLE your data
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
valid_loader = DataLoader(valid_data, shuffle=True, batch_size=batch_size)

In [34]:
class RNN(torch.nn.Module):
    
    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
        super().__init__()

        self.embedding = torch.nn.Embedding(input_dim, embedding_dim)
        
        self.rnn = torch.nn.LSTM(embedding_dim,
                                 hidden_dim)        
        
        self.fc = torch.nn.Linear(hidden_dim, output_dim)
        

    def forward(self, text):
        # text dim: [sentence length, batch size]
        
        embedded = self.embedding(text)
        # embedded dim: [sentence length, batch size, embedding dim]
        
        output, (hidden, cell) = self.rnn(embedded)
        # output dim: [sentence length, batch size, hidden dim]
        # hidden dim: [1, batch size, hidden dim]

        hidden.squeeze_(0)
        # hidden dim: [batch size, hidden dim]
        
        output = self.fc(hidden)
        return output

In [38]:
# general Settings

RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)

VOCABULARY_SIZE = tokenizer.vocab_size
LEARNING_RATE = 0.005
BATCH_SIZE = 128
NUM_EPOCHS = 5
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

EMBEDDING_DIM = 768
HIDDEN_DIM = 768
NUM_CLASSES = 2

torch.manual_seed(RANDOM_SEED)
model = RNN(input_dim=VOCABULARY_SIZE,
            embedding_dim=EMBEDDING_DIM,
            hidden_dim=HIDDEN_DIM,
            output_dim=NUM_CLASSES # could use 1 for binary classification
)

model = model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

In [39]:
def compute_accuracy(model, data_loader, device):

    with torch.no_grad():

        correct_pred, num_examples = 0, 0

        for i, (features, targets) in enumerate(data_loader):

            features = features.to(device)
            targets = targets.float().to(device)

            logits = model(features)
            _, predicted_labels = torch.max(logits, 1)

            num_examples += targets.size(0)
            correct_pred += (predicted_labels == targets).sum()
    return correct_pred.float()/num_examples * 100

In [49]:
import time
start_time = time.time()

for epoch in range(NUM_EPOCHS):
    model.train()
    for inputs, labels in train_loader:
        
        inputs, labels = inputs.to(device), labels.to(device)  
        inputs = torch.transpose(inputs, 0, 1)
        ### FORWARD AND BACK PROP
        logits = model(inputs)
        loss = F.cross_entropy(logits, labels)
        optimizer.zero_grad()
        
        loss.backward()
        
        ### UPDATE MODEL PARAMETERS
        optimizer.step()
        
        ### LOGGING
        # if not batch_idx % 50:
        #     print (f'Epoch: {epoch+1:03d}/{NUM_EPOCHS:03d} | '
        #            f'Batch {batch_idx:03d}/{len(train_loader):03d} | '
        #            f'Loss: {loss:.4f}')

    with torch.set_grad_enabled(False):
        print(f'training accuracy: '
              f'{compute_accuracy(model, train_loader, DEVICE):.2f}%'
              f'\nvalid accuracy: '
              f'{compute_accuracy(model, valid_loader, DEVICE):.2f}%')
        
    print(f'Time elapsed: {(time.time() - start_time)/60:.2f} min')
    
print(f'Total Training Time: {(time.time() - start_time)/60:.2f} min')
# print(f'Test accuracy: {compute_accuracy(model, test_loader, DEVICE):.2f}%')

KeyboardInterrupt: 