In [15]:
# mypy: allow-untyped-defs
"""
This module implements Paged Attention on top of flex_attention.
This module is experimental and subject to change.
"""

import torch
from torch.nn.attention.flex_attention import (
    _identity,
    _mask_mod_signature,
    _score_mod_signature,
    BlockMask,
    noop_mask,
    flex_attention,
    create_block_mask,
)

__all__ = ["PagedAttention"]


def _cdiv(x: int | float | torch.Tensor, multiple: int | float | torch.Tensor):
    return (x + multiple - 1) // multiple


class PagedAttention:
    """
    PagedAttention supports flex attention inference with a large batch size.
    With PagedAttention, a batch of key/value tensors with varying kv length
    is split into tensor blocks of fixed length and cached in a compact way.
    Thus we can avoid redundant memory consumption due to varying kv length and
    support a larger batch size.
    """

    def __init__(
        self,
        n_pages: int,
        page_size: int,
        max_batch_size: int,
        device: str = "cuda",
    ) -> None:
        # number of pages
        self.n_pages = n_pages

        # number of tokens per page
        self.page_size = page_size

        # page table: [batch, logical_block_idx] -> physical_page_idx
        self.page_table = -torch.ones(
            (max_batch_size, self.n_pages), dtype=torch.int64, device=device
        )

        # capacity: batch_idx -> allocated sequence length
        self.capacity = torch.zeros(max_batch_size, dtype=torch.int64, device=device)

        # index of empty pages that is available for allocation
        self.empty_pages = list(range(n_pages - 1, -1, -1))

        # mapping from physical page index to logical page index
        self.physical_to_logical = -torch.ones(
            (max_batch_size, n_pages), dtype=torch.int64, device=device
        )

    def reserve(self, batch_idx: torch.Tensor, seq_len: torch.Tensor) -> None:
        """
        Requests the capacity of a given batch to be at least enough to
        hold `seq_len` elements.

        Args:
            batch_idx (Tensor): batch index to be reserved; shape :math:`(1)`.
            seq_len (Tensor): minimum capacity for the given batch; shape :math:`(1)`.
        """

        if seq_len <= self.capacity[batch_idx]:
            return

        num_pages_to_allocate = _cdiv(
            seq_len - self.capacity[batch_idx], self.page_size
        )

        if len(self.empty_pages) < num_pages_to_allocate:
            raise AssertionError(
                f"requested {num_pages_to_allocate.item()} pages "
                f"but there are only {len(self.empty_pages)} empty pages"
            )

        start_page_idx = self.capacity[batch_idx] // self.page_size
        end_page_idx = start_page_idx + num_pages_to_allocate

        # find empty physical pages
        allocated_pages = torch.tensor(
            self.empty_pages[-num_pages_to_allocate:],
            device=num_pages_to_allocate.device,
        )
        self.empty_pages = self.empty_pages[:-num_pages_to_allocate]

        # update page table
        self.page_table[
            batch_idx,
            start_page_idx:end_page_idx,
        ] = allocated_pages

        # update metadata
        self.physical_to_logical[batch_idx, allocated_pages] = torch.arange(
            start_page_idx.item(),
            end_page_idx.item(),
            device=num_pages_to_allocate.device,
        )
        self.capacity[batch_idx] += num_pages_to_allocate * self.page_size

    def erase(self, batch_idx: torch.Tensor) -> None:
        """
        Removes a single batch from paged attention.

        Args:
            batch_idx (Tensor): batch index to be removed; shape :math:`(1)`.
        """

        # find allocated pages
        allocated_page_idx = self.page_table[batch_idx] != -1
        allocated_pages = self.page_table[batch_idx][allocated_page_idx]

        # clean metadata
        self.capacity[batch_idx] = 0
        self.empty_pages += allocated_pages.tolist()
        self.physical_to_logical[batch_idx][:, allocated_pages] = -1
        self.page_table[batch_idx] = -1

    def assign(
        self,
        batch_idx: torch.Tensor,
        input_pos: torch.Tensor,
        k_val: torch.Tensor,
        v_val: torch.Tensor,
        k_cache: torch.Tensor,
        v_cache: torch.Tensor,
    ) -> None:
        """
        Assigns new contents `val` to the storage `cache` at the location
        `batch_idx` and `input_pos`.

        Args:
            batch_idx (Tensor): batch index; shape :math:`(B)`.
            input_pos (Tensor): input positions to be assigned for the given batch; shape :math:`(B, S)`.
            val (Tensor): value to be assigned; shape :math:`(B, H, S, D)`
            cache (Tensor): the cache to store the values; shape:`(1, H, MAX_S, D)`
        """
        if k_val.requires_grad:
            raise RuntimeError("val must not require gradient")

        B, H, S, K_D = k_val.shape
        V_D = v_val.shape[3]
        if B != batch_idx.shape[0]:
            raise RuntimeError(
                f"Expect val and batch_idx have the same batch size "
                f"but got B={B} and B={batch_idx.shape[0]}."
            )
        if H != k_cache.shape[1]:
            raise RuntimeError(
                f"Expect val and cache has the same number of heads "
                f"but got H={H} and H={k_cache.shape[1]}."
            )
        if S != input_pos.shape[1]:
            raise RuntimeError(
                f"Expect val and input_pos has the same length "
                f"but got S={S} and S={input_pos.shape[0]}."
            )
        if K_D != k_cache.shape[3]:
            raise RuntimeError(
                f"Expect k_val and k_cache has the same hidden dim "
                f"but got D={K_D} and D={k_cache.shape[3]}."
            )
        if V_D != v_cache.shape[3]:
            raise RuntimeError(
                f"Expect v_val and v_cache has the same hidden dim "
                f"but got D={V_D} and D={v_cache.shape[3]}."
            )

        # find address
        logical_block_idx = input_pos // self.page_size  # [B, S]
        logical_block_offset = input_pos % self.page_size  # [B, S]
        physical_block_idx = torch.gather(
            self.page_table[batch_idx], 1, logical_block_idx.to(torch.int64)
        ).to(torch.int32)  # [B, S]

        addr = (physical_block_idx * self.page_size + logical_block_offset).view(
            -1
        )  # [B*S]

        k_val = k_val.permute(1, 0, 2, 3).contiguous().view(1, H, B * S, K_D)
        v_val = v_val.permute(1, 0, 2, 3).contiguous().view(1, H, B * S, V_D)

        k_cache[:, :, addr, :] = k_val
        v_cache[:, :, addr, :] = v_val

    def convert_logical_block_mask(
        self,
        block_mask: BlockMask,
        batch_idx: torch.Tensor | None = None,
        kv_len: torch.Tensor | None = None,
    ) -> BlockMask:
        """
        Converts a logical block mask by mapping its logical kv indices to the corresponding
        physical kv indices.

        Args:
            block_mask (BlockMask): logical block mask;
                kv_indices shape :math:`(B, H, ROWS, MAX_BLOCKS_IN_COL)`.
            batch_idx (Tensor): batch index corresponding to the block_mask
                batch dimension. This provides flexibility to convert a
                block mask with smaller batch size than the page table;
                shape :math:`(B)`.
            kv_len (Optional[Tensor]): actual KV sequence length for upper bound check;
                shape :math:`(B,)` to handle multiple batches.
        """
        B, H, ROWS, MAX_BLOCKS_IN_COL = block_mask.kv_indices.shape

        if block_mask.BLOCK_SIZE[1] != self.page_size:
            raise RuntimeError(
                f"Expect block_mask has the same column block size as page_size"
                f"but got size={block_mask.BLOCK_SIZE[1]} and size={self.page_size}"
            )

        device = block_mask.kv_num_blocks.device

        if batch_idx is None:
            batch_idx = torch.arange(B, device=device)
        page_table = self.page_table[batch_idx]

        new_kv_num_blocks = block_mask.kv_num_blocks.clone()

        # The physical page table might be larger than the max logical blocks
        # but the block mask only cares about MAX_BLOCKS_IN_COL.
        # We assume the logical mask is sparse enough.
        new_kv_indices = torch.zeros(
            (B, H, ROWS, self.n_pages), dtype=torch.int32, device=device
        )

        # We gather the physical page indices using the logical block indices
        # provided by the input block_mask.
        new_kv_indices[:, :, :, :MAX_BLOCKS_IN_COL] = (
            torch.gather(
                page_table, 1, block_mask.kv_indices.view(B, -1).to(torch.int64)
            )
            .view(block_mask.kv_indices.shape)
            .to(torch.int32)
        )

        new_full_kv_indices, new_full_kv_num_blocks = None, None
        if block_mask.full_kv_num_blocks is not None:
            if block_mask.full_kv_indices is None:
                raise AssertionError(
                    "block_mask.full_kv_indices must not be None when full_kv_num_blocks is not None"
                )
            new_full_kv_num_blocks = block_mask.full_kv_num_blocks.clone()
            new_full_kv_indices = torch.zeros(
                (B, H, ROWS, self.n_pages), dtype=torch.int32, device=device
            )
            new_full_kv_indices[:, :, :, :MAX_BLOCKS_IN_COL] = (
                torch.gather(
                    page_table,
                    1,
                    block_mask.full_kv_indices.view(B, -1).to(torch.int64),
                )
                .view(block_mask.full_kv_indices.shape)
                .to(torch.int32)
            )

        new_mask_mod = self.get_mask_mod(block_mask.mask_mod, kv_len)

        # The K/V cache sent to flex_attention acts as one giant sequence of length n_pages * page_size.
        seq_lengths = (block_mask.seq_lengths[0], self.n_pages * self.page_size)

        return BlockMask.from_kv_blocks(
            new_kv_num_blocks,
            new_kv_indices,
            new_full_kv_num_blocks,
            new_full_kv_indices,
            block_mask.BLOCK_SIZE,
            new_mask_mod,
            seq_lengths=seq_lengths,
        )

    def get_mask_mod(
        self,
        mask_mod: _mask_mod_signature | None,
        kv_len: torch.Tensor | None = None,
    ) -> _mask_mod_signature:
        """
        Converts a mask_mod based on mapping from the physical block index to the logical
        block index.
        """
        if mask_mod is None:
            mask_mod = noop_mask

        def new_mask_mod(
            b: torch.Tensor,
            h: torch.Tensor,
            q_idx: torch.Tensor,
            physical_kv_idx: torch.Tensor,
        ):
            physical_kv_block = physical_kv_idx // self.page_size
            physical_kv_offset = physical_kv_idx % self.page_size
            logical_block_idx = self.physical_to_logical[b, physical_kv_block]
            logical_kv_idx = logical_block_idx * self.page_size + physical_kv_offset
            live_block = logical_block_idx >= 0
            within_upper_bound = (
                logical_kv_idx < kv_len[b] if kv_len is not None else True
            )
            within_lower_bound = logical_kv_idx >= 0
            is_valid = live_block & within_upper_bound & within_lower_bound

            return torch.where(is_valid, mask_mod(b, h, q_idx, logical_kv_idx), False)

        return new_mask_mod

    def get_score_mod(
        self,
        score_mod: _score_mod_signature | None,
        kv_len: torch.Tensor | None = None,
    ) -> _score_mod_signature:
        """
        Converts a score_mod based on mapping from the physical block index to the logical
        block index.
        """
        if score_mod is None:
            score_mod = _identity

        def new_score_mod(
            score: torch.Tensor,
            b: torch.Tensor,
            h: torch.Tensor,
            q_idx: torch.Tensor,
            physical_kv_idx: torch.Tensor,
        ):
            physical_kv_block = physical_kv_idx // self.page_size
            physical_kv_offset = physical_kv_idx % self.page_size
            logical_block_idx = self.physical_to_logical[b, physical_kv_block]
            logical_kv_idx = logical_block_idx * self.page_size + physical_kv_offset
            live_block = logical_block_idx >= 0
            within_upper_bound = (
                logical_kv_idx < kv_len[b] if kv_len is not None else True
            )
            within_lower_bound = logical_kv_idx >= 0
            is_valid = live_block & within_upper_bound & within_lower_bound

            return torch.where(
                is_valid,
                score_mod(score, b, h, q_idx, logical_kv_idx),
                float("-inf"),
            )

        return new_score_mod

    def forward(
        self,
        query: torch.Tensor,
        k_cache: torch.Tensor,
        v_cache: torch.Tensor,
        kv_lens: torch.Tensor,
        block_mask: BlockMask | None = None,
        batch_indices: torch.Tensor | None = None,
        score_mod: _score_mod_signature | None = None,
        scale: float | None = None,
        enable_gqa: bool = False,
    ):
        """
        Computes the paged attention.

        Args:
            query (Tensor): Query tensor; shape :math:`(B, H, Q_LEN, D)`.
            k_cache (Tensor): Physical key cache; shape :math:`(1, H, TOTAL_PAGES * PAGE_SIZE, D)`.
            v_cache (Tensor): Physical value cache; shape :math:`(1, H, TOTAL_PAGES * PAGE_SIZE, D)`.
            kv_lens (Tensor): The logical length of the KV sequence for each batch element.
                              Used to mask out padding and future tokens. Shape :math:`(B,)`.
            block_mask (BlockMask, optional): Logical block mask. If None, a standard causal mask is assumed
                                              for the logical sequence.
            batch_indices (Tensor, optional): The indices in the page table corresponding to the query batch.
                                              If None, assumes 0..B-1.
            score_mod (callable, optional): Logical score modification function.
            scale (float, optional): Attention scale factor.
            enable_gqa (bool): Enable Grouped Query Attention.
        """
        B, H, Q_LEN, D = query.shape
        device = query.device

        if batch_indices is None:
            batch_indices = torch.arange(B, device=device)

        # 1. Create a logical block mask if one isn't provided.
        # We assume a standard Causal mask over the maximum logical KV length.
        if block_mask is None:
            max_kv_len = kv_lens.max().item()

            def causal_mask(b, h, q, k):
                return q >= k

            # Create a block mask for the logical dimensions: (B, H, Q_LEN, Max_KV)
            # Note: We must ensure the BLOCK_SIZE matches the page_size.
            block_mask = create_block_mask(
                causal_mask,
                B=B,
                H=H,
                Q_LEN=Q_LEN,
                KV_LEN=max_kv_len,
                device=device,
                BLOCK_SIZE=self.page_size,
                _compile=True # Often improves performance for mask creation
            )

        # 2. Convert the logical block mask to a physical block mask.
        # This re-maps the indices in the block mask to point to the correct physical pages.
        physical_block_mask = self.convert_logical_block_mask(
            block_mask,
            batch_idx=batch_indices,
            kv_len=kv_lens
        )

        # 3. Handle Score Mods
        # Wrap the user score_mod (which expects logical indices) to handle physical indices.
        physical_score_mod = self.get_score_mod(score_mod, kv_lens)

        # 4. Call flex_attention
        # Note: k_cache and v_cache have batch size 1, but query has batch size B.
        # The block mask handles the routing (preventing batch B from seeing batch A's pages).
        output = flex_attention(
            query,
            k_cache,
            v_cache,
            block_mask=physical_block_mask,
            score_mod=physical_score_mod,
            scale=scale,
            enable_gqa=enable_gqa
        )

        return output

In [16]:
import torch
import torch.nn.functional as F
from torch.nn.attention.flex_attention import create_block_mask

# Assuming the PagedAttention class is defined in the same file or imported
# from paged_attention import PagedAttention

def test_paged_attention_correctness():
    print("=== Starting Paged Attention Test ===")

    if not torch.cuda.is_available():
        print("Skipping test: CUDA not available (FlexAttention requires CUDA).")
        return

    device = "cuda"
    torch.set_default_device(device)

    # --- 1. Configuration ---
    BATCH_SIZE = 4
    NUM_HEADS = 6
    HEAD_DIM = 64
    PAGE_SIZE = 16
    N_PAGES = 100  # Total physical pages available
    Q_LEN = 12     # Query length (e.g., chunked prefill or decoding steps)

    # Initialize Paged Attention Manager
    pa = PagedAttention(
        n_pages=N_PAGES,
        page_size=PAGE_SIZE,
        max_batch_size=BATCH_SIZE,
        device=device
    )

    # Allocate Physical Cache (Pre-allocated memory on GPU)
    # Shape: [1, H, Total_Pages * Page_Size, D]
    # Flex attention expects the cache to be treated as a single flattened sequence
    physical_k_cache = torch.zeros(1, NUM_HEADS, N_PAGES * PAGE_SIZE, HEAD_DIM, device=device)
    physical_v_cache = torch.zeros(1, NUM_HEADS, N_PAGES * PAGE_SIZE, HEAD_DIM, device=device)

    # --- 2. Generate Random Data (Ragged Batch) ---
    # We will create random KV sequences of different lengths for each batch item.
    # We will verify correctness by comparing against standard PyTorch attention.

    # Random lengths between 30 and 150 (spanning multiple pages)
    # Ensure they are > Q_LEN for this test to be interesting
    kv_lengths = torch.randint(Q_LEN + 5, 80, (BATCH_SIZE,), device=device)

    print(f"Test Configuration:")
    print(f"  Batch Size: {BATCH_SIZE}")
    print(f"  KV Lengths: {kv_lengths.tolist()}")
    print(f"  Query Len:  {Q_LEN}")
    print(f"  Page Size:  {PAGE_SIZE}")

    queries = torch.randn(BATCH_SIZE, NUM_HEADS, Q_LEN, HEAD_DIM, device=device)

    # Store ground truth K and V to run reference implementation later
    ground_truth_ks = []
    ground_truth_vs = []

    # --- 3. Populate Paged Attention ---
    print("\nPopulating Page Table and Cache...")

    for i in range(BATCH_SIZE):
        seq_len = kv_lengths[i].item()

        # Generate random K/V for this sequence
        k_data = torch.randn(1, NUM_HEADS, seq_len, HEAD_DIM, device=device)
        v_data = torch.randn(1, NUM_HEADS, seq_len, HEAD_DIM, device=device)

        ground_truth_ks.append(k_data.squeeze(0))
        ground_truth_vs.append(v_data.squeeze(0))

        # 1. Reserve pages
        batch_idx_tensor = torch.tensor([i], device=device)
        seq_len_tensor = torch.tensor([seq_len], device=device)
        pa.reserve(batch_idx_tensor, seq_len_tensor)

        # 2. Assign data
        # We assign the whole sequence at once for this test.
        # assign expects input_pos of shape [B, S]
        input_pos = torch.arange(0, seq_len, device=device).unsqueeze(0) # [1, S]

        pa.assign(
            batch_idx=batch_idx_tensor,
            input_pos=input_pos,
            k_val=k_data, # [1, H, S, D]
            v_val=v_data, # [1, H, S, D]
            k_cache=physical_k_cache,
            v_cache=physical_v_cache
        )

    # --- 4. Run Paged Attention Forward Pass ---
    print("Running Paged Attention Forward...")

    # Flex Attention scaling default is 1/sqrt(D)
    scale = 1.0 / (HEAD_DIM ** 0.5)

    # We use a standard Causal Mask implicitly handled inside the PagedAttention.forward
    # when block_mask is None.
    # Note: In a real "prefill+decoding" scenario, Q usually aligns with the END of K.
    # Here, we are simulating that Q is attending to the *entire* K history provided.

    paged_output = pa.forward(
        query=queries,
        k_cache=physical_k_cache,
        v_cache=physical_v_cache,
        kv_lens=kv_lengths,
        scale=scale
    )

    # --- 5. Run Reference Implementation (SDPA) ---
    print("Running Reference (Standard SDPA)...")

    max_diff = 0.0

    for i in range(BATCH_SIZE):
        q_i = queries[i]      # [H, Q, D]
        k_i = ground_truth_ks[i] # [H, S, D]
        v_i = ground_truth_vs[i] # [H, S, D]

        # Standard Scaled Dot Product Attention
        # We need a causal mask.
        # In this specific test setup:
        # Query (Length Q) attends to Key (Length S).
        # Since we generated S > Q, and usually Q represents the *newest* tokens,
        # we need to define the causal relationship.
        #
        # For simplicity in this test, we will assume standard broadcasting causal mask
        # is NOT applied rigidly because S != Q. We simply want to attend to all available keys
        # up to the length defined.
        # HOWEVER, PagedAttention default mask in previous code was: `q_idx >= k_idx`.
        # This implies standard causal masking.

        # Let's align reference with the Logic in PagedAttention:
        # q_idx (0..Q) attends to k_idx (0..S). Mask is True if q_idx >= k_idx.

        # Create explicit mask for SDPA
        # Shape: [Q, S]
        q_idx = torch.arange(Q_LEN, device=device).unsqueeze(1)
        k_idx = torch.arange(kv_lengths[i].item(), device=device).unsqueeze(0)
        attn_mask = (q_idx >= k_idx) # Boolean mask

        # SDPA expects float mask for add (0, -inf) or boolean (True=Keep, False=Drop)
        # But torch.nn.functional.scaled_dot_product_attention handles boolean is_causal
        # ONLY if S == Q. Since S != Q, we pass explicit mask.

        ref_out = F.scaled_dot_product_attention(
            q_i.unsqueeze(0),
            k_i.unsqueeze(0),
            v_i.unsqueeze(0),
            attn_mask=attn_mask.unsqueeze(0).unsqueeze(0),
            scale=scale
        )

        ref_out = ref_out.squeeze(0) # [H, Q, D]

        # Compare
        pa_out_i = paged_output[i]

        diff = (ref_out - pa_out_i).abs().max().item()
        max_diff = max(max_diff, diff)

        if diff > 1e-3:
            print(f"Batch {i} FAILED. Max Diff: {diff}")
        else:
            print(f"Batch {i} PASSED. Max Diff: {diff:.6f}")

    print(f"\nTest Complete. Maximum discrepancy across batch: {max_diff:.6f}")

    if max_diff < 1e-3:
        print("SUCCESS: Paged Attention matches Reference Implementation.")
    else:
        print("FAILURE: Differences detected.")

if __name__ == "__main__":
    # Ensure PagedAttention class is available here
    test_paged_attention_correctness()

=== Starting Paged Attention Test ===
Test Configuration:
  Batch Size: 4
  KV Lengths: [19, 41, 62, 19]
  Query Len:  12
  Page Size:  16

Populating Page Table and Cache...
Running Paged Attention Forward...



SOLUTION: Use torch.compile(flex_attention)(...)

If you want to debug your score_mod/mask_mod, you can set:
torch.nn.attention.flex_attention._FLEX_ATTENTION_DISABLE_COMPILE_DEBUG = True

This will allow you to use print statements or breakpoints. Note: This doesn't work with the backwards pass and may produce incorrect results.
  _warn_once(


Running Reference (Standard SDPA)...
Batch 0 PASSED. Max Diff: 0.000001
Batch 1 PASSED. Max Diff: 0.000001
Batch 2 PASSED. Max Diff: 0.000001
Batch 3 PASSED. Max Diff: 0.000001

Test Complete. Maximum discrepancy across batch: 0.000001
SUCCESS: Paged Attention matches Reference Implementation.
