# MultiHead Attention

In [8]:
import torch
import torch.nn as nn 
import math

class SingleHeadAttention(nn.Module):
    """Single-head scaled dot-product attention.

    Queries, keys and values each go through their own learned
    ``d_model -> d_model`` linear projection before attention is computed.
    """

    def __init__(self, d_model):
        super().__init__()
        self.d_model = d_model

        # One projection per role; all keep the feature width at d_model.
        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)

        # Scores are normalised over the last axis (the key positions).
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, query, key, value):
        """Attend from ``query`` over ``key``/``value``.

        query, key, value have dimension of [batch_size, seq_len, d_model].
        Returns the attended values, [batch_size, seq_len, d_model].
        The intermediate shapes are printed on every call.
        """
        projected_q = self.query(query)
        projected_k = self.key(key)
        projected_v = self.value(value)

        # Raw similarity of every query position with every key position,
        # scaled by sqrt(d_model): [batch_size, seq_len, seq_len].
        scale = math.sqrt(self.d_model)
        scores = (projected_q @ projected_k.transpose(1, 2)) / scale
        print('atten Dim: ', scores.shape)

        # Softmax keeps the shape; each row becomes a distribution over keys.
        weights = self.softmax(scores)
        print('attention_weights Dim: ', weights.shape)

        # Weighted sum of the value vectors: [batch_size, seq_len, d_model].
        mixed = weights @ projected_v
        print('attented_values Dim: ', mixed.shape)
        return mixed


# Smoke test: 10 sequences of 20 tokens each, with d_model = 100.
atten_layer = SingleHeadAttention(d_model=100)

# Three independent random tensors, drawn in query/key/value order.
query, key, value = (torch.rand(10, 20, 100) for _ in range(3))

attented_values = atten_layer(query=query, key=key, value=value)
attented_values.shape  # last expression: shown as the notebook cell result

atten Dim:  torch.Size([10, 20, 20])
attention_weights Dim:  torch.Size([10, 20, 20])
attented_values Dim:  torch.Size([10, 20, 100])


torch.Size([10, 20, 100])

In [11]:
import torch
import torch.nn as nn 
import math

class SingleHeadSelfAttentionLayer(nn.Module):
    """Single-head self-attention with a reduced query/key width.

    Queries and keys are projected from ``d_model`` down to ``dim_qk``
    features, while values keep the full ``d_model`` width, so the output has
    the same shape as the input.

    Args:
        d_model: feature width of the input (and of the output).
        dim_qk: width of the query and key projections. Defaults to 50, the
            value that used to be hard-coded.
    """

    def __init__(self, d_model, dim_qk=50):
        super(SingleHeadSelfAttentionLayer, self).__init__()
        self.d_model = d_model
        # Queries and keys must share a width for the dot product to exist.
        self.dim_query, self.dim_key = dim_qk, dim_qk
        self.q_layer = nn.Linear(d_model, self.dim_query)
        self.k_layer = nn.Linear(d_model, self.dim_key)
        self.v_layer = nn.Linear(d_model, d_model)

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, X):
        '''
        X has dimension of [batch_size, seq_len, d_model]; an unbatched
        [seq_len, d_model] input or extra leading dimensions also work.

        Returns the attended values with the same shape as X. The
        intermediate shapes are printed on every call.
        '''
        q = self.q_layer(X)
        k = self.k_layer(X)
        v = self.v_layer(X)

        # Swap the last two axes (rather than axes 1 and 2) so the layer does
        # not depend on the number of leading batch dimensions.
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.dim_query)
        print('atten Dim: ', scores.shape)

        attention_weights = self.softmax(scores)
        print('attention_weights Dim: ', attention_weights.shape)

        attented_values = torch.matmul(attention_weights, v)
        print('attented_values Dim: ', attented_values.shape)

        return attented_values
    
# Smoke test: self-attention over a random [10, 20, 100] batch.
self_atten = SingleHeadSelfAttentionLayer(d_model=100)

X = torch.rand((10, 20, 100))
attented_values = self_atten(X)
print(attented_values.size())



atten Dim:  torch.Size([10, 20, 20])
attention_weights Dim:  torch.Size([10, 20, 20])
attented_values Dim:  torch.Size([10, 20, 100])
torch.Size([10, 20, 100])


In [17]:
import torch
import torch.nn as nn
import math

class MultiHeadSelfAttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention with an output projection.

    ``d_model`` is split evenly across ``num_heads`` heads, each of width
    ``d_head = d_model // num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_head = d_model // num_heads

        assert self.d_head * num_heads == d_model, "d_model must be divisible by num_heads"

        # Input projections for the three roles.
        self.q_layer = nn.Linear(d_model, d_model)
        self.k_layer = nn.Linear(d_model, d_model)
        self.v_layer = nn.Linear(d_model, d_model)

        # Output projection applied once the heads are joined back together.
        self.fc = nn.Linear(d_model, d_model)

    def _split_heads(self, projected):
        """Reshape [batch_size, seq_len, d_model] to [batch_size, seq_len, num_heads, d_head]."""
        batch_size, seq_len = projected.shape[0], projected.shape[1]
        return projected.view(batch_size, seq_len, self.num_heads, self.d_head)

    def forward(self, query, key, value):
        """Attend from ``query`` over ``key``/``value`` with several heads.

        query, key, value have dimension of [batch_size, seq_len, d_model].
        Returns a tensor of shape [batch_size, q_seq_len, d_model]. The
        intermediate shapes are printed on every call.
        """
        # In auto-regressive decoding with self-attention, k_seq_len usually
        # equals v_seq_len, while q_seq_len is 1 because one token is decoded
        # at a time.
        q_heads = self._split_heads(self.q_layer(query))    # [batch_size, q_seq_len, num_heads, d_head]
        k_heads = self._split_heads(self.k_layer(key))      # [batch_size, k_seq_len, num_heads, d_head]
        v_heads = self._split_heads(self.v_layer(value))    # [batch_size, v_seq_len, num_heads, d_head]

        # Every head attends over sequence positions independently.
        # Result: [batch_size, q_seq_len, num_heads, k_seq_len].
        scale = math.sqrt(self.d_head)
        scores = torch.einsum('binh,bjnh->binj', q_heads, k_heads) / scale
        atten_weights = torch.softmax(scores, dim=-1)
        print('Dim atten_weights: ', atten_weights.shape)

        # Per-head weighted sum of values: [batch_size, q_seq_len, num_heads, d_head].
        per_head = torch.einsum('binj,bjnh->binh', atten_weights, v_heads)
        print('Dim attended_values: ', per_head.shape)

        # Join the heads back into a single d_model-wide feature axis.
        joined = per_head.reshape(per_head.shape[0], per_head.shape[1], self.d_model)
        print('Dim attended_values flatten: ', joined.shape)

        return self.fc(joined)



# Smoke test: 8 heads over d_model = 1024, giving 128 features per head.
atten_layer = MultiHeadSelfAttentionLayer(d_model=1024, num_heads=8)

# Three independent random tensors, drawn in query/key/value order.
query, key, value = (torch.rand(10, 20, 1024) for _ in range(3))

attented_values = atten_layer(query=query, key=key, value=value)
attented_values.shape  # last expression: shown as the notebook cell result


Dim atten_weights:  torch.Size([10, 20, 8, 20])
Dim attended_values:  torch.Size([10, 20, 8, 128])
Dim attended_values flatten:  torch.Size([10, 20, 1024])


torch.Size([10, 20, 1024])

# Transformer layer 

This code implements a basic transformer layer with the following components:

1. Multi-Head Attention: Uses PyTorch's built-in nn.MultiheadAttention.

2. Feed-Forward Network: Two linear layers with a ReLU activation in between.

3. Layer Normalization: Applied after the residual addition of both the attention and feed-forward sub-layers (post-norm).

4. Residual Connections: Implemented by adding the input to the output of each sub-layer.

5. Dropout: Applied to the outputs of both the attention and feed-forward parts.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class TransformerLayer(nn.Module):
    """One transformer layer: self-attention plus a feed-forward network.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``.
    A single dropout module is shared by both sub-layers.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        # Attention sub-layer (PyTorch's built-in implementation).
        self.multi_head_attention = nn.MultiheadAttention(d_model, num_heads)
        # Position-wise feed-forward sub-layer: d_model -> d_ff -> d_model.
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        # One normalisation per sub-layer.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the layer on ``x`` and return a tensor of the same shape.

        ``x`` is used as query, key and value. ``batch_first`` is not set on
        the attention module, so the expected layout is
        (seq_length, batch_size, d_model). ``mask`` is passed to the
        attention module as ``attn_mask``.
        """
        # Self-attention sub-layer with residual connection.
        attended, _ = self.multi_head_attention(x, x, x, attn_mask=mask)
        x = self.norm1(x + self.dropout(attended))

        # Feed-forward sub-layer with residual connection.
        hidden = F.relu(self.linear1(x))
        x = self.norm2(x + self.dropout(self.linear2(hidden)))

        return x

# Example usage: one layer applied to a random (seq_length, batch_size, d_model) tensor.
d_model, num_heads, d_ff = 512, 8, 2048  # model width, attention heads, feed-forward width
seq_length, batch_size = 10, 32

layer = TransformerLayer(d_model, num_heads, d_ff)
input_tensor = torch.randn(seq_length, batch_size, d_model)
output = layer(input_tensor)

# The layer preserves the input shape, so both lines report the same size.
print(f"Input shape: {input_tensor.shape}")
print(f"Output shape: {output.shape}")

In [4]:
import torch
from torch import nn
import math



class MultiHeadSelfAttentionLayer(nn.Module):
    """Multi-head self-attention parameterised by head count and head width.

    The model width is derived as ``d_model = h * dk``.
    """

    def __init__(self, h=12, dk=128):
        super().__init__()
        self.h = h
        self.dk = dk
        self.d_model = h * dk

        # Input projections for queries, keys and values.
        self.Wq = nn.Linear(self.d_model, self.d_model)
        self.Wk = nn.Linear(self.d_model, self.d_model)
        self.Wv = nn.Linear(self.d_model, self.d_model)
        self.softmax = nn.Softmax(dim=-1)

        # Output projection over the concatenated heads.
        self.fc = nn.Linear(self.d_model, self.d_model)

    def forward(self, x):
        """Self-attend over ``x`` and return a tensor of the same shape.

        x has shape [batch_size, seq_len, d_model].
        """
        batch_size, seq_len = x.shape[0], x.shape[1]
        per_head_shape = (batch_size, seq_len, self.h, self.dk)

        # Project, then split the feature axis: [batch_size, seq_len, h, dk].
        q = self.Wq(x).view(per_head_shape)
        k = self.Wk(x).view(per_head_shape)
        v = self.Wv(x).view(per_head_shape)

        # Scaled q.k^T per head: [batch_size, h, seq_len, seq_len].
        scaled_scores = torch.einsum('bphd,bqhd->bhpq', q, k) / math.sqrt(self.dk)
        # Softmax over the key axis turns scores into attention weights.
        weights = self.softmax(scaled_scores)

        # Weighted sum of values, back in [batch_size, seq_len, h, dk] layout.
        per_head = torch.einsum('bhpq,bqhd->bphd', weights, v)

        # Concatenate the heads: [batch_size, seq_len, d_model].
        merged = per_head.reshape(batch_size, seq_len, self.d_model)

        return self.fc(merged)

        

# Smoke test: 8 heads of width 128, i.e. d_model = 1024.
# This variant takes a single tensor x instead of separate query/key/value
# tensors, so no q/k/v inputs are built here.
atten_layer = MultiHeadSelfAttentionLayer(h=8, dk=128)

x = torch.rand(10, 50, 8 * 128)
y = atten_layer(x)
y.shape  # last expression: shown as the notebook cell result


torch.Size([10, 50, 1024])

In [7]:
# Truthiness check: an empty list is falsy, so this prints 'b'.
a = []

print('a' if a else 'b')

b


In [None]:
# beam search


import numpy as np
from typing import Callable, List
import heapq

# Hard-coded settings. To read them from stdin instead, use:
#   vocab_size, beam_width, max_len = input().split()
# The int() conversion matters for that stdin path, where the values are strings.
vocab_size, beam_width, max_len = map(int, (8, 3, 10))

# Token IDs the decoder starts from. To read them from stdin instead, use:
#   prefix = [int(i) for i in input().split()]
prefix = [1, 2, 5]

# A toy language model simulated with random numbers.
def language_model(x: List[int]) -> np.ndarray:
    """Return a fake next-token distribution for the token-ID sequence ``x``.

    The result is a one-dimensional array of length ``vocab_size`` (a
    module-level setting) that sums to 1. It is a deterministic function of
    ``sum(x)``: sequences with the same ID sum get the same distribution.

    Args:
        x: list of integer token IDs fed to the model.

    Returns:
        np.ndarray of shape [vocab_size] holding the next-token probabilities.
    """
    # Use a private generator seeded from the input instead of reseeding
    # NumPy's global RNG, which would clobber any seeding done by the caller.
    # RandomState(seed) yields the same numbers as np.random.seed(seed)
    # followed by np.random.rand.
    rng = np.random.RandomState(sum(x))
    prob = rng.rand(vocab_size)
    # Slightly boost eos_id (0) so that decoding does not run on forever.
    prob[0] += rng.rand() / vocab_size
    # Normalise into a probability distribution over the vocabulary.
    prob = prob / prob.sum()
    return prob


# beam search
def beam_search(model: Callable[[List[int]], np.ndarray], prefix: List[int], beam_width: int, eos_id: int = 0, max_len: int = 100) -> List[List[int]]:
    """Decode continuations of ``prefix`` with beam search.

    Args:
        model: maps a token-ID sequence to the next-token probability
            distribution (a 1-D array indexed by token ID).
        prefix: token IDs to start from. May be empty.
        beam_width: number of hypotheses kept after every step.
        eos_id: token ID that ends a hypothesis.
        max_len: maximum total length of a hypothesis, prefix included.

    Returns:
        Up to ``beam_width`` variable-length token-ID lists, ordered from the
        highest to the lowest cumulative log-probability.
    """

    def finished(seq: List[int]) -> bool:
        # An empty sequence has no last token, so it cannot have ended yet.
        return bool(seq) and seq[-1] == eos_id

    # Each entry is (log_prob, seq). log(1) == 0 for the untouched prefix.
    # Copy the prefix so the caller's list is never aliased in the result.
    beam = [(0.0, list(prefix))]
    for _ in range(max_len - len(prefix)):
        # Once every hypothesis has emitted EOS nothing can change any more.
        if all(finished(seq) for _, seq in beam):
            break

        candidates = []
        for score, seq in beam:
            if finished(seq):
                # Finished hypotheses compete unchanged with the new ones.
                candidates.append((score, seq))
                continue

            prob = model(seq)
            # The 1e-9 offset keeps log() finite for zero-probability tokens.
            candidates.extend(
                (score + np.log(p + 1e-9), seq + [token])
                for token, p in enumerate(prob)
            )

        # nlargest does the top-k selection itself, so a plain list suffices;
        # ties keep their insertion order.
        beam = heapq.nlargest(beam_width, candidates, key=lambda cand: cand[0])

    return [seq for _, seq in beam]


# def beam_search(model: Callable[[List[int]], np.ndarray], prefix: List[int], beam_width: int, eos_id=0, max_len=100) -> List[List[int]]:
#     beam = [(np.log(1.0), prefix)]  # (log_prob, seq)
#     completed = []

#     for step in range(max_len - len(prefix)):
#         candidates = []
#         for score, seq in beam:
#             if seq[-1] == eos_id:  # If EOS, move to completed
#                 completed.append((score, seq))
#                 continue
            
#             prob = model(seq)  # Get next token probabilities
#             top_indices = np.argpartition(-prob, beam_width)[:beam_width]  # Fast top-k selection

#             for i in top_indices:
#                 p = prob[i] + 1e-9  # Avoid log(0) issues
#                 heapq.heappush(candidates, (score + np.log(p), seq + [i]))

#         # Keep only top-k candidates
#         beam = heapq.nlargest(beam_width, candidates, key=lambda x: x[0])

#         # Stop if no active beams left
#         if not beam:
#             break

#     # Return best results (include completed)
#     final_candidates = completed + beam
#     final_candidates.sort(key=lambda x: x[0], reverse=True)
    
#     return [seq for _, seq in final_candidates[:beam_width]]

# Decode from the configured prefix and print the hypotheses that survive.
result = beam_search(
    language_model,
    prefix=prefix,
    beam_width=beam_width,
    max_len=max_len,
)
print(result)
# Alternative output, one space-separated hypothesis per line:
# for seq in result:
#     print(' '.join([str(i) for i in seq]))


[[1, 2, 5, 0], [1, 2, 5, 1, 1, 0], [1, 2, 5, 1, 2, 3, 5, 7, 4, 6]]
