
# Module 3 — Lesson 1: Data I/O for Training (Hands-on)
This notebook illustrates the **practical pipeline** you built in Lesson 1:
- A tiny **synthetic dataset** with `PAD/BOS/EOS` and a controllable **unknown-token rate**.
- **Length-aware batching** (bins) and a **collate_fn** that pads per-batch and builds `attention_mask`.
- A **DataLoader** configured with `num_workers`, `prefetch_factor`, and `persistent_workers`.
- **Tokens/sec** logging and a **micro overfit test** on a tiny batch.
- A peek at **DistributedSampler** (works in single-process for demo).

> You can run this on CPU if you don't have a GPU; it just runs slower.


In [1]:

# !pip install torch --quiet  # Uncomment if running in a fresh env
import math, random, time
from dataclasses import dataclass
from typing import List, Dict, Any

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, Sampler
from torch.utils.data.distributed import DistributedSampler

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device


device(type='cpu')


## 1) Tiny synthetic tokenizer + dataset
We emulate a tokenizer with a small vocabulary and insert:
- **BOS=1**, **EOS=2**, **PAD=0**, **UNK=3**
- A controllable **unknown-token rate** to show how to measure it.


In [2]:

PAD, BOS, EOS, UNK = 0, 1, 2, 3
VOCAB_SIZE = 200   # toy vocabulary (ids: 0..199)

def fake_tokenize(text: str, unk_prob: float = 0.02, max_word_id: int = 199) -> List[int]:
    """Turn characters into token IDs for demo; inject UNKs with probability unk_prob."""
    ids = [BOS]
    for ch in text:
        if ch == " ":
            continue
        if random.random() < unk_prob:
            tok = UNK
        else:
            tok = 4 + (ord(ch) % (max_word_id-4))  # map to [4..max_word_id]
        ids.append(tok)
    ids.append(EOS)
    return ids

def make_corpus(n=5000, min_len=20, max_len=200, unk_prob=0.02):
    random.seed(0)
    corpus = []
    for _ in range(n):
        L = random.randint(min_len, max_len)
        text = "".join(random.choice("abcdefghijklmnopqrstuvwxyz ") for _ in range(L))
        ids = fake_tokenize(text, unk_prob=unk_prob)
        corpus.append(ids)
    return corpus

corpus = make_corpus(n=6000, min_len=10, max_len=300, unk_prob=0.02)
len(corpus), sum(len(x) for x in corpus)//len(corpus)


(6000, 152)


### Measure unknown-token rate & a quick length histogram


In [3]:

def unknown_token_rate(seqs: List[List[int]]) -> float:
    total = sum(len(s) for s in seqs)
    unk = sum(s.count(UNK) for s in seqs)
    return 100.0 * unk / max(1,total)

def length_histogram(seqs: List[List[int]], cuts=(128,512,1024)):
    buckets = {"<=128":0, "<=512":0, "<=1024":0, ">1024":0}
    for s in seqs:
        L = len(s)
        if L <= 128: buckets["<=128"] += 1
        elif L <= 512: buckets["<=512"] += 1
        elif L <= 1024: buckets["<=1024"] += 1
        else: buckets[">1024"] += 1
    return buckets

unk_rate = unknown_token_rate(corpus)
hist = length_histogram(corpus)
unk_rate, hist


(1.9552834696917412, {'<=128': 2437, '<=512': 3563, '<=1024': 0, '>1024': 0})


## 2) Dataset + length-aware batching
We create a `Dataset` and a small **bucketing** step (3 bins). We then build **batches** from each bin and shuffle the order of batches.


In [4]:

class ToyDataset(Dataset):
    def __init__(self, seqs: List[List[int]]):
        self.seqs = seqs
    def __len__(self):
        return len(self.seqs)
    def __getitem__(self, idx):
        s = self.seqs[idx]
        return {"input_ids": torch.tensor(s, dtype=torch.long)}

dataset = ToyDataset(corpus)

# Build 3 length bins and pre-make batches (indices)
def build_length_bins(lengths, cuts=(128,512)):
    short, med, long = [], [], []
    for i, L in enumerate(lengths):
        if L <= cuts[0]: short.append(i)
        elif L <= cuts[1]: med.append(i)
        else: long.append(i)
    return short, med, long

lengths = [len(x) for x in corpus]
short, med, long_ = build_length_bins(lengths, cuts=(128,512))
len(short), len(med), len(long_)


(2437, 3563, 0)

In [5]:

def make_batches(idxs: List[int], batch_size: int) -> List[List[int]]:
    random.shuffle(idxs)
    return [idxs[i:i+batch_size] for i in range(0, len(idxs), batch_size)]

BS = 16  # per-iterator batch size (per-GPU if distributed)
batches = make_batches(short, BS) + make_batches(med, BS) + make_batches(long_, BS)
random.shuffle(batches)
len(batches), batches[0][:5]


(376, [3628, 1461, 5918, 4667, 5023])


### Collate with per-batch padding + attention mask
We pad each batch to its **own** max length and build `attention_mask` so the model ignores padding.


In [6]:

from torch.nn.utils.rnn import pad_sequence

def collate_pad(batch: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:
    seqs = [item["input_ids"] for item in batch]
    max_len = max(s.size(0) for s in seqs)
    padded = pad_sequence(seqs, batch_first=True, padding_value=PAD)
    attn = (padded != PAD).long()
    # Labels = next-token with shift by 1; PAD on last position
    labels = padded.clone()
    labels[:, :-1] = padded[:, 1:]
    labels[:, -1] = PAD
    return {"input_ids": padded, "attention_mask": attn, "labels": labels}

# Quick smoke test
bt = [dataset[i] for i in batches[0]]
sample = collate_pad(bt)
sample["input_ids"].shape, sample["attention_mask"].shape, sample["labels"].shape, sample["attention_mask"][0,:20]


(torch.Size([16, 282]),
 torch.Size([16, 282]),
 torch.Size([16, 282]),
 tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]))


## 3) BatchSampler + DataLoader (workers, prefetch, persistence)
We use a **BatchSampler** so each yielded item is already a batch of indices.  
Tune `num_workers` and `prefetch_factor` gradually (e.g., 2→4).

In [7]:

class PrebuiltBatchSampler(Sampler[List[int]]):
    def __init__(self, batches: List[List[int]]):
        self._batches = batches
    def __iter__(self):
        return iter(self._batches)
    def __len__(self):
        return len(self._batches)

# Notebook-safe loader settings
NUM_WORKERS = 0          # <-- key change (was 2)
PREFETCH    = None       # ignored when workers=0
PERSISTENT  = False      # must be False when workers=0
PIN_MEMORY  = torch.cuda.is_available()

loader = DataLoader(
    dataset,
    batch_sampler=PrebuiltBatchSampler(batches),
    collate_fn=collate_pad,
    num_workers=NUM_WORKERS,
    persistent_workers=PERSISTENT,
    pin_memory=PIN_MEMORY
)


next(iter(loader))["input_ids"].shape


torch.Size([16, 282])


## 4) Tiny language model head + masked loss
A minimal model: embeddings + linear output; compute cross-entropy **only on non-PAD** positions.


In [8]:

class TinyLM(nn.Module):
    def __init__(self, vocab_size=VOCAB_SIZE, d_model=128):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.lm_head = nn.Linear(d_model, vocab_size)
    def forward(self, input_ids, attention_mask, labels=None):
        x = self.embed(input_ids)
        logits = self.lm_head(x)
        loss = None
        if labels is not None:
            # Flatten
            logits_f = logits.view(-1, logits.size(-1))
            labels_f = labels.view(-1)
            mask_f = attention_mask.view(-1).float()
            # Cross-entropy per token
            ce = nn.functional.cross_entropy(logits_f, labels_f, reduction="none")
            # Mask out PAD positions
            loss = (ce * mask_f).sum() / mask_f.sum().clamp_min(1.0)
        return {"logits": logits, "loss": loss}

model = TinyLM().to(device)
optim = torch.optim.AdamW(model.parameters(), lr=3e-3)
scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())


  scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())



## 5) Tokens/sec logging + micro overfit test
- We measure **tokens/sec** by summing `attention_mask` (real tokens only).
- **Micro overfit test**: overfit on 1 tiny batch to ensure labels/masks are correct.


In [9]:

def run_steps(dataloader, steps=50, accum_steps=1):
    model.train()
    tokens_seen = 0
    t0 = time.time()
    optim.zero_grad(set_to_none=True)
    for step, batch in enumerate(dataloader):
        if step >= steps: break
        input_ids = batch["input_ids"].to(device, non_blocking=True)
        attention_mask = batch["attention_mask"].to(device, non_blocking=True)
        labels = batch["labels"].to(device, non_blocking=True)
        tokens_seen += int(attention_mask.sum().item())

        with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
            out = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = out["loss"] / accum_steps

        scaler.scale(loss).backward()
        if (step + 1) % accum_steps == 0:
            scaler.step(optim); scaler.update()
            optim.zero_grad(set_to_none=True)

    dt = time.time() - t0
    tps = tokens_seen / max(1e-9, dt)
    return {"tokens_sec": tps, "steps": steps, "seconds": dt}

# Quick 30-step run
metrics = run_steps(loader, steps=30, accum_steps=2)
metrics


  with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):


{'tokens_sec': 651858.2996219884, 'steps': 30, 'seconds': 0.10469913482666016}

In [10]:

# Micro overfit test: take a single small batch and train until near-zero loss

# Notebook-safe loader settings
NUM_WORKERS = 0          # <-- key change (was 2)
PREFETCH    = None       # ignored when workers=0
PERSISTENT  = False      # must be False when workers=0
PIN_MEMORY  = torch.cuda.is_available()

micro_loader = DataLoader(
    dataset,
    batch_sampler=PrebuiltBatchSampler(batches),
    collate_fn=collate_pad,
    num_workers=NUM_WORKERS,
    persistent_workers=PERSISTENT,
    pin_memory=PIN_MEMORY
)


# Capture initial loss
batch = next(iter(micro_loader))
for k in batch: batch[k] = batch[k].to(device)
with torch.no_grad():
    init_loss = model(**batch)["loss"].item()

# Train on the same batch repeatedly
for _ in range(200):
    out = model(**batch)
    loss = out["loss"]
    optim.zero_grad(set_to_none=True)
    loss.backward()
    optim.step()

final_loss = model(**batch)["loss"].item()
init_loss, final_loss


(4.106113910675049, 3.175016403198242)


## 6) (Optional) DistributedSampler preview
If you launch with `torchrun`, **DistributedSampler** gives each GPU a different shard:
```python
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True, drop_last=False)
for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # reshuffle each epoch
    loader = DataLoader(dataset, batch_size=BS, sampler=sampler, collate_fn=collate_pad, ...)
```
For this single-process demo, we just construct it to show API shape.


In [11]:

# Demo: use DistributedSampler in single-process mode (world_size=1)
ds_sampler = DistributedSampler(dataset, num_replicas=1, rank=0, shuffle=True, drop_last=False)
# Notebook-safe loader settings
NUM_WORKERS = 0          # <-- key change (was 2)
PREFETCH    = None       # ignored when workers=0
PERSISTENT  = False      # must be False when workers=0
PIN_MEMORY  = torch.cuda.is_available()

dl_dist = DataLoader(
    dataset,
    batch_size=16,
    sampler=ds_sampler,
    batch_sampler=PrebuiltBatchSampler(batches),
    collate_fn=collate_pad,
    num_workers=NUM_WORKERS,
    persistent_workers=PERSISTENT,
    pin_memory=PIN_MEMORY
)

next(iter(dl_dist))["input_ids"].shape


ValueError: batch_sampler option is mutually exclusive with batch_size, shuffle, sampler, and drop_last