In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import letRun #This library can be deleted, it is used for debugging
import RoboSkin as sk
import cv2 
import pickle

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(torch.version.cuda)
print("GPU:",torch.cuda.is_available())
def sigmoid(x):                                        
   return 1 / (1 + np.exp(-x))

11.7
GPU: True


In [2]:
##DATA SET CREATOR
class dataset:
    def __init__(self,names=["flat_detection.avi","edge_detection.avi","flat_slip_detection.avi"]):
        self.path=letRun.path
        self.names=names
        self.SIZE=0.3
        name="C:/Users/dexte/OneDrive/Documents/AI/Data_Labeller/pickle_imputer_small.pkl" #use standard imputer or one for small
        self.reg=None
        with open(name,'rb') as file:
            self.reg=pickle.load(file)
    def predict(self,reg,dat):
        p=reg.predict(dat)
        p=(p.reshape((p.shape[0],p.shape[1]//2,2))*255/self.SIZE)
        return p
    def generate(self,STORE=5,y_labels=[]):
        BIG_DATA_X=None
        BIG_DATA_y=None
        assert len(y_labels)>=len(self.names),"Incorrect length of labels"
        for j,name in enumerate(self.names):
            skin=sk.Skin(videoFile=self.path+name)#videoFile=path+"Movement4.avi") #load skin object using demo video
            cap = cv2.VideoCapture(self.path+name)
            length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            skin.sharpen=False #falsify the sharpness if recorded with sharpness
            frame=skin.getFrame()
            h=frame.shape[1]*self.SIZE
            w=frame.shape[0]*self.SIZE
            frame=cv2.resize(frame,(int(h),int(w)),interpolation=cv2.INTER_AREA)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).flatten()/255
            past=self.predict(self.reg,np.array([frame]))[0]
            initial=past.copy()
            initial_frame=frame.copy()

            X=[]
            y=[] #label as [edge surface soft hard slippery]
            lastFrames=[]
            gyro=np.zeros((length,6))
            try:
                gyro=np.load(self.path+name.replace(".avi","")+"_gyro.npy")
            except:
                pass
            for i in range(length): #lop through all
                frame_=skin.getFrame()
                frame=cv2.resize(frame_,(int(h),int(w)),interpolation=cv2.INTER_AREA)
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).flatten()/255
                points=self.predict(self.reg,np.array([frame]))[0]
                #get pressure map
                diff=np.sum(np.abs(initial_frame-frame))/(frame.shape[0]*3)
                vecs=initial-points
                lastFrames.append(np.concatenate((vecs.flatten(),gyro[i]*1)))
                if len(lastFrames)>STORE: lastFrames.pop(0)
                if diff>0.01 and len(lastFrames)==STORE: #significant contact
                    X.append(np.array(lastFrames).flatten()) #store temporal element
                    y.append(y_labels[j])
            if type(BIG_DATA_y)==type(None):
                BIG_DATA_y=np.array(y.copy())
                BIG_DATA_X=np.array(X.copy())
            else:
                BIG_DATA_y= np.concatenate((np.array(y.copy()),BIG_DATA_y))
                BIG_DATA_X= np.concatenate((np.array(X.copy()),BIG_DATA_X))
        a,b=np.array(BIG_DATA_X),np.array(BIG_DATA_y)
        a=a.reshape((a.shape))
        return a,b



### Gyro plotter

In [3]:
gyro_edge=np.load(letRun.path+"edge_detection"+"_gyro.npy")
gyro_flat=np.load(letRun.path+"flat_detection"+"_gyro.npy")
gyro_slip=np.load(letRun.path+"flat_slip_detection"+"_gyro.npy")



In [5]:
plt.plot(gyro_edge[:,0],label="Accelerometer x")
plt.plot(gyro_edge[:,1],label="Accelerometer y")
plt.plot(gyro_edge[:,2],label="Accelerometer z")

plt.plot(gyro_flat[:,0],label="Accelerometer x")
plt.plot(gyro_flat[:,1],label="Accelerometer y")
plt.plot(gyro_flat[:,2],label="Accelerometer z")

plt.plot(gyro_slip[:,0],label="Accelerometer x")
plt.plot(gyro_slip[:,1],label="Accelerometer y")
plt.plot(gyro_slip[:,2],label="Accelerometer z")

plt.title("Accelerometer over time")
plt.legend(loc="lower left")
plt.show()

In [96]:
plt.plot(gyro_edge[:,3],c="g",label="Accelerometer x")
plt.plot(gyro_edge[:,4],c="g",label="Accelerometer y")
plt.plot(gyro_edge[:,5],c="g",label="Accelerometer z")

plt.plot(gyro_flat[:,3],c="r",label="Accelerometer x")
plt.plot(gyro_flat[:,4],c="r",label="Accelerometer y")
plt.plot(gyro_flat[:,5],c="r",label="Accelerometer z")

plt.plot(gyro_slip[:,3],c="b",label="Accelerometer x")
plt.plot(gyro_slip[:,4],c="b",label="Accelerometer y")
plt.plot(gyro_slip[:,5],c="b",label="Accelerometer z")

plt.title("Gyro over time")
plt.legend(loc="lower left")
plt.show()

### Main code

In [6]:
path="C:/Users/dexte/github/RoboSkin/Code/Models/labeller/"
lin_path="/its/home/drs25/Documents/GitHub/RoboSkin/Code/Models/labeller"

d=dataset()
X,ya=d.generate(STORE=5,y_labels=[[0,1,0,1,0],[1,0,0,1,0],[0,1,0,0,1]])
Xa=X/100
ya=ya/10

X, data_test, y, labels_test = train_test_split(Xa, ya, test_size=0.20, random_state=42)

print(X.shape,y.shape)
print(data_test.shape,labels_test.shape)

(756, 1360) (756, 5)
(190, 1360) (190, 5)


In [7]:

# Define the size of the input (n) and output (m) layers
n_inputs = X.shape[1]
m_outputs = len(y[0])


# Convert data to PyTorch tensors
X_tensor = torch.tensor(X, dtype=torch.float32).to(device)
y_tensor = torch.tensor(y, dtype=torch.float32).to(device)

# Define the neural network model
class SimpleNeuralNetwork(nn.Module):
    def __init__(self, input_size, output_size,layers=[400,150,10],drop_out_prob=0.2):
        super(SimpleNeuralNetwork, self).__init__()
        self.fc=[nn.Linear(input_size, layers[0])]
        self.fc.append(nn.ReLU())
        self.fc.append(nn.Dropout(p=drop_out_prob))
        for i in range(1,len(layers)): #create layers 
                self.fc.append(nn.Linear(layers[i-1], layers[i]))
                self.fc.append(nn.ReLU())
                self.fc.append(nn.Dropout(p=drop_out_prob))
        self.fc.append(nn.Linear(layers[-1], output_size))
        self.fc_layers = nn.Sequential(*self.fc)
    def forward(self, x):
        x=self.fc_layers(x)
        return x

# Create the neural network
model = SimpleNeuralNetwork(n_inputs, m_outputs).to(device)

# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.005)



In [8]:
# Training loop
def get_acc(predictions,should_be=[]):
    predictions*=10 #normalize
    pred_array=np.zeros_like(predictions)
    inds=np.argmax(predictions[:,0:2],axis=1) #convert at threshold
    inds2=np.argmax(predictions[:,2:-1],axis=1) #convert at threshold
    for i in range(len(pred_array)):
        pred_array[i][inds[i]]=1
        pred_array[i][2+inds2[i]]=1
    correct=0
    for j,pred in enumerate(pred_array): #loopthrough predictions
        inds=np.where(pred==1) #
        if len(should_be)>0: #validation task
            sb=should_be[j]*10
            sb=np.where(sb==1)
            if len(sb[0])==len(inds[0]):
                c=0
                for i in range(len(sb[0])):
                    if sb[0][i]==inds[0][i]: c+=1
                if c==len(sb[0]): correct+=1
    return correct/len(should_be)

def train(model,num_epochs=2500):
    loss_ar=[]
    accuracies=[]
    for epoch in range(num_epochs):
        # Forward pass
        y_pred = model(X_tensor)

        # Compute the loss
        loss = criterion(y_pred, y_tensor)

        # Zero gradients, backward pass, and update the weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        loss_ar.append(loss.item())
        #predict
        predictions = model(torch.tensor(X, dtype=torch.float32).to(device))
        accuracies.append(get_acc(predictions.cpu().detach().numpy(),should_be=y))
        # Print the current loss to monitor training progress
        if epoch%1000==0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}","Accuracy",accuracies[-1])
    return np.array(loss_ar),np.array(accuracies)



In [13]:
plt.title("Loss while training tactile model")
plt.ylabel("Loss and accuracy/5")
plt.xlabel("Epoch")
a,b=train(model)
#plt.plot(a,label="Loss")
plt.plot(b,label="Accuracy")
plt.legend(loc="upper right")
plt.show()

Epoch [1/2500], Loss: 0.0443 Accuracy 0.062169312169312166
Epoch [1001/2500], Loss: 0.0003 Accuracy 0.593915343915344
Epoch [2001/2500], Loss: 0.0003 Accuracy 0.6111111111111112


In [16]:
# After training, you can use the trained model for predictions on new data.
# For example, if you have new input data 'X_new', you can do:
with torch.no_grad():
     model.eval()
     predictions = model(torch.tensor(data_test, dtype=torch.float32).to(device))


def predict(predictions,should_be=[]):
    predictions*=10 #normalize
    pred_array=np.zeros_like(predictions)
    inds=np.argmax(predictions[:,0:2],axis=1) #convert at threshold
    inds2=np.argmax(predictions[:,2:-1],axis=1) #convert at threshold
    for i in range(len(pred_array)):
        pred_array[i][inds[i]]=1
        pred_array[i][2+inds2[i]]=1
    names=["edge", "surface", "soft", "hard", "slippery"]
    array=[]
    correct=0
    for j,pred in enumerate(pred_array): #loopthrough predictions
        inds=np.where(pred==1) #
        ar=[]
        if len(should_be)>0: #validation task
            sb=should_be[j]*10
            sb=np.where(sb==1)
            if len(sb[0])==len(inds[0]):
                c=0
                for i in range(len(sb[0])):
                    if sb[0][i]==inds[0][i]: c+=1
                if c==len(sb[0]): correct+=1
        for i in inds[0]:
            ar.append(names[i])
        array.append(ar)
    if correct!=0: print("Percentage correct:",(correct/len(should_be))*100,"%")
    return array

p=predict(predictions.cpu(),should_be=labels_test)
p[0:10]  

Percentage correct: 63.1578947368421 %


[['surface', 'hard'],
 ['surface', 'hard'],
 ['surface', 'hard'],
 ['edge', 'hard'],
 ['surface', 'hard'],
 ['edge', 'hard'],
 ['surface', 'hard'],
 ['surface', 'hard'],
 ['edge', 'hard'],
 ['surface', 'hard']]

# Experiments

### With and without slippage

In [20]:
TRIALS=100
#without slippage
#merge datasets
d=dataset(names=["flat_detection.avi","edge_detection.avi"])
X,ya=d.generate(STORE=5,y_labels=[[0,1,0,1,0],[1,0,0,1,0]])
Xa=X/100
ya=ya/10

X, data_test, y, labels_test = train_test_split(Xa, ya, test_size=0.20, random_state=42)

average_a=np.zeros((2500,))
p_=0

for i in range(TRIALS):
    print("Trial B",i,"Acc:",p_/(i+1))
    # Create the neural network
    model = SimpleNeuralNetwork(n_inputs, m_outputs).to(device)

    # Define the loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    l,a=train(model)
    average_a+=np.array(a)
    #unseen data
    with torch.no_grad():
        model.eval()
        predictions = model(torch.tensor(data_test, dtype=torch.float32).to(device))
    p_+=get_acc(predictions.cpu(),should_be=labels_test)

average_a=average_a/(TRIALS) #average by 10 and then divide by 5 for scaling
p_/=TRIALS

average_a1=np.zeros((2500,))

d=dataset()
X,ya=d.generate(STORE=5,y_labels=[[0,1,0,1,0],[1,0,0,1,0],[0,1,0,0,1]])
Xa=X/100
ya=ya/10

X, data_test, y, labels_test = train_test_split(Xa, ya, test_size=0.20, random_state=42)
X_tensor = torch.tensor(X, dtype=torch.float32).to(device)
y_tensor = torch.tensor(y, dtype=torch.float32).to(device)

p=0
for i in range(TRIALS):
    print("Trial A",i,"Acc:",p/(i+1))
    # Create the neural network
    model = SimpleNeuralNetwork(n_inputs, m_outputs).to(device)

    # Define the loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    l,a=train(model)
    average_a1+=np.array(a)

    #unseen data
    with torch.no_grad():
        model.eval()
        predictions = model(torch.tensor(data_test, dtype=torch.float32).to(device))
    p+=get_acc(predictions.cpu(),should_be=labels_test)
average_a1=average_a1/(TRIALS) #average by 10 and then divide by 5 for scaling
p/=TRIALS
#plt.plot(average_l,'--',c="b",label="Loss with slip data")
plt.plot(average_a1,c="b",label="Accuracy with slip data")


print("Accuracies",p,p_)
plt.title("Performance of models with and without slippage over 100 trials")
plt.ylabel("Accuracy")
plt.xlabel("Epoch")
#plt.plot(average_l,c="r",label="Loss without slip data")
plt.plot(average_a,c="r",label="Accuracy without slip data")
plt.legend(loc="lower right")
plt.show()

print(np.std(average_a),np.std(average_a1))

Trial B 0 Acc: 0.0
Epoch [1/2500], Loss: 0.0349 Accuracy 0.4354166666666667
Epoch [1001/2500], Loss: 0.0003 Accuracy 0.93125
Epoch [2001/2500], Loss: 0.0003 Accuracy 0.94375
Trial B 1 Acc: 0.5
Epoch [1/2500], Loss: 0.0498 Accuracy 0.5354166666666667
Epoch [1001/2500], Loss: 0.0010 Accuracy 0.5708333333333333
Epoch [2001/2500], Loss: 0.0009 Accuracy 0.5708333333333333
Trial B 2 Acc: 0.5111111111111111
Epoch [1/2500], Loss: 0.0539 Accuracy 0.4979166666666667
Epoch [1001/2500], Loss: 0.0007 Accuracy 0.8791666666666667
Epoch [2001/2500], Loss: 0.0006 Accuracy 0.9770833333333333
Trial B 3 Acc: 0.63125
Epoch [1/2500], Loss: 0.0299 Accuracy 0.5541666666666667
Epoch [1001/2500], Loss: 0.0004 Accuracy 0.9729166666666667
Epoch [2001/2500], Loss: 0.0003 Accuracy 0.96875
Trial B 4 Acc: 0.7033333333333334
Epoch [1/2500], Loss: 0.0167 Accuracy 0.26458333333333334
Epoch [1001/2500], Loss: 0.0003 Accuracy 0.8854166666666666
Epoch [2001/2500], Loss: 0.0003 Accuracy 0.9
Trial B 5 Acc: 0.7513888888888888

### Store size

In [22]:
TRIALS=20
TIMES=15
average_a=np.zeros((TIMES,2500))

for i in range(1,TIMES):
    print(i,average_a[-1][-1])
    d=dataset()
    x,y=d.generate(STORE=i,y_labels=[[0,1,0,1,0],[1,0,0,1,0],[0,1,0,0,1]])
    SIZE=x.shape[1]
    x=x/100
    y=y/10
    n_inputs = x.shape[1]
    m_outputs = len(y[0])
    for j in range(TRIALS):
        X, data_test, Y, labels_test = train_test_split(x, y, test_size=0.20, random_state=42)
        X_tensor = torch.tensor(X, dtype=torch.float32).to(device)
        y_tensor = torch.tensor(Y, dtype=torch.float32).to(device)
        
        model = SimpleNeuralNetwork(n_inputs, m_outputs).to(device)
        # Define the loss function and optimizer
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=0.01)
        l,a=train(model)
        average_a[i]+=a
    average_a[i]/=TRIALS #get average
#diplay
plt.title("Performance of models with different sizes of temporal information")
plt.ylabel("Accuracy")
plt.xlabel("Epoch")
#plt.plot(average_l,c="r",label="Loss without slip data")


for i,dat in enumerate(average_a):
    plt.plot(dat,label="T="+str(i))

plt.legend(loc="lower right")
plt.show()

1 0.0
Epoch [1/2500], Loss: 0.0451 Accuracy 0.16173361522198731
Epoch [1001/2500], Loss: 0.0003 Accuracy 0.1416490486257928
Epoch [2001/2500], Loss: 0.0002 Accuracy 0.12790697674418605
Epoch [1/2500], Loss: 0.0177 Accuracy 0.12050739957716702
Epoch [1001/2500], Loss: 0.0005 Accuracy 0.113107822410148
Epoch [2001/2500], Loss: 0.0004 Accuracy 0.11416490486257928
Epoch [1/2500], Loss: 0.0498 Accuracy 0.22727272727272727
Epoch [1001/2500], Loss: 0.0006 Accuracy 0.15961945031712474
Epoch [2001/2500], Loss: 0.0005 Accuracy 0.1427061310782241
Epoch [1/2500], Loss: 0.0107 Accuracy 0.2452431289640592
Epoch [1001/2500], Loss: 0.0006 Accuracy 0.16913319238900634
Epoch [2001/2500], Loss: 0.0005 Accuracy 0.16596194503171247
Epoch [1/2500], Loss: 0.0390 Accuracy 0.2769556025369979
Epoch [1001/2500], Loss: 0.0003 Accuracy 0.13530655391120508
Epoch [2001/2500], Loss: 0.0003 Accuracy 0.13530655391120508
Epoch [1/2500], Loss: 0.0253 Accuracy 0.0507399577167019
Epoch [1001/2500], Loss: 0.0003 Accuracy 0.

In [20]:
np.save(path+"data_of_t_size",average_a)
print(np.max(average_a,axis=1).shape)

(15,)


In [21]:
plt.title("Accuricies vs T size")
plt.plot(np.max(average_a,axis=1))
plt.ylabel("Accuracy")
plt.xlabel("Temporal vector size (T)")
plt.show()