Import Required Libraries

In [None]:
import pandas as pd
import nltk
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report


Data loading: the CSV file was downloaded from Kaggle.

In [None]:
# Read the tweets CSV; the relative path is resolved against the working directory.
df = pd.read_csv(filepath_or_buffer='Tweets.csv')
df.head(n=5)

Data Preprocessing

In [None]:
from pathlib import Path

# Register the per-user NLTK data folder on the search path (only once).
nltk_data_dir = Path.home() / "nltk_data"
nltk_data_path = str(nltk_data_dir)
if nltk_data_path not in nltk.data.path:
    nltk.data.path.append(nltk_data_path)

# Tokenizer models
nltk.download("punkt", download_dir=nltk_data_path)
# Extra tokenizer tables that some NLTK versions need for word_tokenize
nltk.download("punkt_tab", download_dir=nltk_data_path)
# Stopword lists
nltk.download("stopwords", download_dir=nltk_data_path)

In [None]:
# Normalise the tweet text (vectorized). Missing values are replaced with an
# empty string first: a bare astype(str) would turn NaN into the literal word
# "nan", which would then be tokenized and counted as a real token.
df["text"] = df["text"].fillna("").astype(str).str.lower()

# Tokenization and stopword removal
from nltk.tokenize import word_tokenize

# Build the stopword set once; set membership keeps the per-token test cheap.
stopword_set = set(nltk.corpus.stopwords.words("english"))

# Tokenize each tweet and drop stopwords in a single pass over the column.
df["tokens"] = df["text"].apply(
    lambda text: [tok for tok in word_tokenize(text) if tok not in stopword_set]
)

Split the dataset into training and test sets so that the sentiment analysis model can be evaluated on data it has not seen during training.

In [None]:
# Model input is the tokenized text (list[str]); the target is the sentiment label.
X, y = df["tokens"], df["sentiment"]

# Hold out 20% of the rows for testing, stratified on the label.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Fit the label -> integer class ID mapping on the training labels,
# then apply that same mapping to both splits for PyTorch.
label_encoder = LabelEncoder()
label_encoder.fit(y_train)
y_train_enc = label_encoder.transform(y_train)
y_test_enc = label_encoder.transform(y_test)
num_classes = len(label_encoder.classes_)
print("Classes:", list(label_encoder.classes_))


Convert tokenized text into integer IDs (a vocabulary). Then an `nn.Embedding` layer will map those IDs to dense word vectors for the RNN to learn from.

In [None]:
# Reserved vocabulary entries: ID 0 is the padding token, ID 1 the unknown-word token.
PAD_TOKEN, PAD_ID = "<pad>", 0
UNK_TOKEN, UNK_ID = "<unk>", 1

# Vocabulary and sequence-length limits
max_vocab_size = 20000
min_freq = 2
max_len = 60

# Count token frequencies on the TRAIN split only (avoid leakage from the test split).
counter = Counter(tok for toks in X_train for tok in toks)

# Keep the most frequent tokens, then drop those seen fewer than min_freq times.
vocab_tokens = []
for tok, freq in counter.most_common(max_vocab_size):
    if freq >= min_freq:
        vocab_tokens.append(tok)

# itos: ID -> token, stoi: token -> ID
itos = [PAD_TOKEN, UNK_TOKEN, *vocab_tokens]
stoi = dict(zip(itos, range(len(itos))))
vocab_size = len(itos)

print("Vocab size:", vocab_size)


def numericalize(tokens):
    """Return the vocabulary ID of every token, using UNK_ID for tokens absent from `stoi`."""
    ids = []
    for token in tokens:
        ids.append(stoi.get(token, UNK_ID))
    return ids


# Encode both splits as variable-length ID lists; padding happens per batch
# (pairs with pack_padded_sequence).
X_train_num = list(map(numericalize, X_train))
X_test_num = list(map(numericalize, X_test))

# Integer class IDs as int64 arrays
y_train_ids = np.asarray(y_train_enc).astype(np.int64)
y_test_ids = np.asarray(y_test_enc).astype(np.int64)

from torch.utils.data import Dataset, DataLoader


def clip_len(x):
    """Truncate `x` to at most max_len items; an empty result becomes [UNK_ID]."""
    clipped = x[:max_len]
    if len(clipped) == 0:
        # Never hand an empty sequence downstream; stand in a single unknown token.
        return [UNK_ID]
    return clipped


class SeqDataset(Dataset):
    """Dataset yielding (token-ID tensor, label, length) for each numericalized sequence."""

    def __init__(self, X_num, y_ids):
        # Clip every sequence once at construction time.
        self.X = list(map(clip_len, X_num))
        self.y = y_ids

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        seq = self.X[idx]
        ids = torch.tensor(seq, dtype=torch.long)
        label = int(self.y[idx])
        return ids, label, len(seq)


def collate_batch(batch):
    """Collate (ids, label, length) samples into a padded batch ordered longest-first.

    Returns a tuple (padded_ids, labels, lengths): `padded_ids` is padded with
    PAD_ID along the sequence axis (batch first); `labels` and `lengths` are
    long tensors in the same order as the rows of `padded_ids`.
    """
    seqs, labels, lengths = zip(*batch)

    # Indices ordering the batch by descending length (needed because the
    # sequences are later packed with pack_padded_sequence).
    desc = np.argsort(lengths)[::-1]

    sorted_seqs = [seqs[j] for j in desc]
    sorted_labels = [labels[j] for j in desc]
    sorted_lengths = [lengths[j] for j in desc]

    padded = nn.utils.rnn.pad_sequence(sorted_seqs, batch_first=True, padding_value=PAD_ID)
    return (
        padded,
        torch.tensor(sorted_labels, dtype=torch.long),
        torch.tensor(sorted_lengths, dtype=torch.long),
    )


# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

batch_size = 64

# Only the training loader shuffles; both loaders pad per batch via collate_batch.
train_ds = SeqDataset(X_train_num, y_train_ids)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_batch)

test_ds = SeqDataset(X_test_num, y_test_ids)
test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)

print("Device:", device)
print("Train batches:", len(train_loader), "Test batches:", len(test_loader))

# Per-class loss weights: total / (num_classes * count). Classes with no
# training samples are counted as 1 so the division never hits zero.
# Rarer classes receive larger weights.
class_counts = np.bincount(y_train_ids, minlength=num_classes)
safe_counts = np.maximum(class_counts, 1)
class_weights = class_counts.sum() / (num_classes * safe_counts)
class_weights = class_weights.astype(np.float32)
class_weights_t = torch.tensor(class_weights, dtype=torch.float32, device=device)
print("Class counts:", class_counts, "weights:", class_weights)

Build and Train a Sentiment Analysis Model

In [None]:
# LSTM sentiment classifier (learned word embeddings)
# Seed PyTorch's random number generator, and explicitly seed every CUDA device
# when CUDA is available, before the model is constructed further down.
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)


class SentimentLSTM(nn.Module):
    """Single-layer LSTM classifier over padded token-ID sequences.

    Token IDs are embedded, fed through an LSTM, and the final hidden state of
    each sequence goes through dropout and a linear layer to give one logit
    per class.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Size of each embedding vector.
        hidden_dim: Size of the LSTM hidden state.
        num_classes: Number of output logits.
        pad_idx: Token ID used for padding, passed to nn.Embedding as
            ``padding_idx``.
        dropout: Dropout probability applied to the final hidden state.
    """

    def __init__(
        self,
        vocab_size: int,
        embed_dim: int,
        hidden_dim: int,
        num_classes: int,
        pad_idx: int = 0,
        dropout: float = 0.3,
    ):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x: torch.Tensor, lengths: torch.Tensor) -> torch.Tensor:
        """Compute class logits for a padded batch.

        Args:
            x: Padded token IDs, [batch, seq_len].
            lengths: Unpadded length of each row, [batch]. Rows may be in any
                order; they do not have to be sorted by length.

        Returns:
            Logits, [batch, num_classes], in the same row order as ``x``.
        """
        emb = self.embedding(x)  # [batch, seq_len, embed_dim]
        packed = nn.utils.rnn.pack_padded_sequence(
            emb,
            lengths.cpu(),  # packing requires the lengths on the CPU
            batch_first=True,
            # Let PyTorch sort internally and restore the input order, so the
            # model no longer breaks on batches that are not length-sorted.
            enforce_sorted=False,
        )
        _, (h_n, _) = self.lstm(packed)
        h_last = h_n[-1]  # [batch, hidden_dim]
        h_last = self.dropout(h_last)
        return self.fc(h_last)  # [batch, num_classes]


# Build the classifier and move its parameters to the selected device.
model = SentimentLSTM(
    vocab_size=vocab_size,
    embed_dim=128,
    hidden_dim=128,
    num_classes=num_classes,
    pad_idx=PAD_ID,
    dropout=0.3,
)
model = model.to(device)

# Class-weighted cross-entropy loss, optimised with Adam.
criterion = nn.CrossEntropyLoss(weight=class_weights_t)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)


def run_epoch(loader, train: bool):
    """Run one pass over `loader` and return ``(loss, accuracy)``.

    Uses the notebook-level `model`, `criterion`, `optimizer` and `device`.
    When `train` is true the model is put in training mode and the optimizer
    steps after every batch; otherwise the model is put in eval mode and no
    gradients are tracked.

    Args:
        loader: Iterable yielding ``(padded_ids, labels, lengths)`` batches.
        train: Whether to update the model parameters.

    Returns:
        Tuple of (batch-size-weighted mean of the per-batch losses,
        fraction of correctly predicted samples).

    Raises:
        ZeroDivisionError: If `loader` yields no samples.
    """
    model.train(train)

    total_loss = 0.0
    correct = 0
    total = 0

    for xb, yb, lens in loader:
        xb = xb.to(device)
        yb = yb.to(device)
        # `lens` deliberately stays on the CPU: pack_padded_sequence needs CPU
        # lengths, so a copy to the device would only be copied straight back.

        if train:
            optimizer.zero_grad(set_to_none=True)

        with torch.set_grad_enabled(train):
            logits = model(xb, lens)
            loss = criterion(logits, yb)
            if train:
                loss.backward()
                optimizer.step()

        batch_n = xb.size(0)
        # The criterion returns a batch mean; rescale so batches of different
        # size contribute proportionally to the epoch figure.
        total_loss += loss.item() * batch_n
        correct += (logits.argmax(dim=1) == yb).sum().item()
        total += batch_n

    return total_loss / total, correct / total


# Train for a fixed number of epochs, scoring the test split after each one.
epochs = 8
for epoch in range(1, epochs + 1):
    train_loss, train_acc = run_epoch(train_loader, train=True)
    test_loss, test_acc = run_epoch(test_loader, train=False)

    progress = (
        f"Epoch {epoch}/{epochs} | "
        f"train loss {train_loss:.4f} acc {train_acc:.4f} | "
        f"test loss {test_loss:.4f} acc {test_acc:.4f}"
    )
    print(progress)

Use the trained model to make predictions on the test data and evaluate its performance

In [None]:
# Evaluate on the test split
model.eval()  # switch dropout off for inference
pred_ids = []
true_ids = []

with torch.no_grad():
    for xb, yb, lens in test_loader:
        # Only the token IDs go to the device. The lengths stay on the CPU,
        # where pack_padded_sequence needs them, and the labels are only used
        # for scoring.
        logits = model(xb.to(device), lens)
        pred_ids.extend(logits.argmax(dim=1).cpu().tolist())
        true_ids.extend(yb.tolist())

all_preds = np.array(pred_ids)
all_true = np.array(true_ids)

print("Accuracy Score:", accuracy_score(all_true, all_preds))
print("Classification Report:")
# Map the integer class IDs back to the original sentiment labels for the report.
print(
    classification_report(
        label_encoder.inverse_transform(all_true),
        label_encoder.inverse_transform(all_preds),
    )
)

In [None]:
import matplotlib.pyplot as plt

# WordCloud is optional; fall back gracefully if not installed.
try:
    from wordcloud import WordCloud
except ModuleNotFoundError:
    WordCloud = None


# Sentiment label whose tweets are visualised.
sent = "positive"
texts = df.loc[df["sentiment"] == sent, "text"].astype(str)
joined = " ".join(texts)

if not joined.strip():
    # Nothing to draw: WordCloud.generate() raises on text without words and
    # the fallback bar chart would be empty.
    print(f"No text found for sentiment {sent!r}; skipping the plot.")
elif WordCloud is None:
    print("Optional dependency missing: wordcloud")
    print("Install with: pip install wordcloud")

    # Simple fallback: show the 20 most frequent whitespace-separated words
    # as a bar chart.
    words = joined.lower().split()
    top = Counter(words).most_common(20)
    labels = [w for w, _ in top]
    values = [c for _, c in top]

    plt.figure(figsize=(10, 4))
    plt.bar(labels, values)
    plt.xticks(rotation=60, ha="right")
    plt.title(f"Top words - {sent} sentiment")
    plt.tight_layout()
    plt.show()
else:
    wc = WordCloud(width=800, height=400, background_color="white").generate(joined)
    plt.figure(figsize=(10, 6))
    plt.imshow(wc, interpolation="bilinear")
    plt.axis("off")
    plt.title(f"Word Cloud - {sent.capitalize()} Sentiment")
    plt.show()