In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from math import sqrt
import torch.nn as nn
import torch.optim as optim

In [None]:
# Load the raw price table from the CSV file in the working directory
# and preview its first rows.
df = pd.read_csv('stock_price.csv')

df.head()

In [None]:
# Inspect column dtypes and non-null counts of the freshly loaded frame.
df.info()

In [4]:
# Parse the date column ('日付け') into datetime values.
df['日付け'] = pd.to_datetime(df['日付け'])

# Enforce oldest-first row order. The rolling windows, the shifts, the
# sequence windows and the positional train/test split further down all treat
# row order as time order, but nothing else in the notebook sorts the rows.
# A stable sort keeps the relative order of any duplicated dates, and the cell
# stays idempotent when it is run again.
df = df.sort_values('日付け', kind='mergesort').reset_index(drop=True)

In [None]:
# Original column header -> English column name.
# The insertion order of this dict is also the column order of the new frame.
column_names = {
    '日付け': 'Date',
    '始値': 'Open',
    '高値': 'High',
    '安値': 'Low',
    '終値': 'Close',
    '出来高': 'Volume',
    '変化率 %': 'Change',
}

# Keep only the mapped columns, then rename them in one step.
df = df[list(column_names)].rename(columns=column_names)

df.head(10)

In [None]:
# Re-check dtypes and non-null counts after selecting and renaming the columns.
df.info()

In [140]:
# Disabled helper cell: used to inspect the full Volume and Change series.
# Everything below is commented out, so running this cell does nothing.

# Convert the Volume column to numeric values.
# As written, values with an 'M' suffix are kept as-is and values with a 'B'
# suffix are multiplied by 1000.
# def convert_volume(value):
#     if 'M' in value:
#         return float(value.replace('M', '')) * 1
#     elif 'B' in value:
#         return float(value.replace('B', '')) * 1000
#     return float(value)

# df['Volume'] = df['Volume'].apply(convert_volume)

# Convert the Change column to numeric values by stripping the '%' sign.
# def convert_change(change):
#     return float(change.replace('%', ''))

# df['Change'] = df['Change'].apply(convert_change)

# df[['Date', 'Change']].plot(x='Date', kind='line')
# plt.show()

In [None]:
# Overview of the whole dataset: the four price series drawn against the date.
price_columns = ['Date', 'Open', 'High', 'Low', 'Close']
df[price_columns].plot.line(x='Date')
plt.show()

In [None]:
# Ichimoku Kinko Hyo (Ichimoku cloud) indicator columns.
def _rolling_midpoint(window):
    """Return (rolling max of High + rolling min of Low) / 2 over `window` rows."""
    highest = df['High'].rolling(window=window).max()
    lowest = df['Low'].rolling(window=window).min()
    return (highest + lowest) / 2


df['Tenkan-sen'] = _rolling_midpoint(9)
df['Kijun-sen'] = _rolling_midpoint(26)

# Mean of the two lines above, moved 26 rows down the frame.
df['Senkou Span A'] = ((df['Tenkan-sen'] + df['Kijun-sen']) / 2).shift(26)

# 52-row midpoint.
# NOTE(review): unlike Span A this column is not shifted by 26 rows - confirm
# that this is intended.
df['Senkou Span B'] = _rolling_midpoint(52)

# Close taken from 26 rows further down the frame (negative shift), so the
# last 26 rows are NaN.
# NOTE(review): if rows are ordered oldest-first, each row holds a close from
# 26 rows in the future - keep that in mind before using it as a model input.
df['Chikou Span'] = df['Close'].shift(-26)

df.head()

In [None]:
# Keep the date, the OHLC prices and the Ichimoku columns.
df = df[['Date', 'High', 'Low', 'Open', 'Close', 'Tenkan-sen', 'Kijun-sen', 'Senkou Span A', 'Senkou Span B', 'Chikou Span']]

# Heatmap of the pairwise correlations between the numeric features.
# 'Date' is dropped explicitly: how DataFrame.corr() treats a datetime column
# depends on the pandas version (the default of `numeric_only` changed), so the
# heatmap should not rely on that column being filtered out implicitly.
sns.heatmap(df.drop(columns=['Date']).corr(), vmin=-1, vmax=1, annot=True, cmap="rocket_r")
plt.show()

In [None]:
# Build the model inputs.
# - 'Date' is not a numeric feature.
# - 'Chikou Span' is Close.shift(-26): every row carries the close from 26 rows
#   further down the frame. Used as an input it would hand the model prices
#   that lie beyond the value it is asked to predict, so it is excluded here.
feature = df.drop(columns=['Date', 'Chikou Span'])

# Remove the rows that still contain NaN (the warm-up rows of the rolling and
# shifted indicator columns). dropna() replaces the previous hard-coded
# positional slice, so the trim stays correct if the indicator windows change.
feature = feature.dropna().reset_index(drop=True)

# The prediction target is the closing price, aligned row by row with `feature`.
target = feature['Close']
feature.tail()

In [11]:
# Scaling configuration.
# Output range applied to every scaled column; use (0, 1) for the conventional
# MinMax range.
FEATURE_RANGE = (-0.5, 0.5)

# Share of leading rows used to FIT the scalers. It mirrors the 0.8 train share
# of the train/test split below, so that the min/max of the test period never
# influence how the training data is scaled. All rows are still transformed;
# rows outside the fitted range simply map outside FEATURE_RANGE.
SCALER_FIT_FRACTION = 0.8
n_fit_rows = int(len(feature) * SCALER_FIT_FRACTION)

# Feature scaler: fitted on the leading rows only, applied to every row.
scaler_features = MinMaxScaler(feature_range=FEATURE_RANGE)
scaler_features.fit(feature.iloc[:n_fit_rows])
scaled_features = scaler_features.transform(feature)

# Target scaler: kept separate so predictions can be mapped back to prices
# with scaler_target.inverse_transform.
target_values = target.values.reshape(-1, 1)
scaler_target = MinMaxScaler(feature_range=FEATURE_RANGE)
scaler_target.fit(target_values[:n_fit_rows])
scaled_target = scaler_target.transform(target_values)

In [12]:
# Turn the row-wise table into sliding-window samples.
def create_sequences(data, seq_length):
    """Slice a 2-D array into overlapping input windows and next-row targets.

    Args:
        data: 2-D array; every column except the last is an input feature and
            the last column holds the target value.
        seq_length: Number of consecutive rows in each input window.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays. ``X[k]`` contains rows
        ``k .. k + seq_length - 1`` of the feature columns and ``y[k]`` is the
        last-column value of row ``k + seq_length``. When ``data`` has no more
        than ``seq_length`` rows, both arrays are empty.
    """
    starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length, :-1] for start in starts]
    labels = [data[start + seq_length, -1] for start in starts]
    return np.array(windows), np.array(labels)

# Number of consecutive rows fed to the model per sample.
time_step = 10

# Append the scaled target as the last column, which is the layout
# create_sequences() expects (features first, target last).
scaled_data = np.concatenate((scaled_features, scaled_target), axis=1)

# X: feature windows, y: the target value that follows each window.
X, y = create_sequences(scaled_data, time_step)

In [13]:
# Positional train/test split: the first 80% of the windows are used for
# training, the remaining windows for testing (no shuffling).
train_size = int(0.8 * len(X))
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]

In [14]:
# Convert the NumPy splits to float32 tensors; targets become column vectors
# of shape (n_samples, 1) to match the model output.
# torch.as_tensor accepts both arrays and tensors, so running this cell a second
# time (when the names already hold tensors) does not trigger the
# "copy construct from a tensor" warning that torch.tensor() emits.
X_train = torch.as_tensor(X_train, dtype=torch.float32)
y_train = torch.as_tensor(y_train, dtype=torch.float32).view(-1, 1)
X_test = torch.as_tensor(X_test, dtype=torch.float32)
y_test = torch.as_tensor(y_test, dtype=torch.float32).view(-1, 1)

In [15]:
# LSTM model definition
class LSTMModel(nn.Module):
    """LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_layer_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced per sample.
    """

    def __init__(self, input_size, hidden_layer_size=50, num_layers=1, output_size=1):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, time, feature).
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_layer_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.linear = nn.Linear(hidden_layer_size, output_size)

    def forward(self, x):
        """Return the linear projection of the LSTM output at the final time step."""
        sequence_output, _state = self.lstm(x)
        last_step = sequence_output[:, -1, :]
        return self.linear(last_step)
    
# Seed PyTorch's global random generator before the model is created so that
# the weight initialisation - and the batch shuffling that later draws from the
# same generator - is repeatable on a fresh "Restart & Run All".
torch.manual_seed(42)

# Model, loss function and optimiser.
# The number of input features is read from the last axis of the training windows.
input_size = X_train.shape[2]
model = LSTMModel(input_size)
loss_function = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training hyper-parameters.
epochs = 200
batch_size = 64
# Per-epoch training loss history.
losses = []

In [16]:
# Wrap the training tensors in a dataset and iterate over it in shuffled
# mini-batches of `batch_size` samples.
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
)

In [None]:
# Train the model.
# Start from an empty history every time this cell runs. Without the reset, a
# second run keeps appending to the old list, the history no longer matches
# `epochs`, and the loss plot below fails on mismatched x/y lengths.
losses = []
n_train_samples = len(train_loader.dataset)

for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        y_pred = model(X_batch)
        loss = loss_function(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        # `loss` is the mean over the batch; weight it by the batch size so a
        # smaller final batch does not count as much as a full one.
        epoch_loss += loss.item() * X_batch.size(0)

    # Mean loss per training sample for this epoch.
    epoch_loss /= n_train_samples
    losses.append(epoch_loss)
    if epoch % 10 == 0:
        print(f'Epoch {epoch+1}, Loss: {epoch_loss}')

# Plot the training loss per epoch.
plt.figure(figsize=(10, 5))
plt.plot(range(len(losses)), losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.show()

In [None]:
# Evaluate the trained model on the test split (no gradients needed).
model.eval()
with torch.no_grad():
    y_pred = model(X_test)

# Bring predictions and targets back to the original price scale.
y_pred_scaled = y_pred.numpy().reshape(-1, 1)
y_test_scaled = y_test.numpy().reshape(-1, 1)
y_pred_inverse, y_test_inverse = (
    scaler_target.inverse_transform(scaled_values)
    for scaled_values in (y_pred_scaled, y_test_scaled)
)

# Test loss: mean squared error in price units.
squared_errors = (y_pred_inverse - y_test_inverse) ** 2
test_loss = np.mean(squared_errors)
print(f'Test Loss: {test_loss}')

# # R^2 (disabled)
# lstm_r2 = r2_score(y_test_inverse[1:], y_pred_inverse[:-1])
# print(f'LSTM Model R^2: {lstm_r2:.2f}')

# Root mean squared error in price units.
rmse = sqrt(mean_squared_error(y_test_inverse, y_pred_inverse))
print(f'Test RMSE: {rmse:.2f}')

# Sign of the step-to-step change, once for the actual series and once for
# the predicted series.
actual_direction = np.sign(np.diff(y_test_inverse, axis=0))
predicted_direction = np.sign(np.diff(y_pred_inverse, axis=0))

# Share of steps (in percent) where both series move in the same direction.
direction_matches = actual_direction == predicted_direction
accuracy = np.mean(direction_matches) * 100
print(f'Accuracy of predicting the direction of stock price change: {accuracy:.2f}%')

# Plot actual versus predicted close.
plt.figure(figsize=(14, 5))
# plt.xlim(1450, 1850)  # uncomment to zoom in on a sub-range
# plt.ylim(190, 320)   # uncomment to zoom in on a sub-range
for series, label in ((y_test_inverse, 'True Close'), (y_pred_inverse, 'Predicted Close')):
    plt.plot(range(len(series)), series, label=label)
plt.xlabel('Index')
plt.ylabel('Price')
plt.legend()
plt.show()

In [None]:
# Baseline model evaluation.

# Persistence baseline: the previous row's close is the prediction for the
# next row, so both series are the test closes offset by one position.
baseline_pred, baseline_actual = y_test_inverse[:-1], y_test_inverse[1:]

# Baseline RMSE in price units.
baseline_rmse = sqrt(mean_squared_error(baseline_actual, baseline_pred))
print(f'Baseline Model RMSE: {baseline_rmse:.2f}')

# Sign of the step-to-step change of each baseline series.
baseline_actual_direction = np.sign(np.diff(baseline_actual, axis=0))
baseline_predicted_direction = np.sign(np.diff(baseline_pred, axis=0))

# Share of steps (in percent) where both series move in the same direction.
baseline_direction_matches = baseline_actual_direction == baseline_predicted_direction
baseline_accuracy = np.mean(baseline_direction_matches) * 100
print(f'Baseline Model Accuracy: {baseline_accuracy:.2f}%')

# Plot actual versus baseline-predicted close.
plt.figure(figsize=(14, 5))
for series, label in ((baseline_actual, 'True Close'), (baseline_pred, 'Baseline Predicted Close')):
    plt.plot(range(len(series)), series, label=label)
plt.xlabel('Index')
plt.ylabel('Price')
plt.legend()
plt.show()