In [3]:
import torch.nn as nn
import torch

class PositionalEncoding(nn.Module):
    '''
    Fixed (non-learnable) sinusoidal positional encoding.

    Even feature indices hold sin(pos * rate_i) and odd feature indices hold
    cos(pos * rate_i), with rate_i = 1 / temperature ** (2 * i / d_model).
    '''

    def __init__(self, d_model, seq_len, temperature = 10000):
        '''
        d_model: feature dimension of each position (may be odd or even)
        seq_len: sequence length
        temperature: base used to derive the per-feature angle rates
        '''
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.temperature = temperature

    def forward(self):
        '''
        Build the encoding table for the configured seq_len and d_model.

        return: float32 tensor of shape [1, seq_len, d_model] with requires_grad=False
        '''
        # The even slots (0::2) need ceil(d_model / 2) columns and the odd slots
        # (1::2) need floor(d_model / 2); they only coincide when d_model is even.
        num_sin = (self.d_model + 1) // 2
        num_cos = self.d_model // 2

        pos = torch.arange(self.seq_len, dtype=torch.float32).unsqueeze(1)      # pos = [[0], [1], ..., [seq_len-1]]
        i = torch.arange(num_sin, dtype=torch.float32).unsqueeze(0)             # i = [[0, 1, ..., num_sin - 1]]

        # Compute the positional encodings
        angle_rates = 1 / (self.temperature ** (2 * i / self.d_model))
        angles = pos * angle_rates                                              # angles = [seq_len, num_sin]

        pos_encoding = torch.zeros(self.seq_len, self.d_model, dtype=torch.float32)
        pos_encoding[:, 0::2] = torch.sin(angles)
        # For odd d_model the last angle column has no matching cosine slot.
        pos_encoding[:, 1::2] = torch.cos(angles[:, :num_cos])

        # Add a dimension for batch size
        pos_encoding = pos_encoding.unsqueeze(0)

        # Disable gradient because PE are not learnable parameters
        pos_encoding.requires_grad_(False)

        return pos_encoding     # pos_encoding = [1, seq_len, d_model]

In [4]:
from transformers import ViTModel
from transformers import ViTImageProcessor
import math

class InputEmbeddings(nn.Module):
    '''
    Embed a sequence of frames with a pretrained ViT and scale by sqrt(d_model).

    Each frame is encoded independently; the [CLS] token of the last hidden
    state is used as that frame's embedding.
    '''

    def __init__(self, model_name='google/vit-base-patch16-224'):
        '''
        model_name: Hugging Face checkpoint used for both the ViT model and its image processor
        '''
        super().__init__()
        self.model_name = model_name
        # (model, processor) pair, loaded on first use. It is deliberately kept in a
        # plain tuple: assigning the ViT module directly as an attribute would register
        # it as a submodule, exposing its pretrained weights through parameters().
        self._backbone = None

    def _get_backbone(self):
        '''Load the ViT model and image processor once and reuse them on later calls.'''
        if self._backbone is None:
            model = ViTModel.from_pretrained(self.model_name)
            model.eval()    # inference only: the model is always called under torch.no_grad()
            processor = ViTImageProcessor.from_pretrained(self.model_name)
            self._backbone = (model, processor)
        return self._backbone

    def forward(self, frames):
        '''
        frames: sequence of PIL Image (anything iterable whose items the image processor accepts)
        input_embed: (1, seq_len, d_model), where seq_len = number of frames and
                     d_model is the ViT hidden size (768 per the original comments)
        '''
        model, processor = self._get_backbone()

        cls_embeddings = []
        for frame in frames:
            inputs = processor(images=frame, return_tensors='pt')
            pixel_values = inputs.pixel_values                      # pixel_values = [1, 3, 224, 224]
            with torch.no_grad():
                output = model(pixel_values)
                cls_embeddings.append(output.last_hidden_state[:, 0])   # [CLS] output, shape = [1, d_model]

        input_embed = torch.cat(cls_embeddings, dim=0)              # input_embed = [seq_len, d_model]
        input_embed = input_embed.unsqueeze(0)                      # input_embed = [1, seq_len, d_model] (Add batch dimension)

        # Scale the embeddings
        d_model = input_embed.shape[-1]
        input_embed = input_embed * math.sqrt(d_model)

        return input_embed        # input_embed = [1, seq_len, d_model]

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
class LayerNormalization(nn.Module):
    '''
    Normalise the last dimension to zero mean / unit spread, then apply a
    learnable per-feature scale (alpha) and shift (bias).
    '''

    def __init__(self, d_model, epsilon=10**-6):
        '''
        d_model: size of the last (feature) dimension
        epsilon: small constant added to the standard deviation to avoid division by zero
        '''
        super().__init__()
        self.epsilon = epsilon
        self.alpha = nn.Parameter(torch.ones(d_model))      # learnable scale, starts at 1
        self.bias = nn.Parameter(torch.zeros(d_model))      # learnable shift, starts at 0

    def forward(self, x):
        '''
        Args:
            x: (batch, seq_len, d_model)
            return: normalized x (batch, seq_len, d_model)
        '''
        # Statistics are taken per position over the feature axis; std uses
        # torch.Tensor.std with its default settings.
        centered = x - x.mean(dim=-1, keepdim=True)                 # (batch, seq_len, d_model)
        spread = x.std(dim=-1, keepdim=True) + self.epsilon         # (batch, seq_len, 1)
        return self.alpha * centered / spread + self.bias           # (batch, seq_len, d_model)

In [6]:
class FeedForwardBlock(nn.Module):
    '''
    Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.
    '''

    def __init__(self, d_model, d_ff, dropout):
        '''
        d_model: input and output feature dimension
        d_ff: width of the hidden layer
        dropout: dropout probability applied to the hidden activations
        '''
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.linear_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        '''
        x: (batch, seq_len, d_model)
        return: (batch, seq_len, d_model)
        '''
        hidden = torch.relu(self.linear_1(x))           # hidden: (batch, seq_len, d_ff)
        return self.linear_2(self.dropout(hidden))      # (batch, seq_len, d_model)

In [91]:
class MultiHeadAttentionBlock(nn.Module):
    '''
    Multi-head self-attention in which every head works on the full d_model
    width (each head has its own d_model -> d_model query/key/value projection),
    followed by an output projection from num_heads * d_model back to d_model.
    '''

    def __init__(self, d_model, num_heads, dropout):
        '''
        d_model: feature dimension of the input (and of every head)
        num_heads: number of attention heads
        dropout: dropout probability applied to the attention weights
        '''
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads

        # One independent projection per head.
        self.w_q = nn.ModuleList([nn.Linear(d_model, d_model) for _ in range(num_heads)])
        self.w_k = nn.ModuleList([nn.Linear(d_model, d_model) for _ in range(num_heads)])
        self.w_v = nn.ModuleList([nn.Linear(d_model, d_model) for _ in range(num_heads)])
        self.w_o = nn.Linear(num_heads * d_model, d_model, bias=False)      # w_o = (num_heads * d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        '''
        x: (batch, seq_len, d_model)
        return: (batch, seq_len, d_model)
        '''
        # Project per head and stack on a new head axis. Each of q, k and v is built
        # from its own projections; the previous code unpacked the permuted q tensor
        # into q, k, v, which only ran for batch == 3 and discarded k and v.
        q = torch.stack([layer(x) for layer in self.w_q], dim=1)    # q = (batch, num_heads, seq_len, d_model)
        k = torch.stack([layer(x) for layer in self.w_k], dim=1)    # k = (batch, num_heads, seq_len, d_model)
        v = torch.stack([layer(x) for layer in self.w_v], dim=1)    # v = (batch, num_heads, seq_len, d_model)

        # Scaled dot-product: divide by sqrt of the per-head key dimension (d_model here).
        attention_scores = q @ k.transpose(-2, -1) / self.d_model ** 0.5    # (batch, num_heads, seq_len, seq_len)

        # Apply softmax over the key positions, then dropout on the weights
        attention_scores = attention_scores.softmax(dim=-1)                 # (batch, num_heads, seq_len, seq_len)
        attention_scores = self.dropout(attention_scores)

        # Calculate all heads
        heads = attention_scores @ v                                        # heads = (batch, num_heads, seq_len, d_model)

        # Concatenate heads along the feature dimension
        heads = heads.transpose(1, 2)                                           # heads = (batch, seq_len, num_heads, d_model)
        heads = heads.contiguous().view(heads.shape[0], heads.shape[1], -1)     # heads = (batch, seq_len, num_heads * d_model)

        # Linear transform with output weights
        return self.w_o(heads)                                              # (batch, seq_len, d_model)

In [101]:
class AddNormBlock(nn.Module):
    """
    Residual wrapper: norm(x + dropout(sublayer(x))).
    """

    def __init__(self, d_model, dropout):
        """
        d_model: feature dimension passed to LayerNormalization
        dropout: dropout probability applied to the sublayer output
        """
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization(d_model)

    def forward(self, x, sublayer, *args, **kwargs):
        """
        Run a sublayer on x, add the result back onto x and normalise the sum.

        x: Input tensor; the sublayer output must be addable to it
        sublayer: Callable invoked as sublayer(x, *args, **kwargs)
                  (e.g., multi-head attention, feed-forward)
        args: Extra positional arguments forwarded to the sublayer
        kwargs: Extra keyword arguments forwarded to the sublayer
        """
        sublayer_out = sublayer(x, *args, **kwargs)
        residual_sum = x + self.dropout(sublayer_out)
        return self.norm(residual_sum)

In [102]:
class EncoderBlock(nn.Module):
    '''
    One encoder layer: self-attention followed by a feed-forward network,
    each wrapped in its own AddNormBlock (residual + normalisation).
    '''

    def __init__(self, mhsa_block: MultiHeadAttentionBlock, 
                 feed_forward_block: FeedForwardBlock, 
                 d_model: int,
                 dropout: float):
        '''
        Args:
            mhsa_block: self-attention sublayer
            feed_forward_block: feed-forward sublayer
            d_model: feature dimension handed to both AddNormBlocks
            dropout: dropout probability handed to both AddNormBlocks
        '''
        super().__init__()
        self.mhsa_block = mhsa_block
        self.feed_forward_block = feed_forward_block
        # Index 0 wraps attention, index 1 wraps the feed-forward network.
        self.add_norm_block = nn.ModuleList([
            AddNormBlock(d_model, dropout),
            AddNormBlock(d_model, dropout),
        ])

    def forward(self, x):
        '''
        Args:
            x: input [batch, seq_len, d_model]
        '''
        attention_wrap, feed_forward_wrap = self.add_norm_block
        attended = attention_wrap(x, self.mhsa_block)
        return feed_forward_wrap(attended, self.feed_forward_block)

In [103]:
class Encoder(nn.Module):
    '''
    Stack of encoder layers applied in order, followed by a final LayerNormalization.
    '''

    def __init__(self, d_model: int, layers: nn.ModuleList):
        '''
        Args:
            d_model: feature dimension used by the final normalisation
            layers: the layers to apply sequentially
        '''
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(d_model)

    def forward(self, x):
        '''
        Args:
            x: input fed to the first layer; each layer's output feeds the next
        '''
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return self.norm(hidden)

In [None]:
class PredictionLayer(nn.Module):
    '''
    Three-layer MLP head mapping each d_model-wide position to a flat vector of
    vit_num_features * d_model values.
    '''

    def __init__(self, d_model, dropout, vit_num_features=197, d1=512, d2=256):
        '''
        Args:
            d_model: feature dimension of an input embedding
            dropout: dropout probability applied after each hidden layer
            vit_num_features: the number of features produced by ViT model
            d1, d2: dimensions of the two hidden layers in PredictionLayer
        '''
        super().__init__()
        self.fc1 = nn.Linear(d_model, d1)
        self.fc2 = nn.Linear(d1, d2)
        self.fc3 = nn.Linear(d2, vit_num_features * d_model)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        '''
        Args:
            x: (batch, seq_len, d_model)
            return: (batch, seq_len, vit_num_features * d_model)
        '''
        # Hidden layers share the same Linear -> ReLU -> Dropout pattern:
        # (batch, seq_len, d_model) -> (batch, seq_len, d1) -> (batch, seq_len, d2)
        for hidden_layer in (self.fc1, self.fc2):
            x = self.dropout(self.relu(hidden_layer(x)))

        # Output projection has no activation or dropout.
        return self.fc3(x)

In [None]:
class SCPModel(nn.Module):
    '''
    Full pipeline: frame embedding + positional encoding -> encoder -> prediction layer.
    '''

    def __init__(self, encoder: Encoder, pred_layer: PredictionLayer, src_embed: InputEmbeddings, src_pos: PositionalEncoding):
        '''
        Args:
            encoder: module applied to the position-augmented embeddings
            pred_layer: module applied to the encoder output
            src_embed: module turning the raw frames into embeddings
            src_pos: module called with no arguments to obtain the positional encoding
        '''
        super().__init__()
        self.encoder = encoder
        self.pred_layer = pred_layer
        self.src_embed = src_embed
        self.src_pos = src_pos

    def forward(self, src):
        '''
        Args:
            src: 'n' frames
            return: output of the prediction layer for the encoded sequence
        '''
        # Frame embeddings and positional encodings are summed element-wise, so
        # their shapes must be broadcast-compatible.
        embedded = self.src_embed(src)              # embedded = (batch, seq_len, d_model)
        positions = self.src_pos()                  # positions = (1, seq_len, d_model)
        encoder_input = embedded + positions        # encoder_input = (batch, seq_len, d_model)

        encoded = self.encoder(encoder_input)       # encoded = (batch, seq_len, d_model)

        return self.pred_layer(encoded)

In [None]:
def build_model(d_model, seq_len, N = 6, h = 8, dropout = 0.1, d_ff = 2048):
    '''
    Assemble an SCPModel and Xavier-initialise its weight matrices.

    d_model: feature dimension of an input embedding
    seq_len: length of the input sequence
    N: number of encoder blocks in the model
    h: number of heads for multi-head self-attention
    dropout: dropout probability used throughout the encoder and prediction layer
    d_ff: the dimension of the hidden layer of Feed Forward Block
    '''
    # Input side: frame embeddings plus positional encoding
    src_embed = InputEmbeddings()
    pos_enc = PositionalEncoding(d_model, seq_len)

    # N independent encoder blocks, each with its own attention and feed-forward sublayers
    encoder_blocks = [
        EncoderBlock(
            MultiHeadAttentionBlock(d_model, h, dropout),
            FeedForwardBlock(d_model, d_ff, dropout),
            d_model,
            dropout,
        )
        for _ in range(N)
    ]
    encoder = Encoder(d_model, nn.ModuleList(encoder_blocks))

    # Output side
    pred_layer = PredictionLayer(d_model, dropout)

    # Create the Semantic Concentration Encoder
    model = SCPModel(encoder, pred_layer, src_embed, pos_enc)

    # Xavier-initialise every parameter with more than one dimension (weight
    # matrices); 1-D parameters such as biases keep their default initialisation.
    for param in model.parameters():
        if param.dim() <= 1:
            continue
        nn.init.xavier_uniform_(param)

    return model

# Testing scripts

In [16]:
# Example usage (scratch cell).
# NOTE(review): `YourModel` is not defined in any visible cell of this notebook, so this
# cell presumably raises NameError on a fresh kernel — confirm which class was intended.
batch = 1
d_model = 2  # Example value
seq_len = 3  # Example value

# NOTE(review): `input` shadows the Python builtin of the same name.
input = torch.rand(batch, seq_len, d_model)

fcn = YourModel(d_model, seq_len)

output = fcn(input)

# NOTE(review): the output recorded for this cell is torch.Size([1, 1, 197, 2]);
# the original comment here expected torch.Size([32, 1, 197, 512]).
print(output.shape)

torch.Size([1, 1, 197, 2])


In [93]:
# Scratch check of masked softmax on random per-head attention scores.
num_heads, seq_len, d_model = 3, 4, 5

# Initialize q, k and v with random values. No seed is set in the visible cells,
# so the numbers differ between runs. v is created but not used in this cell.
q = torch.randn(num_heads, seq_len, d_model)
k = torch.randn(num_heads, seq_len, d_model)
v = torch.randn(num_heads, seq_len, d_model)

# Swap the last two axes: (num_heads, d_model, seq_len)
k_transpose = k.transpose(-2, -1) 

# Raw scores (num_heads, seq_len, seq_len), divided by a fixed constant 2
result = q @ k_transpose
result = result / 2

# Random 0/1 mask: positions where the mask is not 1 are set to -inf, so they get
# zero weight after softmax. A row masked out entirely would be all -inf and
# softmax would return NaN for it.
masks = torch.randint(0, 2, (3, 4, 4))
result = torch.where(masks == 1, result, torch.tensor(float('-inf')))
result = result.softmax(dim=-1)
result

tensor([[[0.0715, 0.5544, 0.3741, 0.0000],
         [0.0026, 0.0000, 0.0000, 0.9974],
         [0.2822, 0.1855, 0.3516, 0.1807],
         [0.8689, 0.0290, 0.1021, 0.0000]],

        [[1.0000, 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 1.0000, 0.0000],
         [0.0000, 0.0000, 1.0000, 0.0000],
         [0.9349, 0.0000, 0.0000, 0.0651]],

        [[0.0000, 0.0000, 1.0000, 0.0000],
         [0.3121, 0.0000, 0.1527, 0.5352],
         [0.0000, 0.0000, 0.0000, 1.0000],
         [0.2662, 0.0000, 0.7338, 0.0000]]])

In [94]:
# Apply dropout with p=0.5 to the attention weights from the previous cell.
# The recorded output shows surviving entries doubled (e.g. 0.5544 -> 1.1089).
# NOTE(review): `result` is overwritten with its own dropped-out version, so
# re-running this cell applies dropout a second time.
dropout = nn.Dropout(0.5)
result = dropout(result)
result

tensor([[[0.1430, 1.1089, 0.7482, 0.0000],
         [0.0000, 0.0000, 0.0000, 1.9948],
         [0.0000, 0.0000, 0.7032, 0.3614],
         [0.0000, 0.0000, 0.2042, 0.0000]],

        [[2.0000, 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 2.0000, 0.0000],
         [0.0000, 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000, 0.0000]],

        [[0.0000, 0.0000, 2.0000, 0.0000],
         [0.0000, 0.0000, 0.3054, 0.0000],
         [0.0000, 0.0000, 0.0000, 0.0000],
         [0.5324, 0.0000, 0.0000, 0.0000]]])

In [97]:
# Smoke test for MultiHeadAttentionBlock.
d_model = 128
num_heads = 8
dropout = 0.1       # rebinds `dropout` to a float (an earlier cell bound it to an nn.Dropout)
attention_block = MultiHeadAttentionBlock(d_model, num_heads, dropout)

seq_len = 10
x = torch.rand(seq_len, d_model)        # 2-D input: no batch dimension
masks = torch.ones(num_heads, seq_len, seq_len)  # Adjust masks as necessary for your tests

# NOTE(review): MultiHeadAttentionBlock.forward as defined in this notebook accepts only
# `x` and documents it as (batch, seq_len, d_model); this call passes an extra `masks`
# argument and an unbatched `x`. The recorded output torch.Size([10, 1024]) presumably
# came from an earlier version of the class — confirm and update this cell.
output = attention_block(x, masks)
# NOTE(review): expected_shape is computed but never compared with output.shape.
expected_shape = (seq_len, num_heads * d_model)

print(type(output))
print(output.shape)

<class 'torch.Tensor'>
torch.Size([10, 1024])


In [98]:
from PIL import Image
import requests
from torchvision import transforms

# Sample COCO validation image, repeated to form a dummy 4-frame sequence.
url = 'http://images.cocodataset.org/val2017/000000039769.jpg'
transform = transforms.ToTensor()

# Download the image once (the URL is the same for every frame) and fail fast on
# network problems instead of hanging or handing an error page to PIL.
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
im = transform(Image.open(response.raw))

# Independent copies, so each frame is its own tensor as before.
images = [im.clone() for _ in range(4)]

tensor_images = torch.stack(images)     # (num_frames, channels, height, width)
tensor_images.shape

torch.Size([4, 3, 480, 640])

In [99]:
# Embed the sample frames with ViT, then build a positional encoding of matching size.
IE = InputEmbeddings()
# NOTE(review): `input` shadows the Python builtin; later cells read this name.
input = IE(tensor_images)
# Take seq_len and d_model from the embedding itself so the encoding lines up with it.
_, seq_len, d_model = input.shape

PE = PositionalEncoding(d_model, seq_len)
pe = PE()

Some weights of the model checkpoint at google/vit-base-patch16-224 were not used when initializing ViTModel: ['classifier.bias', 'classifier.weight']
- This IS expected if you are initializing ViTModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ViTModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [21]:
# Add the positional encoding to the frame embeddings.
# NOTE(review): `device` is not defined in any visible cell — presumably left over from
# a deleted cell; define it before this point. This cell also overwrites `input`
# with itself plus `pe`, so re-running it adds the encoding again.
input = input + pe.to(device)
input.shape

torch.Size([1, 4, 768])

In [22]:
# Smoke test: a FeedForwardBlock with hidden width 200 should return a tensor of
# the same shape as its input.
# NOTE(review): `device` is not defined in any visible cell — confirm where it comes from.
ff = FeedForwardBlock(d_model, 200, 0.1).to(device)
output = ff(input)
output.shape

torch.Size([1, 4, 768])