In [8]:
from transformers import MBartForConditionalGeneration, MBart50TokenizerFast
import torch 
from torch.nn.utils.rnn import pad_sequence
# Load pre-trained MBART model and tokenizer (MBART-50 for multilingual tasks)
model_name = "../mbart_model"
tokenizer = MBart50TokenizerFast.from_pretrained(model_name)
model = MBartForConditionalGeneration.from_pretrained(model_name)

# Example batch of input sentences in various languages
batch_sentences = [
    "Hello, how are you?",   # English
    "What have you been up to recently?", # French
    "Do you want to go for a run?",    # Spanish
]



# Tokenize the input batch of sentences
inputs = tokenizer(batch_sentences, return_tensors="pt", padding=True, truncation=True)

# Generate translations (for example, to French) or any other target language
# Specify the target language for the model to generate in
forced_bos_token_id = tokenizer.lang_code_to_id["zh_CN"]

# Perform inference with the model to generate translations
outputs = model.generate(inputs['input_ids'], attention_mask=inputs['attention_mask'], 
                         forced_bos_token_id=forced_bos_token_id)

predictions = [] 
for i in range(len(outputs)): 
    predictions.append(outputs[i, :])

pad_tensor = torch.ones(200-len(predictions[0]))
predictions[0] = torch.cat((predictions[0],pad_tensor.long()),dim = 0)
predictions = pad_sequence(predictions,batch_first=True,padding_value=1)

# Decode the generated outputs back to text
translated_sentences = tokenizer.batch_decode(predictions, skip_special_tokens=True)

# Print the generated translations
for i, translation in enumerate(translated_sentences):
    print(f"Original: {batch_sentences[i]}")
    print(f"Translated: {translation}")
    print()

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Original: Hello, how are you?
Translated: 你好,你好吗?

Original: What have you been up to recently?
Translated: 你最近做了些什么?

Original: Do you want to go for a run?
Translated: 你想跑吗?



## Testing LLM adaptor 2 shape


In [27]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence

# Define a dummy PAD_IDX for padding purposes
PAD_IDX = 0

class LLMAdapter2(nn.Module):
    '''
    LLM adapter aims to capture temporal relations and transform 32 tokens into 1024 tokens.
    This version introduces an additional projection layer between the two convolution layers.
    '''
    def __init__(self, num_tokens=32, hidden_dim=1024, kernel_size=5):
        super(LLMAdapter2, self).__init__()
        
        # Store parameters
        self.num_tokens = num_tokens
        self.hidden_dim = hidden_dim
        
        # First projection from input tokens to hidden_dim/2
        self.proj = nn.Linear(self.num_tokens, self.hidden_dim // 2)

        # First convolutional block
        self.conv_block_1 = nn.Sequential(
            nn.Conv1d(self.hidden_dim // 2, self.hidden_dim // 2, kernel_size=kernel_size, stride=1, padding=0),
            nn.BatchNorm1d(self.hidden_dim // 2),
            nn.ReLU(inplace=True),
            nn.AvgPool1d(kernel_size=2, ceil_mode=False)
        )

        # New projection layer between convolution layers
        self.intermediate_proj = nn.Linear(self.hidden_dim // 2, self.hidden_dim)

        # Second convolutional block
        self.conv_block_2 = nn.Sequential(
            nn.Conv1d(self.hidden_dim, self.hidden_dim, kernel_size=kernel_size, stride=1, padding=0),
            nn.BatchNorm1d(self.hidden_dim),
            nn.ReLU(inplace=True),
            nn.AvgPool1d(kernel_size=2, ceil_mode=False)
        )


    def forward(self, x, src_length):
        # Input shape: (batch_size, num_frames, num_tokens)
        
        # Split the input into individual batches according to src_length
        start = 0
        x_batch = []
        for length in src_length:
            end = start + length
            x_batch.append(x[start:end])
            start = end
        
        # Pad sequences to ensure uniform batch sizes
        x = pad_sequence(x_batch, padding_value=PAD_IDX, batch_first=True)
        print(f"After padding: {x.shape}")  # Check padding result (batch_size, num_frames, num_tokens)
        
        # Apply the initial projection layer
        x = self.proj(x)  # Shape: (batch_size, num_frames, hidden_dim / 2)
        print(f"After initial projection: {x.shape}")  # Should be (batch_size, num_frames, 512)
        
        # Permute to (batch_size, hidden_dim / 2, num_frames) for Conv1d
        x = x.permute(0, 2, 1)
        print(f"After permute (before first conv): {x.shape}")  # Should be (batch_size, 512, num_frames)
        
        # First convolutional block
        x = self.conv_block_1(x)  # Shape: (batch_size, hidden_dim / 2, reduced_num_frames)
        print(f"After first conv block: {x.shape}")  # Check after first conv
        
        # Apply the intermediate projection layer
        x = x.permute(0, 2, 1)  # Back to (batch_size, reduced_num_frames, hidden_dim / 2)
        x = self.intermediate_proj(x)  # Shape: (batch_size, reduced_num_frames, hidden_dim)
        print(f"After intermediate projection: {x.shape}")  # Should be (batch_size, reduced_num_frames, 1024)
        x = x.permute(0, 2, 1)  # Back to (batch_size, hidden_dim, reduced_num_frames)
        print(f"After permute (before second conv): {x.shape}")  # Check before second conv
        
        # Second convolutional block
        x = self.conv_block_2(x)  # Shape: (batch_size, hidden_dim, further_reduced_num_frames)
        print(f"After second conv block: {x.shape}")  # Check after second conv
        
        # Convert back to (batch_size, further_reduced_num_frames, hidden_dim)
        x = x.permute(0, 2, 1)
        print(f"Final output shape: {x.shape}")  # Should be (batch_size, further_reduced_num_frames, 1024)

        return x

# Create an instance of LLMAdapter2
model = LLMAdapter2()

# Test input
batch_size = 10
num_frames = 50  # Let's assume each sequence has 15 frames
num_tokens = 32  # As specified in the model

# Random test tensor simulating a batch of 10 sequences, each with 15 frames and 32 tokens
test_input = torch.rand((batch_size * num_frames, num_tokens))

# Source lengths for each batch (assuming all sequences have 15 frames)
src_length = torch.tensor([num_frames] * batch_size)

# Forward pass
output = model(test_input, src_length)

After padding: torch.Size([10, 50, 32])
After initial projection: torch.Size([10, 50, 512])
After permute (before first conv): torch.Size([10, 512, 50])
After first conv block: torch.Size([10, 512, 23])
After intermediate projection: torch.Size([10, 23, 1024])
After permute (before second conv): torch.Size([10, 1024, 23])
After second conv block: torch.Size([10, 1024, 9])
Final output shape: torch.Size([10, 9, 1024])


## Testing LLM adaptor 3 shape

In [32]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence

# Define a dummy PAD_IDX for this example
PAD_IDX = 0

class LLMAdapter3(nn.Module):
    def __init__(self, num_tokens=32, hidden_dim=1024, kernel_size=5):
        super(LLMAdapter3, self).__init__()
        self.num_tokens = num_tokens
        self.hidden_dim = hidden_dim
        
        # Temporal convolution over the time dimension
        self.temporal_conv = nn.Sequential(
            nn.Conv1d(self.num_tokens, self.num_tokens * 2, kernel_size=5, stride=1, padding=0),
            nn.BatchNorm1d(self.num_tokens * 2),  # Channels must match Conv1d output channels
            nn.ReLU(inplace=True),
            # Reduce kernel size for pooling to avoid sequence collapse
            nn.AvgPool1d(kernel_size=1, ceil_mode=False),  

            nn.Conv1d(self.num_tokens * 2, self.num_tokens * 4, kernel_size=5, stride=1, padding=0),
            nn.BatchNorm1d(self.num_tokens * 4),  # Channels must match Conv1d output channels
            nn.ReLU(inplace=True),
            nn.AvgPool1d(kernel_size=1, ceil_mode=False)  # Adjusted pooling to avoid reducing size to zero
        )
        
        # Final projection layer
        self.final_proj = nn.Sequential(
            nn.Linear(self.num_tokens * 4, self.hidden_dim)
        )
        self.out = nn.Sequential(nn.BatchNorm1d(self.hidden_dim),
            nn.ReLU(inplace=True))

    def forward(self, x, src_length):
        start = 0
        x_batch = []
        for length in src_length:
            end = start + length
            x_batch.append(x[start:end])
            start = end
        print(f"Before padding: {x.shape}") 
        x = pad_sequence(x_batch, padding_value=PAD_IDX, batch_first=True)
        print(f"After padding: {x.shape}")  # Print shape after padding
        
        # Permute to match Conv1d expected shape: (batch_size, channels, sequence_length)
        x = x.permute(0, 2, 1)
        print(f"After permute: {x.shape}")  # Shape should now be (batch_size, num_tokens, num_frames)
        
        # Apply temporal convolution
        x = self.temporal_conv(x)
        print(f"After temporal_conv: {x.shape}")  # Check shape after convolution
        
        # Permute back to (batch_size, sequence_length, hidden_dim)
        x = x.permute(0, 2, 1)
        print(f"After second permute: {x.shape}")  # Shape should be (batch_size, num_frames, num_tokens*4)
        
        # Apply final projection (we need to flatten or reshape input to match Linear input requirements)
        batch_size, seq_len, hidden_dim = x.shape
        x = self.final_proj(x)
        #x = self.final_proj(x.reshape(batch_size * seq_len, hidden_dim))
        print(f"After final_proj: {x.shape}")  # Check final shape

        print(f"before out shape : {x.shape}")
        x = self.out(x.permute(0, 2, 1)).permute(0, 2, 1)
        return x

# Create an instance of LLMAdapter3
model = LLMAdapter3()

# Test input
batch_size = 10
num_frames = 50 # Let's assume each sequence has 15 frames
num_tokens = 32  # As specified in the model

# Random test tensor simulating a batch of 10 sequences, each with 15 frames and 32 tokens
test_input = torch.rand((batch_size * num_frames, num_tokens))

# Source lengths for each batch (assuming all sequences have 15 frames)
src_length = torch.tensor([num_frames] * batch_size)

# Forward pass
output = model(test_input, src_length)

Before padding: torch.Size([500, 32])
After padding: torch.Size([10, 50, 32])
After permute: torch.Size([10, 32, 50])
After temporal_conv: torch.Size([10, 128, 42])
After second permute: torch.Size([10, 42, 128])
After final_proj: torch.Size([10, 42, 1024])
before out shape : torch.Size([10, 42, 1024])


### Test some generation

In [None]:
import torch 
import torch.nn as nn

In [None]:
## Take in a configuration 
config = OmegaConf.load("E:\SLT_FYP\final_product\configs\Sign2Text_CSL_config_v3.yaml")

# Load the model 




# Load the dataset and dataloader 



# Set the evaluation loop with 1 break point



# Generate the predictions and store them in a list

