<a href="https://colab.research.google.com/github/arumajirou/-daily-test/blob/main/ETSformer_pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
#@title # **ライブラリのインストール**
#einopsライブラリは、テンソルを操作したり並べ替えたりするための関数やクラスを多数提供する。

!pip install einops

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting einops
  Downloading einops-0.6.0-py3-none-any.whl (41 kB)
[K     |████████████████████████████████| 41 kB 509 kB/s 
[?25hInstalling collected packages: einops
Successfully installed einops-0.6.0


In [3]:
#@title # **ライブラリのインポート**
#mathモジュールには、数学の定数π（約3.14）を表すπをはじめ、様々な数学関数が含まれています
from math import pi

#コレクションモジュールには、データのコレクションを扱うのに便利なデータ型が多数用意されており、namedtupleは、名前付きフィールドを持つtupleのサブクラスを作成することができる
from collections import namedtuple

#torchモジュールは、Pythonの深層学習ライブラリとして人気のあるPyTorchの一部です。これはテンソル（多次元配列）を扱い、それに対して様々な操作を行うための関数やクラスを提供します。
import torch

#nn.functionalモジュールもPyTorchの一部で、テンソルにニューラルネットワークの演算を適用するための関数を多数提供しています
import torch.nn.functional as F

#einsum関数は、アインシュタイン和算を行うためのユーティリティ関数であり、複数のインデックスに対する和をテンソル表記で簡潔に表現することができる。
from torch import nn, einsum

#scipy.fftpackモジュールは高速フーリエ変換 (FFT) を扱うための関数を提供します
#next_fast_len関数は、FFTの次の高速な長さを返します。
#これは、FFTアルゴリズムを使って計算するのがより速い長さです。
from scipy.fftpack import next_fast_len

#einopsライブラリは、テンソルを操作したり並べ替えたりするための関数やクラスを多数提供する。
#rearrange関数はテンソルの次元を並べ替えることができ、
#repeat関数はテンソルを1つまたは複数の次元に沿って繰り返すことができる。
from einops import rearrange, repeat

#Rearrangeクラスは、入力にrearrange関数を適用するPyTorchレイヤーです。
from einops.layers.torch import Rearrange


In [15]:

#@title  # **constants(定数)**

Intermediates = namedtuple('Intermediates', ['growth_latents', 'seasonal_latents', 'level_output'])
print(Intermediates)

<class '__main__.Intermediates'>


## **ChatGPTによるconstants(定数)コードの解説**


- Intermediatesは、growth_latents、seasonal_latents、level_outputの3つのフィールドを持つ名前付きタプルのように見えます。
- 名前付きタプルはフィールドに名前を付けたタプルのサブクラスで、Pythonの有効な識別子を使うことができます。
- 名前付きタプルは、辞書に似ていますが、より簡潔な構文と属性によってフィールドにアクセスする機能を持つ、単純なレコードを表現する便利な方法です。

例えば、次のような名前付きタプルを作成することができます。

In [16]:
#@title ##**constants(定数)コードの説明のため生成したChatGPTのコード**
from collections import namedtuple

Intermediates = namedtuple('Intermediates', ['growth_latents', 'seasonal_latents', 'level_output'])

intermediates = Intermediates(growth_latents=1, seasonal_latents=2, level_output=3)

print(intermediates.growth_latents)  # prints 1
print(intermediates.seasonal_latents)  # prints 2
print(intermediates.level_output)  # prints 3

1
2
3


また、通常のタプルと同様に、インデックスを使用して名前付きタプルのフィールドにアクセスすることができます。

In [17]:
#@title ## **constants(定数)コードの説明のため生成したChatGPTのコード**
print(intermediates[0])  # prints 1
print(intermediates[1])  # prints 2
print(intermediates[2])  # prints 3

1
2
3


- 名前付きタプルは不変であり、作成後にそのフィールドの値を変更することはできません。
- フィールドの値を変更する必要がある場合は、更新された値で新しい名前付きタプルを作成することができます。

In [12]:

#@title # **helper functions(ヘルパー関数)**

def exists(val):
    return val is not None


## **ChatGPTによるhelper functions(ヘルパー関数)コードの解説**
- これは、与えられた値がNoneでなければTrueを、そうでなければFalseを返す単純なユーティリティ関数のように見える。

例えば、こんな感じです。

In [13]:
#@title ## **helper functions(ヘルパー関数)コードの説明のため生成したChatGPTのコード**
val = None
print(exists(val))  # False

val = "hello"
print(exists(val))  # True

False
True


- この関数は、0,"",[],{}のような値に対しても真を返すことに注意してください、
- なぜならこれらはすべてPythonでは非Noneの値とみなされるからです。
- Noneだけをチェックしたいのであれば、代わりにis演算子を使用することができます。

In [14]:
#@title ## **helper functions(ヘルパー関数)コードの説明のため生成したChatGPTのコード**

val = 0
print(val is None)  # False

val = None
print(val is None)  # True

False
True


In [None]:

#@title # **fourier helpers(フーリエヘルパー関数)**

def fourier_extrapolate(signal, start, end):
    device = signal.device
    fhat = torch.fft.fft(signal)
    fhat_len = fhat.shape[-1]
    time = torch.linspace(start, end - 1, end - start, device = device, dtype = torch.complex64)
    freqs = torch.linspace(0, fhat_len - 1, fhat_len, device = device, dtype = torch.complex64)
    res = fhat[..., None, :] * (1.j * 2 * pi * freqs[..., None, :] * time[..., :, None] / fhat_len).exp() / fhat_len
    return res.sum(dim = -1).real


## **ChatGPTによるfourier helpers(フーリエヘルパー関数)コードの解説**
- この関数は、離散フーリエ変換（DFT）とその逆である逆DFT（IDFT）を実装し、与えられた信号シグナルを 開始から 終了までの時間範囲に渡って外挿するようなものである。

- DFTは信号を周波数成分に分解する数学的変換であり、IDFTはその逆で周波数成分から信号を再構成する変換である。
- DFTは次のように定義される。

 - $X[k] = \sum_{n=0}^{N-1} x[n] e^{-j \frac{2}{N} kn}$

- ここで、$x[n]$は入力信号、$X[k]$は出力周波数スペクトル、$N$は信号のサンプル数である。IDFTは逆変換として定義される。

 - $x[n] = \frac{1}{N}\X[k] e^{j ∕frac{2∕pi}{N} kn}$

- この関数では，入力信号の周波数スペクトルを複素数値のテンソルとして返すtorch.fft.fft関数を用いて DFT を計算します。
- これは $(1.j * 2 * pi * freqs[..., None, :] * time[..., :, None] / fhat_len)$ として定義され、最後の次元で sum(dim = -1) を使って合計を取ることによって計算されます。
- そして、結果の実数部はrealを使って返されます。

- 外挿は時間軸の範囲を始点から 終点まで拡張することで行われ、これにより再構成された信号は入力信号の元の範囲を超えて拡張される。
- 拡張された時間軸はtorch.linspace関数を使って作成され、与えられた始点と終点の間に等間隔の値を生成します

---


In [None]:

# classes

def InputEmbedding(time_features, model_dim, kernel_size = 3, dropout = 0.):
    return nn.Sequential(
        Rearrange('b n d -> b d n'),
        nn.Conv1d(time_features, model_dim, kernel_size = kernel_size, padding = kernel_size // 2),
        nn.Dropout(dropout),
        Rearrange('b d n -> b n d'),
    )

def FeedForward(dim, mult = 4, dropout = 0.):
    return nn.Sequential(
        nn.Linear(dim, dim * mult),
        nn.Sigmoid(),
        nn.Dropout(dropout),
        nn.Linear(dim * mult, dim),
        nn.Dropout(dropout)
    )

class FeedForwardBlock(nn.Module):
    def __init__(
        self,
        *,
        dim,
        **kwargs
    ):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.ff = FeedForward(dim, **kwargs)
        self.post_norm = nn.LayerNorm(dim)

    def forward(self, x):
        x = self.norm(x)
        return self.post_norm(x + self.ff(x))

# encoder related classes

## multi-head exponential smoothing attention

def conv1d_fft(x, weights, dim = -2, weight_dim = -1):
    # Algorithm 3 in paper

    N = x.shape[dim]
    M = weights.shape[weight_dim]

    fast_len = next_fast_len(N + M - 1)

    f_x = torch.fft.rfft(x, n = fast_len, dim = dim)
    f_weight = torch.fft.rfft(weights, n = fast_len, dim = weight_dim)

    f_v_weight = f_x * rearrange(f_weight.conj(), '... -> ... 1')
    out = torch.fft.irfft(f_v_weight, fast_len, dim = dim)
    out = out.roll(-1, dims = (dim,))

    indices = torch.arange(start = fast_len - N, end = fast_len, dtype = torch.long, device = x.device)
    out = out.index_select(dim, indices)
    return out

class MHESA(nn.Module):
    def __init__(
        self,
        *,
        dim,
        heads = 8,
        dropout = 0.
    ):
        super().__init__()
        self.heads = heads
        self.initial_state = nn.Parameter(torch.randn(heads, dim // heads))

        self.dropout = nn.Dropout(dropout)
        self.alpha = nn.Parameter(torch.randn(heads))

        self.project_in = nn.Linear(dim, dim)
        self.project_out = nn.Linear(dim, dim)

    def naive_Aes(self, x, weights):
        n, h = x.shape[-2], self.heads

        # in appendix A.1 - Algorithm 2

        arange = torch.arange(n, device = x.device)

        weights = repeat(weights, '... l -> ... t l', t = n)
        indices = repeat(arange, 'l -> h t l', h = h, t = n)

        indices = (indices - rearrange(arange + 1, 't -> 1 t 1')) % n

        weights = weights.gather(-1, indices)
        weights = self.dropout(weights)

        # causal

        weights = weights.tril()

        # multiply

        output = einsum('b h n d, h m n -> b h m d', x, weights)
        return output

    def forward(self, x, naive = False):
        b, n, d, h, device = *x.shape, self.heads, x.device

        # linear project in

        x = self.project_in(x)

        # split out heads

        x = rearrange(x, 'b n (h d) -> b h n d', h = h)

        # temporal difference

        x = torch.cat((
            repeat(self.initial_state, 'h d -> b h 1 d', b = b),
            x
        ), dim = -2)

        x = x[:, :, 1:] - x[:, :, :-1]

        # prepare exponential alpha

        alpha = self.alpha.sigmoid()
        alpha = rearrange(alpha, 'h -> h 1')

        # arange == powers

        arange = torch.arange(n, device = device)
        weights = alpha * (1 - alpha) ** torch.flip(arange, dims = (0,))

        if naive:
            output = self.naive_Aes(x, weights)
        else:
            output = conv1d_fft(x, weights)

        # get initial state contribution

        init_weight = (1 - alpha) ** (arange + 1)
        init_output = rearrange(init_weight, 'h n -> h n 1') * rearrange(self.initial_state, 'h d -> h 1 d')

        output = output + init_output

        # merge heads

        output = rearrange(output, 'b h n d -> b n (h d)')
        return self.project_out(output)

## frequency attention

class FrequencyAttention(nn.Module):
    def __init__(
        self,
        *,
        K = 4,
        dropout = 0.
    ):
        super().__init__()
        self.K = K
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        freqs = torch.fft.rfft(x, dim = 1)

        # get amplitudes

        amp = freqs.abs()
        amp = self.dropout(amp)

        # topk amplitudes - for seasonality, branded as attention

        topk_amp, _ = amp.topk(k = self.K, dim = 1, sorted = True)

        # mask out all freqs with lower amplitudes than the lowest value of the topk above

        topk_freqs = freqs.masked_fill(amp < topk_amp[:, -1:], 0.+0.j)

        # inverse fft

        return torch.fft.irfft(topk_freqs, dim = 1)

## level module

class Level(nn.Module):
    def __init__(self, time_features, model_dim):
        super().__init__()
        self.alpha = nn.Parameter(torch.Tensor([0.]))
        self.to_growth = nn.Linear(model_dim, time_features)
        self.to_seasonal = nn.Linear(model_dim, time_features)

    def forward(self, x, latent_growth, latent_seasonal):
        # following equation in appendix A.2

        n, device = x.shape[1], x.device

        alpha = self.alpha.sigmoid()

        arange = torch.arange(n, device = device)
        powers = torch.flip(arange, dims = (0,))

        # Aes for raw time series signal with seasonal terms (from frequency attention) subtracted out

        seasonal =self.to_seasonal(latent_seasonal)
        Aes_weights = alpha * (1 - alpha) ** powers
        seasonal_normalized_term = conv1d_fft(x - seasonal, Aes_weights)

        # auxiliary term

        growth = self.to_growth(latent_growth)
        growth_smoothing_weights = (1 - alpha) ** powers
        growth_term = conv1d_fft(growth, growth_smoothing_weights)

        return seasonal_normalized_term + growth_term

# decoder classes

class LevelStack(nn.Module):
    def forward(self, x, num_steps_forecast):
        return repeat(x[:, -1], 'b d -> b n d', n = num_steps_forecast)

class GrowthDampening(nn.Module):
    def __init__(
        self,
        dim,
        heads = 8
    ):
        super().__init__()
        self.heads = heads
        self.dampen_factor = nn.Parameter(torch.randn(heads))

    def forward(self, growth, *, num_steps_forecast):
        device, h = growth.device, self.heads

        dampen_factor = self.dampen_factor.sigmoid()

        # like level stack, it takes the last growth for forecasting

        last_growth = growth[:, -1]
        last_growth = rearrange(last_growth, 'b l (h d) -> b l 1 h d', h = h)

        # prepare dampening factors per head and the powers

        dampen_factor = rearrange(dampen_factor, 'h -> 1 1 1 h 1')
        powers = (torch.arange(num_steps_forecast, device = device) + 1)
        powers = rearrange(powers, 'n -> 1 1 n 1 1')

        # following Eq(2) in the paper

        dampened_growth = last_growth * (dampen_factor ** powers).cumsum(dim = 2)
        return rearrange(dampened_growth, 'b l n h d -> b l n (h d)')

# main class

class ETSFormer(nn.Module):
    def __init__(
        self,
        *,
        model_dim,
        time_features = 1,
        embed_kernel_size = 3,
        layers = 2,
        heads = 8,
        K = 4,
        dropout = 0.
    ):
        super().__init__()
        assert (model_dim % heads) == 0, 'model dimension must be divisible by number of heads'
        self.model_dim = model_dim
        self.time_features = time_features

        self.embed = InputEmbedding(time_features, model_dim, kernel_size = embed_kernel_size, dropout = dropout)

        self.encoder_layers = nn.ModuleList([])

        for ind in range(layers):
            is_last_layer = ind == (layers - 1)

            self.encoder_layers.append(nn.ModuleList([
                FrequencyAttention(K = K, dropout = dropout),
                MHESA(dim = model_dim, heads = heads, dropout = dropout),
                FeedForwardBlock(dim = model_dim) if not is_last_layer else None,
                Level(time_features = time_features, model_dim = model_dim)
            ]))

        self.growth_dampening_module = GrowthDampening(dim = model_dim, heads = heads)

        self.latents_to_time_features = nn.Linear(model_dim, time_features)
        self.level_stack = LevelStack()

    def forward(
        self,
        x,
        *,
        num_steps_forecast = 0,
        return_latents = False
    ):
        one_time_feature = x.ndim == 2

        if one_time_feature:
            x = rearrange(x, 'b n -> b n 1')

        z = self.embed(x)

        latent_growths = []
        latent_seasonals = []

        for freq_attn, mhes_attn, ff_block, level in self.encoder_layers:
            latent_seasonal = freq_attn(z)
            z = z - latent_seasonal

            latent_growth = mhes_attn(z)
            z = z - latent_growth

            if exists(ff_block):
                z = ff_block(z)

            x = level(x, latent_growth, latent_seasonal)

            latent_growths.append(latent_growth)
            latent_seasonals.append(latent_seasonal)

        latent_growths = torch.stack(latent_growths, dim = -2)
        latent_seasonals = torch.stack(latent_seasonals, dim = -2)

        latents = Intermediates(latent_growths, latent_seasonals, x)

        if num_steps_forecast == 0:
            return latents

        latent_seasonals = rearrange(latent_seasonals, 'b n l d -> b l d n')
        extrapolated_seasonals = fourier_extrapolate(latent_seasonals, x.shape[1], x.shape[1] + num_steps_forecast)
        extrapolated_seasonals = rearrange(extrapolated_seasonals, 'b l d n -> b l n d')

        dampened_growths = self.growth_dampening_module(latent_growths, num_steps_forecast = num_steps_forecast)
        level = self.level_stack(x, num_steps_forecast = num_steps_forecast)

        summed_latents = dampened_growths.sum(dim = 1) + extrapolated_seasonals.sum(dim = 1)
        forecasted = level + self.latents_to_time_features(summed_latents)

        if one_time_feature:
            forecasted = rearrange(forecasted, 'b n 1 -> b n')

        if return_latents:
            return forecasted, latents

        return forecasted

# classification wrapper

class MultiheadLayerNorm(nn.Module):
    def __init__(self, dim, heads = 1, eps = 1e-5):
        super().__init__()
        self.eps = eps
        self.g = nn.Parameter(torch.ones(heads, 1, dim))
        self.b = nn.Parameter(torch.zeros(heads, 1, dim))

    def forward(self, x):
        std = torch.var(x, dim = -1, unbiased = False, keepdim = True).sqrt()
        mean = torch.mean(x, dim = -1, keepdim = True)
        return (x - mean) / (std + self.eps) * self.g + self.b

class ClassificationWrapper(nn.Module):
    def __init__(
        self,
        *,
        etsformer,
        num_classes = 10,
        heads = 16,
        dim_head = 32,
        level_kernel_size = 3,
        growth_kernel_size = 3,
        seasonal_kernel_size = 3,
        dropout = 0.
    ):
        super().__init__()
        assert isinstance(etsformer, ETSFormer)
        self.etsformer = etsformer
        model_dim = etsformer.model_dim
        time_features = etsformer.time_features

        inner_dim = dim_head * heads
        self.scale = dim_head ** -0.5
        self.dropout = nn.Dropout(dropout)

        self.queries = nn.Parameter(torch.randn(heads, dim_head))

        self.growth_to_kv = nn.Sequential(
            Rearrange('b n d -> b d n'),
            nn.Conv1d(model_dim, inner_dim * 2, growth_kernel_size, bias = False, padding = growth_kernel_size // 2),
            Rearrange('... (kv h d) n -> ... (kv h) n d', kv = 2, h = heads),
            MultiheadLayerNorm(dim_head, heads = 2 * heads),
        )

        self.seasonal_to_kv = nn.Sequential(
            Rearrange('b n d -> b d n'),
            nn.Conv1d(model_dim, inner_dim * 2, seasonal_kernel_size, bias = False, padding = seasonal_kernel_size // 2),
            Rearrange('... (kv h d) n -> ... (kv h) n d', kv = 2, h = heads),
            MultiheadLayerNorm(dim_head, heads = 2 * heads),
        )

        self.level_to_kv = nn.Sequential(
            Rearrange('b n t -> b t n'),
            nn.Conv1d(time_features, inner_dim * 2, level_kernel_size, bias = False, padding = level_kernel_size // 2),
            Rearrange('b (kv h d) n -> b (kv h) n d', kv = 2, h = heads),
            MultiheadLayerNorm(dim_head, heads = 2 * heads),
        )

        self.to_out = nn.Linear(inner_dim, model_dim)

        self.to_logits = nn.Sequential(
            nn.LayerNorm(model_dim),
            nn.Linear(model_dim, num_classes)
        )

    def forward(self, timeseries):
        latent_growths, latent_seasonals, level_output = self.etsformer(timeseries)

        latent_growths = latent_growths.mean(dim = -2)
        latent_seasonals = latent_seasonals.mean(dim = -2)

        # queries, key, values

        q = self.queries * self.scale

        kvs = torch.cat((
            self.growth_to_kv(latent_growths),
            self.seasonal_to_kv(latent_seasonals),
            self.level_to_kv(level_output)
        ), dim = -2)

        k, v = kvs.chunk(2, dim = 1)

        # cross attention pooling

        sim = einsum('h d, b h j d -> b h j', q, k)
        sim = sim - sim.amax(dim = -1, keepdim = True).detach()

        attn = sim.softmax(dim = -1)
        attn = self.dropout(attn)

        out = einsum('b h j, b h j d -> b h d', attn, v)
        out = rearrange(out, 'b ... -> b (...)')

        out = self.to_out(out)

        # project to logits

        return self.to_logits(out)