In [None]:
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import torch
from torch import nn, optim
from torch.autograd import Variable
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error

 参数:
        input_size: 输入 `x` 中预期的特征数量。
        hidden_size: 隐藏状态 `h` 中的特征数量。
        num_layers: 循环层的数量。例如，设置 `num_layers=2` 意味着将两个 LSTM
            堆叠在一起，形成一个“堆叠的 LSTM”，其中第二个 LSTM 接收第一个 LSTM 的
            输出并计算最终结果。默认值: 1
        bias: 如果为 `False`，那么该层不使用偏置权重 `b_ih` 和 `b_hh`。
            默认值: `True`
        batch_first: 如果为 `True`，那么输入和输出张量将以 `(batch, seq, feature)`
            的形式提供，而不是 `(seq, batch, feature)`。
            请注意，这不适用于隐藏状态或单元状态。详见下面的
            输入/输出部分。默认值: `False`
        dropout: 如果非零，则在除最后一层之外的每个 LSTM 层的输出上引入一个
            `Dropout` 层，丢弃概率等于 :attr:`dropout`。默认值: 0
        bidirectional: 如果为 `True`，则成为一个双向 LSTM。默认值: `False`
        proj_size: 如果 `> 0`，将使用具有相应大小投影的 LSTM。默认值: 0

In [None]:
# Hyper-parameters and shared preprocessing objects for the LSTM regression script.
# NOTE(review): N_ITER is not referenced anywhere in the visible code.
N_ITER = 5000
# Mini-batch size used when building the train/test DataLoaders.
BATCH_SIZE = 64
# Excel file that read_data() loads.
DATA_PATH = './data/data.xls'
# Passed to the LSTM constructor as num_classes.
NUM_CLASSES = 1
# Passed to the LSTM constructor as hidden_size.
HIDDEN_SIZE = 64
# Passed to the LSTM constructor as num_layers.
NUM_LAYERS = 2
# Learning rate handed to the Adam optimizer.
LEARNING_RATE = 0.0001
# Number of passes over the training DataLoader in train().
NUM_EPOCHS = 500
SEQ_LENGTH = 3  # number of consecutive rows (time steps) in each sliding window
INPUT_SIZE = 3  # features per time step; presumably must equal the column count of the data file — TODO confirm
# Scalers fitted in normalization(); mm_y is reused in eval() to undo the scaling.
mm_x = MinMaxScaler()
mm_y = MinMaxScaler()


# Read the data
def read_data(data_path):
    """Load the Excel sheet at ``data_path`` and split it into features and label.

    Returns:
        A ``(feature, label)`` pair: ``feature`` is the whole DataFrame as
        read, and ``label`` is a one-column DataFrame holding the column at
        position 2.
    """
    frame = pd.read_excel(data_path)
    return frame, frame.iloc[:, [2]]


# Normalize the data
def normalization(x, y):
    """Fit the module-level scalers and return the scaled features and labels.

    ``mm_x`` is fitted on ``x.values`` and ``mm_y`` on ``y``; both scalers are
    refitted on every call.
    """
    scaled_features = mm_x.fit_transform(x.values)
    scaled_labels = mm_y.fit_transform(y)
    return scaled_features, scaled_labels


# Build sliding windows
def sliding_windows(data, seq_length=None):
    """Cut a 2-D array into overlapping windows and next-step targets.

    Window ``i`` consists of rows ``i .. i + seq_length - 1`` (all columns);
    its target is the last column of row ``i + seq_length``.

    Args:
        data: 2-D NumPy array indexed as ``[row, column]``.
        seq_length: Number of rows per window. Defaults to the module-level
            ``SEQ_LENGTH`` when omitted.

    Returns:
        ``(x, y)`` NumPy arrays holding the stacked windows and their targets.
        Both are empty when ``data`` has no more than ``seq_length`` rows.
    """
    if seq_length is None:
        seq_length = SEQ_LENGTH
    # Every start index whose target row still exists is valid, i.e.
    # i + seq_length <= len(data) - 1.  The previous bound subtracted one
    # more and silently discarded the final window.
    starts = range(len(data) - seq_length)
    x = np.array([data[i:i + seq_length, :] for i in starts])
    y = np.array([data[i + seq_length, -1] for i in starts])
    return x, y


# Build the DataLoaders
def data_generator(x_train, y_train, x_test, y_test):
    """Wrap the NumPy train/test arrays in float32 ``DataLoader`` objects.

    Both loaders use ``BATCH_SIZE`` and iterate without shuffling.

    Returns:
        ``(train_loader, test_loader)``.
    """

    def as_float_tensor(array):
        # Convert a NumPy array to a float32 tensor.
        return torch.from_numpy(array).to(torch.float32)

    loaders = []
    for features, targets in ((x_train, y_train), (x_test, y_test)):
        dataset = TensorDataset(as_float_tensor(features),
                                as_float_tensor(targets))
        loaders.append(DataLoader(dataset=dataset,
                                  batch_size=BATCH_SIZE,
                                  shuffle=False))
    train_loader, test_loader = loaders
    return train_loader, test_loader


# Load the raw table; `feature` is the whole sheet, `label` the column at position 2.
feature, label = read_data(DATA_PATH)

# Scale features and label with the module-level MinMaxScalers.
# NOTE(review): `label` is not referenced again in the visible code; the
# targets come from the last column of `feature` in sliding_windows().
feature, label = normalization(feature, label)

# Turn the scaled rows into (window, next-step target) pairs.
x, y = sliding_windows(feature)

# NOTE(review): train_test_split shuffles by default and no random_state is
# given, so the split is not chronological and differs between runs —
# confirm this is intended for time-series data.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

train_loader, test_loader = data_generator(x_train, y_train, x_test, y_test)


# Build the LSTM model
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear head applied to the last time step.

    Args:
        num_classes: Number of output features produced by the linear head.
        input_size: Number of features per time step.
        hidden_size: Number of features in the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        seq_length: Window length recorded on the instance. Defaults to the
            module-level ``SEQ_LENGTH`` when omitted.
    """

    def __init__(self, num_classes, input_size, hidden_size, num_layers,
                 seq_length=None):
        super().__init__()
        self.num_classes = num_classes
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.seq_length = SEQ_LENGTH if seq_length is None else seq_length

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True)

        # The head width follows num_classes; it used to be hard-coded to 1,
        # which silently ignored the constructor argument.
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return the head's prediction for each sequence in the batch.

        Args:
            x: Tensor laid out as ``(batch, seq, input_size)`` because the
                LSTM is built with ``batch_first=True``.

        Returns:
            Tensor of shape ``(batch, num_classes)``.
        """
        # With no initial state supplied, nn.LSTM starts from zeros.
        output, _ = self.lstm(x)
        # Only the top layer's output at the final time step feeds the head.
        return self.fc(output[:, -1, :])


# Instantiate the network, the regression loss and the optimizer that are
# shared by train() and eval().
model = LSTM(num_classes=NUM_CLASSES, input_size=INPUT_SIZE,
             hidden_size=HIDDEN_SIZE, num_layers=NUM_LAYERS)
# Mean squared error between the predictions and the scaled targets.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)


# Train the model
def train():
    """Train the module-level ``model`` on ``train_loader`` for ``NUM_EPOCHS`` epochs.

    Uses the module-level ``criterion`` and ``optimizer`` and prints the
    loss of the current batch every 100 optimizer steps.
    """
    # Make sure the model is in training mode even if eval() ran earlier.
    model.train()
    step = 0  # optimizer steps taken so far (avoids shadowing builtin iter)
    for _ in range(NUM_EPOCHS):
        for batch_x, batch_y in train_loader:
            # Give the targets a trailing dimension so they line up with the
            # (batch, 1) model output instead of broadcasting inside MSELoss.
            targets = batch_y.reshape(len(batch_y), 1)
            outputs = model(batch_x)
            loss = criterion(outputs, targets)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            step += 1
            if step % 100 == 0:
                print("iter: %d,    loss: %1.5f" % (step, loss.item()))


def eval(test_x, test_y):
    """Evaluate the module-level ``model`` on a test set, then plot and print metrics.

    Predictions and targets are mapped back to the original label scale
    with ``mm_y`` before plotting and before computing MAE and RMSE.

    Args:
        test_x: NumPy array of input windows.
        test_y: NumPy array of scaled targets, one per window.
    """
    model.eval()
    inputs = torch.from_numpy(test_x).to(torch.float32)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        data_predict = model(inputs).numpy()
    # float32 column vector, matching the dtype of the model output.
    y_data_plot = np.asarray(test_y, dtype=np.float32).reshape(-1, 1)
    data_predict = mm_y.inverse_transform(data_predict)
    y_data_plot = mm_y.inverse_transform(y_data_plot)

    plt.plot(y_data_plot)
    plt.plot(data_predict)
    plt.legend(('real', 'predict'), fontsize='15')
    plt.show()

    print('MAE/RMSE')
    print(mean_absolute_error(y_data_plot, data_predict))
    print(np.sqrt(mean_squared_error(y_data_plot, data_predict)))
    # Show the first 20 targets and predictions for a quick visual comparison.
    print(y_data_plot.flatten()[:20])
    print(data_predict.flatten()[:20])


# Run training, then evaluate on the held-out windows.
train()
eval(x_test, y_test)
