In [None]:
# THIS IS NOT THE FINAL VERSION (NON-WORKING VERSION AS OF NOW)

In [1]:
import numpy as np
import torch
import math
from torch import nn
import torch.nn.functional as F

def get_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)

In [2]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding for batch-first sequences.

    A table ``pe`` of shape ``[max_len, d_model]`` is precomputed once and
    registered as a buffer (so it follows the module across devices and is
    stored in the state dict, but is not trained). Even columns hold sines and
    odd columns hold cosines of ``position * div_term``.

    Args:
        d_model: Size of the embedding dimension. May be even or odd.
        max_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.d_model = d_model
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        # One angle column per sin/cos pair: [max_len, ceil(d_model / 2)].
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # When d_model is odd there is one fewer odd column than even columns,
        # so the cosine half must drop the last angle column to fit.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Args:
            x: Tensor, shape [batch_size, seq_len, embedding_dim]

        Returns:
            Tensor of the same shape as ``x`` with the first ``seq_len`` rows of
            the encoding table added (broadcast over the batch dimension).
        """
        # Add positional encoding to the input
        x = x + self.pe[:x.size(1)]
        return x

class LayerNormalization(nn.Module):
    """Layer normalization over the trailing ``len(parameters_shape)`` dimensions.

    The input is centred and scaled using the (biased) mean and variance taken
    over those trailing dimensions, then multiplied by the learnable ``gamma``
    and shifted by the learnable ``beta``.

    Args:
        parameters_shape: Shape of ``gamma``/``beta``; its length determines how
            many trailing dimensions are normalized.
        eps: Constant added to the variance before the square root.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.parameters_shape = parameters_shape
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        """Normalize ``inputs`` and apply the learned scale and shift."""
        trailing = len(self.parameters_shape)
        # -1, -2, ..., -trailing: the dimensions covered by gamma/beta.
        reduce_dims = [-k for k in range(1, trailing + 1)]

        centered = inputs - inputs.mean(dim=reduce_dims, keepdim=True)
        variance = (centered ** 2).mean(dim=reduce_dims, keepdim=True)
        normalized = centered / (variance + self.eps).sqrt()
        return self.gamma * normalized + self.beta

class VectorEmbedding(nn.Module):
    """Prepare raw feature vectors for the transformer.

    Each input vector is linearly projected from ``input_dim`` to ``d_model``,
    the positional encoding is added, and dropout is applied.

    Args:
        input_dim: Size of the last dimension of the incoming vectors.
        d_model: Size of the model dimension produced by the projection.
        max_sequence_length: Number of positions the positional encoder
            precomputes.
        drop_prob: Dropout probability applied after the positional encoding.
    """

    def __init__(self, input_dim, d_model, max_sequence_length, drop_prob=0.1):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.input_projection = nn.Linear(input_dim, d_model)
        self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Embed ``x`` of shape [batch_size, sequence_length, input_dim]."""
        projected = self.input_projection(x)
        with_positions = self.position_encoder(projected)
        return self.dropout(with_positions)

FEATURE EXTRACTOR

In [3]:
class TemporalMultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention over the sequence dimension.

    Works both as self-attention (queries, keys and values all come from ``x``)
    and as cross-attention (queries come from ``x``; keys and values come from
    ``context``).

    Args:
        d_model: Model dimension; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        # Fail fast: otherwise the per-head reshape in forward() cannot succeed.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # For creating Q, K, V matrices
        self.queries = nn.Linear(d_model, d_model)
        self.keys = nn.Linear(d_model, d_model)
        self.values = nn.Linear(d_model, d_model)

        # W_u for concatenation of heads as shown in equation (11)
        self.Wu = nn.Linear(d_model, d_model)

    def _split_heads(self, projected):
        """Reshape [batch, length, d_model] to [batch, num_heads, length, head_dim]."""
        batch_size, length, _ = projected.size()
        projected = projected.reshape(batch_size, length, self.num_heads, self.head_dim)
        return projected.permute(0, 2, 1, 3)

    def forward(self, x, context=None, mask=None):
        """Attend from ``x`` to ``context`` (or to ``x`` itself).

        Args:
            x: Query source, shape [batch_size, query_len, d_model].
            context: Optional key/value source, shape
                [batch_size, key_len, d_model]. When omitted, ``x`` is used,
                which gives plain self-attention.
            mask: Optional tensor broadcastable to
                [batch_size, num_heads, query_len, key_len]. Positions where
                the mask equals 0 are excluded from attention. Pass it by
                keyword.

        Returns:
            Tensor of shape [batch_size, query_len, d_model].
        """
        if context is None:
            context = x
        batch_size, query_len, _ = x.size()

        # Queries come from x; keys and values come from the attended sequence.
        Q = self._split_heads(self.queries(x))
        K = self._split_heads(self.keys(context))
        V = self._split_heads(self.values(context))

        # Compute scaled dot-product as in equation (12)
        attention_scores = torch.matmul(Q, K.transpose(-2, -1))
        attention_scores = attention_scores / math.sqrt(self.head_dim)

        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9)

        # Apply softmax to get head_i
        attention_weights = F.softmax(attention_scores, dim=-1)

        # Multiply by V to complete head_i calculation
        heads = torch.matmul(attention_weights, V)  # [batch_size, num_heads, query_len, head_dim]

        # Reshape back and concatenate heads
        heads = heads.permute(0, 2, 1, 3)  # [batch_size, query_len, num_heads, head_dim]
        heads = heads.reshape(batch_size, query_len, self.d_model)

        # Apply W_u to complete equation (11): MultiHead_i = W_u · concat([head_i0, ..., head_ih])
        return self.Wu(heads)

class SeparableConvolution(nn.Module):
    """Depthwise-separable 1-D convolution block applied along the sequence.

    A kernel-size-3 depthwise convolution (one filter per channel) is followed
    by two kernel-size-1 pointwise convolutions, ``d_model -> hidden -> d_model``,
    with a ReLU between them and dropout after the second.

    Args:
        d_model: Number of input and output channels.
        hidden: Number of channels between the two pointwise convolutions.
        drop_prob: Dropout probability applied to the block output.
    """

    def __init__(self, d_model, hidden, drop_prob):
        super().__init__()
        # groups=d_model makes every channel use its own filter (depthwise).
        self.depthwise_conv = nn.Conv1d(
            in_channels=d_model,
            out_channels=d_model,
            kernel_size=3,
            padding=1,
            groups=d_model,
        )

        # 1x1 convolutions that mix information across channels.
        self.pointwise_conv1 = nn.Conv1d(in_channels=d_model, out_channels=hidden, kernel_size=1)
        self.pointwise_conv2 = nn.Conv1d(in_channels=hidden, out_channels=d_model, kernel_size=1)

        self.activation = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Apply the block to ``x`` of shape [batch_size, sequence_length, d_model]."""
        # Conv1d expects [batch, channels, seq_len].
        channels_first = x.transpose(1, 2)

        features = self.depthwise_conv(channels_first)
        features = self.activation(self.pointwise_conv1(features))
        features = self.dropout(self.pointwise_conv2(features))

        # Back to [batch, seq_len, channels].
        return features.transpose(1, 2)

ENCODER


In [4]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention, then a separable convolution.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalized.

    Args:
        d_model: Model dimension.
        ffn_hidden: Hidden channel count of the separable convolution.
        num_heads: Number of attention heads.
        drop_prob: Dropout probability.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        # First sub-layer: temporal self-attention
        self.attention = TemporalMultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)

        # Second sub-layer: separable convolution (replacing FFN)
        self.separable_conv = SeparableConvolution(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, self_attention_mask=None):
        """Run both sub-layers on ``x`` and return a tensor of the same shape.

        Args:
            x: Layer input.
            self_attention_mask: Optional mask forwarded to the attention
                module as ``mask``.
        """
        # No copy of x is needed for the residual: the sub-layers build new
        # tensors and never modify their input in place.
        attended = self.dropout1(self.attention(x, mask=self_attention_mask))
        x = self.norm1(attended + x)

        convolved = self.dropout2(self.separable_conv(x))
        x = self.norm2(convolved + x)
        return x

class Encoder(nn.Module):
    """Embeds the input vectors and passes them through a stack of encoder layers.

    Args:
        d_model: Model dimension.
        ffn_hidden: Hidden channel count used inside every encoder layer.
        num_heads: Number of attention heads per layer.
        drop_prob: Dropout probability.
        num_layers: Number of stacked ``EncoderLayer`` modules.
        max_sequence_length: Number of positions the embedding precomputes.
        input_dim: Size of the last dimension of the raw input vectors.
    """

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 input_dim):
        super().__init__()
        self.vector_embedding = VectorEmbedding(input_dim, d_model, max_sequence_length, drop_prob)
        stack = []
        for _ in range(num_layers):
            stack.append(EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob))
        self.layers = nn.ModuleList(stack)

    def forward(self, x, self_attention_mask=None):
        """Embed ``x`` and apply every layer in order, sharing the same mask."""
        hidden = self.vector_embedding(x)
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, self_attention_mask)
        return hidden


DECODER

In [5]:
class DecoderLayer(nn.Module):
    """One decoder layer: masked self-attention, encoder-decoder attention,
    then a separable convolution.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalized.

    Args:
        d_model: Model dimension.
        ffn_hidden: Hidden channel count of the separable convolution.
        num_heads: Number of attention heads.
        drop_prob: Dropout probability.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        # First sub-layer: masked temporal self-attention
        self.masked_attention = TemporalMultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)

        # Second sub-layer: multi-head attention over encoder output
        self.encoder_decoder_attention = TemporalMultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

        # Third sub-layer: separable convolution (replacing FFN)
        self.separable_conv = SeparableConvolution(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, encoder_output, self_attention_mask=None, cross_attention_mask=None):
        """Run the three sub-layers on ``x`` and return a tensor of the same shape.

        Args:
            x: Decoder-side layer input.
            encoder_output: Encoder result handed to the encoder-decoder
                attention sub-layer.
            self_attention_mask: Optional mask for the first (self-attention)
                sub-layer.
            cross_attention_mask: Optional mask for the encoder-decoder
                attention sub-layer.
        """
        # No copies of x are needed for the residuals: the sub-layers build new
        # tensors and never modify their input in place.
        attended = self.dropout1(self.masked_attention(x, mask=self_attention_mask))
        x = self.norm1(attended + x)

        # NOTE(review): this call requires the attention module's forward to
        # accept the encoder output as its second positional argument — confirm.
        cross = self.encoder_decoder_attention(
            x,
            encoder_output,
            mask=cross_attention_mask
        )
        x = self.norm2(self.dropout2(cross) + x)

        convolved = self.dropout3(self.separable_conv(x))
        x = self.norm3(convolved + x)

        return x

class Decoder(nn.Module):
    """Embeds the target vectors and passes them through a stack of decoder layers.

    Args:
        d_model: Model dimension.
        ffn_hidden: Hidden channel count used inside every decoder layer.
        num_heads: Number of attention heads per layer.
        drop_prob: Dropout probability.
        num_layers: Number of stacked ``DecoderLayer`` modules.
        max_sequence_length: Number of positions the embedding precomputes.
        output_dim: Size of the last dimension of the target vectors fed in.
    """

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 output_dim):
        super().__init__()
        self.vector_embedding = VectorEmbedding(output_dim, d_model, max_sequence_length, drop_prob)
        # No longer applied in forward(); kept only so the module attribute and
        # the `positional_encoding.pe` buffer key in the state dict still exist.
        self.positional_encoding = PositionalEncoding(d_model, max_sequence_length)
        self.layers = nn.ModuleList([DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
                                   for _ in range(num_layers)])

    def forward(self, x, encoder_output, self_attention_mask=None, cross_attention_mask=None):
        """Embed ``x`` and apply every decoder layer in order.

        Args:
            x: Target-side input vectors.
            encoder_output: Encoder result passed to every decoder layer.
            self_attention_mask: Optional mask passed to every layer's
                self-attention.
            cross_attention_mask: Optional mask passed to every layer's
                encoder-decoder attention.
        """
        # VectorEmbedding.forward already adds the positional encoding (as per
        # equations (13) and (14)) before its dropout. Adding it a second time
        # here would double the positional signal, unlike the encoder, which
        # applies it once.
        x = self.vector_embedding(x)

        # Pass through decoder layers
        for layer in self.layers:
            x = layer(x, encoder_output, self_attention_mask, cross_attention_mask)

        return x

TEMPORAL_TRANSFORMER

In [None]:
class TemporalTransformer(nn.Module):
    """Encoder-decoder transformer mapping a source sequence of vectors to a
    target sequence of vectors.

    Args:
        input_dim: Size of each source vector.
        output_dim: Size of each target/predicted vector.
        d_model: Model dimension shared by encoder and decoder.
        ffn_hidden: Hidden channel count inside every layer.
        num_heads: Number of attention heads.
        drop_prob: Dropout probability.
        num_layers: Number of encoder layers and of decoder layers.
        max_sequence_length: Number of positions the embeddings precompute.
    """

    def __init__(self,
                input_dim,
                output_dim,
                d_model=32,
                ffn_hidden=64,
                num_heads=4,
                drop_prob=0.1,
                num_layers=3,
                max_sequence_length=100):
        super().__init__()
        self.encoder = Encoder(
            d_model=d_model,
            ffn_hidden=ffn_hidden,
            num_heads=num_heads,
            drop_prob=drop_prob,
            num_layers=num_layers,
            max_sequence_length=max_sequence_length,
            input_dim=input_dim
        )

        self.decoder = Decoder(
            d_model=d_model,
            ffn_hidden=ffn_hidden,
            num_heads=num_heads,
            drop_prob=drop_prob,
            num_layers=num_layers,
            max_sequence_length=max_sequence_length,
            output_dim=output_dim
        )

        self.output_projection = nn.Linear(d_model, output_dim)
        # Best available device at construction time. This is NOT necessarily
        # where the parameters live (the module is not moved here), so tensors
        # created during inference follow the actual data instead.
        self.device = get_device()

    def forward(self,
                src,
                tgt,
                encoder_self_attention_mask=None,
                decoder_self_attention_mask=None,
                decoder_cross_attention_mask=None):
        """
        Args:
            src: input sequence of shape [batch_size, seq_len, input_dim]
            tgt: target sequence of shape [batch_size, seq_len, output_dim]
            encoder_self_attention_mask: optional mask for encoder self-attention
            decoder_self_attention_mask: optional mask for decoder self-attention
            decoder_cross_attention_mask: optional mask for encoder-decoder attention
        Returns:
            output: predicted sequence of shape [batch_size, seq_len, output_dim]
        """
        # Encode the input sequence
        encoder_output = self.encoder(src, encoder_self_attention_mask)

        # Decode to get output sequence
        decoder_output = self.decoder(
            tgt,
            encoder_output,
            self_attention_mask=decoder_self_attention_mask,
            cross_attention_mask=decoder_cross_attention_mask
        )

        # Project to output dimension
        output = self.output_projection(decoder_output)

        return output

    def get_decoder_mask(self, size):
        """Build a causal self-attention mask of shape [size, size].

        Entry (i, j) is 1 when position i may attend to position j (j <= i) and
        0 otherwise, matching the attention module's ``mask == 0`` blocking
        convention. The mask is created on the device of the model's weights.
        """
        return torch.tril(
            torch.ones(size, size, device=self.output_projection.weight.device)
        )

    def predict(self, src, max_length):
        """Generate ``max_length`` output vectors for ``src`` one step at a time.

        Switches the module to eval mode and runs without gradient tracking.

        Args:
            src: input sequence of shape [batch_size, seq_len, input_dim]
            max_length: number of output time steps to generate
        Returns:
            Tensor of shape [batch_size, max_length, output_dim].
        """
        self.eval()
        with torch.no_grad():
            # Encode the input sequence
            encoder_output = self.encoder(src)

            # Initialize output sequence with zeros, on the same device/dtype
            # as the encoder output so it always matches the model's placement.
            batch_size = src.size(0)
            output_dim = self.decoder.vector_embedding.input_projection.in_features
            outputs = torch.zeros(
                batch_size,
                max_length,
                output_dim,
                device=encoder_output.device,
                dtype=encoder_output.dtype,
            )

            # Generate output tokens one by one
            for t in range(max_length):
                # Sequence generated so far; position t is still the zero
                # placeholder and is overwritten below.
                current_output = outputs[:, :t+1, :]

                # Decode
                decoder_output = self.decoder(
                    current_output,
                    encoder_output,
                    self_attention_mask=self.get_decoder_mask(current_output.size(1))
                )

                # Get next prediction
                next_output = self.output_projection(decoder_output[:, -1, :])
                outputs[:, t, :] = next_output

        return outputs