# 分類モデル（XGBoost）

In [None]:
# ライブラリのインポート
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats
from sklearn import preprocessing

## csvの読み込み

In [None]:
# Load the input data.
# Author's note (translated): XGBoost cannot take `category` dtype columns here,
# so the data needs to be one-hot encoded.
data_folder = (
    input("データファイルのあるフォルダまでのパス")
    .rstrip()              # drop trailing whitespace from the typed path
    .replace("\\", "/")    # use forward slashes throughout
    + "/"
)
file = f"{data_folder}input20001_30000_input+error_dummy.csv"
df = pd.read_csv(file, encoding="shift-jis")

In [1]:
# Objective (target) variable: name of the label column in `df`.
pur = "安全率エラー判定"

In [None]:
# Heatmap of the pairwise correlation matrix of every column in `df`.
sns.set(
    font='Yu Gothic',
    rc={'figure.figsize': (200, 200)},
)
sns.heatmap(
    df.corr(),
    cmap='coolwarm',
    center=0,
    vmin=-1,
    vmax=1,
    square=True,
)

In [None]:
# Heatmap of each column's correlation with the target, sorted descending.
sns.set(font='Yu Gothic', rc={'figure.figsize': (100, 100)})
dff = df.copy()
# Optionally exclude a column first, e.g.: dff = dff.drop(["評価_状態"], axis=1)
# Drop the target's own row by label; slicing with [1:] relied on the
# self-correlation sorting first, which is not guaranteed when values tie.
sns.heatmap(
    dff.corr()[[pur]].drop(index=pur).sort_values(by=pur, ascending=False),
    cmap='coolwarm',
    annot=True,
)

## モデルの作成

In [None]:
# Split the data: 20% of the rows become the test set, stratified on the
# target column.
from sklearn.model_selection import train_test_split

train, test = train_test_split(
    df,
    test_size=0.2,
    stratify=df[pur],
    random_state=9,
)

In [None]:
# Display the held-out test split (notebook cell output).
test

### アンダーサンプリングとクロスバリデーション

In [None]:
# Under-sample the training split to (at most) 3 negatives per positive, then
# build stratified 5-fold indices over the resampled data.
from imblearn.under_sampling import RandomUnderSampler
from sklearn.model_selection import StratifiedKFold

# Separate features and target: every column except the target is a feature.
X = train.drop([pur], axis=1)
y = train.loc[:, pur]

# Count classes by value; `value_counts()[1]` depends on how the index of the
# counts is typed, whereas a comparison is unambiguous.
positive_count_train = int((y == 1).sum())
negative_count_train = int((y == 0).sum())

# Under-sampling can only remove rows, so never request more negatives than
# exist (the sampler raises in that case).
strategy = {
    0.0: min(positive_count_train * 3, negative_count_train),
    1.0: positive_count_train,
}
rus = RandomUnderSampler(random_state=9, sampling_strategy=strategy)
X, y = rus.fit_resample(X, y)
train = pd.concat([y, X], axis=1)

print(len(X))

# Stratified split of the resampled training data into 5 folds; the
# (train, validation) position pairs are materialised so they can be reused.
fold = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
kf = fold.split(X, y)
kf_cv = list(kf)

In [None]:
# Print the row positions of every fold followed by the split sizes.
for i, (idx_train, idx_val) in enumerate(kf_cv):
    print(f'fold {i}', idx_train, idx_val, '=' * 60, sep='\n')
    # With 5 folds the train:validation sizes should be roughly 4:1.
    print(len(idx_train), len(idx_val))
    print('=' * 60)

## モデルの作成

### XGBoost

In [None]:
# from xgboost.callback import early_stop
import xgboost as xgb
from sklearn import metrics # 正解率を出すためのライブラリ
from sklearn.model_selection import GridSearchCV,RandomizedSearchCV,StratifiedKFold,cross_val_score

### ハイパーパラメータチューニング

In [None]:
import optuna
def objective(trial, df_X, df_y):
    """Optuna objective: mean 5-fold CV accuracy of an XGBoost classifier.

    Args:
        trial: Optuna trial used to sample the hyperparameters.
        df_X: Features passed to ``cross_val_score``.
        df_y: Target labels passed to ``cross_val_score``.

    Returns:
        Mean accuracy over the 5 cross-validation folds.
    """
    # `suggest_float` replaces the deprecated `suggest_uniform`; the sampled
    # ranges are unchanged.
    params = {
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'min_child_weight': trial.suggest_int('min_child_weight', 1, 5),
        'gamma': trial.suggest_float('gamma', 0, 5),
        'subsample': trial.suggest_float('subsample', 0.5, 1),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1),
        'learning_rate': trial.suggest_float('learning_rate', 0, 1),
    }

    model = xgb.XGBClassifier(
        n_estimators=100,
        verbosity=0,
        n_jobs=-1,
        random_state=0,
        **params,
    )

    # Cross-validation.
    scores = cross_val_score(model, df_X, df_y, cv=5, scoring="accuracy")
    return np.mean(scores)

In [None]:
# Create an Optuna study that maximises the objective's return value.
study = optuna.create_study(direction='maximize')

# Run the search on the training data; it stops after 200 trials or 300
# seconds, whichever comes first.
study.optimize(
    lambda trial: objective(trial, X, y),
    n_trials=200,
    timeout=300,
)

In [None]:
# Show the best hyperparameters found by the study.
print(study.best_params)

In [None]:
# Show the best objective value (mean CV accuracy) reached by the study.
print(study.best_value)

In [None]:
# Fixed XGBoost settings for the final models.
xgb_params = dict(
    objective='binary:logistic',  # binary classification
    eval_metric='logloss',        # metric evaluated during training
)

In [None]:
# Copy the tuned hyperparameters from the Optuna study into the XGBoost settings.
xgb_params.update({
    key: study.best_params[key]
    for key in (
        'max_depth',
        'min_child_weight',
        'gamma',
        'subsample',
        'colsample_bytree',
        'learning_rate',
    )
})

## 学習開始

In [None]:
import optuna
from sklearn.metrics import average_precision_score
import shap
import japanize_matplotlib

def _search_f1_threshold(y_true, pred_prob, n_trials=200):
    """Search for the probability cut-off with the best F1 score.

    Args:
        y_true: True labels of the validation rows.
        pred_prob: Predicted positive-class probabilities (NumPy array).
        n_trials: Number of Optuna trials to run.

    Returns:
        The best threshold found by the study (a float in [0, 1]); a row is
        predicted positive when its probability is ``>=`` this value.
    """
    def objective(trial):
        threshold = trial.suggest_float('threshold', 0.0, 1.0)  # search 0..1
        pred = (pred_prob >= threshold).astype(int)
        return metrics.f1_score(y_true, pred)

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials)
    return study.best_params['threshold']


def fit_xgb(X, y, cv, params: dict=None):
    """Train one XGBoost classifier per CV fold and tune a threshold for each.

    For every ``(train, validation)`` pair in ``cv`` a classifier is fitted
    with early stopping on the validation part, SHAP bar and beeswarm plots
    are drawn for the training part, and a decision threshold maximising F1
    on the validation part is searched with Optuna.

    Args:
        X: Feature DataFrame (rows are selected positionally with ``.iloc``).
        y: Target Series aligned with ``X``.
        cv: Iterable of ``(train_positions, validation_positions)`` pairs.
        params: Keyword arguments for ``xgb.XGBClassifier``; ``None`` uses
            the library defaults.

    Returns:
        Tuple ``(oof_plob, oof_classfication, models, threshold_all)``:
        out-of-fold positive-class probabilities, out-of-fold 0/1
        predictions, the fitted model of each fold, and the threshold chosen
        for each fold.
    """
    if params is None:
        params = {}

    models = []
    threshold_all = []
    oof_plob = np.zeros(len(X))
    oof_classfication = np.zeros(len(X))

    # Iterate over the folds that were passed in, not a module-level global.
    for idx_train, idx_val in cv:
        X_train, y_train = X.iloc[idx_train], y.iloc[idx_train]
        X_val, y_val = X.iloc[idx_val], y.iloc[idx_val]

        clf = xgb.XGBClassifier(**params)
        # NOTE(review): newer XGBoost releases expect `early_stopping_rounds`
        # on the constructor instead of fit() -- confirm against the
        # installed version before upgrading.
        model = clf.fit(X_train, y_train,
                        eval_set=[(X_val, y_val)],
                        early_stopping_rounds=20,
                        verbose=2)

        # Probability of the positive class for each validation row.
        pred_prob = model.predict_proba(X_val)[:, 1]
        print(pred_prob)

        explainer = shap.TreeExplainer(model=model, data=X_train,
                                       feature_perturbation="interventional")
        shap_values = explainer(X_train)
        shap.plots.bar(shap_values=shap_values, max_display=10)
        shap.plots.beeswarm(shap_values, max_display=10)

        # Tune the threshold, then apply it with the same `>=` rule that the
        # search itself optimised.
        best_threshold = _search_f1_threshold(y_val, pred_prob)
        pred = np.where(pred_prob >= best_threshold, 1, 0)

        print(best_threshold)
        print(pred)

        oof_plob[idx_val] = pred_prob
        oof_classfication[idx_val] = pred

        models.append(model)
        threshold_all.append(best_threshold)

    # Report on all out-of-fold predictions instead of only the last fold.
    # Assumes the validation parts of `cv` together cover every row of X.
    print(f'classification_report：{metrics.classification_report(y, oof_classfication.astype(int))}')

    return oof_plob,oof_classfication, models, threshold_all

In [None]:
# Train the per-fold models on the resampled training data with the tuned parameters.
oof_plob,oof_classfication, models,threshold_all = fit_xgb(X, y, kf_cv, xgb_params)

In [None]:
# Distribution of the out-of-fold predicted probabilities.
# `sns.distplot` is deprecated; a density histogram with a KDE overlay is its
# documented replacement.
sns.histplot(oof_plob, kde=True, stat="density", kde_kws=dict(cut=3), alpha=0.4)

In [None]:
# Display the out-of-fold predicted probabilities.
oof_plob

In [None]:
# Mean of the per-fold thresholds; this value is used as the cut-off at inference time.
print(np.average(threshold_all))

## テストデータ

In [None]:
from sklearn.metrics import confusion_matrix, mean_squared_error

def inference_xgb(models):
    """Evaluate the fold-model ensemble on the held-out test split.

    Reads the module-level ``test`` frame and ``threshold_all`` list. Plots
    the ROC curve and prints the ROC AUC, accuracy, confusion matrix and
    classification report.

    Args:
        models: Fitted classifiers whose ``predict_proba`` outputs are averaged.

    Returns:
        Tuple ``(pred_test_prob, X_test, y_test)``: the averaged class
        probability matrix, the test features and the test labels.
    """
    from sklearn.metrics import accuracy_score, roc_auc_score, roc_curve

    X_test = test.drop(["安全率エラー判定"], axis=1)
    y_test = test['安全率エラー判定']

    # Average over however many models were passed instead of assuming 5.
    pred_test_prob = np.mean(
        [model.predict_proba(X_test) for model in models], axis=0
    )

    # ROC curve and the area under it.
    fpr, tpr, thresholds = roc_curve(y_test, pred_test_prob[:, 1])
    plt.plot(fpr, tpr, marker='o')
    plt.xlabel('FPR: False positive rate')
    plt.ylabel('TPR: True positive rate')
    plt.grid()
    print(roc_auc_score(y_test, pred_test_prob[:, 1]))

    # Classify with the mean of the per-fold thresholds.
    pred_test = (pred_test_prob[:, 1] > np.average(threshold_all)).astype(int)

    # Accuracy and confusion matrix (the original referenced an undefined
    # name here and raised NameError).
    print('Accuracy:', accuracy_score(y_test, pred_test))

    cm = confusion_matrix(y_test, pred_test)
    print(cm)
    print(f'Classification Report:{metrics.classification_report(y_test, pred_test)}')

    return pred_test_prob,X_test,y_test

In [None]:
# Run the ensemble on the held-out test split.
pred_test_prob,X_test,y_test = inference_xgb(models)

In [None]:
# Number of test rows that received a prediction.
len(pred_test_prob[:,1])

In [None]:
# Compare the out-of-fold and test probability distributions.
# `sns.distplot` is deprecated; a density histogram with a KDE overlay is its
# documented replacement.
sns.histplot(oof_plob, kde=True, stat="density", kde_kws=dict(cut=3),
             alpha=0.4, label="検証用oof")
sns.histplot(pred_test_prob[:, 1], bins=25, kde=True, stat="density",
             kde_kws=dict(cut=3), alpha=0.4, label="テスト予測確率")
plt.legend()
plt.show()

In [None]:
# Display the averaged positive-class probabilities for the test rows.
pred_test_prob[:,1]

In [None]:
# Attach the out-of-fold probabilities to the resampled training frame and display it.
train["prob"] = oof_plob
train

In [None]:
# Attach the test probabilities to the test frame and display it.
# NOTE(review): inference_xgb() drops only the target column from the global
# `test`, so re-running it after this cell would pass "prob" as a feature.
test["prob"] = pred_test_prob[:,1]
test

In [None]:
# Stack the training and test frames (both now carrying a "prob" column).
df_prob = pd.concat([train,test])

## 全データについて考える

In [None]:
from sklearn.metrics import accuracy_score

def all_data_xgb(models):
    """Score every row of ``df`` with the fold-model ensemble and report metrics.

    Reads the module-level ``df`` and ``threshold_all``. As a side effect it
    writes the 0/1 predictions into ``df2["predicted"]``, so a module-level
    ``df2`` must exist before the call. SHAP bar and beeswarm plots are drawn
    for every model.

    Args:
        models: Fitted classifiers whose ``predict_proba`` outputs are averaged.

    Returns:
        The averaged class-probability matrix, one row per row of ``df``.
    """
    from sklearn.metrics import precision_score, roc_auc_score, roc_curve

    X_all = df.drop(["安全率エラー判定"], axis=1)
    y_all = df['安全率エラー判定']

    # One row per sample, one column per class.
    pred_all_prob = np.zeros((len(y_all), len(y_all.unique())))
    print(pred_all_prob.shape)

    for model in models:
        # Average over however many models were passed instead of assuming 5.
        pred_all_prob += model.predict_proba(X_all) / len(models)
        explainer = shap.TreeExplainer(model=model, data=X_all,
                                       feature_perturbation="interventional")
        shap_values = explainer(X_all)
        shap.plots.bar(shap_values=shap_values, max_display=10)
        shap.plots.beeswarm(shap_values, max_display=10)

    # Classify with the mean of the per-fold thresholds.
    pred_all = (pred_all_prob[:, 1] > np.average(threshold_all)).astype(int)
    # pred_all = np.where(pred_all_prob[:,1] > 0.5,1,0)

    print(pred_all)

    # Side effect kept for the caller, which reads df2["predicted"] afterwards.
    df2["predicted"] = pred_all

    print('ClassificationReport:', metrics.classification_report(y_all, pred_all))

    # Precision.
    print('Precision:', precision_score(y_all, pred_all))

    # ROC curve for the all-data predictions (the original plotted the
    # test-set curve here by mistake).
    fpr, tpr, thresholds = roc_curve(y_all, pred_all_prob[:, 1])
    plt.plot(fpr, tpr)
    plt.xlabel('FPR: False positive rate')
    plt.ylabel('TPR: True positive rate')
    plt.grid()

    print(roc_auc_score(y_all, pred_all_prob[:, 1]))

    print('Accuracy:', accuracy_score(y_all, pred_all))

    cm = confusion_matrix(y_all, pred_all)
    print(cm)

    return pred_all_prob

In [None]:
from sklearn.metrics import confusion_matrix, mean_squared_error

# Score every row of the original data with the fold ensemble.
# all_data_xgb() writes df2["predicted"] as a side effect, so df2 has to be
# created before the call and "predicted" only exists after it.
df2 = df.copy()
df2["prob"] = all_data_xgb(models)[:,1]
# Actual label minus predicted label; assuming 0/1 labels, nonzero means misclassified.
df2["差分"] = df2["安全率エラー判定"] - df2["predicted"]

print(data_folder)
# df2.to_csv(f"{data_folder}"+"220822_akiya500.csv",encoding='utf_8_sig')
# df.to_csv(output_data,encoding='utf_8_sig')
print(len(df2))

## エラーかどうかの予測確率毎の実際のエラーデータと正常データ件数

In [None]:
import japanize_matplotlib  # <- これ

# Overlaid histograms of the predicted probability, one per actual class.
df2.groupby('安全率エラー判定')['prob'].plot.hist(
    bins=10,
    alpha=0.8,
    legend=True,
    figsize=[10, 10],
)
plt.xlabel("予測確率")
plt.ylabel("データ数")
plt.title('エラーかどうかの予測確率毎の実際のエラーデータと正常データ件数', fontsize=14)
plt.ylim(0, 2000)  # visible range of the y axis
plt.legend(['正常データ', 'エラーデータ'], fontsize=20)

plt.show()


## Feature Importance

In [None]:
import japanize_matplotlib  # <- これ
# Gain-based feature importance plot for each fold model.
# Iterates over the models themselves rather than a hard-coded range(5), and
# no longer prints the None returned by set_size_inches().
for fold_model in models:
    ax = xgb.plot_importance(fold_model, importance_type='gain')
    fig = ax.figure
    fig.set_size_inches(4, 30)

In [None]:
# 5つのモデルで重要度が出てくるので箱ひげ図にします、
def plot_importance(model, X):
    """Draw a boxen plot of feature importances across fold models.

    Args:
        model: Iterable of fitted models exposing ``feature_importances_``
            (one per fold); a single fitted model is also accepted.
        X: DataFrame whose columns name the features, in the order the
            models were trained on.

    Returns:
        Tuple ``(fig, ax)`` of the created Matplotlib figure and axes.
    """
    # Use the argument that was passed in (the original looped over the
    # module-level `models` and ignored its parameter).
    fold_models = [model] if hasattr(model, "feature_importances_") else list(model)

    # Build one frame per fold and concatenate once.
    frames = [
        pd.DataFrame({
            "feature_importance": fold_model.feature_importances_,
            "column": X.columns,
            "fold": fold_no,
        })
        for fold_no, fold_model in enumerate(fold_models, start=1)
    ]
    feature_importance_df = pd.concat(frames, axis=0, ignore_index=True)
    print(feature_importance_df)

    # Top 150 features ordered by importance summed over the folds.
    order = (
        feature_importance_df.groupby("column")["feature_importance"]
        .sum()
        .sort_values(ascending=False)
        .index[:150]
    )

    fig, ax = plt.subplots(figsize=(20, max(10, len(order) * .25)))
    sns.boxenplot(data=feature_importance_df,
                  x="feature_importance",
                  y="column",
                  order=order,
                  ax=ax,
                  palette=None,
                  orient="h")
    ax.tick_params(axis="x")
    ax.set_title("Feature Importance")
    ax.grid()
    fig.tight_layout()
    return fig, ax

# Plot the importance distribution over the fold models for the training features.
fig, ax = plot_importance(models, X)