In [None]:
# In[ ] 导入库
import numpy as np
import torch
from torch import nn
import torch.optim as optim
import xlrd
import xlwt


# In[] 获取数据
def get_data(file='sae_gan_data_16.xls'):
    workbook = xlrd.open_workbook(file) 
    sheet=workbook.sheets()[0]
    y=[]
    for i in range(sheet.ncols):    
        y.append(sheet.col_values(i))
    data=np.array(y)
    return data

# In[]  获取输入和输出数据
def get_X_Y(data):
    # 几年预测第几年之后的数据
    before=5
    after=3
    # 划分数据
    N=data.shape
    X=[]
    Y=[]
    for i in reversed(range(N[0])):
        for j in range(N[1]):
            xj=data[i,j:j+before]
            yj=data[i,j+before+after-1]
            X.append(xj)
            Y.append(yj)
            if j+before+after-1==(N[1]-1):
                break
    X=np.array(X)
    Y=np.array(Y).reshape(-1,1)
    
    return X,Y

# In[] 划分训练组和预测组 因为数据最后几个是 实际的数据，所以后几个需要进行预测
def split_data(X,Y,test_N=73):
    x_train=X[:-test_N,:]
    y_train=Y[:-test_N,:]
    x_test=X[-test_N:,:]
    y_test=Y[-test_N:,:]
    return x_train,y_train,x_test,y_test

# In[] 归一化与反归一化
class mapminmax():
    def __init__(self,X,lb=-1,ub=1):
        self.Min=np.min(X,axis=0) # X 为一列一个数据
        self.Max=np.max(X,axis=0)
        self.lb=lb
        self.ub=ub
    
    def apply(self,X):
        
        out=(X-self.Min)/(self.Max-self.Min)*(self.ub-self.lb)+self.lb
        return out
    
    def reversed(self,X):
        out=(X-self.lb)/(self.ub-self.lb)*(self.Max-self.Min)+self.Min
        return out
    
# In[ ] 搭建网络结果
class GRU(nn.Module):
    def __init__(self):
        super().__init__()
        n_features = x_train.shape[-1]
        n_out = y_train.shape[-1]
        
        self.rnn = nn.GRU(
            input_size=n_features,
            hidden_size=64,
            num_layers=1,
            batch_first=True,
            bidirectional=True
        )
        self.out = nn.Sequential(
            nn.Linear(128, n_out)
        )

    def forward(self, x):
        r_out, (h_n, h_c) = self.rnn(x, None)  # None 表示 hidden state 会用全0的 state
        out = self.out(r_out[:, -1])
        return out

# In[] 获取数据
data=get_data()
X,Y=get_X_Y(data)
X_train,Y_train,X_test,Y_test=split_data(X,Y)
# In[] 标准化数据
xn=mapminmax(X_train)
x_train=xn.apply(X_train)
yn=mapminmax(Y_train)
y_train=yn.apply(Y_train)

# In[] 转成tensor格式
x_train=torch.from_numpy(x_train).unsqueeze(1)
x_train = torch.as_tensor(x_train, dtype=torch.float32)
y_train=torch.from_numpy(y_train)
y_train = torch.as_tensor(y_train, dtype=torch.float32)


# In[] 参数设置
epoches=500 # 迭代次数
batches=5 # 每次批处理数目
number_of_batches=int(np.ceil(x_train.shape[0]/batches)) # 训练组被划分成多少份
lr=np.linspace(0.01,0.0001,epoches) # 学习率
    

# In[] 生成网络
# 导入网络
net=GRU()
# 损失函数
loss_fn=nn.MSELoss()
# 优化器
optimizer=optim.Adam(net.parameters(),lr=lr[0])

# In[] 训练网络
for iter in range(epoches):
    for n_batch in range(number_of_batches): # 第n_batch组训练
        optimizer.param_groups[0]['lr']=lr[iter] # 修改lr学习率
        start=n_batch*batches # 本组开始位置
        end=start+batches # 本组结束位置
        x1=x_train[start:end] # 本组输入
        y1=y_train[start:end] # 本组输出
        net1=net(x1) # 向前传输计算网络的输出值
        loss=loss_fn(net1,y1) # 计算损失值
        optimizer.zero_grad() # 梯度归0
        loss.backward() # 反向传输
        optimizer.step() # 更新网络的参数
    # 计算训练组的整体误差值
    if iter%5==0:
        net1=net(x_train)
        loss=loss_fn(net1,y_train)
        print('第 %d次迭代 损失函数值：%.10f'%(iter,loss.item()))
        
        
        
# In[ ] 使用网络对训练组数据进行预测
y_train=net(x_train).detach().numpy()
# 预测值
y_train=yn.reversed(y_train)
# 计算误差
err_train=y_train-Y_train
# 误差率
err_ratio_train=err_train/Y_train
# 
# mae
mae_train=np.mean(np.abs(err_train))
print('\nGRU 训练组 mae: %.5f '%(mae_train))
# rmse
rmse_train=np.sqrt(np.mean(err_train**2))
print('\nGRU 训练组 rmse: %.5f '%(rmse_train))
# mape
mape_train=np.mean(np.abs(err_train)/Y_train)
print('\nGRU 训练组 mape: %.5f '%(mape_train))
# Dstat
Dstat_train=np.mean((y_train[1:]-Y_train[0:-1])*(Y_train[1:]-Y_train[0:-1])>=0)*100
print('\nGRU 训练组 Dstat: %.5f %% '%(Dstat_train))


# In[ ] 使用网络对 预测组数据进行预测
# 
x_test=xn.apply(X_test)
x_test=torch.from_numpy(x_test).unsqueeze(1)
x_test = torch.as_tensor(x_test, dtype=torch.float32)
#
y_test=net(x_test).detach().numpy()
# 预测值
y_test=yn.reversed(y_test)
# 计算误差
err_test=y_test-Y_test
# 误差率
err_ratio_test=err_test/Y_test
# 
mae_test=np.mean(np.abs(err_test))
print('\nGRU 预测组 mae: %.5f '%(mae_test))
# rmse
rmse_test=np.sqrt(np.mean(err_test**2))
print('\nGRU 预测组 rmse: %.5f '%(rmse_test))
# mape
mape_test=np.mean(np.abs(err_test)/Y_test)
print('\nGRU 预测组 mape: %.5f '%(mape_test))
# Dstat
Dstat_test=np.mean((y_test[1:]-Y_test[0:-1])*(Y_test[1:]-Y_test[0:-1])>=0)*100
print('\nGRU 预测组 Dstat: %.5f %% '%(Dstat_test))


# In[] 保存数据
# 
workbook = xlwt.Workbook(encoding='utf-8')
booksheet = workbook.add_sheet('train_data', cell_overwrite_ok=True)
booksheet.write(0,0,'real_data')
booksheet.write(0,1,'cal_data')
for i in range(y_train.shape[0]):
    booksheet.write(i+1,0,Y_train[i,0])
    booksheet.write(i+1,1,y_train[i,0])
 
booksheet = workbook.add_sheet('test_data', cell_overwrite_ok=True)
booksheet.write(0,0,'real_data')
booksheet.write(0,1,'cal_data')
for i in range(y_test.shape[0]):
    booksheet.write(i+1,0,Y_test[i,0])
    booksheet.write(i+1,1,y_test[i,0])
    
workbook.save('GRU16.xls')
