In [1]:
#
# importing the necessary libraries:
# NumPy, SciPy (spatial algorithms and optimization), 
# multiprocessing features for speed-ups, and also
# plotting functions, and a handler for csv files
#
import numpy as np
import scipy.spatial as sp
import scipy.optimize as opt
import matplotlib.pyplot as plt
import csv

In [2]:
#
# Centering a point cloud X
#
def barycentered(X):
    #
    bar = np.sum(X, axis=1)/X.shape[1]
    #
    return np.array([vec - bar for vec in X.T]).T

In [3]:
#
# First constructing a random psd matrix with specified condition number,
# then multiplying it by a random orthogonal matrix 
#
# Bierlaire, M., Toint, P., and Tuyttens, D. (1991). 
# On iterative algorithms for linear ls problems with bound constraints. 
# Linear Algebra and Its Applications, 143, 111–143.
#
def rand_mat_cond(dim=3, cond=5.0):
    #
    log_cond = np.log(cond)
    exp_vec = np.arange(-log_cond/4.0, log_cond * (dim + 1)/(4 * (dim - 1)),\
                                                                    log_cond/(2.0*(dim-1)))
    D = np.diag(np.exp(exp_vec[:dim]))
    U, _ = np.linalg.qr((np.random.rand(dim,dim) - 5.0) * 200)
    V, _ = np.linalg.qr((np.random.rand(dim,dim) - 5.0) * 200)
    P = U @ D @ V.T
    P = P @ P.T
    #
    seed = np.random.normal(0.0, 1.0, (dim,dim))
    O, _ = np.linalg.qr(seed, mode='complete')
    if np.linalg.det(O) < 0:
        O = O @ np.diag([1]*(dim-1)+[-1])
    #
    M = P @ O
    #
    return M

In [4]:
#
# Detecting nearest neighbours between two unlabelled point coluds X and Y
#
# Assumption: cardinality of X <= cardinality of Y 
#
# Input: point clouds X, Y
#
# Output: indices ind such that X <--> Y[ind] are nearest neighbours
#
#
def nearest_neighbours(X, Y):
    #
    nx = X.shape[1]
    ny = Y.shape[1]
    #
    assert (nx <= ny)
    #
    tree = sp.KDTree(Y.T, leafsize=10, compact_nodes=True, copy_data=True, balanced_tree=True)
    #
    matching = []
    #
    for i in range(nx):
        _, ind = tree.query(X.T[i], k=1, p=2, workers=-1)
        matching += [ind]
    #    
    return matching

In [5]:
#
# Running sample tests with visual output on several point clouds
#
# Number of Laplacian eigenvectors used is num_laplacian = 3
#
# Condition number of random linear map used is cond = 3
#
file_names = ["Teapot.csv", "Bunny.csv", "Cow.csv"]
#
sigma = 0.005 # multiplicative noise level
level = 0.95 # discrepancy level
num_laplacian = 6 # number of Laplacian eigenvectors (asserted >= dim)
cond = 3.0 # condition number of random linear map
#
for name in file_names:
    #
    X0 = []
    #
    f = open(name)
    reader = csv.reader(f)
    #
    for line in reader:
        X0 += [[RDF(v) for v in line]]
    #
    f.close()
    #
    print("File read: {}".format(name))
    #
    X0 = np.array(X0).T
    X0 = barycentered(X0)
    dim, num_y = X0.shape
    #
    num_x = int(level * num_y)
    ind = np.random.default_rng().choice(num_y, size=num_x, replace=False)
    ind = sorted(ind)
    X = X0.T[ind].T
    X = barycentered(X)
    #
    assert(num_x <= num_y)
    #
    dX = [X0.T[i] for i in range(num_y) if i not in ind]
    dX = np.array(dX).T
    disc_X = np.linalg.norm(dX, 2) / np.linalg.norm(X0, 2)
    #
    print("Dimension = {}, number of points: {} vs {}".format(dim, num_x, num_y))
    print("Relative discrepancy = {}".format(disc_X))
    #
    L = rand_mat_cond(dim, cond)
    #
    S  = np.random.default_rng().permutation(np.identity(num_y))
    #
    N  = np.random.normal(1, sigma, (dim, num_y))
    #
    Y0 = L @ X0 @ S
    Y  = N * Y0
    Y  = barycentered(Y)
    #
    noise = np.linalg.norm(Y - Y0, 2) / np.linalg.norm(Y0, 2)
    print("Noise = {}".format(noise))
    #
    assert num_laplacian >= dim
    #
    _, _, Vx = np.linalg.svd(X)
    Gx = sp.distance_matrix(Vx[:dim].T, Vx[:dim].T, p=2)
    Lx = np.diag(Gx @ np.array([1]*num_x)) - Gx
    _, Sigmax, Ex = np.linalg.svd(Lx)
    Ex = Ex[:num_laplacian]
    #
    _, _, Vy = np.linalg.svd(Y)
    Gy = sp.distance_matrix(Vy[:dim].T, Vy[:dim].T, p=2)
    Ly = np.diag(Gy @ np.array([1]*num_y)) - Gy
    _, Sigmay, Ey = np.linalg.svd(Ly)
    Ey = Ey[:num_laplacian]
    #
    W = MatrixGroup([matrix.diagonal(d) for d in Permutations([-1]+[1]*(num_laplacian-1))])
    W = [np.array(matrix(m)) for m in W]
    #
    sols = []
    for w in W:
        #
        wEx = w @ Ex
        #
        col_ind = nearest_neighbours(wEx, Ey)
        #
        L0 = Y[:,col_ind] @ X.T @ np.linalg.inv(X @ X.T)
        #
        err_L = np.linalg.norm(L0 - L, 2) / np.linalg.norm(L, 2)
        #
        sols += [(err_L, L0, w)]
    #
    sols = sorted(sols, key=lambda x: x[0])
    #
    err_L, L0, w = sols[0]
    #
    print("Relative error in L = {}".format(err_L))
    #    
    err_Y = np.linalg.norm(L @ X - L0 @ X, 2) / np.linalg.norm(L @ X, 2)
    #
    print("Relative error in matching images err_Y = {}".format(err_Y))
    #
    X_pre = np.linalg.inv(L0) @ Y
    #
    img  = point3d(X.T, color='yellow', size=2)
    img += point3d(X_pre.T, color='red', size=2)
    img += point3d(dX.T, color='blue', size=4)
    img.plot().show()

File read: Teapot.csv
Dimension = 3, number of points: 333 vs 351
Relative discrepancy = 0.24368216105033355
Noise = 0.0040208935989047455
Relative error in L = 0.0013139607329335857
Relative error in matching images err_Y = 0.0013794514221132017


File read: Bunny.csv
Dimension = 3, number of points: 501 vs 528
Relative discrepancy = 0.2100054911008819
Noise = 0.004233141528933814
Relative error in L = 0.4882406454593611
Relative error in matching images err_Y = 0.3631046266699846


File read: Cow.csv
Dimension = 3, number of points: 571 vs 602
Relative discrepancy = 0.24978500620569127
Noise = 0.004654849319475167
Relative error in L = 0.3274408446969057
Relative error in matching images err_Y = 0.46332946409417347


In [None]:
#
#
# Running a test with given parameters on a given point cloud X0 with X0.shape = (d, n)
# where d = dimension 3, n = number of points
#
# creates two clouds X = X0 with int(level*n) points and Y = L*X*S with L a random
# linear map with condition number cond, S a random permutation from Sym(n). 
# 
# Normal multiplicative noise N(1, sigma^2) is added to Y by way of Hadamard product of 
# Y with matrix N having i.i.d. entries from N(1, sigma^2)
# 
# Parameters:
#
# sigma = normal multiplicative noise N(1, sigma^2), sigma \in [0, +\infty]
#
# level = discrepancy level for X and Y where Y has n points, 
# and X has int(n*level) points; level \in [0,  1]
#
# num_laplacian = number of Laplacian eigenvectors used to recover S; 
# asserted num_laplacian >= d (= dimension)
#
# cond = condition number of random linear map L; cond \in [1, \infty]
#
# verbose = verbose flag for additional info / debugging 
#
# Output: 
#
# noise = normalised noise in Y = ||Y - Y0|| / ||Y0||, where Y0 = L*X*S w/o noise
#
# disc_X = point discrepancy in X = ||dX|| / ||X0||, where dX = points in X0 missing from X
#
# err_L = relative error in recovering L = ||L0 - L|| / ||L||, where L0 = recovered map;
# L0 is recovered by least squares after S has been obtained via rFAQ voting
#
# err_Y = relative error in matching images = points of X under the action of L and L0 =
# = ||L0*X - L*X|| / ||L*X||
#
#
def test_point_cloud(X0, sigma=0.05, level=1.0, num_laplacian=None, cond=3.0, verbose=False):
    #
    X0 = barycentered(X0)
    dim, num_y = X0.shape
    #
    num_x = int(level * num_y)
    ind = np.random.default_rng().choice(num_y, size=num_x, replace=False)
    ind = sorted(ind)
    X = X0.T[ind].T
    X = barycentered(X)
    #
    assert(num_x <= num_y)
    #
    dX = [X0.T[i] for i in range(num_y) if i not in ind]
    dX = np.array(dX).T
    disc_X = np.linalg.norm(dX, 2) / np.linalg.norm(X0, 2)
    #
    L = rand_mat_cond(dim, cond)
    #
    S  = np.random.default_rng().permutation(np.identity(num_y))
    #
    N  = np.random.normal(1, sigma, (dim, num_y))
    #
    Y0 = L @ X0 @ S
    Y  = N * Y0
    Y  = barycentered(Y)
    #
    noise = np.linalg.norm(Y - Y0, 2) / np.linalg.norm(Y0, 2)
    #
    if verbose:
        print("Dimension = {}, number of points: {} vs {}".format(dim, num_x, num_y))
        print("Relative discrepancy = {}".format(disc_X))
        print("Noise = {}".format(noise))
    #
    if num_laplacian == None:
        num_laplacian = dim
    #
    assert num_laplacian >= dim
    #
    _, _, Vx = np.linalg.svd(X)
    Gx = sp.distance_matrix(Vx[:dim].T, Vx[:dim].T, p=2)
    Lx = np.diag(Gx @ np.array([1]*num_x)) - Gx
    _, Sigmax, Ex = np.linalg.svd(Lx)
    Ex = Ex[:num_laplacian]
    #
    _, _, Vy = np.linalg.svd(Y)
    Gy = sp.distance_matrix(Vy[:dim].T, Vy[:dim].T, p=2)
    Ly = np.diag(Gy @ np.array([1]*num_y)) - Gy
    _, Sigmay, Ey = np.linalg.svd(Ly)
    Ey = Ey[:num_laplacian]
    #
    W = MatrixGroup([matrix.diagonal(d) for d in Permutations([-1]+[1]*(num_laplacian-1))])
    W = [np.array(matrix(m)) for m in W]
    #
    sols = []
    for w in W:
        #
        wEx = w @ Ex
        #
        col_ind = nearest_neighbours(wEx, Ey)
        #
        L0 = Y[:,col_ind] @ X.T @ np.linalg.inv(X @ X.T)
        #
        err_L = np.linalg.norm(L0 - L, 2) / np.linalg.norm(L, 2)
        #
        sols += [(err_L, L0, w)]
    #
    sols = sorted(sols, key=lambda x: x[0])
    #
    err_L, L0, w = sols[0]
    #    
    err_Y = np.linalg.norm(L @ X - L0 @ X, 2) / np.linalg.norm(L @ X, 2)
    #
    if verbose:
        print("Relative error in L = {}".format(err_L))
        print("Relative error in matching images err_Y = {}".format(err_Y))
    #
    return noise, disc_X, err_L, err_Y

In [None]:
#
# Running a batch of num_tests on a given point cloud X0 (parameters described above)
#
# Collecting some statistics (average noise, discrepancy, err_L, err_Y)
#
def run_tests_point_cloud(X0, sigma=0.05, level=1.0, num_laplacian=None, cond=3.0, num_tests=100, verbose=False):
    #
    array_noise  = []
    array_disc_X = []
    array_err_L  = []
    array_err_Y  = []
    #
    for _ in range(num_tests):
        noise, disc_X, err_L, err_Y = test_point_cloud(X0, sigma, level, num_laplacian,\
                                                       cond, verbose)
        array_noise  += [noise]
        array_disc_X += [disc_X]
        array_err_L  += [err_L]
        array_err_Y  += [err_Y]
    #
    array_noise  = np.array(array_noise)
    array_disc_X = np.array(array_disc_X)
    array_err_L  = np.array(array_err_L)
    array_err_Y  = np.array(array_err_Y)
    #
    mean_noise  = array_noise.mean()
    mean_disc_X = array_disc_X.mean()
    mean_err_L  = array_err_L.mean()
    mean_err_Y  = array_err_Y.mean()
    #
    return mean_noise, mean_disc_X, mean_err_L, mean_err_Y

In [None]:
#
# Values of sigma and level for tests
#
sigma_array = [0.0, 0.01, 0.05, 0.1, 0.15, 0.2]
level_array = [1.0, 0.95, 0.90, 0.85, 0.8, 0.7, 0.6, 0.5]

In [None]:
#
# Running tests with num_laplacian = 3 (default) and collecting statistics
#
file_names = ["Teapot.csv", "Bunny.csv", "Cow.csv"]
file_stats = []
#
_num_laplacian = 3 # number of Laplacian eigenvectors (asserted >= dim)
_cond = 3.0 # condition number of random linear map
_num_tests = 10 # number of tests in a batch (for a single file)
#
for name in file_names:
    #
    X0 = []
    #
    f = open(name)
    reader = csv.reader(f)
    #
    for line in reader:
        X0 += [[RDF(v) for v in line]]
    #
    f.close()
    #
    print("File read: {}".format(name))
    #
    X0 = np.array(X0).T
    #
    stats = []
    #
    for sigma in sigma_array:
        for level in level_array:
            stat_noise, stat_disc_X, stat_err_L, stat_err_Y = \
            run_tests_point_cloud(X0, sigma, level, num_laplacian=_num_laplacian,\
                                  cond=_cond, num_tests=_num_tests, verbose=False)
            stats += [(sigma, level, stat_noise, stat_disc_X, stat_err_L, stat_err_Y)]
    #
    print("Stats computed ...")
    #
    file_stats += [(name, stats)]
    #

In [None]:
#
# Saving stats in files
#
for data in file_stats:
    name = data[0]
    stats = data[1]
    f = open('GrassGraph_stats_'+name, 'w')
    writer = csv.writer(f)
    for s in stats:
        writer.writerow(s)
    f.close()

In [None]:
#
# Running tests with num_laplacian = 10 (> x3 increase) and collecting statistics
#
# WARNING: runtime may be long!
#
file_names = ["Teapot.csv", "Bunny.csv", "Cow.csv"]
file_stats_mod = []
#
_num_laplacian = 10 # number of Laplacian eigenvectors (asserted >= dim)
_cond = 3.0 # condition number of random linear map
_num_tests = 10 # number of tests in a batch (for a single file)
#
for name in file_names:
    #
    X0 = []
    #
    f = open(name)
    reader = csv.reader(f)
    #
    for line in reader:
        X0 += [[RDF(v) for v in line]]
    #
    f.close()
    #
    print("File read: {}".format(name))
    #
    X0 = np.array(X0).T
    #
    stats = []
    #
    for sigma in sigma_array:
        for level in level_array:
            stat_noise, stat_disc_X, stat_err_L, stat_err_Y = \
            run_tests_point_cloud(X0, sigma, level, num_laplacian=_num_laplacian,\
                                  cond=_cond, num_tests=_num_tests, verbose=False)
            stats += [(sigma, level, stat_noise, stat_disc_X, stat_err_L, stat_err_Y)]
    #
    print("Stats computed ...")
    #
    file_stats_mod += [(name, stats)]
    #

In [None]:
#
# Saving stats in files 
#
for data in file_stats_mod:
    name = data[0]
    stats = data[1]
    f = open('GrassGraph_stats_mod_'+name, 'w')
    writer = csv.writer(f)
    for s in stats:
        writer.writerow(s)
    f.close()