In [None]:
# 📊 資料載入與預處理

import pandas as pd
import numpy as np

# 假設你有以下兩個檔案：
# - longterm_data.csv: 包含 1979–2019 的日射量資料
# - shortterm_data.csv: 包含 2020–2022 的日射量 + 發電資料

longterm_df = pd.read_csv('longterm_data.csv', parse_dates=['date'])
shortterm_df = pd.read_csv('shortterm_data.csv', parse_dates=['date'])

# 補充欄位：月份與季節
def assign_season(month):
    if month in [2,3,4]:
        return 'spring'
    elif month in [5,6]:
        return 'rainy'
    elif month in [7,8,9]:
        return 'summer'
    elif month in [10,11]:
        return 'autumn'
    else:
        return 'winter'

longterm_df['season'] = longterm_df['date'].dt.month.apply(assign_season)
shortterm_df['season'] = shortterm_df['date'].dt.month.apply(assign_season)


In [None]:
# 🌤 長期趨勢分解與特徵萃取（每季）

from statsmodels.tsa.seasonal import STL
from scipy.stats import linregress

season_features = {}

for season in longterm_df['season'].unique():
    df_season = longterm_df[longterm_df['season'] == season]
    df_season = df_season.set_index('date').resample('D').mean().interpolate()
    series = df_season['radiation']

    stl = STL(series, period=365)
    result = stl.fit()

    trend = result.trend
    slope = linregress(np.arange(len(trend)), trend.values).slope

    season_features[season] = {
        'slope': slope,
        'mean': series.mean(),
        'std': series.std(),
        'amplitude': result.seasonal.max() - result.seasonal.min()
    }

# 映射特徵至短期資料
for feature in ['slope', 'mean', 'std', 'amplitude']:
    shortterm_df[feature] = shortterm_df['season'].map(lambda s: season_features[s][feature])


In [None]:
# 🧮 SARIMAX 模型建構與預測

from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error, r2_score

# 使用 POWER 為目標，radiation + 季節特徵作為外生變數
shortterm_df = shortterm_df.set_index('date').resample('D').mean().interpolate()

y = shortterm_df['power']
X = shortterm_df[['radiation', 'slope', 'mean', 'std']]

# 分割資料
split1 = int(len(y) * 0.7)
split2 = int(len(y) * 0.9)

y_train, y_val, y_test = y[:split1], y[split1:split2], y[split2:]
X_train, X_val, X_test = X[:split1], X[split1:split2], X[split2:]

# 建模
model = SARIMAX(y_train, exog=X_train, order=(1,1,1), seasonal_order=(1,1,1,12))
results = model.fit(disp=False)

# 預測與評估
y_pred = results.predict(start=y_val.index[0], end=y_val.index[-1], exog=X_val)
print("Validation RMSE:", mean_squared_error(y_val, y_pred, squared=False))
print("R²:", r2_score(y_val, y_pred))


In [None]:
# 🧠 LSTM 模型（Concatenation 模式）

import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.preprocessing import MinMaxScaler

# 使用 7 天為一組滑動視窗
def create_sequences(df, seq_len=7):
    X_seq, X_static, y = [], [], []
    for i in range(len(df) - seq_len):
        seq = df.iloc[i:i+seq_len]
        X_seq.append(seq[['radiation']].values)
        X_static.append(seq[['slope', 'mean', 'std']].iloc[-1].values)
        y.append(df['power'].iloc[i+seq_len])
    return np.array(X_seq), np.array(X_static), np.array(y)

scaler = MinMaxScaler()
scaled_df = pd.DataFrame(scaler.fit_transform(shortterm_df), columns=shortterm_df.columns, index=shortterm_df.index)

X_seq, X_static, y = create_sequences(scaled_df)

# 分割
train_size = int(len(y) * 0.8)
X_seq_train, X_seq_test = X_seq[:train_size], X_seq[train_size:]
X_static_train, X_static_test = X_static[:train_size], X_static[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# 建立模型
seq_input = Input(shape=(7, 1))
static_input = Input(shape=(3,))
lstm_out = LSTM(64)(seq_input)
concat = Concatenate()([lstm_out, static_input])
output = Dense(1)(concat)
model = Model([seq_input, static_input], output)
model.compile(optimizer='adam', loss='mse')

# 訓練
model.fit([X_seq_train, X_static_train], y_train, epochs=30, batch_size=16, validation_split=0.2)

# 評估
y_pred = model.predict([X_seq_test, X_static_test])
print("LSTM RMSE:", mean_squared_error(y_test, y_pred, squared=False))
print("R²:", r2_score(y_test, y_pred))
