# 22-04-2025: Building Models with Attentions 

Building hybrid models with Attention block from scratch. Mainly to test my ideas and its usage. I like Attention blocks :)

Content
- [Gat](#graph-attention-networks-gat-manual-compute)
- [Attentions as Decision Model - Experimental](#attentions-as-decision-model)
- [Attentions as Decision Model in Grid Search Problems](2204_.ipynb#simulation-grid-example)
- [Application with LLM](#application-with-llm)
- [Attention Block for Ordinal Predictions](#attention-as-a-ranker)
- [Draft Setup for comparing models](#comparing-models-example-setup) 


# Graph Attention Networks (GAT) Manual compute

In [None]:
import torch
import torch.nn as nn


class AdjacencyMatrix:
    """Undirected Adjacency Matrix representing the edges."""

    def __init__(self, edge_list):
        assert edge_list.dim() == 2 and edge_list.size(0) == edge_list.size(1)

        self.A = AdjacencyMatrix.adjacency_to_edge_index(edge_list)
        self.A_init = edge_list

    def __len__(self):
        return self.A.size(0)

    def __getitem__(self, node_index: int, node_neighbor: int = None):
        assert -len(self) < node_index < len(self), (
            f"Inserted {node_index} out of scope."
        )

        if node_neighbor:
            assert -len(self) < node_neighbor < len(self), (
                f"Inserted {node_neighbor} out of scope."
            )
            return self.A[node_index][node_neighbor]

        return self.A[node_index]

    @staticmethod
    def adjacency_to_edge_index(adj: torch.Tensor) -> torch.Tensor:
        # Get indices where there are edges (nonzero entries)
        src, tgt = torch.nonzero(adj, as_tuple=True)
        edge_index = torch.stack([src, tgt], dim=0)
        return edge_index


class GAT(nn.Module):
    """Input dim = Feature vector dimension that exists at each node."""

    def __init__(
        self, num_node: int, input_dim: int, output_dim: int, edge_list: torch.Tensor
    ):
        super(GAT, self).__init__()
        self.num_node = num_node  # number of neighbourhoods
        self.feature_dim = input_dim  # feature_dim
        self.output_dim = (
            output_dim  # output_dim e.g. num of labels for classification tasks
        )
        self.edges = AdjacencyMatrix(edge_list)

        # alternately for the following 2 lines: nn.Linear(self.feature_dim, self.output_dim, bias=False)
        self.W = nn.Parameter(
            torch.randn(self.output_dim, self.feature_dim, requires_grad=False)
        )
        # self.bias = nn.Parameter(torch.ones(self.m))

        self.fn = nn.Linear(self.num_node, self.feature_dim, bias=False)
        self.attn = nn.Parameter(
            torch.empty(size=(2 * self.output_dim, 1)), requires_grad=False
        )
        nn.init.xavier_uniform_(self.attn, gain=1.414)
        self.active = nn.LeakyReLU(negative_slope=0.01)
        self.dropout = nn.Dropout(0.6)

    def forward(self, inputs):
        assert inputs.dim() == 3, (
            f"Input vector must be of shape: [batch_size, {self.num_node}, {self.feature_dim}]"
        )
        assert inputs.size(1) == self.num_node and inputs.size(2) == self.feature_dim, (
            f"Expected input feature `({self.num_node}, {self.feature_dim})` (num of nodes, feature dim) but received {inputs.shape}"
        )

        num_batch = inputs.size(0)
        e_i, e_j = self.edges.A
        e_i = (
            e_i.unsqueeze(0).repeat(batch_size, 1)
            + (torch.arange(batch_size) * self.num_node).unsqueeze(1).flatten()
        )
        e_j = (
            e_j.unsqueeze(0).repeat(batch_size, 1)
            + (torch.arange(batch_size) * self.num_node).unsqueeze(1).flatten()
        )

        x = inputs @ self.W.T  # [batch_size, num_node, self.output_dim]
        assert (
            x.size(0) == num_batch
            and x.size(1) == self.num_node
            and x.size(2) == self.output_dim
        ), f"Incorrect output after linear with shape {x.shape}. Returned: {x}"
        x = x.view(-1, self.output_dim)

        # create attn inputs for edges
        a_input = torch.cat(
            [x[e_i], x[e_j]], dim=1
        )  # [2 * self.output_dim, 2 * self.output_dim]
        assert a_input.size(0) == a_input.size(1) == 2 * self.output_dim, (
            f"Error concatenating: unexpected size of ({a_input.shape}). Output\n{a_input} "
        )

        a = self.active(torch.matmul(a_input, self.attn)).squeeze(-1)  # [2 * num_node]
        assert a.dim() == 1 and a.size(0) == 2 * self.output_dim, (
            f"Expected size {2 * self.output_dim} but received: {a.shape}"
        )
        print(f"Alpha {a}")

        # Softmax over neighbor edges
        denom = torch.zeros(num_batch * self.num_node, device=x.device).scatter_add(
            0, e_i, a
        )
        print(f"Softmaxed before dropout (shape {denom.shape}): {denom}")

        alpha = self.dropout(
            a / (denom[e_i] + 1e-16)
        )  # actually used self.dropout(torch.ones_like(a) / (denom[e_i] + 1e-16))
        print(f"Coeff (shape {alpha.shape}): {alpha}")

        # Aggregate features
        output = torch.zeros_like(x)
        output.index_add_(0, e_i, alpha.unsqueeze(-1) * x[e_j])

        return output, a


adj = torch.tensor(
    [[0, 1, 1, 0], [1, 0, 0, 2], [5, 0, 0, 1], [0, 1, 1, 0]], dtype=torch.float32
)

num_node = 4
input_dim = 1
output_dim = 4
x = torch.tensor(
    [
        [
            [1.0],  # Node 0
            [10.0],  # Node 1
            [3.0],  # Node 2
            [4.0],
        ]  # Node 3
    ]
)  # Shape: (1, 4, 1)

print(f"Inserting test x with shape {x.shape}")
model = GAT(
    num_node=num_node, input_dim=input_dim, output_dim=output_dim, edge_list=adj
)

Inserting test x with shape torch.Size([1, 4, 1])


In [163]:
output, a = model(x)
output.softmax(-1)

Alpha tensor([-0.1224, -0.0311,  6.6841,  2.7713,  1.0922, -0.0282, -0.0985, -0.0072],
       grad_fn=<SqueezeBackward1>)
Softmaxed before dropout (shape torch.Size([4])): tensor([-0.1536,  9.4554,  1.0640, -0.1056], grad_fn=<ScatterAddBackward0>)
Coeff (shape torch.Size([8])): tensor([0.0000, 0.0000, 1.7673, 0.7327, 2.5663, -0.0000, 0.0000, 0.1698],
       grad_fn=<MulBackward0>)


tensor([[0.2500, 0.2500, 0.2500, 0.2500],
        [0.0272, 0.0277, 0.0740, 0.8711],
        [0.0963, 0.0973, 0.1664, 0.6399],
        [0.2187, 0.2191, 0.2437, 0.3184]], grad_fn=<SoftmaxBackward0>)

# Attentions as Decision Model

Manual implementation of Attention (number of heads = 1). Alternatively, can use `nn.MultiheadAttention` with 1 head specified

#### Self Attention Manually computed

In [None]:
class SelfAttention(nn.Module):
    def __init__(self, num_block: int, feat_dim: int, output_dim: int):
        super(SelfAttention, self).__init__()
        self.num_block = num_block
        self.feat_dim = feat_dim
        self.output_dim = output_dim
        self.scale_term = self.feat_dim**0.5

        self.q_proj = nn.Linear(self.feat_dim, self.feat_dim, bias=False)
        self.k_proj = nn.Linear(self.feat_dim, self.feat_dim, bias=False)
        self.v_proj = nn.Linear(self.feat_dim, self.feat_dim, bias=False)
        self.out_proj = nn.Linear(feat_dim, output_dim, bias=False)

    def forward(self, x):
        # inputs shape = (B, T, D) = (batch_dim, seq_length, self.output_dim)
        assert x.dim() == 3 and x.size(-1) == self.feat_dim, (
            f"Expected input shape (batch_size, seq_length, {self.feat_dim}) but received {x.shape}"
        )

        # x: (batch_size, seq_len, self.feat_dim)
        Q = self.q_proj(x)
        K = self.k_proj(x)
        V = self.v_proj(x)
        print("QKV shapes:", Q.shape, K.shape, V.shape)

        # Attention: Q (B,T,D) @ Káµ€ (B,D,L) => (B,T,L)
        attn_scores = Q @ K.transpose(-2, -1) / self.scale_term  # Scaled dot-product
        attn_weights = attn_scores.softmax(-1)
        attn_output_ = attn_weights @ V
        attn_output = self.out_proj(attn_output_)
        print(
            f"Attn scores {attn_scores.shape} Attn weights {attn_weights.shape} Attn output pre {attn_output_.shape} Attn output {attn_output.shape}"
        )

        return attn_output, attn_scores, attn_weights


def trial_run():
    """To test the model architecture"""

    seq_len = 4
    num_block = 30
    feat_dim = 8  # does it have to be same as seq length?
    output_dim = 1
    x = torch.tensor(
        [
            [
                [0.1, 0.2, 0.3, 0.4, 0.0, 0.1, 0.2, 0.3],
                [0.3, 0.2, 0.1, 0.0, 0.4, 0.3, 0.2, 0.1],
                [0.5, 0.6, 0.7, 0.8, 0.2, 0.1, 0.0, 0.3],
            ]
        ],
        dtype=torch.float32,
    )  # shape: [1, seq_len, output_dim]

    model = SelfAttention(num_block=1, feat_dim=feat_dim, output_dim=output_dim)
    output, scores, weights = model(x)

    print("Output:\n", output)
    print("Attention Scores:\n", scores)
    print("Attention Weights:\n", weights)


trial_run()

QKV shapes: torch.Size([1, 3, 8]) torch.Size([1, 3, 8]) torch.Size([1, 3, 8])
Attn scores torch.Size([1, 3, 3]) Attn weights torch.Size([1, 3, 3]) Attn output pre torch.Size([1, 3, 8]) Attn output torch.Size([1, 3, 1])
Output:
 tensor([[[0.1673],
         [0.1680],
         [0.1667]]], grad_fn=<UnsafeViewBackward0>)
Attention Scores:
 tensor([[[-0.0257, -0.0357, -0.0475],
         [-0.0135, -0.0113, -0.0030],
         [-0.0492, -0.0550, -0.0850]]], grad_fn=<DivBackward0>)
Attention Weights:
 tensor([[[0.3369, 0.3335, 0.3296],
         [0.3319, 0.3326, 0.3354],
         [0.3379, 0.3360, 0.3261]]], grad_fn=<SoftmaxBackward0>)


In [None]:
k_cache = output.past_key_values.key_cache
v_cache = output.past_key_values.value_cache

In [239]:
torch.concat(k_cache).shape

torch.Size([30, 3, 54, 64])

#### Simple Simulation Grid Example

In [None]:
def encode_maze(maze):
    H, W = maze.shape
    features = []

    for i in range(H):
        for j in range(W):
            val = maze[i, j]
            features.append(
                [
                    float(val == 2),  # is_start
                    float(val == 3),  # is_goal
                    float(val == 1),  # is_wall
                    i / H,  # normalized y
                    j / W,  # normalized x
                ]
            )

    return torch.tensor([features], dtype=torch.float32)  # [1, seq_len, feat_dim]


def agent_step(attn_weights, maze, H, W):
    # Find index of start
    flat_maze = maze.flatten()
    start_idx = (flat_maze == 2).nonzero().item()

    # Look at attention from start node to all others
    attn_from_start = attn_weights[0, start_idx]

    # Mask walls
    wall_mask = flat_maze == 1
    attn_from_start[wall_mask] = -float("inf")

    # Choose highest attention score (simulating agent move)
    next_idx = attn_from_start.argmax().item()
    next_pos = (next_idx // W, next_idx % W)

    return next_pos


def trial_as_decision():
    maze = torch.tensor(
        [[2, 0, 1], [1, 0, 3], [0, 0, 1]]
    )  # 2=start, 3=goal, 1=wall ~ goal would be [1, 2]

    H, W = maze.shape
    x = encode_maze(maze)

    num_block = 1
    feat_dim = 5
    model = SelfAttention(num_block=1, feat_dim=feat_dim, output_dim=feat_dim)
    output, scores, weights = model(x)

    next_pos = agent_step(weights, maze, H, W)
    print("Agent should move to:", next_pos)
    print("Output:", output)
    print("Attention Scores:", scores)
    print("Attention Weights:", weights)


In [205]:
trial_as_decision()

QKV shapes: torch.Size([1, 9, 5]) torch.Size([1, 9, 5]) torch.Size([1, 9, 5])
Attn scores torch.Size([1, 9, 9]) Attn weights torch.Size([1, 9, 9]) Attn output pre torch.Size([1, 9, 5]) Attn output torch.Size([1, 9, 5])
Agent should move to: (0, 0)


#### Simulation Grid Example

In [None]:
import random
import torch.optim as optim

ACTIONS = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # up, down, left, right
NUM_ACTION = 4


class MazeAttentionAgent(nn.Module):
    def __init__(self, num_block: int, feat_dim: int, output_dim: int):
        super().__init__()
        self.attn = SelfAttention(num_block, feat_dim, output_dim)
        self.policy_head = nn.Linear(feat_dim, NUM_ACTION)  # 4 = up/down/left/right

    def forward(self, x):
        attn_output, _, attn_weights = self.attn(x)
        logits = self.policy_head(attn_output)  # (B, N, 4)
        return logits, attn_weights


def generate_random_maze(H, W):
    maze = torch.zeros(H, W, dtype=torch.int32)
    maze[random.randint(0, H - 1), random.randint(0, W - 1)] = 2  # start
    maze[random.randint(0, H - 1), random.randint(0, W - 1)] = 3  # goal
    for _ in range(int(H * W * 0.3)):
        maze[random.randint(0, H - 1), random.randint(0, W - 1)] = 1  # walls

    print(f"Generated Maze:\n", maze)
    return maze


def is_valid(pos, maze):
    y, x = pos
    return 0 <= y < maze.shape[0] and 0 <= x < maze.shape[1] and maze[y, x] != 1


def move(pos, action_idx):
    dy, dx = ACTIONS[action_idx]
    return (pos[0] + dy, pos[1] + dx)


def train(agent, epochs=1000, maze_size=(5, 5)):
    gamma = 0.99  # discount factor
    optimizer = optim.Adam(agent.parameters(), lr=1e-3)
    maze = generate_random_maze(*maze_size)
    encoded = encode_maze(maze)

    for epoch in range(epochs):
        logits, _ = agent(encoded)

        flat_maze = maze.flatten()
        print(flat_maze)
        start_idx = (flat_maze == random.choice(range(NUM_ACTION))).nonzero().item()
        start_pos = (start_idx // maze.shape[1], start_idx % maze.shape[1])

        pos = start_pos
        log_probs = []
        rewards = []
        max_steps = 10

        for step in range(max_steps):
            idx = pos[0] * maze.shape[1] + pos[1]
            probs = logits[0, idx].softmax(-1)
            dist = torch.distributions.Categorical(probs)
            action = dist.sample()
            next_pos = move(pos, action.item())

            if not is_valid(next_pos, maze):
                rewards.append(-1.0)
                log_probs.append(dist.log_prob(action))
                break

            pos = next_pos
            log_probs.append(dist.log_prob(action))

            if maze[pos[0], pos[1]] == 3:
                rewards.append(10.0)
                break
            else:
                rewards.append(-0.1)

        # Discounted rewards
        G = 0
        returns = []
        for r in reversed(rewards):
            G = r + gamma * G
            returns.insert(0, G)
        returns = torch.tensor(returns)

        # Policy gradient loss
        loss = 0
        for log_prob, ret in zip(log_probs, returns):
            loss -= log_prob * ret

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if epoch % 100 == 0:
            print(f"Epoch {epoch} | Return: {sum(rewards):.2f}")


def run():
    num_block = 1
    feat_dim = 5
    num_epochs = 1000
    maze_size = (5, 5)
    # Initialize the agent
    agent = MazeAttentionAgent(num_block=num_block, feat_dim=5, output_dim=feat_dim)
    # Train the agent
    train(agent, epochs=num_epochs, maze_size=maze_size)


run()

Generated Maze:
 tensor([[0, 0, 0, 1, 1],
        [0, 1, 0, 1, 2],
        [0, 0, 0, 1, 0],
        [1, 0, 0, 0, 0],
        [0, 0, 0, 0, 0]], dtype=torch.int32)
QKV shapes: torch.Size([1, 25, 5]) torch.Size([1, 25, 5]) torch.Size([1, 25, 5])
Attn scores torch.Size([1, 25, 25]) Attn weights torch.Size([1, 25, 25]) Attn output pre torch.Size([1, 25, 5]) Attn output torch.Size([1, 25, 5])
tensor([0, 0, 0, 1, 1, 0, 1, 0, 1, 2, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0,
        0], dtype=torch.int32)
Epoch 0 | Return: -1.00
QKV shapes: torch.Size([1, 25, 5]) torch.Size([1, 25, 5]) torch.Size([1, 25, 5])
Attn scores torch.Size([1, 25, 25]) Attn weights torch.Size([1, 25, 25]) Attn output pre torch.Size([1, 25, 5]) Attn output torch.Size([1, 25, 5])
tensor([0, 0, 0, 1, 1, 0, 1, 0, 1, 2, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0,
        0], dtype=torch.int32)
QKV shapes: torch.Size([1, 25, 5]) torch.Size([1, 25, 5]) torch.Size([1, 25, 5])
Attn scores torch.Size([1, 25, 25]) Attn weights torch.

RuntimeError: a Tensor with 18 elements cannot be converted to Scalar

# Application with LLM

Sequence Length in Multi-head Attentions

- In most LLM applications, the multi-head attentions accepts input size of (batch_size, seq_length, emb_dim)
- The `seq_length` (sequence length) means the number of channels and the `emb_dim` represents the vector representation per sequence input or channel (and NOT the batch input feature)

In [None]:
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    DynamicCache,
    GenerationConfig,
)

In [None]:
output_config = dict(
    use_cache=True,
    output_attentions=False,
    return_dict_in_generate=True,
    return_legacy_cache=False,
)
decode_config = dict(max_new_tokens=50, do_sample=False, stop_strings="\n\n")


class ModelManager:
    def __init__(self, system_prompt=None):
        if system_prompt is None:
            prompt = "You are Naomi, a friendly and sarcastic human chatting with users online!"
        else:
            prompt = system_prompt

        self.history = [{"role": "system", "content": prompt}]

    def chat(self, user_input: str):
        self.history.append({"role": "user", "content": user_input})

        input_text = tokenizer.apply_chat_template(
            self.history, tokenize=False, add_generation_prompt=True
        )
        inputs = tokenizer.encode(
            input_text,
            max_length=50,
            padding="max_length",
            padding_side="left",
            truncation="longest_first",
            return_tensors="pt",
        )
        end_prompt_idx = inputs.size(1)  # type: ignore
        print(f"Input tokens length: {end_prompt_idx}")
        outputs = model.generate(
            inputs,
            tokenizer=tokenizer,
            **decode_config,
            **output_config,  # type: ignore
        )

        output_token = outputs.sequences.squeeze(0)
        print(f"Output tokens (post squeeze): {output_token.shape}")
        response = tokenizer.decode(
            output_token[end_prompt_idx:].cpu().numpy(), skip_special_tokens=True
        )
        text = response.replace("assistant\n", "")
        print(f"Response: {text}")
        print("--" * 10)
        print(
            "Full output:\n",
            tokenizer.decode(outputs.sequences.squeeze(0).cpu().numpy()),
        )
        # odict_keys(['sequences', 'attentions', 'past_key_values']) - it must output attentions to enable use_cache=True (return past_key_values)
        self.history.append({"role": "assistant", "content": text})
        return outputs

    @staticmethod
    def signal_random_injection(mod_signal):
        num_slices = torch.randint(low=50, high=300, size=(1,)).item()

        # Inject noise into a random slice of the last dimension
        # Select a start index randomly such that it doesn't overflow
        start_idx = torch.randint(0, mod_signal.size(-1) - num_slices, (1,)).item()
        end_idx = start_idx + num_slices

        # Create your modification signal (e.g., ones or random noise)
        injected_patch = torch.randn_like(mod_signal[:, :, start_idx:end_idx]) * 50

        # Inject the patch
        mod_signal[:, :, start_idx:end_idx] += injected_patch

        print(f"Injected {num_slices} dims from {start_idx} to {end_idx}")
        return mod_signal

    @staticmethod
    def injection_layer(module, input, output=None):
        print(
            f"{'--' * 10} Injection layer summary {'--' * 10}\nInput: {len(input) if input else None}\nOutput: {len(output) if output else None}"
        )
        hidden_states = input[0]
        mod_signal = torch.zeros_like(
            hidden_states
        )  # one of the methods is to multiply by some bias * 0.1
        mod_signal = ModelManager.signal_random_injection(
            mod_signal
        )  # mod_signal[:, :, :100] = 100
        print(
            f"Hidden states shape: {hidden_states.shape}\nSignal from Attn {mod_signal.shape}"
        )
        print("--" * 50)
        return hidden_states + mod_signal

    def setup(self, hook_layer: int):
        model.model.layers[
            hook_layer
        ].post_attention_layernorm.register_forward_pre_hook(
            ModelManager.injection_layer
        )


target_layer = -1
treadmill = ModelManager()
tokenizer = AutoTokenizer.from_pretrained("HuggingFaceTB/SmolLM-135M-Instruct")
model = AutoModelForCausalLM.from_pretrained(
    "HuggingFaceTB/SmolLM-135M-Instruct", attn_implementation="eager"
)
model.eval()
treadmill.setup(target_layer)
output = treadmill.chat("Hey Naomi, I fucking hate you..")

Input tokens length: 50
-------------------- Injection layer summary --------------------
Input: 1
Output: None
Injected 119 dims from 26 to 145
Hidden states shape: torch.Size([1, 50, 576])
Signal from Attn torch.Size([1, 50, 576])
----------------------------------------------------------------------------------------------------
-------------------- Injection layer summary --------------------
Input: 1
Output: None
Injected 153 dims from 327 to 480
Hidden states shape: torch.Size([1, 1, 576])
Signal from Attn torch.Size([1, 1, 576])
----------------------------------------------------------------------------------------------------
-------------------- Injection layer summary --------------------
Input: 1
Output: None
Injected 280 dims from 10 to 290
Hidden states shape: torch.Size([1, 1, 576])
Signal from Attn torch.Size([1, 1, 576])
----------------------------------------------------------------------------------------------------
-------------------- Injection layer summary ----

In [357]:
treadmill.chat("What is your name?")

Input tokens length: 50
-------------------- Injection layer summary --------------------
Input: 1
Output: None
Injected 247 dims from 241 to 488
Hidden states shape: torch.Size([1, 50, 576])
Signal from Attn torch.Size([1, 50, 576])
----------------------------------------------------------------------------------------------------
-------------------- Injection layer summary --------------------
Input: 1
Output: None
Injected 61 dims from 20 to 81
Hidden states shape: torch.Size([1, 1, 576])
Signal from Attn torch.Size([1, 1, 576])
----------------------------------------------------------------------------------------------------
-------------------- Injection layer summary --------------------
Input: 1
Output: None
Injected 197 dims from 300 to 497
Hidden states shape: torch.Size([1, 1, 576])
Signal from Attn torch.Size([1, 1, 576])
----------------------------------------------------------------------------------------------------
-------------------- Injection layer summary -----

GenerateDecoderOnlyOutput(sequences=tensor([[    1,  9690,   198,  2683,   359, 45242,    28,   253,  7952,   284,
         24459,  2607,  1205, 36498,   351,  3629,  2329,    17,     2,   198,
             1,  4093,   198, 22234, 45242,    28,   339,   275, 24431, 13735,
           346,   950,     2,   198,     1,   520,  9531,   198,    57,  5248,
           588, 22657,   288,  4875,   338,   346,  2316,  1953,   253,  9228,
           655,    30,   339,  5248,  1535,   288,   724,   346,    30,   198,
           198]]), scores=None, logits=None, attentions=None, hidden_states=None, past_key_values=DynamicCache())

# Attention as a Ranker

- Full script in macbooks local `algorithms`
- The Ranker model was supposed to rank by inputting 1s and 0s but it seems that it could perform better as a multi-label classifier (when the loss function  is nn.BCEWithLogitsLoss) with sigmoid output 
- The Ranker model works when the loss is MSELoss and the output is sigmoid

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import make_classification

In [None]:
def create_ordinal_target(num, length: int) -> torch.Tensor:
    """
    Creates a batch of binary target vectors where each vector contains 1s from index 0 to num[i]-1
    and 0s for the rest, given a list of integers and a fixed vector length.

    Args:
        num (list): A list of integers specifying how many 1s each vector should contain.
        length (int): The total length of each output vector (or the number of ranking placements)

    Returns:
        torch.Tensor: A tensor of shape (len(num), length) with 1s up to each corresponding
                      value in `num`, and 0s elsewhere.

    Example:
        >>> create_target([2, 4], 5)
        tensor([[1, 1, 0, 0, 0],
                [1, 1, 1, 1, 0]])
    """
    assert not isinstance(num, int), f"Num must be either tensor / numpy list."
    num_data = len(num)
    vec = torch.zeros((num_data, length), dtype=torch.float, requires_grad=False)
    for idx, d in enumerate(num):
        vec[idx, :d] = 1.0
    return vec


class Ranker(nn.Module):
    def __init__(self, num_ranks, emb_dim, num_heads, dropout_rate=0.6, seq_len=25):
        super(Ranker, self).__init__()
        self.num_ranks = num_ranks
        self.emb_dim = emb_dim
        self.num_heads = num_heads
        self.dropout_rate = dropout_rate
        self.seq_len = seq_len

        # batch_first means that input and output follows (1, seq_length, emb_dim)
        self.attn = nn.MultiheadAttention(
            self.emb_dim,
            self.num_heads,
            dropout=self.dropout_rate,
            batch_first=True,
            bias=False,
        )
        self.fn = nn.Linear(self.emb_dim, self.num_ranks, bias=True)

    def forward(self, Q, K, V):
        assert Q.size(1) == self.seq_len and Q.size(2) == self.emb_dim, (
            f"Expected Q shape [batch_size, {self.seq_len}, {self.emb_dim}](batch_size, seq_len, emb_dim) but received: {Q.shape}"
        )
        assert Q.shape == K.shape == V.shape

        batch_size = Q.size(0)
        attn_output, attn_weights = self.attn(Q, K, V)
        assert (
            attn_output.size(0) == batch_size
            and attn_output.size(1) == self.seq_len
            and attn_output.size(2) == self.emb_dim
        ), (
            f"Expected Attn output shape to be (batch_size {self.seq_len}, {self.emb_dim}) but received {attn_output.shape}"
        )
        assert (
            attn_weights.size(0) == batch_size
            and attn_weights.size(1) == attn_weights.size(2) == self.seq_len
        ), (
            f"Expected Attn output weight shape to be (batch_size {self.seq_len}, {self.seq_len}) but received {attn_weights.shape}"
        )

        output = attn_output.softmax(-1)
        x = self.fn(output)
        assert (
            x.dim() == 3 and x.size(1) == self.seq_len and x.size(2) == self.num_ranks
        )
        x = x.sum(axis=1).unsqueeze(1)
        assert x.dim() == 3 and x.size(0) == batch_size and x.size(2) == self.num_ranks

        # print(f"Prediction:\n", torch.round(x * 1e2) / 1e2)
        return x.sigmoid()


def display_results(model_output, target, threshold: float = 0.65):
    """counts by batch input higher than threshold"""
    results = (model_output > threshold).sum(axis=-1)
    target_results = (target > threshold).sum(axis=-1)
    print(
        "Model predicted ranks: ",
        results.flatten(),
        "Number of correct",
        (results.flatten() == target_results.flatten()).sum(axis=0).item(),
        "target:",
        target_results.flatten(),
    )


def generate_pattern(levels, emb_dim, noise_scale=0.01):
    pattern = torch.cat(
        [torch.ones(length, emb_dim) * val for length, val in levels], dim=0
    )
    return pattern + noise_scale * torch.randn_like(pattern)


def mock_dataset(batch_size, emb_dim, seq_len):
    assert batch_size == 3, "This mock currently supports batch_size=3 for simplicity"

    # Define sequence lengths
    spike_pattern = torch.ones(seq_len, emb_dim) * 0.1
    spike_pattern[seq_len // 2] = 1.5  # Spike at center timestep

    base_patterns = [
        generate_pattern([(seq_len, 0.1)], emb_dim),
        spike_pattern + 0.01 * torch.randn_like(spike_pattern),
        generate_pattern([(seq_len, 0.9)], emb_dim),
    ]

    # Add Gaussian noise
    q = torch.stack(
        [pattern + 0.01 * torch.randn_like(pattern) for pattern in base_patterns]
    )
    k = torch.stack(
        [pattern + 0.01 * torch.randn_like(pattern) for pattern in base_patterns]
    )
    v = torch.stack(
        [pattern + 0.01 * torch.randn_like(pattern) for pattern in base_patterns]
    )

    target = torch.tensor(
        [[[1, 0, 0, 0, 0]], [[1, 1, 0, 0, 0]], [[1, 1, 1, 0, 0]]], dtype=torch.float
    )

    print("Q:\n", q.shape)
    print("K:\n", k.shape)
    print("V:\n", v.shape)
    print("Target:\n", target.shape)
    print("--" * 10)
    return q, k, v, target


def formally_mock_data(batch_size, num_ranks, emb_dim, seq_len):
    # mock dataset generation args
    X, y = make_classification(
        n_samples=batch_size,
        n_features=emb_dim * seq_len * 3,
        n_informative=num_ranks,
        n_redundant=1,
        n_classes=num_ranks,
        class_sep=1.0,
        random_state=42,
    )

    X_grouped = torch.tensor(X, dtype=torch.float).view(batch_size, 3, seq_len, emb_dim)
    print(X_grouped.shape)
    q = torch.tensor(X_grouped[:, 0, :, :]).clone().detach()
    k = torch.tensor(X_grouped[:, 1, :, :]).clone().detach()
    v = torch.tensor(X_grouped[:, 2, :, :]).clone().detach()
    print(q.shape)
    target = create_ordinal_target(y, length=num_ranks)
    target = target.unsqueeze(1)
    return q, k, v, target


def trial(max_epoch: int = 100):
    emb_dim = 50
    num_heads = 2
    dropout_rate = 0.6
    num_ranks = 5
    seq_len = 25
    batch_size = 10

    q, k, v, target = formally_mock_data(
        batch_size=batch_size, num_ranks=num_ranks, emb_dim=emb_dim, seq_len=seq_len
    )
    model = Ranker(num_ranks, emb_dim, num_heads, dropout_rate, seq_len)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.MSELoss()

    model.train()
    for epoch in range(int(max_epoch)):
        optimizer.zero_grad()

        # Assuming model(q, k, v) returns predictions of shape (batch_size, seq_len, 1)
        output = model(q, k, v)
        loss = criterion(output, target)
        display_results(output, target)
        loss.backward()
        optimizer.step()

        print(f"Epoch {epoch + 1}/{max_epoch} - Loss: {loss.item():.4f}")

    print(f"Final Prediction:\n", torch.round(output * 1e2) / 1e2)


# qkv shape [batch_size, seq_len, emb_dim]
trial()

torch.Size([10, 3, 25, 50])
torch.Size([10, 25, 50])
Model predicted ranks:  tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2]) Number of correct 2 target: tensor([4, 4, 0, 2, 1, 1, 2, 3, 0, 3])
Epoch 1/100 - Loss: 0.2565
Model predicted ranks:  tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2]) Number of correct 2 target: tensor([4, 4, 0, 2, 1, 1, 2, 3, 0, 3])
Epoch 2/100 - Loss: 0.2527
Model predicted ranks:  tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2]) Number of correct 2 target: tensor([4, 4, 0, 2, 1, 1, 2, 3, 0, 3])
Epoch 3/100 - Loss: 0.2495
Model predicted ranks:  tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2]) Number of correct 2 target: tensor([4, 4, 0, 2, 1, 1, 2, 3, 0, 3])
Epoch 4/100 - Loss: 0.2460
Model predicted ranks:  tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2]) Number of correct 2 target: tensor([4, 4, 0, 2, 1, 1, 2, 3, 0, 3])
Epoch 5/100 - Loss: 0.2429
Model predicted ranks:  tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2]) Number of correct 2 target: tensor([4, 4, 0, 2, 1, 1, 2, 3, 0, 3])
Epoch 6/100 - Loss: 0.2400
Model

  q = torch.tensor(X_grouped[:, 0, :, :]).clone().detach()
  k = torch.tensor(X_grouped[:, 1, :, :]).clone().detach()
  v = torch.tensor(X_grouped[:, 2, :, :]).clone().detach()


In [None]:
emb_dim = 4
num_heads = 2
dropout_rate = 0.6
num_ranks = 3
seq_len = 25
batch_size = 3

assert emb_dim % num_heads == 0, (
    f"Number of emb_dim must be divisible by num heads, received emb_dim={emb_dim} and num_heads={num_heads}"
)

attn = nn.MultiheadAttention(
    emb_dim, num_heads, dropout=dropout_rate, batch_first=True, bias=False
)
fn = nn.Linear(emb_dim, num_ranks)

q = torch.randn(batch_size, seq_len, emb_dim)
k = torch.randn(batch_size, seq_len, emb_dim)
v = torch.randn(batch_size, seq_len, emb_dim)
target = torch.tensor([[[1, 0, 0]], [[1, 1, 0]], [[1, 1, 1]]], dtype=torch.float)

# print("Q:\n", q)
# print("K:\n", k)
# print("V:\n", v)
print("Target:\n", target.shape)

Target:
 torch.Size([3, 1, 3])


In [129]:
output = attn(q, k, v)
# outputs tuple with length = 2
# idx 0 = attn output after softmax = shape = (1, 25, 4) = (batch_size, seq_len, emb_dim)
# idx 1 = attn weights = shape = (1, 25, 25) = (batch_size, seq_len, seq_len)

print("Attention output shape:", output[0].shape)
print("Attention output weights shape:", output[1].shape)

Attention output shape: torch.Size([3, 25, 4])
Attention output weights shape: torch.Size([3, 25, 25])


In [157]:
fn(output[0]).sum(axis=1).softmax(-1).shape

torch.Size([3, 3])

In [152]:
x = fn(output[0]).sum(axis=1).softmax(-1)
x.shape

torch.Size([3, 3])

In [140]:
torch.round(x * 1e2) / 1e2

tensor([[0.2100, 0.0000, 0.7900],
        [0.9900, 0.0000, 0.0100],
        [0.9800, 0.0000, 0.0200]], grad_fn=<DivBackward0>)

# Comparing Models Example Setup

In [None]:
class BattleField:
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained(
            "HuggingFaceTB/SmolLM-135M-Instruct"
        )
        self.infer_model = AutoModelForCausalLM.from_pretrained(
            "HuggingFaceTB/SmolLM-135M-Instruct", attn_implementation="eager"
        )
        self.assist_model = AutoModelForCausalLM.from_pretrained(
            "HuggingFaceTB/SmolLM-135M-Instruct", attn_implementation="eager"
        )

        assert id(self.assist_model) != id(self.infer_model)
        assert (
            self.assist_model.model.layers[0].self_attn.q_proj.weight.data_ptr()
            != self.infer_model.model.layers[0].self_attn.q_proj.weight.data_ptr()
        )

In [359]:
BattleField()

<__main__.BattleField at 0x32f7a95b0>