<a href="https://colab.research.google.com/github/paryagsahni1845/deeplearning/blob/main/RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## RNN
An RNN (Recurrent Neural Network) is a type of neural network that processes sequences step-by-step, carrying forward a “memory” of past steps.
Use it when data has an order or time relationship, like text, speech, or time series.
It works well for tasks like sentiment analysis, language modeling, translation, or stock prediction.
We use it because it can capture patterns that depend on earlier parts of the input, unlike regular feedforward networks.

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
import numpy as np

In [21]:
categories = ["rec.sport.baseball", "sci.space"]  # just 2 categories
newsgroups = fetch_20newsgroups(subset='all', categories=categories)

import re
from collections import Counter
import numpy as np

# --- 1. Tokenize ---
def tokenize(text):
    return re.findall(r"\b\w+\b", text.lower())

# --- 2. Build vocab (limit size) ---
all_tokens = []
for doc in newsgroups.data:
    all_tokens.extend(tokenize(doc))

max_vocab = 5000
counter = Counter(all_tokens)
vocab = {word: i+2 for i, (word, _) in enumerate(counter.most_common(max_vocab))}
vocab["<PAD>"] = 0
vocab["<UNK>"] = 1

# --- 3. Encode documents into sequences of word IDs ---
def encode(text):
    return [vocab.get(token, vocab["<UNK>"]) for token in tokenize(text)]

X_seq = [encode(doc) for doc in newsgroups.data]
y = newsgroups.target

# --- 4. Pad / clip to fixed length ---
max_len = 30
X = np.array([x[:max_len] if len(x) >= max_len else x + [0]*(max_len - len(x)) for x in X_seq])


# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [22]:
class NewsDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.long)
        self.y = torch.tensor(y, dtype=torch.long)
    def __len__(self):
        return len(self.X)
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = NewsDataset(X_train, y_train)
test_dataset = NewsDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

In [23]:
class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.long)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = TextDataset(X_train, y_train)
test_dataset = TextDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)

In [24]:
class RNNClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim=64, hidden_dim=64, num_classes=20):
        super(RNNClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.rnn = nn.RNN(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        x = self.embedding(x)
        out, _ = self.rnn(x)
        out = out[:, -1, :]  # take last timestep
        out = self.fc(out)
        return out

In [25]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = RNNClassifier(len(vocab), 64, 64, len(newsgroups.target_names)).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(),lr=1e-3)

In [26]:
torch.nn.utils.clip_grad_norm_(model.parameters(), 5)

tensor(0.)

In [28]:
for epoch in range(10):  # keep small for demo
    model.train()
    total_loss, total_correct = 0, 0

    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_correct += (outputs.argmax(1) == y_batch).sum().item()

    acc = total_correct / len(train_dataset)
    print(f"Epoch {epoch+1}, Loss: {total_loss:.4f}, Train Acc: {acc:.4f}")

Epoch 1, Loss: 11.9387, Train Acc: 0.7841
Epoch 2, Loss: 9.5216, Train Acc: 0.8441
Epoch 3, Loss: 7.6845, Train Acc: 0.8914
Epoch 4, Loss: 6.2610, Train Acc: 0.9097
Epoch 5, Loss: 5.4448, Train Acc: 0.9249
Epoch 6, Loss: 4.4928, Train Acc: 0.9381
Epoch 7, Loss: 3.9741, Train Acc: 0.9520
Epoch 8, Loss: 3.2279, Train Acc: 0.9628
Epoch 9, Loss: 2.3752, Train Acc: 0.9697
Epoch 10, Loss: 1.6990, Train Acc: 0.9785


In [29]:
model.eval()
correct = 0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        correct += (outputs.argmax(1) == y_batch).sum().item()

print(f"Test Accuracy: {correct / len(test_dataset):.4f}")

Test Accuracy: 0.9144
