In [1]:
import math, random, re, time
from collections import Counter
from typing import List, Tuple
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

seed = 42
random.seed(seed); np.random.seed(seed); torch.manual_seed(seed)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cpu


In [2]:
# -----------------------------
# A small, relatable corpus
# âžœ Students: replace `reviews` / `stories` with your own text.
# -----------------------------
reviews = [
    "I love this phone it has a great camera and long battery life .",
    "The laptop is fast but the fan is noisy during gaming .",
    "These headphones are comfortable and the sound quality is excellent .",
    "Terrible charger it stopped working after two weeks .",
    "This smartwatch tracks my sleep and steps accurately every day .",
    "Great picture quality but the remote feels cheap .",
    "The blender is powerful and easy to clean highly recommended .",
    "Waste of money the speakers crackle at high volume .",
    "Comfortable chair but the wheels feel flimsy .",
    "Amazing tablet perfect for reading drawing and taking notes .",
]

stories = [
    "The cat watched the rain and waited at the window .",
    "She packed a small bag and left before sunrise .",
    "He wrote the final line and closed the notebook gently .",
    "They reached the hilltop just as the clouds opened .",
    "We met at the station and missed the train anyway .",
    "A quiet tune played while the city lights flickered .",
    "He learned to fix radios from a faded manual .",
    "She baked bread that reminded everyone of home .",
]

# Duplicate a bit for a tiny-but-usable dataset (still quick on CPU)
raw_sentences = (reviews * 10) + (stories * 10)
random.shuffle(raw_sentences)

In [3]:
# -----------------------------
# Cleaning & tokenization
# -----------------------------
def simple_tokenize(s: str) -> List[str]:
    s = s.lower().strip()
    s = re.sub(r"[^a-z0-9\s\.]", " ", s)  # keep letters, numbers, spaces, periods
    s = re.sub(r"\s+", " ", s)
    tokens = s.split()
    if not tokens or tokens[-1] != ".":
        tokens.append(".")  # ensure a sentence end
    return tokens

sent_tokens = [simple_tokenize(s) for s in raw_sentences]
total_tokens = sum(len(t) for t in sent_tokens)
print(f"Sentences: {len(sent_tokens)} | Total tokens: {total_tokens} | Avg len: {total_tokens/len(sent_tokens):.2f}")

Sentences: 180 | Total tokens: 1870 | Avg len: 10.39


In [4]:
# -----------------------------
# Vocabulary
# -----------------------------
SPECIALS = ["<pad>", "<unk>"]
counter = Counter([w for sent in sent_tokens for w in sent])
itos = SPECIALS + sorted(counter.keys())
stoi = {w: i for i, w in enumerate(itos)}
pad_id, unk_id = stoi["<pad>"], stoi["<unk>"]
vocab_size = len(itos)
print("Vocab size:", vocab_size)

def encode_sentence(tokens: List[str]) -> List[int]:
    return [stoi.get(w, unk_id) for w in tokens]

encoded_sentences = [encode_sentence(t) for t in sent_tokens]

Vocab size: 129


In [5]:
# -----------------------------
# Build sequences (word-level LM)
# Input: previous SEQ_LEN tokens (left-padded)
# Target: current token
# -----------------------------
SEQ_LEN = 12

def make_sequences(encoded: List[List[int]], seq_len: int = SEQ_LEN) -> Tuple[np.ndarray, np.ndarray]:
    X, Y = [], []
    for sent in encoded:
        if len(sent) < 2:
            continue
        for i in range(1, len(sent)):
            start = max(0, i - seq_len)
            seq = sent[start:i]
            if len(seq) < seq_len:
                seq = [pad_id] * (seq_len - len(seq)) + seq  # left pad
            X.append(seq)
            Y.append(sent[i])
    return np.array(X, dtype=np.int64), np.array(Y, dtype=np.int64)

X, Y = make_sequences(encoded_sentences, SEQ_LEN)
N = len(X)
perm = np.random.permutation(N)
split = int(0.9 * N)
tr_idx, va_idx = perm[:split], perm[split:]
X_train, Y_train = X[tr_idx], Y[tr_idx]
X_val, Y_val     = X[va_idx], Y[va_idx]
print(f"Train pairs: {len(X_train)} | Val pairs: {len(X_val)}")

Train pairs: 1521 | Val pairs: 169


In [6]:
# -----------------------------
# Dataset & DataLoader
# -----------------------------
class LMDataset(Dataset):
    def __init__(self, X, Y):
        self.X = torch.tensor(X, dtype=torch.long)
        self.Y = torch.tensor(Y, dtype=torch.long)
    def __len__(self):
        return len(self.X)
    def __getitem__(self, i):
        return self.X[i], self.Y[i]

BATCH_SIZE = 64
train_dl = DataLoader(LMDataset(X_train, Y_train), batch_size=BATCH_SIZE, shuffle=True)
val_dl   = DataLoader(LMDataset(X_val, Y_val), batch_size=BATCH_SIZE)


In [7]:
# -----------------------------
# Models: RNN, LSTM, GRU (same interface)
# Predict next token from last hidden state
# -----------------------------
class RNNLM(nn.Module):
    def __init__(self, vocab_size: int, emb: int = 64, hid: int = 128, cell_type: str = "RNN", pad_index: int = 0):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, emb, padding_idx=pad_index)
        cell_type = cell_type.upper()
        rnn_cls = {"RNN": nn.RNN, "LSTM": nn.LSTM, "GRU": nn.GRU}[cell_type]
        self.rnn = rnn_cls(emb, hid, batch_first=True)
        self.fc  = nn.Linear(hid, vocab_size)
    def forward(self, x):
        e = self.emb(x)        # [B, T, E]
        y, _ = self.rnn(e)     # [B, T, H]
        last = y[:, -1, :]     # use last time step to predict next token
        logits = self.fc(last) # [B, V]
        return logits

In [8]:
# -----------------------------
# Training & evaluation utilities
# -----------------------------
def accuracy_from_logits(logits: torch.Tensor, targets: torch.Tensor) -> float:
    pred = logits.argmax(dim=-1)
    return (pred == targets).float().mean().item()

def evaluate(model: nn.Module, loader, criterion) -> Tuple[float, float, float]:
    model.eval()
    total_loss, total_acc, n = 0.0, 0.0, 0
    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)
            acc = accuracy_from_logits(logits, yb)
            b = xb.size(0)
            total_loss += loss.item() * b
            total_acc  += acc * b
            n += b
    val_loss = total_loss / n
    val_ppl  = math.exp(val_loss)
    val_acc  = total_acc / n
    return val_loss, val_ppl, val_acc

def train_model(model: nn.Module, train_loader, val_loader, epochs=8, lr=1e-3, clip=1.0):
    model.to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    hist = {"train_loss": [], "val_loss": [], "val_ppl": [], "val_acc": []}
    for ep in range(1, epochs + 1):
        model.train()
        running, n = 0.0, 0
        t0 = time.time()
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad()
            logits = model(xb)
            loss = criterion(logits, yb)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), clip)
            opt.step()
            running += loss.item() * xb.size(0)
            n += xb.size(0)
        train_loss = running / n
        val_loss, val_ppl, val_acc = evaluate(model, val_loader, criterion)
        hist["train_loss"].append(train_loss)
        hist["val_loss"].append(val_loss)
        hist["val_ppl"].append(val_ppl)
        hist["val_acc"].append(val_acc)
        print(f"Epoch {ep:02d} | train {train_loss:.3f} | val {val_loss:.3f} | ppl {val_ppl:.2f} | acc {val_acc:.3f} | time {time.time()-t0:.1f}s")
    return hist

In [9]:
# -----------------------------
# Train RNN, LSTM, GRU
# -----------------------------
EPOCHS = 8
EMB = 64
HID = 128

histories = {}
models = {}
for cell in ["RNN", "LSTM", "GRU"]:
    print(f"\n=== Training {cell} ===")
    m = RNNLM(vocab_size, emb=EMB, hid=HID, cell_type=cell, pad_index=pad_id)
    h = train_model(m, train_dl, val_dl, epochs=EPOCHS, lr=1e-3, clip=1.0)
    models[cell] = m.cpu()  # move to CPU for generation
    histories[cell] = h


=== Training RNN ===
Epoch 01 | train 4.402 | val 3.845 | ppl 46.78 | acc 0.509 | time 0.1s
Epoch 02 | train 3.220 | val 2.787 | ppl 16.23 | acc 0.609 | time 0.2s
Epoch 03 | train 2.099 | val 1.796 | ppl 6.03 | acc 0.828 | time 0.2s
Epoch 04 | train 1.191 | val 1.041 | ppl 2.83 | acc 0.923 | time 0.2s
Epoch 05 | train 0.633 | val 0.596 | ppl 1.81 | acc 0.959 | time 0.2s
Epoch 06 | train 0.359 | val 0.354 | ppl 1.42 | acc 0.982 | time 0.2s
Epoch 07 | train 0.232 | val 0.231 | ppl 1.26 | acc 0.976 | time 0.2s
Epoch 08 | train 0.166 | val 0.171 | ppl 1.19 | acc 0.976 | time 0.2s

=== Training LSTM ===
Epoch 01 | train 4.694 | val 4.387 | ppl 80.39 | acc 0.101 | time 0.3s
Epoch 02 | train 3.938 | val 3.752 | ppl 42.61 | acc 0.183 | time 0.2s
Epoch 03 | train 3.057 | val 2.770 | ppl 15.96 | acc 0.337 | time 0.3s
Epoch 04 | train 2.092 | val 1.927 | ppl 6.87 | acc 0.580 | time 0.2s
Epoch 05 | train 1.374 | val 1.325 | ppl 3.76 | acc 0.870 | time 0.2s
Epoch 06 | train 0.914 | val 0.900 | ppl

In [10]:
print("== Validation Metrics ==")
print("Model\tPPL (last)\tAcc (last)")
for name, h in histories.items():
    ppl = h["val_ppl"][-1]
    acc = h["val_acc"][-1]
    print(f"{name}\t{ppl:.2f}\t\t{acc:.3f}")


== Validation Metrics ==
Model	PPL (last)	Acc (last)
RNN	1.19		0.976
LSTM	1.50		0.976
GRU	1.28		0.976


In [11]:
print("== Validation Metrics ==")
print("Model\tPPL (last)\tAcc (last)")
for name, h in histories.items():
    ppl = h["val_ppl"][-1]
    acc = h["val_acc"][-1]
    print(f"{name}\t{ppl:.2f}\t\t{acc:.3f}")

== Validation Metrics ==
Model	PPL (last)	Acc (last)
RNN	1.19		0.976
LSTM	1.50		0.976
GRU	1.28		0.976


In [15]:
# -----------------------------
# Text generation demo
# -----------------------------
def tok(s: str) -> List[str]:
    return simple_tokenize(s)

def sample_next(probs: np.ndarray) -> int:
    probs = probs / probs.sum()
    return np.random.choice(len(probs), p=probs)

def generate_text(model: nn.Module, seed: str, steps: int = 20) -> str:
    model.eval()
    words = tok(seed)
    seq = [pad_id] * (SEQ_LEN - len(words)) + [stoi.get(w, unk_id) for w in words[-SEQ_LEN:]]
    seq = seq[-SEQ_LEN:]
    out_words = words.copy()
    with torch.no_grad():
        for _ in range(steps):
            x = torch.tensor([seq], dtype=torch.long)
            logits = model(x)
            probs = torch.softmax(logits[0], dim=0).numpy()
            nxt = sample_next(probs)
            out_words.append(itos[nxt])
            seq = seq[1:] + [nxt]
    return " ".join(out_words)

for name, m in models.items():
    print(f"\n{name} generation:")
    print(generate_text(m, seed="this is my version of the story", steps=150))


RNN generation:
this is my version of the story . . . i love this phone it has a great camera and long battery life . these left cheap is fast but the closed lights flickered . at the station and flickered powerful . noisy during gaming . . packed a faded are played while the city lights flickered . . at drawing the sound is . . volume . powerful charger charger chair but the wheels feel flimsy . two volume . waited money the speakers crackle at high volume . . excellent for highly recommended as . after and waited excellent . flickered closed the to fix radios from a faded manual . . laptop window wheels amazing flimsy cat sunrise the but the wheels feel flimsy . but the remote feels cheap . my charger flimsy . at the window . excellent quality but the remote feels cheap . noisy during accurately . . quiet clean we

LSTM generation:
this is my version of the story . . . . . the wrote feels fast is quality is fast but the . is gaming powerful a great day . . . . . drawing for powerful

In [None]:
# -----------------------------
# Extensions for student projects
# -----------------------------
print("""
Extensions:
- Paste your own text into the 'reviews' / 'stories' lists (or load from a file) and rerun.
- Compare different hidden sizes, embedding dims, and sequence lengths.
- Swap nn.LSTM for nn.GRU or nn.RNN and re-train to compare.
- Try dropout, weight tying, or character-level modeling.
- Replace sampling with top-k or beam search for generation.
Ethics:
- If you use personal or sensitive text, models can memorize and regurgitate it.
""")