In [13]:
import numpy as np
import cv2
from sklearn.neural_network import MLPRegressor
from matplotlib import pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
%matplotlib tk
import matplotlib as mpl

### Read output angle data (test folder 24, 30, 31, 32, 33, 35 from Preliminary result v5)

### Reducing the dimension of images

In [4]:
# read video frame
def dep_cam_reduced(dep_v_name):
    dep_v = cv2.VideoCapture(dep_v_name)
    ret, frame = dep_v.read()
    counter=0

    frame_count = int(dep_v.get(cv2.CAP_PROP_FRAME_COUNT))

#     frame_height = int(frame.shape[0]/5)
#     frame_width = int(frame.shape[1]/5)
#     frame_height = frame.shape[0]
#     frame_width = frame.shape[1]
    frame_height = 30
    frame_width = 40
    depth_frames = np.empty((frame_count, frame_height, frame_width))

    while(dep_v.isOpened()):
        ret, frame = dep_v.read()
        if ret == False:
            break
        if ret == True:
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            gray_frame = cv2.resize(gray_frame, (frame_width, frame_height), interpolation = cv2.INTER_AREA)
            depth_frames[counter] = gray_frame
            counter+=1

    dep_v.release()
    cv2.destroyAllWindows()
    return depth_frames

In [5]:
# read output depth images data
foldername = r'Test_Subject_Leo/'
test_folder = ['test24', 'test30', 'test31', 'test32','test33', 'test35' ]
test_num = ['24', '30', '31', '32', '33', '35']
sub_name = foldername + test_folder[0]
dep_name = sub_name + r'/depth_processed_leo_test' + test_num[0] + '.avi';
depth =  dep_cam_reduced(dep_name)
for i in range(1, 6):
    subfolder_name = foldername + test_folder[i]
    dep_video_name = subfolder_name + r'/depth_processed_leo_test' + test_num[i] + '.avi'
    #print(dep_video_name)
    dep_v_temp =  dep_cam_reduced(dep_video_name)/255.0
    depth = np.concatenate((depth, dep_v_temp))

In [None]:
print(depth.shape)
print(theta_z.shape)

In [None]:
# Read force data
subfolder_name = foldername + test_folder[0]
force_file = subfolder_name + r'/fcss_processed_leo_test' + test_num[0] + '_' + '11_15_2020' + '.txt'
force = pd.read_csv(force_file)
force = force.iloc[:,:].values
for i in range(1,6):
    subfolder_name = foldername + test_folder[i]
    if test_folder[i] == 'test30' or test_folder[i] == 'test31':
        force_file = subfolder_name + r'/fcss_processed_leo_test' + test_num[i] + '_' + '11_24_2020' + '.txt'
    else: 
        force_file = subfolder_name + r'/fcss_processed_leo_test' + test_num[i] + '_' + '11_25_2020' + '.txt'
    dataset_y =  pd.read_csv(force_file)
    force_temp = dataset_y.iloc[:,:].values
    force = np.concatenate((force, force_temp))

### Comments:

###### Observation: Comparing to the NN that uses tanh and sigmoid activation function, the rectified linear unit fucntion seemed to have better performance (lower mae) when the layers number are the same. Also, the choice of metrics of model evaluation seemed to be important (mse is quite large compared to mae). But I wonder what happened to my plots ;(     

###### Also, right now I am using a dimension of 120 * 160, performing a dimensionality reduction could be helpful for extracting necessary features. A longer epoch time may also improve performance. Tried sklearn neural network MLPRegressor but does not work

In [24]:
depth_train, depth_test, force_train, force_test, theta_z_train, theta_z_test = train_test_split(depth, force, theta_z, test_size = 0.2, random_state = 0)

In [42]:
# dropout at both visible layer and hidden layer

import tensorflow as tf
callback = tf.keras.callbacks.EarlyStopping(monitor = 'loss', patience = 10, restore_best_weights = True)
model_dp = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(30, 40)),
    tf.keras.layers.Dense(units=128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(units=64, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(units=32, activation='relu'),
    tf.keras.layers.Dense(1, activation='linear')
])


model_dp.compile(optimizer='adam', 
              loss= 'mae',
              metrics=['mae'])

history = model_dp.fit(depth_train, theta_z_train, epochs=300,callbacks = [callback], 
                    batch_size = 128, validation_data = (depth_test, theta_z_test))

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300
Epoch 15/300
Epoch 16/300
Epoch 17/300
Epoch 18/300
Epoch 19/300
Epoch 20/300
Epoch 21/300
Epoch 22/300
Epoch 23/300
Epoch 24/300
Epoch 25/300
Epoch 26/300
Epoch 27/300
Epoch 28/300
Epoch 29/300
Epoch 30/300
Epoch 31/300
Epoch 32/300
Epoch 33/300
Epoch 34/300
Epoch 35/300
Epoch 36/300
Epoch 37/300
Epoch 38/300
Epoch 39/300
Epoch 40/300
Epoch 41/300
Epoch 42/300
Epoch 43/300
Epoch 44/300
Epoch 45/300
Epoch 46/300
Epoch 47/300
Epoch 48/300
Epoch 49/300
Epoch 50/300
Epoch 51/300
Epoch 52/300
Epoch 53/300
Epoch 54/300
Epoch 55/300
Epoch 56/300


In [26]:
# Get training and test loss histories
training_loss = history.history['loss']
test_loss = history.history['val_loss']
mpl.rcParams['figure.figsize'] = (20,5)
# Create count of the number of epochs
epoch_count = range(len(training_loss))

# Visualize loss history
mpl.rcParams['figure.figsize'] = (20,5)
plt.plot(epoch_count, training_loss, 'r--')
plt.plot(epoch_count, test_loss, 'b-')
plt.legend(['Training Loss', 'Test Loss'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

In [43]:
y_prediction = model_dp.predict(depth_test)


mpl.rcParams['figure.figsize'] = (20,5)
plt.plot(theta_z_test)
plt.plot(y_prediction,'--')
plt.legend(['Ground Truth', 'Prediction'])
plt.xlabel('Sample')
plt.ylabel('Angle (degree)')


Text(0, 0.5, 'Angle (degree)')

In [44]:
from sklearn.metrics import mean_absolute_error
mean_absolute_error(theta_z_test, y_prediction)

6.741460756272304

### Mulitple inputs in neural network

In [18]:
from sklearn.model_selection import train_test_split
depth_train, depth_test, force_train, force_test, theta_z_train, theta_z_test = train_test_split(depth, force, theta_z, test_size = 0.2, random_state = 0)

In [48]:
from tensorflow.keras import layers
input_depth = layers.Input(shape = (30, 40))
input_force = layers.Input(shape = (6, ))

# first branch
x = layers.Flatten(input_shape=(30, 40))(input_depth)
x = layers.Dense(units=128, activation='relu')(x)
x = layers.Dense(units=64, activation='relu')(x)
x = layers.Dropout(0.2)(x)
x = layers.Dense(32, activation='relu')(x)
x = tf.keras.Model(inputs= input_depth, outputs = x)

# second branch
y = layers.Dense(units=128, activation='relu')(input_force)
y = layers.Dense(units=64, activation='relu')(y)
y = layers.Dense(32, activation='relu')(y)
y = layers.Dropout(0.2)(y)
y = tf.keras.Model(inputs= input_force, outputs = y)

# combine two branches
combined = layers.concatenate([x.output, y.output])

# regression to one output
z = layers.Dense(units=32, activation='relu')(combined)
z = layers.Dropout(0.2)(z)
z = layers.Dense(units=16, activation='relu')(combined)
z = layers.Dense(1, activation='linear')(z)
model_fd = tf.keras.Model(inputs = [x.input, y.input], outputs = z)

#tf.keras.utils.plot_model(model, to_file = 'model.png')

In [49]:
model_fd.compile(optimizer='adam', loss= 'mae', metrics=['mae'])

history = model_fd.fit([depth_train, force_train],theta_z_train, verbose = 1, epochs = 300, batch_size = 128,
                   validation_data = ([depth_test, force_test], theta_z_test))

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300
Epoch 15/300
Epoch 16/300
Epoch 17/300
Epoch 18/300
Epoch 19/300
Epoch 20/300
Epoch 21/300
Epoch 22/300
Epoch 23/300
Epoch 24/300
Epoch 25/300
Epoch 26/300
Epoch 27/300
Epoch 28/300
Epoch 29/300
Epoch 30/300
Epoch 31/300
Epoch 32/300
Epoch 33/300
Epoch 34/300
Epoch 35/300
Epoch 36/300
Epoch 37/300
Epoch 38/300
Epoch 39/300
Epoch 40/300
Epoch 41/300
Epoch 42/300
Epoch 43/300
Epoch 44/300
Epoch 45/300
Epoch 46/300
Epoch 47/300
Epoch 48/300
Epoch 49/300
Epoch 50/300
Epoch 51/300
Epoch 52/300
Epoch 53/300
Epoch 54/300
Epoch 55/300
Epoch 56/300
Epoch 57/300
Epoch 58/300
Epoch 59/300
Epoch 60/300
Epoch 61/300
Epoch 62/300
Epoch 63/300
Epoch 64/300
Epoch 65/300
Epoch 66/300
Epoch 67/300
Epoch 68/300
Epoch 69/300
Epoch 70/300
Epoch 71/300
Epoch 72/300
Epoch 73/300
Epoch 74/300
Epoch 75/300
Epoch 76/300
Epoch 77/300
Epoch 78

In [53]:
# Get training and test loss histories
training_loss = history.history['loss']
test_loss = history.history['val_loss']

# Create count of the number of epochs
epoch_count = range(len(training_loss))

# Visualize loss history
plt.plot(epoch_count, training_loss, 'r--')
plt.plot(epoch_count, test_loss, 'b-')
plt.legend(['Training Loss', 'Test Loss'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

In [88]:
y_prediction = model_fd.predict([depth_test, force_test])

import matplotlib as mpl
mpl.rcParams['figure.figsize'] = (10,7)
mpl.rc('xtick', labelsize=18) 
mpl.rc('ytick', labelsize=18)
plt.plot(theta_z_test)
plt.plot(y_prediction,'--')
plt.xlim([4900, 5200])
plt.ylim([-110, 110])
plt.legend(['Ground Truth', 'Prediction'])
plt.xlabel('Sample', fontsize=18)
plt.ylabel('Angle (degree)', fontsize=18)
plt.show()
plt.savefig('ground_v_prediction_zoomed in.png', dpi = 300)

In [81]:
mpl.rcParams['figure.figsize'] = (15,10)
ax1 = plt.subplot(211)
ax1.plot(theta_z_test)
ax1.plot(y_prediction,'--')
plt.legend(['Ground Truth', 'Prediction'])
plt.ylabel('Angle (degree)')

ax2 = plt.subplot(212)
ax2.plot(theta_z_test)
ax2.plot(y_prediction,'--')
ax2.set_xlim([4900, 5200])
plt.legend(['Ground Truth', 'Prediction'])
plt.xlabel('Sample')
plt.ylabel('Angle (degree)')
plt.show()
plt.savefig('ground_v_prediction.png', dpi = 300)

In [52]:
mean_absolute_error(theta_z_test, y_prediction)

3.104436460064926

In [None]:
plt.scatter(y_prediction, theta_z_test)
plt.xlabel('Prediction angle')
plt.ylabel('Ground angle')
plt.show()

### Feed only force data to the NN

In [39]:
import tensorflow as tf
from keras.utils.vis_utils import plot_model
callback = tf.keras.callbacks.EarlyStopping(monitor = 'loss', patience = 10, restore_best_weights = True)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(units=128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(units=64, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(units=32, activation='relu'),
    tf.keras.layers.Dense(1, activation='linear')
])

# plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)
model.compile(optimizer='adam', 
              loss= 'mae',
              metrics=['mae'])

history = model.fit(force_train, theta_z_train, epochs=300,callbacks = [callback], 
                    batch_size = 128, validation_data = (force_test, theta_z_test))

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300
Epoch 15/300
Epoch 16/300
Epoch 17/300
Epoch 18/300
Epoch 19/300
Epoch 20/300
Epoch 21/300
Epoch 22/300
Epoch 23/300
Epoch 24/300
Epoch 25/300
Epoch 26/300
Epoch 27/300
Epoch 28/300
Epoch 29/300
Epoch 30/300
Epoch 31/300
Epoch 32/300
Epoch 33/300
Epoch 34/300
Epoch 35/300
Epoch 36/300
Epoch 37/300
Epoch 38/300
Epoch 39/300
Epoch 40/300
Epoch 41/300
Epoch 42/300
Epoch 43/300
Epoch 44/300
Epoch 45/300
Epoch 46/300
Epoch 47/300
Epoch 48/300
Epoch 49/300
Epoch 50/300
Epoch 51/300
Epoch 52/300
Epoch 53/300
Epoch 54/300
Epoch 55/300
Epoch 56/300
Epoch 57/300
Epoch 58/300
Epoch 59/300
Epoch 60/300
Epoch 61/300
Epoch 62/300
Epoch 63/300
Epoch 64/300
Epoch 65/300
Epoch 66/300
Epoch 67/300
Epoch 68/300
Epoch 69/300
Epoch 70/300
Epoch 71/300
Epoch 72/300
Epoch 73/300
Epoch 74/300
Epoch 75/300
Epoch 76/300
Epoch 77/300
Epoch 78

In [30]:
# Get training and test loss histories
training_loss = history.history['loss']
test_loss = history.history['val_loss']

# Create count of the number of epochs
epoch_count = range(len(training_loss))

# Visualize loss history
plt.plot(epoch_count, training_loss, 'r--')
plt.plot(epoch_count, test_loss, 'b-')
plt.legend(['Training Loss', 'Test Loss'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

In [45]:
y_prediction = model.predict(force_test)


mpl.rcParams['figure.figsize'] = (20,5)
plt.plot(theta_z_test)
plt.plot(y_prediction,'--')
plt.legend(['Ground Truth', 'Prediction'])
plt.xlabel('Sample')
plt.ylabel('Angle (degree)')

Text(0, 0.5, 'Angle (degree)')

In [46]:
mean_absolute_error(theta_z_test, y_prediction)

6.913944966483415