In [50]:
import sys
import os
import math
sys.path.append(os.path.abspath(".."))

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from dltime.base.layers import Conv1dSame

In [70]:
def scaled_dot_product_attention(q, k, v, mask=None):
    """
    Scaled dot-product attention.

    Each of the seq_len_q queries is scored against all seq_len_k keys; the
    softmax of those scores is then used to take a weighted sum of the values.

    - q: query, shape: [..., seq_len_q, depth]
    - k: key, shape: [..., seq_len_k, depth]
    - v: value, shape: [..., seq_len_v, depth_v], seq_len_k == seq_len_v
    - mask: optional tensor. It is multiplied by -1e9, given a new axis at
      position -2 (so it broadcasts over the queries) and added to the logits,
      which means key positions where mask is 1 receive ~0 attention weight.

    Returns (output, attention_weights) with shapes
    [..., seq_len_q, depth_v] and [..., seq_len_q, seq_len_k].
    """
    # Divide the raw q.k scores by sqrt(d_k) so they do not grow with the key
    # depth; very large logits would saturate the softmax and shrink gradients.
    depth = torch.tensor(k.size(-1), dtype=torch.float32)
    logits = torch.matmul(q, k.transpose(-2, -1)) / torch.sqrt(depth)  # [..., seq_len_q, seq_len_k]

    # Push masked key positions towards -infinity so softmax assigns them ~0.
    if mask is not None:
        logits.add_((mask * -1e9).unsqueeze(-2))

    # Normalise over the key axis, then mix the values with those weights.
    weights = F.softmax(logits, dim=-1)  # [..., seq_len_q, seq_len_k]
    return torch.matmul(weights, v), weights


In [36]:
class ConvSelfAttention(torch.nn.Module):
    """Single-head self-attention whose q/k/v projections are 1-D convolutions.

    The query is produced by one kernel-size-1 convolution with ``c_out``
    channels. Keys and values are each the channel-wise concatenation of several
    convolutions with different kernel sizes, so their channel count is the sum
    of the filter counts in ``filter_map``. That sum has to equal ``c_out``:
    the query/key dot product needs matching depths and ``final_linear``
    expects ``c_out`` input features.

    Args:
        c_in: number of input channels.
        c_out: number of channels of the query projection and of the output.
        filter_map: ``{kernel_size: n_filters}`` for the key and value branches.
            When omitted, kernel sizes 1, 3, 5, 7, 9, 11 are used with a
            1:1:2:2:1:1 split of ``c_out``; for ``c_out=256`` this is
            ``{1: 32, 3: 32, 5: 64, 7: 64, 9: 32, 11: 32}``.

    Raises:
        ValueError: if ``filter_map`` is omitted and ``c_out`` is not a multiple
            of 8, or if the filter counts of an explicit ``filter_map`` do not
            sum to ``c_out``.
    """

    # Default key/value layout: kernel size -> share of c_out, in eighths.
    _DEFAULT_KERNEL_SHARES = {1: 1, 3: 1, 5: 2, 7: 2, 9: 1, 11: 1}

    def __init__(self, c_in, c_out=256, filter_map=None):
        super(ConvSelfAttention, self).__init__()

        if filter_map is None:
            n_shares = sum(self._DEFAULT_KERNEL_SHARES.values())
            if c_out % n_shares != 0:
                raise ValueError(
                    "c_out must be a multiple of {} to use the default filter_map, got {}; "
                    "pass filter_map explicitly".format(n_shares, c_out))
            unit = c_out // n_shares
            filter_map = {ks: share * unit for ks, share in self._DEFAULT_KERNEL_SHARES.items()}
        else:
            # Copy so that later changes to the caller's dict cannot affect this module.
            filter_map = dict(filter_map)
            n_filters = sum(filter_map.values())
            if n_filters != c_out:
                raise ValueError(
                    "filter_map channels must sum to c_out ({}), got {}".format(c_out, n_filters))

        self.c_in = c_in
        self.c_out = c_out
        self.filter_map = filter_map

        self.wq = Conv1dSame(c_in, c_out, ks=1, stride=1)
        self.wk = nn.ModuleList([Conv1dSame(c_in, co, ks=ks, stride=1) for ks, co in filter_map.items()])
        self.wv = nn.ModuleList([Conv1dSame(c_in, co, ks=ks, stride=1) for ks, co in filter_map.items()])

        self.final_linear = nn.Linear(c_out, c_out)

    def forward(self, q, k, v, mask=None):
        """Attend over the time axis of channel-first inputs.

        Args:
            q, k, v: tensors of shape [b, c_in, seq_len]; for self-attention all
                three are the same tensor.
            mask: optional mask handed to ``scaled_dot_product_attention``
                unchanged.

        Returns:
            ``(output, attention_weights)`` with shapes [b, c_out, seq_len_q]
            and [b, seq_len_q, seq_len_k].
        """
        # Conv1dSame is expected to keep the sequence length, otherwise the
        # branches below could not be concatenated along the channel axis.
        q = self.wq(q)  # => [b, c_out, seq_len]
        k = torch.cat([conv(k) for conv in self.wk], dim=1)  # => [b, c_out, seq_len]
        v = torch.cat([conv(v) for conv in self.wv], dim=1)  # => [b, c_out, seq_len]

        # The attention helper wants the feature axis last: [b, seq_len, c_out].
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q.transpose(-1, -2), k.transpose(-1, -2), v.transpose(-1, -2), mask)
        # => [b, seq_len_q, c_out], [b, seq_len_q, seq_len_k]
        output = self.final_linear(scaled_attention)  # => [b, seq_len_q, c_out]
        # Back to channel-first so the result has the same layout as the input.
        return output.transpose(-1, -2), attention_weights

In [24]:
def _get_activation_fn(activation):
    """Return the ``torch.nn.functional`` activation named by ``activation``.

    Args:
        activation: ``"relu"`` or ``"gelu"``.

    Returns:
        ``F.relu`` or ``F.gelu`` respectively.

    Raises:
        ValueError: for any other value.
    """
    supported = (("relu", F.relu), ("gelu", F.gelu))
    for name, fn in supported:
        if activation == name:
            return fn
    raise ValueError("activation should be relu/gelu, not {}".format(activation))

In [28]:
# Smoke test for ConvSelfAttention: 64 series, 6 input channels, 96 time steps.
# The key/value filter counts are given explicitly and sum to c_out=512, so the
# key/value depth matches the 512-channel query projection.
model = ConvSelfAttention(c_in=6, c_out=512,
                          filter_map={1: 64, 3: 64, 5: 128, 7: 128, 9: 64, 11: 64})
x = torch.rand(64, 6, 96)  # [batch, c_in, seq_len]
print(x.shape)
out, attn_weights = model(x, x, x, mask=None)
print(out.shape, attn_weights.shape)  # [64, 512, 96], [64, 96, 96]

torch.Size([64, 6, 96])
torch.Size([64, 512, 96])
torch.Size([64, 512, 96])
torch.Size([64, 512, 96])
torch.Size([64, 512, 96]) torch.Size([64, 96, 96])


In [60]:
class TransformerConvAttnBNEncoderLayer(nn.Module):
    r"""Encoder layer made up of convolutional self-attention and a feedforward network.

    It differs from TransformerEncoderLayer in torch/nn/modules/transformer.py in that it
    uses ConvSelfAttention for the attention step, replaces LayerNorm with BatchNorm1d, and
    takes and returns channel-first tensors of shape (batch_size, d_model, seq_len).

    Args:
        d_model: the number of feature channels in the input and output (default=256).
        dim_feedforward: the dimension of the feedforward network model (default=512).
        dropout: the dropout value (default=0.1).
        activation: the activation function of intermediate layer, relu or gelu (default=relu).
    """

    def __init__(self, d_model=256, dim_feedforward=512, dropout=0.1, activation="relu"):
        super(TransformerConvAttnBNEncoderLayer, self).__init__()
        self.self_attn = ConvSelfAttention(d_model, d_model)

        # Position-wise feedforward network: d_model -> dim_feedforward -> d_model.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.BatchNorm1d(d_model, eps=1e-5)  # normalizes each feature across batch samples and time steps
        self.norm2 = nn.BatchNorm1d(d_model, eps=1e-5)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)

    def __setstate__(self, state):
        # Fall back to ReLU when the restored state carries no activation entry.
        if 'activation' not in state:
            state['activation'] = F.relu
        super(TransformerConvAttnBNEncoderLayer, self).__setstate__(state)

    def forward(self, src, src_mask=None, src_key_padding_mask=None, is_causal=False):
        r"""Pass the input through the encoder layer.

        Args:
            src: the sequence to the encoder layer, shape (batch_size, d_model, seq_len) (required).
            src_mask: accepted so the signature matches TransformerEncoderLayer; this layer
                does not use it (optional).
            src_key_padding_mask: the mask for the src keys per batch, passed to the
                attention as ``mask``; scaled_dot_product_attention suppresses key
                positions where it is 1 (optional).
            is_causal: accepted because nn.TransformerEncoder passes this keyword to its
                layers in recent PyTorch releases; this layer does not use it (optional).

        Returns:
            Tensor of shape (batch_size, d_model, seq_len).
        """
        # Attention sub-layer: residual connection followed by batch norm.
        src2 = self.self_attn(src, src, src, mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)  # (batch_size, d_model, seq_len)
        src = self.norm1(src)

        # Feedforward sub-layer. nn.Linear acts on the last axis, so move the
        # feature axis there first.
        src = src.permute(0, 2, 1)  # (batch_size, seq_len, d_model)
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)  # (batch_size, seq_len, d_model)

        # BatchNorm1d normalizes over axis 1, so restore the channel-first layout.
        src = src.permute(0, 2, 1)
        src = self.norm2(src)       # (batch_size, d_model, seq_len)
        return src

In [49]:
# Smoke test for the encoder layer: the output must have the input's shape.
model = TransformerConvAttnBNEncoderLayer(d_model=256)
x = torch.rand(64, 256, 96)  # [batch, d_model, seq_len]
print(x.size())
out = model(x)
print(out.size())  # [64, 256, 96]

torch.Size([64, 256, 96])
torch.Size([64, 256, 96])


In [92]:
from dltime.models.ts_transformer import get_pos_encoder
class TSTransformerEncoderGAPClassifier(nn.Module):
    """
    Simplest classifier/regressor. Can be either regressor or classifier because the output does not include
    softmax. The final encoder embeddings are averaged over the non-padded time steps (masked global average
    pooling) and the pooled vector is fed to a linear output layer.

    Args:
        feat_dim: number of input features per time step.
        max_len: maximum sequence length, passed to the positional encoder.
        d_model: width of the encoder.
        num_layers: number of stacked encoder layers.
        dim_feedforward: hidden width of each layer's feedforward network.
        num_classes: size of the output vector.
        dropout: dropout probability.
        pos_encoding: key passed to get_pos_encoder to select the positional encoder.
        activation: 'relu' or 'gelu'.
        freeze: when true, the dropout inside the positional encoder and the encoder layers is set to 0
            (``dropout * (1.0 - freeze)``); the dropout applied before pooling keeps ``dropout``.
    """

    def __init__(self, feat_dim, max_len, d_model, num_layers, dim_feedforward, num_classes,
                 dropout=0.1, pos_encoding='fixed', activation='gelu', freeze=False):
        super(TSTransformerEncoderGAPClassifier, self).__init__()

        self.max_len = max_len
        self.d_model = d_model

        self.project_inp = nn.Linear(feat_dim, d_model)
        self.pos_enc = get_pos_encoder(pos_encoding)(d_model, dropout=dropout*(1.0 - freeze), max_len=max_len)

        encoder_layer = TransformerConvAttnBNEncoderLayer(d_model, dim_feedforward, dropout*(1.0 - freeze), activation=activation)
        # NOTE(review): nn.TransformerEncoder is used here only to stack copies of a custom layer.
        # Newer PyTorch releases appear to inspect attributes of the stock encoder layer inside
        # TransformerEncoder.forward - confirm this still runs on the installed version.
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)

        self.act = _get_activation_fn(activation)

        self.dropout1 = nn.Dropout(dropout)

        self.feat_dim = feat_dim
        self.num_classes = num_classes
        self.output_layer = nn.Linear(d_model, num_classes)

    def forward(self, X, padding_masks):
        """
        Args:
            X: (batch_size, feat_dim, seq_len) torch tensor of masked features (input)
            padding_masks: (batch_size, seq_length) tensor, 1 marks a padded position to ignore, 0 marks a
                position to keep. Both the attention and the pooling below multiply the mask by -1e9 before
                a softmax, so positions with 1 get zero weight.
        Returns:
            output: (batch_size, num_classes)
        """
        inp = X.permute(0, 2, 1)    # [bs, seq_len, feat_dim]
        inp = self.project_inp(inp) * math.sqrt(
            self.d_model)           # [bs, seq_len, d_model]
        inp = inp.permute(1, 0, 2)  # [seq_len, bs, d_model]
        inp = self.pos_enc(inp)     # add positional encoding
        inp = inp.permute(1, 2, 0)  # [bs, d_model, seq_len]

        output = self.transformer_encoder(inp, src_key_padding_mask=padding_masks)  # (batch_size, d_model, seq_length)
        output = self.act(output)   # the output transformer encoder/decoder embeddings don't include non-linearity
        output = self.dropout1(output)

        # Masked global average pooling: softmax over (mask * -1e9) is uniform over the
        # positions whose mask is 0 and zero elsewhere. The mask is cast to the activation
        # dtype so that bmm sees matching dtypes.
        pad = padding_masks.to(dtype=output.dtype)
        gap_weight = F.softmax(pad * -1e9, dim=-1).unsqueeze(-1)  # [bs, seq_len, 1]
        # Squeeze only the pooled axis; a bare squeeze() would also drop the batch
        # axis when batch_size == 1.
        output = torch.bmm(output, gap_weight).squeeze(-1)  # [bs, d_model]
        output = self.output_layer(output)  # (batch_size, num_classes)

        return output

In [None]:
# GAP as a batched matmul: features [bs, d_model, seq_len] x weights [bs, seq_len, 1] -> [bs, d_model, 1]

In [93]:
# Smoke test for the classifier: 64 series, 6 features, 384 time steps.
model = TSTransformerEncoderGAPClassifier(
    feat_dim=6, max_len=384, d_model=256, num_layers=2, dim_feedforward=512, num_classes=3)
x = torch.rand(64, 6, 384)  # [batch, feat_dim, seq_len]
# Mask is 0 for the first 96 steps and 1 for the remaining 288; the model
# gives zero weight to positions whose mask is 1.
mask = torch.ones(64, 384)
mask[:, :96] = 0
print(x.shape, mask.shape)
out = model(x, mask)
print(out.shape)  # [64, 3]

torch.Size([64, 6, 384]) torch.Size([64, 384])
torch.Size([64, 3])
