In [None]:
import os, shutil, time, math, gc, tempfile, json, contextlib
from dataclasses import dataclass

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset

# Optimum (Quanto) for post-training quantization (static, weight-only int8)
from optimum.quanto import quantize, freeze, qint8

MODEL_ID = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
DEVICE   = "cuda" if torch.cuda.is_available() else "cpu"
SEED     = 42

torch.manual_seed(SEED)
if DEVICE == "cuda":
    torch.cuda.manual_seed_all(SEED)

# Utilities

In [None]:
def dir_size_mb(path: str) -> float:
    total = 0
    for root, _, files in os.walk(path):
        for f in files:
            total += os.path.getsize(os.path.join(root, f))
    return total / (1024**2)

In [None]:
@contextlib.contextmanager
def torch_cuda_monitor():
    """Context manager to measure peak GPU memory in MB."""
    if DEVICE == "cuda":
        torch.cuda.reset_peak_memory_stats()
        torch.cuda.synchronize()
        start_alloc = torch.cuda.memory_allocated()
        try:
            yield
        finally:
            torch.cuda.synchronize()
            peak = torch.cuda.max_memory_allocated()
            torch.cuda.empty_cache()
            # return values indirectly by storing on the function object
            torch_cuda_monitor.peak_mb = peak / (1024**2)
            torch_cuda_monitor.start_mb = start_alloc / (1024**2)
    else:
        try:
            yield
        finally:
            torch_cuda_monitor.peak_mb = 0.0
            torch_cuda_monitor.start_mb = 0.0

In [None]:
@dataclass
class GenMetrics:
    latency_s: float
    tokens_per_sec: float
    peak_gpu_mem_mb: float

In [None]:
def measure_generate(model, tokenizer, prompt: str, max_new_tokens=64, runs=3) -> GenMetrics:
    """Measure latency, throughput, and peak GPU memory for text generation."""
    model.eval()
    inputs = tokenizer(prompt, return_tensors="pt")
    inputs = {k: v.to(DEVICE) for k, v in inputs.items()}
    input_len = inputs["input_ids"].shape[1]

    # Warmup
    with torch.inference_mode():
        _ = model.generate(**inputs, max_new_tokens=8, do_sample=False, use_cache=True)

    latencies, tps = [], []
    with torch_cuda_monitor():
        for _ in range(runs):
            if DEVICE == "cuda":
                torch.cuda.synchronize()
            t0 = time.perf_counter()
            with torch.inference_mode():
                out = model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=False, use_cache=True)
            if DEVICE == "cuda":
                torch.cuda.synchronize()
            t1 = time.perf_counter()

            gen_len = out.shape[1] - input_len
            lat = t1 - t0
            latencies.append(lat)
            tps.append(gen_len / lat if lat > 0 else float("nan"))

    return GenMetrics(
        latency_s=sum(latencies)/len(latencies),
        tokens_per_sec=sum(tps)/len(tps),
        peak_gpu_mem_mb=getattr(torch_cuda_monitor, "peak_mb", 0.0),
    )

In [None]:
@torch.no_grad()
def compute_perplexity(model, tokenizer, seq_len=128) -> float:
    """
    Self-contained perplexity estimate using a small built-in eval text.
    Keeps evaluation light but still allows FP32 vs INT8 comparison.
    """
    eval_text = (
        "Quantization reduces the precision of neural network weights and activations. "
        "This process shrinks model size, lowers memory use, and can speed up inference. "
        "The tradeoff is a small drop in accuracy. "
        "Perplexity measures how well a language model predicts text: "
        "a lower perplexity means the model is more confident in its predictions. "
        "Large language models like LLaMA or TinyLlama are evaluated on benchmarks such as WikiText, "
        "where perplexity is calculated over thousands of tokens. "
        "In practice, we only need a small text sample to compare relative changes. "
        "By quantizing a model to 8-bit, we can observe whether perplexity increases significantly. "
        "If the rise is modest while speed and memory improve, quantization is usually a good trade-off. "
        "This evaluation text is deliberately extended to ensure enough tokens for testing."
    )

    enc = tokenizer(eval_text, return_tensors="pt")
    input_ids = enc["input_ids"][0]

    usable = (len(input_ids) // seq_len) * seq_len
    input_ids = input_ids[:usable + 1]
    if len(input_ids) <= seq_len:
        raise ValueError("Not enough tokens for perplexity calculation. Try reducing seq_len.")

    nll_sum, tok_count = 0.0, 0
    model.eval()

    for start in range(0, len(input_ids) - 1 - seq_len, seq_len):
        chunk = input_ids[start:start+seq_len+1]
        inp = chunk[:-1].unsqueeze(0).to(DEVICE)
        labels = chunk[1:].unsqueeze(0).to(DEVICE)

        out = model(input_ids=inp, labels=labels)
        nll_sum += float(out.loss) * labels.numel()
        tok_count += labels.numel()

    return math.exp(nll_sum / max(1, tok_count))

In [None]:
def save_and_size(model, tokenizer, out_dir: str) -> float:
    if os.path.exists(out_dir):
        shutil.rmtree(out_dir)
    os.makedirs(out_dir, exist_ok=True)
    tokenizer.save_pretrained(out_dir)
    model.save_pretrained(out_dir, safe_serialization=True)
    return dir_size_mb(out_dir)

In [None]:
def print_row(title, size_mb, lat_s, tps, gpu_mb, ppl):
    print(
        f"{title:18s} | Size: {size_mb:8.1f} MB | Latency: {lat_s:7.3f} s | "
        f"Throughput: {tps:7.2f} tok/s | Peak VRAM: {gpu_mb:7.1f} MB | PPL: {ppl:7.2f}"
    )

In [None]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
# Some chat models have no pad token; make generation/perplexity robust:
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
print("\n== Baseline FP32 ==")
baseline_model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.float32,
    low_cpu_mem_usage=True,
)
baseline_model.to(DEVICE)

baseline_size_mb = save_and_size(baseline_model, tokenizer, out_dir="tinyllama_fp32")
baseline_gen = measure_generate(
    baseline_model,
    tokenizer,
    prompt="Explain quantization in one paragraph for ML engineers.",
    max_new_tokens=128,
    runs=3,
)
baseline_ppl = compute_perplexity(baseline_model, tokenizer, seq_len=128)

print_row("FP32 (baseline)", baseline_size_mb, baseline_gen.latency_s, baseline_gen.tokens_per_sec,
          baseline_gen.peak_gpu_mem_mb, baseline_ppl)

In [None]:
print("\n== PTQ INT8 (Optimum-Quanto, weight-only) ==")
q_model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.float32,
    low_cpu_mem_usage=True,
)

# Static PTQ (weight-only): no calibration set required.
# This converts Linear weights to int8-packed format and wires quant/dequant where needed.
quantize(q_model, weights=qint8)
freeze(q_model)               # finalize quantization graphs / params
q_model.to(DEVICE)

q_size_mb = save_and_size(q_model, tokenizer, out_dir="tinyllama_int8_quanto")

q_gen = measure_generate(
    q_model,
    tokenizer,
    prompt="Explain quantization in one paragraph for ML engineers.",
    max_new_tokens=128,
    runs=3,
)
q_ppl = compute_perplexity(q_model, tokenizer, seq_len=128)

print_row("INT8 (Quanto)", q_size_mb, q_gen.latency_s, q_gen.tokens_per_sec, q_gen.peak_gpu_mem_mb, q_ppl)

In [1]:

# --- 5) Summary JSON (optional) ----------------------------------------------
summary = {
    "device": DEVICE,
    "model": MODEL_ID,
    "seed": SEED,
    "baseline_fp32": {
        "size_mb": baseline_size_mb,
        "latency_s": baseline_gen.latency_s,
        "tokens_per_sec": baseline_gen.tokens_per_sec,
        "peak_gpu_mem_mb": baseline_gen.peak_gpu_mem_mb,
        "perplexity": baseline_ppl,
    },
    "int8_quanto": {
        "size_mb": q_size_mb,
        "latency_s": q_gen.latency_s,
        "tokens_per_sec": q_gen.tokens_per_sec,
        "peak_gpu_mem_mb": q_gen.peak_gpu_mem_mb,
        "perplexity": q_ppl,
    },
}
with open("ptq_tinyllama_summary.json", "w") as f:
    json.dump(summary, f, indent=2)

print("\nSaved summary -> ptq_tinyllama_summary.json")


2025-08-26 22:42:09.317398: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-08-26 22:42:09.331712: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-08-26 22:42:09.349317: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-08-26 22:42:09.354787: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-08-26 22:42:09.367749: I tensorflow/core/platform/cpu_feature_guar


== Baseline FP32 ==
FP32 (baseline)    | Size:   4200.3 MB | Latency:   2.654 s | Throughput:   48.24 tok/s | Peak VRAM:  4212.9 MB | PPL:    1.00

== PTQ INT8 (Optimum-Quanto, weight-only) ==
INT8 (Quanto)      | Size:   1242.5 MB | Latency:   6.688 s | Throughput:   19.14 tok/s | Peak VRAM:  5765.4 MB | PPL:    1.00

Saved summary -> ptq_tinyllama_summary.json


# Part 2

In [2]:
import os, time
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.ao.quantization import quantize_dynamic

# -----------------------------
# Config (CPU-only demo)
# -----------------------------
MODEL_ID = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"  # swap to "sshleifer/tiny-gpt2" if CPU is tight
DEVICE = "cpu"
MAX_NEW_TOKENS = 128
RUNS = 3
PROMPT = "Quantization test: explain why int8 dynamic quantization can be faster on CPU."

torch.set_grad_enabled(False)
torch.set_num_threads(max(1, os.cpu_count() or 1))  # let PyTorch use available cores

def measure_generate(model, tokenizer, prompt=PROMPT, max_new_tokens=MAX_NEW_TOKENS, runs=RUNS):
    model.eval()
    tokenizer.padding_side = "left"
    enc = tokenizer(prompt, return_tensors="pt").to(DEVICE)

    # Warmup
    with torch.inference_mode():
        _ = model.generate(**enc, max_new_tokens=8, use_cache=True)

    latencies, throughputs = [], []
    for _ in range(runs):
        t0 = time.perf_counter()
        with torch.inference_mode():
            out = model.generate(**enc, max_new_tokens=max_new_tokens, use_cache=True)
        t1 = time.perf_counter()
        gen_len = out.shape[1] - enc["input_ids"].shape[1]
        lat = t1 - t0
        latencies.append(lat)
        throughputs.append(gen_len / lat)

    return sum(latencies)/len(latencies), sum(throughputs)/len(throughputs)

# -----------------------------
# Tokenizer (shared)
# -----------------------------
tok = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
if tok.pad_token_id is None:
    tok.pad_token_id = tok.eos_token_id

# -----------------------------
# Baseline: FP32 on CPU
# -----------------------------
print("== CPU FP32 Baseline ==")
model_fp32 = AutoModelForCausalLM.from_pretrained(MODEL_ID, torch_dtype=torch.float32).to(DEVICE).eval()
model_fp32.config.use_cache = True

lat_fp32, tps_fp32 = measure_generate(model_fp32, tok)
print(f"FP32 (CPU) | Latency {lat_fp32:.3f}s | Throughput {tps_fp32:.2f} tok/s")

# -----------------------------
# Quantized: INT8 dynamic (CPU)
# -----------------------------
print("\n== CPU INT8 (dynamic) ==")
# Quantize only Linear layers to int8 (native PyTorch). This is weight-only int8 + dynamic activation quant.
model_int8 = quantize_dynamic(
    model_fp32.cpu(),
    {torch.nn.Linear},
    dtype=torch.qint8
).eval()
# (Optional) free original object to reduce memory
del model_fp32

lat_int8, tps_int8 = measure_generate(model_int8, tok)
print(f"INT8 dyn (CPU) | Latency {lat_int8:.3f}s | Throughput {tps_int8:.2f} tok/s")

# -----------------------------
# Summary
# -----------------------------
speedup = lat_fp32 / lat_int8 if lat_int8 > 0 else float("inf")
print("\n== Summary ==")
print(f"CPU FP32 latency: {lat_fp32:.3f}s | CPU INT8 latency: {lat_int8:.3f}s | Speedup: {speedup:.2f}x")
print(f"CPU FP32 tput:   {tps_fp32:.2f} tok/s | CPU INT8 tput:   {tps_int8:.2f} tok/s")


== CPU FP32 Baseline ==
FP32 (CPU) | Latency 0.710s | Throughput 1.60 tok/s

== CPU INT8 (dynamic) ==
INT8 dyn (CPU) | Latency 0.220s | Throughput 4.57 tok/s

== Summary ==
CPU FP32 latency: 0.710s | CPU INT8 latency: 0.220s | Speedup: 3.22x
CPU FP32 tput:   1.60 tok/s | CPU INT8 tput:   4.57 tok/s


# [EX-1] Calibration set + Static PTQ (Optimum)

In [3]:
def build_calibration_texts():
    """
    TODO: Return ~15–30 short, varied sentences for calibration.
    Hints:
      - Mix styles: facts, questions, lists, numbers.
      - Keep each under ~200 tokens.
    """
    CALIB_TEXTS = [
        # "Add sentences here...",
    ]
    return CALIB_TEXTS


def make_calib_dataloader(tokenizer, texts, batch_size=4, max_length=256):
    """
    TODO: Build a torch DataLoader that yields dicts with 'input_ids' and 'attention_mask'.
    Hints:
      - Tokenize each text with truncation + max_length.
      - Use a simple pad-sequence collator (pad_token_id=tokenizer.pad_token_id).
    """
    import torch
    from torch.utils.data import Dataset, DataLoader
    from torch.nn.utils.rnn import pad_sequence

    class _CalibDS(Dataset):
        def __init__(self, texts):
            self.texts = texts
        def __len__(self): return len(self.texts)
        def __getitem__(self, idx):
            enc = tokenizer(
                texts[idx],
                truncation=True, max_length=max_length, return_tensors="pt"
            )
            return {k: v.squeeze(0) for k, v in enc.items()}

    pad_id = tokenizer.pad_token_id

    def _collate(batch):
        # TODO: pad input_ids and attention_mask to same length
        # Hints:
        #   input_ids = pad_sequence([...], batch_first=True, padding_value=pad_id)
        #   attention = pad_sequence([...], batch_first=True, padding_value=0)
        input_ids = ...
        attention = ...
        return {"input_ids": input_ids, "attention_mask": attention}

    ds = _CalibDS(texts)
    return DataLoader(ds, batch_size=batch_size, shuffle=False, collate_fn=_collate)


def run_static_ptq_optimum(model_fp32, calib_loader, out_dir="tinyllama-int8-static"):
    """
    TODO: Perform static PTQ with Hugging Face Optimum (INC).
    Steps:
      - from optimum.intel.neural_compressor import INCQuantizer, PostTrainingQuantConfig
      - qconfig = PostTrainingQuantConfig(approach="static", precision="int8")
      - quantizer = INCQuantizer.from_pretrained(model_fp32, task="text-generation")
      - quantizer.quantize(qconfig, calib_dataloader=calib_loader)
      - quantizer.save_pretrained(out_dir)
    Note:
      - This requires `pip install optimum neural-compressor`.
    """
    # from optimum.intel.neural_compressor import INCQuantizer, PostTrainingQuantConfig
    # qconfig = ...
    # quantizer = ...
    # quantizer.quantize(...)
    # quantizer.save_pretrained(out_dir)
    raise NotImplementedError("Student TODO: implement static PTQ with Optimum.")


# [EX-2] Prompt-length sensitivity sweep

In [4]:
def sweep_prompt_lengths(model_dict, tokenizer, prompt, lengths=(16, 64, 256), runs=3):
    """
    For each model, measure latency & throughput across max_new_tokens.
    Returns: {model_key: [{'L': L, 'lat': ..., 'tps': ...}, ...], ...}

    TODOs:
      1) Convert `lengths` to a sorted list of ints to ensure consistent plots.
      2) Set each model to eval mode and (if available) enable use_cache=True.
      3) Loop over lengths:
         - Call `measure_generate(model, tokenizer, prompt=prompt, max_new_tokens=L, runs=runs)`.
         - Accept either a (lat, tps) tuple OR a metrics object with .latency_s / .tokens_per_sec.
      4) Append a dict per length: {"L": L, "lat": <float>, "tps": <float>}.
      5) Return the results dict.
      6) (Optional) If your `measure_generate` exposes std/variance, include "lat_std"/"tps_std".
    """
    # 1) TODO: sanitize and sort lengths
    # lengths = ...

    # 2) Prepare results structure
    results = {name: [] for name in model_dict.keys()}

    for name, model in model_dict.items():
        # 2) TODO: model.eval() and enable caching if present
        # try:
        #     ...
        # except Exception:
        #     pass

        for L in lengths:
            # 3) TODO: run benchmark at this L
            # out = measure_generate(...)

            # 3) TODO: support both return types
            # if isinstance(out, tuple):
            #     lat, tps = out
            # else:
            #     lat = getattr(out, "latency_s", getattr(out, "lat", None))
            #     tps = getattr(out, "tokens_per_sec", getattr(out, "tps", None))
            #     # (Optional std)
            #     lat_std = getattr(out, "latency_std", None)
            #     tps_std = getattr(out, "tps_std", None)

            # 4) TODO: build row dict and append
            # row = {"L": L, "lat": lat, "tps": tps}
            # if lat_std is not None: row["lat_std"] = lat_std
            # if tps_std is not None: row["tps_std"] = tps_std
            # results[name].append(row)

    # 5) TODO: return results
    return results

def plot_sweep(results, save_prefix=None):
    """
    Plot (L vs latency) and (L vs throughput) for each model.
    Uses matplotlib (no custom styles/colors).

    TODOs:
      1) Create a figure for latency.
         - For each model, extract L (x) and lat (y).
         - If "lat_std" present, use error bars; else use line plot.
         - Label axes, set title, add legend; show the figure.
         - If save_prefix is set, save as f"{save_prefix}_latency.png".
      2) Create a separate figure for throughput.
         - For each model, extract L (x) and tps (y).
         - If "tps_std" present, use error bars; else use line plot.
         - Label axes, set title, add legend; show the figure.
         - If save_prefix is set, save as f"{save_prefix}_throughput.png".
      3) Return the two figure objects for downstream use.
    """
    import matplotlib.pyplot as plt

    # 1) TODO: Latency figure
    # fig1 = plt.figure()
    # ax1 = plt.gca()
    # for name, rows in results.items():
    #     Ls = [r["L"] for r in rows]
    #     lats = [r["lat"] for r in rows]
    #     if "lat_std" in rows[0]:
    #         errs = [r.get("lat_std", 0.0) for r in rows]
    #         ax1.errorbar(Ls, lats, yerr=errs, marker="o", label=name)
    #     else:
    #         ax1.plot(Ls, lats, marker="o", label=name)
    # ax1.set_xlabel("max_new_tokens")
    # ax1.set_ylabel("Latency (s)")
    # ax1.set_title("Latency vs Generation Length")
    # ax1.legend()
    # plt.show()
    # if save_prefix:
    #     fig1.savefig(f"{save_prefix}_latency.png", bbox_inches="tight", dpi=150)

    # 2) TODO: Throughput figure
    # fig2 = plt.figure()
    # ax2 = plt.gca()
    # for name, rows in results.items():
    #     Ls = [r["L"] for r in rows]
    #     tps = [r["tps"] for r in rows]
    #     if "tps_std" in rows[0]:
    #         errs = [r.get("tps_std", 0.0) for r in rows]
    #         ax2.errorbar(Ls, tps, yerr=errs, marker="o", label=name)
    #     else:
    #         ax2.plot(Ls, tps, marker="o", label=name)
    # ax2.set_xlabel("max_new_tokens")
    # ax2.set_ylabel("Throughput (tok/s)")
    # ax2.set_title("Throughput vs Generation Length")
    # ax2.legend()
    # plt.show()
    # if save_prefix:
    #     fig2.savefig(f"{save_prefix}_throughput.png", bbox_inches="tight", dpi=150)

    # 3) TODO: return both figs
    # return fig1, fig2

# [EX-3] KV-cache & padding-side ablation

In [5]:
def ablate_cache_and_padding(model, tokenizer, prompt, max_new_tokens=128, runs=3):
    """
    Measure under four scenarios:
      A) cache on,  padding left
      B) cache off, padding left
      C) cache on,  padding right
      D) cache off, padding right
    Returns dict with lat/tps for each scenario.
    """
    # TODO: save original cache setting and tokenizer padding_side

    scenarios = {
        "A_cache_on_left":  (True,  "left"),
        "B_cache_off_left": (False, "left"),
        "C_cache_on_right": (True,  "right"),
        "D_cache_off_right":(False, "right"),
    }
    out = {}
    for key, (use_cache, side) in scenarios.items():
        # TODO: set model.config.use_cache and tokenizer.padding_side
        # TODO: call measure_generate(...)
        # TODO: store results in out[key] = {"lat": ..., "tps": ...}
        pass

    # TODO: restore original cache + padding settings
    return out



# [EX-4] Batch size sensitivity (micro-batching)

In [6]:
def measure_batched(model, tokenizer, prompt, batch_sizes=(1, 2, 4), max_new_tokens=128, runs=3):
    """
    For B in batch_sizes, build a batch by repeating prompt B times and timing a single generate().
    Returns {B: {'lat': ..., 'tps_per_sample': ...}, ...}

    TODOs:
      1) Loop over each batch size B.
      2) Tokenize the same prompt repeated B times (padding=True) and move tensors to DEVICE.
      3) Run a short warmup generate() to stabilize performance.
      4) For each run:
         - Record start time.
         - Run model.generate() with max_new_tokens and use_cache=True.
         - Record end time.
         - Compute latency and per-sample throughput (tokens generated ÷ latency).
      5) Average latency and throughput across runs.
      6) Store results in dict: results[B] = {"lat": avg_latency, "tps_per_sample": avg_throughput}.
    """
    results = {}

    for B in batch_sizes:
        # TODO: tokenize [prompt] * B with padding=True, send to DEVICE
        # enc = ...

        # TODO: warmup with small generate
        # with torch.inference_mode():
        #     _ = model.generate(...)

        lats = []
        tputs = []
        for _ in range(runs):
            # TODO: record start time
            # t0 = ...

            # TODO: run generate inside torch.inference_mode()
            # out = ...

            # TODO: record end time
            # t1 = ...

            # TODO: compute generated length per sample
            # gen_len = ...

            # TODO: compute latency and per-sample throughput
            # lat = ...
            # tps = ...

            # lats.append(lat)
            # tputs.append(tps)

        # TODO: average latency and throughput across runs
        # results[B] = {"lat": ..., "tps_per_sample": ...}

    return results


# Measuring Disk size + peak memory accounting (Nothing to Implement)

In [7]:
def save_and_measure_size(model, tokenizer, out_dir):
    """
    Save model + tokenizer to `out_dir` and return total size in bytes.
    """
    p = Path(out_dir)
    if p.exists():
        shutil.rmtree(p)
    p.mkdir(parents=True, exist_ok=True)
    model.save_pretrained(p)
    tokenizer.save_pretrained(p)
    total_bytes = sum(f.stat().st_size for f in p.rglob("*") if f.is_file())
    return total_bytes


class PeakMemory:
    """
    Context manager to record peak CPU (and GPU if available) memory during a block.
    CPU via psutil RSS; GPU via torch.cuda.max_memory_allocated().
    """
    def __enter__(self):
        import psutil
        self._psutil = psutil
        self._proc = psutil.Process(os.getpid())
        self.cpu_peak_bytes = self._proc.memory_info().rss
        self.gpu_peak_bytes = 0
        if torch.cuda.is_available():
            torch.cuda.reset_peak_memory_stats()
        return self

    def __exit__(self, exc_type, exc, tb):
        # CPU: we sample once at the end (simple approach).
        self.cpu_peak_bytes = max(self.cpu_peak_bytes, self._proc.memory_info().rss)
        if torch.cuda.is_available():
            self.gpu_peak_bytes = torch.cuda.max_memory_allocated()

# [EX-6] Quality proxy: pseudo-perplexity

In [8]:
@torch.no_grad()
def pseudo_perplexity(model, tokenizer, text, max_len=128):
    """
    Leave-one-out style pseudo-perplexity.
    For each token position i, predict token i given all tokens < i.
    Return exp(average loss).
    
    TODOs:
      1) Tokenize the text (truncate to max_len), move tensors to DEVICE.
      2) Loop from token 1 to end:
         - Feed prefix [:i] into the model.
         - Get logits for the last position.
         - Compute cross-entropy loss against the true token at position i.
         - Accumulate loss.
      3) Average the losses and return math.exp(avg_loss).
    """
    # enc = tokenizer(...)
    # input_ids, attn = ...
    # if n < 2: return float("nan")

    loss_sum = 0.0
    steps = 0
    for i in range(1, n):
        # TODO: forward pass on prefix
        # logits = ...
        # target = ...
        # loss = torch.nn.functional.cross_entropy(...)
        # loss_sum += ...
        # steps += 1

    # TODO: compute avg_loss and return exp(avg_loss)
    return ...


def compare_ppplx(models, tokenizer, texts):
    """
    Compute pseudo-perplexity for each model over a list of texts.
    Return: {model_key: [scores...], ...}

    TODOs:
      1) Loop over models in dict.
      2) For each model, compute pseudo_perplexity on all texts.
      3) Collect results in a dict mapping model_name -> list of scores.
    """
    scores = {}
    for name, model in models.items():
        # TODO: run pseudo_perplexity on each text
        # scores[name] = [...]
        pass
    return scores


# [EX-7] Results table + short reflection

In [9]:
def build_results_table(metrics_rows):
    """
    Build a pandas DataFrame from a list of dicts.
    Expected keys per row:
      precision, size_bytes, lat_64, tps_64, lat_256, tps_256, ppplx_avg

    TODOs:
      1) Convert metrics_rows into a pandas DataFrame.
      2) If "size_bytes" is present, add a new column "size_MB" by dividing by 1024**2
         and rounding to 1 decimal place.
      3) Return the DataFrame with the following column order (only if present):
         ["precision","size_bytes","size_MB","lat_64","tps_64","lat_256","tps_256","ppplx_avg"]
    """
    # TODO: create DataFrame
    # df = pd.DataFrame(metrics_rows)

    # TODO: add size_MB if "size_bytes" exists
    # if "size_bytes" in df.columns:
    #     df["size_MB"] = ...

    # TODO: filter/reorder columns
    # cols = [...]
    # return df[[c for c in cols if c in df.columns]]
    return ...


# [EX-8 - Stretch] Edge-case prompts check

In [10]:
def probe_edge_prompts(models, tokenizer, prompts, max_new_tokens=64):
    """
    Generate for a small set of edge prompts.
    Returns: {model_key: [decoded_outputs...], ...}

    TODOs:
      1) Create an outputs dict with model names as keys and empty lists as values.
      2) For each model and each prompt:
         - Tokenize the prompt and move tensors to DEVICE.
         - Run model.generate() with max_new_tokens and use_cache=True inside torch.inference_mode().
         - Decode the generated ids into text (skip special tokens).
         - Append decoded text to the model’s outputs list.
      3) Return the outputs dict.
    """
    outputs = {k: [] for k in models.keys()}

    for name, model in models.items():
        for p in prompts:
            # TODO: tokenize
            # enc = ...

            # TODO: run generate under inference_mode
            # out_ids = ...

            # TODO: decode output ids
            # decoded = ...

            # TODO: append to outputs[name]
            # outputs[name].append(decoded)
            pass

    return outputs


# Master controller for the quantization exercise

In [11]:
def run_quantization_exercise():
    """
    Controller to run all parts of the exercise.
    Students are expected to fill in the TODOs inside each helper function.
    This function orchestrates the flow and prints / returns results.
    """
    # -------------------------------
    # 1. Build calibration set + run static PTQ
    # -------------------------------
    print("\n[1] Calibration + Static PTQ")
    calib_texts = build_calibration_texts()
    calib_loader = make_calib_dataloader(tok, calib_texts)
    # Run static PTQ and save quantized model
    run_static_ptq_optimum(model_fp32, calib_loader, out_dir="tinyllama-int8-static")
    
    # Reload quantized model (student will implement saving inside run_static_ptq_optimum)
    model_static = AutoModelForCausalLM.from_pretrained("tinyllama-int8-static").to(DEVICE).eval()
    model_static.config.use_cache = True

    models = {
        "fp32": model_fp32,
        "int8-dyn": model_int8,
        "int8-static": model_static
    }

    # -------------------------------
    # 2. Prompt-length sensitivity sweep
    # -------------------------------
    print("\n[2] Prompt-length sweep")
    sweep_results = sweep_prompt_lengths(models, tok, PROMPT, lengths=(16, 64, 256))
    plot_sweep(sweep_results)

    # -------------------------------
    # 3. KV-cache & padding ablation
    # -------------------------------
    print("\n[3] Cache & padding ablation")
    ablation = ablate_cache_and_padding(model_static, tok, PROMPT)
    print("Ablation results:", ablation)

    # -------------------------------
    # 4. Batch-size sensitivity
    # -------------------------------
    print("\n[4] Batch-size sensitivity")
    batch_res = measure_batched(model_static, tok, PROMPT)
    print("Batch results:", batch_res)

    # -------------------------------
    # 5. Model size + memory tracking
    # -------------------------------
    print("\n[5] Model size + memory")
    size_fp32 = save_and_measure_size(model_fp32, tok, "fp32-save")
    size_int8_dyn = save_and_measure_size(model_int8, tok, "int8dyn-save")
    size_int8_static = save_and_measure_size(model_static, tok, "int8static-save")

    with PeakMemory() as pm:
        measure_generate(model_static, tok, PROMPT, max_new_tokens=64)
    print("Peak memory (CPU/GPU):", pm.cpu_peak_bytes, pm.gpu_peak_bytes)

    # -------------------------------
    # 6. Quality proxy: pseudo-perplexity
    # -------------------------------
    print("\n[6] Quality proxy (pseudo-perplexity)")
    texts = ["Quantization reduces model size.", "Speed matters for deployment."]
    ppplx_scores = compare_ppplx(models, tok, texts)
    print("PPPLX:", ppplx_scores)

    # -------------------------------
    # 7. Build results table
    # -------------------------------
    print("\n[7] Summary table")
    metrics = [
        {
            "precision": "FP32",
            "size_bytes": size_fp32,
            "lat_64": None,  # TODO: fill from sweep_results
            "tps_64": None,
            "lat_256": None,
            "tps_256": None,
            "ppplx_avg": None,
        },
        # TODO: add rows for int8-dyn and int8-static
    ]
    df = build_results_table(metrics)
    print(df)

    # -------------------------------
    # 8. (Stretch) Edge prompts
    # -------------------------------
    print("\n[8] Edge-case prompts")
    edge_prompts = [
        "Supercalifragilisticexpialidocious",
        "12345678901234567890",
        "A very very very very very long name...",
    ]
    outputs = probe_edge_prompts(models, tok, edge_prompts)
    for m, outs in outputs.items():
        print(f"--- {m} ---")
        for o in outs:
            print(o)
            print("="*40)

    return {
        "sweep": sweep_results,
        "ablation": ablation,
        "batch": batch_res,
        "sizes": (size_fp32, size_int8_dyn, size_int8_static),
        "ppplx": ppplx_scores,
        "table": df,
        "edge_outputs": outputs
    }


In [12]:
run_quantization_exercise()


[1] Calibration + Static PTQ


NameError: name 'model_fp32' is not defined