In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import re
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
from collections import Counter
from sklearn.metrics import classification_report, confusion_matrix

df = pd.read_csv('IMDB Dataset.csv')

def clean_text(text):
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    return text.lower()

def tokenize(text):
    return text.split()

df['cleaned_text'] = df['review'].apply(clean_text)
df['tokens'] = df['cleaned_text'].apply(tokenize)

all_words = [word for tokens in df['tokens'] for word in tokens]
vocab = Counter(all_words)
word_to_idx = {word: idx + 1 for idx, (word, _) in enumerate(vocab.most_common())}
df['indexed_tokens'] = df['tokens'].apply(lambda tokens: [word_to_idx[word] for word in tokens if word in word_to_idx])
df['sentiment'] = df['sentiment'].map({'negative': 0, 'positive': 1})
X = df['indexed_tokens']
y = df['sentiment']

X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=42)

class SentimentDataset(Dataset):
    def __init__(self, reviews, labels):
        self.reviews = reviews
        self.labels = labels

    def __len__(self):
        return len(self.reviews)

    def __getitem__(self, idx):
        review = self.reviews.iloc[idx]
        label = self.labels.iloc[idx]
        return torch.tensor(review, dtype=torch.long), torch.tensor(label, dtype=torch.long)

def collate_fn(batch):
    reviews, labels = zip(*batch)
    padded_reviews = pad_sequence([torch.tensor(review) for review in reviews], batch_first=True, padding_value=0)
    labels = torch.tensor(labels, dtype=torch.long)
    return padded_reviews, labels

batch_size = 64
train_dataset = SentimentDataset(X_train, y_train)
val_dataset = SentimentDataset(X_val, y_val)
test_dataset = SentimentDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, collate_fn=collate_fn)

class RNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(RNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size + 1, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = self.embedding(x)
        x, (hn, cn) = self.lstm(x)
        x = self.fc(hn[-1])
        return x

def train_model_with_curve(model, train_loader, val_loader, criterion, optimizer, num_epochs, device):
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        model.train()
        epoch_train_loss = 0

        for reviews, labels in train_loader:
            reviews, labels = reviews.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(reviews)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            epoch_train_loss += loss.item()

        avg_train_loss = epoch_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        model.eval()
        epoch_val_loss = 0
        with torch.no_grad():
            for reviews, labels in val_loader:
                reviews, labels = reviews.to(device), labels.to(device)
                outputs = model(reviews)
                loss = criterion(outputs, labels)
                epoch_val_loss += loss.item()

        avg_val_loss = epoch_val_loss / len(val_loader)
        val_losses.append(avg_val_loss)

    plot_learning_curve(train_losses, val_losses)

def plot_learning_curve(train_losses, val_losses):
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Learning Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid()
    plt.show()

def evaluate_model(model, test_loader, device):
    model.eval()
    correct = 0
    with torch.no_grad():
        for reviews, labels in test_loader:
            reviews, labels = reviews.to(device), labels.to(device)
            outputs = model(reviews)
            _, predicted = torch.max(outputs.data, 1)
            correct += (predicted == labels).sum().item()
    accuracy = correct / len(test_loader.dataset) * 100
    print(f'Test Accuracy: {accuracy:.2f}%')

def classify_unseen_data(model, unseen_data, word_to_idx, device):
    model.eval()
    unseen_data_cleaned = [clean_text(review) for review in unseen_data]
    unseen_data_tokenized = [tokenize(text) for text in unseen_data_cleaned]
    unseen_data_indexed = [[word_to_idx[word] for word in tokens if word in word_to_idx] for tokens in unseen_data_tokenized]

    # Pad the sequences
    unseen_data_tensor = pad_sequence([torch.tensor(review, dtype=torch.long) for review in unseen_data_indexed], batch_first=True, padding_value=0).to(device)

    with torch.no_grad():
        outputs = model(unseen_data_tensor)
        _, predictions = torch.max(outputs.data, 1)

    return predictions.cpu().numpy()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
vocab_size = len(word_to_idx)
embedding_dim = 100
hidden_dim = 128
output_dim = 2
num_epochs = 5

model = RNN(vocab_size, embedding_dim, hidden_dim, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

train_model_with_curve(model, train_loader, val_loader, criterion, optimizer, num_epochs, device)
evaluate_model(model, test_loader, device)

unseen_reviews = [
    "The movie was fantastic! I loved every moment of it.",
    "It was a waste of time and I regret watching it."
]

predictions = classify_unseen_data(model, unseen_reviews, word_to_idx, device)
print("Predictions for unseen data:", ["Positive" if pred == 1 else "Negative" for pred in predictions])

  padded_reviews = pad_sequence([torch.tensor(review) for review in reviews], batch_first=True, padding_value=0)


In [6]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

class TransformerModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, nhead, num_encoder_layers, hidden_dim, output_dim):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size + 1, embedding_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(embedding_dim, nhead, hidden_dim),
            num_layers=num_encoder_layers
        )
        self.fc = nn.Linear(embedding_dim, output_dim)

    def forward(self, x):
        x = self.embedding(x)
        x = x.permute(1, 0, 2)  # (batch_size, seq_len, embed_dim) -> (seq_len, batch_size, embed_dim)
        x = self.transformer_encoder(x)
        x = x.mean(dim=0)  # Average pooling across the sequence length
        return self.fc(x)

def train_model(model, train_loader, criterion, optimizer, num_epochs, device):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for reviews, labels in train_loader:
            reviews, labels = reviews.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(reviews)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}')

def evaluate_model(model, test_loader, device):
    model.eval()
    correct = 0
    with torch.no_grad():
        for reviews, labels in test_loader:
            reviews, labels = reviews.to(device), labels.to(device)
            outputs = model(reviews)
            _, predicted = torch.max(outputs.data, 1)
            correct += (predicted == labels).sum().item()
    accuracy = correct / len(test_loader.dataset) * 100
    print(f'Test Accuracy: {accuracy:.2f}%')

def classify_unseen_data(model, unseen_data, word_to_idx, device):
    model.eval()
    unseen_data_cleaned = [clean_text(review) for review in unseen_data]  # Ensure you have this function
    unseen_data_tokenized = [tokenize(text) for text in unseen_data_cleaned]  # Ensure you have this function
    unseen_data_indexed = [[word_to_idx[word] for word in tokens if word in word_to_idx] for tokens in unseen_data_tokenized]

    unseen_data_tensor = pad_sequence([torch.tensor(review, dtype=torch.long) for review in unseen_data_indexed], batch_first=True, padding_value=0).to(device)

    with torch.no_grad():
        outputs = model(unseen_data_tensor)
        _, predictions = torch.max(outputs.data, 1)

    return predictions.cpu().numpy()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

vocab_size = len(word_to_idx)
embedding_dim = 64
nhead = 8
num_encoder_layers = 2
hidden_dim = 128
output_dim = 2
num_epochs = 5

model = TransformerModel(vocab_size, embedding_dim, nhead, num_encoder_layers, hidden_dim, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())


train_model(model, train_loader, criterion, optimizer, num_epochs, device)
evaluate_model(model, test_loader, device)

unseen_data = [
    "This movie was fantastic! I loved it.",
    "I did not enjoy the film; it was boring."
]

predictions = classify_unseen_data(model, unseen_data, word_to_idx, device)
print(predictions)

  padded_reviews = pad_sequence([torch.tensor(review) for review in reviews], batch_first=True, padding_value=0)


Epoch [1/5], Loss: 0.5013
Epoch [2/5], Loss: 0.3157
Epoch [3/5], Loss: 0.2355
Epoch [4/5], Loss: 0.1725
Epoch [5/5], Loss: 0.1155
Test Accuracy: 86.89%
[1 0]
