In [None]:
# 参考: https://qiita.com/tsubauaaa/items/8411a22465811ec2ee11


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
%matplotlib inline


In [None]:
data1 = pd.read_csv('../../../dataset/lstm_torch/pcr_positive_daily.csv')
data2 = pd.read_csv('../../../dataset/lstm_torch/pcr_tested_daily.csv')

data = pd.merge(data1,data2)
print(data.shape)
print(data.tail(5))

In [None]:
len_data = data.shape[0]

In [None]:
covid19_data = data[['PCR 検査陽性者数(単日)', 'PCR 検査実施件数(単日)']]

In [None]:
fig, (axL, axR) = plt.subplots(ncols=2, figsize=(20,5))

axL.plot(covid19_data['PCR 検査陽性者数(単日)'], linewidth=2)
axL.set_title('Number of PCR Positive')
axL.set_xlabel('2020/1/16-11/30')
axL.set_ylabel('Number of people')
axL.grid(True)

axR.plot(covid19_data['PCR 検査実施件数(単日)'], linewidth=2)
axR.set_title('Number of PCR Inspections')
axR.set_xlabel('2020/1/16-11/30')
axR.set_ylabel('Number of inspections')
axR.grid(True)

fig.show()


In [None]:
covid19_data = covid19_data.values.astype(float)
covid19_data.shape

In [None]:
# 直近30日をテストデータにする
test_data_size = 30

train_data = covid19_data[:-test_data_size]
test_data = covid19_data[-test_data_size:]


In [None]:
# データセットの正規化を行う。最小値0と最大値1の範囲で行う。
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler(feature_range=(0, 1))
train_data_normalized = scaler.fit_transform(train_data)

# Tensor型に変換
train_data_normalized = torch.FloatTensor(train_data_normalized)

In [None]:
# シーケンスに沿ったデータを作成する関数
def make_sequence_data(input_data, num_sequence):
    # シーケンスデータとラベルデータの器
    data = []
    # 全体のデータ数取得
    num_data = len(input_data)
    # 全体からシーケンス分引いた数までループする
    for i in range(num_data - num_sequence):
        # 1個ずらして、シーケンス分のデータを取得していく
        seq_data = input_data[i:i+num_sequence]
        # シーケンスの次の要素のデータ(ラベルデータとして1個目の陽性者数のみ)を取得していく
        target_data = input_data[:,0][i+num_sequence:i+num_sequence+1]
        # シーケンスデータとラベルデータをタプルとして取得していく
        data.append((seq_data, target_data))

    return data


In [None]:
# シーケンス長は1ヶ月分30日とする
seq_length = 30
# train_seq_data=最初のデータを1個ずらしてシーケンス分のデータ(時系列の学習データ群)、train_target=train_seq_dataの次のデータ(ラベルデータ)
train_seq_data = make_sequence_data(train_data_normalized, seq_length)


In [None]:
class LSTM(nn.Module):
    def __init__(self, input_size=2, hidden_layer_size=100, output_size=1):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_layer_size, batch_first=True)

        self.linear = nn.Linear(in_features=hidden_layer_size, out_features=output_size)

    def forward(self, x):
        # LSTMのinputは(batch_size, seq_len, input_size)にする
        # LSTMのoutputは(batch_size, seq_len, hidden_layer_size)となる
        # hidden stateとcell stateにはNoneを渡して0ベクトルを渡す
        lstm_out, (hn, cn) = self.lstm(x, None)
        # Linearのinputは(N,∗,in_features)にする
        # lstm_out(batch_size, seq_len, hidden_layer_size)のseq_len方向の最後の値をLinearに入力する
        prediction = self.linear(lstm_out[:, -1, :])
        return prediction

In [None]:
model = LSTM()


In [None]:
# 損失関数と最適化関数を定義
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
epochs = 5
losses = []
for i in range(epochs):
    for seq, labels in train_seq_data:
        # seq, labelsのshapeは(seq_length, 特徴量)なのでLSTMに渡すために(batch, seq_length, 特徴量)にする。(batch=1)
        seq, labels = torch.unsqueeze(seq, 0), torch.unsqueeze(labels, 0)
        optimizer.zero_grad()
        y_pred = model(seq)
        single_loss = criterion(y_pred, labels)
        single_loss.backward()
        optimizer.step()
    losses.append(single_loss.item())
    print(f'epoch: {i}, loss : {single_loss.item()}')


In [None]:
plt.plot(losses)

In [None]:
# 予測する日数
pred_days = 30
# テストデータの正規化を行う。最小値0と最大値1の範囲で行ってTensor型に変換する。
test_data_normalized = scaler.fit_transform(test_data)
test_data_normalized = torch.FloatTensor(test_data_normalized)
# 予測するためのデータの最初のseq_length分はtrain_dataを使う
test_inputs = train_data_normalized[-seq_length:].tolist()


In [None]:
# モデルを評価モードとする
model.eval()
# 予測値を入れるリスト
test_outputs = []
for i in range(pred_days):
    seq = torch.FloatTensor(test_inputs[-seq_length:])
    seq = torch.unsqueeze(seq, 0)
    with torch.no_grad():
        test_inputs.append(test_data_normalized.tolist()[i])
        test_outputs.append(model(seq).item())
        

In [None]:
np_test_outputs = np.array(test_outputs).reshape(-1,1)
# 列方向に同じ値を追加して(30, 3)にする
np_test_outputs2 = np.hstack((np_test_outputs, np_test_outputs))
actual_predictions = scaler.inverse_transform(np_test_outputs2)

In [None]:
x = np.arange(len_data-pred_days, len_data, 1)
print(x)

In [None]:
fig_size = plt.rcParams['figure.figsize']
fig_size[0] = 10
fig_size[1] = 5
plt.rcParams['figure.figsize'] = fig_size

In [None]:
plt.title('Number of PCR Positives')
plt.ylabel('Number of people')
plt.grid(True)
plt.autoscale(axis='x', tight=True)
plt.plot(data['PCR 検査陽性者数(単日)'], label='Ground Truth')
plt.plot(x, actual_predictions[:,0], label='Prediction')
plt.xlabel('2020/1/16 - 11/30')
plt.legend()
plt.show()

In [None]:
plt.title('Number of PCR Positives')
plt.ylabel('Number of people')
plt.grid(True)
plt.autoscale(axis='x', tight=True)
plt.plot(x, data['PCR 検査陽性者数(単日)'][-1*pred_days:], label='Ground Truth')
plt.plot(x, actual_predictions[:,0], label='Prediction')
plt.xlabel('2020/10/31 - 11/30')
plt.legend()
plt.show()