In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from sklearn.preprocessing import MinMaxScaler
import os
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset # 텐서데이터셋
from torch.utils.data import DataLoader # 데이터로더
import matplotlib.pyplot as plt

In [None]:
# Load the sensor CSV; the path is relative to the notebook's working directory.
data=pd.read_csv("dataset/sensor_LSTM.csv")
# Bare last expression so Jupyter renders the frame as the cell output.
data

In [None]:
# (rows, columns) of the frame as loaded.
data.shape

In [None]:
# Column labels of the frame as loaded.
data.columns

In [None]:
# First five rows.
data.head()

In [None]:
# Summary statistics of the numeric columns.
data.describe()

In [None]:
# Drop every row that contains at least one missing value.
# NOTE(review): rebinding `data` makes this cell depend on the load cell having just run — re-run from the top after edits.
data= data.dropna()

In [None]:
# Keep only the first 80,000 rows (positional slice).
# `.copy()` detaches the result from the frame it was sliced from, so the later
# in-place column drop and the `data['Operation'] = ...` assignment operate on an
# independent frame instead of a slice (avoids SettingWithCopyWarning).
data = data.iloc[:80000].copy()

In [None]:
# Remove the two columns that are not used as model inputs.
# Reassigning (instead of `inplace=True`) avoids mutating a sliced frame, and
# `errors='ignore'` keeps the cell idempotent: re-running it after the columns
# are already gone no longer raises KeyError.
data = data.drop(columns=['Unnamed: 0', 'timestamp'], errors='ignore')

In [None]:
# Number of rows per distinct value of the `machine_status` column.
data['machine_status'].value_counts()

In [None]:
# Exercise cell: each `####빈칸을 채우세요####` marker means "fill in the blank".
# `conditions` must hold three boolean masks over the rows of `data` and `choices`
# the value to emit for each mask, in the same order.
# NOTE(review): presumably the masks test `machine_status` — confirm against the exercise text.
conditions = [(####빈칸을 채우세요####), (####빈칸을 채우세요####), (####빈칸을 채우세요####)]
choices = [####빈칸을 채우세요####]
# For each row np.select takes the choice of the first mask that is True;
# rows matched by no mask get default=0.
data['Operation'] = np.select(conditions, choices, default=0)

In [None]:
# Column labels after the drop and the added `Operation` column.
data.columns

In [None]:
# Working frame for the model: five sensor columns followed by the `Operation` label.
# `Operation` is deliberately last — later cells select the target with index -1.
df = pd.DataFrame(data, columns=['sensor_04', 'sensor_06', 'sensor_07', 'sensor_08', 'sensor_09','Operation'])

In [None]:
# One subplot per column of `df`, all sharing the x axis.
df.plot(subplots =True, sharex = True, figsize = (20,20))

In [None]:
# (rows, columns) of the working frame.
df.shape

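Training set:
We use the first 70% of the rows (`train_size = int(len(df) * 0.7)`) to train the model. (The original write-up chose 50,000 data points with 2 broken points; this notebook keeps at most the first 80,000 rows, so the actual counts differ.)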

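Testing set:
The remaining 30% of the rows, preceded by the last `seq_length` training rows needed to build the first test window, are used to test the predictive ability of the model. (The original write-up used the remaining 170,000 points with 5 broken states.)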

In [None]:
# Window length (time steps per sample) and mini-batch size used by later cells.
seq_length = 20
batch = 100

# Chronological 70/30 split. The test split starts `seq_length` rows early so the
# first test window has a full history; max(..., 0) keeps the start non-negative
# when the frame is shorter than one window.
train_size = int(len(df)*0.7)
# Explicit copies: the two splits overlap by `seq_length` rows and are later
# overwritten in place by the scalers. Without `.copy()` they are slices of `df`,
# so those writes can trigger SettingWithCopyWarning and — where pandas returns
# views — propagate into `df` and the other split, scaling the overlap twice.
train_set = df.iloc[0:train_size].copy()
test_set = df.iloc[max(train_size-seq_length, 0):].copy()

In [None]:
# One subplot per column of the training split, sharing the x axis.
train_set.plot(subplots =True, sharex = True, figsize = (20,20))

In [None]:
# One subplot per column of the test split, sharing the x axis.
test_set.plot(subplots =True, sharex = True, figsize = (20,20))

In [None]:
# First five training rows before scaling.
train_set.head()

In [None]:
# Work on float copies of both splits. The scaled values are floats, so writing
# them into an integer `Operation` column is a dtype-incompatible assignment, and
# `astype` also returns new frames, so nothing below writes through a slice of `df`.
# NOTE: the cell rebinds `train_set`/`test_set`; re-run the split cell before
# re-running this one, otherwise already-scaled data is scaled again.
train_set = train_set.astype(float)
test_set = test_set.astype(float)

feature_cols = train_set.columns[:-1]   # every column except the last
target_col = train_set.columns[-1]      # last column is the prediction target

# Input scale — fitted on the training rows only, then applied to both splits
scaler_x = MinMaxScaler()
scaler_x.fit(train_set[feature_cols])

# Output scale — fitted on the raw training target before it is overwritten
scaler_y = MinMaxScaler()
scaler_y.fit(train_set[[target_col]])

train_set[feature_cols] = scaler_x.transform(train_set[feature_cols])
test_set[feature_cols] = scaler_x.transform(test_set[feature_cols])

# transform() returns an (n, 1) array; flatten it so a single column is assigned
# from a 1-D array.
train_set[target_col] = scaler_y.transform(train_set[[target_col]]).ravel()
test_set[target_col] = scaler_y.transform(test_set[[target_col]]).ravel()

In [None]:
# First five training rows after scaling.
train_set.head()

In [None]:
# Target device for the `.to(device)` calls in this notebook; fixed to the CPU.
device = torch.device('cpu')
# Sliding-window dataset builder
def build_dataset(time_series, seq_length):
    """Cut a 2-D series into fixed-length windows with next-step targets.

    Args:
        time_series: 2-D NumPy array indexed as [row, column].
        seq_length: Number of consecutive rows in each input window.

    Returns:
        Tuple ``(X, Y)`` of NumPy arrays. ``X[k]`` is rows ``k .. k+seq_length-1``
        with all columns; ``Y[k]`` is the last column of row ``k+seq_length``,
        kept as a length-1 array. There are ``len(time_series) - seq_length``
        pairs; when that count is not positive both arrays are empty.
    """
    n_windows = len(time_series) - seq_length
    starts = range(0, n_windows)

    windows = [time_series[start:start + seq_length, :] for start in starts]
    # [-1] (a list index) keeps the target as shape (1,) rather than a scalar.
    targets = [time_series[start + seq_length, [-1]] for start in starts]

    return np.array(windows), np.array(targets)

# Turn each split into (window, next-step target) arrays.
trainX, trainY = build_dataset(np.array(train_set), seq_length)
testX, testY = build_dataset(np.array(test_set), seq_length)

# Convert to float32 tensors; the test tensors are also placed on `device`.
trainX_tensor = torch.tensor(trainX, dtype=torch.float32)
trainY_tensor = torch.tensor(trainY, dtype=torch.float32)
testX_tensor = torch.tensor(testX, dtype=torch.float32).to(device)
testY_tensor = torch.tensor(testY, dtype=torch.float32).to(device)

# Pair inputs with targets, then serve them in shuffled mini-batches.
# drop_last=True discards a final batch smaller than `batch`.
dataset = TensorDataset(trainX_tensor, trainY_tensor)
dataloader = DataLoader(dataset, batch_size=batch, shuffle=True, drop_last=True)

In [None]:
# Shape of the training input tensor.
trainX_tensor.shape

In [None]:
# Shape of the test input tensor.
testX_tensor.shape

In [None]:
# Peek at the first two training windows.
print(trainX_tensor[0:2])

LSTM

In [None]:

# Settings
# data_dim counts every column of `df`: the five sensor columns plus 'Operation',
# because the input windows are built from all columns.
data_dim = 6
hidden_dim = 10
output_dim = 1
learning_rate = 0.01
nb_epochs = 100

class Net(nn.Module):
    """LSTM followed by a linear layer applied to the last time step."""

    def __init__(self, input_dim, hidden_dim, seq_len, output_dim, layers):
        """Store the sizes and create the LSTM and linear layers.

        Args:
            input_dim: Number of features per time step.
            hidden_dim: Size of the LSTM hidden state.
            seq_len: Window length; only used by ``reset_hidden_state``.
            output_dim: Number of values produced by the linear layer.
            layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.hidden_dim = hidden_dim
        self.seq_len = seq_len
        self.output_dim = output_dim
        self.layers = layers

        # batch_first=True: inputs are indexed (batch, time, feature).
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim, bias=True)

    def reset_hidden_state(self):
        """Store a pair of zero tensors in ``self.hidden``.

        ``forward`` never passes ``self.hidden`` to the LSTM, so calling this
        does not affect predictions.
        """
        state_shape = (self.layers, self.seq_len, self.hidden_dim)
        self.hidden = (torch.zeros(state_shape), torch.zeros(state_shape))

    def forward(self, x):
        """Run the LSTM over ``x`` and map its last time step through ``fc``."""
        sequence_out, _ = self.lstm(x)
        last_step = sequence_out[:, -1]
        return self.fc(last_step)

Training

In [None]:
def train_model(model, train_df, num_epochs = None, lr = None, verbose = 10, patience = 10):
    """Train ``model`` with Adam on an MSE loss and record the mean loss per epoch.

    Args:
        model: ``nn.Module`` that also provides ``reset_hidden_state()``.
        train_df: Iterable of ``(inputs, targets)`` batches supporting ``len()``
            (e.g. a ``DataLoader``).
        num_epochs: Maximum number of epochs. Must be given; the ``None``
            default is not usable.
        lr: Learning rate passed to Adam. Must be given.
        verbose: Print the loss every ``verbose`` epochs; ``0``/``None`` disables
            printing.
        patience: Every ``patience`` epochs, stop if the training loss is higher
            than it was ``patience`` epochs earlier; ``0``/``None`` disables the
            check.

    Returns:
        ``(model, train_hist)``: the model switched to eval mode, and a NumPy
        array of length ``num_epochs`` holding the mean batch loss per epoch.
        Entries for epochs skipped by early stopping stay ``0``.
    """
    # Batches follow the model's own device, so training also works when the
    # caller has moved the model off the CPU.
    device = next(model.parameters()).device

    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr = lr)
    nb_epochs = num_epochs

    # Mean loss of each epoch
    train_hist = np.zeros(nb_epochs)
    total_batch = len(train_df)

    # The function returns the model in eval mode, so a repeated call must
    # switch it back to training mode explicitly.
    model.train()

    for epoch in range(nb_epochs):
        avg_cost = 0.0

        for x_train, y_train in train_df:
            x_train = x_train.to(device)
            y_train = y_train.to(device)

            # Reset the model's stored hidden state for every batch
            model.reset_hidden_state()

            # Forward pass and loss
            outputs = model(x_train)
            loss = criterion(outputs, y_train)

            # Backward pass and parameter update
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # .item() detaches the value; summing the loss tensors themselves
            # would keep every batch's autograd graph alive for the whole epoch.
            avg_cost += loss.item() / total_batch

        train_hist[epoch] = avg_cost

        if verbose and epoch % verbose == 0:
            print('Epoch:', '%04d' % (epoch), 'train loss :', '{:.4f}'.format(avg_cost))

        # Check for early stopping every `patience` epochs (never at epoch 0)
        if patience and epoch != 0 and epoch % patience == 0:

            # Stop when the loss is higher than `patience` epochs ago
            if train_hist[epoch-patience] < train_hist[epoch]:
                print('\n Early Stopping : %04d epoch' %(epoch))

                break

    return model.eval(), train_hist

학습 시작

In [None]:
# Train the model
# Settings — each `####빈칸을 채우세요####` marker is an exercise blank ("fill in the blank").
data_dim = 6
output_dim = 1
hidden_dim = ####빈칸을 채우세요####
learning_rate = ####빈칸을 채우세요####
nb_epochs = ####빈칸을 채우세요####
# The final argument (1) is the number of stacked LSTM layers.
net = Net(data_dim, hidden_dim, seq_length, output_dim, 1)
# Prints the loss every 5 epochs and checks for early stopping every 10 epochs.
model, train_hist = train_model(net, dataloader, num_epochs = nb_epochs, lr = learning_rate, verbose = 5, patience = 10)

In [None]:
# Loss per epoch
fig = plt.figure(figsize=(10, 4))
plt.plot(train_hist, label="Training loss")
plt.legend()
plt.show()

In [None]:
# Save the model
# Exercise blank ("fill in the blank"): replace the marker with the checkpoint file path.
PATH ='####빈칸을 채우세요####'
# Write the freshly trained weights only when no checkpoint exists yet, so a
# clean "Restart & Run All" has a file to load and an existing checkpoint is
# never overwritten.
if not os.path.exists(PATH):
    torch.save(model.state_dict(), PATH)

# Load: rebuild the architecture, then restore the weights
model = Net(data_dim, hidden_dim, seq_length, output_dim, 1)
# map_location keeps loading independent of the device the file was saved from;
# the default strict=True makes a key mismatch fail loudly instead of silently
# leaving layers randomly initialised.
model.load_state_dict(torch.load(PATH, map_location=device))
model.eval()

In [None]:
# Peek at the first two test windows and their targets.
print(testX_tensor[0:2])
print(testY_tensor[0:2])

In [None]:
# Prediction test
# Each `####빈칸을 채우세요####` marker is an exercise blank ("fill in the blank").
# NOTE(review): `a` is presumably the offset/size used to slice the test tensors below — confirm against the exercise text.
a=####빈칸을 채우세요####
testX_tensor_2000=testX_tensor[####빈칸을 채우세요####]
testY_tensor_2000=testY_tensor[####빈칸을 채우세요####]
# No gradients are needed for inference.
with torch.no_grad():
    pred = []
    # Predict one window at a time.
    for pr in range(len(testX_tensor_2000)):

        model.reset_hidden_state()

        # unsqueeze adds the leading batch dimension the model expects.
        predicted = model(torch.unsqueeze(testX_tensor_2000[pr], 0))
        # Collapse the output to a plain Python number.
        predicted = torch.flatten(predicted).item()
        pred.append(predicted)

    # Inverse: map predictions and targets back to the original target scale.
    pred_inverse = scaler_y.inverse_transform(np.array(pred).reshape(-1, 1))
    # NOTE(review): a torch tensor is passed straight to scikit-learn here; this assumes it lives on the CPU.
    testY_inverse = scaler_y.inverse_transform(testY_tensor_2000)

In [None]:
# Overlay the de-scaled predictions and the de-scaled true targets on one axis.
fig = plt.figure(figsize=(10,3))
plt.plot(np.arange(len(pred_inverse)), pred_inverse, label = 'pred')
plt.plot(np.arange(len(testY_inverse)), testY_inverse, label = 'true')
plt.title("Test plot")
plt.legend()
plt.show()