In [1]:
import torch
import torch.nn as nn
from torch.optim import Adam
import random

import numpy as np
from torchsummary import summary


In [2]:
train_data = []
train_label = []
for _ in range(50):
    start = random.randint(0, 97)  # Random start to generate diverse sequences
    seq = [start, start+1, start+2]
    train_data.append(seq)
    train_label.append(start+3)  # Next number in sequence

In [3]:
print(len(train_data))
print(len(train_label))
print(train_data[1])
print(train_label[1])

50
50
[72, 73, 74]
75


In [4]:
# Convert to PyTorch tensors
X = torch.FloatTensor(train_data).unsqueeze(-1)  # Shape: [50, 3, 1]
print(X.shape)
y = torch.FloatTensor(train_label)
print(y.shape)

# Simple RNN model
class SimpleRNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.rnn = nn.RNN(input_size=1, hidden_size=16, batch_first=True)
        self.linear = nn.Linear(16, 1)
        
    def forward(self, x):
        output, _ = self.rnn(x)
        return self.linear(output[:, -1, :]).squeeze()

# Create and train model
model = SimpleRNN()
# summary(model, (50, 3, 1))
model

torch.Size([50, 3, 1])
torch.Size([50])


SimpleRNN(
  (rnn): RNN(1, 16, batch_first=True)
  (linear): Linear(in_features=16, out_features=1, bias=True)
)

In [5]:
loss_fn = nn.MSELoss()
optimizer = Adam(model.parameters(), lr=0.01)

In [6]:
# Train
for epoch in range(1500):
    model.train()
    optimizer.zero_grad()
    
    prediction = model(X)
    loss = loss_fn(prediction, y)
    
    loss.backward()
    optimizer.step()
    
    if epoch % 100 == 0:
        print(f'Epoch: {epoch} | Loss: {loss.item():.4f}')

Epoch: 0 | Loss: 3184.6130
Epoch: 100 | Loss: 1836.0853
Epoch: 200 | Loss: 1078.7091
Epoch: 300 | Loss: 645.6280
Epoch: 400 | Loss: 395.2435
Epoch: 500 | Loss: 251.8957
Epoch: 600 | Loss: 166.3913
Epoch: 700 | Loss: 114.6052
Epoch: 800 | Loss: 81.7485
Epoch: 900 | Loss: 59.2929
Epoch: 1000 | Loss: 42.8959
Epoch: 1100 | Loss: 30.7801
Epoch: 1200 | Loss: 21.9459
Epoch: 1300 | Loss: 15.6746
Epoch: 1400 | Loss: 11.1456


In [13]:
# Test several sequences
test_sequences = [
    [7,8,9],
    [11,12,13],
    [20,21,22],
    [50, 51, 52],
    [143, 144, 145],
]

model.eval()
with torch.no_grad():
    for seq in test_sequences:
        test_input = torch.FloatTensor(seq).reshape(1, 3, 1)
        pred = model(test_input)
        print(f'\nSequence {seq} -> Predicted: {pred.item():.4f}, Rounded pred: {np.round(pred)} Expected: {seq[-1]+1}')


Sequence [7, 8, 9] -> Predicted: 10.0014, Rounded pred: 10.0 Expected: 10

Sequence [11, 12, 13] -> Predicted: 13.7659, Rounded pred: 14.0 Expected: 14

Sequence [20, 21, 22] -> Predicted: 22.9745, Rounded pred: 23.0 Expected: 23

Sequence [50, 51, 52] -> Predicted: 52.9788, Rounded pred: 53.0 Expected: 53

Sequence [143, 144, 145] -> Predicted: 87.3655, Rounded pred: 87.0 Expected: 146


# https://www.tensorflow.org/text/guide/word_embeddings

In [18]:
import torch
import torch.nn as nn

import kagglehub
import glob
import os
import pandas as pd

import tqdm

In [19]:
# Download latest version
path = kagglehub.dataset_download("lakshmi25npathi/imdb-dataset-of-50k-movie-reviews")

print("Path to dataset files:", path)

Path to dataset files: /root/.cache/kagglehub/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews/versions/1


In [20]:
# probably want to change the name of the directory w/ "mv IMDB Dataset.csv dataset.csv" 
# Change to your directory
path = '/root/.cache/kagglehub/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews/versions/1/IMDB Dataset.csv'
os.path.isfile(path)

True

In [21]:
dataset = pd.read_csv(path)
print(len(dataset))
print(dataset.shape)
pd.set_option('display.max_colwidth', 200)
dataset.head()

50000
(50000, 2)


Unnamed: 0,review,sentiment
0,"One of the other reviewers has mentioned that after watching just 1 Oz episode you'll be hooked. They are right, as this is exactly what happened with me.<br /><br />The first thing that struck me...",positive
1,"A wonderful little production. <br /><br />The filming technique is very unassuming- very old-time-BBC fashion and gives a comforting, and sometimes discomforting, sense of realism to the entire p...",positive
2,"I thought this was a wonderful way to spend time on a too hot summer weekend, sitting in the air conditioned theater and watching a light-hearted comedy. The plot is simplistic, but the dialogue i...",positive
3,Basically there's a family where a little boy (Jake) thinks there's a zombie in his closet & his parents are fighting all the time.<br /><br />This movie is slower than a soap opera... and suddenl...,negative
4,"Petter Mattei's ""Love in the Time of Money"" is a visually stunning film to watch. Mr. Mattei offers us a vivid portrait about human relations. This is a movie that seems to be telling us what mone...",positive


In [22]:
def tokenizer(text):

    text = text.replace('<br />', ' ')
    text = text.replace('<br/>', ' ')
    text = text.replace('<br>', ' ')
    text = text.lower()    
    for punctuation in ".,!?:;:":
        text = text.replace(punctuation, f'') # Add space before and after the punctuation
    tokens = text.split()
    return tokens

text = "My name is John Doe!"
print(tokenizer(text))
    

['my', 'name', 'is', 'john', 'doe']


In [23]:
sample_review = dataset['review'][0]
print(tokenizer(sample_review))

['one', 'of', 'the', 'other', 'reviewers', 'has', 'mentioned', 'that', 'after', 'watching', 'just', '1', 'oz', 'episode', "you'll", 'be', 'hooked', 'they', 'are', 'right', 'as', 'this', 'is', 'exactly', 'what', 'happened', 'with', 'me', 'the', 'first', 'thing', 'that', 'struck', 'me', 'about', 'oz', 'was', 'its', 'brutality', 'and', 'unflinching', 'scenes', 'of', 'violence', 'which', 'set', 'in', 'right', 'from', 'the', 'word', 'go', 'trust', 'me', 'this', 'is', 'not', 'a', 'show', 'for', 'the', 'faint', 'hearted', 'or', 'timid', 'this', 'show', 'pulls', 'no', 'punches', 'with', 'regards', 'to', 'drugs', 'sex', 'or', 'violence', 'its', 'is', 'hardcore', 'in', 'the', 'classic', 'use', 'of', 'the', 'word', 'it', 'is', 'called', 'oz', 'as', 'that', 'is', 'the', 'nickname', 'given', 'to', 'the', 'oswald', 'maximum', 'security', 'state', 'penitentary', 'it', 'focuses', 'mainly', 'on', 'emerald', 'city', 'an', 'experimental', 'section', 'of', 'the', 'prison', 'where', 'all', 'the', 'cells', 

In [24]:
from collections import Counter

def build_vocab(texts, max_words=10000):
    word_counter = Counter()
    for text in texts:
        tokens = tokenizer(text)
        word_counter.update(tokens)

    vocab = ['<PAD>', '<UNK>']
    vocab += [word for word, count in word_counter.most_common(max_words)]

    word_to_idx = {word:idx for idx, word in enumerate(vocab)}
    return word_to_idx

In [25]:
vocab = build_vocab(dataset['review'])
print("Vocabulary size:", len(vocab))
vocab_20 = list(vocab.items())[:10]
for word, idx in vocab_20:
    print(f'{word}: {idx}')

Vocabulary size: 10002
<PAD>: 0
<UNK>: 1
the: 2
a: 3
and: 4
of: 5
to: 6
is: 7
in: 8
it: 9


In [26]:
#Create custom dataset 
from torch.utils.data import Dataset, DataLoader
class MovieReview(Dataset):
    def __init__(self, reviews, labels, vocab, max_len=200):
        self.reviews = list(reviews)
        self.labels = list(labels) 
        self.vocab = vocab
        self.max_len = max_len
    
    def __len__(self):
        return len(self.reviews)
    
    def __getitem__(self, idx):
        review = self.reviews[idx]
        label = self.labels[idx]
        tokens = tokenizer(review)[:self.max_len]

        indices = [self.vocab.get(token, 1) for token in tokens] 

        if len(indices) < self.max_len:
            indices += [0] * (self.max_len - len(indices))
        
        return {
            'input_ids': torch.tensor(indices),
            'labels': torch.tensor(1 if label == 'positive' else 0)
        }

In [27]:
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(dataset['review'], dataset['sentiment'])

In [28]:
print(f'x_train: {len(x_train)} x_test: {len(x_test)} y_train: {len(y_train)} y_test: {len(y_test)}')

x_train: 37500 x_test: 12500 y_train: 37500 y_test: 12500


In [29]:
train_dataset = MovieReview(x_train, y_train, vocab)
test_dataset = MovieReview(x_test, y_test, vocab)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

In [30]:
sample_batch = next(iter(train_loader))
print(sample_batch['input_ids'])
print(sample_batch['labels'])
print(sample_batch['labels'].shape)


tensor([[   2,  246, 9236,  ...,    2,  271, 1533],
        [ 890,  184,    2,  ...,    0,    0,    0],
        [  89,    5,   30,  ...,    9,   14,    3],
        ...,
        [   1,  558,  175,  ...,   72,   85,   34],
        [  22,  115,   51,  ...,  102, 2669,  337],
        [5852, 4948,    3,  ...,   10,  495,    1]])
tensor([0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 0,
        1, 1, 0, 0, 1, 1, 0, 1])
torch.Size([32])


In [52]:
class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim=200, hidden_dim=256, n_layers=2):
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)

        self.lstm = nn.LSTM(input_size=embedding_dim, 
                            hidden_size=hidden_dim, 
                            num_layers=n_layers,
                            batch_first=True, 
                            bidirectional=True)
        
        self.linear1 = nn.Linear(hidden_dim * 2, 1)

        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        # print("Input shape:", x.shape)

        embedded = self.dropout(self.embeddings(x))
        # print("Embedded shape:", embedded.shape)


        lstm_out, _ = self.lstm(embedded)

        # print(f'lstm output shape: {lstm_out.shape}')

        out = self.linear1(lstm_out[:, -1, :])
        # print("Output shape:", out.shape)


        return torch.sigmoid(out)

model = SentimentLSTM(vocab_size=len(vocab))
model

SentimentLSTM(
  (embeddings): Embedding(10002, 200)
  (lstm): LSTM(200, 256, num_layers=2, batch_first=True, bidirectional=True)
  (linear1): Linear(in_features=512, out_features=1, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)

In [62]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model.to(device)

cuda


SentimentRNN(
  (embedding): Embedding(10002, 100)
  (lstm): LSTM(100, 256, num_layers=2, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=512, out_features=1, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)

In [63]:
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters())

In [64]:
from tqdm import tqdm

In [65]:
def train_model(model, device, train_loader, criterion, optimizer, epochs):
    print(device)
    model.to(device)

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch in tqdm(train_loader, desc="training"):
            # Get data
            inputs = batch['input_ids'].to(device)
            labels = batch['labels'].float().to(device)
            
            # Zero gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(inputs)
            outputs = outputs.squeeze(1)

            loss = criterion(outputs, labels)
            
            # Backward pass
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            
        avg_loss = total_loss / len(train_loader)
        print(f'Epoch {epoch+1}, Loss: {avg_loss:.4f}')

In [66]:
train_model(model, device, train_loader, criterion, optimizer, epochs=15)

cuda


training: 100%|██████████| 1172/1172 [00:31<00:00, 36.80it/s]


Epoch 1, Loss: 0.6934


training: 100%|██████████| 1172/1172 [00:31<00:00, 36.90it/s]


Epoch 2, Loss: 0.6932


training: 100%|██████████| 1172/1172 [00:31<00:00, 37.55it/s]


Epoch 3, Loss: 0.6914


training: 100%|██████████| 1172/1172 [00:32<00:00, 35.88it/s]


Epoch 4, Loss: 0.6864


training: 100%|██████████| 1172/1172 [00:33<00:00, 35.31it/s]


Epoch 5, Loss: 0.5599


training: 100%|██████████| 1172/1172 [00:33<00:00, 35.51it/s]


Epoch 6, Loss: 0.3557


training: 100%|██████████| 1172/1172 [00:33<00:00, 35.39it/s]


Epoch 7, Loss: 0.2960


training: 100%|██████████| 1172/1172 [00:32<00:00, 35.64it/s]


Epoch 8, Loss: 0.2574


training: 100%|██████████| 1172/1172 [00:33<00:00, 35.30it/s]


Epoch 9, Loss: 0.2329


training: 100%|██████████| 1172/1172 [00:31<00:00, 37.34it/s]


Epoch 10, Loss: 0.2099


training: 100%|██████████| 1172/1172 [00:31<00:00, 36.92it/s]


Epoch 11, Loss: 0.1838


training: 100%|██████████| 1172/1172 [00:32<00:00, 35.87it/s]


Epoch 12, Loss: 0.1623


training: 100%|██████████| 1172/1172 [00:31<00:00, 37.78it/s]


Epoch 13, Loss: 0.1463


training: 100%|██████████| 1172/1172 [00:32<00:00, 36.12it/s]


Epoch 14, Loss: 0.1298


training: 100%|██████████| 1172/1172 [00:30<00:00, 37.89it/s]

Epoch 15, Loss: 0.1136





In [81]:
torch.save(model, 'model.pt')

In [82]:
model_test = torch.load('model.pt', weights_only=False)

In [83]:
model_test

SentimentRNN(
  (embedding): Embedding(10002, 100)
  (lstm): LSTM(100, 256, num_layers=2, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=512, out_features=1, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)

In [87]:
def test_model(model, device, test_loader, criterion):
    print(device)
    model.to(device)

    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    correct_predictions = 0
    total_predictions = 0
    
    with torch.no_grad():  # No need to track gradients during testing
        for batch in tqdm(test_loader, desc="Testing"):
            # Get data
            inputs = batch['input_ids'].to(device)
            labels = batch['labels'].float().to(device)
            
            # Forward pass
            outputs = model(inputs)
            outputs = outputs.squeeze(1)  # Remove the extra dimension if needed

            # Calculate loss
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            # Calculate accuracy
            predicted = (outputs >= 0.5).float()  # For binary classification (threshold 0.5)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)

        avg_loss = total_loss / len(test_loader)  # Use test_loader for average loss
        accuracy = (correct_predictions / total_predictions) * 100  # Accuracy in percentage
        
        print(f'Test Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')

In [88]:
test_model(model_test, device, test_loader, criterion)

cuda


Testing: 100%|██████████| 391/391 [00:03<00:00, 103.19it/s]

Test Loss: 0.3612, Accuracy: 88.50%





In [118]:
def predict(model, vocab, text, device, max_len=200):
    # Step 1: Tokenize the input text
    tokens = tokenizer(text)
    
    # Step 2: Convert tokens to indices using the vocabulary
    indices = [vocab.get(token, 1) for token in tokens]  # 1 is for <UNK> token
    
    # Pad the input if necessary
    if len(indices) < max_len:
        indices += [0] * (max_len - len(indices))  # Pad with <PAD> token (0)
    
    # Convert the indices to a tensor
    input_tensor = torch.tensor(indices).unsqueeze(0).to(device)  # Add batch dimension and move to the same device
    
    # Step 3: Run the model to get the prediction
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        output = model(input_tensor)  # Get the output logits
    
    # Step 4: Convert output to a class prediction
    predicted_class = torch.round(output).item()  # 0 or 1
    
    # Step 5: Return the prediction result
    return "positive" if predicted_class == 1 else "negative"


In [122]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Check if GPU is available, otherwise use CPU
text = "The movie tickled my pickl"
prediction = predict(model, vocab, text, device)
print(f"Prediction: {prediction}")

Prediction: positive
