In [11]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def Slope(X, Y, axis=1):
    """Finite-difference slope dY/dX between neighbouring points, ordered by X.

    Both arrays are first reordered along ``axis`` so that ``X`` is ascending
    (``Y`` is permuted with the same indices); the slope is then the ratio of
    consecutive differences along that axis.

    Parameters
    ----------
    X, Y : array_like
        Arrays of identical shape holding the abscissa and ordinate values.
    axis : int, default 1
        Axis along which points are sorted and differenced. Any valid axis of
        ``X`` is accepted, including negative ones.

    Returns
    -------
    numpy.ndarray
        Same shape as the inputs except that ``axis`` is one element shorter.
        Repeated ``X`` values along ``axis`` give a zero denominator.
    """
    X = np.asarray(X)
    Y = np.asarray(Y)
    order = np.argsort(X, axis=axis)
    X_sorted = np.take_along_axis(X, order, axis=axis)
    Y_sorted = np.take_along_axis(Y, order, axis=axis)
    # np.diff handles every axis uniformly, so no per-axis slicing branches
    # are needed (the old branches left the deltas undefined for axis >= 2
    # or negative axes).
    return np.diff(Y_sorted, axis=axis) / np.diff(X_sorted, axis=axis)

def recovery_curve(x, slope, y0=0.5):
    """Rebuild a curve from per-interval slopes by cumulative integration.

    Parameters
    ----------
    x : array_like, 1-D
        Abscissa values in the order in which the curve is to be rebuilt.
    slope : array_like
        Slope of each interval ``x[i] -> x[i+1]``; it is multiplied
        element-wise with the ``len(x) - 1`` consecutive differences of ``x``.
    y0 : float, default 0.5
        Curve value at ``x[0]``.

    Returns
    -------
    numpy.ndarray
        Float array of length ``len(x)`` whose first element is ``y0`` and
        whose element ``k`` is ``y0`` plus the sum of the first ``k``
        increments ``(x[i+1] - x[i]) * slope[i]``.
    """
    # Coerce to float so inputs sliced out of an object-dtype matrix work.
    x = np.asarray(x, dtype=float)
    slope = np.asarray(slope, dtype=float)
    delta_y = np.diff(x) * slope
    # A single running sum replaces re-summing a growing prefix per point.
    return np.concatenate(([y0], y0 + np.cumsum(delta_y)))

# Load the interpolated option table (Big5-encoded CSV).
IV_data = pd.read_csv('./../../Data/Organized/all/InterpolateData_ChangeNoMuch_0.7.csv',
                      encoding='Big5', index_col=False)

IV_matrix = np.array(IV_data)

# Rows per block = number of rows whose first column equals the first row's
# value. Only column 0 is compared: matching against the whole matrix would
# also count an equal value that happens to appear in any other column.
K_num = int((IV_matrix[:, 0] == IV_matrix[0, 0]).sum())

# Closing price: reshape the flat columns to (blocks, K_num) and take the
# slope of implied volatility with respect to strike inside every block.
K_closing = np.array(IV_data['履約價(收盤價)'])
K_closing = np.reshape(K_closing, (-1, K_num))
IV_closing = np.array(IV_data['隱含波動率(收盤價)'])
IV_closing = np.reshape(IV_closing, (-1, K_num))
K_IVslope_closing = Slope(X=K_closing, Y=IV_closing, axis=1)

# Settlement price: same layout and slope computation.
K_settlement = np.array(IV_data['履約價(結算價)'])
K_settlement = np.reshape(K_settlement, (-1, K_num))
IV_settlement = np.array(IV_data['隱含波動率(結算價)'])
IV_settlement = np.reshape(IV_settlement, (-1, K_num))
K_IVslope_settlement = Slope(X=K_settlement, Y=IV_settlement, axis=1)




若 $X=[x_1, x_2, \dots, x_n]$，  
則 $\mathrm{TimeSeriesData}(X, 30)=\big(\,[[x_1, x_2, \dots, x_{30}],\ [x_2, x_3, \dots, x_{31}],\ \dots,\ [x_{n-30}, x_{n-29}, \dots, x_{n-1}]],\quad [x_{31}, x_{32}, \dots, x_n]\,\big)$  

In [12]:
def TimeSeriesData(X, seq_length):
    """Slice a 2-D series into sliding input windows and next-step targets.

    Window ``i`` holds rows ``i .. i + seq_length - 1`` of ``X`` and its
    target is row ``i + seq_length``.

    Parameters
    ----------
    X : array_like, shape (n_steps, n_features)
        Series with one row per step; it is converted to ``float``.
    seq_length : int
        Number of consecutive rows in every input window.

    Returns
    -------
    X_train : numpy.ndarray, shape (n_steps - seq_length, seq_length, n_features)
    y_train : numpy.ndarray, shape (n_steps - seq_length, n_features)
        Both are new float arrays that do not share memory with ``X``.

    Raises
    ------
    ValueError
        If ``X`` is not 2-D or ``seq_length`` is outside ``0 .. len(X)``.
    """
    data = np.asarray(X, dtype=float)
    if data.ndim != 2:
        raise ValueError(
            "X must be 2-D (n_steps, n_features); got %d dimension(s)" % data.ndim)
    if not 0 <= seq_length <= len(data):
        raise ValueError(
            "seq_length must be between 0 and len(X)=%d; got %r" % (len(data), seq_length))

    n_windows = len(data) - seq_length
    # Row i of this index grid lists the source rows i .. i+seq_length-1, so
    # one fancy-indexing step builds every window without a Python loop.
    window_rows = np.arange(n_windows)[:, None] + np.arange(seq_length)[None, :]
    X_train = data[window_rows]
    # Copy so the targets are independent of the caller's array.
    y_train = data[seq_length:].copy()
    return X_train, y_train
# Rows per input window, and the scale factor applied to the slopes before
# they are windowed.
seq_length = 7
magnification_slope = 10000

# Skip the first seq_length blocks of K_num rows, i.e. the rows that precede
# the first target produced by TimeSeriesData.
IV_matrix_forecast = IV_matrix[K_num*seq_length:]

# --- Closing price ---
Inputs_closing_slope, Ouputs_closing = TimeSeriesData(
    K_IVslope_closing*magnification_slope, seq_length=seq_length)

# Futures OHLC columns: take one row per block (every K_num-th row), then
# stack windows of seq_length + 1 consecutive rows.
columns_names_OHLC = ['期貨開盤價', '期貨最高價', '期貨最低價', '期貨收盤價']
column_index_OHLC = list(map(IV_data.columns.get_loc, columns_names_OHLC))
Inputs_OHLC = IV_matrix[::K_num][:, column_index_OHLC]
Inputs_OHLC = np.array([
    Inputs_OHLC[start:start + seq_length + 1]
    for start in range(len(Inputs_OHLC) - seq_length)
]).astype(float)

# First 80% of the windows are used for training, the remainder for testing.
train_size = int(len(Inputs_closing_slope)*0.8)
X_train_closing, X_test_closing = Inputs_closing_slope[:train_size], Inputs_closing_slope[train_size:]
y_train_closing, y_test_closing = Ouputs_closing[:train_size], Ouputs_closing[train_size:]
OHLC_train, OHLC_test = Inputs_OHLC[:train_size], Inputs_OHLC[train_size:]
IV_matrix_test = IV_matrix_forecast[K_num*train_size:]


# --- Settlement price ---
Inputs_settlement, Ouputs_settlement = TimeSeriesData(
    K_IVslope_settlement*magnification_slope, seq_length=seq_length)
train_size = int(len(Inputs_settlement)*0.8)
X_train_settlement, X_test_settlement = Inputs_settlement[:train_size], Inputs_settlement[train_size:]
y_train_settlement, y_test_settlement = Ouputs_settlement[:train_size], Ouputs_settlement[train_size:]


In [13]:
from keras.models import Model
from keras.layers import Input, Dense, LSTM, concatenate
from keras.optimizers import Adam

# Define the network: two stacked LSTM layers (100 units each) followed by a
# 10-unit ReLU dense layer and a linear output layer.
input_lstm = Input(shape=X_train_settlement.shape[1:])
hidden1 = LSTM(100, return_sequences=True, activation='relu')(input_lstm)
hidden2 = LSTM(100, activation='relu')(hidden1)
output = Dense(10, activation='relu')(hidden2)
# Size the output layer from the settlement targets, the series this model
# takes as input and is fitted on below (it was sized from the closing targets).
output = Dense(y_train_settlement.shape[1])(output)

# Build the model
model = Model(input_lstm, outputs=output)

# Compile the model

model.compile(loss='mse', optimizer = 'adam', metrics=['mse'])

# Show the model structure
#model.summary()



# Train the model; the last 20% of the training windows are held out for
# validation.
model.fit(X_train_settlement, y_train_settlement, epochs=50,
          batch_size=3000, validation_split=0.2)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x2032c588640>

In [10]:
# Fit `model` on the closing-price windows for 20 epochs.
# NOTE(review): this calls fit() on the same `model` object whose input layer
# was shaped from X_train_settlement — confirm that one model is meant to be
# trained on both series. This cell's execution count (10) is also lower than
# the cells above it, so re-run the notebook in order to check the result.
model.fit(X_train_closing, y_train_closing, epochs=20, batch_size=3000, validation_split=0.2)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x1cc911fff10>

In [14]:
# Evaluate the model on the closing-price test split and print the result.
# NOTE(review): evaluation uses the closing split while the prediction below
# uses the settlement split — confirm which series this score should describe.
test_loss = model.evaluate(X_test_closing, y_test_closing)
print("Test Loss:", test_loss)

# Inspect the test outputs: predicted (scaled) slopes for the settlement test windows
y_pred_settlement = model.predict(X_test_settlement)



Test Loss: [3.0155186653137207, 3.0155186653137207]


In [9]:
# Turn every predicted slope vector back into a curve over one block of
# K_num rows, append the result as a new column and export the table.
columns_names_K = ['履約價(收盤價)', '履約價(結算價)']
column_index_K = list(map(IV_data.columns.get_loc, columns_names_K))

ForecastIV = np.zeros((len(y_pred_settlement), K_num))
for i, predicted_slopes in enumerate(y_pred_settlement):
    block_rows = slice(i*K_num, (i+1)*K_num)
    # NOTE(review): the strikes come from the closing-price column
    # (column_index_K[0]) although the forecast is labelled settlement — confirm.
    x = IV_matrix_test[block_rows, column_index_K[0]]
    # NOTE(review): column 8 is hard-coded; presumably the implied-volatility
    # column used as the curve's starting value — verify against the CSV header.
    v0 = IV_matrix_test[i*K_num, 8]
    # Undo the scaling applied to the slopes before training.
    ForecastIV[i] = recovery_curve(x, predicted_slopes/(magnification_slope), y0=v0)

# Flatten to one forecast value per table row and append it as the last column.
ForecastIV = ForecastIV.reshape(-1, 1)
Forecast_matrix = np.hstack((IV_matrix_test, ForecastIV))
column = np.hstack((IV_data.columns.to_numpy(), ['預測隱含波動率(結算價)']))
Forecast_Data = pd.DataFrame(Forecast_matrix, columns=column)
Forecast_Data.to_csv('./../../Data/forecast/ForecastIV_0.7.csv', index=False, encoding='Big5')