In [35]:
import os
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from joblib import dump, load
import matplotlib.pyplot as plt
import csv
import torch
import torch.nn as nn
import torch.optim as optim
from data_loader import CustomSignalData, CustomSignalData1
from torch.autograd import Variable
from encoder import Encoder as E
from helpers import set_cmd_cb, rms_formuula, get_data, get_all_data, get_shift_data, get_operators, plot_cfs_mat, roll_data


In [36]:
DEVICE = torch.device("cpu")
def getFeatureMatrix(rawDataMatrix, windowLength, windowOverlap):
    rms = lambda sig: np.sqrt(np.mean(sig**2))
    nChannels,nSamples = rawDataMatrix.shape    
    I = int(np.floor(nSamples/(windowLength-windowOverlap)))
    featMatrix = np.zeros([nChannels, I])
    for channel in range(nChannels):
        for i in range (I):
            wdwStrtIdx=i*(windowLength-windowOverlap)
            sigWin = rawDataMatrix[channel][wdwStrtIdx:(wdwStrtIdx+windowLength-1)] 
            featMatrix[channel, i] = rms(sigWin)
    featMatrixData = np.array(featMatrix)
    return featMatrixData

class FFNN(torch.nn.Module):
    def __init__(self, inputSize, outputSize):
        super(FFNN, self).__init__()
        self.encoder = torch.nn.Sequential(
            torch.nn.Linear(inputSize, 9, bias=False),
            torch.nn.Sigmoid()
        )
        self.classifer = torch.nn.Sequential(
            torch.nn.Linear(9, outputSize, bias=False),
            # torch.nn.Softmax(dim=1)
        )

    def forward(self, x, encoder=None):
        if not encoder:
            encoder = self.encoder
        z = encoder(x)
        class_z = self.classifer(z)

        return class_z

class Operator(nn.Module):
    def __init__(self, in_features, n_rotations):
        super(Operator, self).__init__()
        """
        Args:
          in_features (int): Number of input features which should be equal to xsize.
          out_features (out): Number of output features which should be equal to ysize.
        """
        self.in_features = in_features
        self.core = torch.nn.Parameter(torch.zeros(3*self.in_features**2)- 0*torch.diag(torch.rand(3*self.in_features**2)/10))
        self.core.requires_grad = True
        self.n_rotations = n_rotations
        
    def rotate_batch(self, x, d, out_features):
      rotated = torch.empty(x.shape[0], 3*out_features*out_features, device=DEVICE)
      phies = [torch.linalg.matrix_power(self.core,i).to(DEVICE) for i in range (0,self.n_rotations+0)]
      for i in range (x.shape[0]):
        rotated[i] = phies[(d[i]+0)%4].matmul(x[i]) 
      return rotated

    def forward(self, x, d):
        """
        Args:
          x of shape (batch_size, 3, xsize, xsize): Inputs.
        
        Returns:
          y of shape (batch_size, 3*xsize^2): Outputs.
        """
        z = self.rotate_batch(x, d, self.in_features)
        return z
def get_tensor(arr):
    return torch.tensor(arr, device=DEVICE,dtype=torch.float )

def rotate_batch(x, d, out_features):
    M = torch.diag(torch.ones(8)).roll(-1,1)
    used_bases = [torch.linalg.matrix_power(M,i).to(DEVICE) for i in range (8)]
    rotated = torch.empty(x.shape, device=DEVICE)
    for i in range (x.shape[0]):
        rotated[i] = used_bases[d[i]].matmul(x[i]) 
    return rotated

def clf_acc(model, loader, masks = None, encoder = None):
    model.eval()
    correct = 0
    iter = 0
    with torch.no_grad():
        for inputs, labels,_,_ in loader:
            inputs = inputs.to(DEVICE)
            if masks is not None:
                inputs = inputs * masks[:inputs.size()[0]]
            labels = labels.to(DEVICE)
            labels = labels.flatten()
            if encoder:
                pred = model(inputs, encoder)
            else:
                pred = model(inputs)
            correct += (1-torch.abs(torch.sign(torch.argmax(pred,dim = 1)- labels))).mean().item()
            iter += 1
    return correct/iter

def compute_accuracy(a, b, loader):
    a.eval()
    b.eval()
    
    correct = 0
    iter = 0
    
    with torch.no_grad():
        for inputs1, inputs2, shift1, shift2, labels, _ in loader:
            inputs1 = inputs1.to(DEVICE)
            inputs2 = inputs2.to(DEVICE)
            shift1 = -shift1.int().flatten().to(DEVICE)
            shift2 = -shift2.int().flatten().to(DEVICE)
            labels = labels.flatten().to(DEVICE)
            # zero the parameter gradients
            optimizer.zero_grad()
            
            # forward + backward + optimize
            y1 = a(inputs1)
            y_tr_est1 = rotate_batch(y1,shift1,6)
            y_tr1 = b(y_tr_est1)

            y2 = a(inputs2)
            y_tr_est2 = rotate_batch(y2,shift1,6)
            y_tr2 = b(y_tr_est2)

            correct += (1-torch.abs(torch.sign(torch.argmax(y_tr1,dim = 1)- labels))).mean().item() + \
                    (1-torch.abs(torch.sign(torch.argmax(y_tr2,dim = 1)- labels))).mean().item()
            iter += 1
    return correct * 0.5 / iter

In [37]:
subject = '22'
No_shift = '1'

Fs = 1000
windowLength = int(np.floor(0.1*Fs))  #160ms
windowOverlap =  int(np.floor(50/100 * windowLength))

# X_train = np.zeros([0,8])
# y_train= np.zeros([0])
X_test = np.zeros([0,8])
y_test = np.zeros([0])

for shift in range(0,int(No_shift)): 
    for files in sorted(os.listdir(f'Subject_{subject}/Shift_5/')):
        _, class_,_, rep_ = files.split('_')
        if int(class_) in [1,2,3,4,5,6,7,8,9]:
            df = pd.read_csv(f'Subject_{subject}/Shift_5/{files}',skiprows=0,sep=' ',header=None)
            data_arr = np.stack([np.array(df.T[i::8]).T.flatten().astype('float32') for i in range (8)])
            data_arr -= 121
            data_arr /= 255.0
            feaData = getFeatureMatrix(data_arr, windowLength, windowOverlap)
            
            if not class_.startswith('9'):
                rms_feature = feaData.sum(0)
                baseline = 2*rms_feature[-50:].mean()
                start_ = np.argmax(rms_feature[::1]>baseline)
                end_  = -np.argmax(rms_feature[::-1]>baseline)
                feaData = feaData.T[start_:end_]
            else:
                feaData = feaData.T

            # if rep_.startswith('2'):
            #     X_test = np.concatenate([X_test,feaData])
            #     y_test = np.concatenate([y_test,np.ones_like(feaData)[:,0]*int(class_)-1])
            
            X_test = np.concatenate([X_test,feaData])
            y_test= np.concatenate([y_test,np.ones_like(feaData)[:,0]*int(class_)-1])


In [38]:
print(X_test)
print(X_test.shape)
print(y_test)
print(y_test.shape)

[[0.01358471 0.01684657 0.0059903  ... 0.01526977 0.03508222 0.05078959]
 [0.01459355 0.0197342  0.00832357 ... 0.02019328 0.04748765 0.05708802]
 [0.01294052 0.02060827 0.00856275 ... 0.01890604 0.04923975 0.064333  ]
 ...
 [0.00506272 0.00176261 0.00294942 ... 0.0031034  0.00197066 0.00336747]
 [0.00551785 0.00193085 0.00312833 ... 0.00294942 0.00184865 0.00339046]
 [0.00533391 0.00175378 0.00313112 ... 0.00306911 0.00186016 0.00336776]]
(4689, 8)
[0. 0. 0. ... 8. 8. 8.]
(4689,)


LOAD LEARNING MODEL

In [39]:
logRegres  = load('LogisticRegression1.joblib')

DEVICE = torch.device("cpu") # operations is in CPU or GPU.
M = torch.diag(torch.ones(8)).roll(-1,1) # Create a diagnoise matrix then shift it to the right
used_bases = [torch.linalg.matrix_power(M,i).to(DEVICE) for i in range (8)] #

N_points = 1000

TEST ACCURACY OF LOGISTIC rEGRESSION

In [40]:
accuracies_LosReg_shift = []
for i in range (-4, 4):
    X_test_shift = roll_data(X_test, i)
    accuracies_LosReg_shift.append(logRegres.score(X_test_shift,y_test)) 

accuracies_LosReg = logRegres.score(X_test, y_test)
print(accuracies_LosReg_shift)
print(accuracies_LosReg)

[0.3529537214757944, 0.3508210705907443, 0.4130944764342077, 0.3702281936447004, 0.40840264448709745, 0.5397739390061846, 0.4049904030710173, 0.32075069311153764]
0.40840264448709745


TEST ACCURACY OF FEED FORWARD NEURAL NETWORK 

In [41]:
 # This model is without opeartor. It is similiarily to the classifier model but do not have encoder
modelWOoperator = FFNN(8,9)
modelWOoperator.load_state_dict(torch.load("modelwoOperator.pt")) # loaded from the file: parameters learned during training.
modelWOoperator.eval() # evaluation mode ensures consistent behavior during inference.


accuracies_ffnn = []
for i in range (-4, 4):
    X_test_shift = roll_data(X_test, i)
    test_shift_dataset = CustomSignalData(get_tensor(X_test_shift), get_tensor(y_test))
    testshiftloader = torch.utils.data.DataLoader(test_shift_dataset, batch_size=24, shuffle=True)
    accuracies_ffnn.append(clf_acc(modelWOoperator, testshiftloader, encoder = None))


In [42]:
print(accuracies_ffnn)

[0.671768707888467, 0.6033163275949809, 0.5858134934488608, 0.6535572584490387, 0.5742630408126481, 0.6167091845857854, 0.6476757368262933, 0.575609411664155]


In [43]:
test_dataset = CustomSignalData(get_tensor(X_test), get_tensor(y_test))
testloader = torch.utils.data.DataLoader(test_dataset, batch_size=24, shuffle=True)
accuracies_ffnn_single = clf_acc(modelWOoperator, testloader, encoder = None)
print(accuracies_ffnn_single)

0.5742630395962267


TEST ACCURACY OF SELF-SUPERVISED LEARNING

In [44]:
classifier = FFNN(8,9) # This indicates that the neural network expects input data with 8 features and will produce output predictions across 9 classes.
encoder = E(8,8)
encoder.load_state_dict(torch.load("encoder.pt")) # contains the learned parameters (weights and biases) of the encoder model
recovered_points_= torch.load("reference_points.pt") # These points represent reference points for inference or evaluation in the model
print(recovered_points_)
classifier.load_state_dict(torch.load("classifier.pt")) # contains the weights and biases learned during training.
classifier.eval() # sets the model to evaluation mode.
encoder.eval() # sets the model to evaluation mode.
# with torch.no_grad():
#     encoder.eval()
#     N_points = 1000
#     rand_idx = np.random.choice(all_X_test.shape[0], N_points)
#     y_tr = encoder(get_tensor(all_X_test[rand_idx]))
#     recovered_points_ = rotate_batch(y_tr,-all_shift_test[rand_idx].flatten(), 6)
#     del y_tr

accuracies_inferred_operator = []
for i in range (-4, 4):
    X_test_shift = roll_data(X_test, i)
    y1 = encoder(get_tensor(X_test_shift))
    y_tr_rotated = torch.zeros(X_test_shift.shape[0])
    for j, y_ in enumerate(y1):
        distances = np.zeros(8)
        for d in (range(-4,4)):
            x_rotated = used_bases[d].matmul(y_).repeat(N_points,1)
            distances[d] = ((x_rotated-recovered_points_)**2).mean(1).topk(2, largest=False)[0].mean()
        y_tr_rotated[j] = distances.argmin()
    y_tr_est1 = rotate_batch(y1, y_tr_rotated.int(),6)
    y_tr1 = classifier(y_tr_est1).argmax(1)
    # print(y_tr1- get_tensor(y_test).flatten())
    accuracies_inferred_operator.append((1-torch.abs(torch.sign(y_tr1- get_tensor(y_test).flatten()))).mean())

print(accuracies_inferred_operator)

tensor([[ 1.1943e-02,  7.0172e-03,  1.8266e-02,  ...,  1.5343e-02,
          2.2784e-03,  3.2136e-02],
        [ 1.1721e-01,  5.5396e-02,  1.8309e-01,  ...,  6.2217e-02,
          3.4511e-02,  4.1451e-02],
        [ 8.6316e-02,  3.0073e-02,  1.6398e-01,  ...,  5.3353e-02,
         -9.6507e-03,  1.6960e-01],
        ...,
        [ 1.7151e-02,  3.5203e-02,  3.3019e-04,  ...,  5.2602e-02,
          7.1660e-02,  2.5194e-02],
        [ 1.4617e-01,  2.4301e-01,  1.2075e-01,  ...,  4.0121e-02,
         -2.9864e-02,  2.1188e-01],
        [-5.6227e-02,  3.3742e-01,  3.7218e-01,  ...,  9.2131e-02,
         -7.3398e-02,  2.3686e-01]])


KeyboardInterrupt: 

In [None]:
# No need for shifting or rotating, directly use X_test
y1 = encoder(get_tensor(X_test))

# Calculate the rotated indices
y_tr_rotated = torch.zeros(X_test.shape[0])
for j, y_ in enumerate(y1):
    distances = np.zeros(8)
    for d in range(-4, 4):
        x_rotated = used_bases[d].matmul(y_).repeat(N_points, 1)
        distances[d] = ((x_rotated - recovered_points_)**2).mean(1).topk(2, largest=False)[0].mean()
    y_tr_rotated[j] = distances.argmin()

# Estimate rotated representations
y_tr_est1 = rotate_batch(y1, y_tr_rotated.int(), 6)

# Classify the rotated representations
y_tr1 = classifier(y_tr_est1).argmax(1)

# Calculate accuracy
accuracy = (1 - torch.abs(torch.sign(y_tr1 - get_tensor(y_test).flatten()))).mean()

print(accuracy)

tensor(0.5967)


In [46]:
27*9*8

1944