<a href="https://colab.research.google.com/github/jodog0412/STOCK-PRICE-PREDICTION/blob/main/stock_price_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. Data Preprocessing

In [None]:
!pip install yfinance
import pandas as pd
import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Pull two years of XOM price history through yfinance and keep only the
# closing-price column, then plot it for a quick visual check.
data = yf.Ticker("XOM").history(period="2y")["Close"]
plt.plot(data)

In [None]:
from sklearn.preprocessing import MinMaxScaler
# Rescale the closing prices with a min-max scaler. The scaler works on 2-D
# input, so the series is turned into a single column and flattened back to
# 1-D afterwards. `scaler` is kept so predictions can be inverse-transformed.
# NOTE(review): the scaler is fit on the whole series, including the final
# points that are later held out of `train` -- consider fitting on the
# training slice only to avoid leaking their range.
scaler = MinMaxScaler()
data = scaler.fit_transform(data.to_numpy()[:, np.newaxis]).flatten()

In [None]:
# iw: number of past points fed to the models (input window).
# ow: number of future points the models predict (output window).
iw = ow = 128
# Hold out the last `iw` points of the series; everything before is training data.
train = data[:-iw]

In [None]:
# Sanity check: show the shape of the full series and of the training slice.
print(f"data_size: {data.shape} train_size: {train.shape}")

In [None]:
from torch.utils.data import DataLoader, Dataset

class windowDataset(Dataset):
    """Sliding-window dataset over a 1-D series.

    Sample ``i`` pairs the ``input_window`` points starting at ``stride * i``
    with the ``output_window`` points that immediately follow them.

    Args:
        y: 1-D array-like exposing ``shape[0]`` and slice indexing.
        input_window: number of points in each input sequence.
        output_window: number of points in each target sequence.
        stride: offset between the starts of consecutive samples.

    Attributes:
        x: inputs, shape ``[num_samples, input_window, 1]``.
        y: targets, shape ``[num_samples, output_window, 1]``.
        len: number of samples.

    Raises:
        ValueError: if the series is too short to hold a single window pair.
    """
    def __init__(self, y, input_window=80, output_window=20, stride=5):
        # Total number of points in the series.
        L = y.shape[0]
        # Number of samples produced when the window advances by `stride`.
        num_samples = (L - input_window - output_window) // stride + 1
        if num_samples < 0:
            raise ValueError(
                f"series of length {L} is too short for input_window={input_window} "
                f"and output_window={output_window}"
            )

        # Store the windows sample-major from the start. Filling a
        # [window, num_samples] array and then calling
        # reshape(num_samples, window, 1) is NOT a transpose: it re-reads the
        # buffer in C order and mixes values from different windows.
        X = np.zeros([num_samples, input_window])
        Y = np.zeros([num_samples, output_window])

        for i in range(num_samples):
            start_x = stride * i
            end_x = start_x + input_window
            X[i] = y[start_x:end_x]

            # The target starts right where the input ends.
            end_y = end_x + output_window
            Y[i] = y[end_x:end_y]

        # Add a trailing feature axis: [num_samples, window, 1].
        self.x = X.reshape(num_samples, input_window, 1)
        self.y = Y.reshape(num_samples, output_window, 1)
        self.len = num_samples

    def __getitem__(self, i):
        return self.x[i], self.y[i]

    def __len__(self):
        return self.len

In [None]:
# Cut the training slice into (input, target) window pairs, advancing one
# point at a time, and serve them in batches of 64.
train_dataset = windowDataset(y=train, input_window=iw, output_window=ow, stride=1)
train_loader = DataLoader(dataset=train_dataset, batch_size=64)

# 2. Model

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.nn import Transformer
import math

## 1) Transformer

In [None]:
class TF_config():
    """Hyper-parameters read by TFModel."""

    def __init__(self):
        # Embedding width and number of attention heads of each encoder layer.
        self.d_model, self.nhead = 512, 8
        # Number of stacked encoder layers and the dropout probability.
        self.nlayers, self.dropout = 4, 0.1

class TFModel(nn.Module):
    """Transformer-encoder forecaster mapping `iw` past points to `ow` future points.

    Pipeline: per-step embedding (1 -> d_model), positional encoding,
    Transformer encoder, per-step projection (d_model -> 1), then a
    feed-forward head over the time axis (iw -> ow).

    Args:
        configs: object exposing ``d_model``, ``nhead``, ``dropout`` and ``nlayers``.
        iw: input sequence length expected by the time-axis head.
        ow: number of steps produced by the time-axis head.
    """
    def __init__(self,configs, iw=iw, ow=ow):
        super(TFModel, self).__init__()
        d_model=configs.d_model
        nhead=configs.nhead
        dropout=configs.dropout
        nlayers=configs.nlayers
        # nn.TransformerEncoder makes its own copies of this layer; the
        # attribute is kept so existing checkpoints keep the same keys.
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=nlayers)
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        # Lifts each scalar time step to a d_model-wide embedding.
        self.encoder = nn.Sequential(
            nn.Linear(1, d_model//2),
            nn.ReLU(),
            nn.Linear(d_model//2, d_model)
        )

        # Collapses each d_model-wide step back to a scalar.
        self.linear =  nn.Sequential(
            nn.Linear(d_model, d_model//2),
            nn.ReLU(),
            nn.Linear(d_model//2, 1)
        )

        # Maps the iw per-step scalars to the ow forecast steps.
        self.linear2 = nn.Sequential(
            nn.Linear(iw, (iw+ow)//2),
            nn.ReLU(),
            nn.Linear((iw+ow)//2, ow)
        )

    def generate_square_subsequent_mask(self, sz):
        """Return a [sz, sz] float mask: 0 on and below the diagonal, -inf above it."""
        return torch.triu(torch.full((sz, sz), float('-inf')), diagonal=1)

    def forward(self, src, srcmask):
        """Forecast `ow` steps.

        Args:
            src: tensor of shape [batch, iw, 1].
            srcmask: [iw, iw] attention mask forwarded to the encoder.

        Returns:
            Tensor of shape [batch, ow].
        """
        src = self.encoder(src)                      # [batch, iw, d_model]
        # The positional encoder indexes its table by dimension 0 and the
        # encoder layers are sequence-first, so switch to [iw, batch, d_model]
        # BEFORE adding positions. Adding them while batch-first would index
        # the table by batch position instead of time step.
        src = self.pos_encoder(src.transpose(0, 1))
        output = self.transformer_encoder(src, srcmask).transpose(0, 1)  # [batch, iw, d_model]
        output = self.linear(output)[:,:,0]          # [batch, iw]
        output = self.linear2(output)                # [batch, ow]
        return output

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence, then apply dropout.

    The table is stored as a non-trainable buffer ``pe`` of shape
    ``[max_len, 1, d_model]`` and is sliced by the FIRST dimension of the
    input, so the input is expected to be sequence-first:
    ``[seq_len, batch, d_model]``.

    Args:
        d_model: embedding width.
        dropout: dropout probability applied after the addition.
        max_len: number of positions precomputed in the table.
    """
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Even columns carry the sine, odd columns the cosine.
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim the frequency vector to match.
        pe[:, 1::2] = torch.cos(position * div_term[:d_model // 2])
        pe = pe.unsqueeze(0).transpose(0, 1)  # [max_len, 1, d_model]
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:x.size(0)])``; the table broadcasts over dimension 1."""
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

    @staticmethod
    def gen_attention_mask(x):
        """Return a boolean tensor that is True where `x` equals 0.

        Declared static because it takes no `self`; without the decorator a
        call on an instance would pass the instance as `x`.
        """
        mask = torch.eq(x, 0)
        return mask

## 2) DLinear

In [None]:
class DL_config():
    """Settings read by DLinear."""

    def __init__(self):
        # Input and forecast lengths follow the notebook-level window sizes.
        self.seq_len, self.pred_len = iw, ow
        # 0 selects the single shared pair of linear layers in DLinear.
        self.individual = 0
        # Channel count; DLinear only reads it when `individual` is truthy.
        # NOTE(review): the windows built in this notebook appear to have a
        # single channel -- confirm before enabling `individual`.
        self.enc_in = 4
        
class moving_avg(nn.Module):
    """
    Moving-average smoother along the time axis (dimension 1) of x.

    The sequence is padded on both sides with copies of its first and last
    time step before average pooling is applied.
    """
    def __init__(self, kernel_size, stride):
        super(moving_avg, self).__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)

    def forward(self, x):
        # Number of edge copies added to each end of the sequence.
        pad = (self.kernel_size - 1) // 2
        head = x[:, :1, :].repeat(1, pad, 1)
        tail = x[:, -1:, :].repeat(1, pad, 1)
        padded = torch.cat((head, x, tail), dim=1)
        # AvgPool1d pools over the last axis, so move time there and back.
        pooled = self.avg(padded.transpose(1, 2))
        return pooled.transpose(1, 2)

class series_decomp(nn.Module):
    """
    Split a series into a moving-average part and the remainder.
    """
    def __init__(self, kernel_size):
        super(series_decomp, self).__init__()
        self.moving_avg = moving_avg(kernel_size, stride=1)

    def forward(self, x):
        # Returns (residual, moving mean), in that order.
        trend = self.moving_avg(x)
        return x - trend, trend

class DLinear(nn.Module):
    """
    Decomposition-Linear forecaster.

    The input is split into a moving-average trend and a residual
    ("seasonal") part; each part is mapped from seq_len to pred_len steps by
    a linear layer along the time axis and the two results are summed.
    With `configs.individual` truthy every channel gets its own pair of
    layers, otherwise one pair is shared by all channels.
    """
    def __init__(self, configs):
        super(DLinear, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len

        # Window length of the moving average used for the decomposition.
        kernel_size = 25
        self.decompsition = series_decomp(kernel_size)
        self.individual = configs.individual
        self.channels = configs.enc_in

        if not self.individual:
            # Shared mode: one seasonal layer and one trend layer.
            self.Linear_Seasonal = nn.Linear(self.seq_len, self.pred_len)
            self.Linear_Trend = nn.Linear(self.seq_len, self.pred_len)

            # To visualize the weights, initialise them uniformly instead:
            # self.Linear_Seasonal.weight = nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len,self.seq_len]))
            # self.Linear_Trend.weight = nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len,self.seq_len]))
        else:
            # Per-channel mode: one (seasonal, trend) pair per channel.
            self.Linear_Seasonal = nn.ModuleList()
            self.Linear_Trend = nn.ModuleList()

            for _ in range(self.channels):
                self.Linear_Seasonal.append(nn.Linear(self.seq_len, self.pred_len))
                self.Linear_Trend.append(nn.Linear(self.seq_len, self.pred_len))

                # To visualize the weights, initialise them uniformly instead:
                # self.Linear_Seasonal[i].weight = nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len,self.seq_len]))
                # self.Linear_Trend[i].weight = nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len,self.seq_len]))

    def forward(self, x):
        # x: [Batch, Input length, Channel]
        seasonal, trend = self.decompsition(x)
        # Put time on the last axis so the linear layers act along it.
        seasonal = seasonal.permute(0, 2, 1)
        trend = trend.permute(0, 2, 1)

        if self.individual:
            seasonal_out = torch.zeros(
                [seasonal.size(0), seasonal.size(1), self.pred_len],
                dtype=seasonal.dtype, device=seasonal.device,
            )
            trend_out = torch.zeros(
                [trend.size(0), trend.size(1), self.pred_len],
                dtype=trend.dtype, device=trend.device,
            )
            for ch in range(self.channels):
                seasonal_out[:, ch, :] = self.Linear_Seasonal[ch](seasonal[:, ch, :])
                trend_out[:, ch, :] = self.Linear_Trend[ch](trend[:, ch, :])
        else:
            seasonal_out = self.Linear_Seasonal(seasonal)
            trend_out = self.Linear_Trend(trend)

        # Back to [Batch, Output length, Channel].
        return (seasonal_out + trend_out).permute(0, 2, 1)

# 3. Train

In [None]:
from tqdm import tqdm
# Train on the GPU when one is available, otherwise fall back to the CPU so
# the notebook also runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Learning rate of the Adam optimizers created in modelParam.
lr = 1e-4
# Mean-squared-error loss shared by both training loops.
criterion = nn.MSELoss()
class modelParam():
    """Bundle a model with its Adam optimizer.

    Args:
        label: 'TF' builds a TFModel; any other value builds a DLinear.

    Attributes:
        model: the network, moved to `device`.
        optimizer: Adam over the model's parameters with learning rate `lr`.
        num_epochs: last value passed to `epoch()` (set on first call).
    """
    def __init__(self,label):
        if label=='TF':
            self.model=TFModel(configs=TF_config()).to(device)
        else:
            self.model=DLinear(configs=DL_config()).to(device)
        self.optimizer=torch.optim.Adam(self.model.parameters(), lr=lr)

    def epoch(self,epoch):
        """Record the number of training epochs and return it.

        The value is stored under `num_epochs`; assigning it to `self.epoch`
        would replace this method on the instance and make any later call
        fail with "'int' object is not callable".
        """
        self.num_epochs = epoch
        return epoch

In [None]:
# Train the Transformer model for 70 epochs.
TF=modelParam('TF')
model=TF.model.to(device)
optimizer=TF.optimizer
epoch=TF.epoch(70)
progress = tqdm(range(epoch))
model.train()

# Summed batch loss of every epoch, for the loss curve.
losses=[]
for i in progress:
    batchloss = 0.0
    for (inputs, outputs) in train_loader:
        optimizer.zero_grad()
        src_mask = model.generate_square_subsequent_mask(inputs.shape[1]).to(device)
        result = model(inputs.float().to(device),  src_mask)
        # The model returns [batch, ow]; drop the trailing axis of the target to match.
        loss = criterion(result, outputs[:,:,0].float().to(device))
        loss.backward()
        optimizer.step()
        # Accumulate a plain float: adding the loss tensor itself would keep
        # a reference to every batch's autograd graph for the whole epoch.
        batchloss += loss.item()
    losses.append(batchloss)
    # max(..., 1) avoids a division by zero if the loader yields no batch.
    progress.set_description("loss: {:0.6f}".format(batchloss / max(len(train_loader), 1)))

In [None]:
# Loss curve of the run above: one point per epoch.
plt.plot(losses)

In [None]:
# Train the DLinear model for 140 epochs.
DL=modelParam('DL')
model=DL.model.to(device)
optimizer=DL.optimizer
epoch=DL.epoch(140)
progress = tqdm(range(epoch))
model.train()
# Summed batch loss of every epoch, for the loss curve.
losses=[]
for i in progress:
    batchloss = 0.0
    for (inputs, outputs) in train_loader:
        optimizer.zero_grad()
        result = model(inputs.float().to(device))
        loss = criterion(result, outputs.float().to(device))
        loss.backward()
        optimizer.step()
        # Accumulate a plain float: adding the loss tensor itself would keep
        # a reference to every batch's autograd graph for the whole epoch.
        batchloss += loss.item()
    losses.append(batchloss)
    # max(..., 1) avoids a division by zero if the loader yields no batch.
    progress.set_description("loss: {:0.6f}".format(batchloss / max(len(train_loader), 1)))

In [None]:
# Loss curve of the run above: one point per epoch.
plt.plot(losses)

In [None]:
def evaluate(label):
    """Forecast the `ow` steps that follow the training slice.

    The last `iw` points of `train` are fed to the selected trained model.

    Args:
        label: 'TF' uses the Transformer model; any other value uses DLinear.

    Returns:
        NumPy array of shape (1, ow) holding the scaled predictions.
    """
    # Shape [1, iw, 1]: one batch entry, one channel.
    window = torch.tensor(train[-iw:]).reshape(1,-1,1).float().to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        if label=='TF':
            model=TF.model.to(device)
            model.eval()
            src_mask = model.generate_square_subsequent_mask(window.shape[1]).to(device)
            predictions = model(window, src_mask)
            return predictions.detach().cpu().numpy()

        model=DL.model.to(device)
        model.eval()
        predictions = model(window)
        # DLinear returns [1, ow, 1]; flatten to the same (1, ow) layout as above.
        return predictions.detach().cpu().numpy().reshape(1,-1)

In [None]:
# Bring the series and both forecasts back to price units. The scaler was fit
# on a single column, so every array is passed as one column and flattened
# back afterwards.
real = scaler.inverse_transform(data.reshape(-1,1))[:,0]
TF_pred = scaler.inverse_transform(evaluate('TF').reshape(-1,1))[:,0]
DL_pred = scaler.inverse_transform(evaluate('DL').reshape(-1,1))[:,0]

# The forecasts start right after the training slice; anchoring the x-range
# there keeps them aligned with the real series even when iw and ow differ.
pred_start = len(train)
pred_index = range(pred_start, pred_start + ow)

plt.figure(figsize=(12,8))
# Only the tail of the real series (from index 300) is shown, for readability.
plt.plot(range(300,len(data)),real[300:],label="real")
plt.plot(pred_index,TF_pred,label="TF_predict")
plt.plot(pred_index,DL_pred,label="DL_predict")
plt.legend()
plt.show()