# Project 2 — Academic Tutor LM for 5000CMD (Theory of Computation)

**Goal**  
Fine-tune a small GPT model (**distilgpt2**) on the 5000CMD course pages  
so it can produce short, clear explanations of course topics (e.g. DFAs).

**Steps**
1. Grab and clean text from the course website  
2. Make a dataset and split into train/val/test  
3. Load distilgpt2 and its tokenizer  
4. Fine-tune it with a PyTorch training loop  
5. Check results with loss/perplexity + run a sample generation






In [None]:

# Set up the environment so Colab has the right versions of the libraries
!pip -q install --no-cache-dir \
  "transformers==4.43.4" \
  "datasets==2.20.0" \
  "accelerate==0.33.0" \
  "huggingface-hub>=0.34.0,<1.0" \
  "beautifulsoup4==4.12.2" \
  "lxml==5.2.2" \
  "unidecode==1.3.8" \
  "tldextract==5.1.2" \
  "pyarrow<17" "fsspec==2024.5.0" \
  "numpy==1.26.4" "pandas==2.2.2"

# Import libraries
import os, re, json, time, hashlib, textwrap
from pathlib import Path
from urllib.parse import urljoin, urldefrag
import requests
from bs4 import BeautifulSoup
from unidecode import unidecode
import tldextract

# Import the NLP and dataset tools we need
import transformers, datasets, accelerate, huggingface_hub
from datasets import Dataset, DatasetDict

# Version check: print the installed version of each key library so the
# output can be compared against the versions pinned in the install step.
print("✅ Environment ready:")
print("transformers:", transformers.__version__)
print("datasets:", datasets.__version__)
print("accelerate:", accelerate.__version__)
print("huggingface-hub:", huggingface_hub.__version__)


In [None]:
# --- Data crawl config ---
# Root page of the course site; used as the default starting URL of the crawl.
BASE_URL = "https://github.coventry.ac.uk/pages/ab3735/5000CMD/"
# Output folders: raw HTML snapshots and the cleaned text exports.
SAVE_DIR = Path("/content/5000cmd_data")
RAW_DIR  = SAVE_DIR / "raw_html"
CLEAN_DIR = SAVE_DIR / "clean"
# Create both folders up front (no error if they already exist).
RAW_DIR.mkdir(parents=True, exist_ok=True)
CLEAN_DIR.mkdir(parents=True, exist_ok=True)

MAX_PAGES = 60    # default upper bound on the number of pages the crawl fetches
REQUEST_TIMEOUT = 15  # timeout per request (seconds)
# One shared session so every request sends the same User-Agent header.
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "Colab data prep for coursework"})

def same_site(url, base):
    """Return True when *url* and *base* have the same registered domain.

    The comparison uses tldextract's ``registered_domain``, so sub-domains
    of the same registered domain count as the same site.
    """
    url_domain = tldextract.extract(url).registered_domain
    base_domain = tldextract.extract(base).registered_domain
    return url_domain == base_domain

def should_visit(url):
    """Decide whether the crawler should fetch *url*.

    A URL is accepted only when all of the following hold:
      * its scheme is http or https (mailto:, javascript:, tel: ... are skipped),
      * it is on the same site as BASE_URL according to same_site(),
      * its path does not end in a known non-HTML extension.

    Returns:
        bool: True if the page should be fetched, False otherwise.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urlparse

    parsed = urlparse(url)
    # Non-HTTP links can never be fetched by the requests session.
    if parsed.scheme not in ("http", "https"):
        return False
    if not same_site(url, BASE_URL):
        return False
    # Test the extension on the path only, so a query string such as
    # "notes.pdf?raw=1" cannot hide a binary/asset file from the filter.
    skipped_extensions = (".pdf", ".zip", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".css", ".js")
    if parsed.path.lower().endswith(skipped_extensions):
        return False
    return True

def fetch(url):
    """Download *url* through the shared session and return the body as text.

    Redirects are followed and the request uses REQUEST_TIMEOUT; a non-OK
    HTTP status is turned into an exception by ``raise_for_status()``.
    """
    response = SESSION.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
    response.raise_for_status()
    return response.text

def extract_main_text(html):
    """Parse *html* and return ``(title, text)`` for the page's main content.

    Scripts, styles and page chrome (nav/header/footer/aside) are removed
    first. The text is then collected from headings, paragraphs, list items
    and code elements inside ``<main>``, falling back to ``<article>``, then
    ``<body>``, then the whole document. Each collected element contributes
    one line.

    Elements nested inside another collected element (``<code>`` inside
    ``<pre>`` or ``<p>``, ``<p>`` inside ``<li>``, ...) are skipped: the outer
    element's text already contains theirs, and emitting both would duplicate
    that text in the training data.

    Returns:
        tuple[str, str]: the page title ("" when there is no <title>) and the
        newline-joined content text.
    """
    content_tags = ["h1", "h2", "h3", "p", "li", "pre", "code"]
    soup = BeautifulSoup(html, "lxml")

    # remove menus, scripts, styles so we only keep useful content
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    for sel in ["nav", "header", "footer", "aside"]:
        for t in soup.select(sel):
            t.decompose()

    # try to grab <main> or <article>, else just take <body>
    main = soup.select_one("main") or soup.select_one("article") or soup.body or soup

    def is_nested(el):
        # True when an ancestor of el (below the chosen root) is itself a
        # collected tag, i.e. el's text is already part of that ancestor.
        for parent in el.parents:
            if parent is main:
                return False
            if parent.name in content_tags:
                return True
        return False

    text = "\n".join(
        el.get_text(" ", strip=True)
        for el in main.find_all(content_tags)
        if not is_nested(el)
    )
    title = (soup.title.get_text(" ", strip=True) if soup.title else "").strip()
    return title, text

def normalize(text):
    """Clean extracted page text and return it as newline-separated lines.

    The text is transliterated to ASCII, split into lines, and every run of
    whitespace inside a line is collapsed to a single space. Lines shorter
    than 20 characters and lines containing a boilerplate phrase
    (case-insensitive) are dropped.
    """
    boilerplate = (
        "back to top", "coventry university", "github pages", "privacy", "cookies"
    )
    ascii_text = unidecode(text)
    # Lazily produce each line with its whitespace collapsed and trimmed.
    collapsed = (
        re.sub(r"\s+", " ", piece).strip()
        for piece in re.split(r"\s*\n\s*", ascii_text)
    )
    kept = [
        line
        for line in collapsed
        if len(line) >= 20 and not any(phrase in line.lower() for phrase in boilerplate)
    ]
    return "\n".join(kept).strip()

def crawl(base_url=BASE_URL, max_pages=MAX_PAGES):
    """Breadth-first crawl starting at *base_url*; return the cleaned documents.

    Each successfully fetched page is saved as raw HTML under RAW_DIR (file
    name = MD5 of the URL), its main text is extracted and normalized, and
    pages that still have text afterwards are returned as dicts with the
    keys ``url``, ``title`` and ``text``.

    Args:
        base_url: URL the crawl starts from.
        max_pages: stop once this many pages have been fetched successfully.

    Returns:
        list[dict]: one record per page with non-empty cleaned text.
    """
    # Local import: deque gives O(1) pops from the front of the queue.
    from collections import deque

    start, _ = urldefrag(base_url)  # remove the #section part from URL
    queue = deque([start])
    # Every URL that has ever been queued. Without this, a page linked from
    # many places is queued once per link, and a URL that fails to download
    # is re-requested every time it comes up again.
    enqueued = {start}
    visited = set()
    docs = []

    while queue and len(visited) < max_pages:
        url = queue.popleft()
        if url in visited or not should_visit(url):
            continue
        try:
            html = fetch(url)
        except Exception as e:
            # Best effort: report the failure and carry on with the next URL.
            print("⚠️ skip:", url, "-", e)
            continue

        visited.add(url)
        # save raw html
        (RAW_DIR / f"{hashlib.md5(url.encode()).hexdigest()}.html").write_text(html, encoding="utf-8")

        title, raw_text = extract_main_text(html)
        clean_text = normalize(raw_text)

        if clean_text:
            docs.append({
                "url": url,
                "title": title or "Untitled",
                "text": clean_text
            })
            # quick progress log
            print(f"✅ {len(visited):>3}/{max_pages}  {title[:60]}  ({len(clean_text)} chars)")

        # find new links on the same site (fresh parse: the text extraction
        # works on its own soup and removes nav/header/footer elements)
        soup = BeautifulSoup(html, "lxml")
        for a in soup.find_all("a", href=True):
            nxt, _ = urldefrag(urljoin(url, a["href"]))
            if nxt not in enqueued and should_visit(nxt):
                enqueued.add(nxt)
                queue.append(nxt)

    return docs

# Run the crawl with its default arguments, then display how many documents
# were collected and the first record (None when nothing was collected).
docs = crawl()
len(docs), docs[0] if docs else None


In [None]:
# Drop documents whose cleaned text is identical (the first occurrence wins)
seen = set()
unique_docs = []
for doc in docs:
    digest = hashlib.sha1(doc["text"].encode()).hexdigest()
    if digest not in seen:
        seen.add(digest)
        unique_docs.append(doc)

print(f"📦 total {len(docs)} docs, after dedup {len(unique_docs)} docs")

# Export as JSONL: one JSON object per line, easy to reload later
jsonl_path = CLEAN_DIR / "5000cmd_clean.jsonl"
with jsonl_path.open("w", encoding="utf-8") as jsonl_file:
    jsonl_file.writelines(
        json.dumps(doc, ensure_ascii=False) + "\n" for doc in unique_docs
    )

# Export the same records as CSV (optional, convenient for spreadsheets)
import csv
csv_path = CLEAN_DIR / "5000cmd_clean.csv"
with csv_path.open("w", newline="", encoding="utf-8") as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=["url","title","text"])
    writer.writeheader()
    writer.writerows(unique_docs)

print("✅ Saved to:", jsonl_path, "and", csv_path)


In [None]:
def build_examples(rows):
    """Turn crawled records into training examples.

    Each row must provide ``title``, ``text`` and ``url``. The title is put
    on top of the text as a ``# heading`` line (helps the model learn the
    topic) and the source URL is kept under ``meta_url``.

    Returns:
        list[dict]: one ``{"text": ..., "meta_url": ...}`` dict per row.
    """
    return [
        {
            "text": f"# {row['title']}\n\n{row['text']}".strip(),
            "meta_url": row["url"],
        }
        for row in rows
    ]

examples = build_examples(unique_docs)
print("Number of samples:", len(examples))
# Fail early with a clear message: with no examples the preview below would
# raise an IndexError and the split could not be made either.
if not examples:
    raise RuntimeError("No examples to train on: the crawl produced no usable pages.")
print("Preview:\n", textwrap.shorten(examples[0]["text"], width=300, placeholder="…"))

# make a Hugging Face Dataset and split it into train/val/test
ds_all = Dataset.from_list(examples)
# split the dataset: 80% train   10% validation   10% test
# (first hold out 20%, then cut that held-out part in half; fixed seed so the
# split is reproducible)
tmp = ds_all.train_test_split(test_size=0.2, seed=413)
ds_val_test = tmp["test"].train_test_split(test_size=0.5, seed=413)
dataset_dict = DatasetDict({
    "train": tmp["train"],
    "validation": ds_val_test["train"],
    "test": ds_val_test["test"]
})

dataset_dict


In [None]:
def char_len(example):
    """Return a one-field dict holding the character count of the example's text."""
    text = example["text"]
    return {"n_chars": len(text)}

# Add the n_chars column to every split and report the split sizes
ds_with_len = dataset_dict.map(char_len)
for label, key in (("Train", "train"), ("Val", "validation"), ("Test", "test")):
    print(f"{label} size:", len(ds_with_len[key]))

# Preview: print at most two records from each split (train/val/test)
for split in ("train", "validation", "test"):
    print(f"\n=== {split.upper()} preview ===")
    part = ds_with_len[split]
    for idx in range(min(2, len(part))):
        ex = part[idx]
        print("URL:", ex["meta_url"])
        print(textwrap.shorten(ex["text"], width=280, placeholder="…"))


In [None]:
# Persist the three splits under SAVE_DIR so they can be reloaded with
# load_from_disk without crawling again.
DATASET_DISK_DIR = str(SAVE_DIR / "hf_dataset")
dataset_dict.save_to_disk(DATASET_DISK_DIR)
print("📁 Dataset saved to:", DATASET_DISK_DIR)


In [None]:
from datasets import load_from_disk

# Reload the saved splits from disk. The path is written out literally, so
# this cell does not depend on variables defined in earlier cells.
dataset = load_from_disk("/content/5000cmd_data/hf_dataset")
print(dataset)

# look at one sample from the training set
print(dataset["train"][0])


In [None]:
from datasets import load_from_disk
# Loads the same saved dataset from the same path again and displays it;
# `dataset` is the object the tokenization step works on.
dataset = load_from_disk("/content/5000cmd_data/hf_dataset")
dataset


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM

# Pretrained checkpoint to fine-tune.
MODEL_NAME = "distilgpt2"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)

# If the tokenizer defines no padding token, reuse the end-of-sequence token
# so that batches can be padded.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(MODEL_NAME)
# Size the embedding matrix to the tokenizer's vocabulary length.
model.resize_token_embeddings(len(tokenizer))


In [None]:
from itertools import chain

BLOCK_SIZE = 256  # each training sample will be this many tokens long

def tokenize_fn(batch):
    """Convert a batch of texts into token IDs, one end-of-text token per document.

    Args:
        batch: a batched mapping with a ``"text"`` list of strings.

    Returns:
        The tokenizer output (``input_ids`` and ``attention_mask`` lists),
        untruncated.
    """
    # The GPT-2 tokenizer does not add an end-of-text token on its own, so
    # append one to every document. Without it, the later concatenation into
    # fixed-size blocks fuses the end of one page directly onto the start of
    # the next with no boundary marker.
    texts = [text + tokenizer.eos_token for text in batch["text"]]
    return tokenizer(texts, truncation=False)

tokenized = dataset.map(tokenize_fn, batched=True, remove_columns=dataset["train"].column_names)

def group_texts(examples, block_size=None):
    """Concatenate tokenized texts and cut them into fixed-size blocks.

    All sequences of each column are glued together in order and then sliced
    into consecutive chunks of ``block_size`` tokens. The trailing remainder
    that does not fill a whole block is dropped, so the result may be empty
    when the batch holds fewer than ``block_size`` tokens.

    Args:
        examples: batched mapping of column name -> list of token lists; must
            contain ``"input_ids"``.
        block_size: chunk length in tokens; defaults to the module-level
            BLOCK_SIZE when omitted.

    Returns:
        dict: the chunked columns plus ``"labels"``, a copy of ``"input_ids"``
        (for next-token prediction).

    Raises:
        ValueError: if ``block_size`` is not positive.
    """
    if block_size is None:
        block_size = BLOCK_SIZE
    if block_size <= 0:
        raise ValueError("block_size must be a positive integer")

    # glue all tokens together, column by column
    concatenated = {k: list(chain.from_iterable(examples[k])) for k in examples.keys()}
    # keep only as many tokens as fill whole blocks
    total_length = (len(concatenated["input_ids"]) // block_size) * block_size
    result = {
        k: [t[i:i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated.items()
    }
    # labels are the same as input_ids; copy each chunk so the two columns
    # do not share list objects
    result["labels"] = [list(ids) for ids in result["input_ids"]]
    return result

# final dataset ready for language model training: group_texts is applied
# batch-wise to every split, then the resulting dataset is displayed
lm_datasets = tokenized.map(group_texts, batched=True)
lm_datasets


In [None]:
from transformers import DataCollatorForLanguageModeling
# Batch builder for the DataLoaders; mlm=False turns masked-language-model
# masking off, which is the setting used for causal (next-token) training.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)


In [None]:
# Custom PyTorch training loop
import math, os, shutil, time
from pathlib import Path
import torch
from torch.utils.data import DataLoader

# need: model, tokenizer, lm_datasets, data_collator (already created above)
# hyperparameters (same idea as TrainingArguments)
# Where checkpoints and the final model/tokenizer are written.
OUTPUT_DIR = "/content/model_5000cmd_distilgpt2"
per_device_train_batch_size = 2
per_device_eval_batch_size  = 2
# Gradients are accumulated over this many batches before each optimizer
# step, i.e. 2 * 8 = 16 sequences per update.
gradient_accumulation_steps = 8
num_train_epochs = 2
learning_rate = 5e-5
weight_decay  = 0.01
warmup_ratio  = 0.05    # fraction of all optimizer steps used for LR warmup
logging_steps = 50      # print the training loss every N optimizer steps
save_total_limit = 2    # only keep the last 2 checkpoints

# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.train()

# DataLoaders: the training loader reshuffles every epoch, the evaluation
# loader keeps a fixed order. Both build batches with data_collator and pin
# host memory only when a GPU is present.
train_loader = DataLoader(
    lm_datasets["train"],
    batch_size=per_device_train_batch_size,
    shuffle=True,
    collate_fn=data_collator,
    num_workers=2,
    pin_memory=torch.cuda.is_available(),
)
eval_loader = DataLoader(
    lm_datasets["validation"],
    batch_size=per_device_eval_batch_size,
    shuffle=False,
    collate_fn=data_collator,
    num_workers=2,
    pin_memory=torch.cuda.is_available(),
)

# AdamW optimizer + learning rate scheduler (linear warmup + decay)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# Planned number of optimizer updates: batches per epoch divided by the
# accumulation factor (rounded up), times the number of epochs.
num_update_steps_per_epoch = math.ceil(len(train_loader) / gradient_accumulation_steps)
max_train_steps = num_update_steps_per_epoch * num_train_epochs
# Number of those updates spent ramping the learning rate up (rounded down).
warmup_steps = int(max_train_steps * warmup_ratio)

def lr_lambda(current_step):
    """Return the learning-rate multiplier for optimizer step *current_step*.

    The factor rises linearly from 0 to 1 over the first ``warmup_steps``
    steps, then falls linearly to 0 at ``max_train_steps`` and never goes
    below 0.
    """
    if current_step >= warmup_steps:
        # Decay phase: share of the post-warmup steps that is still left.
        steps_left = float(max_train_steps - current_step)
        decay_span = float(max(1, max_train_steps - warmup_steps))
        return max(0.0, steps_left / decay_span)
    # Warmup phase.
    return float(current_step) / float(max(1, warmup_steps))

# Scales the base learning rate by lr_lambda(step) on every scheduler.step().
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

# AMP scaler (mixed precision training on GPU if available); it is created
# disabled when there is no GPU.
scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())
global_step = 0  # number of optimizer updates performed so far

def evaluate():
    """Run the model over the validation loader and return ``(avg_loss, perplexity)``.

    The model is switched to eval mode for the pass and back to train mode
    afterwards. Each batch's loss is weighted by the number of target tokens
    it was averaged over, so the result is a per-token mean across the whole
    validation set.

    Returns:
        tuple[float, float]: average loss and ``exp(avg_loss)``; the
        perplexity is ``inf`` when the loss is 20 or more. Both values are
        ``nan`` when the loader yields no target tokens (e.g. an empty
        validation split), rather than a misleading loss of 0 and
        perplexity of 1.
    """
    model.eval()
    total_loss, total_tokens = 0.0, 0
    with torch.no_grad():
        for batch in eval_loader:
            batch = {k: v.to(device) for k, v in batch.items()}
            with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
                outputs = model(**batch)
                loss = outputs.loss
            # The causal-LM loss is a mean over the shifted targets
            # (labels[:, 1:]) that are not the ignore index -100, so weight
            # the batch by that count instead of by every input position.
            tokens = int((batch["labels"][..., 1:] != -100).sum().item())
            if tokens == 0:
                # No targets in this batch: its loss carries no information.
                continue
            total_loss += loss.item() * tokens
            total_tokens += tokens
    model.train()
    if total_tokens == 0:
        return float("nan"), float("nan")
    avg_loss = total_loss / total_tokens
    ppl = math.exp(avg_loss) if avg_loss < 20 else float("inf")
    return avg_loss, ppl

# training loop
start_time = time.time()
num_batches = len(train_loader)
# Start from clean gradients, also when this cell is run a second time.
optimizer.zero_grad(set_to_none=True)
for epoch in range(1, num_train_epochs + 1):
    running_loss = 0.0       # sum of the per-update losses since the last log line
    updates_since_log = 0    # number of optimizer updates since the last log line
    for step, batch in enumerate(train_loader, start=1):
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
            outputs = model(**batch)
            # Divide so that the gradients summed over one accumulation
            # group correspond to the mean loss of that group.
            loss = outputs.loss / gradient_accumulation_steps
        scaler.scale(loss).backward()
        running_loss += loss.item()

        # Step after every full accumulation group AND on the last batch of
        # the epoch. Without the second condition the trailing partial group
        # is never applied (with fewer batches than
        # gradient_accumulation_steps the model would not be updated at all)
        # and the step count falls short of max_train_steps, which is
        # computed with a rounded-up division. The partial group's loss is
        # still divided by the full accumulation factor, so that one update
        # is proportionally smaller.
        if step % gradient_accumulation_steps == 0 or step == num_batches:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad(set_to_none=True)
            scheduler.step()
            global_step += 1
            updates_since_log += 1

            if global_step % logging_steps == 0:
                curr_lr = scheduler.get_last_lr()[0]
                # Report the mean loss per update, not the sum over the
                # logging window.
                print(f"[epoch {epoch}] step {global_step}/{max_train_steps} "
                      f"loss={running_loss / updates_since_log:.4f} lr={curr_lr:.6f}")
                running_loss = 0.0
                updates_since_log = 0

    # evaluate + save checkpoint at the end of each epoch
    eval_loss, eval_ppl = evaluate()
    print(f"⭐ EVAL epoch {epoch}: eval_loss={eval_loss:.6f}, perplexity={eval_ppl:.2f}")

    ckpt_dir = Path(OUTPUT_DIR) / f"checkpoint-epoch-{epoch}"
    ckpt_dir.mkdir(parents=True, exist_ok=True)
    model.save_pretrained(ckpt_dir)
    tokenizer.save_pretrained(ckpt_dir)

    # clean up old checkpoints (keep only the last N, oldest removed first)
    ckpts = sorted(Path(OUTPUT_DIR).glob("checkpoint-epoch-*"), key=os.path.getmtime)
    if len(ckpts) > save_total_limit:
        for p in ckpts[:-save_total_limit]:
            shutil.rmtree(p, ignore_errors=True)

# save the final model and tokenizer (this is the directory the generation
# pipeline loads the model from)
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
model.save_pretrained(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)

# final eval on validation set and print results; no training happens after
# the last per-epoch evaluation, so this re-measures the same weights
final_loss, final_ppl = evaluate()
print({"eval_loss": final_loss, "perplexity": final_ppl})
print("✅ Saved to", OUTPUT_DIR, " | elapsed: %.1f min" % ((time.time()-start_time)/60))


In [None]:
from transformers import pipeline

# Text-generation pipeline that loads the fine-tuned model from OUTPUT_DIR
# and reuses the in-memory tokenizer; runs on GPU 0 when available, else CPU.
generator = pipeline(
    "text-generation",
    model=OUTPUT_DIR,
    tokenizer=tokenizer,
    device=0 if torch.cuda.is_available() else -1
)

# Sample a continuation of a course-style prompt: up to 120 new tokens,
# sampled with top_p=0.95 and temperature=0.8. The EOS id is passed as the
# padding id.
prompt = "In automata theory, a deterministic finite automaton"
out = generator(
    prompt,
    max_new_tokens=120,
    do_sample=True,
    top_p=0.95,
    temperature=0.8,
    pad_token_id=tokenizer.eos_token_id
)[0]["generated_text"]

print(out)
