# RNN/LSTMハンズオン
日経平均を使った時系列データ予測にチャレンジ

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

import matplotlib
import matplotlib.pyplot as plt
import scipy
import numpy as np
import pandas as pd
import tensorflow as tf
import sklearn.preprocessing as sp
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Activation, SimpleRNN, GRU, LSTM
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping


# 描画を綺麗に表示する
from matplotlib.pylab import rcParams
import seaborn as sns
rcParams['figure.figsize'] = 15, 6

print('matplotlib version :', matplotlib.__version__)
print('scipy version :', scipy.__version__)
print('numpy version :', np.__version__)
print('tensorflow version : ', tf.__version__)
print('keras version : ', keras.__version__)


日経新聞の以下のWebページから「日経平均株価」の「日次データ」をダウンロードする。
https://indexes.nikkei.co.jp/nkave/index?type=download

ダウンロードしたCSVファイル（nikkei_stock_average_daily_jp.csv）を左側のウインドウにドラッグアンドドロップし、アップロードする。

続いてファイル内容の確認を行う。

In [None]:
%%bash
head nikkei_stock_average_daily_jp.csv

In [None]:
#文字エンコーディングが対応していないため、タイトル行が文字化けしてしまっているので、エンコードを変換。
import codecs

sf = codecs.open('nikkei_stock_average_daily_jp.csv', 'r', encoding='shift-jis')
uf = codecs.open('nikkei_utf8.csv', 'w', encoding='utf-8')
for line in sf:
    uf.write(line)
sf.close()
uf.close()

修正結果の確認

In [None]:
%%bash
head nikkei_utf8.csv

In [None]:
df = pd.read_csv('nikkei_utf8.csv')

In [None]:
df.head()

In [None]:
#カラム名を英名に変更
df.set_axis(["date", "close", "open", "high", "low"], axis="columns", inplace=True)

In [None]:
df.tail()

In [None]:
#最終行に不要データがあるため削除。
df.drop(index=df.index[-1], inplace=True)

In [None]:
df.tail()

In [None]:
#全データの再確認
print(df)
print(df.describe())

In [None]:
#データの正規化
df['close_norm'] = sp.minmax_scale(df['close'])
print(df.describe())

In [None]:
plt.plot(df['close_norm'])

In [None]:
#訓練、テストデータの作成

#データセットのサイズを決める。今回は10個のデータから次の１個を予測するためのデータセットを作る。
maxlen = 10
X, Y = [], []
for i in range(len(df) - maxlen):
    X.append(df[['close_norm']].iloc[i:(i+maxlen)].values)
    Y.append(df[['close_norm']].iloc[i+maxlen].values)
X=np.array(X)
Y=np.array(Y)

In [None]:
print(X.shape)
print(Y.shape)

In [None]:
print(X[0])
print(Y[0])

In [None]:
print(X[1])
print(Y[1])

In [None]:
# 訓練用のデータと、テスト用のデータに分ける
N_train = int(len(df) * 0.8)
N_test = len(df) - N_train
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=N_test, shuffle = False) 

In [None]:
# 隠れ層の数などを定義: 隠れ層の数が大きいほど精度が上がる?
n_in = 1 # len(X[0][0])
n_out = 1 # len(Y[0])
n_hidden = 500
epochs = 30

#モデル作成 (Kerasのフレームワークで簡易に記載できる)
model = Sequential()
model.add(SimpleRNN(n_hidden, activation="tanh", 
                    batch_input_shape=(None, maxlen, n_in)))
model.add(Dense(n_in, kernel_initializer='random_uniform'))
model.add(Activation("linear"))

model.compile(optimizer='sgd',                 # 最適化手法
              loss='mean_squared_error')       # 損失関数

hist = model.fit(X_train, y_train, batch_size=maxlen, epochs=epochs, verbose=1)

In [None]:
# 損失のグラフ化（モデルがどれだけ予測出来ていないかの指標。０に近づくほどモデルの精度が良い。
# ※推定を行う場合はAccuracyではなく、Lossでみる。
loss = hist.history['loss']
epochs = len(loss)
plt.rc('font', family='serif')
fig = plt.figure()
fig.patch.set_facecolor('white')
plt.plot(range(epochs), loss, marker='.', label='loss(training data)')
plt.show()

In [None]:
# 予測結果
predicted = model.predict(X_test)
result = pd.DataFrame(predicted)
result.columns = ['predict']
result['actual'] = y_test

#正規化していた数値を金額に再変換する
result['predict']=((df['close'].max()-df['close'].min())*result['predict'])+df['close'].min()
result['actual']=((df['close'].max()-df['close'].min())*result['actual'])+df['close'].min()
result.plot()
plt.show()

#平均値でオフセット
#result['predict']=result['predict']-(result['predict'].mean()-result['actual'].mean())

date=np.array(df.loc[len(df)-len(X_test):len(df),['date']])
result['date']=np.array(date)

result.loc[:,['date','actual','predict']]

大事なのは値が推定できているかではない。

儲かるかどうかだ！

結局儲かるの？

In [None]:
#売買シミュレーションを作る
bitprice=np.array(df.loc[len(df)-len(X_test):len(df),['open']])
result['bitprice']=np.array(bitprice)
result['Judge'] = False
result['Rev'] = 0
result['Hit'] = False
result['totalRev'] = 0
total_rev=0

# Collumn Ref
# 0:predict 1:actual 2:date 3:bitprice 4:Judge 5:Rev 6:hit 7:totalRev 
for index in range(len(result)-1):
    if (result.iat[index+1, 0]>result.iat[index, 1]):    #当日の実終値よりも明日の予想終値が高い場合
        result.iat[index+1, 4]=True                      #買う判断をする
        result.iat[index+1, 5]=result.iat[index+1, 1]-result.iat[index+1, 3]    #翌日の初値で購入し、終値で売った金額がその日の儲け
        if result.iat[index+1, 5]>0:
            result.iat[index+1, 6]=True                  #モデル評価用に値上がり判断が妥当だったかを判定
        total_rev=total_rev+result.iat[index+1, 5]       #差額を累積利益に算入
    result.iat[index+1, 7]=total_rev                     #累積利益をresult表に追加

In [None]:
result['Rev'].plot()
result['totalRev'].plot()
plt.show()

result.loc[:,['date','actual','predict','bitprice','Judge','Rev','Hit','totalRev']]

In [None]:
#売買シミュレーション結果まとめ
judgeTrue = result['Judge'] == True
hitTrue = result['Hit'] == True

print("判断回数: "+str(len(result)))
print("取引回数: "+str(judgeTrue.sum()))
print("利益獲得回数: "+str(hitTrue.sum()))
print("勝率:　"+str(hitTrue.sum()/judgeTrue.sum()))
print("最終利益： ￥"+str(total_rev)+"/単元")


## __RNN以外の時系列モデルを使ってみる。（おまけ）__
RNNより新しいLSTMモデルでのトライアル。

In [None]:
# 隠れ層の数などを定義: 隠れ層の数が大きいほど精度が上がる?
n_in = 1 # len(X[0][0])
n_out = 1 # len(Y[0])
n_hidden = 500

#モデル作成 (Kerasのフレームワークで簡易に記載できる)
model = Sequential()
model.add(LSTM(n_hidden,
               batch_input_shape=(None, maxlen, n_in),
               kernel_initializer='random_uniform',
               return_sequences=False))
model.add(Dense(n_in, kernel_initializer='random_uniform'))
#model.add(Activation("linear"))

#opt = Adam(lr=0.001, beta_1=0.9, beta_2=0.999)
opt = Adam()                                   # 最適化手法の設定
model.compile(optimizer = opt,                 # 最適化手法
              loss = 'mean_squared_error',     # 損失関数
              )

early_stopping = EarlyStopping(monitor='loss', patience=10, verbose=1)
hist = model.fit(X_train, y_train, batch_size=maxlen, epochs=10,
                 callbacks=[early_stopping])

In [None]:
# 損失のグラフ化（モデルがどれだけ予測出来ていないかの指標。０に近づくほどモデルの精度が良い。
# ※推定を行う場合はAccuracyではなく、Lossでみる。
loss = hist.history['loss']
epochs = len(loss)
plt.rc('font', family='serif')
fig = plt.figure()
fig.patch.set_facecolor('white')
plt.plot(range(epochs), loss, marker='.', label='loss(training data)')
plt.show()

In [None]:
# 予測結果
predicted = model.predict(X_test)
result = pd.DataFrame(predicted)
result.columns = ['predict']
result['actual'] = y_test

#正規化していた数値を金額に再変換する
result['predict']=((df['close'].max()-df['close'].min())*result['predict'])+df['close'].min()
result['actual']=((df['close'].max()-df['close'].min())*result['actual'])+df['close'].min()

#平均値でオフセット
#result['predict']=result['predict']-(result['predict'].mean()-result['actual'].mean())

result.plot()
plt.show()

date=np.array(df.loc[len(df)-len(X_test):len(df),['date']])
result['date']=np.array(date)

result.loc[:,['date','actual','predict']]

In [None]:
#売買シミュレーションを作る
bitprice=np.array(df.loc[len(df)-len(X_test):len(df),['open']])
result['bitprice']=np.array(bitprice)
result['Judge'] = False
result['Rev'] = 0
result['Hit'] = False
result['totalRev'] = 0
total_rev=0

# Collumn Ref
# 0:predict 1:actual 2:date 3:bitprice 4:Judge 5:Rev 6:hit 7:totalRev 
for index in range(len(result)-1):
    if (result.iat[index+1, 0]>result.iat[index, 1]):    #当日の実終値よりも明日の予想終値が高い場合
        result.iat[index+1, 4]=True                      #買う判断をする
        result.iat[index+1, 5]=result.iat[index+1, 1]-result.iat[index+1, 3]    #翌日の初値で購入し、終値で売った金額がその日の儲け
        if result.iat[index+1, 5]>0:
            result.iat[index+1, 6]=True                  #モデル評価用に値上がり判断が妥当だったかを判定
        total_rev=total_rev+result.iat[index+1, 5]       #差額を累積利益に算入
    result.iat[index+1, 7]=total_rev                     #累積利益をresult表に追加
    

result['Rev'].plot()
result['totalRev'].plot()
plt.show()

result.loc[:,['date','actual','predict','bitprice','Judge','Rev','Hit','totalRev']]

In [None]:
#売買シミュレーション結果まとめ(LSTM)
judgeTrue = result['Judge'] == True
hitTrue = result['Hit'] == True

print("判断回数: "+str(len(result)))
print("取引回数: "+str(judgeTrue.sum()))
print("利益獲得回数: "+str(hitTrue.sum()))
print("勝率:　"+str(hitTrue.sum()/judgeTrue.sum()))
print("最終利益： ￥"+str(total_rev)+"/単元")