In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms

# ------------------------------
# 1. Setup
# ------------------------------
# Device: use CUDA if available, otherwise cpu
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
# Data transform: convert images (0-255) → tensors (0-1)
transform = transforms.ToTensor()

# Download + prepare datasets
train_dataset = datasets.MNIST(root="./data", train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root="./data", train=False, transform=transform, download=True)

# DataLoaders: provide mini-batches
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1000, shuffle=False)

# ------------------------------
# 2. Define the Model
# ------------------------------
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()                 # Flatten 28x28 → 784
        self.fc1 = nn.Linear(784, 784)              # First hidden layer
        self.relu = nn.ReLU()                       # Non-linearity
        self.fc2 = nn.Linear(784, 10)               # Output layer (10 classes)

    def forward(self, x):
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)                             # No softmax here; loss expects raw logits
        return x

model = MLP().to(device)

# ------------------------------
# 3. Loss and Optimizer
# ------------------------------
criterion = nn.CrossEntropyLoss()                   # Combines LogSoftmax + NLLLoss
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# ------------------------------
# 4. Training Loop
# ------------------------------
for epoch in range(1, 4):                           # Train for 3 epochs
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()                       # Reset gradients
        output = model(data)                        # Forward pass
        loss = criterion(output, target)            # Compute loss
        loss.backward()                             # Backprop
        optimizer.step()                            # Update weights

    print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

cuda
Epoch 1, Loss: 0.0630
Epoch 2, Loss: 0.1992
Epoch 3, Loss: 0.1902


In [3]:
import io, time, statistics, torch

def eval_with_metrics(
    model,
    test_loader,
    dataset_len: int,
    device: str = "cuda",
    warmup_batches: int = 3,
    measure_batches: int | None = None,   # None = measure all (after warmup)
    sync_cuda: bool = True,               # ensure accurate CUDA timings
):
    """
    Evaluate accuracy, serialized model size, and inference speed.

    Args:
      model: torch.nn.Module (already moved to `device`)
      test_loader: DataLoader for eval (don’t shuffle)
      dataset_len: len(test_dataset)
      device: "cpu" | "cuda" | "mps"
      warmup_batches: batches to run (unmeasured) before timing
      measure_batches: if set, only time this many batches after warmup
      sync_cuda: call torch.cuda.synchronize() around timers for CUDA

    Returns:
      dict with accuracy, size_mb, latency stats, throughput
    """
    model.eval()
    
    model_type = 'Cuda FP' if device == 'cuda' else 'Quantized'

    # ---- Model size via serialization (captures packed quant weights) ----
    buf = io.BytesIO()
    # Saving the full model captures quantized packed params better than state_dict in some flows
    torch.save(model, buf)
    size_mb = len(buf.getvalue()) / (1024 * 1024)

    # ---- Accuracy + timing ----
    correct = 0
    latencies = []
    seen = 0

    # local helpers for accurate timing
    def _sync():
        if device.startswith("cuda") and sync_cuda and torch.cuda.is_available():
            torch.cuda.synchronize()

    with torch.inference_mode():
        # Warmup (optional, helps JIT/caches & fairer timing)
        w = 0
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            _ = model(data)  # forward only
            w += 1
            if w >= warmup_batches:
                break

        # Timed pass
        measured = 0
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)

            _sync()
            t0 = time.perf_counter()
            output = model(data)
            _sync()
            t1 = time.perf_counter()

            latencies.append(t1 - t0)
            pred = output.argmax(dim=1)
            correct += (pred == target).sum().item()
            seen += data.size(0)

            measured += 1
            if measure_batches is not None and measured >= measure_batches:
                break

        # If we limited measured batches, still account accuracy over only those
        # (If you want full-dataset accuracy ALWAYS, set measure_batches=None)

    # Aggregates
    mean_latency_s = float(sum(latencies) / max(1, len(latencies)))
    median_latency_s = float(statistics.median(latencies)) if latencies else float("nan")
    # Throughput based on measured samples & time
    total_time_s = float(sum(latencies))
    throughput = (seen / total_time_s) if total_time_s > 0 else float("nan")
    accuracy = correct / max(1, seen if measure_batches else dataset_len)

    return {
        "model_type": model_type,
        "accuracy": accuracy,                      # 0..1
        "size_mb": size_mb,                       # serialized model size
        "mean_latency_ms": mean_latency_s * 1e3,  # per batch
        "median_latency_ms": median_latency_s * 1e3,
        "throughput_samples_per_s": throughput,
        "measured_batches": len(latencies),
        "measured_samples": seen,
    }
def print_metrics(metrics: dict, title: str = "Evaluation Results"):
    print(f"\n=== {title} ===")
    print(f"Model:           {metrics['model_type']}")
    print(f"Accuracy:           {metrics['accuracy']*100:.2f}%")
    print(f"Model size:         {metrics['size_mb']:.2f} MB")
    print(f"Mean latency:       {metrics['mean_latency_ms']:.2f} ms/batch")
    print(f"Median latency:     {metrics['median_latency_ms']:.2f} ms/batch")
    print(f"Throughput:         {metrics['throughput_samples_per_s']:.1f} samples/s")
    print(f"Measured batches:   {metrics['measured_batches']}")
    print(f"Measured samples:   {metrics['measured_samples']}")
    print("=" * (len(title) + 10))

In [4]:
import copy
from torchao.quantization import quantize_, Int8WeightOnlyConfig
torch.backends.quantized.engine = "fbgemm"
sq_model = copy.deepcopy(model)
sq_model.eval()                      # important for inference
model_cpu = sq_model.to("cpu")       # dynamic quantization is CPU-only

quantize_(
    model_cpu,
    Int8WeightOnlyConfig(group_size=784),
)

In [6]:
sq_metrics = eval_with_metrics(sq_model, test_loader, len(test_dataset), device='cpu', warmup_batches=3, measure_batches=100)
model = model.to("cpu")
base_metrics = eval_with_metrics(model, test_loader, len(test_dataset), device="cpu", warmup_batches=3, measure_batches=200)

print_metrics(sq_metrics)
print_metrics(base_metrics)

RuntimeError: Expected all tensors to be on the same device, but got mat1 is on cuda:0, different from other tensors on cpu (when checking argument in method wrapper_CUDA_addmm)