In [40]:
import pandas as pd
import numpy as np
from scipy import stats
from scipy.optimize import differential_evolution
from sklearn.decomposition import PCA
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MaxAbsScaler, MinMaxScaler, Normalizer, PowerTransformer, RobustScaler, StandardScaler, PolynomialFeatures
from sklearn.metrics import max_error, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, median_absolute_error, r2_score
from sklearn.base import BaseEstimator, RegressorMixin
from GRNN import GRNN, calculate_error_cost

In [41]:
# Run configuration. These labels are concatenated into the names of the
# exported .xlsx result files; `embedded_name` additionally selects which
# embedding branch is executed further down in the notebook.
scaler_name = 'Task1_80_20_Z3_MaxAbsScalerXY'  # file-name prefix for the exported results
model_name = 'HistGradientBoostingRegressor'  # file-name label only; the estimator class itself is hard-coded in the training cells
embedded_name = 'SGTM' # one of: SGTM PCA KernelPCA IncrementalPCA FastICA TruncatedSVD
score_name = 'R2'  # file-name suffix only; every metric is logged regardless of this value

In [42]:
# Load the tab-separated dataset and drop the simulation-count column.
data = pd.read_csv('/content/Tunneling_Induced_building_damage_dataset.txt', sep='\t')
data = data.drop(labels = 'Tot No. Simulations', axis=1)

# Remove outliers: keep a row only when every non-object column has |z-score| < 3.
# NOTE(review): the z-scores are computed on the full dataset (before the
# train/test split) and over every non-object column, not only the features.
numeric_zscores = np.abs(stats.zscore(data.select_dtypes(exclude='object')))
data = data[(numeric_zscores < 3).all(axis=1)]
data_columns = data.columns

# Features are the first 15 columns; targets are the columns at positions 29 and 30.
X = data.iloc[:,:15].to_numpy()
# Y = data.iloc[:,15:]
Y = data.iloc[:,[29, 30]]
# Y = Y.iloc[:,1:-2]
target_columns = Y.columns  # keep the target names before converting to a bare array
Y = Y.to_numpy()

# 80/20 split with a fixed seed so the partition is reproducible.
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

# Scale features and targets; both scalers are fitted on the training part only.
scaler_x = MaxAbsScaler()
scaler_y = MaxAbsScaler()

X_train = scaler_x.fit_transform(X_train)
Y_train = scaler_y.fit_transform(Y_train)

X_test = scaler_x.transform(X_test)
# Y_test is deliberately left unscaled: test predictions are mapped back with
# scaler_y.inverse_transform before the errors are computed.

In [43]:
class ErrorResultsLogger(object):
    """Accumulate regression error metrics per named column and export them.

    Metrics are stored in ``self.results`` as ``{column_name: {metric: value}}``
    and can be written to an Excel file with :meth:`save_errors_results`.
    """

    def __init__(self):
        # Maps a column label to its dictionary of computed metrics.
        self.results = {}

    def relative_root_mean_squared_error(self, true, pred):
        """Return sqrt((sum((true - pred)**2) / n) / sum(pred**2)).

        Parameters
        ----------
        true, pred : array-like
            Reference and predicted values of equal length. Plain Python
            sequences are accepted; both are converted to float arrays so the
            element-wise subtraction is always positional.

        Returns
        -------
        float
            The relative RMSE as defined above. The result is ``inf``/``nan``
            when ``pred`` is all zeros or the inputs are empty (numpy division
            semantics, unchanged from the original implementation).
        """
        true = np.asarray(true, dtype=float)
        pred = np.asarray(pred, dtype=float)
        n = len(true)
        num = np.sum(np.square(true - pred)) / n
        den = np.sum(np.square(pred))
        squared_error = num / den
        rrmse_loss = np.sqrt(squared_error)
        return rrmse_loss

    def calculate_errors(self, column_name, y_test, y_pred):
        """Compute all supported metrics and store them under ``column_name``.

        Parameters
        ----------
        column_name : str
            Key under which the metrics are stored; an existing entry with the
            same key is overwritten.
        y_test, y_pred : array-like
            Ground-truth and predicted values.
        """
        MaxError = max_error(y_test, y_pred)
        MedError = median_absolute_error(y_test, y_pred)
        MAE = mean_absolute_error(y_test, y_pred)
        MAPE = mean_absolute_percentage_error(y_test, y_pred)
        MSE = mean_squared_error(y_test, y_pred)
        # `mean_squared_error(..., squared=False)` was deprecated in
        # scikit-learn 1.4 and removed in 1.6. Taking the square root of the
        # per-output MSE and averaging yields the same value on every version.
        per_output_mse = mean_squared_error(y_test, y_pred, multioutput='raw_values')
        RMSE = float(np.mean(np.sqrt(per_output_mse)))
        RRMSE = self.relative_root_mean_squared_error(y_test, y_pred)
        R2 = r2_score(y_test, y_pred)
        self.results.update({
            f'{column_name}': {
                'MaxError' : MaxError,
                'MedError' : MedError,
                'MAE' : MAE,
                'MAPE' : MAPE,
                'MSE' : MSE,
                'RMSE' : RMSE,
                'RRMSE' : RRMSE,
                'R2' : R2
            }})

    def save_errors_results(self, file_name = '.xlsx'):
        """Write the collected metrics to ``file_name`` as an Excel sheet.

        Each stored column label becomes a spreadsheet column and each metric
        name becomes a row.
        """
        results = pd.DataFrame(self.results)
        results.to_excel(file_name)

In [44]:
results = {}
test_logger = ErrorResultsLogger()
# Per-target prediction vectors of the first-stage models (scaled target space).
y_train_predictions_1, y_test_predictions_1 = [], []

# Fit one gradient-boosting model per target column of the scaled Y_train.
for i, y_train in enumerate(Y_train.T):
    print(f'Training: {target_columns[i]}')
    regressor = HistGradientBoostingRegressor(random_state=42).fit(X_train, y_train)
    # Keep the raw predictions; the errors are logged after the loop.
    y_train_predictions_1.append(regressor.predict(X_train))
    y_test_predictions_1.append(regressor.predict(X_test))

# Train targets are already scaled, so train predictions are compared as they are.
stacked_train_predictions = np.stack(y_train_predictions_1, axis=1)
# Y_test was never scaled, so test predictions are mapped back to original units.
y_test_pred_inverse = scaler_y.inverse_transform(np.stack(y_test_predictions_1, axis=1))

# Log errors column by column: train entry first, then test entry.
per_target_columns = zip(Y_train.T, stacked_train_predictions.T, Y_test.T, y_test_pred_inverse.T)
for i, (y_train, y_train_pred, y_test, y_test_pred) in enumerate(per_target_columns):
    target_label = target_columns[i]
    test_logger.calculate_errors('train_'+target_label, y_train, y_train_pred)
    test_logger.calculate_errors('test_'+target_label, y_test, y_test_pred)

# Export the metrics table as xlsx.
output_file = f'{scaler_name}_{model_name}_{score_name}.xlsx'
test_logger.save_errors_results(output_file)

Training: Local Avg.
Training: Global Avg.


In [45]:
from sklearn.decomposition import PCA, FastICA, IncrementalPCA, KernelPCA, TruncatedSVD
from gtm import MultiLabelGTM

# Build the embedding input: scaled features plus the first-stage predictions
# (one extra column per target).
train_y_regressor = np.stack(y_train_predictions_1, axis=1)
test_y_regressor = np.stack(y_test_predictions_1, axis=1)

train_decomposition_input = np.concatenate((X_train, train_y_regressor), axis=1)
test_decomposition_input = np.concatenate((X_test, test_y_regressor), axis=1)

# scikit-learn decomposers share the fit_transform/transform protocol, so they
# are dispatched from one table. Factories are used so only the selected
# estimator is instantiated.
sklearn_embedders = {
    'PCA': lambda: PCA(),
    'KernelPCA': lambda: KernelPCA(kernel = 'poly'),
    'FastICA': lambda: FastICA(),
    'IncrementalPCA': lambda: IncrementalPCA(),
    'TruncatedSVD': lambda: TruncatedSVD(),
}

if embedded_name in sklearn_embedders:
    pca = sklearn_embedders[embedded_name]()
    # Fit on the training part only, then project the test part.
    X_train_new = pca.fit_transform(train_decomposition_input)
    X_test_new = pca.transform(test_decomposition_input)

elif embedded_name == 'SGTM':
    # MultiLabelGTM is fitted with the input used as both arguments of fit().
    gtm = MultiLabelGTM(center_of_mass = True)
    gtm.fit(train_decomposition_input, train_decomposition_input)
    X_train_new = gtm.predict(train_decomposition_input)
    X_test_new = gtm.predict(test_decomposition_input)

else:
    # Fail loudly instead of leaving X_train_new / X_test_new undefined or
    # stale from a previous run of this cell.
    supported_names = sorted(list(sklearn_embedders) + ['SGTM'])
    raise ValueError(
        f'Unknown embedded_name {embedded_name!r}; expected one of {supported_names}'
    )

17
Step: 1
Step: 2
Step: 3
Step: 4
Step: 5
Step: 6
Step: 7
Step: 8
Step: 9
Step: 10
Step: 11
Step: 12
Step: 13
Step: 14
Step: 15
Step: 16
Step: 17
--- 0.029486894607543945 seconds ---


In [46]:
# from lazypredict.Supervised import LazyRegressor
from sklearn import datasets
from sklearn.utils import shuffle
import numpy as np

results = {}
test_logger = ErrorResultsLogger()
# Per-target prediction vectors of the second-stage models (scaled target space).
y_train_predictions_2, y_test_predictions_2 = [], []

# poly = PolynomialFeatures(2)
# X_train_new = poly.fit_transform(X_train_new) # [:,:10]
# X_test_new = poly.transform(X_test_new) # [:,:10]

# From here on the embedded representation replaces the scaled features.
# NOTE: this rebinds X_train / X_test, so re-running the embedding cell after
# this one would feed it the embedded matrices instead of the scaled features.
X_train, X_test = X_train_new, X_test_new

# Fit one gradient-boosting model per target column on the embedded features.
for i, y_train in enumerate(Y_train.T):
    print(f'Training: {target_columns[i]}')
    regressor = HistGradientBoostingRegressor(random_state=42).fit(X_train, y_train)
    # Keep the raw predictions; the errors are logged after the loop.
    y_train_predictions_2.append(regressor.predict(X_train))
    y_test_predictions_2.append(regressor.predict(X_test))

# Train targets are already scaled, so train predictions are compared as they are.
stacked_train_predictions = np.stack(y_train_predictions_2, axis=1)
# Y_test was never scaled, so test predictions are mapped back to original units.
y_test_pred_inverse = scaler_y.inverse_transform(np.stack(y_test_predictions_2, axis=1))

# Log errors column by column: train entry first, then test entry.
per_target_columns = zip(Y_train.T, stacked_train_predictions.T, Y_test.T, y_test_pred_inverse.T)
for i, (y_train, y_train_pred, y_test, y_test_pred) in enumerate(per_target_columns):
    target_label = target_columns[i]
    test_logger.calculate_errors('train_'+target_label, y_train, y_train_pred)
    test_logger.calculate_errors('test_'+target_label, y_test, y_test_pred)

# Export the metrics table as xlsx.
output_file = f'{scaler_name}_{model_name}_{embedded_name}_{score_name}.xlsx'
test_logger.save_errors_results(output_file)


# for i, (y_train, y_test) in enumerate(zip(Y_train[:].T, Y_test[:].T)):
#   print(f'Training: {target_columns[i]}')
#   reg = LazyRegressor(verbose=0, ignore_warnings=False, custom_metric=None)
#   models,predictions = reg.fit(X_train, X_test, y_train, y_test)
#   print(models)
#   print()

Training: Local Avg.
Training: Global Avg.


In [47]:
import numpy as np
from sklearn.model_selection import KFold

# Show how a seeded, shuffled 10-fold split partitions the rows of X.
kf = KFold(n_splits=10, shuffle=True, random_state=42)
print(kf)
for i, (train_index, test_index) in enumerate(kf.split(X)):
    # Only the sizes of the index arrays are reported, not the indices themselves.
    train_size, test_size = len(train_index), len(test_index)
    print(
        f"Fold {i+1}:",
        f"  Train: index={train_size}",
        f"  Test:  index={test_size}",
        sep="\n",
    )

KFold(n_splits=10, random_state=42, shuffle=True)
Fold 1:
  Train: index=800
  Test:  index=89
Fold 2:
  Train: index=800
  Test:  index=89
Fold 3:
  Train: index=800
  Test:  index=89
Fold 4:
  Train: index=800
  Test:  index=89
Fold 5:
  Train: index=800
  Test:  index=89
Fold 6:
  Train: index=800
  Test:  index=89
Fold 7:
  Train: index=800
  Test:  index=89
Fold 8:
  Train: index=800
  Test:  index=89
Fold 9:
  Train: index=800
  Test:  index=89
Fold 10:
  Train: index=801
  Test:  index=88
