In [1]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sb

sb.set(style="white", font="Times New Roman", context="paper", palette='bright', font_scale=1.5)


%matplotlib inline

# Auxiliary functions

In [None]:
from sklearn.model_selection import LeaveOneOut
from sklearn.model_selection import KFold

def cv_training(model, x_train, y_train, kpi_features, target_features, cv_method=None, k_folds=10, shuffle=False, kfold_random_state=150, *args, **kwargs):
    """
        This function perform two different cross validation on training.
        1. Leave One out:                leave_one_out
        2. KFold :                       kfold
        3. Regular training (Default):   None

        user must give sklearn model object
    :param model:      Sklearn model object
    :param x_train:       the input data for trining
    :param y_train:       the target data fpr training
    :param kpi_features:       List of kpi features
    :param target_features:       list of target features
    :param cv_method:  the method of traininf. default k_fold with fold1=0
    :param k_folds: The numer of folds for k-fold.
    :params kfold_random_state   random state to do same k-fold
    :param args:
    :param kwargs:
    :return:
        Trained model.
    """

    if cv_method == "kfold":
        kf = KFold(n_splits=k_folds, shuffle=shuffle, random_state=kfold_random_state)
        for train_set, test_set in kf.split(x_train):
            model.fit(
                x_train[train_set, :],
                y_train[train_set, :].ravel(),
            )

    elif cv_method == "leave_one_out":
        cv = LeaveOneOut()
        for train_set, test_set in cv.split(x_train):
            model.fit(
                x_train[train_set, :],
                y_train[train_set, :].ravel(),
            )
    elif cv_method == "single_shot":
        cv = LeaveOneOut()
        for test_set, train_set in cv.split(x_train):  # I just changed the train_set and test_set indexes.
            model.fit(
                x_train[train_set, :],
                y_train[train_set, :].ravel(),
            )
    else:   # regular training without any cross validation.
        model.fit(
            x_train,
            y_train.ravel(),
        )

    return model




import copy

def multiple_training(predictor_model, xtrain, ytrain, xtest, ytest, kpi_features, target_features, number_of_iterations=100, kfold=True, shuffle=False, kfold_random_state=150, *args, **kwargs):
    """
        This function train model multiple times and return its best one.
        It also save it.
    :param predictor_model:   An in stnace of sklearn model
    xtrain, ytrain, xtest, ytest: train test numpy datasets.. scaled
    :param number_of_iterations: The numbe rof training times.
    :param transfer_learning:  If True, it will change some directories.
      # TODO: check later for target encoder too.
    :param args:
    :param kwargs:
    :return:
        Best trained model
    """
    
    if kfold:
        cv_method = "kfold"
    else:
        cv_method = None

    model_performance = []
    for i in range(number_of_iterations):
        try:

            fake_model = [predictor_model]
            model = copy.deepcopy(fake_model)[0]
            
            model = cv_training(model=model, x_train=xtrain, y_train=ytrain.reshape(-1, 1), 
                    kpi_features=kpi_features, target_features=target_feature, 
                                cv_method=cv_method, 
                                k_folds=kfold, 
                                shuffle=shuffle, 
                                kfold_random_state=kfold_random_state)

            model_performance.append(
                {
                    "iteration": i,
                    "model_instance": model,
                    "test_r_squared": round(
                        r2_score(
                            ytest.reshape(-1, 1),
                            model.predict(xtest).reshape(-1, 1)
                        ),
                        3
                    ),
                    "train_r_squared": round(
                        r2_score(
                            ytrain.reshape(-1, 1),
                            model.predict(xtrain).reshape(-1, 1)
                        ),
                        3
                    )
                }
            )

            print(f"Model: {model}, Iteration: {i}, Train R squared: {model_performance[i]['train_r_squared']}, Test R squared: {model_performance[i]['test_r_squared']}")
            del model, fake_model
        except KeyboardInterrupt as e:
            print("\nProcess terminated by user ...!")
            break

    plt.figure()
    plt.plot(range(len(model_performance)), [j["train_r_squared"] for j in model_performance], color="red")
    plt.plot(range(len(model_performance)), [j["test_r_squared"] for j in model_performance], color="blue")
    plt.title("Train and Test $R^2$")
    plt.xlabel("Training Iteration")
    plt.ylabel("$R^2$")
    plt.legend([f"Train $R^2$, Mean:{round(np.mean([i['train_r_squared'] for i in model_performance]), 2)}", f"Test $R^2$, Mean:{round(np.mean([i['test_r_squared'] for i in model_performance]), 2)}"])
    
    # if not transfer_learning:
    #     plt.savefig(os.path.join(model_instance.model_dir, f"{model_instance.model_name}_r_squared.png"), dpi=300)
    # else:
    #     plt.savefig(os.path.join(model_instance.model_dir, f"after_transfer_learning/{model_instance.model_name}_r_squared.png"), dpi=300)
    plt.show()

    # Get model with hghiest Test data R squared.
    model = sorted(model_performance, key=itemgetter("test_r_squared"), reverse=True)[0]["model_instance"]
    
    return model



In [None]:
def metric_reporter(y_pred, y_true, *args, **kwargs):
    
    print("R2:  ", metrics.r2_score(y_true=y_true, y_pred=y_pred))
    print("RMSE:  ", metrics.mean_squared_error(y_true=y_true, y_pred=y_pred, squared=False))
    print("MAE:  ", metrics.mean_absolute_error(y_true=y_true, y_pred=y_pred))

In [None]:
import pandas as pd
# ## Material Properties: For compounds I used harmonic average.
metal_work_functions = {
    "Co": 4.52, "Cr": 4.11, "Ir": 5.31, "NiO": 2.49, 
    "Au": 5.06, "Cu": 4.53, "Ni": 4.98, "Pd": 5.06, 
    "Pt": 5.54, "Rh": 4.99, "Ag": 4.22, "Bare": 0,
    "nan": np.NaN
}
metal_atomic_numbers = {
    "Co": 27, "Cr": 24, "Ir": 77, "NiO": 18, "Au": 79,
    "Cu": 29, "Ni": 28, "Pd": 46, "Pt": 78, "Rh": 45,
    "Ag": 47, "Bare": 0,
    "nan": np.NaN
}
metal_electronegativity = {
    "Co": 1.88, "Cr": 1.66, "Ir": 2.20, "NiO": 2.675,
    "Au": 2.54, "Cu": 1.9, "Ni": 1.91, "Pd": 2.2,
    "Pt": 2.28, "Rh": 2.28, "Ag": 1.93, "Bare": 0,
    "nan": np.NaN
}

engineered_features = pd.read_excel("../../alcohol_structure/engineered_features.xlsx")

def value_to_cat(df, list_of_alcohol):
    df['MT'] = ""    # Metal atomic number
    df['AT'] = ""    # alcohol molecular weight

    for i in df.index.tolist():
        for j, key in enumerate(metal_atomic_numbers):
            if df.loc[i, "MAN"] == metal_atomic_numbers[key]:
                df.loc[i, 'MT'] = key
        for at in list_of_alcohol:
            if df.loc[i, "ATI"] == engineered_features[engineered_features.name == at].L_pca_feature.to_numpy()[0]:
                df.loc[i, 'AT'] = at
    
    return df

In [None]:
import numpy as np

from numpy.random import rand
from sklearn.metrics import mean_squared_error


class BayesianOptimizer():
    """
        This class perform gradient descent algorithm on optimization for single objective

        This class perform genetic algorithm for single sample.
    """

    
    def __init__(self, model, kpi_features, target_feature, optimization_features, optimization_rate=0.01, threshold=1e-3, decay_rate=0.9, feature_bound=[-1, 1], *args, **kwargs):
        """
            This method initialize the values with objective of the optimization
        :param model:     A model instance with .predict method.
        :param database:  A database includes (feature for .predict method of the model) kpi feature and target features not anything else for
        :param kpi_features:  The ki feature for prediction.
        :param target_feature:  The target feature of main model predictor (keras or sklearn model)
        :param optimization_feature:       The desired feature for optimization
        :param optimization_rate:       The gradient descent optimization rate.
        :param threshold:       Threshold to stop iterations.
        :param decay_rate:       The Decay rate for momentom
        :param feature_bound:       The bound of feature to calculate the optimzation results.
        :param args:
        :param kwargs:
        """
        self.model = model
        self.kpi_features = kpi_features
        self.target_feature = target_feature

        self.decay_rate = decay_rate
        self.optimization_rate = optimization_rate
        self.epsilon = 1e-4     # use for derivative calculations.
        self.threshold = threshold   # Stop criteria

        self.feature_bound = np.array([feature_bound])
        self.optimization_features = optimization_features

    def objective_function(self, optimization_feature_value):
        """
            This method is used for the genetic optimizer to predict values.
        :param optimization_feature:    The value for the feature with optimization.
        :return:
            The predicted values of the main model
        """

        input_data = self.database[self.kpi_features].copy()
        input_data[self.optimization_features] = optimization_feature_value

        predicted_targets = self.model.predict(np.array(input_data, dtype=float))
        
        # Negetive tfor maximation
        predicted_targets = -1 * predicted_targets
        return predicted_targets

    def run_optimization(self, database, *args, **kwargs):
        """
            This method on call run the optimization loop.
        :param database:    The input database for optimization (single row ?)
        :param args:
        :param kwargs:
        :return:
            The optimization model.
        """
        self.database = database

        # initial guess:
        # solution = self.feature_bound[:, 0] + rand(len(self.feature_bound)) * (self.feature_bound[:, 1] - self.feature_bound[:, 0])
        
        solution = self.database.iloc[0][self.optimization_features].values.reshape(1, -1)
        
        objective_function = 1
        count = 0
        diff = 0
        while objective_function > self.threshold:

            try:
                # Update guess
                gradient = self.gradient(optimization_feature_value=solution)
                diff = self.decay_rate * diff - self.optimization_rate * gradient
                solution = solution + diff

                # Calculate the objective function
                objective_function = self.objective_function(optimization_feature_value=solution)

                count += 1
                print(f"Iteration Number:  {count},  Cost Function: {objective_function}, Update: {diff[0]}")
                if (count > 50) or np.abs(diff) < self.threshold:
                    break
            except KeyboardInterrupt:
                print("Optimization terminated by user!")
                break
        return solution, objective_function, count

    def gradient(self, optimization_feature_value):
        """
            This function calculate gradient of the predictor model and return the value
        :return:
            Gradient value of the predictor model.
        """

        input_data = self.database[self.kpi_features].copy()
        input_data[self.optimization_features] = optimization_feature_value

        # calculate Gradient
        x_add_delta_x = input_data[self.kpi_features].copy()
        x_add_delta_x[self.optimization_features] += self.epsilon

        x_mines_delta_x = input_data[self.kpi_features].copy()
        x_mines_delta_x[self.optimization_features] += - self.epsilon

        return (self.model.predict(np.array(x_add_delta_x, dtype=float)) - self.model.predict(np.array(x_mines_delta_x, dtype=float))) / (2 * self.epsilon)


In [None]:
from math import sin
from math import pi
from numpy import arange
from numpy import vstack
from numpy import argmax
from numpy import asarray
from numpy.random import normal
from numpy.random import random
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from warnings import catch_warnings
from warnings import simplefilter
from matplotlib import pyplot


# objective function
def objective(x, noise=0.1):
    return -1 * model.predict(x)

# surrogate or approximation for the objective function
def surrogate(gpr_model, X):
    # catch any warning generated when making a prediction
    with catch_warnings():
        # ignore generated warnings
        simplefilter("ignore")
        return gpr_model.predict(X, return_std=True)


# probability of improvement acquisition function
def acquisition(X, Xsamples, gpr_model):
    # calculate the best surrogate score found so far
    yhat, _ = surrogate(gpr_model, X)
    best = max(yhat)
    # calculate mean and stdev via surrogate function
    mu, std = surrogate(gpr_model, Xsamples)

    mu = mu.reshape(-1, )
    # calculate the probability of improvement
    probs = norm.cdf((mu - best) / (std+1E-9))
    return probs


# optimize the acquisition function
def opt_acquisition(X, y, gpr_model, Xsamples):
    # random search, generate random samples
    # Xsamples = random(100)
    # Xsamples = Xsamples.reshape(len(Xsamples), 1)
    # calculate the acquisition function for each sample
    scores = acquisition(X, Xsamples, gpr_model)

    # locate the index of the largest scores
    ix = argmax(scores)
    return Xsamples[ix]

# Load data

In [None]:
all_data = pd.read_excel("path to all_data_file.xlsx")

In [None]:
# Feature for alcohol type.
engineered_features = pd.read_excel("path to engineered_features.xlsx")

In [None]:
# To obtain models for other LI features change it based on the name of the feature:
# ApP, LI, AcP, and None.

kpi_features = ["ML", "AcP", "AC", "MWF", "ATI"]
target_feature = ["Rate"]            # Active photons

# preparation:

In [None]:

# Scaling.

from sklearn import preprocessing


input_data = all_data[kpi_features].copy()
target_data = all_data[target_feature].copy().values.reshape(-1, 1)

input_scaler = preprocessing.StandardScaler(with_mean=True, with_std=True)
target_scaler = preprocessing.StandardScaler(with_mean=True, with_std=True)

# input_scaler = preprocessing.MinMaxScaler((0, 1))
# target_scaler = preprocessing.MinMaxScaler((0, 1))

input_scaler.fit(input_data)
target_scaler.fit(target_data)
normal_all_data = pd.concat(
    [
        pd.DataFrame(input_scaler.transform(input_data), columns=input_data.columns.tolist()),
        pd.DataFrame(target_scaler.transform(target_data), columns=target_feature)
    ],
    axis=1
)

# Active learning

In [None]:
from sklearn.gaussian_process import kernels
from sklearn.gaussian_process import GaussianProcessRegressor

training_data = all_data.sample(10, random_state=365).copy()
test_data = all_data[~all_data.index.isin(training_data.index.values.tolist())].copy()

xtrain, ytrain = input_scaler.transform(training_data[kpi_features]), target_scaler.transform(training_data[target_feature].values)
xtest, ytest = input_scaler.transform(test_data[kpi_features]), target_scaler.transform(test_data[target_feature].values)


all_rmse = []
all_mstde = []
id_of_selected_samples = []


# test_ddd = value_to_cat(test_data.copy(), list_of_alcohol=lit_database.AT.value_counts().index.tolist())
training_data_ = training_data.copy()
test_data_ = test_data.copy()

counts = -1
while True:
    # my_kernel = 0.5 * kernels.ConstantKernel(0.5, (1e-23, 1e20)) + \
    #         kernels.RBF(0.9, (1e-23, 1e20)) * kernels.RBF(0.9, (1e-23, 1e20)) + \
    #         kernels.WhiteKernel(0.1, (1e-23, 1e20)) 


    # gpr_instance = GaussianProcessRegressor(
    #     kernel=my_kernel, 
    #     alpha=1e-10,
    #     normalize_y=True,
    #     copy_X_train=False,
    #     random_state=20,
    # )
    
    gpr_instance = RandomForestRegressor(n_estimators=1500, max_depth=800)


    gpr = cv_training(model=gpr_instance, x_train=xtrain, y_train=ytrain.reshape(-1, 1), kpi_features=kpi_features, 
                        target_features=target_feature, cv_method="kfold", k_folds=5, shuffle=True)

    counts+=1
    print(f"\nIteration {counts} ================")
    print("Training Score: ", gpr.score(xtrain, ytrain), "\n")
    print("Test Score: ", gpr.score(xtest, ytest))



    # Both model:
    # ypredtrain, stdpredtrain = target_scaler.inverse_transform(gpr.predict(xtrain, return_std=True)[0].reshape(-1, 1)), target_scaler.inverse_transform(gpr.predict(xtrain, return_std=True)[1].reshape(-1, 1))
    # ypredtest, stdpredtest = target_scaler.inverse_transform(gpr.predict(xtest, return_std=True)[0].reshape(-1, 1)), target_scaler.inverse_transform(gpr.predict(xtest, return_std=True)[1].reshape(-1, 1))
    
    ypredtrain = target_scaler.inverse_transform(gpr.predict(xtrain).reshape(-1, 1))
    ypredtest = target_scaler.inverse_transform(gpr.predict(xtest).reshape(-1, 1))
 

    # print("MSTDE", np.mean(stdpredtest))
    print("RMSE", metrics.mean_squared_error(
        y_pred=ypredtest, y_true=test_data_[target], squared=False))
    
    all_rmse.append(metrics.mean_squared_error(
        y_pred=ypredtest, y_true=test_data_[target], squared=False))
    # all_mstde.append(np.mean(stdpredtest))



    test_errors = ypredtest - target_scaler.inverse_transform(ytest)

    _test_data = pd.DataFrame(columns=["error", "prediction", "abs_error", "std_of_error"], 
                              index=test_data_.index.tolist())
    _test_data["error"] = test_errors.reshape(-1, 1)
    _test_data["prediction"] = ypredtest.reshape(-1, 1)
    _test_data["abs_error"] = abs(test_errors.reshape(-1, 1)).reshape(-1, 1) 
    _test_data["std_of_error"] = np.NaN #stdpredtest.reshape(-1, 1)

    _test_data.index.name = "ID"
    _test_data = pd.concat([_test_data, test_data_], axis=1)
    
    # Check if continue
    # if not int(input("Enter 1 to pick up the 20 sample of test data \nwhihth hiest error and train a new model\nor enter zero to stop active learning:  ")):
    #     break
    if len(test_data_) <= 70:
        break
    # if gpr.score(xtest, ytest) > gpr.score(xtrain, ytrain):
    #     break

    _test_data.sort_values(by="abs_error", ascending=False, inplace=True)
    samples_with_highest_abs_error = _test_data.iloc[:10].copy()
    
    print("selected samples: \n", samples_with_highest_abs_error.index.values.tolist())
    
    id_of_selected_samples.append(samples_with_highest_abs_error.index.values.tolist())
    
    # import pdb; pdb.set_trace()
    
    training_data_ = pd.concat([training_data_, samples_with_highest_abs_error[kpi_features + target_feature]], axis=0)
    training_data_.sample(frac=1, replace=True)
    test_data_ = test_data_[~test_data_.index.isin(samples_with_highest_abs_error.index.values.tolist())]
    # test_ddd = value_to_cat(test_data_.copy(), list_of_alcohol=lit_database.AT.value_counts().index.tolist())
    
    # change x and y train and test
    xtrain, ytrain = input_scaler.transform(training_data_[kpi_features]), target_scaler.transform(training_data_[target_feature].values)
    xtest, ytest = input_scaler.transform(test_data_[kpi_features]), target_scaler.transform(test_data_[target_feature].values)

    print("train size: ", ytrain.shape[0], "   test size: ", ytest.shape[0])
    
plt.plot(all_rmse, color="red")
# plt.plot(all_mstde, color="blue")
plt.show()

In [None]:
# training Randomforest tree for checking the trees, 

from xgboost import XGBRegressor, XGBRFRegressor

xtrain, ytrain = input_scaler.transform(training_data[kpi_features]), training_data[target_feature].values
xtest, ytest = input_scaler.transform(validation_data[kpi_features]), validation_data[target_feature].values


xg_model = XGBRegressor(random_state=200)
# xg_model = XGBRFRegressor(learning_rate=0.04, n_estimators=100, max_depth=40, random_state=200)
xg_model = cv_training(model=xg_model, x_train=xtrain, y_train=ytrain.reshape(-1, 1), kpi_features=kpi_features, 
                    target_features=target_feature, cv_method="kfold", k_folds=5, shuffle=True)

print("Train: ")
metric_reporter(
    y_pred=xg_model.predict(input_scaler.transform(training_data[kpi_features])).reshape(-1, 1),
    y_true=training_data[target_feature], 
)

print("Test: ")
metric_reporter(
    y_pred=xg_model.predict(input_scaler.transform(validation_data[kpi_features])).reshape(-1, 1),
    y_true=validation_data[target_feature], 
)


# TPOT Training:

In [None]:
import tpot
from sklearn.model_selection import RepeatedKFold

cv = RepeatedKFold(n_splits=10, n_repeats=10, random_state=1)
xtrain, ytrain = input_scaler.transform(training_data[kpi_features]), target_scaler.transform(training_data[target_feature].values)

model = tpot.TPOTRegressor(
    generations=5, 
    population_size=100, 
    scoring='neg_mean_absolute_error', 
    cv=cv, 
    verbosity=2, 
    random_state=1, 
    n_jobs=-1
)

model.fit(xtrain, ytrain)

all_data["predictions"] = target_scaler.inverse_transform(model.predict(input_scaler.transform(all_data[kpi_features])).reshape(-1, 1)).reshape(-1, )


plt.figure()
plt.scatter(
    training_data[target_feature],
    target_scaler.inverse_transform(model.predict(input_scaler.transform(training_data[kpi_features])).reshape(-1, 1)), color="blue"
)
plt.scatter(
    validation_data[target_feature],
    target_scaler.inverse_transform(model.predict(input_scaler.transform(validation_data[kpi_features])).reshape(-1, 1)), color="green"
)
plt.plot(all_data[target_feature], all_data[target_feature], color="red")

plt.legend([
    f"Training data", 
    f"Validation data"
])# plt.title("EXP data")
plt.xlabel(f"Actual {target_feature[0]}")
plt.ylabel(f"Predicted {target_feature[0]}")
# plt.savefig("acp_scatter.png", dpi=500, bbox_inches="tight")

# Repeatibility test

In [None]:
import tpot
from sklearn.model_selection import RepeatedKFold
from sklearn import metrics

xtrain_, ytrain_ = input_scaler.transform(training_data[kpi_features]), target_scaler.transform(training_data[target_feature].values)

repeated_predictions_ = []
for i__ in range(2):
    
 
    cv = RepeatedKFold(n_splits=10, n_repeats=10, random_state=1)
    model__ = tpot.TPOTRegressor(
        generations=5, 
        population_size=100, 
        scoring='neg_mean_absolute_error', 
        cv=cv, 
        verbosity=0, 
        random_state=1, 
        n_jobs=-1
    )

    model__.fit(xtrain_, ytrain_)
    repeated_predictions_.append(model__.predict(xtrain_).reshape(-1, 1))

# Y-scrambeling

In [None]:
import tpot
from sklearn.model_selection import RepeatedKFold
from sklearn import metrics

xtrain_, ytrain_ = input_scaler.transform(training_data[kpi_features]), target_scaler.transform(training_data[target_feature].values)

r2_suffeled = []
for i__ in range(100):
    
    np.random.shuffle(ytrain_)
    
    cv = RepeatedKFold(n_splits=10, n_repeats=10, random_state=1)
    model__ = tpot.TPOTRegressor(
        generations=5, 
        population_size=100, 
        scoring='neg_mean_absolute_error', 
        cv=cv, 
        verbosity=0, 
        random_state=1, 
        n_jobs=-1
    )

    model__.fit(xtrain_, ytrain_)
    y_pred_ = model__.predict(xtrain_).reshape(-1, 1)
    
    r2_suffeled.append(metrics.r2_score(y_true=ytrain_, y_pred=y_pred_))
    
    if i__ % 10 == 0:
        print(i__)
    

# Random flactuation test

In [None]:
import tpot
from sklearn.model_selection import RepeatedKFold

all_r2 = []
all_rmse = []
all_mae = []

for ii in range(100):
    
    cv = RepeatedKFold(
        n_splits=10,
        n_repeats=10,
        random_state=np.random.randint(1000)
    )
    xtrain, ytrain = input_scaler.transform(training_data[kpi_features]), target_scaler.transform(training_data[target_feature].values)

    model = tpot.TPOTRegressor(
        generations=5, 
        population_size=100, 
        scoring='neg_mean_absolute_error', 
        cv=cv, 
        verbosity=0, 
        random_state=np.random.randint(1000), 
        n_jobs=-1
    )

    model.fit(xtrain, ytrain)

    y_hat = target_scaler.inverse_transform(model.predict(xtrain).reshape(-1, 1)).reshape(-1, 1)
    
    all_r2.append(metrics.r2_score(y_true=training_data[target_feature].values.reshape(-1, 1), y_pred=y_hat))
    all_rmse.append(metrics.mean_squared_error(y_true=training_data[target_feature].values.reshape(-1, 1), y_pred=y_hat, squared=False))
    all_mae.append(metrics.mean_absolute_error(y_true=training_data[target_feature].values.reshape(-1, 1), y_pred=y_hat))

    
    if ii % 10 == 0:
        print(ii, " percent passed.")

# SHAP analysis

In [None]:
# Shape analysis of final model 
import shap

explainer = shap.KernelExplainer(
    model.predict, 
    input_scaler.transform(all_data[kpi_features]) 
)

shap_values = explainer.shap_values(
    input_scaler.transform(all_data[kpi_features]),
    # nsamples=420,
    # alpha=0.002
)


In [None]:
f = plt.figure()
shap.bar_plot(np.std(shap_values[0], axis=0), feature_names=kpi_features)
# f.savefig("shape_bar_plot_acp_rate.png", dpi=1000, bbox_inches="tight")

In [None]:
# this part already been calculated and you can find it in the data folder.


shape_value_anlaysis = {}
for indexer, i in enumerate(kpi_features):
    
    mean__ = input_scaler.mean_.tolist()[indexer]
    std__ = np.sqrt(input_scaler.var_.tolist()[indexer])
    
    fig, ax, x, y, hist, features, features_tmp = partial_dependence(
        npoints=2000, 
        ind=i,
        model=model.predict, 
        data=input_scaler.transform(all_data[kpi_features]), 
        xmin='percentile(0)',
        xmax='percentile(100)',
        model_expected_value=False, 
        feature_expected_value=False, 
        feature_names=kpi_features, 
        ylabel=target_feature[0], 
        show=False, ice=False, hist=False)  # shoe=False
    
    
    shape_value_anlaysis[i] = {
        # "fig": fig,
        # "ax": ax,
        f"{i}": revers_feat(x, mean=mean__, std=std__),
        "mean_Rate": target_scaler.inverse_transform(y[0].reshape(-1, 1)).reshape(-1, ),
        "std_Rate": target_scaler.inverse_transform(y[1].reshape(-1, 1)).reshape(-1, ),
        "min_Rate": target_scaler.inverse_transform(y[2].reshape(-1, 1)).reshape(-1, ),
        "max_Rate": target_scaler.inverse_transform(y[3].reshape(-1, 1)).reshape(-1, ),
        "median_Rate": target_scaler.inverse_transform(y[4].reshape(-1, 1)).reshape(-1, ),
        #"hist": hist,
        #"features": features
    }
    #plt.figure()
    #plt.scatter(ax.get_lines()[0].get_xdata(), ax.get_lines()[0].get_ydata(), s=[2]*len(ax.get_lines()[0].get_xdata()))
    # plt.savefig(f"shape_analysis/acp_rate/shape_partial_dependency_{i}_plot.jpg", dpi=1000, bbox_inches="tight")
    
    pd.DataFrame(shape_value_anlaysis[i]).to_excel(f"C:/Users/z5326694/OneDrive - UNSW/Desktop/PhD/PhD papers/light intensity paper/paper_data/shape_partial_dependence_{i}.xlsx")

# Load model for prediction

In [None]:

import os
import joblib
model = joblib.load(os.path.join("acp_rate_model.sav"))

with open(os.path.join("acp_rate_model.txt")) as f:
    print(f.read())


In [None]:
kpi_features = ['ML', 'AcP', 'AC', 'MWF', 'ATI']
target_feature = ['Rate']