# Seminar 1. 

## Part 1. Pytorch performance, benchmarking


At first, let's connect to our env and import all libraries, as well as check CUDA availability:

In [None]:
import torch
import torch.nn.functional as F
from torch import FloatTensor


torch.use_deterministic_algorithms(False)

print("PyTorch:", torch.__version__)
print("CUDA available:", is_cuda:=torch.cuda.is_available())
DEVICE = torch.device("cuda:4" if torch.cuda.is_available() else "cpu")

print(f"Device is {DEVICE}")

if is_cuda:
    device_properties = torch.cuda.get_device_properties(0)  # device index
    print(f"Device name: {device_properties.name}")
    print(f"Device total memory: {device_properties.total_memory / 2 ** 20} MB")

### Microbenchmarking

To microbenchmark a single function or small portion of the code, you can use `torch.utils.benchmark`. Let's try it out with two examples:
- Batched matrix multiply
- Multihead attention

#### Batched Matmul example

In [None]:
def batched_matmul_at_home(a: FloatTensor, b: FloatTensor) -> FloatTensor:
    """Batched matmul by hands

    Args:
        a (FloatTensor) [B, M, N] tensor
        b (FloatTensor): [B, N, K] tensor

    Returns:
        FloatTensor: [B, M, K] tensor - for each element in a batch, computes corresponding
        matrix multiplication of MxN and NxK matrices
    """

    # expand dims so that broadcasted multiply works:
    # a.unsqueeze(-1): [B, M, N, 1]
    # b.unsqueeze(-3): [B, 1, N, K]

    # after broadcasting: [B, M, N, K]
    prod = a.unsqueeze(-1) * b.unsqueeze(-3)  # a and b are viewed as 4D tensors. We get product across N dimension.

    # all we need is to reduce N dimension: sum over it to get [B, M, K]
    out = prod.sum(dim=-2)

    return out

def batched_matmul_torch(a: FloatTensor, b: FloatTensor) -> FloatTensor:
    """Batched matmul using torch.bmm

    Args:
        a (FloatTensor) [B, M, N] tensor
        b (FloatTensor): [B, N, K] tensor

    Returns:
        FloatTensor: [B, M, K] tensor - computes each matmul in a batch
    """
    return torch.bmm(a, b)


a = torch.randn(6, 128, 256).to(DEVICE)
b = torch.randn(6, 256, 512).to(DEVICE)


# need to tweak rtol and atol due to numeric precision
try:
    torch.testing.assert_close(batched_matmul_at_home(a, b), batched_matmul_torch(a, b))
    print("Default test is OK!")
except AssertionError as e:
    print(f"Caught Assertion error with standard assertion thresholds! Error: {e}\n\n*******\n")
finally:
    print("Relaxing testing requirements:...")
    torch.testing.assert_close(batched_matmul_at_home(a, b), batched_matmul_torch(a, b), rtol=1e-4, atol=1e-4)
    print("Relaxed test is OK!")


So, what's the problem? The problem lies in __different kernels execution__ - floating-point arithmetic can produce varying results (up to small errors) even if the implementation is correct. For instance, in our case:

- `torch.bmm` is implemented in optimized CUBLAS kernels that accumulate in float32 consistently.
- Our `batched_matmul_at_home` uses `out = prod.sum(dim=-2)` which accumulates in the same dtype as inputs. With large sums (e.g. 256 terms per dot product), rounding errors can drift by 1e-5–1e-4.



**NOTE**: numerical precision of floating point operations can very from the impkementations, environments and libraries, and thus can be introduced by the user's code. It also happens due to the different summation order and nondeterministic operations on the hardware side (`torch.use_deterministic_algorithms(True)` won't help sometimes).

**NOTE #2**: the relative error is small enough, which frees you from worrying too much about it (unless you work on super-precie calculation for whatever reason).

Now, let's test their performance using `benchmark.Timer`:

In [None]:
import torch.utils.benchmark as benchmark


?benchmark.Timer

In [None]:
globals_dict = dict(a=a, b=b)

t0 = benchmark.Timer(
    stmt="batched_matmul_at_home(a, b)",
    setup="from __main__ import batched_matmul_at_home",
    globals=globals_dict,
)

t1 = benchmark.Timer(
    stmt="batched_matmul_torch(a, b)",
    setup="from __main__ import batched_matmul_torch",
    globals=globals_dict,
)

print(t0_res:=t0.timeit(100))
print(t1_res:=t1.timeit(100))


t0_sec,  = t0_res.times
t1_sec,  = t1_res.times

print(f"\n\n*******\n\n===> torch is faster then vanilla implementation by ~{t0_sec/t1_sec:.3f}x")

#### One more example on numeric precision (inspired by the example from Efficiend-Dl systems [seminar](https://github.com/mryab/efficient-dl-systems/blob/2024/week01_intro/seminar.ipynb)):

In [None]:
%%writefile nondeterminism_example.py

# run it with CUBLAS_WORKSPACE_CONFIG=:4096:8 python nondeterminism_example.py to ensure CUBLAS determinism

import torch
import numpy as np



# make matmul comparable (disable TF32 fast path)
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False
# determinism guard (PyTorch side). For matmul on CUDA you ALSO need the env var above.
torch.use_deterministic_algorithms(True, warn_only=False)


torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

x = torch.randn(2500, 2500)

def matrix_polynomial(x):
    y = x + 0.5 * x @ x + 3.0 * x @ x @ x + x @ x @ x @ x
    return (y).sum().item()

print("Torch CPU:", matrix_polynomial(x), sep='\t')
print("Torch GPU:", matrix_polynomial(x.cuda()), sep='\t')

print("Numpy: ", matrix_polynomial(x.numpy()), sep='\t')
print("Numpy built-in: ", (x.numpy() +
                           0.5 * np.linalg.matrix_power(x.numpy(), 2) +
                           3.0 * np.linalg.matrix_power(x.numpy(), 3) +
                           np.linalg.matrix_power(x.numpy(), 4)
                           ).sum(), sep='\t')

In [None]:
!CUBLAS_WORKSPACE_CONFIG=:4096:8 python nondeterminism_example.py

#### Making your own benchmark tool

In python, you can implement your own microbenchmarking, e.g., by using decorators. We need to explicitly enforce GPU-CPU synchronization by placing `torch.cuda.synchronize` -- the interpreter will wait all launched CUDA processes at that location. 

In [None]:
from tqdm import trange
from statistics import mean, stdev

    
def microbench_this_func(iterations: int = 100):
    is_benchmarked = False

    def outer_wrapper(func):
        
        def wrapper(*args, **kwargs):
            nonlocal is_benchmarked
            name = func.__name__
            if not is_benchmarked:
                measurements = []
                for _ in trange(iterations, desc=f"Benchmarking {name}"):
                    start = torch.cuda.Event(enable_timing=True)
                    end = torch.cuda.Event(enable_timing=True)

                    start.record()
                    func(*args, **kwargs)
                    end.record()
                    torch.cuda.synchronize()  # wait the processes
    
                    measurements.append(start.elapsed_time(end) / 1000)

                is_benchmarked = True  # mark the function and not benchmark later

                # summarize the report:
                mean_time = mean(measurements)
                std_time = stdev(measurements)
                print(f'Function {func} took {mean_time:.5f}+-{std_time:.5f}s')

            res = func(*args, **kwargs)
            return res
        return wrapper    
    return outer_wrapper

In [None]:
@microbench_this_func(100)
def batched_matmul_torch(a: FloatTensor, b: FloatTensor) -> FloatTensor:
    """Batched matmul using torch.bmm

    Args:
        a (FloatTensor) [B, M, N] tensor
        b (FloatTensor): [B, N, K] tensor

    Returns:
        FloatTensor: [B, M, K] tensor - computes each matmul in a batch
    """
    return torch.bmm(a, b)


In [None]:
x = batched_matmul_torch(a, b)

#### Multihead Attention example

In [None]:
def mha_looped(q: FloatTensor, k: FloatTensor, v: FloatTensor, n_heads: int):
    """
    q, k, v --> [B, N, D] tensors
    n_heads - number of heads in attention layer

    outputs: Attention output
    computation for multiple attention heads is performed in for-loop

    """
    B, N, D = q.shape
    assert D % n_heads == 0, "D must be divisible by n_heads"
    d_head = D // n_heads
    scale = 1.0 / d_head ** 0.5

    # Split last dim into heads: list of [B, N, d_head]
    qs = q.split(d_head, -1)  # method for splitting the tensor across the ast dimension with specified slice size (`d_head`)
    ks = k.split(d_head, -1)
    vs = v.split(d_head, -1)

    head_outputs = []
    for qi, ki, vi in zip(qs, ks, vs):
        # [B, N, d_head] @ [B, d_head, N] -> [B, N, N]
        scores = torch.matmul(qi, ki.transpose(-2, -1)) * scale
        attn = scores.softmax(-1)
        # [B, N, N] @ [B, N, d_head] -> [B, N, d_head]
        out_i = torch.matmul(attn, vi)
        head_outputs.append(out_i)

    # Concatenate heads back: [B, N, D]
    out = torch.cat(head_outputs, dim=-1)
    return out



def mha_batched(q: FloatTensor, k: FloatTensor, v: FloatTensor, n_heads: int):
    """
    q, k, v --> [B, N, D] tensors
    n_heads - number of heads in attention layer

    outputs: Attention output
    computation for multiple attention heads is performed more efficiently iwith matched matmul

    """
    B, N, D = q.shape
    assert D % n_heads == 0, "D must be divisible by n_heads"
    d_head = D // n_heads
    scale = 1.0 / d_head ** 0.5

    # Reshape to [B, n_heads, N, d_head]
    qh = q.view(B, N, n_heads, d_head).transpose(1, 2)  # [B, H, N, d_head]
    kh = k.view(B, N, n_heads, d_head).transpose(1, 2)  # [B, H, N, d_head]
    vh = v.view(B, N, n_heads, d_head).transpose(1, 2)  # [B, H, N, d_head]

    # Attention scores: [B, H, N, N]
    scores = torch.matmul(qh, kh.transpose(-2, -1)) * scale
    attn = scores.softmax(dim=-1)

    # Weighted sum: [B, H, N, d_head]
    out_heads = torch.matmul(attn, vh)

    # Merge heads back to [B, N, D]
    out = out_heads.transpose(1, 2).contiguous().view(B, N, D)
    return out



def mha_sdpa_reference(q, k, v, n_heads, *, is_causal=False):
    """
    Reference using torch.nn.functional.scaled_dot_product_attention
    Shapes expected by SDPA: (*, L, E), (*, S, E).
    We'll pass (B,H,N,d) so it returns (B,H,N,d).
    """
    B, N, D = q.shape
    d_head = D // n_heads
    # [B,H,N,d]
    qh = q.view(B, N, n_heads, d_head).transpose(1, 2)
    kh = k.view(B, N, n_heads, d_head).transpose(1, 2)
    vh = v.view(B, N, n_heads, d_head).transpose(1, 2)
    # Let SDPA choose the scale (it uses 1/sqrt(d_head) if scale=None)
    oh = F.scaled_dot_product_attention(qh, kh, vh,
                                        attn_mask=None,
                                        dropout_p=0.0,
                                        is_causal=is_causal,
                                        scale=None)           # [B,H,N,d]
    return oh.transpose(1, 2).contiguous().view(B, N, D)      # [B,N,D]


In [None]:
q = torch.randn((16, 64, 256)).to(DEVICE)
k = torch.randn((16, 64, 256)).to(DEVICE)
v = torch.randn((16, 64, 256)).to(DEVICE)

n_heads = 8

torch.testing.assert_close(mha_looped(q, k, v, n_heads), mha_batched(q, k, v, n_heads))
torch.testing.assert_close(mha_looped(q, k, v, n_heads), mha_sdpa_reference(q, k, v, n_heads))
torch.testing.assert_close(mha_batched(q, k, v, n_heads), mha_sdpa_reference(q, k, v, n_heads))


In [None]:
globals_dict_mha = dict(q=q, k=k, v=v, n_heads=n_heads)

t0 = benchmark.Timer(
    stmt="mha_looped(q, k, v, n_heads)",
    setup="from __main__ import mha_looped",
    globals=globals_dict_mha,
)

t1 = benchmark.Timer(
    stmt="mha_batched(q, k, v, n_heads)",
    setup="from __main__ import mha_batched",
    globals=globals_dict_mha,
)

t2 = benchmark.Timer(
    stmt="mha_sdpa_reference(q, k, v, n_heads)",
    setup="from __main__ import mha_sdpa_reference",
    globals=globals_dict_mha,
)

print(t0.timeit(100))
print(t1.timeit(100))
print(t2.timeit(100))



### Profiling code

Here, we will see how we can profile the whole execution of the program. It covers mroe general task compared to microbenchmarking - you can identify bottlnecks of your program and verify your solutions (whether they work or not)



We will use `bench_attention.py` as an example of DL pipeline we want to optimize

### `torch.compile`

TLDR: it detects the overhead introduced by the model, interferes with the computational graph and rewrites the parts of it. Let's try it out!


Further reading:
- [Ways to use `torch.compile`](https://blog.ezyang.com/2024/11/ways-to-use-torch-compile/)
- [`torch.compile`, the missing manual](https://docs.google.com/document/d/1y5CRfMLdwEoF1nTk9q8qEu1mgMUuUtvhklPKJ2emLU8/edit?tab=t.0#heading=h.ivdr7fmrbeab)

Simple example - fuse elementwise operations:

$ 0.5 * x * (1 + \text{Tanh}(\sqrt{2 / \pi} * (x + 0.044715 * x^3)))$

In [None]:
from math import pi, sqrt

@microbench_this_func()
def foo(x, y):
    a = torch.sin(x) + (x + y)
    b = torch.cos(y) + (y + x)
    c = a + b
    gelu = 0.5 * x * (1 + torch.tanh(sqrt(2 / pi) * (c + 0.044715 * c @ c @ c)))
    gelu2 = 0.5 * x * (1 + torch.tanh(sqrt(2 / torch.acos(torch.tensor(-1))) * (c + 0.044715 * c @ c @ c)))
    return gelu + gelu2


@microbench_this_func()
@torch.compile
def foo_jit_compiled(x, y):
    return foo(x, y)

a = torch.randn(1000, 1000).cuda()
b = torch.randn(1000, 1000).cuda()
x = foo(a, b)
y = foo_jit_compiled(a, b)


torch.testing.assert_close(x, y, atol=5e-4, rtol=5e-4)

In [None]:
@microbench_this_func()
def compiled(x, y):
    return foo_jit_compiled(x, y)

_ = compiled(x, y)

When we fuse it, we eradicate data transfer needed in the eager pytorch mode --> more FLOPs --> faster execution

Let's try with toy DL example (adapted from original tutorial):

In [None]:
from torchvision.models import densenet121

def simple_timer(fn):
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    result = fn()
    end.record()
    torch.cuda.synchronize()
    return result, start.elapsed_time(end) / 1000



def generate_data(b):
    return (
        torch.randn(b, 3, 128, 128).to(torch.float32).cuda(),
        torch.randint(1000, (b,)).cuda(),
    )

N_ITERS = 10

def init_model():
    return densenet121().to(torch.float32).cuda()


model = init_model()

# Reset since we are using a different mode.
import torch._dynamo
torch._dynamo.reset()

model_opt = torch.compile(model, mode="reduce-overhead")

inp = generate_data(16)[0]
with torch.no_grad():
    print(f"Eager execution took {simple_timer(lambda: model(inp))[1]:.3f} sec")
    print(f"Compilation + execution took {simple_timer(lambda: model_opt(inp))[1]:.3f} sec")


Compiled version takes longer to execute due to the compilation of the optimized kernels. If we run the optimized model several times, the result will be faster:

In [None]:
eager_times = []
for i in range(N_ITERS):
    inp = generate_data(16)[0]
    with torch.no_grad():
        _, eager_time = simple_timer(lambda: model(inp))
    eager_times.append(eager_time)
    print(f"eager eval time {i}: {eager_time:.4f} sec")

print("~" * 10)

compile_times = []
for i in range(N_ITERS):
    inp = generate_data(16)[0]
    with torch.no_grad():
        _, compile_time = simple_timer(lambda: model_opt(inp))
    compile_times.append(compile_time)
    print(f"compile eval time {i}: {compile_time:.4f} sec")
print("~" * 10)

import numpy as np
eager_med = np.median(eager_times)
compile_med = np.median(compile_times)
speedup = eager_med / compile_med
assert(speedup > 1)
print(f"(eval) eager median: {eager_med:.4f} sec, compile median: {compile_med:.4f} sec, speedup: {speedup:.3f}x")
print("~" * 10)


In [None]:
model = init_model()
opt = torch.optim.Adam(model.parameters())

def train(mod, data):
    opt.zero_grad(True)
    pred = mod(data[0])
    loss = torch.nn.CrossEntropyLoss()(pred, data[1])
    loss.backward()
    opt.step()

eager_times = []
for i in range(N_ITERS):
    inp = generate_data(16)
    _, eager_time = simple_timer(lambda: train(model, inp))
    eager_times.append(eager_time)
    print(f"eager train time {i}: {eager_time}")
print("~" * 10)

model = init_model()
opt = torch.optim.Adam(model.parameters())
train_opt = torch.compile(train, mode="reduce-overhead")

compile_times = []
for i in range(N_ITERS):
    inp = generate_data(16)
    _, compile_time = simple_timer(lambda: train_opt(model, inp))
    compile_times.append(compile_time)
    print(f"compile train time {i}: {compile_time}")
print("~" * 10)

eager_med = np.median(eager_times)
compile_med = np.median(compile_times)
speedup = eager_med / compile_med
assert(speedup > 1)
print(f"(train) eager median: {eager_med:.4f} sec, compile median: {compile_med:.4f} sec, speedup: {speedup:.3f}x")
print("~" * 10)

# Free practise :)

_A seminarist might ask it :)_

- Profile execution of batched matmul implementations and multihead attention implementations with `torch.profiler.profile`:
  - Obtain traces, analyze them
  - Try to find the difference in traces and verify the claim we made regarding the kernels
- Improve `microbench_this_func` decorator:
  - It should handle arguments too - if the function is called with different shapes, the benchmark should be done too
  - The microbench should store the measurements rather than just printing (can be useful for automatic reports)