In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_percentage_error
import matplotlib.pyplot as plt
from keras import regularizers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from keras.optimizers import Adam
from keras.layers import Dropout
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Lasso
import random

In [None]:
# Columns removed from the frame right after loading.
exclude_columns = ['Date']

# Read the TSLA history, reverse the row order, renumber the index from 0 and
# drop the excluded columns in one chain.
# NOTE(review): the reversal presumably turns a newest-first export into
# chronological order - confirm against the CSV.
data = (
    pd.read_csv("/kaggle/input/stock-10years/TSLA.csv")
    .iloc[::-1]
    .reset_index(drop=True)
    .drop(columns=exclude_columns)
)

# 1-based row position after the reversal.
data['time'] = data.index + 1

# Remove every '%' character so 'Change %' can be stored as float.
data['Change %'] = (
    data['Change %']
    .str.replace('%', '', regex=False)
    .astype(float)
)

def convert_vol(value):
    """Convert a volume value with an optional K/M/B suffix into a float.

    Args:
        value: Volume as text such as ``"1.5K"``, ``"2M"``, ``"3B"`` or
            ``"1,200"``. Non-string inputs (plain numbers, or NaN as produced
            for an empty cell) are accepted as well.

    Returns:
        float: The volume with the suffix expanded (K = 1e3, M = 1e6,
        B = 1e9). Non-string inputs are returned as ``float(value)``, so NaN
        stays NaN.

    Raises:
        ValueError: If the text is not a number once the suffix is removed.
    """
    # A substring test such as `'K' in value` raises TypeError on floats,
    # so numeric / missing inputs are passed through before any text handling.
    if not isinstance(value, str):
        return float(value)

    multipliers = {'K': 1e3, 'M': 1e6, 'B': 1e9}

    # Padding and thousands separators would make float() fail.
    text = value.strip().replace(',', '')

    # Slicing keeps this safe for an empty string (suffix becomes '').
    suffix = text[-1:].upper()
    if suffix in multipliers:
        return float(text[:-1]) * multipliers[suffix]
    return float(text)

# Expand the K/M/B suffixes element by element so 'Vol.' holds plain floats.
data['Vol.'] = data['Vol.'].map(convert_vol)

In [None]:
# Hold out the final 120 rows for testing; every earlier row is training data.
split_index = len(data) - 120

# Positional slices of the full frame: training rows first, test rows last.
train_data, test_data = data.iloc[:split_index], data.iloc[split_index:]

print(f"Train data length: {len(train_data)}")
print(f"Test data length: {len(test_data)}")

In [None]:
from sklearn.linear_model import LinearRegression
import numpy as np

# Columns used both as lagged model inputs and as prediction targets.
features = ['Price', 'Open', 'High', 'Low', 'Vol.']


def make_model(window=None, stride=None, frame=None):
    """Fit one LinearRegression per feature on sliding windows of the training rows.

    Each sample covers ``window`` consecutive rows: the first ``window - 1``
    rows of all ``features`` are flattened into one input vector, and the last
    row supplies the targets (one per feature).

    Args:
        window: Rows per sample. Defaults to the notebook-level ``group_size``.
        stride: Offset between the starts of consecutive samples. Defaults to
            the notebook-level ``step_size``.
        frame: DataFrame the samples are cut from. Defaults to ``train_data``
            so the held-out test rows never take part in the fit.

    Returns:
        dict: Feature name -> fitted ``LinearRegression``.

    Raises:
        ValueError: If ``window`` < 2, ``stride`` < 1, or ``frame`` has fewer
            than ``window`` rows.
    """
    window = group_size if window is None else window
    stride = step_size if stride is None else stride
    # Sampling from the full `data` would include the last rows that are later
    # used for evaluation; restrict the fit to the training slice by default.
    frame = train_data if frame is None else frame

    if window < 2:
        raise ValueError(f"window must be at least 2, got {window}")
    if stride < 1:
        # A zero stride would never advance through the frame.
        raise ValueError(f"stride must be at least 1, got {stride}")

    values = frame[features].to_numpy()
    last_start = len(values) - window
    if last_start < 0:
        raise ValueError(
            f"need at least {window} rows to build one sample, got {len(values)}"
        )

    n_lags = window - 1
    # `last_start` is itself a valid window start, hence the inclusive bound.
    starts = range(0, last_start + 1, stride)

    # Inputs: the first `n_lags` rows of each window flattened row by row.
    X_train = np.array([values[s:s + n_lags].ravel() for s in starts])
    # Targets: the final row of each window, one column per feature.
    y_train = np.array([values[s + n_lags] for s in starts])

    # Every model shares the same inputs; only the target column differs.
    return {
        feature: LinearRegression().fit(X_train, y_train[:, col])
        for col, feature in enumerate(features)
    }

In [None]:
"""
# 取出最後9個樣本的特徵
last_9_data = data.iloc[-(group_size-1):][features].values  # 取出最後9個樣本的特徵

# 合併為一個長向量 (1D 陣列)
X_combined = last_9_data.flatten()  # 轉換為 1D 陣列

# 用來存放預測結果的列表
predictions = []

# 為每個特徵進行預測
for i, feature in enumerate(features):
    # 使用對應的模型進行預測
    prediction = models[feature].predict(X_combined.reshape(1, -1))  # 這裡進行一次reshape使得符合模型要求
    predictions.append(prediction[0])  # 預測結果是陣列，所以取出第一個值

# 將預測結果轉換成 DataFrame
pred_df = pd.DataFrame([predictions], columns=features)

# 顯示預測結果
print("predict")
print(pred_df)

# 取出 test_data 的第一行並去掉 'time' 列
test_data_first_row = test_data.iloc[0][['Price', 'Open', 'High', 'Low', 'Vol.']]

# 轉換為 DataFrame 顯示為行的形式
test_data_first_row_df = pd.DataFrame([test_data_first_row.values], columns=test_data_first_row.index)

# 顯示第一行數據
print("test")
print(test_data_first_row_df)
"""

In [None]:
def make_predict(model):
    """Roll the per-feature models forward over the test horizon.

    Starting from the last ``group_size - 1`` training rows, predicts the next
    row of every feature, appends that prediction to the window, drops the
    oldest row, and repeats once per row of ``test_data``.

    Args:
        model: Mapping of feature name -> fitted regressor exposing
            ``predict``, one entry per name in ``features``.

    Returns:
        pandas.DataFrame: One row per row of ``test_data``, with a column per
        feature plus ``time`` copied from ``test_data`` so predictions and
        actual values share the same x positions.
    """
    horizon = len(test_data)

    # Seed from the end of the *training* rows. The tail of the full `data`
    # is the test period itself, so seeding from it would forecast rows that
    # lie beyond the values the result is compared against.
    window = train_data[features].to_numpy()[-(group_size - 1):].copy()

    all_predictions = []
    for _ in range(horizon):
        # Same layout as the training inputs: rows flattened into one vector.
        flat_window = window.reshape(1, -1)

        # Use the models passed in rather than a notebook-level global.
        step_prediction = [model[name].predict(flat_window)[0] for name in features]
        all_predictions.append(step_prediction)

        # Slide the window: drop the oldest row, append the new prediction.
        window = np.vstack([window[1:], step_prediction])

    all_predictions_df = pd.DataFrame(all_predictions, columns=features)
    # Prediction k corresponds to test row k, so reuse the test time values.
    all_predictions_df['time'] = test_data['time'].to_numpy()
    return all_predictions_df

In [None]:
def make_draw(all_predictions_df):
    """Plot the actual test prices and the predicted prices against time.

    Args:
        all_predictions_df: DataFrame with 'time' and 'Price' columns holding
            the predictions. The actual values are read from the notebook-level
            ``test_data``, which must provide the same two columns.
    """
    import matplotlib.pyplot as plt

    plt.figure(figsize=(10, 6))

    # (frame, legend label, line style) for each curve, drawn in this order.
    curves = (
        (test_data, 'Actual Price', '-'),
        (all_predictions_df, 'Predicted Price', '--'),
    )
    for frame, label, line_style in curves:
        plt.plot(frame['time'], frame['Price'], label=label, linestyle=line_style)

    # Title, axis labels, legend and grid.
    plt.title('Actual vs Predicted Price')
    plt.xlabel('Time')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)

    plt.show()


In [None]:
def make_MAPE(all_predictions_df, actual=None, column="Price"):
    """Mean absolute percentage error of the predicted column, in percent.

    Args:
        all_predictions_df: DataFrame holding the predictions in ``column``.
        actual: Optional array-like of true values. Defaults to
            ``test_data[column]`` from the notebook namespace.
        column: Name of the column to score. Defaults to ``"Price"``.

    Returns:
        float: ``mean(|actual - predicted| / |actual|) * 100``.

    Raises:
        ValueError: If the actual and predicted values differ in shape.
    """
    if actual is None:
        actual = test_data[column]

    true_values = np.asarray(actual, dtype=float)
    predicted_values = all_predictions_df[column].to_numpy(dtype=float)

    # Without this check a length-1 array would broadcast silently and yield a
    # plausible-looking but meaningless score.
    if true_values.shape != predicted_values.shape:
        raise ValueError(
            f"shape mismatch: actual {true_values.shape} vs "
            f"predicted {predicted_values.shape}"
        )

    mape = np.mean(np.abs((true_values - predicted_values) / true_values)) * 100
    return mape

In [None]:
# Default window settings: rows per training sample (group_size - 1 input rows
# plus 1 target row) and the offset between the starts of consecutive samples.
group_size, step_size = 500, 40

In [None]:
# Random search over (group_size, step_size): refit, forecast and score each
# draw, remember the best pair, then refit and plot with it.
# NOTE(review): each draw is scored by make_MAPE, which by default compares
# against test_data, so the winning pair is selected on the same rows it is
# reported on - consider a separate validation window.
random.seed(42)  # fixed seed so the sampled configurations are reproducible

min_mape = 1e8
best_group_size = 0
best_step_size = 0

# One entry per trial; plotted with a single scatter call after the loop.
group_size_values = []
step_size_values = []
mape_values = []

for i in range(500):
    # make_model / make_predict read these notebook-level names.
    group_size = random.randint(100, 1000)
    step_size = random.randint(20, 200)
    models = make_model()
    all_predictions_df = make_predict(models)
    mape = make_MAPE(all_predictions_df)
    if i % 10 == 0:
        print("i =", i,"group_size =", group_size,"step_size =", step_size,"mape =", mape)

    group_size_values.append(group_size)
    step_size_values.append(step_size)
    mape_values.append(mape)

    if mape < min_mape:
        min_mape = mape
        best_group_size = group_size
        best_step_size = step_size

# A NaN score never compares as smaller, so the placeholders can survive.
if best_group_size == 0:
    raise RuntimeError("random search found no trial with a usable MAPE")

# Every trial as one point, coloured by its MAPE on a fixed 0-50 scale.
fig, ax = plt.subplots(figsize=(8, 6))
trial_points = ax.scatter(
    group_size_values,
    step_size_values,
    c=mape_values,
    cmap='viridis',
    s=100,
    vmin=0,
    vmax=50,
)
fig.colorbar(trial_points, ax=ax, label='MAPE')

ax.set_xlabel('Group Size')
ax.set_ylabel('Step Size')
ax.set_title('MAPE as a Function of Group Size and Step Size')
plt.show()

# Refit with the best configuration and show its forecast.
group_size = best_group_size
step_size = best_step_size
models = make_model()
all_predictions_df = make_predict(models)
mape = make_MAPE(all_predictions_df)
make_draw(all_predictions_df)

print("min_mape =", min_mape)
print("best_group_size =", best_group_size)
print("best_step_size =", best_step_size)

In [None]:
# Text dump of the training rows followed by the most recent forecast.
print(train_data, all_predictions_df, sep="\n")

In [None]:
# Manual run with a hand-picked configuration: refit, forecast, score, plot.
group_size, step_size = 625, 40

models = make_model()
all_predictions_df = make_predict(models)
mape = make_MAPE(all_predictions_df)
make_draw(all_predictions_df)

print("mape =", mape)