In [4]:
import random
import os
import torch
import torch.nn as nn
import numpy as np
from loguru import logger
from torchprofile import profile_macs

def set_seed(seed: int = 42) -> None:
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # When running on the CuDNN backend, two further options must be set
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # Set a fixed value for the hash seed
    os.environ["PYTHONHASHSEED"] = str(seed)
    print(f"Random seed set as {seed}")

set_seed()

Random seed set as 42


In [5]:
"""
Baseline Transformer Module

This module contains the implementation of a Transformer model for sign language tasks.

Classes:
- TokenEmbedding: Create embedding for the target seqeunce
- LandmarkEmbedding: Create embedding for the source(frames)seqeunce
- Encoder: Implements the transformer encoder stack.
- Decoder: Implements the transformer decoder stack.
- Transformer: The main transformer model class with methods for training and inference.

Methods:
- Transformer.generate: Perform inference on a new sequence
"""
import torch
from torch import nn


class TokenEmbedding(nn.Module):
    """Embed the tokens with postion encoding"""

    def __init__(self, num_vocab, maxlen, embedding_dim):
        """_summary_

        Parameters
        ----------
        num_vocab : int
            number of vocabulary
        maxlen : int
            maximuin length of sequence
        embedding_dim : int
            embedding output dimension
        """
        super().__init__()
        self.token_embed_layer = nn.Embedding(num_vocab, embedding_dim)
        self.postion_embed_layer = nn.Embedding(maxlen, embedding_dim)

    def forward(self, x):
        """_summary_

        Parameters
        ----------
        x : tensors
            _description_

        Returns
        -------
        tensors
            _description_
        """
        maxlen = x.size(-1)
        x = self.token_embed_layer(x)
        positions = torch.arange(0, maxlen).to(x.device)
        positions = self.postion_embed_layer(positions)
        return x + positions


class LandmarkEmbedding(nn.Module):
    """_summary_"""

    def __init__(self, embedding_dim):
        super().__init__()
        # Calculate the padding for "same" padding
        padding = (11 - 1) // 2

        # Define three 1D convolutional layers with ReLU activation and stride 2
        self.conv1 = nn.Conv1d(
            in_channels=1, out_channels=64, kernel_size=11, stride=2, padding=padding
        )
        self.conv2 = nn.Conv1d(
            in_channels=64, out_channels=128, kernel_size=11, stride=2, padding=padding
        )
        self.conv3 = nn.Conv1d(
            in_channels=128, out_channels=256, kernel_size=11, stride=2, padding=padding
        )

        # Output embedding layer
        self.embedding_layer = nn.Linear(256, embedding_dim)

    def forward(self, x):
        # Input x should have shape (batch_size, input_size, input_dim)
        x = x.unsqueeze(1)  # Add a channel dimension for 1D convolution

        # Apply convolutional layers with ReLU activation and stride 2
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))

        # Global average pooling to reduce spatial dimensions
        x = torch.mean(x, dim=2)

        # Apply the linear embedding layer
        x = self.embedding_layer(x)

        return x


class TransformerEncoder(nn.Module):
    """_summary_"""

    def __init__(
        self,
        embedding_dim,
        num_heads,
        feed_forward_dim,
        rate=0.1,
    ):
        """_summary_

        Parameters
        ----------
        embedding_dim : _type_
            _description_
        num_heads : _type_
            _description_
        feed_forward_dim : _type_
            _description_
        rate : float, optional
            _description_, by default 0.1
        """
        super().__init__()
        self.multi_attention = nn.MultiheadAttention(embedding_dim, num_heads)
        self.ffn = nn.Sequential(
            nn.Linear(embedding_dim, feed_forward_dim),
            nn.ReLU(),
            nn.Linear(feed_forward_dim, embedding_dim),
        )

        self.layernorm1 = nn.LayerNorm(embedding_dim, eps=1e-6)
        self.layernorm2 = nn.LayerNorm(embedding_dim, eps=1e-6)
        self.dropout1 = nn.Dropout(rate)
        self.dropout2 = nn.Dropout(rate)

    def forward(self, inputs_x):
        multi_attention_out, _ = self.multi_attention(inputs_x, inputs_x, inputs_x)
        multi_attention_out = self.dropout1(multi_attention_out)
        out1 = self.layernorm1(inputs_x + multi_attention_out)

        ffn_out = self.ffn(out1)
        ffn_out = self.dropout2(ffn_out)
        x = self.layernorm2(out1 + ffn_out)
        return x


class TransformerDecoder(nn.Module):
    """_summary_"""

    def __init__(self, embedding_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.num_heads_ = num_heads
        self.layernorm1 = nn.LayerNorm(embedding_dim, eps=1e-6)
        self.layernorm2 = nn.LayerNorm(embedding_dim, eps=1e-6)
        self.layernorm3 = nn.LayerNorm(embedding_dim, eps=1e-6)
        self.decoder_multi_attention = nn.MultiheadAttention(embedding_dim, num_heads)
        self.encoder_multi_attention = nn.MultiheadAttention(embedding_dim, num_heads)
        self.decoder_dropout = nn.Dropout(0.5)
        self.encoder_dropout = nn.Dropout(dropout_rate)
        self.ffn_dropout = nn.Dropout(dropout_rate)
        self.ffn = nn.Sequential(
            nn.Linear(embedding_dim, feed_forward_dim),
            nn.ReLU(),
            nn.Linear(feed_forward_dim, embedding_dim),
        )

    def _causal_attention_mask(self, sequence_length, batch_size=1, device=None):
        mask = torch.triu(torch.ones(sequence_length, sequence_length), diagonal=1).to(
            device
        )
        mask = mask.unsqueeze(0).expand(
            batch_size * self.num_heads_, sequence_length, sequence_length
        )
        return mask

    def forward(
        self,
        encoder_out,
        src_target_,
    ):
        input_shape = src_target_.size()
        batch_size = 1  # input_shape[0]
        seq_len = input_shape[0]
        x_device = src_target_.device

        # Mask
        causal_mask = self._causal_attention_mask(
            sequence_length=seq_len, batch_size=batch_size, device=x_device
        )

        target_att, _ = self.decoder_multi_attention(
            src_target_, src_target_, src_target_, attn_mask=causal_mask
        )
        target_norm_out = self.layernorm1(
            src_target_ + self.decoder_dropout(target_att)
        )

        encoder_out, _ = self.encoder_multi_attention(
            target_norm_out, encoder_out, encoder_out
        )
        enc_out_norm = self.layernorm2(encoder_out + self.encoder_dropout(encoder_out))

        ffn_out = self.ffn(enc_out_norm)
        ffn_out_norm = self.layernorm3(enc_out_norm + self.ffn_dropout(ffn_out))
        return ffn_out_norm


class ASLTransformer(nn.Module):
    def __init__(
        self,
        num_hidden_dim=64,
        multi_num_head=8,
        num_feed_forward=128,
        target_maxlen=64,
        num_layers_enc=4,
        num_layers_dec=4,
    ):
        """_summary_

        Parameters
        ----------
        num_hidden_dim : int, optional
            _description_, by default 64
        multi_num_head : int, optional
            _description_, by default 8
        num_feed_forward : int, optional
            _description_, by default 128
        target_maxlen : int, optional
            _description_, by default 64
        num_layers_enc : int, optional
            _description_, by default 4
        num_layers_dec : int, optional
            _description_, by default 4
        """
        super().__init__()
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = 62

        self.encoder_input = LandmarkEmbedding(embedding_dim=num_hidden_dim)
        self.decoder_input = TokenEmbedding(
            num_vocab=self.num_classes,
            embedding_dim=num_hidden_dim,
            maxlen=target_maxlen,
        )

        self.encoder = nn.Sequential(
            self.encoder_input,
            *[
                TransformerEncoder(
                    embedding_dim=num_hidden_dim,
                    num_heads=multi_num_head,
                    feed_forward_dim=num_feed_forward,
                )
                for _ in range(num_layers_enc)
            ],
        )

        for i in range(num_layers_dec):
            self.add_module(
                f"decoder_layer_{i}",
                TransformerDecoder(
                    embedding_dim=num_hidden_dim,
                    num_heads=multi_num_head,
                    feed_forward_dim=num_feed_forward,
                ),
            )

        self.classifier = nn.Linear(
            in_features=num_hidden_dim, out_features=self.num_classes
        )

    def _decoder_run(self, enc_out, target):
        decoder_out = self.decoder_input(target)
        for i in range(self.num_layers_dec):
            decoder_out = getattr(self, f"decoder_layer_{i}")(enc_out, decoder_out)
        return decoder_out

    def forward(self, source, target):
        if len(source.shape) == 2:  # Check if single input
            source = source.unsqueeze(0)  # Add batch dimension
        if len(target.shape) == 1:  # Check if single input
            target = target.unsqueeze(0)  # Add batch dimension

        encoder_out = self.encoder(source)
        transformer_output = self._decoder_run(encoder_out, target)
        return self.classifier(transformer_output)

    def generate(self, source, target_start_token_idx=60):
        if len(source.shape) == 2:  # Check if single input
            source = source.unsqueeze(0)  # Add batch dimension

        encoder_out = self.encoder(source)
        decoder_input = (
            torch.ones((source.shape[0], 1), dtype=torch.long)
            .to(source.device)
            * target_start_token_idx
        )
        dec_logits = []

        for _ in range(self.target_maxlen - 1):
            decoder_out = self._decoder_run(encoder_out, decoder_input)
            logits = self.classifier(decoder_out)

            logits = torch.argmax(logits, dim=-1, keepdim=True)
            last_logit = logits[:, -1]
            dec_logits.append(last_logit)
            decoder_input = torch.cat([decoder_input, last_logit], dim=-1)

        return decoder_input.squeeze(0) if len(source.shape) == 2 else decoder_input

In [7]:
# Create a sample input
batch_source_sequence = torch.randn(2, 128, 345)  # Sample source sequence (batch_size, maxlen, num_hid)
batch_target_sequence = torch.randint(0, 60, (2, 64))  # Sample target sequence (batch_size, maxlen)
single_src_seq = torch.rand(128,345)
single_trg_seq = torch.randint(0,60,(64,))

try:
    # Instantiate the Transformer model
    transformer_model = ASLTransformer(
        num_hidden_dim=200,
        multi_num_head= 4,
        num_feed_forward=400,
        target_maxlen=64,
        num_layers_enc=2,
        num_layers_dec=1,)

    # Forward pass to get predictions
    predictions = transformer_model(single_src_seq, single_trg_seq)

    # Print the shape of the predictions
    print(f"final {predictions.shape}")
    
except Exception as error:
    logger.exception(f" ERROR Message ==> {error}")

[32m2023-12-11 03:21:41.460[0m | [31m[1mERROR   [0m | [36m__main__[0m:[36m<module>[0m:[36m24[0m - [31m[1m ERROR Message ==> Expected 2D (unbatched) or 3D (batched) input to conv1d, but got input of size: [1, 1, 128, 345][0m
[33m[1mTraceback (most recent call last):[0m

  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "c:\Users\Yinka\AppData\Local\Programs\Python\Python311\Lib\site-packages\ipykernel_launcher.py", line 17, in <module>
    app.launch_new_instance()
    │   └ <bound method Application.launch_instance of <class 'ipykernel.kernelapp.IPKernelApp'>>
    └ <module 'ipykernel.kernelapp' from 'c:\\Users\\Yinka\\AppData\\Local\\Programs\\Python\\Python311\\Lib\\site-packages\\ipyker...
  File "c:\Users\Yinka\AppData\Local\Programs\Python\Python311\Lib\site-packages\traitlets\config\application.py", line 1041, in launch_instance
    app.start()
    │   └ <function IPKernelApp.start at 0x0000017524A5C40

In [8]:
import torch

# Your original sequence tensor
original_tensor = torch.randn(118, 346)

# Define the desired output shape
desired_shape = (128, 346)

# Calculate the padding on the first dimension from the bottom
padding_bottom = max(0, desired_shape[0] - original_tensor.size(0))

# Pad the tensor along the first dimension from the bottom
padded_tensor = torch.nn.functional.pad(original_tensor, (0, 0, 0, padding_bottom))

# Now, padded_tensor has the shape (128, 346)
print(padded_tensor.shape)


torch.Size([128, 346])


In [9]:
original_tensor[115:,:5]

tensor([[ 0.0490,  1.5782, -0.0793, -0.8889, -0.6999],
        [ 0.3881,  1.1002, -0.7594, -1.0423,  1.1450],
        [ 2.1911,  0.6852,  0.7096, -1.1343, -0.3205]])

In [10]:
padded_tensor[115:,:5]

tensor([[ 0.0490,  1.5782, -0.0793, -0.8889, -0.6999],
        [ 0.3881,  1.1002, -0.7594, -1.0423,  1.1450],
        [ 2.1911,  0.6852,  0.7096, -1.1343, -0.3205],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000]])