In [48]:
import importlib.metadata
import json
import logging
import os
import re
import tempfile
import time
import ast
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Type, TypeVar, Union
import math
import aeon


In [29]:
import os
import numpy as np
import aeon
from aeon.datasets import load_from_tsfile

In [4]:
DATA_PATH = "DATA/"

In [252]:
train_x, train_y = aeon.datasets.load_from_tsfile(DATA_PATH + "Blink_TRAIN.ts")
test_x, test_y = aeon.datasets.load_from_tsfile(DATA_PATH + "Blink_TEST.ts")

train_x, train_y = np.array(train_x), np.array(train_y)
test_x, test_y = np.array(test_x), np.array(test_y)

#reshape from (sample, feat_dim, seq_length) to (seq_length, sample, feat_dim)
train_x, test_x = np.transpose(train_x, (2, 0, 1)), np.transpose(test_x, (2, 0, 1))

# Separate x dimensions into 2 modalities
m1_train_x = train_x[:, :, :2]
m2_train_x = train_x[:, :, 2:]
m1_train_y, m2_train_y = train_y, train_y

#preserve labels
m1_test_x = test_x[:, :2, :]
m2_test_x = test_x[:, 2:, :]
m1_test_y, m2_test_y = test_y, test_y

print(m1_train_x.shape, m1_train_y.shape)

(510, 500, 2) (500,)


In [253]:
class PositionalEncoding(torch.nn.Module):
    r"""
    Implemented from "Language Modeling with nn.Transformer and TorchText" 

    To inject positional information into the embeddings, we use add a embedding based on the mapping of sin/cosine to our original embedding. 
    REMARKS: do we need to add this if our representations already host positional information?

    Args: 
        d_model: dimension of the embeddings, where embedding is shape [n_sample, seq_length, embedding_dim (d_model)]
    """

    def __init__(self, d_model: int, dropout: float = 0.1, seq_len: int = 5000):
        super().__init__()
        self.dropout = torch.nn.Dropout(p=dropout)

        position = torch.arange(seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(seq_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Optional[Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]]) -> torch.Tensor:
        """
        Arguments:
            x: Tensor, shape ``[n_sample, seq_length, embedding_dim]``
        """
        print(self.pe.shape)
        print(x.shape)

        #this transformation is for [n_sample, seq_length, embedding_dim]
        x = x + self.pe[:, :x.size(1)]


        #this transformation is for [seq_length n_sample, embedding_dim]
        #x = x + self.pe[:x.size(0)]
        
        return self.dropout(x)

In [254]:
pe_ = PositionalEncoding(d_model = 2, seq_len = 510)

In [255]:
hell0 = pe_(torch.Tensor(m1_train_x))

torch.Size([510, 1, 2])
torch.Size([510, 500, 2])


In [295]:
class cross_attn_block(torch.nn.Module):
    r"""
    Single Block for Cross Attention

    Args: 
        m1: first modality
        m2: second modality

    Shapes: 
        m1: (seq_length, N_samples, N_features)
        m2: (seq_length, N_samples, N_features)

    Returns: 
        embedding of m1 depending on attending on certain elements of m2, multihead_attn(k_m1, v_m1, q_m2)
    """

    def __init__(self, 
                 dim: int, 
                 heads: int, 
                 dropout: float, 
                 seq_length: int):

        super(cross_attn_block, self).__init__()

        self.positional_encoding = PositionalEncoding(dim, dropout, seq_length)

        #learnable
        self._to_key = torch.nn.Linear(dim, dim)
        self._to_query = torch.nn.Linear(dim, dim)
        self._to_value = torch.nn.Linear(dim, dim)

        self.attn = torch.nn.MultiheadAttention(embed_dim = dim, num_heads = heads, dropout = dropout)

    def forward(self, 
                m1: Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor, torch.Tensor]] = None, 
                m2: Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor, torch.Tensor]] = None, 
                mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        
        m1_x = self.positional_encoding(m1)
        m2_x = self.positional_encoding(m2)
        print("passed encoding")

        m1_k = self._to_key(m1_x)
        m1_v = self._to_query(m1_x)
        m2_q = self._to_value(m2_x)
        print("passed kqv")

        #crossing
        cross_x, attn_weights = self.attn(m1_k, m1_v, m2_q)
        print("passed attn:", cross_x.shape)

        return cross_x


class position_wise_ffn(torch.nn.Module):
    r"""
    Position-wise feed-forward network with a RELU activation - essentially contracts output, and squeezes it back to the same space

    ARGS:
        dim: dimension of the embeddings
        hidden_dim: dimension of the inflated hidden layer in feed-forward network
    
    """

    def __init__(self, 
                 dim: int, 
                 hidden_dim: int, 
                 dropout: float = 0.0):
        super(position_wise_ffn, self).__init__()

        self.ffn_1 = torch.nn.Linear(dim, hidden_dim)
        self.ffn_2 = torch.nn.Linear(hidden_dim, dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        
        x = self.ffn_1(x).relu()
        x = self.ffn_2(x)

        return x


class cross_attn_channel(torch.nn.Module):
    r"""
    Model for Cross Attention, architecture implementation taken from encoder layer of "Attention is all you need"
    Includes multi-head attn with crossing --> add + norm --> positionwise ffn --> add + norm --> output (based on paper)

    ARGS:
        dim_m1: time series modality 1
        dim_m2: time series modality 2

    Shapes:
        assuming seq_length is same for both
        m1: (seq_length, N_samples, N_features)
        m2: (seq_length, N_samples, N_features)
    """

    def __init__(self, 
                 dim_m1: int, 
                 dim_m2: int, 
                 outdim_m1: int, 
                 outdim_m2: int,
                 heads: Optional[int], 
                 seq_len: int, 
                 dropout: float = 0.0):
        super(cross_attn_channel, self).__init__()

        self.m1_cross_m2 = cross_attn_block(dim = dim_m1, heads = heads, dropout = dropout, seq_length = seq_len)
        self.m2_cross_m1 = cross_attn_block(dim = dim_m2, heads = heads, dropout = dropout, seq_length = seq_len)

        self.norm_m1 = torch.nn.LayerNorm(dim_m1)
        self.norm_m2 = torch.nn.LayerNorm(dim_m2)

        self.m1_pffn = position_wise_ffn(dim_m1, 512)
        self.m2_pffn = position_wise_ffn(dim_m2, 512)

        self.norm_pffn_m1 = torch.nn.LayerNorm(dim_m1)
        self.norm_pffn_m2 = torch.nn.LayerNorm(dim_m2)

        self.dropout = torch.nn.Dropout(dropout)



    def forward(self, 
                m1: Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor, torch.Tensor]] = None, 
                m2: Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor, torch.Tensor]] = None, 
                mask: Optional[torch.Tensor] = None) -> torch.Tensor:

        m1_x = self.m1_cross_m2(m1, m2)
        m2_x = self.m2_cross_m1(m2, m1)

        m1_x = self.norm_m1(m1 + self.dropout(m1_x))
        m2_x  = self.norm_m2(m2 + self.dropout(m2_x))

        m1_ffn = self.m1_pffn(m1_x)
        m2_ffn = self.m2_pffn(m2_x)

        m1_x = self.norm_pffn_m1(m1_x + self.dropout(m1_ffn))
        m2_x = self.norm_pffn_m2(m2_x + self.dropout(m2_ffn))

        return m1_x, m2_x



In [296]:
print(m2_train_x.shape, m1_train_x.shape)

(510, 500, 2) (510, 500, 2)


In [297]:
cross_attn_channel_ = cross_attn_channel(dim_m1 = m1_train_x.shape[-1], dim_m2 = m2_train_x.shape[-1], outdim_m1=16, outdim_m2=16, heads = 2, seq_len =  m2_train_x.shape[0])

In [299]:
m1, m2 = cross_attn_channel_(torch.Tensor(m1_train_x), torch.Tensor(m2_train_x))

torch.Size([510, 1, 2])
torch.Size([510, 500, 2])
torch.Size([510, 1, 2])
torch.Size([510, 500, 2])
passed encoding
passed kqv
passed attn: torch.Size([510, 500, 2])
torch.Size([510, 1, 2])
torch.Size([510, 500, 2])
torch.Size([510, 1, 2])
torch.Size([510, 500, 2])
passed encoding
passed kqv
passed attn: torch.Size([510, 500, 2])


In [301]:
m1.shape

torch.Size([510, 500, 2])

In [146]:
m2_train_x.shape[-1]

510

In [288]:
encoder_layer = torch.nn.TransformerEncoderLayer(d_model=512, nhead=2)
transformer_encoder = torch.nn.TransformerEncoder(encoder_layer, num_layers=2)
src = torch.rand(10, 32, 512)
out = transformer_encoder(src)

classifier = torch.nn.Linear(512, 2)
out = classifier(out)



In [289]:
out.shape

torch.Size([10, 32, 2])