In [1]:
#
# importing the necessary libraries:
# NumPy, SciPy (optimization), 
# multiprocessing features for speed-ups, and also
# plotting functions, and a handler for csv files
#
import numpy as np
import scipy.optimize as opt
import multiprocess as mp
import matplotlib.pyplot as plt
import csv

In [2]:
#
# Number of CPU cores as reported by multiprocess; used further down as the
# size of the worker pool that runs the randomized FAQ trials in parallel
#
CPU_COUNT = mp.cpu_count()

In [3]:
#
# Centering a point cloud X
#
def barycentered(X):
    """Translate the point cloud X so that its barycenter sits at the origin.

    X is a 2-D array with one point per column, i.e. shape (d, n).
    Returns a new array of the same shape in which the mean of the columns
    has been subtracted from every column; X itself is not modified.
    """
    n_points = X.shape[1]
    # keepdims leaves the centroid as a (d, 1) column, so it broadcasts over
    # all n columns of X without an explicit np.newaxis
    centroid = np.sum(X, axis=1, keepdims=True) / n_points
    return X - centroid

In [4]:
#
# First constructing a random psd matrix with specified condition number,
# then multiplying it by a random orthogonal matrix 
#
# Bierlaire, M., Toint, P., and Tuyttens, D. (1991). 
# On iterative algorithms for linear ls problems with bound constraints. 
# Linear Algebra and Its Applications, 143, 111–143.
#
def rand_mat_cond(dim=3, cond=5.0):
    """Return a random dim x dim matrix whose 2-norm condition number is cond.

    A symmetric positive definite matrix P with condition number cond is
    built first and then multiplied on the right by a random rotation O
    (orthogonal, det O = +1), so the result M = P @ O keeps the singular
    values of P and has positive determinant.

    Parameters:
        dim  -- size of the square matrix (dim >= 1)
        cond -- requested condition number (cond >= 1; cond == 1 is allowed)

    Returns:
        M -- numpy array of shape (dim, dim)
    """
    #
    # Exponents equally spaced on [-log(cond)/4, log(cond)/4]: D then has
    # condition number sqrt(cond) and P @ P.T below has condition number cond.
    # np.linspace yields exactly dim values and, unlike np.arange with a
    # computed float step, also works for cond == 1 (zero step) and dim == 1.
    #
    log_cond = np.log(cond)
    exp_vec = np.linspace(-log_cond/4.0, log_cond/4.0, dim)
    D = np.diag(np.exp(exp_vec))
    #
    # NOTE(review): the offset 5.0 makes every entry of the QR seed matrices
    # negative; if a zero-centred seed was intended it should probably be 0.5
    # -- confirm against the reference before changing.
    #
    U, _ = np.linalg.qr((np.random.rand(dim,dim) - 5.0) * 200)
    V, _ = np.linalg.qr((np.random.rand(dim,dim) - 5.0) * 200)
    P = U @ D @ V.T
    # V is orthogonal, so it cancels here: P @ P.T = U @ D @ D @ U.T
    P = P @ P.T
    #
    # Random orthogonal factor; flipping the sign of the last column turns a
    # reflection (det = -1) into a rotation (det = +1)
    #
    seed = np.random.normal(0.0, 1.0, (dim,dim))
    O, _ = np.linalg.qr(seed, mode='complete')
    if np.linalg.det(O) < 0:
        O = O @ np.diag([1]*(dim-1)+[-1])
    #
    M = P @ O
    #
    return M

In [5]:
#
# Orthogonal projection onto the orthogonal complement of ker X: R^n -> R^d 
# Generically dim ker X = n-d, so that dim ker X^\perp = d
#
def ortho_proj_ker(X):
    """Return the n x n orthogonal projector built from the reduced SVD of X.

    For X of shape (d, n) the reduced SVD gives a factor Vh with orthonormal
    rows; Vh.T @ Vh is the orthogonal projector onto the span of those rows.
    When X has full row rank that span is the row space of X, i.e. the
    orthogonal complement of ker X.
    """
    # only the third SVD factor (Vh) is needed
    vh = np.linalg.svd(X, full_matrices=False)[2]
    return vh.T @ vh

In [15]:
#
# Creating some pics: the initial point cloud X and its image Y 
# under a random linear transformation L (with condition number = cond).
# The feature correspondence is shown by grey segments joining each point of X to its image in Y
#
file_names = ["Teapot.csv", "Bunny.csv", "Cow.csv"]
#
cond   = 4.0  # condition number of random linear map
shift  = np.array([1.0,.0,.0])  # translation added to Y (one unit along the first axis)
#
for name in file_names:
    #
    # Read the cloud, one point per CSV row. The context manager closes the
    # file even if a value fails to parse; blank rows are skipped so that a
    # trailing empty line cannot produce a ragged array.
    #
    with open(name, newline='') as f:
        X = [[RDF(v) for v in row] for row in csv.reader(f) if row]
    #
    print("File read: {}".format(name))
    #
    X = np.array(X).T
    X = barycentered(X)
    #
    dim, num = X.shape
    L = rand_mat_cond(dim, cond)
    #
    Y = L @ X
    Y = barycentered(Y) + shift[:, np.newaxis]
    #
    img  = point3d(X.T, color='blue', size=4)
    img += point3d(Y.T, color='red',  size=4)
    #
    # One grey segment per point, joining X[:, i] to its image Y[:, i]
    #
    lines = [line3d([(X[0, i], X[1, i], X[2, i]), (Y[0, i], Y[1, i], Y[2, i])], color='grey')
             for i in range(num)]
    #
    img += sum(lines)
    img.plot().show()

File read: Teapot.csv


File read: Bunny.csv


File read: Cow.csv


In [6]:
#
# Creating some pics for (multiplicative) noise with sigma=0.05, discrepancy level=0.90
#
file_names = ["Teapot.csv", "Bunny.csv", "Cow.csv"]
#
sigma  = 0.05 # multiplicative noise
level  = 0.90 # discrepancy level (missing or added points)
cond   = 3.0  # condition number of random linear map
n_iter = 2**10 # number of trials for probabilistic voting (number of "votes")
#
for name in file_names:
    #
    # Read the cloud, one point per CSV row. The context manager closes the
    # file even if a value fails to parse; blank rows are skipped so that a
    # trailing empty line cannot produce a ragged array.
    #
    with open(name, newline='') as f:
        X0 = [[RDF(v) for v in row] for row in csv.reader(f) if row]
    #
    print("File read: {}".format(name))
    #
    X0 = np.array(X0).T
    X0 = barycentered(X0)
    dim, num_y = X0.shape
    #
    # X keeps a random subset of int(level * num_y) points of X0.
    # The size is checked before sampling: choice() itself would raise on an
    # oversized sample, so a check placed after it could never fire.
    #
    num_x = int(level * num_y)
    assert(num_x <= num_y)
    ind = np.random.default_rng().choice(num_y, size=num_x, replace=False)
    X = X0[:, ind]
    X = barycentered(X)
    #
    # dX = the points of X0 that were left out of X
    #
    dX = X0[:, np.setdiff1d(np.arange(num_y), ind)]
    #
    if dX.shape[1]>0:
        disc_X = np.linalg.norm(dX, 2) / np.linalg.norm(X0, 2)
    else:
        disc_X = 0.0
    #
    print("Dimension = {}, number of points: {} vs {}".format(dim, num_x, num_y))
    print("Relative discrepancy = {}".format(disc_X))
    #
    I = np.identity(num_y)
    #
    L = rand_mat_cond(dim, cond)
    #
    # S = random permutation matrix (rows of the identity shuffled)
    #
    S  = np.random.default_rng().permutation(I)
    #
    # N = entrywise multiplicative noise with i.i.d. N(1, sigma^2) entries
    #
    N  = np.random.normal(1, sigma, (dim, num_y))
    #
    Y0 = L @ X0 @ S
    Y  = N * Y0
    Y  = barycentered(Y)
    #
    noise = np.linalg.norm(Y - Y0, 2) / np.linalg.norm(Y0, 2)
    print("Noise = {}".format(noise))
    #
    # Projectors for both clouds; Px is zero-padded to num_y x num_y so that
    # it has the same size as Py
    #
    Px = ortho_proj_ker(X)
    o  = np.zeros((num_y-num_x, num_y-num_x))
    oo = np.zeros((num_y-num_x, num_x))
    Px = np.block([[Px, oo.T],[oo, o]])
    Py = ortho_proj_ker(Y)
    #
    # One "vote": a FAQ run from a randomized starting point. The vote is
    # weighted by how close trace(Px s Py s^T) gets to dim.
    #
    def f_opt(i):
        np.random.seed(i) # reseed to avoid races for the RNG
        sol = opt.quadratic_assignment(Px, Py, method="faq", options = {'maximize':True, 'P0':'randomized', 'tol':1e-3})
        ind = sol['col_ind']
        s = I[ind]
        weight = np.exp(-1000.0*(np.trace(Px @ s @ Py @ s.T) - dim)**2)
        return weight, s, i
    #
    with mp.Pool(CPU_COUNT, maxtasksperchild=int(100)) as pool:
        vals = pool.map(f_opt, range(n_iter))
    #    
    mat = sum([v[0]*v[1] for v in vals])
    #
    # Project the weighted sum of votes onto a permutation (Hungarian algorithm)
    #
    row_ind, col_ind = opt.linear_sum_assignment(mat, maximize=True)
    #
    # Columns of Y matched to the points of X; used for both the least-squares
    # estimate L0 and the recovered preimages
    #
    Y_matched = Y[:, col_ind][:, :num_x]
    #
    L0 = Y_matched @ X.T @ np.linalg.inv(X @ X.T)
    #
    err_L = np.linalg.norm(L0 - L, 2) / np.linalg.norm(L, 2)
    #
    print("Relative error in L = err_L = {}".format(err_L))
    #
    err_Y = np.linalg.norm(L @ X - L0 @ X, 2) / np.linalg.norm(L @ X, 2)
    #
    print("Relative error in matching images err_Y = {}".format(err_Y))
    #
    X_pre = np.linalg.inv(L0) @ Y_matched
    err_X = np.linalg.norm(X - X_pre, 2) / np.linalg.norm(X, 2)
    #
    print("Relative error in matching inverse images err_X = {}".format(err_X))
    #
    print("err_X : err_Y = {} ~ {} = cond(L)".format(err_X / err_Y, cond))
    #
    img  = point3d(X.T, color='yellow', size=2)
    img += point3d(X_pre.T, color='red', size=2)
    img += point3d(dX.T, color='blue', size=4)
    img.plot().show()

File read: Teapot.csv
Dimension = 3, number of points: 315 vs 351
Relative discrepancy = 0.2956647830046439
Noise = 0.0385695537404892
Relative error in L = err_L = 0.07031077259152224
Relative error in matching images err_Y = 0.04648797044636394
Relative error in matching inverse images err_X = 0.1289197977672535
err_X : err_Y = 2.773186192673141 ~ 3.00000000000000 = cond(L)


File read: Bunny.csv
Dimension = 3, number of points: 475 vs 528
Relative discrepancy = 0.35581251107246287
Noise = 0.03798139207175843
Relative error in L = err_L = 0.04759479031250942
Relative error in matching images err_Y = 0.04406468657513578
Relative error in matching inverse images err_X = 0.12775947901573725
err_X : err_Y = 2.8993620276384227 ~ 3.00000000000000 = cond(L)


File read: Cow.csv
Dimension = 3, number of points: 541 vs 602
Relative discrepancy = 0.32899419621986425
Noise = 0.044526679352103714
Relative error in L = err_L = 0.03280594014254386
Relative error in matching images err_Y = 0.030377894723089712
Relative error in matching inverse images err_X = 0.12303915812571338
err_X : err_Y = 4.05028588212841 ~ 3.00000000000000 = cond(L)


In [7]:
#
# Running a test with given parameters on a given point cloud X0 with X0.shape = (d, n)
# where d = dimension 3, n = number of points
#
# creates two clouds: X, a random subset of X0 with int(level*n) points, and Y = L*X0*S,
# with L a random linear map with condition number cond and S a random permutation
# matrix from Sym(n).
# 
# Normal multiplicative noise N(1, sigma^2) is added to Y by way of Hadamard product of 
# Y with matrix N having i.i.d. entries from N(1, sigma^2)
# 
# Parameters:
#
# sigma = normal multiplicative noise N(1, sigma^2), sigma \in [0, +\infty]
#
# level = discrepancy level for X and Y where Y has n points, 
# and X has int(n*level) points; level \in [0,  1]
#
# cond = condition number of random linear map L; cond \in [1, \infty]
#
# n_iter = number of iterations of rFAQ = number of votes / trials for 
# recovering permutation S and assembling its "statistical" version
# that is projected to nearest permutation by the Hungarian algorithm
#
# verbose = verbose flag for additional info / debugging 
#
# Output: 
#
# noise = normalised noise in Y = ||Y - Y0|| / ||Y0||, where Y0 = L*X0*S w/o noise
#
# disc_X = point discrepancy in X = ||dX|| / ||X0||, where dX = points in X0 missing from X
#
# err_L = relative error in recovering L = ||L0 - L|| / ||L||, where L0 = recovered map;
# L0 is recovered by least squares after S has been obtained via rFAQ voting
#
# err_Y = relative error in matching images = points of X under the action of L and L0 =
# = ||L0*X - L*X|| / ||L*X||; sometimes (e.g. in presence of symmetries, high levels of
# discrepancy and/or noise) L cannot be recovered thus err_L is large, but err_Y is still
# relatively small
#
# err_X = relative error in matching preimages = points of Y (those corresponding to some
# points in X) under the action of L^-1 and L0^-1 
#
def test_point_cloud(X0, sigma=0.01, level=1.0, cond=3.0, n_iter=2**9, verbose=False):
    #
    X0 = barycentered(X0)
    dim, num_y = X0.shape
    #
    num_x = int(level * num_y)
    ind = np.random.default_rng().choice(num_y, size=num_x, replace=False)
    X = X0[:, ind]
    X = barycentered(X)
    #
    assert(num_x <= num_y)
    #
    dX = X0[:, np.setdiff1d(np.arange(num_y), ind)]
    if dX.shape[1]>0:
        disc_X = np.linalg.norm(dX, 2) / np.linalg.norm(X0, 2)
    else:
        disc_X = .0
    #
    L = rand_mat_cond(dim, cond)
    #
    S  = np.random.default_rng().permutation(np.identity(num_y))
    #
    N  = np.random.normal(1, sigma, (dim, num_y))
    #
    Y0 = L @ X0 @ S
    Y  = N * Y0
    Y  = barycentered(Y)
    #
    noise = np.linalg.norm(Y - Y0, 2) / np.linalg.norm(Y0, 2)
    #
    Px = ortho_proj_ker(X)
    o  = np.zeros((num_y-num_x, num_y-num_x))
    oo = np.zeros((num_y-num_x, num_x))
    Px = np.block([[Px, oo.T],[oo, o]])
    Py = ortho_proj_ker(Y)
    I = np.identity(num_y)
    #
    def f_opt(i):
        np.random.seed(i) # reseed to avoid races for the RNG
        sol = opt.quadratic_assignment(Px, Py, method="faq", options = {'maximize':True,\
                                                            'P0':'randomized', 'tol':1e-3})
        ind = sol['col_ind']
        s = I[ind]
        weight = np.exp(-1000.0*(np.trace(Px @ s @ Py @ s.T) - dim)**2)
        return weight, s, i
    #
    with mp.Pool(CPU_COUNT, maxtasksperchild=int(100)) as pool:
        vals = pool.map(f_opt, range(n_iter))
    #    
    mat = sum([v[0]*v[1] for v in vals])
    #
    row_ind, col_ind = opt.linear_sum_assignment(mat, maximize=True)
    #
    L0 = Y[:, col_ind][:, :num_x] @ X.T @ np.linalg.inv(X @ X.T)
    #
    err_L = np.linalg.norm(L0 - L, 2) / np.linalg.norm(L, 2)
    #
    err_Y = np.linalg.norm(L @ X - L0 @ X, 2) / np.linalg.norm(L @ X, 2)   
    #
    X_pre = np.linalg.inv(L0) @ Y[:, col_ind][:, :num_x]    
    err_X = np.linalg.norm(X - X_pre, 2) / np.linalg.norm(X, 2)
    #
    if verbose:
        print("Dimension = {}, number of points: {} vs {}".format(dim, num_x, num_y))
        print("Noise = {}".format(noise))
        print("Relative discrepancy = {}".format(disc_X))
        print("Relative error in L = err_L = {}".format(err_L))
        print("Relative error in matching images err_Y = {}".format(err_Y))
        print("Relative error in matching inverse images err_X = {}".format(err_X))
        print("err_X : err_Y = {} ~ {} = cond(L)".format(err_X / err_Y, cond))
    #
    return noise, disc_X, err_L, err_Y, err_X

In [8]:
#
# Running a batch of num_tests on a given point cloud X0 (parameters described above)
#
# Collecting some statistics (average noise, discrepancy, err_L, err_Y, err_X)
#
def run_tests_point_cloud(X0, sigma=0.05, level=1.0, cond=3.0, n_iter=2**9, num_tests=100, verbose=False):
    """Repeat test_point_cloud num_tests times and average each reported quantity.

    All parameters other than num_tests are passed unchanged to
    test_point_cloud on every repetition.

    Returns the tuple (mean_noise, mean_disc_X, mean_err_L, mean_err_Y,
    mean_err_X) of arithmetic means over the num_tests runs.
    """
    #
    outcomes = [test_point_cloud(X0, sigma, level, cond, n_iter, verbose)
                for _ in range(num_tests)]
    #
    def column_mean(k):
        # mean of the k-th reported quantity over all runs
        return np.array([outcome[k] for outcome in outcomes]).mean()
    #
    # order: noise, disc_X, err_L, err_Y, err_X
    return tuple(column_mean(k) for k in range(5))

In [9]:
#
# Values of sigma and level for tests
#
# Noise levels: standard deviation sigma of the multiplicative N(1, sigma^2) noise
sigma_array = [0.0, 0.01, 0.05, 0.1, 0.15, 0.2]
# Discrepancy levels: fraction of the points of the full cloud that is kept in X
level_array = [1.0, 0.95, 0.90, 0.85, 0.8, 0.7, 0.6, 0.5]

In [10]:
#
# Running tests and collecting statistics
#
# WARNING: runtime may be relatively long!
#
file_names = ["Teapot.csv", "Bunny.csv", "Cow.csv"]
file_stats = []
#
_cond = 3.0 # condition number of random linear map
_n_iter = 2**10 # number of trials for probabilistic voting (number of "votes")
_num_tests = 10 # number of tests in a batch (for a single file)
_verbose = False # per-test diagnostics; True prints 7 lines for every single test
#
for name in file_names:
    #
    # Read the cloud, one point per CSV row. The context manager closes the
    # file even if a value fails to parse; blank rows are skipped so that a
    # trailing empty line cannot produce a ragged array.
    #
    with open(name, newline='') as f:
        X0 = [[RDF(v) for v in row] for row in csv.reader(f) if row]
    #
    print("File read: {}".format(name))
    #
    X0 = np.array(X0).T
    #
    # One row of averaged statistics per (sigma, level) pair
    #
    stats = []
    #
    for sigma in sigma_array:
        for level in level_array:
            stat_noise, stat_disc_X, stat_err_L, stat_err_Y, stat_err_X = \
            run_tests_point_cloud(X0, sigma, level, cond=_cond, n_iter=_n_iter,\
                                  num_tests=_num_tests, verbose=_verbose)
            stats += [(sigma, level, stat_noise, stat_disc_X, stat_err_L, stat_err_Y, stat_err_X)]
    #
    print("Stats computed ...")
    #
    file_stats += [(name, stats)]
    #

File read: Teapot.csv
Stats computed ...
File read: Bunny.csv
Stats computed ...
File read: Cow.csv
Stats computed ...


In [11]:
#
# Saving stats in files
#
for name, stats in file_stats:
    #
    # One output file per input cloud, one CSV row per (sigma, level) pair.
    # newline='' is what the csv module asks for on files it writes to
    # (otherwise extra blank rows can appear on some platforms), and the
    # context manager closes the file even if writing fails.
    #
    with open("roam_stats_{}_{}".format(_num_tests, name), 'w', newline='') as f:
        csv.writer(f).writerows(stats)