In [1]:
# Imports
import torch
import torch.nn.functional as F  # Parameterless functions, like (some) activation functions
import torchvision.datasets as datasets  # Standard datasets
import torchvision.transforms as transforms  # Transformations we can perform on our dataset for augmentation
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset  # Gives easier dataset managment by creating mini batches etc.
from sklearn.model_selection import train_test_split

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, f1_score, recall_score, precision_score, confusion_matrix, ConfusionMatrixDisplay


import sys


import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, models, transforms
import time
import copy

In [2]:
#read the data 
#algorithm to read all the files

'''
for folder in this folder:
    read xelasensor1.csv
    read sliplabel.csv
    concat it in a single dataframe along axis = 0

print the dataframe
'''

directory = 'CNN-GradCAM/train2dof'
directory2 = '/Users/elijahnelson/Desktop/SIWES/IML/Tactile_IML/'

def read_file(detect_or_pred, n = None):

    #store all directories in a list
    list_xela_allfiles = []
    list_sliplabel_allfiles = []

    for root, subdirectories, files in os.walk(directory):
        for sdirectory in subdirectories:

            #subdirectory with absolute path
            subdirectory = '{}/{}'.format(root, sdirectory)

            #read specific files in the subdirectory
            for file in os.listdir(subdirectory):
            
                if file.endswith("sensor1.csv"):
                    df = pd.read_csv('{}/{}'.format(subdirectory, file), index_col=None, header=0)
                    
                    if detect_or_pred ==0:
                        list_xela_allfiles.append(df)
                    elif detect_or_pred ==1 and n is not None:
                        list_xela_allfiles.append(df[:-n])

                if file.endswith("label.csv"):
                    df = pd.read_csv('{}/{}'.format(subdirectory, file), index_col=None, header=0)
                    if detect_or_pred ==0:
                        list_sliplabel_allfiles.append(df)
                    elif detect_or_pred ==1 and n is not None: 
                        list_sliplabel_allfiles.append(df[n:])

    return list_xela_allfiles, list_sliplabel_allfiles

    #np.newaxis; np.zeros (3,4,4) -> 
                    


In [3]:
#concat the list of xela_allfiles and sliplabel_allfiles across axis = 0
n = 5
list_xela_allfiles, list_sliplabel_allfiles = read_file(0)

#for slip prediction, comment the line above and uncomment the line below
#list_xela_allfiles, list_sliplabel_allfiles = read_file(1, n)

pd_xela_allfiles = pd.concat(list_xela_allfiles, axis=0, ignore_index=True)
pd_sliplabel_allfiles = pd.concat(list_sliplabel_allfiles, axis=0, ignore_index=True)
pd_sliplabel_allfiles = pd_sliplabel_allfiles['slip']

#reshape the target array into (rows, 1)
tac_label = pd_sliplabel_allfiles.values.reshape(pd_sliplabel_allfiles.shape[0], 1)


In [4]:
#convert to numpy values
pd_xela_allfiles = np.array(pd_xela_allfiles.values)
pd_sliplabel_allfiles = np.array(pd_sliplabel_allfiles.values)

#split the data into train and test
xela_train, xela_test, sliplabel_train, sliplabel_test = train_test_split(pd_xela_allfiles, pd_sliplabel_allfiles, shuffle=True, test_size=0.1, random_state=42)

#split into validation and holdout
xela_train, xela_valid, sliplabel_train, sliplabel_valid = train_test_split(xela_train, sliplabel_train, shuffle = True, test_size=0.3, random_state=42)

In [5]:
#Batch training for the data

class Batchdata(Dataset):

    def __init__(self, tac_image_train, tac_label_train, tac_image_valid, tac_label_valid, valid = None):
        self.x = tac_image_train
        self.y = tac_label_train
        self.xvalid = tac_image_valid
        self.yvalid = tac_label_valid
        self.valid = valid

    def __len__(self):
        if self.valid == True:
            return self.xvalid.shape[0]
        else:
            return self.x.shape[0]

    def __getitem__(self, idx):

        if self.valid == True:
            return self.xvalid[idx], self.yvalid[idx]
        else:
            return self.x[idx], self.y[idx]


dataset = Batchdata(xela_train, sliplabel_train, xela_valid, sliplabel_valid)
dataset2 = Batchdata(xela_train, sliplabel_train, xela_valid, sliplabel_valid, valid = True)

xelaloader = DataLoader(dataset = dataset, batch_size=32, shuffle=True)
xelaloadervalid = DataLoader(dataset = dataset2, batch_size=32, shuffle=True)


# Other Metrics

In [None]:
#Metrics definition
def detection_metrics(xela_test, sliplabel_test, model_type):
    #predict using the holdout set (DONE)
    predicted = model_type(xela_test).detach()#.numpy()
    _, predicted_cls = predicted.max(1)
    predicted_cls = predicted_cls.numpy().round()

    #Plot the loss values against number of epochs (DONE)
    #validation test (DONE)

    #Print the accuracy
    x = 0
    for i in range(predicted_cls.shape[0]):
        if predicted_cls[i].item() == sliplabel_test[i].item():
            x += 1

    accuracy = x/ float(sliplabel_test.shape[0])
    print(f'Accuracy for slip detection is {accuracy}')

    #Print the fscore
    fscore = f1_score(sliplabel_test.numpy(), predicted_cls, average='macro')
    print(f'Fscore for slip detection is {fscore}')

    #print the Precision
    precision = precision_score(sliplabel_test.numpy(), predicted_cls, average='macro')
    print(f'Precision for slip detection is {precision}')

    #print the Recall
    recall = recall_score(sliplabel_test.numpy(), predicted_cls, average='macro')
    print(f'Recall for slip detection is {recall}')

def slip_metrics(xela_test, sliplabel_test, modeltype):
    #predict using the holdout set (DONE)
    predicted = modeltype(xela_test).detach()#.numpy()
    _, predicted_cls = predicted.max(1)
    predicted_cls = predicted_cls.numpy().round()
    
    #Plot the loss values against number of epochs (DONE)
    #validation test (DONE)

    #Print the accuracy
    x = 0
    for i in range(predicted_cls.shape[0]):
        if predicted_cls[i].item() == sliplabel_test[i].item():
            x += 1

    accuracy = x/ float(sliplabel_test.shape[0])
    print(f'Accuracy for slip prediction for (t+{n}) is {accuracy}')

    #Print the fscore
    fscore = f1_score(sliplabel_test.numpy(), predicted_cls, average='macro')
    print(f'Fscore for slip prediction for (t+{n}) is {fscore}')

    #print the Precision
    precision = precision_score(sliplabel_test.numpy(), predicted_cls, average='macro')
    print(f'Precision for slip prediction for (t+{n}) is {precision}')

    #print the Recall
    recall = recall_score(sliplabel_test.numpy(), predicted_cls, average='macro')
    print(f'Recall for slip prediction for (t+{n}) is {recall}')



In [None]:
#print metrics
detection_metrics(tac_image_test, tac_label_test, model)
slip_metrics(tac_image_test, tac_label_test, model)

# RESNET IMPLEMENTATION


In [None]:
# upscale from 4,4 to 224, 244, then, reshae x, y, z to 3, 224, 224



In [None]:
tac_image.shape, tac_label.shape, tac_image_train.shape, tac_label_train.shape, tac_image_valid.shape, tac_label_valid.shape

In [None]:
plt.imshow(tac_image_train[0][0])

In [None]:
#create the batch training class for the model
mean = np.array([0.5, 0.5, 0.5])
std = np.array([0.25, 0.25, 0.25])


device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

composed = torchvision.transforms.Compose ([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean, std)])

class tactile_resnet(Dataset):
    def __init__(self, tac_image, tac_label, transform = None):
        self.x = tac_image
        self.y = tac_label
        self.transform = transform
    
    def __len__(self):
        return self.x.shape[0]
    
    def __getitem__(self, index):
        sample = self.x[index], self.y[index]

        if self.transform:
            sample = self.transform(sample)
        
        return sample


data_set_train = tactile_resnet(tac_image_train, tac_label_train, transform = None)
data_set_valid = tactile_resnet(tac_image_valid, tac_label_valid, transform = None)

dataloader_train = DataLoader(dataset = data_set_train, batch_size = 32, shuffle = True )
dataloader_valid = DataLoader(dataset = data_set_valid, batch_size = 32, shuffle = True )

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    train_loss = []
    valid_loss = []

    t_loss = []
    v_loss = []

    t_acc = []
    v_acc = []

    t_acc_t = []
    v_acc_t = []


    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        total = 0
        correct = 0
        l_xelaloader = 0

        #Training
        for inputs, labels in dataloader_train:
            model.train()
            inputs = inputs.to(device)
            labels = labels.to(device)

            #empty the gradients
            optimizer.zero_grad()

            t = time.time()
            #Forward pass
            outputs = model(inputs)

            print(time.time() - t)
            #compute the loss
            l = criterion(outputs, labels)

            #apply softmax to the output
            _, preds = torch.max(outputs, 1)

            #compute the gradient
            l.backward()

            #update the weights
            optimizer.step()

            scheduler.step()
            #append each loss per batch
            train_loss.append(l.item())

            #accuracy
            total += labels.size(0)
            correct += outputs.round().eq(labels).sum().item()
            l_xelaloader += inputs.shape[0]
        
    
        t_acc = correct/l_xelaloader
        t_acc_t.append(t_acc)

        total = 0
        correct = 0
        l_xelaloader = 0
        
        #append the total training loss and accuracy per epoch
        t_loss.append(np.mean(train_loss))

        #validation
        model.eval()
        for inputs, labels in dataloader_valid:
            outputs_valid = model(inputs)
            lv = criterion(outputs_valid, labels)

            _, preds = torch.max(outputs_valid, 1)

                #append each loss per batch
            valid_loss.append(lv.item())

            total += labels.size(0)
            correct += outputs.round().eq(labels).sum().item()
            l_xelaloader += inputs.shape[0]
    
        v_acc = correct/l_xelaloader
        v_acc_t.append(v_acc)
            # deep copy the model
        if t_acc > best_acc:
            best_acc = t_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        #Metrics
        #append the total validation loss and accuracy per epoch
        v_loss.append(np.mean(valid_loss))

        
    print(f'For training epoch {epoch+1}, loss ={l:.8f}', f'For validation epoch {epoch+1}, loss ={lv:.8f}'  )

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Load a pretrained model and reset final fully connected layer.

model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
# Here the size of each output sample is set to 2.
# Alternatively, it can be generalized to nn.Linear(num_ftrs, len(class_names)).

#create the new fully connected layer of the model
model.fc = nn.Linear(num_ftrs, 1)

model = model.to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer = optim.SGD(model.parameters(), lr=0.001)

#print(model)

# StepLR Decays the learning rate of each parameter group by gamma every step_size epochs
# Decay LR by a factor of 0.1 every 7 epochs
# Learning rate scheduling should be applied after optimizer’s update
# e.g., you should write your code this way:
# for epoch in range(100):
#     train(...)
#     validate(...)
#     scheduler.step()

step_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

model = train_model(model, criterion, optimizer, step_lr_scheduler, num_epochs=25)


In [None]:
#### ConvNet as fixed feature extractor ####
# Here, we need to freeze all the network except the final layer.
# We need to set requires_grad == False to freeze the parameters so that the gradients are not computed in backward()
model_conv = models.resnet18(pretrained=True)

for param in model_conv.parameters():
    param.requires_grad = False

# Parameters of newly constructed modules have requires_grad=True by default
num_ftrs = model_conv.fc.in_features

model_conv.fc = nn.Linear(num_ftrs, 1)

model_conv = model_conv.to(device)

criterion = nn.CrossEntropyLoss()

# Observe that only parameters of final layer are being optimized as
# opposed to before.
optimizer_conv = optim.SGD(model_conv.fc.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_conv, step_size=7, gamma=0.1)

model_conv = train_model(model_conv, criterion, optimizer_conv,
                         exp_lr_scheduler, num_epochs=25)

# RNN & LSTM IMPLEMENTATION

In [2]:
inputsize = 2 # 2 => 16*16 as 2D
seq_length = 3 # 3 => unroll for x, y, z
num_layers = 2 
hidden_size = 2
learning_rate = 0.001

class RNN(nn.Module):
    def __init__(self, inputsize, hidden_size, num_layers, num_classes = 1):
        super (RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(inputsize, hidden_size, num_layers, batch_first = True)

        self.fc  = nn.Linear(hidden_size*seq_length, num_classes)

    def forward(self, x):
        h0 = torch.zeros(num_layers, x.shape[0], inputsize)
        out, _ = self.rnn(x, h0)
        out = out.reshape(out.shape[0], -1)
        out = self.fc(out)
        return out
        


NameError: name 'nn' is not defined

In [1]:
model = RNN(inputsize, hidden_size, num_layers, num_classes = 1)
model.load_state_dict(torch.load('models/RNN_main.pth', map_location=torch.device('cpu')))

NameError: name 'RNN' is not defined

In [7]:
#Training and validation loop 
model = RNN(inputsize, hidden_size, num_layers, num_classes = 1)

# Loss and optimizer
loss = nn.BCELoss()#pos_weight=torch.tensor([2.0]))
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

train_loss = []
valid_loss = []

t_loss = []
v_loss = []

t_acc = []
v_acc = []

t_acc_t = []
v_acc_t = []
num_epochs = 100
for epoch in range(num_epochs):
    #Train per batch
    
    total = 0
    correct = 0
    l_xelaloader = 0

    model.train()
    for (x, y) in (xelaloader):

        #Forward pass
        y_pred = model(x)
       
        #compute the loss
        l = loss(y_pred, y)

        #empty the gradients
        optimizer.zero_grad()

        #compute the gradient
        l.backward()

        #update the weights
        optimizer.step()


        #append each loss per batch
        train_loss.append(l.item())

        #accuracy
        total += y.size(0)
        correct += y_pred.round().eq(y).sum().item()
        l_xelaloader += x.shape[0]
    
    torch.save(model.state_dict(), "RNN_main.pth",_use_new_zipfile_serialization=False)
    break
    t_acc = correct/l_xelaloader
    t_acc_t.append(t_acc)

    total = 0
    correct = 0
    l_xelaloader = 0

    #calculate and plot the validation loss
    model.eval()
    for (x,y) in (xelaloadervalid):
        y_pred_test = model(x)
        lv = loss(y_pred_test, y)
        #append the loss per batch
        valid_loss.append(lv.item())

        #accuracy
        total += y.size(0)
        correct += y_pred_test.round().eq(y).sum().item()
        l_xelaloader += x.shape[0]
        
    v_acc = correct/l_xelaloader
    v_acc_t.append(v_acc)

    #append the total loss and accuracy per epoch
    t_loss.append(np.mean(train_loss))
    v_loss.append(np.mean(valid_loss))

    print(f'For training epoch {epoch+1}, loss ={l:.8f}', f'For validation epoch {epoch+1}, loss ={lv:.8f}'  )

RuntimeError: For unbatched 2-D input, hx should also be 2-D but got 3-D tensor