In [1]:
from tcn import TCN, tcn_full_summary
import pandas as pd
import numpy  as np
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from tensorflow import keras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

In [2]:
# Load the recorded joint states. The position / velocity / effort columns are
# parsed below as parenthesised, comma-separated strings.
train_data = pd.read_csv('../Implementation/CSVFiles/2022-09-26-18-56-30-joint_states.csv')

# Remove the surrounding parentheses in place so the strings can be split.
for column in ('.position', '.velocity', '.effort'):
    train_data[column] = train_data[column].str.strip('()')

# Nanosecond part of the message header stamp, kept as a one-column frame.
time_stamp = train_data[['.header.stamp.nsecs']]

# Expand each comma-separated string into one column per value.
position, velocity, effort = (
    train_data[column].str.split(',', expand=True)
    for column in ('.position', '.velocity', '.effort')
)

# Feature table laid out side by side: position | velocity | effort.
features = pd.concat([position, velocity, effort], axis=1)

In [3]:
# Min-max scale every feature column and wrap the result back in a DataFrame
# (the new frame gets default integer column labels).
min_max_scaler = preprocessing.MinMaxScaler()
x = features.to_numpy()  # raw values as a numpy array
x_scaled = min_max_scaler.fit_transform(x)
features = pd.DataFrame(data=x_scaled)

In [4]:
# Sanity check: (rows, columns) of the expanded position table.
position.shape

(3750, 12)

In [5]:
# Sanity check: (rows, columns) of the expanded velocity table.
velocity.shape

(3750, 12)

In [6]:
# Sanity check: (rows, columns) of the expanded effort table.
effort.shape

(3750, 12)

In [7]:
# Sanity check: the scaled feature table should have the position, velocity
# and effort columns concatenated side by side.
features.shape

(3750, 36)

In [8]:
# Load the ground-truth poses. Only the first 3750 rows are read, which is the
# row count reported for the feature table above.
groundTruth = pd.read_csv(
    "../Implementation/CSVFiles/2022-09-26-18-56-30-ground_truth-state.csv",
    sep=',',
    nrows=3750,
)

# Keep each coordinate as its own single-column DataFrame.
Xtargets = groundTruth.loc[:, ['.pose.pose.position.x']]
Ytargets = groundTruth.loc[:, ['.pose.pose.position.y']]

In [9]:
# Rescale the x-coordinate targets to the [0, 1] range with a scaler fitted
# on these targets only.
min_max_scaler = preprocessing.MinMaxScaler()
x = Xtargets.to_numpy()  # raw values as a numpy array
x_scaled = min_max_scaler.fit_transform(x)
Xtargets = pd.DataFrame(data=x_scaled)

In [10]:
# Rescale the y-coordinate targets to the [0, 1] range with a scaler fitted
# on these targets only.
min_max_scaler = preprocessing.MinMaxScaler()
x = Ytargets.to_numpy()  # raw values as a numpy array
x_scaled = min_max_scaler.fit_transform(x)
Ytargets = pd.DataFrame(data=x_scaled)

In [11]:
# Materialise targets and features as fresh float64 numpy arrays for training.
Xtargets = np.array(Xtargets, dtype=np.float64)
Ytargets = np.array(Ytargets, dtype=np.float64)
features = np.array(features, dtype=np.float64)

In [12]:
from sklearn.preprocessing import RobustScaler

# Second scaling pass, applied on top of the min-max scaled features: fit the
# robust scaler on the full feature matrix, then transform that same matrix.
RS = RobustScaler().fit(features)
features = RS.transform(features)

In [13]:
# Training hyper-parameters shared by the x and y models.
n_epochs = 5000
n_splits = 2

# Give every sample a time axis of length 1:
# (samples, n_features) -> (samples, 1, n_features).
n_features = features.shape[-1]
features = features.reshape(-1, 1, n_features)

In [None]:
from tcn import TCN, tcn_full_summary

# Train one TCN + dense regressor per K-fold split on the normalised x targets
# and collect, for every fold, that model's predictions over the WHOLE dataset.
kf = KFold(n_splits=n_splits, shuffle=False)
test_preds_X = []

# Exponential learning-rate decay starting at 1e-3 with decay rate 1e-5.
# The schedule is loop-invariant, so it is built once. It is evaluated once
# per epoch (see the callback below), i.e. `decay_steps` is counted in epochs.
# NOTE(review): the 0.8 factor does not match the training fraction produced
# by KFold(n_splits=2) (0.5) - confirm the intended decay horizon.
scheduler = tf.keras.optimizers.schedules.ExponentialDecay(1e-3, 200*((len(features)*0.8)/1024), 1e-5)

for fold, (train_idx, test_idx) in enumerate(kf.split(features, Xtargets)):
    print('-'*15, '>', f'Fold {fold+1}', '<', '-'*15)
    X_train, X_valid = features[train_idx], features[test_idx]
    y_train, y_valid = Xtargets[train_idx], Xtargets[test_idx]

    # Inputs are sequences of length 1 with n_features channels.
    Xmodel = keras.models.Sequential([
        TCN(input_shape=(1, n_features), nb_filters=256, return_sequences=True, dilations=[1, 2, 4, 8, 16, 32]),
        keras.layers.Dense(72, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(144, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(72, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(36, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(18, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(9, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(1, activation='tanh')
    ])

    # `metrics` is passed as a list, the documented form for Model.compile.
    Xmodel.compile(optimizer="adam", loss="mae", metrics=[keras.metrics.MeanAbsoluteError()])

    # LearningRateScheduler expects a function (epoch, lr) -> float. Passing
    # the ExponentialDecay object directly only worked through the callback's
    # TypeError fallback and handed it a tensor; this wrapper keeps the same
    # per-epoch decay but returns a plain Python float.
    lr_callback = tf.keras.callbacks.LearningRateScheduler(
        lambda epoch, lr: float(scheduler(epoch))
    )

    history = Xmodel.fit(X_train, y_train,
                        validation_data=(X_valid, y_valid),
                        epochs=n_epochs,
                        batch_size=1024,
                        callbacks=[lr_callback],
                        verbose=0)

    #Xmodel.save(f'Fold{fold+1} weights')
    # Flatten the model output to one prediction per sample. Predictions cover
    # the full dataset, so they include this fold's own training samples.
    test_preds_X.append(Xmodel.predict(features).reshape(-1))

In [None]:
# Combine the per-fold models by AVERAGING their predictions. Adding the two
# prediction vectors together (the previous behaviour) yields values on about
# twice the scale of the [0, 1]-normalised targets; the mean also works for
# any number of folds.
X = np.mean(test_preds_X, axis=0)

# Every fold model predicted on the full dataset, so this MAE also covers
# samples the models were trained on; it is not a purely held-out score.
error = mean_absolute_error(Xtargets, X)
print('MAE: %.3f' % error)

In [None]:
from tcn import TCN, tcn_full_summary

# Train one TCN + dense regressor per K-fold split on the normalised y targets
# and collect, for every fold, that model's predictions over the WHOLE dataset.
kf = KFold(n_splits=n_splits, shuffle=False)
test_preds_Y  = []

# Exponential learning-rate decay starting at 1e-3 with decay rate 1e-5.
# The schedule is loop-invariant, so it is built once. It is evaluated once
# per epoch (see the callback below), i.e. `decay_steps` is counted in epochs.
# NOTE(review): the 0.8 factor does not match the training fraction produced
# by KFold(n_splits=2) (0.5) - confirm the intended decay horizon.
scheduler = tf.keras.optimizers.schedules.ExponentialDecay(1e-3, 200*((len(features)*0.8)/1024), 1e-5)

for fold, (train_idx, test_idx) in enumerate(kf.split(features, Ytargets)):
    print('-'*15, '>', f'Fold {fold+1}', '<', '-'*15)
    X_train, X_valid = features[train_idx], features[test_idx]
    y_train, y_valid = Ytargets[train_idx], Ytargets[test_idx]

    # Inputs are sequences of length 1 with n_features channels.
    Ymodel = keras.models.Sequential([
        TCN(input_shape=(1, n_features), nb_filters=256, return_sequences=True, dilations=[1, 2, 4, 8, 16, 32]),
        keras.layers.Dense(72, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(144, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(72, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(36, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(18, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(9, activation='relu', kernel_initializer='he_uniform'),
        keras.layers.Dense(1, activation='tanh')
    ])

    # `metrics` is passed as a list, the documented form for Model.compile.
    Ymodel.compile(optimizer="adam", loss="mae",
                  metrics=[keras.metrics.MeanAbsoluteError()])

    # LearningRateScheduler expects a function (epoch, lr) -> float. Passing
    # the ExponentialDecay object directly only worked through the callback's
    # TypeError fallback and handed it a tensor; this wrapper keeps the same
    # per-epoch decay but returns a plain Python float.
    lr_callback = tf.keras.callbacks.LearningRateScheduler(
        lambda epoch, lr: float(scheduler(epoch))
    )

    history = Ymodel.fit(X_train, y_train,
                        validation_data=(X_valid, y_valid),
                        epochs=n_epochs,
                        batch_size=1024,
                        callbacks=[lr_callback],
                        verbose=0)

    #Ymodel.save(f'Fold{fold+1} weights')
    # Flatten the model output to one prediction per sample. Predictions cover
    # the full dataset, so they include this fold's own training samples.
    test_preds_Y.append(Ymodel.predict(features).reshape(-1))

In [None]:
# Combine the per-fold models by AVERAGING their predictions. Adding the two
# prediction vectors together (the previous behaviour) yields values on about
# twice the scale of the [0, 1]-normalised targets; the mean also works for
# any number of folds.
Y = np.mean(test_preds_Y, axis=0)

# Every fold model predicted on the full dataset, so this MAE also covers
# samples the models were trained on; it is not a purely held-out score.
error = mean_absolute_error(Ytargets, Y)
print('MAE: %.3f' % error)

In [None]:
# Read the odometry log and discard the header / frame-id / time / covariance
# columns that are not used downstream.
odom_data = pd.read_csv('../Implementation/CSVFiles/2022-09-26-18-56-30-odom.csv').drop(
    columns=['.header.seq', '.header.frame_id', '.child_frame_id', 'time', '.pose.covariance', '.twist.covariance']
)

# Min-max scale the remaining columns. The scaler is fitted on the odometry
# alone, independently of the ground-truth target scalers.
min_max_scaler = preprocessing.MinMaxScaler()
x_scaled = min_max_scaler.fit_transform(odom_data.to_numpy())
dataset_odom = pd.DataFrame(data=x_scaled)

In [None]:
# Pull columns 2 and 3 out of the scaled odometry as two (n, 1) column vectors.
# Presumably these are the pose x / y positions - confirm against the CSV
# column order.
odom = dataset_odom.to_numpy()
x_odom, y_odom = np.hsplit(odom[:, [2, 3]], 2)

In [None]:
# Overlay the three trajectories on one set of axes. Each series carries a
# label and the axes are titled so the figure is readable on its own.
fig, ax = plt.subplots()
ax.scatter(X, Y, color='red', label='Model prediction')
ax.scatter(Xtargets, Ytargets, color='blue', label='Ground truth')
ax.scatter(x_odom, y_odom, color='orange', label='Odometry')
ax.set(
    xlabel='x (min-max normalised)',
    ylabel='y (min-max normalised)',
    title='Predicted vs. ground-truth vs. odometry trajectory',
)
ax.legend()
plt.show()