In [None]:
import torch
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
import time
from scipy.spatial.distance import cdist
from sklearn.metrics import average_precision_score

In [None]:
def RVSML_OT_Learning(trainset, templatenum, lambda_val, delta = 1, lambda1 = 0.1, 
                     lambda2 = 10, max_nIter = 1, err_limit = 1.0e-6):
    lambda1=torch.tensor(0.1, requires_grad=True)
    lambda2=torch.tensor(10.0, requires_grad=True)
    learning_rate = 1e-3
    optimizer = torch.optim.Adam([lambda1, lambda2], learning_rate)

    classnum = len(trainset)
    downdim = classnum * templatenum
    dim = len(trainset[0][0][0])

    trainsetnum = [0] * classnum
    virtual_sequence = [None] * classnum
    active_dim = 0

    for c in range(0, classnum-1):
        trainsetnum[c] = len(trainset[c])
        virtual_sequence[c] = torch.zeros((templatenum, downdim))
        for a_d in range(0, templatenum-1):
            active_dim += 1
            virtual_sequence[c][a_d][active_dim] = 1

    R_A = torch.zeros((dim, dim))
    R_B = torch.zeros((dim, downdim))
    N = sum(trainsetnum);
    for c in range(classnum):
        for n in range(trainsetnum[c]):
             seqlen = len(trainset[c][n])
             T_ini = torch.ones((seqlen, templatenum)) / (seqlen * templatenum)
             for i in range(seqlen):
                 trainset[c][n][i] = torch.tensor(trainset[c][n][i])
                 temp_ra = torch.ger(trainset[c][n][i], trainset[c][n][i])
                 for j in range(templatenum):
                     R_A += T_ini[i, j] * temp_ra
                     R_B += T_ini[i, j] * torch.ger(trainset[c][n][i], virtual_sequence[c][j])

    R_I = R_A + lambda_val * N * torch.eye(dim)
    L = torch.linalg.solve(R_I, R_B)

    loss_old = 1e8

    for nIter in range(max_nIter):
        loss = 0
        R_A = torch.zeros((dim, dim))
        R_B = torch.zeros((dim, downdim))
        N = np.sum(trainsetnum)
    
        for c in range(classnum):
            for n in range(trainsetnum[c]):
                seqlen = len(trainset[c][n])
                trainset[c][n] = torch.stack(trainset[c][n])
                dist, T = OPW_w(torch.matmul(trainset[c][n], L), virtual_sequence[c], verbose=0, lambda1=lambda1, lambda2=lambda2)
                # print(torch.tensor(T).shape)
                loss += dist
                
                for i in range(seqlen):
                    temp_ra = torch.ger(trainset[c][n][i], trainset[c][n][i])
                    for j in range(templatenum):
                        
                        R_A += ((T[i, j] * temp_ra))
                        R_B += (T[i, j] * torch.ger(trainset[c][n][i], virtual_sequence[c][j]))

        
        loss = loss / N + torch.trace(torch.matmul(L.t(), L))

        if abs(loss - loss_old) < err_limit:
            break
        else:
            loss_old = loss
        print(loss)
    
        R_I = R_A + lambda_val * N * torch.eye(dim)
        L = torch.linalg.solve(R_I, R_B)

        loss.backward()
        optimizer.step()
        print(lambda1)
        print(lambda2)
    return L


In [None]:
def OPW_w(x: torch.Tensor, y: torch.Tensor, a: torch.Tensor = None, b: torch.Tensor = None, std=1, verbose=0, lambda1=0.1, lambda2=10,
          tol=.5e-2, maxIter=20, p_norm='inf', metric='sqreuclidean'):
          
    assert y.size(1) == x.size(1), "The dimensions of instances in the input sequences must be the same!"
    N = x.size(0)
    M = y.size(0)
    col_x = torch.arange(1, N+1)/N
    col_x = col_x.view(N, 1)
    col_y = torch.arange(1, M+1)/M
    relative_pos = col_x-col_y

    l = torch.abs(relative_pos) / ((1/N**2 + 1/M**2)**0.5)
    P = torch.exp(-l**2/(2*std**2)) / (std*(2*np.pi)**0.5)

    S = lambda1 / (relative_pos**2 + 1)

    D = pdist2(x, y, metric=metric)

    K = P * torch.exp((S - D) / lambda2)

    if a is None:
        a = torch.ones(N, 1) / N

    ainvK = K / a   # [N, M]

    iter = 0
    u = torch.ones(N, 1) / N

    if b is None:
        b = torch.ones(M, 1) / M
    while iter < maxIter:
        # Ensure tensors have the correct shape and scalar type
        ainvK = ainvK.to(torch.float32)
        b = b.to(torch.float32)
        K = K.to(torch.float32)
        u = u.to(torch.float32)

        # Perform the operations
        K_transpose = K.t()
        temp = b / torch.matmul(K_transpose, u)
        temp2 = torch.matmul(ainvK, temp)
        u = 1.0 / temp2
        
        iter += 1
        if iter % 20 == 1 or iter == maxIter:
            v = b / torch.matmul(K.T, u)    # [M, 1]
            u = 1 / torch.matmul(ainvK, v)  # [N, 1]

            criterion = torch.sum(torch.abs(v * torch.matmul(K.T, u) - b), dim=0)
            criterion = criterion.norm(p=float(p_norm))
            if abs(criterion) < tol:
                break

            iter += 1
            if verbose > 0:
                print(f"Iteration : {iter}, Criterion: {criterion}")
    
    U = K * D   # [N, M]
    
    dist = torch.sum(u.double() * torch.matmul(U.double(), v.double()), dim=0)
    T = v.T.double() * (u.double() * K.double())
 
    return dist, T

In [None]:
def pdist2(X, Y, metric='sqreuclidean'):
    if metric.lower() == 'sqreuclidean':
        return distEucSqr(X, Y)
    elif metric.lower() == 'euclidean':
        return torch.sqrt(distEucSqr(X, Y))
    elif metric.lower() == 'L1':
        return distL1(X, Y)
    elif metric.lower() == 'cosine':
        return distCosine(X, Y)
    elif metric.lower() == 'emd':
        return distEmd(X, Y)
    elif metric.lower() == 'chisqr':
        return distChiSqr(X, Y)
    else:
        raise NotImplementedError(f'pdist - unknown metric: {metric}')


def distL1(x: torch.Tensor, y: torch.Tensor):
    return torch.abs(x.unsqueeze(1) - y).sum(dim=-1)


def distCosine(x: torch.Tensor, y: torch.Tensor, eps=1e-8):
    assert x.dtype == torch.float or y.dtype == torch.float, "Inputs must be of type float"
    cos = torch.nn.CosineSimilarity(dim=-1, eps=eps)
    return 1 - cos(x.unsqueeze(1), y)


def distEmd(x: torch.Tensor, y: torch.Tensor):
    x_cdf = torch.cumsum(x, dim=-1)
    y_cdf = torch.cumsum(y, dim=-1)

    return torch.abs(x_cdf.unsqueeze(1) - y_cdf).sum(dim=-1)


def distEucSqr(x: torch.Tensor, y: torch.Tensor):
    return torch.cdist(x, y, p=2)**2


def distChiSqr(x: torch.Tensor, y: torch.Tensor, eps=1e-10):
    a = x.unsqueeze(1) + y
    b = x.unsqueeze(1) - y
    return (b**2 / (a + eps)).sum(dim=-1) / 2

In [None]:
def NNClassifier(classnum, trainset, trainsetnum, testsetdata, testsetdatanum, testsetlabel, 
                 lambda1 = 0.1, lambda2 = 10, delta = 1):
    trainsetdatanum = sum(trainsetnum)
    # trainsetdata = trainsetdata = [None] * trainsetdatanum
    # trainsetlabel = torch.zeros((trainsetdatanum, 1))
    # sample_count = 0

    # for c in range(classnum):
    #     for per_sample_count in range(trainsetnum[c]):
    #         trainsetdata[sample_count] = trainset[c][per_sample_count]
    #         trainsetlabel[sample_count] = c
    #         sample_count += 1
    def getLabel(classid):
        X = torch.zeros((len(classid), torch.max(classid))) - 1
        for i in range(1, torch.max(classid) + 1):
            indx = torch.where(classid == i)[0]
            X[indx, i-1] = 1
        return X
    testsetlabelori = testsetlabel.clone()
    testsetlabel = getLabel(testsetlabelori);
    trainsetlabelfull = getLabel(trainsetlabel);

    k_pool = [1, 3, 5, 7, 9, 11, 15, 30]
    k_num = len(k_pool)
    Acc = torch.zeros((k_num, 1))

    start_time = time.time()

    dis_totrain_scores = torch.zeros((trainsetdatanum, testsetdatanum))
    ClassLabel = torch.arange(0, classnum)
    dis_ap = torch.zeros(testsetdatanum)

    rightnum = torch.zeros(k_num)
    for j in range(testsetdatanum):
        for m2 in range(trainsetdatanum):
            # [Dist, D, matchlength, w] = dtw2(trainsetdata[m2].T, testsetdata[j].T)
            # [Dist, T] = Sinkhorn_distance(trainsetdata[m2], testsetdata[j], lambda, 0)
            [Dist, T] = OPW_w(torch.tensor(trainsetdata[m2]), torch.tensor(testsetdata[j]), lambda1, lambda2)
            dis_totrain_scores[m2, j] = Dist
            if np.isnan(Dist):
                print('Not a number!')
        
        distm, index = torch.sort(dis_totrain_scores[:, j])
        
        for k_count in range(k_num):
            cnt = torch.zeros(classnum)
            for temp_i in range(k_pool[k_count]):
                ind = torch.where(ClassLabel == trainsetlabel[index[temp_i]])[0][0]
                cnt[ind] += 1
            distm2, ind = torch.max(cnt), torch.argmax(cnt)
            predict = ClassLabel[ind]
            predict = predict.item()
            if predict == testsetlabelori[j]:
                rightnum[k_count] += 1
        
        temp_dis = -dis_totrain_scores[:, j]
        temp_dis[np.isnan(temp_dis)] = 0
        average_precision = average_precision_score(trainsetlabelfull[:, testsetlabelori[j]], temp_dis)
        dis_ap[j] = average_precision
        
        Acc = rightnum / testsetdatanum
        Map = torch.mean(dis_ap)

        knn_time = time.time() - start_time
        knn_averagetime = knn_time / testsetdatanum

    return Map, Acc, knn_averagetime


In [None]:
def get_utterances(path):
  X = []
  utterances = []
  count = 0
  with open(path, 'r') as f:
      length_X = len([line for line in f.read().splitlines()])
  with open(path, 'r') as f:
      for line in f.read().splitlines():
          count += 1
          frame = [-abs(float(i)) if i.startswith('-') else abs(float(i)) for i in line.split()]
          if len(frame) > 0:
            utterances.append(frame)
            if count == length_X:
              X.append(utterances)
          else:
            if len(utterances) > 0:
              X.append(utterances)
              utterances = []
          
  return X

In [None]:
X_train = get_utterances(path="/content/Train_Arabic_Digit.txt")
X_test = get_utterances(path="/content/Test_Arabic_Digit.txt")
print(f"Number of training sample: {len(X_train)}")
print(f"Number of testing sample: {len(X_test)}")

In [None]:
trainset = []
trainset_cell = []
trainsetnum = []

for i, data_point in enumerate(X_train, 1):
    trainset_cell.append(data_point)
    
    if i % 660 == 0 or i == len(X_train):
        trainset.append(trainset_cell)
        trainsetnum.append(len(trainset_cell))
        trainset_cell = []

In [None]:
import torch
import numpy as np
import json 

n_class = torch.Tensor(torch.arange(0, 10)).view(10, 1)
y_train = n_class.expand_as(torch.empty((10, 660))).contiguous().view(6600)
y_test = n_class.expand_as(torch.empty((10, 220))).contiguous().view(2200)

In [None]:
y_train

In [None]:
with open('fe.json', 'w') as f:
    json.dump(3, f)

In [60]:
classnum = 10
dim = 60
CVAL = 1

testsetdata = X_test
testsetlabel = y_test
testsetdatanum = len(X_test)

trainsetdata = X_train
trainsetlabel = y_train
trainsetdatanum = len(X_train)

templatenum = 4
lambda_val = 0.01
start_time = time.time()

L = RVSML_OT_Learning(trainset, templatenum, lambda_val, delta = 1, lambda1 = 0.1, 
                     lambda2 = 10, max_nIter = 1, err_limit = 1.0e-6)
                     
RVSML_opw_time = time.time() - start_time
traindownset = [None] * classnum
testdownsetdata = [None] * testsetdatanum

for j in range(classnum):
    traindownset[j] = [None] * trainsetnum[j]
    for m in range(trainsetnum[j]):
        traindownset[j][m] = np.dot(trainset[j][m], L)

testdownsetdata = [None] * testsetdatanum

for j in range(testsetdatanum):
    testdownsetdata[j] = np.dot(testsetdata[j], L)

Map, Acc, knn_averagetime = NNClassifier(classnum, traindownset, trainsetnum, testdownsetdata, testsetdatanum, testsetlabel)
RVSML_opw_acc_1 = Acc[0]

print(f"Training time of RVSML instantiated by OPW is {RVSML_opw_time:.4f}")
print("Classification using 1 nearest neighbor classifier with OPW distance:")
# print(f"MAP is {RVSML_opw_map:.4f}")
print(f"Accuracy is {RVSML_opw_acc_1:.4f}")


KeyboardInterrupt: ignored