In [1]:
#https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter

In [2]:
import os
import sys
import h5py
import numpy as np
import pandas as pd
from datetime import datetime
import glob

import matplotlib.pyplot as plt
import seaborn as sns
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
from sklearn.base import clone
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.metrics import make_scorer

from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor, AdaBoostRegressor, BaggingRegressor, ExtraTreesRegressor, StackingRegressor, VotingRegressor, HistGradientBoostingRegressor

from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.tree import DecisionTreeRegressor

2023-08-02 07:56:58.583527: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-08-02 07:56:58.679841: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
def get_group_data(group):
    data_list = []
    dt_list = []
    filename_list = []

    for key in group.keys():
        
        for key2 in group[key].keys():

            dt = datetime.strptime(' '.join([key.split('__')[-2], key.split('__')[-1].replace('-', ':')]), 
                                        "%Y-%m-%d %H:%M:%S:%f")
            data = np.array(group[key][key2])
            data_list.append(data)
            dt_list.append(dt)
            filename_list.append(key2)
    
    df = pd.DataFrame({'filename': filename_list, 'data': data_list, 'dt': dt_list})
    return df


def get_translations(positive, negative):
    x_translation_list = []
    y_translation_list = []
    dt_list = []

    for i in range(len(positive['data'])):
        data = positive['data'][i]
        x_translation = data[0, 3]
        y_translation = data[2, 3]
        dt = positive['dt'][i].strftime('%H:%M:%S')
        x_translation_list.append(x_translation)
        y_translation_list.append(y_translation)
        dt_list.append(dt)

    positive_df = pd.DataFrame({'x_translation': x_translation_list, 'y_translation': y_translation_list, 'dt': dt_list})

    x_translation_list = []
    y_translation_list = []
    dt_list = []

    for i in range(len(negative['data'])):
        data = negative['data'][i]
        x_translation = data[0, 3]
        y_translation = data[2, 3]
        dt = negative['dt'][i].strftime('%H:%M:%S')
        x_translation_list.append(x_translation)
        y_translation_list.append(y_translation)
        dt_list.append(dt)

    negative_df = pd.DataFrame({'x_translation': x_translation_list, 'y_translation': y_translation_list, 'dt': dt_list})

    return positive_df, negative_df


def plot_translations(positive_df, negative_df):
    positive_df['type'] = 'Positive'
    negative_df['type'] = 'Negative'

    df = pd.concat([positive_df, negative_df])

    df = df.melt(id_vars=['dt', 'type'], value_vars=['x_translation', 'y_translation'], var_name='axis', value_name='translation')

    g = sns.relplot(data=df, x='dt', y='translation', hue='axis', col='type', kind='line')

    # Reduce the number of x-axis labels
    x_ticks = g.axes[0][0].get_xticks()
    g.set(xticks=x_ticks[::10])
    g.set_xticklabels(df['dt'].unique()[::10], rotation=45)

    # Update axis titles
    g.set_axis_labels('Time', 'Translation')    
    g.set_titles(row_template = '{row_name}', col_template = '{col_name}')
    
    plt.show()


def normalize_data(df):
    # Create a StandardScaler object
    scaler = StandardScaler()
    
    # Fit the scaler to the data
    scaler.fit(df)
    
    # Transform the data
    df_norm = scaler.transform(df)
    
    return df_norm, scaler


def denormalize_y(y_norm, scaler):
    # Transform the data back to its original scale
    y = scaler.inverse_transform(y_norm)
    
    return y


def sign_mse_loss(y_true, y_pred):
    # Calculate the mean squared error
    mse = np.mean((y_true - y_pred) ** 2)
    
    # Calculate the sign error
    sign_error = np.mean((np.sign(y_true) != np.sign(y_pred)).astype(float))
    
    # Calculate the total loss
    loss = mse + sign_error
    
    return loss


def get_metrics(y_test, y_pred, X):
    
    # Calculate the evaluation metrics
    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)
    adj_r2 = 1 - (1 - r2) * (y_test.shape[0] - 1) / (y_test.shape[0] - X.shape[1] - 1)
    loss = sign_mse_loss(y_test, y_pred)

    print(f'Mean Absolute Error: {mae:.2f}')
    print(f'Root Mean Squared Error: {rmse:.2f}')
    print(f'R-squared: {r2:.2f}')
    print(f'Adjusted R-squared: {adj_r2:.2f}')
    print(f'Sign MSE Loss: {loss:.2f}')

    return mae, rmse, r2, adj_r2, loss


# def get_metrics(y_test, y_pred, X):
    
#     # Calculate the evaluation metrics
#     mae = np.abs(y_test - y_pred)
#     rmse = np.sqrt((y_test - y_pred) ** 2)
#     r2 = r2_score(y_test, y_pred, multioutput='raw_values')
#     adj_r2 = np.full_like(r2, np.nan)
#     loss = sign_mse_loss(y_test, y_pred)

#     # print(f'Mean Absolute Error: {mae}')
#     # print(f'Root Mean Squared Error: {rmse}')
#     print(f'R-squared: {r2}')
#     # print(f'Adjusted R-squared: {adj_r2}')
#     # print(f'Sign MSE Loss: {loss}')

#     return mae, rmse, r2, adj_r2, loss



def plot_results(results):
    
    # Extract the data from the results
    names = [x[0] for x in results]
    mae_values = [x[1] for x in results]
    rmse_values = [x[2] for x in results]
    r2_values = [x[3] for x in results]

    # Create a bar chart of the MAE values
    plt.figure(figsize=(10, 6))
    plt.bar(names, mae_values)
    plt.title('Mean Absolute Error')
    plt.ylabel('MAE')
    plt.xticks(rotation=45)
    plt.show()

    # Create a bar chart of the RMSE values
    plt.figure(figsize=(10, 6))
    plt.bar(names, rmse_values)
    plt.title('Root Mean Squared Error')
    plt.ylabel('RMSE')
    plt.xticks(rotation=45)
    plt.show()

    # Create a bar chart of the R-squared values
    plt.figure(figsize=(10, 6))
    plt.bar(names, r2_values)
    plt.title('R-squared')
    plt.ylabel('R-squared')
    plt.xticks(rotation=45)
    plt.show()


In [4]:
# Get a list of all H5 files
# h5_files = glob.glob('scanner3DTop_Transformations/TESTDATASET_*/*.h5')
h5_files = glob.glob('scanner3DTop_Transformations/*/*.h5')
df_list = []

for h5_file in h5_files:

    # Open H5 file
    h5 = h5py.File(h5_file, 'r')

    # Get negative direction transformations
    negative = get_group_data(group=h5["EW/individual/transformations/negative/"])

    # Get positive direction transformations
    positive = get_group_data(group=h5["EW/individual/transformations/positive/"])

    # Combine positive and negative transformations
    transformations = pd.concat([positive, negative])

    # Extract Environment Logger data
    df = pd.read_hdf(h5_file, 'environment_logger') 

    # Merge transformations and Environment logger data
    df = df.merge(transformations, on='filename')

    # Drop unwanted columns
    df = df.drop(['directories', 'filename', 'dt'], axis=1)

    # Flatten the transformations
    df['data'] = df['data'].apply(lambda x: x.flatten())

    # dummies_field = pd.get_dummies(df['field'], prefix='field')
    # dummies_scan_direction = pd.get_dummies(df['scan_direction'], prefix='scan_direction')
    # df = pd.concat([df, dummies_field, dummies_scan_direction], axis=1)
    # df = df.drop(['field', 'scan_direction'], axis=1)
    df['field'] = df['field'].map({'north': 0.0, 'south': 1.0})
    df['scan_direction'] = df['scan_direction'].map({'Negative': 0.0, 'Positive': 1.0})

    # Reset the index of your DataFrame
    df = df.reset_index(drop=True)

    # Add to list
    df_list.append(df)

df = pd.concat(df_list)
df = df.dropna()
# df = df[df['scan_direction']==1]
df = df.drop(['brightness'], axis=1)

In [5]:
# Split your data into input features X and output targets y
X = df.drop('data', axis=1)

# Extract X, Y, Z transformations
df['data'] = df['data'].apply(lambda x: x.reshape(4, 4)[:3,3])#[:3])

# Extract the transformations from the "data" column of the dataframe
y = np.stack(df['data'].values)

# Flatten the transformations to create a 2D array
y = y.reshape(y.shape[0], -1)

# Convert any Timestamps in the input features to a numerical representation
X = X.apply(lambda x: x.astype(int) if np.issubdtype(x.dtype, np.datetime64) else x)

# Convert only the columns with numeric data types to float
X[X.select_dtypes(include='number').columns] = X.select_dtypes(include='number').astype(float)

# Normalize the y variable
# X, scaler = normalize_data(X)

# Split your data into training, validation, and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Create a custom scorer
sign_mse_scorer = make_scorer(sign_mse_loss, greater_is_better=False)

In [6]:
y_test

array([[   5.,   35.,    0.],
       [ -20.,  -40.,    0.],
       [-135.,  -95.,    0.],
       [  10.,   30.,    5.],
       [-155., -140.,    0.],
       [  15.,   15.,    0.],
       [ -55.,   -5.,    0.],
       [ -35.,   25.,    0.],
       [ -45.,    5.,    0.],
       [  20.,   40.,    0.],
       [   5.,   15.,    0.],
       [  20.,   -5.,    0.],
       [ 210.,  150.,    0.],
       [ 120.,   50.,    0.],
       [ -55.,   10.,    0.],
       [   0.,    0.,    0.],
       [ -35.,  -15.,    0.],
       [ -15.,    0.,    0.],
       [ -35.,    5.,    0.],
       [  20.,  -20.,    0.],
       [ -25.,    5.,    0.],
       [ -30.,    5.,    0.],
       [ -15.,    5.,    0.],
       [ 150.,   70.,    0.],
       [ 105.,   15.,    0.],
       [  60.,   45.,    0.],
       [  20.,   35.,    0.],
       [ -55.,  -35.,    0.],
       [ -35.,  -45.,    0.],
       [ -25.,  -10.,    0.],
       [ 135.,   60.,    0.],
       [ -85.,  -60.,    0.],
       [ -30.,    0.,    0.],
       [ 1

In [7]:
assert len(X_train) == len(y_train)
assert len(X_test) == len(y_test)

In [8]:
# Define a list of models to evaluate
models = [
    ('Linear Regression', LinearRegression()),
    ('Ridge Regression', Ridge(random_state=0)),
    ('Lasso Regression', Lasso(random_state=0)),
    ('Elastic Net Regression', ElasticNet(random_state=0)),
    ('Random Forest Regressor', RandomForestRegressor(random_state=0)),
    ('BaggingRegressor', BaggingRegressor(random_state=0)),
    ('ExtraTreesRegressor', ExtraTreesRegressor(random_state=0))
    # ('Support Vector Regression', SVR()),
    # ('Neural Network', MLPRegressor(random_state=0))
]

# Evaluate each model
results = []
for name, model in models:
    print(name)
    # Train the model
    model.fit(X_train, y_train)
    
    # Make predictions on the test set
    y_pred = model.predict(X_test)

    # Calculate the evaluation metrics
    mae, rmse, r2, adj_r2, loss = get_metrics(y_test, y_pred, X_test)
    print('\n')

    # Save the results
    results.append((name, mae, rmse, r2, adj_r2, loss))

# del y_pred
# plot_results(results=results)

Linear Regression
Mean Absolute Error: 32.97
Root Mean Squared Error: 56.14
R-squared: 0.01
Adjusted R-squared: -0.08
Sign MSE Loss: 3151.96


Ridge Regression
Mean Absolute Error: 32.89
Root Mean Squared Error: 56.20
R-squared: 0.10
Adjusted R-squared: 0.01
Sign MSE Loss: 3159.16


Lasso Regression
Mean Absolute Error: 32.90
Root Mean Squared Error: 56.28
R-squared: 0.04
Adjusted R-squared: -0.05
Sign MSE Loss: 3168.42


Elastic Net Regression
Mean Absolute Error: 32.90
Root Mean Squared Error: 56.28
R-squared: 0.04
Adjusted R-squared: -0.05
Sign MSE Loss: 3167.65


Random Forest Regressor


  return linalg.solve(A, Xy, sym_pos=True, overwrite_a=True).T


Mean Absolute Error: 31.00
Root Mean Squared Error: 60.74
R-squared: 0.20
Adjusted R-squared: 0.12
Sign MSE Loss: 3690.19


BaggingRegressor
Mean Absolute Error: 33.50
Root Mean Squared Error: 65.00
R-squared: 0.06
Adjusted R-squared: -0.03
Sign MSE Loss: 4225.45


ExtraTreesRegressor
Mean Absolute Error: 30.68
Root Mean Squared Error: 58.45
R-squared: 0.24
Adjusted R-squared: 0.16
Sign MSE Loss: 3416.91




In [9]:
y_test[0]

array([ 5., 35.,  0.])

In [10]:
y_pred[0]

array([16.8 , 35.85,  0.  ])

In [11]:
y = np.stack(df['data'].values)

In [12]:
y[0]

array([-30.,  10.,   0.])

In [13]:
y = y.reshape(y.shape[0], -1)

In [14]:
y[0]

array([-30.,  10.,   0.])

In [15]:
from sklearn.model_selection import GridSearchCV

# # Define the parameter grid
# param_grid = {
#     'n_estimators': [10, 50, 100, 200], 
#     'max_depth': [None, 10, 20, 30],
#     'min_samples_split': [2, 5, 10],
#     'min_samples_leaf': [1, 2, 4],
# }

param_grid = {
    'n_estimators': [10, 50, 100, 200],
    'criterion': ['squared_error', 'absolute_error', 'friedman_mse'], #, 'poisson'],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4, 6, 8, 10],
    'min_weight_fraction_leaf': [0.0, 0.1, 0.2],
    'max_features': [None, 'sqrt', 'log2'],
    'max_leaf_nodes': [None, 10, 20, 30],
    'min_impurity_decrease': [0.0, 0.1, 0.2],
    'bootstrap': [True, False],
    'oob_score': [True, False],
    'warm_start': [True, False],
    'ccp_alpha': [0.0, 0.1, 0.2],
    'max_samples': [None, 0.5, 0.8]
}

# Create a random forest regressor
model = RandomForestRegressor(random_state=0, n_jobs=-1)

# Create a grid search object
grid_search = GridSearchCV(model, param_grid, cv=5, scoring='neg_mean_absolute_error') #neg_mean_squared_error

# Fit the grid search object to the data
grid_search.fit(X_train, y_train)

# Get the best parameters
best_params = grid_search.best_params_

# Train a random forest regressor with the best parameters
best_model = RandomForestRegressor(**best_params, random_state=0, n_jobs=-1)
best_model.fit(X_train, y_train)

# Make predictions on the test set
y_pred = best_model.predict(X_test)

mae, rmse, r2, adj_r2, loss = get_metrics(y_test, y_pred, X_test)

In [None]:
y_test[1]

In [None]:
y_pred[1]

In [None]:
# from sklearn.ensemble import ExtraTreesRegressor
# from sklearn.model_selection import GridSearchCV

# # Define the parameter grid
# param_grid = {
#     'n_estimators': [10, 50, 100, 200, 300, 400, 500],
#     'max_depth': [None, 10, 20, 30],
#     'min_samples_split': [2, 5, 10],
#     'min_samples_leaf': [1, 2, 4],
# }

# # Create an ExtraTreesRegressor
# model = ExtraTreesRegressor(random_state=0, n_jobs=-1)
# # model = BaggingRegressor(model, random_state=0, n_jobs=-1)

# # Create a grid search object
# grid_search = GridSearchCV(model, param_grid, cv=5, scoring='neg_mean_absolute_error')

# # Fit the grid search object to the data
# grid_search.fit(X_train, y_train)

# # Get the best parameters
# best_params = grid_search.best_params_

# # Train an ExtraTreesRegressor with the best parameters
# best_model = ExtraTreesRegressor(**best_params, random_state=0, n_jobs=-1)
# # best_model = BaggingRegressor(best_model, random_state=0, n_jobs=-1)
# best_model.fit(X_train, y_train)

# # Make predictions on the test set
# y_pred = best_model.predict(X_test)

# mae, rmse, r2, adj_r2, loss = get_metrics(y_test, y_pred)


In [None]:
# best_params

In [None]:
# denormalize_y(y_test, scaler)

# denormalize_y(y_pred, scaler)