# Load Dataset

In [27]:
import numpy as np
import pandas as pd
from torch.utils.data import Dataset
import torch
from torchvision import transforms
class create_tensor_dataset_localization(Dataset):
    """Dataset of flattened per-joint signal windows read from a CSV or pickle table.

    Column 0 of the table is the label; the remaining columns are relabelled
    ``0 .. num_features_dataset * data_seq - 1`` and are assumed to hold one
    window flattened time step by time step (``data_seq`` rows of
    ``num_features_dataset`` values). After loading, the columns are regrouped
    joint by joint so that ``__getitem__`` can reshape a row into a
    ``(dof, num_features_lstm * desired_seq)`` tensor.

    Label handling (see ``read_dataset``):
      * ``localization=False``: labels are binarised (0 stays 0, the rest become 1);
      * ``localization=True`` or ``collision=True``: rows labelled 0 are dropped
        and the remaining labels are shifted down by one.

    Args:
        path: Dataset file; must end in ``csv`` or ``pkl``.
        transform: Stored on the instance but not applied by this class.
            ``None`` (the default) resolves to
            ``transforms.Compose([transforms.ToTensor()])``.
        num_classes: Stored on the instance; not used by this class.
        num_features_dataset: Number of values per time step in the file.
        num_features_lstm: Number of features kept per joint.
        data_seq: Number of time steps stored per sample.
        desired_seq: Number of trailing time steps to keep (``<= data_seq``).
        localization: Keep only contact samples and shift their labels by -1.
        collision: Keep only contact samples (after binarisation), shift by -1.
        dof: Number of joints.

    Raises:
        ValueError: If ``collision`` and ``localization`` are both true, if the
            file extension is unsupported, or if ``desired_seq`` is not within
            ``1 .. data_seq``.
    """

    def __init__(self, path='./dataset/realData/contact_detection_train.csv', transform=None, num_classes=5,
                 num_features_dataset=14, num_features_lstm=2, data_seq=28, desired_seq=28, localization=False,
                 collision=False, dof=7):
        # Validate before touching the disk; raising lets callers (and notebooks)
        # recover instead of having the interpreter shut down via exit().
        if collision and localization:
            raise ValueError('collision and localization cannot be true at the same time!')

        # Build the default transform lazily so it is neither evaluated at class
        # definition time nor shared between instances.
        if transform is None:
            transform = transforms.Compose([transforms.ToTensor()])

        self.path = path
        self.transform = transform
        self.num_features_dataset = num_features_dataset
        self.num_features_lstm = num_features_lstm
        self.data_seq = data_seq
        self.desired_seq = desired_seq
        self.dof = dof
        self.num_classes = num_classes
        self.localization = localization
        self.collision = collision

        self.read_dataset()
        self.data_in_seq()

    def __len__(self):
        """Return the number of samples left after label filtering."""
        return len(self.data_target)

    def __getitem__(self, idx: int):
        """Return ``(sample, target)`` for row ``idx``.

        ``sample`` is a tensor of shape ``(dof, num_features_lstm * desired_seq)``;
        ``target`` is the scalar label stored for that row.
        """
        data_sample = torch.tensor(self.data_input.iloc[idx].values)
        data_sample = torch.reshape(data_sample, (self.dof, self.num_features_lstm * self.desired_seq))

        target = self.data_target.iloc[idx]

        return data_sample, target

    def read_dataset(self):
        """Load the table at ``self.path`` and split it into inputs and labels.

        Sets ``self.data_input`` (all columns but the first) and
        ``self.data_target`` (first column), both with a fresh 0-based index.

        Raises:
            ValueError: If the path ends in neither ``csv`` nor ``pkl``.
        """
        # load data from csv or pickle file
        if self.path.endswith('csv'):
            data = pd.read_csv(self.path)
        elif self.path.endswith('pkl'):
            # NOTE: unpickling can execute arbitrary code - only load trusted files.
            data = pd.read_pickle(self.path)
        else:
            raise ValueError(f"Unsupported dataset file {self.path!r}: expected a path ending in 'csv' or 'pkl'")

        # specifying target and data; copy the labels so the edits below never
        # write back into (a view of) the loaded frame
        data_input = data.iloc[:, 1:]
        data_target = data.iloc[:, 0].copy()

        if not self.localization:
            # binary task: any non-zero label counts as class 1
            data_target.loc[data_target != 0] = 1

        if self.localization or self.collision:
            # keep only rows with a non-zero label and make the labels 0-based
            # NOTE(review): with collision=True the labels were binarised above,
            # so every remaining label becomes 0 - confirm this is intended.
            contact_rows = data_target != 0
            data_input = data_input.loc[contact_rows, :]
            data_target = data_target.loc[contact_rows] - 1

        self.data_input = data_input.reset_index(drop=True)
        self.data_target = data_target.reset_index(drop=True)

    def data_in_seq(self):
        """Reorder the input columns joint by joint, keeping the last ``desired_seq`` steps.

        The flat column index ``t * num_features_dataset + c`` is treated as time
        step ``t``, per-step column ``c``. For joint ``j`` the kept per-step
        columns are ``f * dof + j`` for ``f in range(num_features_lstm)``. The
        resulting column order is: all kept (time step, feature) pairs of joint 0,
        then joint 1, and so on.

        Raises:
            ValueError: If ``desired_seq`` is not within ``1 .. data_seq``.
        """
        if not 0 < self.desired_seq <= self.data_seq:
            raise ValueError(
                f"desired_seq must be in 1..data_seq ({self.data_seq}), got {self.desired_seq}"
            )

        n_columns = self.num_features_dataset * self.data_seq
        # position_grid[t, c] is the flat column index of per-step column c at time step t
        position_grid = np.arange(n_columns).reshape(self.data_seq, self.num_features_dataset)

        # only the trailing desired_seq time steps are kept
        first_row = self.data_seq - self.desired_seq
        feature_offsets = np.arange(self.num_features_lstm) * self.dof

        joint_data_pos = np.hstack([
            position_grid[first_row:, feature_offsets + joint].reshape(-1)
            for joint in range(self.dof)
        ])

        # relabel the columns with their flat position, then select them in joint-major order
        self.data_input.columns = range(n_columns)
        self.data_input = self.data_input.loc[:, joint_data_pos]


# LSTM model

In [2]:
import argparse
import os
import time
import torch
import numpy as np
import random
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn as nn
from torchmetrics import ConfusionMatrix, Accuracy

class ZScoreNormalization(nn.Module):
    """Standardise a tensor along dimension 1 (subtract the mean, divide by the std)."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``x`` centred and scaled using statistics taken over ``dim=1``."""
        centre = x.mean(dim=1, keepdim=True)
        spread = x.std(dim=1, keepdim=True)
        # the small constant keeps the division finite when a slice has zero spread
        scale = spread + 1e-6
        return (x - centre) / scale

class MinMaxNormalization(nn.Module):
    """Rescale a tensor along dimension 1 using its minimum and maximum."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``(x - min) / (max - min + eps)`` with min/max taken over ``dim=1``."""
        lowest = x.min(dim=1, keepdim=True).values
        highest = x.max(dim=1, keepdim=True).values
        # the small constant avoids dividing by zero when max == min
        span = highest - lowest + 1e-6
        return (x - lowest) / span

class Sequence(nn.Module):
    """LSTM that emits one raw score (logit) per step of the sequence dimension.

    The LSTM is ``batch_first`` and its ``input_size`` is
    ``num_features_lstm * time_seq``, so the input is expected to be
    ``(batch, steps, num_features_lstm * time_seq)``. In this notebook the
    step dimension is the joint index (feature batches are ``[batch, 7, 28]``).

    Args:
        num_features_lstm: Features per time step and joint.
        hidden_size: LSTM hidden state size.
        num_layers: Number of stacked LSTM layers.
        time_seq: Time steps flattened into each step's feature vector.
        dropout: Inter-layer LSTM dropout; ignored when ``num_layers == 1``.
        bidirectional: Use a bidirectional LSTM (doubles the FC input width).
    """

    def __init__(self, num_features_lstm=1, hidden_size=32, num_layers=3, time_seq=28, dropout=0.5, bidirectional=False):
        super().__init__()
        self.normalization = ZScoreNormalization()
        # Define the LSTM layer
        self.lstm = nn.LSTM(
            input_size=num_features_lstm * time_seq,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=dropout if num_layers > 1 else 0  # Apply dropout only if more than one layer
        )
        # Fully connected layer mapping each step's LSTM output to a single score;
        # a bidirectional LSTM concatenates both directions, hence the doubled width.
        fc_in_features = hidden_size * 2 if bidirectional else hidden_size
        self.fc = nn.Linear(fc_in_features, 1)
        # Not applied in forward(): the model returns raw scores. Kept so the
        # attribute remains available to existing code.
        self.sigmoid = nn.Sigmoid()

    def forward(self, input):
        """Return per-step scores of shape ``(batch, steps)``.

        Only the trailing singleton dimension produced by the linear layer is
        removed, so a batch of size 1 keeps its batch axis (``(1, steps)``).
        """
        normalized_input = self.normalization(input)
        lstm_out, _ = self.lstm(normalized_input)

        # lstm_out: (batch, steps, hidden_size * num_directions) -> (batch, steps, 1)
        joint_step_outputs = self.fc(lstm_out)

        # squeeze(-1) instead of squeeze(): a bare squeeze would also drop the
        # batch axis for single-sample batches and break argmax(axis=1) / the loss.
        return joint_step_outputs.squeeze(-1)


def get_output(data_loader, model, device):
    """Run ``model`` over ``data_loader`` and collect predicted and true labels.

    The model is switched to eval mode (and left there) and gradients are
    disabled. The predicted label of a sample is the argmax of the model
    output along dimension 1.

    Args:
        data_loader: Iterable yielding ``(X_batch, y_batch)`` tensor pairs.
        model: Callable module mapping ``X_batch`` to ``(batch, n_scores)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(labels_pred, labels_true)``: two 1-D ``int64`` CPU tensors. Both are
        empty when ``data_loader`` yields no batches.
    """
    model.eval()
    pred_batches = []
    true_batches = []

    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            output = model(X_batch)  # (batch_size, n_scores)

            pred_batches.append(output.argmax(dim=1).cpu())  # shape: [batch_size]
            true_batches.append(y_batch.cpu())

    # An empty loader would make the concatenation fail; return empty label tensors instead.
    if not pred_batches:
        return torch.empty(0, dtype=torch.int64), torch.empty(0, dtype=torch.int64)

    # Concatenate directly in torch and make sure both label tensors are int64.
    labels_pred = torch.cat(pred_batches, dim=0).to(torch.int64)
    labels_true = torch.cat(true_batches, dim=0).to(torch.int64)

    return labels_pred, labels_true



# training

In [3]:
# Project root: the working directory with the 'AIModels' component stripped.
# os.path.join(..., '') guarantees a trailing separator, so every
# `main_path + '<relative path>'` concatenation in this notebook stays valid
# even when the notebook is not started from inside AIModels/.
main_path = os.path.join(os.getcwd().replace('AIModels', ''), '')
# Path to save trained models
path_name = main_path + 'AIModels/trainedModels/'

# Create directory if it does not exist (no separate existence check needed)
os.makedirs(path_name, exist_ok=True)

# Model configuration
num_features_lstm = 1
num_features_dataset =7
train_all_data = False  # Train a model using all available data

# Task selection: the dataset class rejects both flags being true at once
collision = False
localization = True

batch_size = 4096
num_classes = 7

# Seed every RNG in use so shuffling and weight initialisation are repeatable
torch.manual_seed(2020)
np.random.seed(2020)
random.seed(2020)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

if device.type == "cuda":
    print("Using GPU:", torch.cuda.get_device_name())

# Load data and create training and testing sets
training_data = create_tensor_dataset_localization(
    main_path + 'dataset/localization/4dataset_train.pkl',
    num_features_dataset=num_features_dataset, num_features_lstm=num_features_lstm, num_classes=num_classes,
    collision=collision, localization=localization
)
testing_data = create_tensor_dataset_localization(
    main_path + 'dataset/localization/4dataset_test.pkl',
    num_features_dataset=num_features_dataset, num_features_lstm=num_features_lstm, num_classes=num_classes,
    collision=collision, localization=localization
)

train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
# The test loader is shuffled too because it doubles as training data when train_all_data is set
test_dataloader = DataLoader(testing_data, batch_size=batch_size, shuffle=True)

# Peek at one batch to check the tensor shapes
train_features, train_labels = next(iter(train_dataloader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

# Build the model
model = Sequence(num_features_lstm=num_features_lstm, num_layers=1, hidden_size=64, dropout=0.1, bidirectional=True)
# The dataset yields float64 tensors, so the model weights are cast to double as well
model = model.double().to(device)  # Move model to device

lr = 0.1
# Use Adam optimizer and CrossEntropyLoss as the loss function
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.CrossEntropyLoss()
# Initialize learning rate scheduler: halve the lr after more than one epoch without improvement
# NOTE(review): `verbose` is deprecated in recent PyTorch releases - confirm against the installed version.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=1, verbose=True)


Using GPU: Quadro T2000
Feature batch shape: torch.Size([4096, 7, 28])
Labels batch shape: torch.Size([4096])


In [4]:
n_epochs = 60
lr_threshold = 0.0005  # training stops once the scheduler has decayed the lr below this value
model.train()

# With train_all_data the test split is used as additional training data.
epoch_loaders = [train_dataloader, test_dataloader] if train_all_data else [train_dataloader]

# Training loop
for epoch in range(n_epochs):
    running_loss = []
    for loader in epoch_loaders:
        for X_batch, y_batch in loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)  # Move data to device
            optimizer.zero_grad()
            y_pred = model(X_batch)
            # CrossEntropyLoss expects integer class indices; cast the targets the
            # same way for every loader (the test-split branch used to skip this).
            loss = loss_fn(y_pred, y_batch.long())
            loss.backward()
            optimizer.step()
            running_loss.append(loss.item())

    avg_loss = np.mean(running_loss)
    print(f"Epoch: {epoch + 1}/{n_epochs} - learning rate: {optimizer.param_groups[0]['lr']:.5f}, classification loss: {avg_loss:.4f}")

    # Update the scheduler with the average loss
    scheduler.step(avg_loss)
    current_lr = optimizer.param_groups[0]['lr']
    if current_lr < lr_threshold:
        print(f"Learning rate has dropped below the threshold of {lr_threshold}. Stopping training.")
        break
# Validation on the held-out split built in the setup cell
model.eval()

with torch.no_grad():
    confusionMatrix = ConfusionMatrix(task="multiclass", num_classes=num_classes)
    # torchmetrics >= 0.11 requires `task` (and `num_classes` for multiclass);
    # the default micro average is plain overall accuracy.
    accuracy_metric = Accuracy(task="multiclass", num_classes=num_classes)

    # Update the metric with predictions and true labels
    y_pred, y_test = get_output(test_dataloader, model, device)
    print("On the test set:\n", confusionMatrix(y_pred,y_test))
    accuracy_metric.update(y_pred, y_test)
    print("Accuray on the test set:\n", accuracy_metric.compute())

# Second evaluation on a different test file (0dataset_test.pkl)
testing_data = create_tensor_dataset_localization( main_path + '/dataset/localization/0dataset_test.pkl',
    num_features_dataset=7, num_features_lstm=1, num_classes=7,
    collision=0, localization=1)

test_dataloader2 = DataLoader(testing_data, batch_size=batch_size, shuffle=True)
model.eval()
with torch.no_grad():
    confusionMatrix = ConfusionMatrix(task="multiclass", num_classes=7)
    accuracy_metric = Accuracy(task="multiclass", num_classes=7)

    # Update the metric with predictions and true labels
    y_pred, y_test = get_output(test_dataloader2, model, device)
    print("On the test set:\n", confusionMatrix(y_pred,y_test))
    accuracy_metric.update(y_pred, y_test)
    print("Accuray on the test set:\n", accuracy_metric.compute())


Epoch: 1/60 - learning rate: 0.10000, classification loss: 2.0551
Epoch: 2/60 - learning rate: 0.10000, classification loss: 1.6031
Epoch: 3/60 - learning rate: 0.10000, classification loss: 1.3820
Epoch: 4/60 - learning rate: 0.10000, classification loss: 1.1751
Epoch: 5/60 - learning rate: 0.10000, classification loss: 1.0030
Epoch: 6/60 - learning rate: 0.10000, classification loss: 0.8667
Epoch: 7/60 - learning rate: 0.10000, classification loss: 0.7675
Epoch: 8/60 - learning rate: 0.10000, classification loss: 0.6916
Epoch: 9/60 - learning rate: 0.10000, classification loss: 0.6350
Epoch: 10/60 - learning rate: 0.10000, classification loss: 0.5836
Epoch: 11/60 - learning rate: 0.10000, classification loss: 0.5492
Epoch: 12/60 - learning rate: 0.10000, classification loss: 0.5194
Epoch: 13/60 - learning rate: 0.10000, classification loss: 0.4929
Epoch: 14/60 - learning rate: 0.10000, classification loss: 0.4595
Epoch: 15/60 - learning rate: 0.10000, classification loss: 0.4340
Epoc

In [None]:
# Replace the optimizer and scheduler with fresh ones at a lower starting
# learning rate. The model weights are kept, but the new Adam instance starts
# without the previous optimizer's internal state; the scheduler now waits
# longer (patience=2) before halving the learning rate.
lr=0.05
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, verbose=True)

# saving and loading models

In [110]:
# Quick checkpoint: stores only the model weights (state_dict), under a fixed
# file name relative to the current working directory. Optimizer state and
# hyper-parameters are not included in this file.
torch.save({"model_state_dict": model.state_dict()},'model_9344_110epoches.pth')

In [25]:
# test on source robot

testing_data = create_tensor_dataset_localization( main_path + '/dataset/test_dataset_source_robot/dataset_test.pkl',
    num_features_dataset=7, num_features_lstm=1, num_classes=7,
    collision=0, localization=1)

test_dataloader2 = DataLoader(testing_data, batch_size=batch_size, shuffle=True)
model.eval()
with torch.no_grad():
    confusionMatrix = ConfusionMatrix(task="multiclass", num_classes=7)
    # torchmetrics >= 0.11 requires `task` (and `num_classes` for multiclass);
    # the default micro average is plain overall accuracy.
    accuracy_metric = Accuracy(task="multiclass", num_classes=7)

    # Update the metric with predictions and true labels
    y_pred, y_test = get_output(test_dataloader2, model, device)
    print("On the test set:\n", confusionMatrix(y_pred,y_test))
    accuracy_metric.update(y_pred, y_test)
    print("Accuray on the test set:\n", accuracy_metric.compute())


On the test set:
 tensor([[  0,   0,   0,   0,   0,   0,   0],
        [  9, 116,   2,   4,   1,   3,   0],
        [  4,  21, 137,   4,   5,   9,   0],
        [  0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0]])
Accuray on the test set:
 tensor(0.8032)


# testing on second franka robot

In [21]:
# Evaluation data recorded on the second (target) robot
data_path = main_path + 'dataset/test_dataset_target_robot/dataset_test.pkl'


testing_data = create_tensor_dataset_localization( data_path,
    num_features_dataset=num_features_dataset, num_features_lstm=1, num_classes=num_classes,
    collision=0, localization=1)

test_dataloader2 = DataLoader(testing_data, batch_size=batch_size, shuffle=True)

# get_output() switches the model to eval mode and disables gradients itself
with torch.no_grad():
    confusionMatrix = ConfusionMatrix(task="multiclass", num_classes=num_classes)
    # torchmetrics >= 0.11 requires `task` (and `num_classes` for multiclass);
    # the default micro average is plain overall accuracy.
    accuracy_metric = Accuracy(task="multiclass", num_classes=num_classes)

    # Update the metric with predictions and true labels
    y_pred, y_test = get_output(test_dataloader2, model, device)
    print("On the test set:\n", confusionMatrix(y_pred,y_test))
    accuracy_metric.update(y_pred, y_test)
    print("Accuray on the test set:\n", accuracy_metric.compute())


On the test set:
 tensor([[  0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0],
        [  1,   0,   0,  15,  14, 140, 335],
        [  0,   0,   4,   6,   3, 240,  87],
        [  0,   0,   0,   0,   0,   0,   0]])
Accuray on the test set:
 tensor(0.3006)


# testing on UR10 robot

In [28]:
# Evaluation data recorded on the UR10; the dataset is built with dof=6, so each
# sample has 6 rows and the model returns 6 scores per sample.
data_path = main_path + 'dataset/test_dataset_target_robot_ur10/dataset_test.pkl'


testing_data = create_tensor_dataset_localization( data_path,
    num_features_dataset=6, num_features_lstm=1, num_classes=6, dof = 6,
    collision=0, localization=1)

test_dataloader2 = DataLoader(testing_data, batch_size=batch_size, shuffle=True)

# get_output() switches the model to eval mode and disables gradients itself
with torch.no_grad():
    confusionMatrix = ConfusionMatrix(task="multiclass", num_classes=6)
    # torchmetrics >= 0.11 requires `task` (and `num_classes` for multiclass);
    # the default micro average is plain overall accuracy.
    accuracy_metric = Accuracy(task="multiclass", num_classes=6)

    # Update the metric with predictions and true labels
    y_pred, y_test = get_output(test_dataloader2, model, device)
    print("On the test set:\n", confusionMatrix(y_pred,y_test))
    accuracy_metric.update(y_pred, y_test)
    print("Accuray on the test set:\n", accuracy_metric.compute())


On the test set:
 tensor([[  0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0],
        [ 12,  19, 187, 179,  98,  15],
        [  8,  21, 106, 179, 148,  43],
        [  7,  51,  59, 243, 126,  39],
        [  0,   0,   0,   0,   0,   0]])
Accuray on the test set:
 tensor(0.3195)


# test on UR5 robot

In [29]:
# Evaluation data recorded on the UR5; the dataset is built with dof=6, so each
# sample has 6 rows and the model returns 6 scores per sample.
data_path = main_path + 'dataset/test_dataset_target_robot_ur5/dataset_test.pkl'


testing_data = create_tensor_dataset_localization( data_path,
    num_features_dataset=6, num_features_lstm=1, num_classes=6, dof = 6,
    collision=0, localization=1)

test_dataloader2 = DataLoader(testing_data, batch_size=batch_size, shuffle=True)

# get_output() switches the model to eval mode and disables gradients itself
with torch.no_grad():
    confusionMatrix = ConfusionMatrix(task="multiclass", num_classes=6)
    # torchmetrics >= 0.11 requires `task` (and `num_classes` for multiclass);
    # the default micro average is plain overall accuracy.
    accuracy_metric = Accuracy(task="multiclass", num_classes=6)

    # Update the metric with predictions and true labels
    y_pred, y_test = get_output(test_dataloader2, model, device)
    print("On the test set:\n", confusionMatrix(y_pred,y_test))
    accuracy_metric.update(y_pred, y_test)
    print("Accuray on the test set:\n", accuracy_metric.compute())


On the test set:
 tensor([[  0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0],
        [ 11,  45, 123, 129, 165, 142],
        [  3,  31,  70, 102, 203,  86],
        [  7,  20,  74, 187,  97, 130],
        [  0,   0,   0,   0,   0,   0]])
Accuray on the test set:
 tensor(0.1982)


# save the best model

In [None]:
# Save the trained model
# Two copies are written into the task-specific sub-directory of path_name:
# a timestamped archive copy and 'trainedModel.pth', which is overwritten each time.
named_tuple = time.localtime()
if input('Do you want to save the data in trained models? (y/n): ') == 'y':
    try:
        # Pick the sub-directory from the task flags
        if collision:
            task_dir = 'collisionDetection'
        elif localization:
            task_dir = 'localization'
        elif num_classes == 2:
            task_dir = 'contactDetection'
        else:
            # Previously this case fell through and failed later on an undefined path.
            raise ValueError('no save location for this configuration '
                             '(collision/localization disabled and num_classes != 2)')

        # Only trainedModels/ itself is created earlier; make sure the task folder exists too.
        save_dir = os.path.join(path_name, task_dir)
        os.makedirs(save_dir, exist_ok=True)

        # NOTE: the ':' characters in the timestamp are not valid in Windows file names.
        timestamp = str(time.strftime("_%m_%d_%Y_%H:%M:%S", named_tuple))
        path_name_1 = os.path.join(save_dir, 'trainedModel' + timestamp + '.pth')
        path_name_2 = os.path.join(save_dir, 'trainedModel.pth')

        checkpoint = {
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "collision": collision,
            "localization": localization,
            # `network_type` is not defined by the cells above; fall back to the
            # model's class name so the save no longer aborts with a NameError.
            "network_type": globals().get("network_type", type(model).__name__),
            "n_epochs": n_epochs,
            "batch_size": batch_size,
            "num_features_lstm": num_features_lstm,
            "num_classes": num_classes,
            "lr": lr,
        }

        torch.save(checkpoint, path_name_1)
        torch.save(checkpoint, path_name_2)
        print('Model saved successfully!')
    except Exception as e:
        # Best effort: report the problem instead of interrupting the notebook session.
        print(f"An error occurred while saving the model: {e}")

In [None]:
# Load Model
model_path= 'with_0.1_0.1_1_scheduler/model_92_50epoches.pth'

num_classes = 7
num_features_dataset=7
num_features_lstm=1
batch_size= 4096

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Build the model
# NOTE(review): the training cell builds Sequence with bidirectional=True; if this
# checkpoint was trained that way, load_state_dict will report missing/unexpected
# keys unless bidirectional=True is passed here as well - confirm for this file.
model = Sequence(num_features_lstm=num_features_lstm, num_layers=1, hidden_size=64, dropout=0.1)
# Cast to float64 BEFORE loading: the training cell saves weights from a double
# model, and copying them into float32 parameters first would round them.
model = model.double()
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
# NOTE: torch.load unpickles the file - only load checkpoints from trusted sources.
model.load_state_dict(torch.load(model_path, map_location=device)['model_state_dict'])
model = model.to(device)  # Move model to device

model.eval()

# Path to save trained models
model_path= 'model.pth'