### type B GRU 모델 학습 + SHAP

In [None]:
pip install shap

from google.colab import files
import pandas as pd
from sklearn.preprocessing import RobustScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GRU, Dense
import shap
import numpy as np
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

In [None]:
# Load the raw data and order it by id, then week, then obj_num.
df = pd.read_csv("최최종데이터_b_지난주추가.csv")
df.sort_values(by=["id", "week", "obj_num"], inplace=True)

# Every column from position 4 onwards is scaled.
columns = df.columns[4:]

# Robust scaling of the selected columns.
# NOTE(review): the scaler is fitted on all rows, including the weeks later
# used as the test split -- confirm this is intended.
scaler = RobustScaler()
scaler.fit(df[columns])
df[columns] = scaler.transform(df[columns])

# Full-frame predictors / target.
y = df['flw_get']
X = df.drop(columns=['flw_get'])

# Rows grouped by week number; used to assemble the splits below.
week_groups = df.groupby('week')


def _rows_for_weeks(weeks):
    """Concatenate, in group iteration order, the groups whose week is in *weeks*."""
    return pd.concat([rows for wk, rows in week_groups if wk in weeks])


# Weeks 5 through 25 form the training split.
train_weeks = list(range(5, 26))
train_data = _rows_for_weeks(train_weeks)

# Weeks 26 through 30 form the test split.
test_weeks = list(range(26, 31))
test_data = _rows_for_weeks(test_weeks)

# Columns that are not model inputs: the target plus the identifier/date columns.
non_feature_cols = ['flw_get', 'id', 'week', 'obj_num', 'week_start_date']

# Predictors and target for each split.
X_train = train_data.drop(columns=non_feature_cols)
y_train = train_data['flw_get']
X_test = test_data.drop(columns=non_feature_cols)
y_test = test_data['flw_get']

# Train the GRU model
def gru(X_train, y_train, num_units=75, num_layers=2, learning_rate=0.00005, epochs=200, batch_size=32):
    """Build, compile and fit a stacked GRU regressor.

    The network consists of one input GRU layer, ``num_layers - 1``
    intermediate GRU layers and one final GRU layer (so ``num_layers + 1``
    GRU layers in total), followed by a single linear output unit. It is
    trained with Adam on mean squared error.

    Args:
        X_train: 2-D training inputs; ``X_train.shape[1]`` is used as the
            sequence length of the declared ``(length, 1)`` input shape.
        y_train: Training targets.
        num_units: Units per GRU layer.
        num_layers: Controls the number of intermediate GRU layers (see above).
        learning_rate: Adam learning rate.
        epochs: Number of training epochs.
        batch_size: Mini-batch size.

    Returns:
        A one-element list containing the fitted model.
    """
    n_steps = X_train.shape[1]

    # Assemble the layer stack up front, in the same order it is applied.
    stack = [GRU(num_units, input_shape=(n_steps, 1), return_sequences=True)]
    stack += [GRU(num_units, return_sequences=True) for _ in range(num_layers - 1)]
    stack += [GRU(num_units), Dense(1, activation='linear')]

    network = Sequential(stack)
    network.compile(optimizer=tf.keras.optimizers.Adam(learning_rate), loss='mean_squared_error')
    network.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=0)

    return [network]

In [None]:
# Validation
def validation(X_test, y_test, models, title):
    """Score fitted models on the test split and plot actual vs. predicted values.

    For every model the MSE and RMSE of its predictions on ``X_test`` are
    computed; the means over all models are printed and returned. A figure
    comparing ``y_test`` (blue) with the predictions of the last model in
    ``models`` (red) is saved to ``"{title}.png"`` and shown.

    Args:
        X_test: Test inputs passed to each model's ``predict``.
        y_test: True target values.
        models: Non-empty iterable of fitted models exposing ``predict``.
        title: Plot title; also used as the stem of the saved image file.

    Returns:
        Tuple ``(mean_mse, mean_rmse)``.

    Raises:
        ValueError: If ``models`` is empty.
    """
    if not models:
        raise ValueError("models must contain at least one fitted model")

    validation_results_mse = []
    validation_results_rmse = []

    for model in models:
        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)

        validation_results_mse.append(mse)
        # RMSE is derived from the MSE already computed above.
        validation_results_rmse.append(np.sqrt(mse))

    # Print the mean validation results
    mean_mse = np.mean(validation_results_mse)
    print("평균 검증 MSE:", mean_mse)

    mean_rmse = np.mean(validation_results_rmse)
    print("평균 검증 RMSE:", mean_rmse)

    plt.figure(figsize=(15, 10))

    # y_pred here holds the predictions of the last model in `models`.
    plt.plot(range(len(y_test)), y_test, color='blue')
    plt.plot(range(len(y_pred)), y_pred, color='red')

    # Dotted vertical guides every 20 samples, up to x=200.
    for i in range(1, 11):
        plt.axvline(x=20 * i, linestyle='dotted')

    plt.title(title)
    plt.savefig(f"{title}.png")
    plt.show()
    return mean_mse, mean_rmse

In [None]:
import itertools
import os

# Candidate values for each hyperparameter to tune
num_units_case = [75]
num_layers_case = [3]
learning_rate_case = [0.001]
epochs_case = [200]
batch_size_case = [16]

# Grid-search log: one entry per evaluated configuration
history = {"case": [], "model": [],
           "num_units": [], "num_layers": [], "learning_rate": [], "epochs": [], "batch_size": [],
           "mean_mse": [], "mean_rmse": []}

case_ = "B"
model_ = "GRU"

# itertools.product iterates in the same order as the equivalent nested loops
# (the last sequence varies fastest).
search_space = itertools.product(num_units_case, num_layers_case, learning_rate_case,
                                 epochs_case, batch_size_case)

for num_units, num_layers, learning_rate, epochs, batch_size in search_space:
    # Title used for the saved plot file
    title = f"{case_}_{model_}({num_units}_{num_layers}_{learning_rate}_{epochs}_{batch_size})"
    print(f"{title}")

    # Train once and produce the SHAP plots. A separate gru(...) call is not
    # made here because its result was never used and it doubled training time.
    models_with_shap = gru_with_shap(X_train, y_train,
                                     X_test,
                                     num_units=num_units,
                                     num_layers=num_layers,
                                     learning_rate=learning_rate,
                                     epochs=epochs,
                                     batch_size=batch_size)

    # Run validation
    mean_mse, mean_rmse = validation(X_test, y_test, models_with_shap, title)

    # Record this configuration and its scores
    record = {"case": case_, "model": model_,
              "num_units": num_units, "num_layers": num_layers,
              "learning_rate": learning_rate, "epochs": epochs, "batch_size": batch_size,
              "mean_mse": mean_mse, "mean_rmse": mean_rmse}
    for key, value in record.items():
        history[key].append(value)

# Save the log
history_df = pd.DataFrame.from_dict(data=history, orient='columns')
history_df.to_csv(f"history_{case_}_{model_}.csv", index=False)

# Download every generated file
content_dir = "/content"
skipped_entries = {'.config', 'sample_data'}
file_list = os.listdir(content_dir)

for name in file_list:
    path = os.path.join(content_dir, name)
    # Use the full path so the download does not depend on the working
    # directory, and skip anything that is not a regular file.
    if name in skipped_entries or not os.path.isfile(path):
        continue
    files.download(path)

In [None]:
# Interpret the GRU model with SHAP
def gru_with_shap(X_train, y_train, X_test, num_units=75, num_layers=2, learning_rate=0.00005, epochs=200, batch_size=32):
    """Train a stacked GRU regressor and explain its test predictions with SHAP.

    The network has one input GRU layer, ``num_layers - 1`` intermediate GRU
    layers and one final GRU layer (``num_layers + 1`` GRU layers in total)
    followed by a single linear output unit, trained with Adam on MSE.

    After training, SHAP values are computed for ``X_test`` using an
    ``Independent`` masker built from the training inputs. The function saves
    and shows a summary plot, a bar plot and one dependence scatter plot per
    feature (x-axis inverse-transformed to the original scale), then draws
    ``shap.plots.bar``.

    Besides its arguments the function reads these module-level names:
    ``df`` (column positions, printed only), ``scaler`` (centre/scale for the
    inverse transform), ``columns`` (fallback list of scaled columns) and
    ``y_test`` (point colours in the dependence plots).

    Args:
        X_train: Training inputs as a DataFrame.
        y_train: Training targets as a Series.
        X_test: Test inputs as a DataFrame; its columns name the features.
        num_units: Units per GRU layer.
        num_layers: Controls the number of intermediate GRU layers (see above).
        learning_rate: Adam learning rate.
        epochs: Number of training epochs.
        batch_size: Mini-batch size.

    Returns:
        A one-element list containing the fitted model.

    Raises:
        ValueError: If a column of ``X_test`` is not among the columns the
            scaler was fitted on.
    """
    X_train_np = X_train.values
    y_train_np = y_train.values
    X_test_np = X_test.values

    models = []
    model = Sequential()
    model.add(GRU(num_units, input_shape=(X_train_np.shape[1], 1), return_sequences=True))
    for _ in range(num_layers - 1):
        model.add(GRU(num_units, return_sequences=True))
    model.add(GRU(num_units))
    model.add(Dense(1, activation='linear'))
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate), loss='mean_squared_error')
    model.fit(X_train_np, y_train_np, epochs=epochs, batch_size=batch_size, verbose=0)
    models.append(model)

    # Masker used as background data for the SHAP computation
    masker = shap.maskers.Independent(data=X_train_np)

    # Build the explainer from the model and the masker
    explainer = shap.Explainer(model, masker)

    # Compute SHAP values for the test inputs
    shap_values = explainer.shap_values(X_test_np)

    # SHAP summary plot
    shap.summary_plot(shap_values, features=X_test_np, feature_names=X_test.columns, show=False)
    plt.title("GRU B with SHAP - Summary Plot")
    plt.savefig("gru_B_shap_summary_plot.png")
    plt.show()

    # SHAP bar plot
    shap.summary_plot(shap_values, features=X_test_np, feature_names=X_test.columns, plot_type='bar', show=False)
    plt.subplots_adjust(top=0.9, bottom=0.1, left=0.1, right=0.9)
    plt.title("GRU B with SHAP - Bar Plot")
    plt.savefig("gru_B_shap_bar_plot.png")
    plt.show()

    # Print each feature's position in the module-level DataFrame
    for selected_feature_name in X_test.columns:
        print(selected_feature_name)
        print(df.columns.get_loc(selected_feature_name))

    # Columns the scaler was fitted on, in fit order. Looking features up by
    # name avoids a hard-coded offset between DataFrame positions and scaler
    # positions (the scaler was fitted on `columns`, not on the feature subset).
    scaled_columns = list(getattr(scaler, "feature_names_in_", columns))

    # Dependence plots, with the feature axis mapped back to original units.
    # shap_index is the feature's position in X_test, which is the column
    # order the SHAP values were computed in.
    for shap_index, selected_feature_name in enumerate(X_test.columns):
        print(selected_feature_name)
        scaler_index = scaled_columns.index(selected_feature_name)

        # Centre and scale that RobustScaler stored for this column
        center = scaler.center_[scaler_index]
        scale = scaler.scale_[scaler_index]

        # Undo the scaling: x_original = x_scaled * scale + center
        feature_values_original = X_test[selected_feature_name].values * scale + center

        plt.figure(figsize=(10, 6))

        plt.scatter(feature_values_original, shap_values[:, shap_index], c=y_test)
        plt.xlabel(selected_feature_name)
        plt.ylabel('SHAP Value')
        plt.title(f'GRU B with SHAP - Dependence Plot for {selected_feature_name}')
        plt.colorbar(label='Actual Target Value')
        plt.savefig(f"{selected_feature_name}_dependence_plot_original.png")
        plt.show()

    # Feature importance bar plot
    explanation = explainer(X_test)
    shap.plots.bar(explanation)

    return models