## LTSF-Linear
- https://today-1.tistory.com/60

In [1]:
import os
import sys
import time
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import seaborn as sns
from scipy import stats
import warnings; warnings.filterwarnings('ignore')
#plt.style.use('ggplot')
plt.style.use('seaborn-whitegrid')
%matplotlib inline

# Options for pandas
pd.options.display.max_columns = 200
pd.set_option("display.max_rows", 100)

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [3]:
class LTSF_Linear(nn.Module):
    """Plain linear forecaster applied along the time axis.

    Each channel's last ``window_size`` time steps are mapped to
    ``forecast_size`` future steps by an ``nn.Linear`` layer.

    Args:
        window_size: Number of input time steps.
        forecast_size: Number of time steps to predict.
        individual: If truthy, every channel gets its own linear layer;
            otherwise a single layer is shared by all channels.
        feature_size: Number of channels (size of the last input dimension).
    """

    def __init__(self, window_size, forecast_size, individual, feature_size):
        super(LTSF_Linear, self).__init__()
        self.window_size = window_size
        self.forecast_size = forecast_size
        self.individual = individual
        self.channels = feature_size

        if self.individual:
            # One independent layer per channel, indexed by channel position.
            self.Linear = torch.nn.ModuleList(
                [
                    torch.nn.Linear(self.window_size, self.forecast_size)
                    for _ in range(self.channels)
                ]
            )
        else:
            self.Linear = torch.nn.Linear(self.window_size, self.forecast_size)

    def forward(self, x):
        """Forecast the next ``forecast_size`` steps.

        Args:
            x: Tensor of shape [Batch, Input length, feature size].

        Returns:
            Tensor of shape [Batch, Output length, feature size].
        """
        if self.individual:
            # Allocate directly on the input's device instead of CPU + copy.
            output = torch.zeros(
                [x.size(0), self.forecast_size, x.size(2)],
                dtype=x.dtype,
                device=x.device,
            )
            for i in range(self.channels):
                output[:, :, i] = self.Linear[i](x[:, :, i])
            return output
        # nn.Linear acts on the last dimension, so move time there and back.
        return self.Linear(x.permute(0, 2, 1)).permute(0, 2, 1)

In [4]:
# Toy dimensions used by the walkthrough cells below.
batch_size = 32  # number of samples in the batch
in_seq_len = 3  # input (look-back) length along the time axis
pred_len = 2  # number of time steps to predict
channel = 5  # number of channels / features
# Random dummy input laid out as (batch, time, channel).
X = torch.randn(batch_size, in_seq_len, channel)
X.shape

torch.Size([32, 3, 5])

In [5]:
# A single linear layer that maps in_seq_len time steps to pred_len time steps.
one_linear = nn.Linear(
    in_features=in_seq_len,
    out_features=pred_len
)
one_linear

Linear(in_features=3, out_features=2, bias=True)

In [6]:
# Zero-filled buffer for the predictions, laid out as (batch, pred_len, channel).
output = torch.zeros(batch_size, pred_len, channel)
output.shape

torch.Size([32, 2, 5])

In [7]:
# Just one channel as an example: feed channel 0's (batch, in_seq_len) slice
# through the layer and store the (batch, pred_len) result in channel 0.
output[:,:,0]=one_linear(X[:,:,0])

In [8]:
# Channel 0 predictions for every sample: first, then second predicted step.
print(output[:,0,0])
print(output[:,1,0])

tensor([-0.0422,  1.7329, -0.2813,  0.8301,  0.5596,  1.6638,  1.4399,  0.5159,
         1.4506,  0.7740,  0.6422, -0.4226,  0.4887,  0.2160,  1.2640,  0.1114,
         0.5451,  0.9673, -0.0668,  0.9135,  0.8441,  0.6854,  1.8970,  1.4095,
         0.5142, -0.0325,  1.2938,  1.9044, -0.1198,  0.8811, -0.6986,  0.3351],
       grad_fn=<SelectBackward0>)
tensor([ 0.0275,  0.1370,  0.3669, -0.1579,  0.0354,  0.6058,  0.2239,  0.2995,
         1.0410,  0.7718,  0.8155,  0.8653, -0.2585,  0.5678,  0.8835,  1.2759,
         0.3337, -0.0947, -0.1599, -0.2123,  0.5505,  0.5007,  0.0680,  0.3847,
         0.3740, -0.1458,  0.3952,  0.8452, -0.3571,  0.6026,  1.0700, -0.0476],
       grad_fn=<SelectBackward0>)


In [9]:
# Apply the same layer to all channels at once: move time to the last axis
# (batch, channel, time), apply the layer, then restore (batch, time, channel).
output = one_linear(X.permute(0,2,1)).permute(0,2,1)
output.shape

torch.Size([32, 2, 5])

### NLinear

In [10]:
class LTSF_NLinear(nn.Module):
    """Linear forecaster with last-value normalisation.

    The last observed value of every channel is subtracted from the input
    window, a linear layer is applied along the time axis, and the subtracted
    value is added back to the prediction.

    Args:
        window_size: Number of input time steps.
        forecast_size: Number of time steps to predict.
        individual: If truthy, every channel gets its own linear layer;
            otherwise a single layer is shared by all channels.
        feature_size: Number of channels (size of the last input dimension).
    """

    def __init__(
        self,
        window_size,
        forecast_size,
        individual,
        feature_size
    ):
        super(LTSF_NLinear, self).__init__()
        self.window_size = window_size
        self.forecast_size = forecast_size
        # Legacy misspelled attribute, kept so existing callers keep working.
        self.forcast_size = forecast_size
        self.individual = individual
        self.channels = feature_size
        if self.individual:
            # One independent layer per channel, indexed by channel position.
            self.Linear = torch.nn.ModuleList(
                [
                    torch.nn.Linear(self.window_size, self.forecast_size)
                    for _ in range(self.channels)
                ]
            )
        else:
            self.Linear = torch.nn.Linear(self.window_size, self.forecast_size)

    def forward(self, x):
        """Forecast the next ``forecast_size`` steps.

        Args:
            x: Tensor of shape [Batch, Input length, feature size].

        Returns:
            Tensor of shape [Batch, Output length, feature size].
        """
        # Last time step of every sample and channel, shape [Batch, 1, feature size].
        # detach() does not copy; it returns a tensor cut off from the autograd
        # graph, so no gradient flows through the offset.
        seq_last = x[:, -1:, :].detach()
        x = x - seq_last
        if self.individual:
            output = torch.zeros(
                [x.size(0), self.forecast_size, x.size(2)],
                dtype=x.dtype,
                device=x.device,
            )
            for i in range(self.channels):
                output[:, :, i] = self.Linear[i](x[:, :, i])
            x = output
        else:
            # nn.Linear acts on the last dimension, so move time there and back.
            x = self.Linear(x.permute(0, 2, 1)).permute(0, 2, 1)
        # Undo the normalisation (broadcast over the predicted time steps).
        x = x + seq_last
        return x

### DLinear

In [11]:
class moving_avg(nn.Module):
    """Moving-average smoother along the time axis with edge replication.

    The first and last time steps are each repeated ``(kernel_size - 1) // 2``
    times before average pooling. With an odd ``kernel_size`` and ``stride=1``
    the output keeps the input length; an even ``kernel_size`` yields one step
    fewer.

    Args:
        kernel_size: Width of the averaging window.
        stride: Stride of the average pooling.
    """

    def __init__(self, kernel_size, stride):
        super().__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)

    def forward(self, x):
        """Smooth ``x`` of shape [BATCH SIZE, SEQ_LEN, CHANNEL] over SEQ_LEN."""
        pad_len = (self.kernel_size - 1) // 2
        # Replicate the boundary values on both ends of the series.
        head = x[:, :1, :].repeat(1, pad_len, 1)
        tail = x[:, -1:, :].repeat(1, pad_len, 1)
        padded = torch.cat((head, x, tail), dim=1)
        # AvgPool1d pools over the last axis, so time must be last while pooling.
        smoothed = self.avg(padded.transpose(1, 2))
        return smoothed.transpose(1, 2)

In [12]:
class series_decomp(nn.Module):
    """Split a series into its moving average and the remainder.

    Args:
        kernel_size: Window width handed to ``moving_avg`` (used with stride 1).
    """

    def __init__(self, kernel_size):
        super().__init__()
        self.moving_avg = moving_avg(kernel_size, stride=1)

    def forward(self, x):
        """Return ``(moving_mean, residual)`` where ``residual = x - moving_mean``."""
        smoothed = self.moving_avg(x)
        return smoothed, x - smoothed

In [13]:
class LTSF_DLinear(torch.nn.Module):
    """Decomposition-based linear forecaster.

    The input is split by ``series_decomp`` into a moving-average (trend) part
    and a residual (seasonal) part. Each part goes through its own linear layer
    along the time axis, and the two predictions are summed.

    Args:
        window_size: Number of input time steps.
        forecast_size: Number of time steps to predict.
        kernel_size: Moving-average window passed to ``series_decomp``.
        individual: If truthy, every channel gets its own pair of layers;
            otherwise one trend layer and one seasonal layer are shared.
        feature_size: Number of channels (size of the last input dimension).
    """

    def __init__(self, window_size, forecast_size, kernel_size, individual, feature_size):
        super().__init__()
        self.window_size = window_size
        self.forcast_size = forecast_size
        self.decompsition = series_decomp(kernel_size)
        self.individual = individual
        self.channels = feature_size
        if self.individual:
            self.Linear_Seasonal = torch.nn.ModuleList()
            self.Linear_Trend = torch.nn.ModuleList()
            for _ in range(self.channels):
                self.Linear_Trend.append(self._averaging_linear())
                self.Linear_Seasonal.append(self._averaging_linear())
        else:
            self.Linear_Trend = self._averaging_linear()
            self.Linear_Seasonal = self._averaging_linear()

    def _averaging_linear(self):
        """Build a window_size -> forcast_size layer with all weights 1/window_size."""
        layer = torch.nn.Linear(self.window_size, self.forcast_size)
        # Replace the random weight; the default-initialised bias is kept.
        layer.weight = torch.nn.Parameter(
            (1 / self.window_size) * torch.ones([self.forcast_size, self.window_size])
        )
        return layer

    def forward(self, x):
        """Forecast from ``x`` of shape [Batch, Input length, Channel].

        Returns:
            Tensor of shape [Batch, Output length, Channel].
        """
        # series_decomp returns (moving mean, residual) = (trend, seasonal).
        trend, seasonal = self.decompsition(x)
        # Put time on the last axis so nn.Linear acts on it: [Batch, Channel, Input length].
        trend = trend.permute(0, 2, 1)
        seasonal = seasonal.permute(0, 2, 1)
        if not self.individual:
            trend_pred = self.Linear_Trend(trend)
            seasonal_pred = self.Linear_Seasonal(seasonal)
        else:
            trend_pred = torch.zeros(
                [trend.size(0), trend.size(1), self.forcast_size],
                dtype=trend.dtype,
                device=trend.device,
            )
            seasonal_pred = torch.zeros(
                [seasonal.size(0), seasonal.size(1), self.forcast_size],
                dtype=seasonal.dtype,
                device=seasonal.device,
            )
            for ch in range(self.channels):
                trend_pred[:, ch, :] = self.Linear_Trend[ch](trend[:, ch, :])
                seasonal_pred[:, ch, :] = self.Linear_Seasonal[ch](seasonal[:, ch, :])
        # Sum the two components and restore [Batch, Output length, Channel].
        return (seasonal_pred + trend_pred).permute(0, 2, 1)

---