In [2]:
import torch
from torch import nn, Tensor
from transformers import (
    RobertaTokenizer,
    RobertaModel,
    RobertaConfig,
    RobertaForSequenceClassification,
)
import torch.nn.functional as F  # noqa: N812


In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head dot-product attention with a selectable weighting function.

    Query, key and value are each passed through their own linear projection,
    split into ``num_heads`` heads of width ``embed_dim // num_heads``, combined
    through ``weights(query @ key^T) @ value`` per head, re-concatenated and
    passed through a final linear projection.

    Args:
        embed_dim: Size of the last dimension of all inputs and of the output.
            Must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
        weight_kind: ``"softmax"`` normalises the scores over the key axis;
            ``"tanh"`` squashes every score independently into (-1, 1).
        scale_scores: If True, raw scores are divided by ``sqrt(head_dim)``
            before the weighting function. Defaults to False (unscaled scores).

    Raises:
        ValueError: If ``weight_kind`` is not ``"softmax"`` or ``"tanh"``, or if
            ``embed_dim`` is not a positive multiple of ``num_heads``.
    """

    def __init__(
        self,
        embed_dim: int = 768,
        num_heads: int = 8,
        dropout: float = 0.1,
        weight_kind: str = "softmax",
        scale_scores: bool = False,
    ):
        super().__init__()
        # Without this check the floor division below silently drops channels and
        # the later `view` calls either fail or fold data into the batch axis.
        if num_heads <= 0 or embed_dim % num_heads != 0:
            raise ValueError(
                f"`embed_dim` ({embed_dim}) must be divisible by `num_heads` ({num_heads})"
            )

        self.layer_q = nn.Linear(embed_dim, embed_dim)
        self.layer_k = nn.Linear(embed_dim, embed_dim)
        self.layer_v = nn.Linear(embed_dim, embed_dim)
        self.layer_output = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.weight_kind = weight_kind
        self.scale_scores = scale_scores
        if weight_kind == "softmax":
            self.get_weight = nn.Softmax(dim=3)
        elif weight_kind == "tanh":
            self.get_weight = nn.Tanh()
        else:
            raise ValueError(f"Invalid value of `weight_kind`: {weight_kind}")

    def forward(
        self,
        query: Tensor,
        key: Tensor,
        value: Tensor,
        mask: "Tensor | None" = None,
    ) -> Tensor:
        """Attend from ``query`` positions over ``key``/``value`` positions.

        Args:
            query: (-1 x T1 x embed_dim) tensor.
            key: (-1 x T2 x embed_dim) tensor.
            value: (-1 x T2 x embed_dim) tensor.
            mask: Optional boolean tensor broadcastable to
                (-1 x num_heads x T1 x T2); ``True`` marks key positions that
                must receive no attention.

        Returns:
            Tensor of shape (-1 x T1 x embed_dim).
        """
        query = self.layer_q(query)
        key = self.layer_k(key)
        value = self.layer_v(value)

        query = query.view(-1, query.size(1), self.num_heads, self.head_dim)
        key = key.view(-1, key.size(1), self.num_heads, self.head_dim)
        value = value.view(-1, value.size(1), self.num_heads, self.head_dim)

        query = query.permute(0, 2, 1, 3).contiguous()  # (-1 x num_heads x T1 x d_h)
        key = key.permute(0, 2, 3, 1).contiguous()  # (-1 x num_heads x d_h x T2)
        value = value.permute(0, 2, 1, 3).contiguous()  # (-1 x num_heads x T2 x d_h)

        scores = query @ key  # (-1 x num_heads x T1 x T2)
        if self.scale_scores:
            scores = scores / self.head_dim**0.5

        if self.weight_kind == "softmax":
            if mask is not None:
                # The dtype's own minimum stays representable in half precision,
                # unlike a hard-coded -1e10.
                scores = scores.masked_fill(mask, torch.finfo(scores.dtype).min)
            weights = self.get_weight(scores)
        else:
            # tanh of a very negative score is -1, not 0, so masked positions
            # have to be zeroed after the activation rather than before it.
            weights = self.get_weight(scores)
            if mask is not None:
                weights = weights.masked_fill(mask, 0.0)
        weights = self.dropout(weights)  # (-1 x num_heads x T1 x T2)

        output = weights @ value  # (-1 x num_heads x T1 x d_h)
        output = output.permute(0, 2, 1, 3).contiguous()  # (-1 x T1 x num_heads x d_h)
        output = output.view(-1, output.size(1), self.embed_dim)  # (-1 x T1 x d)
        output = self.layer_output(output)  # (-1 x T1 x d)

        return output

In [None]:
class Aggregater(nn.Module):
    """Single-query attention pooling of text embeddings driven by a time-series embedding.

    The text embeddings are linearly projected, scored against the time-series
    embedding by a dot product, softmax-normalised over the text positions, and
    used to take a weighted sum of the (unprojected) text embeddings, which is
    finally passed through an output projection.

    Args:
        embed_dim: Size of the last dimension of both inputs and of the output.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, embed_dim, dropout):
        super().__init__()
        self.attention_weight_layer = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_output = nn.Linear(embed_dim, embed_dim)

    def forward(self, ts_embed: Tensor, text_embed: Tensor) -> Tensor:
        """Pool ``text_embed`` into one vector per sample.

        Args:
            ts_embed: (-1 x 1 x d) time-series embedding used as the query.
            text_embed: (-1 x T x d) text embeddings to be pooled.

        Returns:
            Tensor of shape (-1 x 1 x d).
        """
        scores = self.attention_weight_layer(text_embed)  # (-1 x T x d)
        scores = scores.permute(0, 2, 1).contiguous()  # (-1 x d x T)
        scores = ts_embed @ scores  # (-1 x 1 x T)

        weights = self.dropout(scores.softmax(2))  # (-1 x 1 x T)
        output = weights @ text_embed  # (-1 x 1 x d)
        output = self.layer_output(output)  # (-1 x 1 x d)
        # The result must be returned explicitly; a bare `return` yields None.
        return output

In [None]:
class MyModel(nn.Module):
    """Forecasting model fusing time-series, text and keyword representations.

    Work in progress: the keyword encoder is not implemented yet, and
    ``forward``, ``predict`` and ``validate_batch`` are empty stubs.
    ``get_output`` holds the actual computation.

    Args:
        config: Configuration object read via attribute access. The fields
            used here are ``roberta.pretrained_path``, ``aggregater`` and
            ``context_keyword_attention`` (keyword arguments for
            ``MultiHeadAttention``), ``dropout`` and ``forecast_size``.
    """

    def __init__(self, config):
        super().__init__()
        self.roberta_config = RobertaConfig.from_pretrained(config.roberta.pretrained_path)
        # `input_text` is described in `get_output` as RoBERTa encoder output, so
        # the shared embedding width is taken from the RoBERTa config.
        # NOTE(review): assumes `config.aggregater` and
        # `config.context_keyword_attention` use this same `embed_dim` - confirm.
        embed_dim = self.roberta_config.hidden_size

        self.keyword_encoder = None  # TODO: not implemented; keywords pass through unchanged

        self.ts_encoder = DLinear()  # TODO

        self.aggregater = MultiHeadAttention(**config.aggregater)
        self.layer_norm1 = nn.LayerNorm(embed_dim)
        self.context_keyword_attention = MultiHeadAttention(**config.context_keyword_attention)
        self.layer_norm2 = nn.LayerNorm(embed_dim)
        # NOTE(review): the hidden width of the FFN is set to `embed_dim` as a
        # placeholder choice - adjust once the intended size is decided.
        self.final_ffn = nn.Sequential(  # (-1 x d_embed) -> (-1 x H)
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU(),
            nn.Dropout(config.dropout),
            nn.Linear(embed_dim, config.forecast_size),
        )

    def get_output(
        self,
        input_ts: Tensor,
        input_text: Tensor,
        input_kw: Tensor,
    ):
        """Compute the forecast from time-series, text and keyword inputs.

        Args:
            input_ts: (-1 x T x d1) time series data (needs to be normalized).
            input_text: (-1 x max_sents x d_embed) output of
                ``RobertaForSequenceClassification.roberta``; sentences are
                padded to ``max_sents``.
            input_kw: (-1 x n_kw x d_embed) keyword data.

        Returns:
            Tensor of shape (-1 x H), where H is ``config.forecast_size``.
        """
        # step 1: make representations for time series, text (already done), and keywords
        rep_both = self.ts_encoder(input_ts)  # (-1 x d_embed)
        if rep_both.dim() == 2:
            # The attention layers read dim 1 as the sequence length, so a flat
            # (-1 x d_embed) encoding needs an explicit length-1 query axis.
            rep_both = rep_both.unsqueeze(1)  # (-1 x 1 x d_embed)
        # Until a keyword encoder exists, use the keyword embeddings as given.
        rep_kw = input_kw if self.keyword_encoder is None else self.keyword_encoder(input_kw)

        # step 2: aggregate time series & text representations to make context vector
        # # considers relationship between time series history and sentimental representation of text
        temp = self.aggregater(rep_both, input_text, input_text, mask=None)  # (-1 x 1 x d_embed)
        rep_both = self.layer_norm1(rep_both + temp)
        # TODO: add keyword extraction

        # step 3: aggregate context vector & keyword representations to make final output
        output: Tensor
        output = self.context_keyword_attention(rep_both, rep_kw, rep_kw, mask=None)  # (-1 x 1 x d_embed)
        output = self.layer_norm2(rep_both + output)
        # sum of keywords with attention weights \in (-1, 1)
        # NOTE(review): that range holds only if the layer is configured with weight_kind="tanh" - confirm.

        # step 4: feed forward network
        output = output.squeeze(1)  # (-1 x d_embed)
        output = self.final_ffn(output)  # (-1 x H)

        return output

    # Backward-compatible alias for the original, misspelled method name.
    get_ouptut = get_output

    def forward(self):
        # TODO: not implemented yet; currently returns None.
        return

    @torch.no_grad()
    def predict(self):
        # TODO: not implemented yet; currently returns None.
        return

    @torch.no_grad()
    def validate_batch(self):
        # TODO: not implemented yet; currently returns None.
        return