In [1]:
from google.colab import drive
drive.mount('/content/drive')

ValueError: mount failed

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import warnings
warnings.filterwarnings('ignore')

In [None]:
# 再現性確保

import os
import random
import numpy as np
import tensorflow as tf

os.environ['PYTHONHASHSEED'] = '42'
os.environ['TF_DETERMINISTIC_OPS'] = '1'

random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)


In [None]:
# データクリーニング

'''

%cd /content/drive/MyDrive/assignment-main(new)/Trainee/time-series-prediction

df = pd.read_csv('/content/drive/MyDrive/assignment-main(new)/Trainee/time-series-prediction/stock_price.csv')

def parse_volume(s):
  if 'B' in s:
    return float(s.replace('B', '')) * 1e9
  elif 'M' in s:
    return float(s.replace('M', '')) * 1e6
  else:
    return float(s)

def clean(df):
  df_new = df.copy()
  df_new = df_new[::-1]
  df_new = df_new.rename(columns={'日付け': 'date', '終値': 'close', '始値': 'open', '高値': 'high', '安値': 'low', '出来高': 'volume', '変化率 %': 'return'})
  df_new['date'] = pd.to_datetime(df_new['date'])
  df_new = df_new.set_index('date')
  df_new['volume'] = df_new['volume'].apply(parse_volume)
  df_new['return'] = df_new['return'].str.replace('%', '').astype(float) / 100
  return df_new

cleaned_df = clean(df)


from google.colab import files
cleaned_df.to_csv('my_cleaned_stock_price.csv')
files.download('my_cleaned_stock_price.csv')

'''

In [None]:
df = pd.read_csv('/content/drive/MyDrive/assignment-main(new)/Trainee/time-series-prediction/my_cleaned_stock_price.csv')
df

In [None]:
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
df

In [None]:
import holidays

# 終値からRSIを計算
def compute_rsi(data, window=14):
    delta = data['close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

def engineer(df_old):

  df_new = df_old.copy()

  df_new['sin_dayofyear'] = np.sin(2 * np.pi * df_new.index.dayofyear / 365)
  df_new['cos_dayofyear'] = np.cos(2 * np.pi * df_new.index.dayofyear / 365)
  df_new['sin_dayofmonth'] = np.sin(2 * np.pi * df_new.index.day / 30)
  df_new['cos_dayofmonth'] = np.cos(2 * np.pi * df_new.index.day / 30)
  df_new['sin_dayofweek'] = np.sin(2 * np.pi * df_new.index.dayofweek / 7)
  df_new['cos_dayofweek'] = np.sin(2 * np.pi * df_new.index.dayofweek / 7)

  # 日本の休日
  jp_holidays = holidays.Japan()
  df_new['holiday'] = df_new.index.map(lambda x: 1 if x in jp_holidays else 0)

  # 移動平均、相対力指数
  sma_list = [5, 10, 20, 60]
  for i in sma_list:
    df_new[f'sma_{i}'] = df_new['close'].rolling(window=i).mean()
    df_new[f'ema_{i}'] = df_new['close'].ewm(span=i, adjust=False).mean()
    df_new[f'rsi_{i}'] = compute_rsi(df_new, window=i)

  # 標準偏差
  for i in sma_list:
    df_new[f'std_{i}'] = df_new['close'].rolling(window=i).std()

  # 急激な上昇をとらえる
  for i in range(len(sma_list)):
    for j in range(i+1, len(sma_list)):
      df_new[f'sma_{sma_list[i]}vs{sma_list[j]}'] = df_new[f'sma_{sma_list[i]}'] / df_new[f'sma_{sma_list[j]}']
      # ゴールデンクロス
      df_new[f'gc_{sma_list[i]}vs{sma_list[j]}'] = (df_new[f'sma_{sma_list[i]}'].shift(1) < df_new[f'sma_{sma_list[j]}'].shift(1)) & (df_new[f'sma_{sma_list[i]}'] >= df_new[f'sma_{sma_list[j]}'])
      # デッドクロス
      df_new[f'dc_{sma_list[i]}vs{sma_list[j]}'] = (df_new[f'sma_{sma_list[i]}'].shift(1) > df_new[f'sma_{sma_list[j]}'].shift(1)) & (df_new[f'sma_{sma_list[i]}'] <= df_new[f'sma_{sma_list[j]}'])
      # MACD
      df_new[f'macd_{sma_list[i]}vs{sma_list[j]}'] = df_new[f'ema_{sma_list[i]}'] - df_new[f'ema_{sma_list[j]}']
      df_new[f'macd_signal_{sma_list[i]}vs{sma_list[j]}'] = df_new[f'macd_{sma_list[i]}vs{sma_list[j]}'].ewm(span=9, adjust=False).mean()

  # ADX
  epsilon = 1e-6
  df_new['+DM'] = df_new['high'] - df_new['high'].shift(1)
  df_new['-DM'] = df_new['low'].shift(1) - df_new['low']
  df_new['TR'] = np.maximum.reduce([
    df_new['high'] - df_new['low'],
    abs(df_new['high'] - df_new['close'].shift(1)),
    abs(df_new['low'] - df_new['close'].shift(1))
    ])
  df_new['+DI'] = df_new['+DM'].rolling(window=14).sum() / (df_new['TR'].rolling(window=14).sum() + epsilon) * 100
  df_new['-DI'] = df_new['-DM'].rolling(window=14).sum() / (df_new['TR'].rolling(window=14).sum() + epsilon) * 100
  df_new['DX'] = abs((df_new['+DI'] - df_new['-DI']) / (df_new['+DI'] + df_new['-DI'] + epsilon)) * 100
  df_new['ADX'] = df_new['DX'].rolling(window=14).mean()

  # ストキャスティクス
  df_new['%K'] = (df_new['close'] - df_new['low'].rolling(window=14).min()) / (df_new['high'].rolling(window=14).max() - df_new['low'].rolling(window=14).min()) * 100
  df_new['%D'] = df_new['%K'].rolling(window=3).mean()


  #df_new = df_new.drop(['open', 'high', 'low'], axis=1)

  return df_new

engineered_df = engineer(df)
engineered_df

In [None]:
df_new = engineered_df['2020-01-01':]
train = df_new[:'2022-12-31']
embargo_train_valid = df_new['2023-01-01':'2023-03-31']
valid = df_new['2023-01-01':'2023-12-31']
embargo_valid_test = df_new['2024-01-01':'2024-06-30']
test = df_new['2024-07-01':'2024-07-31']

In [None]:
input_seq_len = min(60, len(embargo_valid_test))
output_seq_len = len(test)

In [None]:
def create_dataset(df, input_seq_len=60, output_seq_len=22):
  data = df.values
  X, y = [], []
  if len(df) - input_seq_len - output_seq_len < 0:
    raise ValueError('input_seq_len or output_seq_len is too large')
  for i in range(len(df) - input_seq_len - output_seq_len):
    X.append(data[i:i+input_seq_len])
    y.append(data[i+input_seq_len:i+input_seq_len+output_seq_len, 0])
  return np.array(X), np.array(y)

X_train, y_train = create_dataset(train, input_seq_len=input_seq_len, output_seq_len=output_seq_len)
X_valid, y_valid = create_dataset(valid, input_seq_len=input_seq_len, output_seq_len=output_seq_len)

In [None]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler

X_scaler = StandardScaler()
X_train_scaled = X_scaler.fit_transform(X_train.reshape(-1, X_train.shape[-1])).reshape(X_train.shape)
X_valid_scaled = X_scaler.transform(X_valid.reshape(-1, X_valid.shape[-1])).reshape(X_valid.shape)

y_scaler = StandardScaler()
y_train_scaled = y_scaler.fit_transform(y_train.reshape(-1, 1)).reshape(y_train.shape)
y_valid_scaled = y_scaler.transform(y_valid.reshape(-1, 1)).reshape(y_valid.shape)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
from tensorflow.keras.initializers import GlorotUniform
from tensorflow.keras.optimizers.schedules import CosineDecay
from tensorflow.keras import regularizers, layers, models, Input
from tensorflow.keras.callbacks import ReduceLROnPlateau
import tensorflow.keras.backend as K

In [None]:
# LSTM


dropout_rate = 0.2

lstm_model = Sequential()
lstm_model.add(LSTM(256, input_shape=(X_train_scaled.shape[1], X_train_scaled.shape[2]), return_sequences=True))
lstm_model.add(Dropout(dropout_rate))
lstm_model.add(LSTM(128, return_sequences=True))
lstm_model.add(Dropout(dropout_rate))
lstm_model.add(LSTM(64, return_sequences=False))
lstm_model.add(Dropout(dropout_rate))
lstm_model.add(Dense(output_seq_len))

lstm_model.summary()

# lr_schedule = CosineDecay(initial_learning_rate=0.001, decay_steps = (len(X_train_scaled) // 64) * 1000)
optimizer = Adam(learning_rate=0.001)
lstm_model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])
es = EarlyStopping(monitor='val_loss', patience=20, verbose=1, mode='min', restore_best_weights=True)
lstm_model.fit(X_train_scaled, y_train_scaled, epochs=1000, batch_size=64, validation_data=(X_valid_scaled, y_valid_scaled), shuffle=False, callbacks=[es])

print(f'Best Validation Loss: {np.min(lstm_model.history.history["val_loss"]):.4f}')

plt.figure(figsize=(12, 8))
plt.plot(lstm_model.history.history['loss'], label='Train Loss')
plt.plot(lstm_model.history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()


In [None]:
# LSTM + Attention


class Attention(Layer):
  def __init__(self, **kwargs):
    super(Attention, self).__init__(**kwargs)

  def build(self, input_shape):
    self.W = self.add_weight(shape=(input_shape[-1], 1),
                             initializer='glorot_uniform',
                             trainable=True)
    self.b = self.add_weight(shape=(1,),
                         initializer='zeros',
                         trainable=True)
    super(Attention, self).build(input_shape)

  def call(self, x):
    e = K.tanh(K.dot(x, self.W) + self.b)
    a = K.softmax(e, axis=1)
    output = x*a
    return K.sum(output, axis=1)

def create_attention_lstm(input_shape, output_dim):

  inputs = Input(shape=input_shape)

  x = LSTM(256, return_sequences=True, kernel_regularizer=regularizers.l2(0.001))(inputs)
  x = Dropout(0.2)(x)
  x = LSTM(128, return_sequences=True, kernel_regularizer=regularizers.l2(0.001))(x)
  x = Dropout(0.2)(x)
  x = LSTM(64, return_sequences=True, kernel_regularizer=regularizers.l2(0.001))(x)
  x = Dropout(0.2)(x)

  x = Attention()(x)
  # x = Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.001))(x)
  x = Dropout(0.2)(x)

  outputs = Dense(output_dim)(x)
  lstm_attention_model = Model(inputs, outputs)
  return lstm_attention_model

lstm_attention_model = create_attention_lstm(input_shape=(X_train_scaled.shape[1], X_train_scaled.shape[2]), output_dim=output_seq_len)
lstm_attention_model.summary()
lstm_attention_model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])

es = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min', restore_best_weights=True)

history = lstm_attention_model.fit(
    X_train_scaled, y_train_scaled,
    epochs=1000,
    batch_size=64,
    validation_data=(X_valid_scaled, y_valid_scaled),
    shuffle=False,
    callbacks=[es]
    )

print(f'Best Validation Loss: {np.min(history.history["val_loss"]):.4f}')

plt.figure(figsize=(12, 8))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()


In [None]:
# LSTM+Attention2


class Attention(Layer):
  def __init__(self, **kwargs):
    super(Attention, self).__init__(**kwargs)

  def build(self, input_shape):
    self.W = self.add_weight(shape=(input_shape[-1], 1),
                             initializer='glorot_uniform',
                             trainable=True)
    self.b = self.add_weight(shape=(1,),
                         initializer='zeros',
                         trainable=True)
    super(Attention, self).build(input_shape)

  def call(self, x):
    e = K.tanh(K.dot(x, self.W) + self.b)
    a = K.softmax(e, axis=1)
    output = x*a
    return K.sum(output, axis=1)

def create_attention_lstm(input_shape, output_dim):

  inputs = Input(shape=input_shape)

  x1 = LSTM(256, return_sequences=True, kernel_regularizer=regularizers.l2(0.001))(inputs)
  x1 = Dropout(0.2)(x1)
  x1 = Attention()(x1)

  x2 = LSTM(128, return_sequences=True, kernel_regularizer=regularizers.l2(0.001))(inputs)
  x2 = Dropout(0.2)(x2)
  x2 = Attention()(x2)

  x3 = LSTM(64, return_sequences=True, kernel_regularizer=regularizers.l2(0.001))(inputs)
  x3 = Dropout(0.2)(x3)
  x3 = Attention()(x3)

  x = Concatenate()([x1, x2, x3])
  x = Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.001))(x)
  x = Dropout(0.2)(x)

  outputs = Dense(output_dim)(x)
  lstm_attention_model = Model(inputs, outputs)
  return lstm_attention_model

lstm_attention2_model = create_attention_lstm(input_shape=(X_train_scaled.shape[1], X_train_scaled.shape[2]), output_dim=output_seq_len)
lstm_attention2_model.summary()
lstm_attention2_model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])

es = EarlyStopping(monitor='val_loss', patience=50, verbose=1, mode='min', restore_best_weights=True)

history = lstm_attention2_model.fit(
    X_train_scaled, y_train_scaled,
    epochs=1000,
    batch_size=64,
    validation_data=(X_valid_scaled, y_valid_scaled),
    shuffle=False,
    callbacks=[es]
    )

print(f'Best Validation Loss: {np.min(history.history["val_loss"]):.4f}')

plt.figure(figsize=(12, 8))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()


In [None]:
# Transformer

class PositionalEncoding(tf.keras.layers.Layer):
  def __init__(self, embed_dim):
      super().__init__()
      self.embed_dim = embed_dim

  def call(self, x):
      # x.shape[1] はシーケンスの長さ、x.shape[-1] は埋め込み次元数
      pos = tf.cast(tf.range(start=0, limit=tf.shape(x)[1], delta=1), dtype=tf.float32)  # posをfloat32にキャスト
      i = tf.cast(tf.range(start=0, limit=self.embed_dim, delta=1), dtype=tf.float32)  # iもfloat32にキャスト

      # 位置エンコーディングの計算
      angle_rates = 1 / tf.pow(10000., (2 * (i // 2)) / self.embed_dim)
      angles = tf.expand_dims(pos, axis=-1) * angle_rates  # 位置エンコーディング
      cos = tf.math.sin(angles[:, 0::2])  # 奇数番目の次元
      sin = tf.math.cos(angles[:, 1::2])  # 偶数番目の次元
      pos_encoding = tf.concat([sin, cos], axis=-1)

      # 位置エンコーディングを元の入力に加算
      return x + tf.expand_dims(pos_encoding, axis=0)

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.1):
  # Multi-Head Attention
  x = MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(inputs, inputs)
  x = Dropout(dropout)(x)
  x = LayerNormalization(epsilon=1e-6)(x + inputs)

  # Feed-Forward部分
  x_ff = Dense(ff_dim, activation='gelu')(x)
  x_ff = Dropout(dropout)(x_ff)
  x_ff = Dense(inputs.shape[-1])(x_ff)
  x = LayerNormalization(epsilon=1e-6)(x + x_ff)
  return x

def build_transformer_model(input_shape, output_dim):
  inputs = Input(shape=input_shape)

  # Position Encoding
  embed_dim = 256
  x = Dense(embed_dim)(inputs)
  x = Dropout(0.2)(x)
  x = PositionalEncoding(embed_dim)(x)

  for _ in range(2):
    x = transformer_encoder(x, head_size=64, num_heads=4, ff_dim=256, dropout=0.2)

  x = GlobalAveragePooling1D()(x)

  x = Dense(64, activation='gelu')(x)
  x = Dropout(0.2)(x)

  outputs = Dense(output_dim)(x)

  model = models.Model(inputs, outputs)
  return model

transformer_model = build_transformer_model(
    input_shape=(X_train_scaled.shape[1], X_train_scaled.shape[2]),
    output_dim=output_seq_len
)
transformer_model.summary()

transformer_model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])

es = EarlyStopping(monitor='val_loss', patience=50, verbose=1, mode='min', restore_best_weights=True)
lr_schedule = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=50, verbose=1, min_lr=1e-5)

history = transformer_model.fit(
    X_train_scaled, y_train_scaled,
    epochs=1000,
    batch_size=64,
    validation_data=(X_valid_scaled, y_valid_scaled),
    shuffle=False,
    callbacks=[es, lr_schedule]
)

print(f'Best Validation Loss: {np.min(history.history["val_loss"]):.4f}')

plt.figure(figsize=(12, 8))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()

In [None]:
X_test = embargo_valid_test[-input_seq_len:].values
X_test_scaled = X_scaler.transform(X_test.reshape(-1, X_test.shape[-1])).reshape(X_test.shape)

In [None]:
def pred(model, X_test_scaled, y_scaler):
  y_test_pred_scaled = model.predict(X_test_scaled.reshape(1, X_test_scaled.shape[0], X_test_scaled.shape[1]))
  y_test_pred = y_scaler.inverse_transform(y_test_pred_scaled).flatten()
  return y_test_pred

lstm_test_pred = pred(lstm_model, X_test_scaled, y_scaler)
lstm_attention_test_pred = pred(lstm_attention_model, X_test_scaled, y_scaler)
lstm_attention2_test_pred = pred(lstm_attention2_model, X_test_scaled, y_scaler)
transformer_test_pred = pred(transformer_model, X_test_scaled, y_scaler)

ensemble_test_pred = (
    # lstm_test_pred +
    lstm_attention_test_pred +
    lstm_attention2_test_pred +
    transformer_test_pred
) / 3

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error

def evaluate_model(model_name, y_test_pred):
    # モデルがNoneの場合は名前を'Ensemble'に設定

    rmse = np.sqrt(mean_squared_error(test['close'], y_test_pred))
    mae = mean_absolute_error(test['close'], y_test_pred)

    print(f'{model_name} RMSE: {rmse}')
    print(f'{model_name} MAE: {mae}')

# モデルごとに評価
evaluate_model('LSTM', lstm_test_pred)
evaluate_model('LSTM Attention', lstm_attention_test_pred)
evaluate_model('LSTM Attention 2', lstm_attention2_test_pred)
evaluate_model('Transformer', transformer_test_pred)

# アンサンブルモデルを評価
evaluate_model('Ensemble', ensemble_test_pred)


sequential_6 RMSE: 4.78832386723381
sequential_6 MAE: 4.374059660538381
functional_63 RMSE: 3.4191316721960683
functional_63 MAE: 2.99490635084069
functional_67 RMSE: 4.568780446341157
functional_67 MAE: 4.221273405655569
functional_65 RMSE: 5.248340023489538
functional_65 MAE: 4.903860075577444
Ensemble RMSE: 4.78832386723381
Ensemble MAE: 4.374059660538381

In [None]:
plt.figure(figsize=(12, 8))
plt.plot(test.index, lstm_test_pred, label='LSTM')
plt.plot(test.index, lstm_attention_test_pred, label='LSTM + Attention')
plt.plot(test.index, lstm_attention2_test_pred, label='LSTM + Attention 2')
plt.plot(test.index, transformer_test_pred, label='Transformer')
plt.plot(test.index, ensemble_test_pred, label='Ensemble')
plt.plot(df_new.index, df_new['close'], label='Actual')
plt.title('Actual vs Predicted')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid()
plt.show()

In [None]:
plt.figure(figsize=(12, 8))
plt.plot(test.index, lstm_test_pred, label='LSTM')
plt.plot(test.index, lstm_attention_test_pred, label='LSTM + Attention')
plt.plot(test.index, lstm_attention2_test_pred, label='LSTM + Attention 2')
plt.plot(test.index, transformer_test_pred, label='Transformer')
plt.plot(test.index, ensemble_test_pred, label='Ensemble')
plt.plot(test.index, test['close'], label='Actual')
plt.title('Actual vs Predicted')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid()
plt.show()

In [None]:
lstm_test_pred

In [None]:
lstm_attention_test_pred