In [8]:
cd ./hanhwa

c:\Users\Sejong\Desktop\hanhwa


In [5]:
import importlib
import function_file as ff
importlib.reload(ff)

<module 'function_file' from 'c:\\Users\\Sejong\\Desktop\\hanhwa\\function_file\\__init__.py'>

In [116]:
import torch
import torch.nn as nn
import math
class PositionalEncoding(nn.Module):
    """Add a fixed sinusoidal position signal to a sequence-first tensor.

    The table ``pe`` has shape ``[max_len, 1, d_model]``: even feature indices
    hold ``sin(pos * freq)`` and odd feature indices hold ``cos(pos * freq)``.
    It is registered as a buffer, so it follows ``.to(device)`` and is saved
    in the ``state_dict`` but is not a trainable parameter.

    Args:
        d_model: size of the last (feature) dimension of the inputs. Both even
            and odd sizes are supported.
        max_len: longest sequence length the table is precomputed for.
        dropout: dropout probability applied after the addition.
    """

    def __init__(self, d_model, max_len= 5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine slot fewer than sine slots,
        # so trim the angle table to the number of odd feature indices.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Buffer: stored in the state_dict and moved with the module, never trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Arguments :
            x : Tensor, shape : [input_window, batch_size, embedding_dim]

        Returns :
            Tensor of the same shape as ``x`` with the position signal added
            and dropout applied.

        Raises :
            RuntimeError : if ``x`` is longer than the precomputed ``max_len``.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            # Fail with an explicit message instead of a broadcasting error
            # (or a silent broadcast when max_len == 1).
            raise RuntimeError(
                f"sequence length {seq_len} exceeds the positional table size {self.pe.size(0)}"
            )
        x = x + self.pe[:seq_len]
        return self.dropout(x)
    
class Transformer(nn.Module):
    """Causally masked Transformer encoder with a per-step scalar output head.

    Pipeline: positional encoding -> ``nn.TransformerEncoder`` (with a
    square "subsequent" mask so a step cannot attend to later steps) ->
    ``nn.Linear(feature_size, 1)``.

    Args:
        feature_size: size of the last dimension of the input (``d_model``).
        num_layers: number of stacked encoder layers.
        dropout: dropout probability, used by both the positional encoder and
            the encoder layers.
        nhead: number of attention heads (default 10, the value that used to
            be hard-coded). PyTorch requires ``feature_size`` to be divisible
            by ``nhead``.
    """

    def __init__(self, feature_size = 250, num_layers = 1, dropout = 0.1, nhead = 10):
        super().__init__()
        self.model_type = 'Transformer'
        # Cached attention mask; rebuilt lazily in forward() when stale.
        self.src_mask = None
        # Forward the dropout rate so one argument controls the whole model.
        self.pos_encoder = PositionalEncoding(feature_size, dropout=dropout)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=nhead, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.decoder = nn.Linear(feature_size, 1)
        self.init_weights()

    def init_weights(self):
        """Zero the output head's bias and draw its weights from U(-0.1, 0.1)."""
        initrange = 0.1
        # no_grad() is the supported way to re-initialise parameters in place
        # (instead of going through the legacy `.data` attribute).
        with torch.no_grad():
            self.decoder.bias.zero_()
            self.decoder.weight.uniform_(-initrange, initrange)

    def forward(self, src):
        """Run the masked encoder and the linear head.

        Arguments :
            src : Tensor whose first dimension is the sequence length and whose
                last dimension is ``feature_size`` (the encoder layer is built
                with PyTorch's default sequence-first layout).

        Returns :
            Tensor with the same leading dimensions as ``src`` and a last
            dimension of size 1.
        """
        seq_len = src.size(0)
        cached = self.src_mask
        # Rebuild when the length changes OR when the input lives on another
        # device than the cached mask (a length-only check would reuse a mask
        # from the wrong device).
        if cached is None or cached.size(0) != seq_len or cached.device != src.device:
            self.src_mask = self._generate_square_subsequent_mask(seq_len, device=src.device)

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, self.src_mask)
        return self.decoder(output)

    def _generate_square_subsequent_mask(self, sz, device=None):
        """Return a ``[sz, sz]`` float mask: 0.0 on/below the diagonal, -inf above.

        ``device`` is optional; when given, the mask is allocated there
        directly instead of being created on the CPU and copied.
        """
        blocked = torch.full((sz, sz), float('-inf'), device=device)
        # Keep -inf strictly above the diagonal; triu() zero-fills the rest.
        return torch.triu(blocked, diagonal=1)


In [70]:
from sklearn.preprocessing import MinMaxScaler
import numpy as np
# Reproducibility: seed both NumPy's and PyTorch's global generators.
SEED = 96
np.random.seed(SEED)
torch.manual_seed(SEED)

# Window geometry: each sample spans `input_window` steps, of which the last
# `output_window` steps are zeroed out in the model input.
input_window, output_window = 30, 10

# Separate scalers for the two splits (each is fitted inside get_data()).
scaler_train, scaler_test = MinMaxScaler(), MinMaxScaler()

# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'


def create_inout_sequence_2(input_data, input_window, output_window):
    """Slice a series into (masked input, full label) sliding-window pairs.

    For every start index ``i`` in ``range(len(input_data) - input_window)``
    the label is ``input_data[i : i + input_window]`` and the input is the
    same window with its last ``output_window`` steps replaced by zeros.

    Args:
        input_data: array-like whose first axis is time. Any number of
            trailing feature dimensions is accepted (the zero padding is
            sized from the data rather than fixed at two features).
        input_window: number of time steps per window.
        output_window: number of trailing steps to zero out in the input;
            must satisfy ``0 <= output_window <= input_window``.

    Returns:
        float32 tensor of shape ``[n_windows, 2, input_window, *features]``
        where index 0 on axis 1 is the masked input and index 1 is the label.
        When the series is too short for a single window, an empty tensor
        with that same trailing shape is returned.

    Raises:
        ValueError: if ``output_window`` is outside ``[0, input_window]``.
    """
    if not 0 <= output_window <= input_window:
        raise ValueError(
            f"output_window must be between 0 and input_window ({input_window}), got {output_window}"
        )

    series = np.asarray(input_data, dtype=np.float32)
    n_windows = len(series) - input_window
    if n_windows <= 0:
        return torch.empty((0, 2, input_window) + series.shape[1:], dtype=torch.float32)

    labels = np.stack([series[start : start + input_window] for start in range(n_windows)])
    inputs = labels.copy()
    # Explicit start index instead of `[:-output_window]`, which would select
    # nothing at all when output_window == 0.
    inputs[:, input_window - output_window :] = 0.0

    # One contiguous ndarray -> tensor conversion; building a tensor from a
    # Python list of ndarray tuples is the slow path.
    return torch.from_numpy(np.stack((inputs, labels), axis=1))

def get_data():
    """Load the series, split it 80/20 in order, scale TEMP and build windows.

    Reads the frame from ``ff.time_series.time_series_dataframe()``, keeps its
    first two columns (one of which must be named ``'TEMP'``), splits the rows
    chronologically (first 80% train, remainder test), min-max scales the
    ``'TEMP'`` column of each split, and converts each split into sliding
    windows with ``create_inout_sequence_2``.

    Uses the notebook-level ``scaler_train``, ``scaler_test``,
    ``input_window``, ``output_window`` and ``device``. Both scalers are
    (re)fitted as a side effect.

    Returns:
        ``(train_sequence, test_sequence)``: the window tensors for each
        split, moved to ``device``.
    """
    df = ff.time_series.time_series_dataframe()
    df = df.iloc[:, :2]

    train_len = int(len(df) * 0.8)
    # Copy the splits so the column assignments below write to independent
    # frames rather than to slices of `df` (avoids chained-assignment
    # warnings / ambiguity about whether `df` is modified).
    train_data = df[:train_len].copy()
    test_data = df[train_len:].copy()

    train_data['TEMP'] = scaler_train.fit_transform(train_data['TEMP'].values.reshape(-1,1)).reshape(-1)
    # NOTE(review): the test split is scaled with its own fit, so train and
    # test are not mapped with the same min/max. Left unchanged because code
    # outside this cell may rely on `scaler_test`; confirm this is intended.
    test_data['TEMP'] = scaler_test.fit_transform(test_data['TEMP'].values.reshape(-1,1)).reshape(-1)

    train_sequence = create_inout_sequence_2(train_data.values, input_window, output_window)
    test_sequence = create_inout_sequence_2(test_data.values, input_window, output_window)

    # Drop the last `output_window` windows of each split (as before). The end
    # index is computed explicitly because `[:-0]` would return nothing if
    # output_window were ever set to 0.
    train_sequence = train_sequence[: len(train_sequence) - output_window]
    test_sequence = test_sequence[: len(test_sequence) - output_window]

    return train_sequence.to(device), test_sequence.to(device)


def get_batch(source, i, batch_size):
    """Cut one mini-batch out of `source` and lay it out time-step first.

    Takes up to `batch_size` consecutive samples starting at index `i`
    (never more than `len(source) - 1 - i`). Each sample is an
    (input, target) pair; the inputs and the targets are stacked separately,
    split into `input_window` chunks along dim 1, and the chunks are
    stacked along a new leading dimension.

    Returns:
        (input, target) tensors built as described above.
    """
    count = min(batch_size, len(source) - 1 - i)
    batch = source[i : i + count]

    def _time_major(slot):
        # slot 0 -> model inputs, slot 1 -> targets
        stacked = torch.stack([pair[slot] for pair in batch])
        # The second argument of chunk() is the dimension to split along.
        return torch.stack(stacked.chunk(input_window, 1))

    return _time_major(0), _time_major(1)

In [75]:
def get_batch(source, i, batch_size):
    """Return one (input, target) mini-batch from `source`, time-step first.

    NOTE: this cell re-defines `get_batch` from the previous cell with the
    same logic; whichever definition is executed last is the one in effect.

    At most `batch_size` consecutive samples are taken starting at `i`,
    capped at `len(source) - 1 - i`. The first element of every sample goes
    into the input, the second into the target; each stacked tensor is split
    into `input_window` chunks along dim 1 and the chunks are re-stacked on a
    new leading dimension.
    """
    n_samples = min(batch_size, len(source) - 1 - i)
    window = source[i : i + n_samples]

    stacked_inputs = torch.stack([sample[0] for sample in window])
    stacked_targets = torch.stack([sample[1] for sample in window])

    # chunk(n, 1): split along dim 1 into `input_window` pieces.
    batch_input = torch.stack(stacked_inputs.chunk(input_window, 1))
    batch_target = torch.stack(stacked_targets.chunk(input_window, 1))
    return batch_input, batch_target

# Build the windowed train/test tensors once; get_data() already moves both to `device`.
train, test = get_data()

In [90]:
# Training hyper-parameters.
lr = 0.001
epochs = 10

# Model with its default sizes, placed on the selected device, trained with
# mean-squared error and Adam.
model = Transformer().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

In [82]:
# Manual, one-off version of get_batch(): take the first `seq_len` samples and
# lay inputs/targets out time-step first.
seq_len = 512
data = train[:seq_len]
# For slot 0 (inputs) and slot 1 (targets): stack the samples, split along
# dim 1 into `input_window` chunks, then stack the chunks on a new leading dim.
input, target = (
    torch.stack(torch.stack([item[slot] for item in data]).chunk(input_window, dim=1))
    for slot in (0, 1)
)
    


In [84]:
# Sanity check of the batch layout produced by the cell above (inputs and targets should match).
input.shape, target.shape

(torch.Size([30, 512, 1, 2]), torch.Size([30, 512, 1, 2]))

In [None]:
# Smoke test of a single forward pass in training mode.
# NOTE(review): this cell has no execution count. The batch displayed above has
# shape [30, 512, 1, 2] while `model` is built with the default
# feature_size=250, so the last dimension presumably does not match what the
# encoder expects — confirm the intended input layout / feature_size.
model.train()
optimizer.zero_grad()
output = model(input)
output

In [113]:
# Scratch re-computation of the table built in PositionalEncoding.__init__,
# done at module level so the intermediate shapes can be inspected.
max_len, d_model = 5000, 250

position = torch.arange(max_len, dtype=torch.float)[:, None]
div_term = torch.exp((-math.log(10000.0) / d_model) * torch.arange(0, d_model, 2).float())

pe = torch.zeros(max_len, 1, d_model)
pe[:, 0, ::2] = torch.sin(position * div_term)   # even feature indices
pe[:, 0, 1::2] = torch.cos(position * div_term)  # odd feature indices
pe.shape

torch.Size([5000, 1, 250])

In [115]:
# Same kind of leading-rows slice that PositionalEncoding.forward takes
# (here for a sequence of length 100); the two trailing dims are kept whole.
tmp = pe[:100]
tmp.shape

torch.Size([100, 1, 250])

In [117]:
# Instantiate the module on its own to inspect its repr. Only the Dropout
# submodule is listed: `pe` is registered as a buffer, not as a submodule.
encoder = PositionalEncoding(250)
encoder

PositionalEncoding(
  (dropout): Dropout(p=0.1, inplace=False)
)