<a href="https://colab.research.google.com/github/shivamsri07/vectors_and_llms/blob/main/paged_attention_simple.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import random

# --- Component 1: The Parking Garage Attendant ---
class BlockManager:
    """Manages the pool of physical memory blocks.

    Blocks are identified by the integers in ``range(num_blocks)``.
    ``free_blocks`` holds the numbers that are currently available; every
    other number in the range is considered to be in use.
    """
    def __init__(self, num_blocks: int):
        """Create a pool of ``num_blocks`` blocks, all initially free."""
        print(f"[Manager] Creating a memory pool with {num_blocks} blocks.")
        # Remember the pool size so returned block numbers can be validated.
        self.num_blocks = num_blocks
        # Creates a list of all available block numbers, e.g., [0, 1, 2, ..., 99]
        self.free_blocks = list(range(num_blocks))
        # Shuffling helps visualize that the allocated blocks are not contiguous
        random.shuffle(self.free_blocks)

    def allocate_block(self) -> int:
        """Allocate one free block and return its number.

        Raises:
            MemoryError: if no free block is left in the pool.
        """
        if not self.free_blocks:
            raise MemoryError("Out of memory! No free blocks available.")
        return self.free_blocks.pop()

    def free_sequence_blocks(self, block_table: list[int]):
        """Return all blocks of a finished sequence to the free pool.

        The whole table is validated before anything is returned, so a
        rejected call leaves the pool untouched.

        Raises:
            ValueError: if a block number lies outside the pool, is already
                free (double free), or appears twice in ``block_table``.
        """
        returned = list(block_table)
        already_free = set(self.free_blocks)
        seen = set()
        for block in returned:
            if not 0 <= block < self.num_blocks:
                raise ValueError(
                    f"Block {block} does not belong to this pool "
                    f"(valid range: 0..{self.num_blocks - 1})."
                )
            # Without this check a block could end up in the free list twice
            # and later be handed to two different sequences.
            if block in already_free or block in seen:
                raise ValueError(f"Block {block} is already free (double free).")
            seen.add(block)

        print(f"\n[Manager] Freeing blocks: {block_table}")
        self.free_blocks.extend(returned)
        # Optional: sort to make the free list look tidy
        self.free_blocks.sort()

    def get_free_block_count(self) -> int:
        """Return how many blocks are currently free."""
        return len(self.free_blocks)

# --- Component 2: The Car with its Parking Ticket Map ---
class Sequence:
    """A single user request: its tokens plus the blocks backing its KV cache."""
    def __init__(self, sequence_id: int, initial_tokens: list[str]):
        """Store the id, take a private copy of the prompt tokens, and start
        with an empty block table."""
        # Copy so that later appends never modify the caller's list.
        self.tokens = [*initial_tokens]
        self.sequence_id = sequence_id
        # The "page table" / "block table": block numbers owned by this sequence.
        self.block_table = []
        token_count = len(self.tokens)
        print(f"[Sequence {sequence_id}] Created with {token_count} initial tokens.")

    def append_token_and_block(self, new_token: str, block_number: int):
        """Record a newly generated token together with the block that will
        hold its KV cache."""
        self.block_table.append(block_number)
        self.tokens.append(new_token)

    def __repr__(self) -> str:
        """Summarize the sequence: id, token count, and block table."""
        header = f"Sequence {self.sequence_id} (len: {len(self.tokens)}): "
        return header + f"Block Table -> {self.block_table}"


# --- Component 3: The Main Simulation ---
def _allocate_blocks_for_tokens(manager, seq):
    """Append one freshly allocated block to ``seq.block_table`` per token in ``seq``."""
    for _ in seq.tokens:
        seq.block_table.append(manager.allocate_block())


def run_simulation():
    """Walk through a small PagedAttention-style block allocation scenario.

    Two sequences are created and given one block per token, both grow by a
    few generated tokens, the first one finishes and releases its blocks, and
    finally a larger third sequence tries to allocate from the reclaimed pool.
    Progress is reported with ``print``; nothing is returned.
    """
    print("--- Starting PagedAttention Logic Simulation ---")

    # 1. Initialize the manager with a small memory pool for our demo
    manager = BlockManager(num_blocks=20) # Our GPU has memory for 20 blocks

    # 2. Two new requests arrive from users
    seq1 = Sequence(sequence_id=1, initial_tokens=["The", "quick", "brown", "fox"])
    seq2 = Sequence(sequence_id=2, initial_tokens=["Hello", "World"])

    sequences = [seq1, seq2]

    # 3. Allocate initial blocks for these sequences
    print("\n--- Initial Allocation ---")
    for seq in sequences:
        _allocate_blocks_for_tokens(manager, seq)
        print(seq)
    print(f"Manager status: {manager.get_free_block_count()} blocks free.")

    # 4. Simulate autoregressive generation (generating new tokens)
    print("\n--- Generation Step 1 ---")
    # seq1 generates a new token "jumps"
    seq1.append_token_and_block("jumps", manager.allocate_block())

    # seq2 generates a new token "!"
    seq2.append_token_and_block("!", manager.allocate_block())

    print(seq1)
    print(seq2)
    print(f"Manager status: {manager.get_free_block_count()} blocks free.")

    print("\n--- Generation Step 2 ---")
    # seq2 generates another token "!!"
    seq2.append_token_and_block("!!", manager.allocate_block())

    print(seq1)
    print(seq2)
    print(f"Manager status: {manager.get_free_block_count()} blocks free.")

    # 5. Sequence 1 finishes and its memory is reclaimed
    manager.free_sequence_blocks(seq1.block_table)
    # Drop the stale mapping: these blocks are back in the pool and may be
    # handed to another sequence, so seq1 must no longer claim them.
    seq1.block_table.clear()
    print(f"\nSequence 1 finished. Manager status: {manager.get_free_block_count()} blocks free.")

    # 6. A new, very long sequence arrives. Can it fit?
    print("\n--- New Large Request Arrives ---")
    seq3 = Sequence(sequence_id=3, initial_tokens=["A"]*10)
    try:
        _allocate_blocks_for_tokens(manager, seq3)
    except MemoryError as e:
        print(f"Failed to allocate for new sequence: {e}")
        # Give back whatever was allocated before the pool ran dry; otherwise
        # those blocks would stay marked as used by a request that never ran.
        if seq3.block_table:
            manager.free_sequence_blocks(seq3.block_table)
            seq3.block_table.clear()
    else:
        print("Successfully allocated for new large sequence!")
        print(seq3)

    print(f"Final Manager status: {manager.get_free_block_count()} blocks free.")


# Run the demo only when this module is executed as the main program,
# not when it is imported from another module.
if __name__ == "__main__":
    run_simulation()

--- Starting PagedAttention Logic Simulation ---
[Manager] Creating a memory pool with 20 blocks.
[Sequence 1] Created with 4 initial tokens.
[Sequence 2] Created with 2 initial tokens.

--- Initial Allocation ---
Sequence 1 (len: 4): Block Table -> [19, 17, 1, 0]
Sequence 2 (len: 2): Block Table -> [13, 14]
Manager status: 14 blocks free.

--- Generation Step 1 ---
Sequence 1 (len: 5): Block Table -> [19, 17, 1, 0, 2]
Sequence 2 (len: 3): Block Table -> [13, 14, 7]
Manager status: 12 blocks free.

--- Generation Step 2 ---
Sequence 1 (len: 5): Block Table -> [19, 17, 1, 0, 2]
Sequence 2 (len: 4): Block Table -> [13, 14, 7, 3]
Manager status: 11 blocks free.

[Manager] Freeing blocks: [19, 17, 1, 0, 2]

Sequence 1 finished. Manager status: 16 blocks free.

--- New Large Request Arrives ---
[Sequence 3] Created with 10 initial tokens.
Successfully allocated for new large sequence!
Sequence 3 (len: 10): Block Table -> [19, 18, 17, 16, 15, 12, 11, 10, 9, 8]
Final Manager status: 6 blocks 