## Triton 기초

In [1]:
import torch
import triton
from torch import Tensor
import triton.language as tl
import jaxtyping
from jaxtyping import Float32, Int32

### Load and Store

#### 1D Load & Store

In [32]:
import torch, triton
import triton.language as tl

@triton.jit
def add_one_to_five_elements(x_ptr):
    # triton에서는 CUDA와 달리 thread index를 직접 다루지 않음
    # 대신 arange를 이용해 벡터 연산으로 표현하면 이것이 thread에 매핑됨
    offset = tl.arange(0, 8) # arange는 반드시 2의 거듭제곱 형태 tensor만 가능.
    x = tl.load(x_ptr + offset, mask=offset < 5, other=0) # other는 mask에 해당하지 않는 경우 사용되는 default값
    tl.store(x_ptr + offset, x+1, mask=offset < 5)

# 텐서 생성
x = torch.ones(8, device='cuda', dtype=torch.float32)

# 커널 실행
add_one_to_five_elements[(1, 1, 1)](x) # grid가 (1, 1, 1) 
print(x)

tensor([2., 2., 2., 2., 2., 1., 1., 1.], device='cuda:0')


#### 2D Load & Store

In [None]:
@triton.jit
def add_one_to_5x3_elements(x_ptr):
    # 2차원 index는 이렇게 생성
    offset_x = tl.arange(0, 8)[:, None]
    offset_y = tl.arange(0, 8)[None, :]
    offset = offset_x * 8 + offset_y

    x = tl.load(
        x_ptr + offset, 
        mask=(offset_x < 5) & (offset_y < 3))
    tl.store(
        x_ptr + offset, 
        x+1, 
        mask=(offset_x < 5) & (offset_y < 3))

# 2D 텐서 생성
x = torch.ones((8, 8), device='cuda', dtype=torch.float32)

# 커널 실행
add_one_to_5x3_elements[(1, 1, 1)](x) # grid가 (1, 1, 1)
print(x)

tensor([[2., 2., 2., 1., 1., 1., 1., 1.],
        [2., 2., 2., 1., 1., 1., 1., 1.],
        [2., 2., 2., 1., 1., 1., 1., 1.],
        [2., 2., 2., 1., 1., 1., 1., 1.],
        [2., 2., 2., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1., 1., 1.]], device='cuda:0')


### 여러 개의 Block 사용하기
큰 tensor를 다룰 때에는 여러 개의 block을 사용해야 함.
kernel 호출 시 grid size를 명시해주고, grid 내의 위치를 `tl.program_id`로 가져오면 됨 

In [37]:
@triton.jit
def long_vector_add(x_ptr, y_ptr, z_ptr, N: tl.constexpr, BLOCK_SIZE: tl.constexpr):
    pid = tl.program_id(0)  # CUDA의 BlockIdx와 유사
    offset = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
    mask = offset < N
    x = tl.load(x_ptr + offset, mask=mask)
    y = tl.load(y_ptr + offset, mask=mask)
    z = x + y
    tl.store(z_ptr + offset, z, mask=mask)  

N = 2048  
a = torch.randn(N, device='cuda', dtype=torch.float32)
b = torch.randn(N, device='cuda', dtype=torch.float32)
c = torch.zeros(N, device='cuda', dtype=torch.float32)
# 커널 실행
BLOCK_SIZE = 128
long_vector_add[((N + BLOCK_SIZE - 1) // BLOCK_SIZE, 1, 1)](a, b, c, N, BLOCK_SIZE) # grid가 (1, 1, 1) 
# print(c)

if (c.cpu() - (a + b).cpu()).abs().max() < 1e-7:
    print("Correct!")
else:
    print("Incorrect!")

Correct!


## Matrix Multiplication

In [37]:
@triton.jit
def matmul_kernel(
    a_ptr, b_ptr, c_ptr, 
    M: tl.constexpr, N: tl.constexpr, K: tl.constexpr,
    BLOCKSIZE_M: tl.constexpr, BLOCKSIZE_N: tl.constexpr, BLOCKSIZE_K: tl.constexpr):
    
    pid_m, pid_n = tl.program_id(0), tl.program_id(1)
    # A[pid_m * BLOCK_SIZE_M: (pid_m + 1) * BLOCK_SIZE_M, :]와 B [:, pid_n * BLOCK_SIZE_N: (pid_n + 1) * BLOCK_SIZE_N]을 곱하기
    
    offset_m = pid_m * BLOCKSIZE_M + tl.arange(0, BLOCKSIZE_M)
    offset_n = pid_n * BLOCKSIZE_N + tl.arange(0, BLOCKSIZE_N)
     
    acc = tl.zeros((BLOCKSIZE_M, BLOCKSIZE_N), dtype=tl.float32)

    for k in range(0, K, BLOCKSIZE_K):
        offset_k = k + tl.arange(0, BLOCKSIZE_K)

        a = tl.load(
            a_ptr + offset_m[:, None] * K + offset_k[None, :], 
            mask=(offset_m[:, None] < M) & (offset_k[None, :] < K),
            other=0.0)
        b = tl.load(
            b_ptr + offset_k[:, None] * N + offset_n[None, :],
            mask=(offset_k[:, None] < K) & (offset_n[None, :] < N),
            other=0.0)

        acc += tl.dot(a, b, allow_tf32=False)   # TF32를 허용하면 정밀도가 떨어짐

    tl.store(
        c_ptr + offset_m[:, None] * N + offset_n[None, :],
        acc,
        mask=(offset_m[:, None] < M) & (offset_n[None, :] < N),
    )



BLOCKSIZE_M = 64
BLOCKSIZE_N = 64
BLOCKSIZE_K = 64 

sizes = [(120, 32, 30), (1000, 711, 400), (30, 63, 50)]

for m, k, n in sizes:
    A = torch.randn(m, k).cuda()
    B = torch.randn(k, n).cuda()
    C = torch.zeros(m, n).cuda()

    # kernel call
    grid = ((m + BLOCKSIZE_M - 1) // BLOCKSIZE_M, (n + BLOCKSIZE_N - 1) // BLOCKSIZE_N, 1)
    matmul_kernel[grid](A, B, C, m, n, k, BLOCKSIZE_M, BLOCKSIZE_N, BLOCKSIZE_K)
    
    error = (A@B - C).abs().mean()
    if error < 1e-5:
        print("Correct!")
    else:
        print(f"Incorrect! error: {error}")

Correct!
Correct!
Correct!


### FlashAttention