In [None]:
import h5py
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import animation
import scipy
from scipy import signal
from scipy import integrate
from scipy.fftpack import fft
from scipy.fftpack import fftfreq
from scipy import stats
from scipy.stats import kurtosis, skew
from scipy.signal import find_peaks
from sklearn import preprocessing
from sklearn.svm import OneClassSVM
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn import metrics
from sklearn.cluster import KMeans
from xgboost import XGBRegressor
from sklearn.ensemble import RandomForestRegressor
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import LSTM
from tensorflow.keras.models import model_from_json
from tensorflow.keras import layers,models
from tensorflow.keras import callbacks
from tensorflow.keras.utils import plot_model
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import Normalizer
import datetime
from sklearn.metrics import mean_squared_error
import warnings
import random
import math
from math import pi
import seaborn as sns
import shap
from mlxtend.evaluate import bias_variance_decomp
import timeit
warnings.filterwarnings("ignore")
%matplotlib inline
plt.rcParams['agg.path.chunksize'] = 10000

plt.rcParams.update({'font.size': 18})
plt.rcParams.update({'font.family': 'Arial'})

np.random.seed(7)

# Feed forward neural network

In [None]:
load_data = pd.read_csv(r'M:\THESIS_IPT\MRIDUL\data_overlapping_windows\points1000_20ms\Combined_final_ALL_noOutliers.csv')

# converting avg peak value from mm to microns
load_data['avg_peak'] = 1000*load_data['avg_peak']

load_data

In [None]:
(load_data['avg_peak']==0.1).sum()

In [None]:
# dropping some zero values to balance the dataset
zero_index = []
last = load_data.columns.get_loc("avg_peak")
for i in range(load_data.shape[0]):
    if load_data.iloc[i,last]==0.1:
        zero_index.append(i)
        
rand_zero_index = random.sample(zero_index,12000)
load_data.drop(load_data.index[rand_zero_index], axis=0, inplace=True)

In [None]:
Y_nn = load_data['avg_peak']
X_nn = load_data.drop(['avg_peak'], axis=1)

# dividing data into training and test sets
Y_nn = np.ravel(Y_nn)
X_TrainVal, X_test, Y_TrainVal, Y_test = train_test_split(X_nn, Y_nn, 
                                                    test_size = 0.1, 
                                                    random_state = 3,
                                                    shuffle = True)

X_train, X_val, Y_train, Y_val = train_test_split(X_TrainVal, Y_TrainVal, 
                                                    test_size = 0.3, 
                                                    random_state = 3,
                                                    shuffle = True)

In [None]:
# scaling the input to neural network
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
X_val = scaler.transform(X_val)

Y_train = Y_train.reshape(-1, 1)
Y_val = Y_val.reshape(-1, 1)
scaler2 = StandardScaler().fit(Y_train)
Y_train = scaler2.transform(Y_train)
Y_val = scaler2.transform(Y_val)

In [None]:
a = X_nn.shape[1]

# NN model
model = Sequential()
model.add(Dense(a, activation = 'relu', input_shape = (a,)))
model.add(Dense(units = a//2, activation = 'relu', kernel_regularizer = tf.keras.regularizers.l2(0.01)))
model.add(Dropout(0.2))
model.add(Dense(units = a//2, activation = 'relu', kernel_regularizer = tf.keras.regularizers.l2(0.01)))
model.add(Dense(units = 1, activation = 'linear'))
print(model.summary())

# Compile model
adam = tf.keras.optimizers.Adam(learning_rate=1e-2)
sgd = tf.keras.optimizers.SGD(learning_rate=1e-2)
model.compile(optimizer = 'adam',
              loss = tf.keras.losses.Huber(), 
              metrics=['mse', 'mae']
             )

start_time = timeit.default_timer()

# Fit the model
history = model.fit(X_train, Y_train, epochs=200, batch_size=8000, validation_data=(X_val, Y_val), verbose=2)

end_time = timeit.default_timer()

In [None]:
print(end_time - start_time)

In [None]:
# training curves

plt.figure(figsize = (10, 5))
plt.plot(history.history['loss'], label='Training loss')
plt.plot(history.history['val_loss'], label='Validation loss')
plt.title('Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss magnitude')
plt.legend()
#plt.savefig(r'M:\THESIS_IPT\REPORT\images\loss_fnn_reg.png',bbox_inches='tight',dpi=1000)
plt.show()

In [None]:
# predictions using the model

X_test_prediction = model.predict(X_test)
Y_prediction = scaler2.inverse_transform(X_test_prediction)
Y_prediction = Y_prediction.reshape((Y_prediction.shape[0],))
Y_prediction[Y_prediction < 0] = 0.0001

In [None]:
# performance plots

xlim = Y_test.shape[0]
xaxis = np.arange(0,xlim,25)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,5))
ax1.plot(xaxis, Y_test[0:xlim:25],'bo-',label = 'Real data',markerfacecolor='none')
ax1.plot(xaxis, Y_prediction[0:xlim:25], 'rx--',label = 'Predicted data')
ax2.plot(Y_prediction[5400:5450], 'rx--')
ax2.plot(Y_test[5400:5450],'bo-',markerfacecolor='none')

ax1.set_ylabel('Average peak value [$\mu$m]')
ax2.set_ylabel('Average peak value [$\mu$m]')
ax1.set_xlabel('Data points (Sampled Test data)')
ax2.set_xlabel('Data points (Subset of Test data)')
#fig.legend(fontsize=18)

#plt.savefig(r'M:\THESIS_IPT\REPORT\images\fnn_reg.png',bbox_inches='tight',dpi=1000)

In [None]:
# metrics

mae = metrics.mean_absolute_error(Y_test, Y_prediction)
rmse = metrics.mean_squared_error(Y_test, Y_prediction, squared = False)

print("Mean absolute error: ", mae)
print("Root mean square error: ", rmse)

In [None]:
# statistical features of test data

df = pd.DataFrame(Y_test)
df.describe()

In [None]:
# statistical features of predicted values

df = pd.DataFrame(Y_prediction)
df.describe()

In [None]:
# plots

df = pd.DataFrame(Y_test, columns=['True'])
df['Predicted'] = Y_prediction.tolist()
df = df.sort_values('True')

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,5))
ax2.plot(df['True'], abs(df['Predicted']-df['True']), label = 'Error = |Predicted Value - True Value|')

ax1.plot(Y_prediction, 'rx',label = 'Predicted value')
ax1.plot(Y_test,'bx',label = 'True value', markerfacecolor='none')

ax2.set_ylabel('Absolute Error Values [$\mu$m]')
ax1.set_ylabel('Average peak value [$\mu$m]')
ax2.set_xlabel('True Values (Test Data)')
ax1.set_xlabel('Data points (Test data)')
#fig.legend()
#plt.savefig(r'M:\THESIS_IPT\REPORT\images\ffn_reg_error.png',bbox_inches='tight',dpi=1000)
plt.show()

# Residual neural network

In [None]:
# identity block
def identity_block(input_tensor,units):
    x = layers.Dense(units)(input_tensor)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.Dense(units)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.Dense(units)(x)
    x = layers.BatchNormalization()(x)

    x = layers.add([x, input_tensor])
    x = layers.Activation('relu')(x)

    return x

# dense block
def dens_block(input_tensor,units):
    x = layers.Dense(units)(input_tensor)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.Dense(units)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.Dense(units)(x)
    x = layers.BatchNormalization()(x)

    shortcut = layers.Dense(units)(input_tensor)
    shortcut = layers.BatchNormalization()(shortcut)

    x = layers.add([x, shortcut])
    x = layers.Activation('relu')(x)
    return x


def ResNet50Regression():
    Res_input = layers.Input(shape=(load_data.shape[1]-1,))

    width = 128

    x = dens_block(Res_input,width)
    x = identity_block(x,width)
    x = identity_block(x,width)

    x = layers.BatchNormalization()(x)
    x = layers.Dense(1, activation='softplus')(x)
    model = models.Model(inputs=Res_input, outputs=x)

    return model

# load data
load_data = pd.read_csv(r'M:\THESIS_IPT\MRIDUL\data_overlapping_windows\points1000_20ms\Combined_final_ALL_noOutliers.csv')

# converting avg peak value from mm to microns
load_data['avg_peak'] = 1000*load_data['avg_peak']

# dropping some zero values to balance the dataset
zero_index = []
last = load_data.columns.get_loc("avg_peak")
for i in range(load_data.shape[0]):
    if load_data.iloc[i,last]==0.1:
        zero_index.append(i)        
rand_zero_index = random.sample(zero_index,12000)
load_data.drop(load_data.index[rand_zero_index], axis=0, inplace=True)


y = load_data['avg_peak']
x = load_data.drop(['avg_peak'], axis=1)
y = np.ravel(y)
y = y.reshape(-1, 1)

# scaling the input to neural network
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()
scaler_x.fit(x)
xscale = scaler_x.transform(x)
scaler_y.fit(y)
yscale = scaler_y.transform(y)
X_train, X_test, y_train, y_test = train_test_split(xscale, yscale, test_size = 0.1, random_state = 3, shuffle = True)

# Model
model = ResNet50Regression()

model.compile(loss= tf.keras.losses.Huber(),# 'mse'
              optimizer='adam', 
              metrics=['mse','mae'])
model.summary()

#compute running time
starttime = datetime.datetime.now()

history = model.fit(X_train, y_train, epochs=50, batch_size=1000, verbose=2, callbacks=[callbacks.EarlyStopping(monitor='val_loss', patience=10,verbose=2, mode='auto')], validation_split=0.1)

endtime = datetime.datetime.now()

# Save Model
#model.save('OptimalModelDataSet2.h5')
#plot_model(model, to_file='ResnetModel.png')
#from keras.models import load_model
#model.save('my_model.h5') 
#model = load_model('my_model.h5') 

# Model Predicting
yhat = model.predict(X_test)

print('The time cost: ')
print(endtime - starttime)
print('The test loss: ')
print(mean_squared_error(yhat,y_test))

#invert normalization
yhat = scaler_y.inverse_transform(yhat) 
y_test = scaler_y.inverse_transform(y_test) 

In [None]:
# training curves

plt.figure(figsize = (10, 5))
plt.plot(history.history['loss'], label='Training loss')
plt.plot(history.history['val_loss'], label='Validation loss')
plt.title('Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss magnitude')
plt.legend()
#plt.savefig(r'M:\THESIS_IPT\REPORT\images\loss_resnet_reg.png',bbox_inches='tight',dpi=1000)
plt.show()

In [None]:
# performance plots
xlim = y_test.shape[0]
xaxis = np.arange(0,xlim,25)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,5))
ax1.plot(xaxis, y_test[0:xlim:25],'bo-',label = 'Real data',markerfacecolor='none')
ax1.plot(xaxis, yhat[0:xlim:25], 'rx--',label = 'Predicted data')
ax2.plot(yhat[1050:1100], 'rx--')
ax2.plot(y_test[1050:1100],'bo-',markerfacecolor='none')

ax1.set_ylabel('Average peak value [$\mu$m]')
ax2.set_ylabel('Average peak value [$\mu$m]')
ax1.set_xlabel('Data points (Sampled Test data)')
ax2.set_xlabel('Data points (Subset of Test data)')
#ig.legend(fontsize=18)
#plt.savefig(r'M:\THESIS_IPT\REPORT\images\resnet_reg.png',bbox_inches='tight',dpi=1000)
plt.show()

In [None]:
# metrics

mae = metrics.mean_absolute_error(y_test, yhat)
rmse = metrics.mean_squared_error(y_test, yhat, squared = False)


print("Mean absolute error: ", mae)
print("Root mean square error: ", rmse)

In [None]:
# statistical features of predicted values

df = pd.DataFrame(yhat)
df.describe()

In [None]:
# plots

yhat = yhat.reshape(6676)
df = pd.DataFrame(y_test, columns=['True'])
df['Predicted'] = yhat.tolist()
df = df.sort_values('True')

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,5))
ax2.plot(df['True'], abs(df['Predicted']-df['True']), label = 'Error = |Predicted Value - True Value|')

ax1.plot(yhat, 'rx',label = 'Predicted value')
ax1.plot(y_test,'bx',label = 'True value', markerfacecolor='none')

ax2.set_ylabel('Absolute Error Values [$\mu$m]')
ax1.set_ylabel('Average peak value [$\mu$m]')
ax2.set_xlabel('True Values (Test Data)')
ax1.set_xlabel('Data points (Test data)')
#fig.legend()
#plt.savefig(r'M:\THESIS_IPT\REPORT\images\resnet_reg_error.png',bbox_inches='tight',dpi=1000)
plt.show()

# XGBoost regressor

In [None]:
load_data = pd.read_csv(r'M:\THESIS_IPT\MRIDUL\data_overlapping_windows\points1000_20ms\Combined_final_ALL_noOutliers.csv')
load_data['avg_peak'] = 1000*load_data['avg_peak']

In [None]:
# dropping some zero values to balance the dataset
zero_index = []
last = load_data.columns.get_loc("avg_peak")
for i in range(load_data.shape[0]):
    if load_data.iloc[i,last]==0.1:
        zero_index.append(i)
        
rand_zero_index = random.sample(zero_index,12000)
load_data.drop(load_data.index[rand_zero_index], axis=0, inplace=True)

In [None]:
Y_nn = load_data['avg_peak']
X_nn = load_data.drop(['avg_peak'], axis=1)

# dividing data into training and test sets
Y_nn = np.ravel(Y_nn)
X_TrainVal, X_test, Y_TrainVal, Y_test = train_test_split(X_nn, Y_nn, 
                                                    test_size = 0.1, 
                                                    random_state = 3,
                                                    shuffle = True)

X_train, X_val, Y_train, Y_val = train_test_split(X_TrainVal, Y_TrainVal, 
                                                    test_size = 0.3, 
                                                    random_state = 3,
                                                    shuffle = True)

In [None]:
# scaling the input to neural network
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
X_val = scaler.transform(X_val)

Y_train = Y_train.reshape(-1, 1)
Y_val = Y_val.reshape(-1, 1)
scaler2 = StandardScaler().fit(Y_train)
Y_train = scaler2.transform(Y_train)
Y_val = scaler2.transform(Y_val)

# scaling the input to neural network
scaler3 = Normalizer().fit(X_train)
X_train = scaler3.transform(X_train)
X_test = scaler3.transform(X_test)
X_val = scaler3.transform(X_val)

scaler4 = MinMaxScaler().fit(Y_train)
Y_train = scaler4.transform(Y_train)
Y_val = scaler4.transform(Y_val)

In [None]:
# XGBoost regressor
model = XGBRegressor()

start_time = timeit.default_timer()
model.fit(X_train,Y_train, eval_set=[(X_val, Y_val)])
end_time = timeit.default_timer()

In [None]:
print(end_time - start_time)

In [None]:
# model predictions

X_test_prediction = model.predict(X_test)
X_test_prediction = X_test_prediction.reshape(-1, 1)
Y_prediction = scaler4.inverse_transform(X_test_prediction)
Y_prediction = scaler2.inverse_transform(Y_prediction)
Y_prediction[Y_prediction < 0] = 0.0001

In [None]:
# performance plots

xlim = Y_test.shape[0]
xaxis = np.arange(0,xlim,25)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,5))
ax1.plot(xaxis, Y_test[0:xlim:25],'bo-',label = 'Real data',markerfacecolor='none')
ax1.plot(xaxis, Y_prediction[0:xlim:25], 'rx--',label = 'Predicted data')
ax2.plot(Y_prediction[4400:4450], 'rx--')
ax2.plot(Y_test[4400:4450],'bo-',markerfacecolor='none')

ax1.set_ylabel('Average peak value [$\mu$m]')
ax2.set_ylabel('Average peak value [$\mu$m]')
ax1.set_xlabel('Data points (Sampled Test data)')
ax2.set_xlabel('Data points (Subset of Test data)')
#ig.legend(fontsize=18)

#plt.savefig(r'M:\THESIS_IPT\REPORT\images\xgboost_reg.png',bbox_inches='tight',dpi=1000)

In [None]:
# metrics

mae = metrics.mean_absolute_error(Y_test, Y_prediction)
rmse = metrics.mean_squared_error(Y_test, Y_prediction, squared = False)

print("Mean absolute error: ", mae)
print("Root mean square error: ", rmse)

In [None]:
# statistical features of predicted values

df = pd.DataFrame(Y_prediction)
df.describe()

In [None]:
# plots

Y_prediction = Y_prediction.reshape(6676)
df = pd.DataFrame(Y_test, columns=['True'])
df['Predicted'] = Y_prediction.tolist()
df = df.sort_values('True')

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,5))
ax2.plot(df['True'], abs(df['Predicted']-df['True']), label = 'Error = |Predicted Value - True Value|')

ax1.plot(Y_prediction, 'rx',label = 'Predicted value')
ax1.plot(Y_test,'bx',label = 'True value', markerfacecolor='none')

ax2.set_ylabel('Absolute Error Values [$\mu$m]')
ax1.set_ylabel('Average peak value [$\mu$m]')
ax2.set_xlabel('True Values (Test Data)')
ax1.set_xlabel('Data points (Test data)')
#fig.legend()
#plt.savefig(r'M:\THESIS_IPT\REPORT\images\xgboost_reg_error.png',bbox_inches='tight',dpi=1000)
plt.show()

# Random forest regressor

In [None]:
load_data = pd.read_csv(r'M:\THESIS_IPT\MRIDUL\data_overlapping_windows\points1000_20ms\Combined_final_ALL_noOutliers.csv')
load_data['avg_peak'] = 1000*load_data['avg_peak']

In [None]:
Y_nn = load_data['avg_peak']
X_nn = load_data.drop(['avg_peak'], axis=1)

# dividing data into training and test sets
Y_nn = np.ravel(Y_nn)
X_TrainVal, X_test, Y_TrainVal, Y_test = train_test_split(X_nn, Y_nn, 
                                                    test_size = 0.1, 
                                                    random_state = 3,
                                                    shuffle = True)

X_train, X_val, Y_train, Y_val = train_test_split(X_TrainVal, Y_TrainVal, 
                                                    test_size = 0.3, 
                                                    random_state = 3,
                                                    shuffle = True)

In [None]:
# scaling the input to neural network
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
X_val = scaler.transform(X_val)

Y_train = Y_train.reshape(-1, 1)
Y_val = Y_val.reshape(-1, 1)
scaler2 = StandardScaler().fit(Y_train)
Y_train = scaler2.transform(Y_train)
Y_val = scaler2.transform(Y_val)

# scaling the input to neural network
scaler3 = Normalizer().fit(X_train)
X_train = scaler3.transform(X_train)
X_test = scaler3.transform(X_test)
X_val = scaler3.transform(X_val)

scaler4 = MinMaxScaler().fit(Y_train)
Y_train = scaler4.transform(Y_train)
Y_val = scaler4.transform(Y_val)

In [None]:
# random forest regressor

model = RandomForestRegressor()

start_time = timeit.default_timer()
model.fit(X_train, Y_train)
end_time = timeit.default_timer()

In [None]:
print(end_time - start_time)

In [None]:
# model predictions

X_test_prediction = model.predict(X_test)
X_test_prediction = X_test_prediction.reshape(-1, 1)
Y_prediction = scaler4.inverse_transform(X_test_prediction)
Y_prediction = scaler2.inverse_transform(Y_prediction)
Y_prediction[Y_prediction < 0] = 0.0001

In [None]:
# performance plots

xlim = 6686
xaxis = np.arange(0,xlim,25)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,5))
ax1.plot(xaxis, Y_test[0:xlim:25],'bo-',label = 'Real data',markerfacecolor='none')
ax1.plot(xaxis, Y_prediction[0:xlim:25], 'rx--',label = 'Predicted data')
ax2.plot(Y_prediction[200:250], 'rx--')
ax2.plot(Y_test[200:250],'bo-',markerfacecolor='none')

ax1.set_ylabel('Average peak value [$\mu$m]')
ax2.set_ylabel('Average peak value [$\mu$m]')
ax1.set_xlabel('Data points (Sampled Test data)')
ax2.set_xlabel('Data points (Subset of Test data)')
#ig.legend(fontsize=18)

#plt.savefig(r'M:\THESIS_IPT\REPORT\images\rf_reg.png',bbox_inches='tight',dpi=1000)

In [None]:
# metrics

mae = metrics.mean_absolute_error(Y_test, Y_prediction)
rmse = metrics.mean_squared_error(Y_test, Y_prediction, squared = False)

print("Mean absolute error: ", mae)
print("Root mean square error: ", rmse)

In [None]:
# statistical features of predicted values

df = pd.DataFrame(Y_prediction)
df.describe()

In [None]:
# plots

Y_prediction = Y_prediction.reshape(7876)
df = pd.DataFrame(Y_test, columns=['True'])
df['Predicted'] = Y_prediction.tolist()
df = df.sort_values('True')

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16,5))
ax2.plot(df['True'], abs(df['Predicted']-df['True']), label = 'Error = |Predicted Value - True Value|')

ax1.plot(Y_prediction, 'rx',label = 'Predicted value')
ax1.plot(Y_test,'bx',label = 'True value', markerfacecolor='none')

ax2.set_ylabel('Absolute Error Values [$\mu$m]')
ax1.set_ylabel('Average peak value [$\mu$m]')
ax2.set_xlabel('True Values (Test Data)')
ax1.set_xlabel('Data points (Test data)')
#fig.legend()
#plt.savefig(r'M:\THESIS_IPT\REPORT\images\rf_reg_error.png',bbox_inches='tight',dpi=1000)
plt.show()