In [None]:
import numpy as np
from numpy import linalg as LA
import torch
from torch import linalg as LAT
import scipy
from scipy import linalg as LAS
from scipy.stats import unitary_group
import math
import cv2
import os
from scipy.stats import ortho_group

from tensorly import unfold


In [None]:
def import_data_weiz_reg(numFrames=35,height=50,width=50):
    """
    Load every video under "<current working directory>/Weizmann rebuild/" as grayscale frames.

    Each sub-folder of that directory is one group; its name is recorded in the third
    return value and every file inside it is opened as a video with OpenCV.
    Folders and files are visited in sorted order so the indices derived from the
    returned lists are the same on every run (os.listdir order is arbitrary).

    Inputs:
    -- numFrames=35 : number of frames kept per video. When a video runs out of frames
                      it is rewound to frame 0 and reading continues.
    -- height=50,
    -- width=50     : size every frame is resized to before grayscale conversion
    Outputs:
    -- list with one numpy array per sub-folder, shape (videos in folder, numFrames, height, width)
    -- list with one list of file names per sub-folder, aligned with axis 0 of the matching array
    -- list of sub-folder names, aligned with the two lists above
    """
    dataDir = os.path.join(os.getcwd(), "Weizmann rebuild")

    # Fraction of the frame kept around its centre; 1 keeps the frame
    # (minus one trailing row/column when a dimension is odd).
    scale = 1

    class_data = []
    labelList = []
    peopleOrder = []
    for foldername in sorted(os.listdir(dataDir)):
        folderPath = os.path.join(dataDir, foldername)
        # Skip stray files (e.g. OS metadata) sitting next to the sub-folders.
        if not os.path.isdir(folderPath):
            continue

        filenames = sorted(os.listdir(folderPath))
        bigBuffer = np.zeros((len(filenames),numFrames,height,width))
        label = []
        peopleOrder.append(foldername)
        for fileCount, filename in enumerate(filenames):
            vid = cv2.VideoCapture(os.path.join(folderPath, filename))
            buffer = np.zeros((numFrames,height,width))
            try:
                for i in range(numFrames):
                    sane, thisFrame = vid.read()
                    if not sane:
                        # Ran past the last frame: rewind and keep reading from the start.
                        vid.set(cv2.CAP_PROP_POS_FRAMES, 0)
                        sane, thisFrame = vid.read()
                        if not sane:
                            # Unreadable even after rewinding; remaining frames stay all-zero.
                            print("something broke at",i,filename)
                            break

                    height2, width2, channels = thisFrame.shape
                    centerX,centerY=int(height2/2),int(width2/2)
                    radiusX,radiusY = int(centerX*scale), int(centerY*scale)
                    minX,maxX=centerX-radiusX,centerX+radiusX
                    minY,maxY=centerY-radiusY,centerY+radiusY
                    cropped = thisFrame[minX:maxX, minY:maxY]

                    # cv2.resize takes (width, height); the result is stored as a single-channel frame.
                    buffer[i] = cv2.cvtColor(cv2.resize(cropped,(width,height)),cv2.COLOR_BGR2GRAY)
            finally:
                # Release the capture even if decoding/resizing raised.
                vid.release()

            label.append(filename)
            bigBuffer[fileCount] = buffer
        class_data.append(bigBuffer)
        labelList.append(label)

    return class_data, labelList, peopleOrder

# Load all videos once. The three names receive, in order, the three values returned by
# import_data_weiz_reg: per-folder frame arrays, per-folder file-name lists, and folder names.
data, labels, people = import_data_weiz_reg()

In [None]:
import torch
import scipy
from tensorly import unfold

# Folder name of the subject withheld from training in this cell.
leaveOut = 'subject1'

# File name (taken from the first folder) -> class index, and folder name -> position in `people`.
labelDict = {fileName: classIdx for classIdx, fileName in enumerate(labels[0])}
peopleDict = {folderName: position for position, folderName in enumerate(people)}

leaveOutIndex = peopleDict[leaveOut]
def my_hosvd(A):
    """
    Compute one factor matrix per mode for the first three modes of an array.

    For each mode k in (0, 1, 2) the array is unfolded along k with `unfold`,
    a thin SVD of the unfolding is taken, and the conjugate transpose of `vh`
    (the right singular vectors, stored as columns) is kept.

    Inputs:
    -- A : array with at least three modes
    Outputs:
    -- list of three matrices [V0, V1, V2], one per mode, each with orthonormal columns
    """
    factors = []
    for mode in range(3):
        _, _, vh = LAS.svd(unfold(A, mode), full_matrices=False)
        # Plain ndarray conjugate transpose; same values np.matrix.getH(vh) produced,
        # without going through the np.matrix class.
        factors.append(vh.conj().T)
    return factors

def stieflel_to_grassman(stif, random_state=None):
    """
    Right-multiply every matrix in `stif` by a random orthogonal matrix.

    The orthogonal matrix is drawn with scipy.stats.ortho_group and has as many
    rows/columns as the matrix has columns, so each output spans the same column
    space as its input (M @ M.T is unchanged).

    Inputs:
    -- stif : sequence of 2-D arrays
    -- random_state=None : None keeps the original behaviour (scipy's default random
                           state); an int seed, numpy RandomState or numpy Generator
                           makes the draws reproducible. One generator is shared by
                           all matrices so they do not all receive the same rotation.
    Outputs:
    -- list of rotated matrices, same order and shapes as `stif`
    """
    if random_state is None or isinstance(random_state, (np.random.RandomState, np.random.Generator)):
        rng = random_state
    else:
        # Build the generator once; passing the raw seed to every rvs call would
        # restart the stream for each matrix.
        rng = np.random.RandomState(random_state)
    return [basis @ ortho_group.rvs(basis.shape[1], random_state=rng) for basis in stif]



# Training set: data_mls[class][mode] is the list of mode-`mode` factors of every
# training video of that class (all subjects except the held-out one).
data_mls = [[[] for _mode in range(3)] for _cls in range(len(labels[0]))]

for person, personVideos in enumerate(data):
    if person == leaveOutIndex:
        continue
    for n in range(personVideos.shape[0]):
        fac = stieflel_to_grassman(my_hosvd(personVideos[n]))
        classSlots = data_mls[labelDict[labels[person][n]]]
        for mode in range(3):
            classSlots[mode].append(fac[mode])

# Test set: same layout, filled only from the held-out subject's videos.
data_testing_set = [[[] for _mode in range(3)] for _cls in range(len(labels[0]))]

heldOutVideos = data[leaveOutIndex]
for n, heldOutLabel in enumerate(labels[leaveOutIndex]):
    factest = stieflel_to_grassman(my_hosvd(heldOutVideos[n]))
    classSlots = data_testing_set[labelDict[heldOutLabel]]
    for mode in range(3):
        classSlots[mode].append(factest[mode])



In [None]:
def frob_norm_mat_calc(pset):
  """
  Build the square matrix whose (i, j) entry is the squared Frobenius norm
  of pset[i].T @ pset[j], for every ordered pair of matrices in `pset`.
  """
  count = len(pset)
  gram = np.zeros((count, count))
  for row, left in enumerate(pset):
    leftT = left.T
    for col, right in enumerate(pset):
      overlap = np.linalg.norm(leftT @ right, ord="fro")
      gram[row, col] = np.power(overlap, 2)
  return gram

def build_pseudos(trainingData,listPseudos):
  """
  For every class c and mode f in (0, 1, 2), append frob_norm_mat_calc(trainingData[c][f])
  to listPseudos[c][f]. The lists in `listPseudos` are modified in place; nothing is returned.
  """
  for c, classFactors in enumerate(trainingData):
    classSlots = listPseudos[c]
    for f in range(3):
      classSlots[f].append(frob_norm_mat_calc(classFactors[f]))


def frob_norm_vec_calc(pset,x):
  """
  Return a (len(pset), 1) column whose i-th entry is the squared Frobenius norm
  of x.T @ pset[i].
  """
  squaredNorms = [
    np.power(np.linalg.norm(x.T @ basis, ord="fro"), 2)
    for basis in pset
  ]
  return np.array(squaredNorms, dtype=float).reshape(len(pset), 1)

def error_factor(factorSet,x,subreg):
  """
  Frobenius norm of x @ x.T minus the weighted sum of factorSet[i] @ factorSet[i].T,
  where the i-th term is scaled by subreg[i].
  """
  approx = 0
  for idx, basis in enumerate(factorSet):
    approx += (basis @ basis.T) * subreg[idx]
  residual = x @ x.T - approx
  return np.linalg.norm(residual, ord="fro")

def sub_lin_reg(factorSet,x,pseudo):
  """
  Compute the regression weights of `x` on the matrices in `factorSet`.

  Solves (pseudo + pseudo.T) @ w = 2 * v for w, where v = frob_norm_vec_calc(factorSet, x).
  `pseudo` is the precomputed matrix for this factor set (see the commented line below),
  passed in so it is not rebuilt for every query.

  Returns the weights as a (len(factorSet), 1) column.
  Raises numpy.linalg.LinAlgError when pseudo + pseudo.T is singular.
  """
  #pseudo = frob_norm_mat_calc(factorSet)
  vector = frob_norm_vec_calc(factorSet,x)
  # Solve the linear system directly rather than forming the inverse and multiplying:
  # same solution, one factorisation, better behaved for ill-conditioned matrices.
  return np.linalg.solve(pseudo + pseudo.T, 2*vector)

def lin_reg_class(pset,x,pseudos):
  """
  Total reconstruction error of `x` against one class.

  For each of the three modes f, the weights come from
  sub_lin_reg(pset[f], x[f], pseudos[f][0]) and the error from
  error_factor(pset[f], x[f], weights); the three errors are summed.
  """
  def mode_error(f):
    weights = sub_lin_reg(pset[f], x[f], pseudos[f][0])
    return error_factor(pset[f], x[f], weights)

  return sum(mode_error(f) for f in range(3))

def lin_reg_err(trainingData,x,pseudos):
  """
  Return a (number of classes, 1) column whose c-th entry is
  lin_reg_class(trainingData[c], x, pseudos[c]).
  """
  perClassError = np.zeros((len(trainingData), 1))
  for c, classFactors in enumerate(trainingData):
    perClassError[c] = lin_reg_class(classFactors, x, pseudos[c])
  return perClassError

# One empty slot per (class, mode); build_pseudos appends that pair's matrix into its slot.
pseudos = [[[] for _mode in range(3)] for _cls in range(len(data_mls))]
build_pseudos(data_mls,pseudos)

# Classify the first held-out sample of every class and print the index of the
# class with the smallest regression error.
for i in range(len(labels[0])):
  heldOutClass = data_testing_set[i]
  pointForTest = [heldOutClass[mode][0] for mode in range(3)]
  test = lin_reg_err(data_mls,pointForTest,pseudos)
  print(np.argmin(test))

In [None]:
import torch
import scipy
from tensorly import unfold

# Leave-one-subject-out evaluation.
# confusion[i, j] counts how often the held-out sample of class i was assigned to class j.
numClasses = len(labels[0])
confusion = np.zeros((numClasses,numClasses))

# The factors of a video do not depend on which subject is held out, so decompose
# every video once here instead of once per fold. The random rotation applied by
# stieflel_to_grassman leaves M @ M.T and the Frobenius norms used by the regression
# unchanged (up to floating-point rounding), so reusing one draw per video is safe.
allFactors = []
for person in range(len(data)):
    personFactors = []
    for n in range(data[person].shape[0]):
        personFactors.append(stieflel_to_grassman(my_hosvd(data[person][n])))
    allFactors.append(personFactors)

for leaveOutPerson in range(len(people)):

    leaveOutIndex = leaveOutPerson

    # Training set: data_mls[class][mode] collects factors from every other subject.
    data_mls = [[[] for _mode in range(3)] for _cls in range(numClasses)]
    for person in range(len(data)):
        if person == leaveOutIndex:
            continue
        for n in range(data[person].shape[0]):
            fac = allFactors[person][n]
            classNum = labelDict[labels[person][n]]
            for f in range(3):
                data_mls[classNum][f].append(fac[f])

    # Test set: same layout, filled from the held-out subject only.
    data_testing_set = [[[] for _mode in range(3)] for _cls in range(numClasses)]
    for n in range(len(labels[leaveOutIndex])):
        factest = allFactors[leaveOutIndex][n]
        classNum = labelDict[labels[leaveOutIndex][n]]
        for f in range(3):
            data_testing_set[classNum][f].append(factest[f])

    # Per-(class, mode) matrices for this fold's training set.
    pseudosList = [[[] for _mode in range(3)] for _cls in range(len(data_mls))]
    build_pseudos(data_mls,pseudosList)

    # Classify the first held-out sample of each class and record the prediction.
    for i in range(numClasses):
        pointForTest = [data_testing_set[i][f][0] for f in range(3)]
        test = lin_reg_err(data_mls,pointForTest,pseudosList)
        confusion[i,np.argmin(test)] += 1



In [None]:
# Save the confusion matrix, then report its trace: the diagonal counts the test
# samples whose predicted class equals their true class. The trace used to be a bare
# expression ahead of savetxt, so the cell never displayed it.
np.savetxt('confus_weiz_reg.csv',confusion,delimiter=',')
print(np.trace(confusion))