In [1]:
import os
import re
import urllib.request
import zipfile
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter
from datasets import load_dataset
from sklearn.metrics import accuracy_score, f1_score

In [2]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [3]:
def load_glove_embeddings(glove_file_path, vocab, embedding_dim=100):
    """Build an embedding matrix for ``vocab`` from a GloVe text file.

    Every non-empty line of the file is read as ``word v1 v2 ... vN``
    (whitespace separated). Rows of words that do not occur in the file keep
    a random initialisation drawn from U(-0.05, 0.05).

    Args:
        glove_file_path: Path to the UTF-8 encoded GloVe ``.txt`` file.
        vocab: Mapping ``token -> row index`` in the returned matrix.
        embedding_dim: Number of components expected in every vector.

    Returns:
        A float32 ``torch.Tensor`` of shape ``(len(vocab), embedding_dim)``.

    Raises:
        ValueError: If the vector of a vocabulary word does not have exactly
            ``embedding_dim`` components.
    """
    embeddings_index = {}
    with open(glove_file_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            values = line.split()
            if not values:
                # Blank line (e.g. trailing newline at end of file): nothing to parse.
                continue
            word = values[0]
            if word not in vocab:
                # Only vocabulary words are ever copied into the matrix, so
                # skip the float parsing and storage for everything else.
                continue
            vector = np.asarray(values[1:], dtype='float32')
            if vector.shape[0] != embedding_dim:
                raise ValueError(
                    f"{glove_file_path}:{line_no}: vector for {word!r} has "
                    f"{vector.shape[0]} components, expected {embedding_dim}"
                )
            embeddings_index[word] = vector
    embedding_matrix = np.random.uniform(-0.05, 0.05, (len(vocab), embedding_dim)).astype(np.float32)
    for word, vector in embeddings_index.items():
        embedding_matrix[vocab[word]] = vector
    return torch.tensor(embedding_matrix)

In [4]:
# Fetch the pretrained GloVe vectors once; later runs reuse the files on disk.
glove_path = "glove.6B.300d.txt"
if not os.path.exists(glove_path):
    glove_zip_path = "glove.6B.zip"
    # Only download when there is no usable archive yet. is_zipfile() is False
    # for a missing file and for a truncated download, so both cases re-fetch.
    if not zipfile.is_zipfile(glove_zip_path):
        urllib.request.urlretrieve("http://nlp.stanford.edu/data/glove.6B.zip", glove_zip_path)
    with zipfile.ZipFile(glove_zip_path, 'r') as zip_ref:
        # Extract just the file this notebook reads instead of every member.
        zip_ref.extract(glove_path, ".")

In [5]:
class TemporalBlock(nn.Module):
    """Dilated 1-D convolution followed by ReLU and batch normalisation.

    The convolution pads both ends of the sequence by
    ``(kernel_size - 1) * dilation`` and nothing is trimmed afterwards, so the
    output sequence is longer than the input by that same amount.

    Args:
        in_channels: Channels of the input ``[B, in_channels, L]``.
        out_channels: Channels produced by the convolution.
        kernel_size: Width of the convolution kernel.
        dilation: Spacing between kernel taps.
    """

    def __init__(self, in_channels, out_channels, kernel_size, dilation):
        super().__init__()
        pad = dilation * (kernel_size - 1)
        self.conv = nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size,
            dilation=dilation,
            padding=pad,
        )
        self.relu = nn.ReLU()
        self.norm = nn.BatchNorm1d(out_channels)

    def forward(self, x):
        """Apply conv -> ReLU -> batch norm to ``x`` of shape ``[B, C, L]``."""
        activated = self.relu(self.conv(x))
        return self.norm(activated)

In [6]:
class TCN(nn.Module):
    """Stack of ``TemporalBlock`` layers with dilation doubling per level.

    Level ``i`` uses kernel size 3 and dilation ``2 ** i``. The first level
    maps ``in_channels`` to ``out_channels``; every later level keeps
    ``out_channels``.

    Args:
        in_channels: Channels of the input ``[B, in_channels, L]``.
        out_channels: Channels produced by every level.
        num_levels: Number of stacked blocks.
    """

    def __init__(self, in_channels, out_channels, num_levels=3):
        super().__init__()
        blocks = [
            TemporalBlock(
                in_channels if level == 0 else out_channels,
                out_channels,
                kernel_size=3,
                dilation=2 ** level,
            )
            for level in range(num_levels)
        ]
        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        """Run ``x`` through every block in order."""
        return self.network(x)

In [9]:
class SentimentModel(nn.Module):
    """Binary sentiment classifier: embedding -> Conv1d -> TCN -> BiLSTM -> MLP.

    Args:
        vocab_size: Kept for interface compatibility; the size of the
            embedding table is taken from the pretrained matrix.
        embedding_dim: Width of the embedding vectors (input channels of the
            first convolution).
        hidden_dim: Hidden size of each LSTM direction.
        pretrained_embeddings: Optional ``[vocab, embedding_dim]`` float tensor.
            When omitted, the notebook-level ``embedding_matrix`` is used.
        padding_idx: Optional index of the padding token. When omitted,
            ``vocab["<pad>"]`` from the notebook namespace is used.

    ``forward`` takes a ``[B, L]`` tensor of token ids and returns a ``[B]``
    tensor of probabilities in ``(0, 1)``.
    """

    # conv1 (kernel 5, no padding) followed by pool1 (kernel 2) needs at least
    # 6 time steps to leave a non-empty sequence for the later layers.
    _MIN_SEQ_LEN = 6

    def __init__(self, vocab_size, embedding_dim, hidden_dim,
                 pretrained_embeddings=None, padding_idx=None):
        super().__init__()
        # Fall back to the notebook globals so existing three-argument calls
        # keep working unchanged.
        if pretrained_embeddings is None:
            pretrained_embeddings = embedding_matrix
        if padding_idx is None:
            padding_idx = vocab["<pad>"]
        self.embedding = nn.Embedding.from_pretrained(pretrained_embeddings, freeze=False, padding_idx=padding_idx)
        self.conv1 = nn.Conv1d(embedding_dim, 128, kernel_size=5)
        self.bn1 = nn.BatchNorm1d(128)
        self.pool1 = nn.MaxPool1d(kernel_size=2)

        self.tcn = TCN(128, 64)
        self.ln = nn.LayerNorm(64)

        self.bilstm = nn.LSTM(input_size=64, hidden_size=hidden_dim, bidirectional=True, batch_first=True)
        self.dropout_lstm = nn.Dropout(0.3)  # Added dropout after LSTM
        self.global_pool = nn.AdaptiveMaxPool1d(1)

        self.fc1 = nn.Linear(hidden_dim * 2, 64)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(64, 1)
        nn.init.constant_(self.fc2.bias, -1.0)  # Help escape flat start

    def forward(self, x):
        """Map token ids ``x`` of shape ``[B, L]`` to probabilities of shape ``[B]``."""
        # Right-pad very short batches with the pad id so conv1/pool1 do not
        # fail on sequences shorter than their combined receptive field.
        if x.size(1) < self._MIN_SEQ_LEN:
            x = F.pad(x, (0, self._MIN_SEQ_LEN - x.size(1)), value=self.embedding.padding_idx)

        x = self.embedding(x)              # [B, L, E]
        x = x.permute(0, 2, 1)             # [B, E, L]

        x = self.pool1(F.relu(self.bn1(self.conv1(x))))  # [B, 128, (L - 4) // 2]

        x = self.tcn(x)                    # [B, 64, L']
        x = x.permute(0, 2, 1)             # [B, L', 64]
        x = self.ln(x)

        lstm_out, _ = self.bilstm(x)       # [B, L', 2H]
        lstm_out = self.dropout_lstm(lstm_out)  # Added dropout
        lstm_out = lstm_out.permute(0, 2, 1)    # [B, 2H, L']
        pooled = self.global_pool(lstm_out).squeeze(-1)  # max over time -> [B, 2H]

        x = self.dropout(F.relu(self.fc1(pooled)))
        # squeeze(-1) drops only the feature axis; a bare squeeze() would also
        # drop the batch axis for a batch of one and return a 0-d tensor.
        return torch.sigmoid(self.fc2(x)).squeeze(-1)

In [10]:
def basic_tokenizer(text):
    """Split ``text`` into lower-case alphanumeric tokens.

    The text is lower-cased, every character other than ``a-z``, ``0-9`` and
    whitespace is deleted (not replaced by a space), and the remainder is
    split on whitespace.
    """
    cleaned = re.sub(r"[^a-z0-9\s]", "", text.lower())
    return cleaned.split()

In [11]:
def encode(text):
    """Convert ``text`` into a 1-D int64 tensor of vocabulary ids.

    Tokens missing from ``vocab`` map to the ``"<unk>"`` id. The dtype is set
    explicitly so that text with no tokens yields an empty int64 tensor rather
    than the float32 tensor ``torch.tensor([])`` would produce.
    """
    unk_id = vocab["<unk>"]
    token_ids = [vocab.get(token, unk_id) for token in basic_tokenizer(text)]
    return torch.tensor(token_ids, dtype=torch.long)

In [12]:
def collate_fn(batch):
    """Turn a list of ``{"text", "label"}`` examples into padded tensors.

    Returns:
        ``(padded, labels)``: ``padded`` holds the encoded texts right-padded
        with the ``"<pad>"`` id to the longest sequence in the batch
        (batch-first); ``labels`` is a float32 tensor with one entry per example.
    """
    sequences = []
    for example in batch:
        sequences.append(encode(example["text"]))
    targets = [example["label"] for example in batch]
    labels = torch.tensor(targets, dtype=torch.float32)
    pad_id = vocab["<pad>"]
    return pad_sequence(sequences, batch_first=True, padding_value=pad_id), labels

In [13]:
def evaluate(loader):
    """Compute accuracy and F1 of the global ``model`` over ``loader``.

    Puts ``model`` into eval mode (it is not switched back afterwards) and
    thresholds its outputs at 0.5.

    Args:
        loader: Iterable yielding ``(X, y)`` batches of inputs and 0/1 labels.

    Returns:
        ``(accuracy, f1)`` as computed by scikit-learn.
    """
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for X, y in loader:
            X, y = X.to(device), y.to(device)
            # reshape(-1) keeps the batch axis even if the model returns a 0-d
            # tensor for a single-example batch (which cannot be iterated).
            probs = model(X).reshape(-1)
            # Collect plain Python ints instead of 0-d tensors for sklearn.
            all_preds.extend((probs > 0.5).long().tolist())
            all_labels.extend(y.reshape(-1).long().tolist())
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    return acc, f1

In [14]:
# Load the IMDB reviews through Hugging Face `datasets` and keep the two
# labelled splits.
dataset = load_dataset("imdb")
train_data, test_data = dataset["train"], dataset["test"]

README.md:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

unsupervised-00000-of-00001.parquet:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

In [15]:
# Token frequencies over the training split only.
counter = Counter()
for sample in train_data:
    counter.update(basic_tokenizer(sample["text"]))

In [16]:
# Ids 0 and 1 are reserved for padding and out-of-vocabulary tokens; every
# token seen at least twice gets the next free id, in counter order.
vocab = {"<pad>": 0, "<unk>": 1}
for word, freq in counter.items():
    if freq < 2:
        continue
    vocab[word] = len(vocab)
vocab_size = len(vocab)

In [17]:
# Batches are tokenised and padded on the fly by collate_fn; only the
# training loader shuffles.
train_loader = DataLoader(
    train_data,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn,
)
test_loader = DataLoader(
    test_data,
    batch_size=32,
    shuffle=False,
    collate_fn=collate_fn,
)

In [18]:
# Must match the vector width of the GloVe file referenced by glove_path.
embedding_dim = 300
embedding_matrix = load_glove_embeddings(
    glove_file_path=glove_path,
    vocab=vocab,
    embedding_dim=embedding_dim,
)

In [19]:
# Model, optimiser and loss. BCELoss expects probabilities, which the model
# produces through its final sigmoid.
model = SentimentModel(vocab_size, embedding_dim, hidden_dim=128)
model = model.to(device)
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-3,
    weight_decay=1e-5,
)
criterion = nn.BCELoss()

In [20]:
# Early-stopping bookkeeping: best F1 seen so far, and how many consecutive
# epochs have failed to beat it (training stops once that reaches `patience`).
best_f1, epochs_without_improvement = 0.0, 0
patience = 3

In [21]:
# Train for up to 10 epochs with early stopping on F1, keeping a snapshot of
# the best-scoring weights so the model left in memory afterwards is the best
# one rather than whatever the final (possibly degraded) epoch produced.
# NOTE(review): model selection uses the test split, so the reported test
# scores are optimistic; a held-out validation split would avoid that.
best_state = None
for epoch in range(10):
    model.train()
    total_loss = 0
    for X, y in train_loader:
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        # reshape(-1) keeps predictions 1-D to match y even for a batch of one.
        preds = model(X).reshape(-1)
        loss = criterion(preds, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()  # sum of per-batch mean losses

    # Evaluate
    acc, f1 = evaluate(test_loader)
    print(f"Epoch {epoch+1}, Loss: {total_loss:.4f}, Test Acc: {acc:.4f}, F1: {f1:.4f}")

    # Early stopping
    if f1 > best_f1:
        best_f1 = f1
        epochs_without_improvement = 0
        # Clone every tensor: state_dict() returns references that later
        # optimiser steps would otherwise overwrite in place.
        best_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
    else:
        epochs_without_improvement += 1
        if epochs_without_improvement >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

# Restore the best checkpoint (if any epoch improved on the initial best_f1).
if best_state is not None:
    model.load_state_dict(best_state)

Epoch 1, Loss: 322.7507, Test Acc: 0.8517, F1: 0.8671
Epoch 2, Loss: 182.8774, Test Acc: 0.8904, F1: 0.8848
Epoch 3, Loss: 121.6934, Test Acc: 0.8920, F1: 0.8911
Epoch 4, Loss: 85.8199, Test Acc: 0.8775, F1: 0.8837
Epoch 5, Loss: 65.9822, Test Acc: 0.8515, F1: 0.8362
Epoch 6, Loss: 55.8267, Test Acc: 0.8717, F1: 0.8641
Early stopping at epoch 6


In [22]:
# Persist only the weights (state_dict), not the pickled module object.
torch.save(obj=model.state_dict(), f="complete_model.pth")

In [None]:
# The checkpoint holds a state_dict (a mapping of tensors), not a module, so
# rebuild the architecture first and then load the weights into it.
model = SentimentModel(vocab_size, embedding_dim, hidden_dim=128).to(device)
model.load_state_dict(torch.load("complete_model.pth", map_location=device))
model.eval()