1. import libraries

In [4]:
#import
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor
from boruta import BorutaPy
import itertools
from sklearn.metrics import pairwise_distances
from sklearn.neighbors import NearestNeighbors
from sklearn.decomposition import PCA
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error  

2. definition

In [5]:
#def

def yyplot_k(y_obs, y_est, y_obs_label = "Observed Đ", y_est_label = "Predicted Đ"):

    plt.rcParams["font.size"] = 20
    plt.figure(figsize = (8,8))

    plt.scatter(y_obs,y_est,alpha = 1)

    ratio = 0.03
    axv = plt.axis()
    axrange = axv[1] - axv[0]
    axmin = min(axv) - ratio*axrange
    axmax = max(axv) + ratio*axrange
    plt.axis([axmin,axmax,axmin,axmax])

    plt.plot([axmin,axmax],[axmin,axmax],color = "gray")

    plt.xlabel(y_obs_label)
    plt.ylabel(y_est_label)
    
def yyplot_train_test(y_obs_train, y_est_train, y_obs_test, y_est_test, y_obs_label = "Observed Đ", y_est_label = "Predicted Đ"):

    plt.rcParams["font.size"] = 20
    plt.figure(figsize = (8,8))

    plt.scatter(y_obs_train,y_est_train,alpha = 1, color = "blue", label="training")
    plt.scatter(y_obs_test,y_est_test,alpha = 1, color = "orange", label="test")

    ratio = 0.03
    axv = plt.axis()
    axrange = axv[1] - axv[0]
    axmin = min(axv) - ratio*axrange
    axmax = max(axv) + ratio*axrange
    plt.axis([axmin,axmax,axmin,axmax])

    plt.plot([axmin,axmax],[axmin,axmax],color = "gray")

    plt.xlabel(y_obs_label)
    plt.ylabel(y_est_label)
    
def search_highly_correlated_variables(X, threshold_of_r):
    r_in_x = X.corr()
    r_in_x = abs(r_in_x)

    for i in range(r_in_x.shape[0]):
        r_in_x.iloc[i, i] = 0
    highly_correlated_variable_numbers = []
    
    for i in range(r_in_x.shape[0]):
        r_max = r_in_x.max()
        r_max_max = r_max.max()
        if r_max_max >= threshold_of_r:
            variable_number_1 = np.where(r_max == r_max_max)[0][0]
            variable_number_2 = np.where(r_in_x.iloc[:, variable_number_1] == r_max_max)[0][0]
            r_sum_1 = r_in_x.iloc[:, variable_number_1].sum()
            r_sum_2 = r_in_x.iloc[:, variable_number_2].sum()
            if r_sum_1 >= r_sum_2:
                delete_x_number = variable_number_1
            else:
                delete_x_number = variable_number_2
            highly_correlated_variable_numbers.append(delete_x_number)
            r_in_x.iloc[:, delete_x_number] = 0
            r_in_x.iloc[delete_x_number, :] = 0
        else:
            break

    X.drop(X.columns[highly_correlated_variable_numbers], axis=1, inplace = True)
    return X

def search_highly_correlated_variables_cv(X_train, X_test, threshold_of_r):
    r_in_x = X_train.corr()
    r_in_x = abs(r_in_x)

    for i in range(r_in_x.shape[0]):
        r_in_x.iloc[i, i] = 0
    highly_correlated_variable_numbers = []

    for i in range(r_in_x.shape[0]):
        r_max = r_in_x.max()
        r_max_max = r_max.max()
        if r_max_max >= threshold_of_r:
            variable_number_1 = np.where(r_max == r_max_max)[0][0]
            variable_number_2 = np.where(r_in_x.iloc[:, variable_number_1] == r_max_max)[0][0]
            r_sum_1 = r_in_x.iloc[:, variable_number_1].sum()
            r_sum_2 = r_in_x.iloc[:, variable_number_2].sum()
            if r_sum_1 >= r_sum_2:
                delete_x_number = variable_number_1
            else:
                delete_x_number = variable_number_2
            highly_correlated_variable_numbers.append(delete_x_number)
            r_in_x.iloc[:, delete_x_number] = 0
            r_in_x.iloc[delete_x_number, :] = 0
        else:
            break

    X_train.drop(X_train.columns[highly_correlated_variable_numbers], axis=1, inplace = True)
    X_test.drop(X_test.columns[highly_correlated_variable_numbers], axis=1, inplace = True)
    return X_train, X_test

def boruta(X, y, perc, rseed_boruta):
    selected_columns_sum = []
    for i in range(1):
        base_bo = RandomForestRegressor(n_estimators = 40, max_depth = 8)
        feat_selector = BorutaPy(base_bo, n_estimators = "auto", perc = perc, verbose =2, random_state=rseed_boruta)
        feat_selector.fit(X.values, y.values)
        selected = feat_selector.support_
        selected_columns = X.columns[selected].to_list()
        selected_columns_sum.append(selected_columns)

    boruta = list(set(list(itertools.chain.from_iterable(selected_columns_sum))))
    X = X[boruta]
    return X

def boruta_cv(X_train, y_train, X_test, perc, rseed_boruta):
    selected_columns_sum = []
    for i in range(1):
        base_bo = RandomForestRegressor(n_estimators = 40, max_depth = 8)
        feat_selector = BorutaPy(base_bo, n_estimators = "auto", perc = perc, verbose =2, random_state = rseed_boruta)
        feat_selector.fit(X_train.values, y_train.values)
        selected = feat_selector.support_
        selected_columns = X_train.columns[selected].to_list()
        selected_columns_sum.append(selected_columns)

    boruta = list(set(list(itertools.chain.from_iterable(selected_columns_sum))))
    X_train = X_train[boruta]
    X_test = X_test[boruta]
    return X_train, X_test

def funcTanimoto_sklearn(a, b):
    """
    Jaccard similarity is used as a kernel
    
    """
    if 0:
        print("----")
        print(a.shape)
        print(b.shape)
        print("----")
        
    if (a.ndim == 1) and (b.ndim == 1):
        jdist = pairwise_distances(a.astype(bool, copy = False).reshape(1, -1), b.astype(bool, copy = False).reshape(1, -1), metric = "jaccard")
    else:
        jdist = pairwise_distances(a.astype(bool, copy = False), b.astype(bool, copy = False), metric = "jaccard")
        
    return 1 - jdist

def T2_value(Xtr, Xts, dim):
    
    pca = PCA()
    pca.fit(Xtr)
    Xtr_pca = pca.transform(Xtr)
    
    # 寄与率（各主成分がどれだけ情報を保持するか）
    explained_variance_ratio = pca.explained_variance_ratio_
    # 累積寄与率（何次元まででどれだけカバーできるか）
    cumulative_explained_variance = np.cumsum(explained_variance_ratio)
    
    
    Xts_pca = pca.transform(Xts)
    
    # print(pd.DataFrame(pca.explained_variance_ratio_))
    
    loading = pca.components_  # shape: (PC数, 変数数)
    
    Xtr_6 =  Xtr_pca[:, :dim]
    Xts_6 =  Xts_pca[:, :dim]
    
    # 寄与度の計算：スコアとローディングから復元
    contribution_vec = np.dot(Xts_6, loading[:dim,:])  # shape: (1, n_features)
    contribution_squared = contribution_vec.squeeze()**2  # 二乗寄与度（各説明変数）
    
    varis = Xtr_6.var(axis=0)
    T2_train_max = max(np.sum(np.square(Xtr_6)/varis, axis=1))
    T2_test = np.sum(np.square(Xts_6)/varis, axis=1)

    # 寄与度を表示
    df_contrib = pd.DataFrame({
        "feature": Xtr.columns,
        "contribution": contribution_squared
    }).sort_values(by="contribution", ascending=False)
    
    return T2_test, T2_train_max, df_contrib, cumulative_explained_variance[dim]

def knn(k, ratio, Xtr, Xts):
    
    # Construction and fitting of KNN models
    ad_model = NearestNeighbors(n_neighbors=k + 1, metric='euclidean')
    ad_model.fit(Xtr)
    
    # Calculate KNN distance for training data
    knn_distance_train, _ = ad_model.kneighbors(Xtr)
    mean_knn_distance_train = knn_distance_train[:, 1:].mean(axis=1)
    
    # Determination of AD thresholds
    sorted_mean_knn_distance_train = np.sort(mean_knn_distance_train)
    ad_threshold = sorted_mean_knn_distance_train[int(len(sorted_mean_knn_distance_train) * ratio) - 1]
    
    # Calculate KNN distance for test data
    knn_distance_test, _ = ad_model.kneighbors(Xts)
    mean_knn_distance_test = knn_distance_test[:, 1:].mean(axis=1)
    
    return mean_knn_distance_test, ad_threshold

def RMSE (y, y_pred):
    rmse = np.sqrt(mean_squared_error(y, y_pred))
    return rmse

def MAE (y, y_pred):
    mae = np.sqrt(mean_absolute_error(y, y_pred))
    return mae
    
    