<a href="https://colab.research.google.com/github/oune/monitoring_colab/blob/main/ae_lstm_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import os
import pickle
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset
import numpy as np
from typing import List
import easydict
from functools import reduce

In [None]:
## Encoder
class Encoder(nn.Module):
    """LSTM encoder that reduces an input sequence to its final LSTM states.

    :param input_size: number of features per time step
    :param hidden_size: size of the LSTM hidden state
    :param num_layers: number of stacked LSTM layers
    """

    def __init__(self, input_size=4096, hidden_size=1024, num_layers=2):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # nn.LSTM applies dropout only between stacked layers and warns when a
        # non-zero value is passed for a single layer, so pass 0 in that case.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True,
                            dropout=0.1 if num_layers > 1 else 0.0,
                            bidirectional=False)

    def forward(self, x):
        """Run ``x`` through the LSTM and return its final ``(hidden, cell)``.

        The per-step outputs of the LSTM are discarded; only the final states
        are returned.
        """
        _, (hidden, cell) = self.lstm(x)

        return (hidden, cell)
    
## Decoder
class Decoder(nn.Module):
    """LSTM decoder followed by a linear projection to the output size.

    :param input_size: number of features per time step fed to the LSTM
    :param hidden_size: size of the LSTM hidden state
    :param output_size: number of features produced by the linear layer
    :param num_layers: number of stacked LSTM layers
    """

    def __init__(self, input_size=4096, hidden_size=1024, output_size=4096, num_layers=2):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers

        # nn.LSTM applies dropout only between stacked layers and warns when a
        # non-zero value is passed for a single layer, so pass 0 in that case.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True,
                            dropout=0.1 if num_layers > 1 else 0.0,
                            bidirectional=False)

        # Not used in forward(); kept so the module's attributes are unchanged.
        self.relu = nn.ReLU()
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        """Decode ``x`` starting from the given LSTM state.

        :param x: input passed to the LSTM
        :param hidden: ``(hidden, cell)`` tuple used as the initial LSTM state
        :return: ``(prediction, (hidden, cell))`` where ``prediction`` is the
                 linear projection of the LSTM outputs
        """
        output, (hidden, cell) = self.lstm(x, hidden)
        prediction = self.fc(output)

        return prediction, (hidden, cell)
    
## LSTM Auto Encoder
class LSTMAutoEncoder(nn.Module):
    """Sequence auto-encoder built from an LSTM ``Encoder`` and ``Decoder``."""

    def __init__(self,
                 input_dim: int,
                 latent_dim: int,
                 window_size: int=1,
                 **kwargs) -> None:
        """
        :param input_dim: number of input variables (tags) per time step
        :param latent_dim: size of the compressed representation (LSTM hidden size)
        :param window_size: sequence length; stored on the instance only
        :param kwargs: may contain ``num_layers`` (defaults to 1)
        """

        super(LSTMAutoEncoder, self).__init__()

        self.latent_dim = latent_dim
        self.input_dim = input_dim
        self.window_size = window_size

        num_layers = kwargs.pop("num_layers", 1)

        self.encoder = Encoder(
            input_size=input_dim,
            hidden_size=latent_dim,
            num_layers=num_layers,
        )
        self.reconstruct_decoder = Decoder(
            input_size=input_dim,
            output_size=input_dim,
            hidden_size=latent_dim,
            num_layers=num_layers,
        )

    def forward(self, src: torch.Tensor, **kwargs):
        """Encode ``src`` and reconstruct it step by step.

        :param src: tensor whose three dimensions are unpacked as
                    (batch, sequence length, variables)
        :return: ``[reconstruct_output, src]``
        """
        batch_size, sequence_length, var_length = src.size()

        ## Feed the sequence through the encoder
        encoder_hidden = self.encoder(src)

        # Build the reversing index and the all-zero first decoder input with
        # the same device (and dtype) as ``src`` so the model also works when
        # it is not float32 / not on the CPU.
        inv_idx = torch.arange(sequence_length - 1, -1, -1, device=src.device).long()
        decoder_input = torch.zeros((batch_size, 1, var_length),
                                    dtype=src.dtype, device=src.device)
        hidden = encoder_hidden
        reconstruct_output = []
        for _ in range(sequence_length):
            # Each decoded step is fed back in as the next decoder input.
            decoder_input, hidden = self.reconstruct_decoder(decoder_input, hidden)
            reconstruct_output.append(decoder_input)
        # The decoded steps are reversed along the time axis before returning.
        reconstruct_output = torch.cat(reconstruct_output, dim=1)[:, inv_idx, :]

        return [reconstruct_output, src]

    def loss_function(self,
                      *args,
                      **kwargs) -> torch.Tensor:
        """Return the MSE between ``args[0]`` (reconstruction) and ``args[1]`` (input)."""
        recons = args[0]
        target = args[1]

        ## MSE loss (mean squared error)
        return F.mse_loss(recons, target)

In [None]:
# Mount Google Drive at /content/drive (only works inside Google Colab).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Base directory on the mounted drive; joined with the model file name below.
root = '/content/drive/Othercomputers/내 노트북/부직포 압출장비/'

In [None]:
## Settings
args = easydict.EasyDict({
    "batch_size": 128, ## batch size
    "device": torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu'), ## use the GPU when one is available
    "input_size": 3, ## input dimension
    "latent_size": 1, ## hidden dimension
    "output_size": 3, ## output dimension
    "window_size" : 3, ## sequence length
    "num_layers": 2,     ## number of LSTM layers
    "learning_rate" : 0.001, ## learning rate
    "max_iter" : 100000, ## total number of iterations
    'early_stop' : True,  ## early-stop condition: stop when the validation loss no longer decreases
})


In [None]:
# Rebuild the network with the configured sizes and restore the saved weights.
model_name = 'model8.pth'
model_path = os.path.join(root, model_name)
model = LSTMAutoEncoder(input_dim=args.input_size, latent_dim=args.latent_size, window_size=args.window_size, num_layers=args.num_layers)
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint saved from a GPU run also loads on a CPU-only runtime.
model.load_state_dict(torch.load(model_path, map_location=args.device))
model.to(args.device)
model.eval()

LSTMAutoEncoder(
  (encoder): Encoder(
    (lstm): LSTM(3, 1, num_layers=2, batch_first=True, dropout=0.1)
  )
  (reconstruct_decoder): Decoder(
    (lstm): LSTM(3, 1, num_layers=2, batch_first=True, dropout=0.1)
    (relu): ReLU()
    (fc): Linear(in_features=1, out_features=3, bias=True)
  )
)

In [None]:
# Read the data from the same base directory as the model; the first CSV
# column is used as the index.
df = pd.read_csv(os.path.join(root, 'data.csv'), index_col=0)

In [None]:
## Build position windows so samples can be fetched by index
def make_data_idx(dates, window_size=1):
    """Return one list of consecutive positions per sliding window.

    Only ``len(dates)`` is used. Window ``k`` covers the positions
    ``k .. k + window_size - 1``; an empty list is returned when ``dates``
    is shorter than ``window_size``.
    """
    total = len(dates)
    return [
        list(range(stop - window_size, stop))
        for stop in range(window_size, total + 1)
    ]

In [None]:
# Build the sliding-window positions and look at one raw (un-normalised) window.
input_ids = make_data_idx(df.index.to_list(), window_size=args.window_size)
selected_column = list(df.columns[:args.input_size])
# np.float was only an alias of the builtin float (float64) and has been
# removed from NumPy; np.float64 is the explicit equivalent.
var_data = torch.tensor(df[selected_column].values.astype(np.float64), dtype=torch.float)
idx = 1
temp_input_ids = input_ids[idx]
input_values = var_data[temp_input_ids]

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  var_data = torch.tensor(df[selected_column].values.astype(np.float), dtype=torch.float)


In [None]:
# Display the window selected above.
input_values

tensor([[ -21.7688, -308.8175, -701.6309],
        [ -21.7739, -363.2509, -534.2027],
        [ -21.7729, -260.5415, -830.6281]])

In [None]:
# Display the shape of the selected window.
input_values.size()

torch.Size([3, 3])

In [None]:
def getData(df, start, end):
    """Normalise ``df`` and stack the sliding windows ``start`` .. ``end - 1``.

    :param df: numeric DataFrame; each column is standardised with its own
               mean and standard deviation (the caller's frame is not modified)
    :param start: index of the first window to include
    :param end: index one past the last window to include
    :return: float tensor produced by ``torch.stack(windows, dim=1)``

    The window length is read from the module-level ``args.window_size``.
    """
    ## Normalisation
    df = (df - df.mean()) / df.std()

    ## Windows are built from consecutive row positions.
    input_ids = make_data_idx(df.index.to_list(), window_size=args.window_size)

    # torch.tensor needs the underlying array rather than the DataFrame; the
    # removed np.float alias is replaced by the equivalent np.float64.
    var_data = torch.tensor(df.values.astype(np.float64), dtype=torch.float)

    # Index with the loop variable so each requested window is returned
    # (previously every entry used the unrelated global ``idx``).
    # NOTE(review): dim=1 is kept from the original and puts the window count
    # on the second axis, whereas the DataLoader batches used with the model
    # are (batch, window, features) - confirm which layout callers expect.
    return torch.stack([var_data[input_ids[i]] for i in range(start, end)], dim=1)

In [None]:
## Dataset subclass that serves sliding windows of the tag data
class TagDataset(Dataset):
    """Sliding-window dataset over the first ``input_size`` columns of ``df``.

    :param input_size: number of leading columns used as model input
    :param df: source DataFrame; it is not modified
    :param mean_df: per-column means for normalisation (optional)
    :param std_df: per-column standard deviations for normalisation (optional)
    :param window_size: number of consecutive rows per sample

    Normalisation is applied only when both ``mean_df`` and ``std_df`` are given.
    """

    def __init__(self, input_size, df, mean_df=None, std_df=None, window_size=1):

        ## Number of variables
        self.input_size = input_size

        ## Length of the sequence to reconstruct
        self.window_size = window_size

        ## Deep copy of the un-normalised data, used for the summary frame
        original_df = df.copy()

        ## Normalisation
        if mean_df is not None and std_df is not None:
            # Work on a private copy so the caller's DataFrame is not
            # overwritten with the normalised values.
            df = df.copy()
            sensor_columns = list(df.columns)
            df[sensor_columns] = (df[sensor_columns] - mean_df) / std_df

        ## Windows are built from consecutive row positions.
        index_list = df.index.to_list()
        self.input_ids = make_data_idx(index_list, window_size=window_size)

        ## Only the first ``input_size`` columns are used for reconstruction.
        self.selected_column = list(df.columns[:input_size])
        # np.float has been removed from NumPy; np.float64 is the equivalent.
        self.var_data = torch.tensor(df[self.selected_column].values.astype(np.float64), dtype=torch.float)

        ## Summary frame: the original row at the last position of each window.
        # A list comprehension also works when there are no windows at all,
        # where slicing an empty array with [:, -1] would fail.
        last_positions = [window[-1] for window in self.input_ids]
        self.df = original_df.iloc[last_positions]

    ## A Dataset must implement __len__ (number of samples).
    def __len__(self):
        return len(self.input_ids)

    ## A Dataset must implement __getitem__; torch calls it to fetch a sample.
    def __getitem__(self, item):
        temp_input_ids = self.input_ids[item]
        input_values = self.var_data[temp_input_ids]
        return input_values

In [None]:
def get_loss_list(args, model, test_loader):
    """Compute the reconstruction error of every sample in ``test_loader``.

    :param args: settings object; only ``args.device`` is read
    :param model: callable returning ``[reconstruction, input]`` for a batch
    :param test_loader: iterable of batch tensors with a defined ``len``
    :return: NumPy array with the element-wise absolute error averaged over
             dim 1 of each batch, concatenated over all batches
    """
    test_iterator = tqdm(test_loader, total=len(test_loader), desc="testing")
    loss_list = []

    with torch.no_grad():
        for batch_data in test_iterator:
            batch_data = batch_data.to(args.device)
            predict_values = model(batch_data)

            ## MAE (mean absolute error), kept per element.
            # reduction='none' replaces the deprecated reduce=False argument.
            loss = F.l1_loss(predict_values[0], predict_values[1], reduction='none')
            loss = loss.mean(dim=1).cpu().numpy()
            loss_list.append(loss)

    return np.concatenate(loss_list, axis=0)

In [None]:
# Column-wise mean and standard deviation of df, passed to TagDataset below
# for normalisation.
mean_df = df.mean()
std_df = df.std()

In [None]:
# Wrap df in the sliding-window dataset, normalising with the statistics above.
dataset = TagDataset(df=df, input_size=args.input_size, window_size=args.window_size, mean_df=mean_df, std_df=std_df)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.var_data = torch.tensor(df[self.selected_column].values.astype(np.float), dtype=torch.float)


In [None]:
# Batch the dataset in its original order (no shuffling).
train_loader = torch.utils.data.DataLoader(
                 dataset=dataset,
                 batch_size=args.batch_size,
                 shuffle=False)

In [None]:
# Reconstruction errors over the whole loader, then their column-wise mean and
# covariance matrix. Note that ``std`` holds the covariance returned by np.cov,
# not a standard deviation.
loss_list = get_loss_list(args, model, train_loader)
mean = np.mean(loss_list, axis=0)
std = np.cov(loss_list.T)

testing:   0%|          | 0/13894 [00:00<?, ?it/s]



In [None]:
## Anomaly Score
class Anomaly_Calculator:
    """Scores reconstruction errors with the quadratic form ``x @ std @ x.T``.

    ``x`` is the error centred on ``mean``; ``std`` must be a square matrix
    whose side equals the length of ``mean``.
    """

    def __init__(self, mean:np.array, std:np.array):
        size = mean.shape[0]
        # The message reads: "the mean and variance must have the same dimensions".
        assert size == std.shape[0] and size == std.shape[1], '평균과 분산의 차원이 똑같아야 합니다.'
        self.mean, self.std = mean, std

    def __call__(self, recons_error:np.array):
        """Return ``(e - mean) @ std @ (e - mean).T`` for the error ``e``."""
        # NOTE(review): the matrix is used as given, not inverted - confirm
        # whether a Mahalanobis-style distance (inverse covariance) was intended.
        centered = recons_error - self.mean
        return centered @ self.std @ centered.T

## Anomaly score calculator built from the error mean and covariance above
anomaly_calculator = Anomaly_Calculator(mean, std)

In [None]:
# Take the first batch with the builtin next(); Python 3 iterators have no
# .next() method and recent PyTorch DataLoader iterators no longer provide one.
oneData = next(iter(train_loader))
oneData = model(oneData.to(args.device))

In [None]:
# Score the single batch computed above with the anomaly calculator.
loss_list = []
with torch.no_grad():
    predict_values = oneData
    # reduction='none' replaces the deprecated reduce=False argument.
    loss = F.l1_loss(predict_values[0], predict_values[1], reduction='none')
    loss = loss.mean(dim=1).cpu().numpy()
    loss_list.append(loss)

loss_list = np.concatenate(loss_list, axis=0)
ans_score = anomaly_calculator(loss_list).mean()
ans_score

0.18104747994509568

# 실제 데이터를 이용하여 결과를 내도록 개발

In [None]:
# Take the first batch with the builtin next(); Python 3 iterators have no
# .next() method and recent PyTorch DataLoader iterators no longer provide one.
oneData = next(iter(train_loader))
print(len(oneData))
print()
res = model(oneData.to(args.device))
print(len(res))

128

2


In [None]:
# Display the first two samples of the batch.
oneData[:2]

tensor([[[ 0.6300, -0.9033, -0.5847],
         [ 0.6331, -0.8772, -0.5333],
         [ 0.6250, -0.9986, -0.3913]],

        [[ 0.6331, -0.8772, -0.5333],
         [ 0.6250, -0.9986, -0.3913],
         [ 0.6265, -0.7694, -0.6427]]])