### Hungarian Algorithm and Earth Mover’s Distance

Compute the bijective mapping using the Hungarian Algorithm as well as the Earth Mover's Distance between the two point clouds sampled from the last question.

Result: My implementation returned the same cost as calculated by the build-in scipy function scipy.optimize.linear_sum_assignment.

In [None]:
import pymesh
import numpy as np
from sklearn.metrics.pairwise import euclidean_distances
from scipy.optimize import linear_sum_assignment

In [None]:
teapot = pymesh.load_mesh("sampled_teapot.obj")
violin_case = pymesh.load_mesh("sampled_violin_case.obj")

In [None]:
vertices_teapot = teapot.vertices[np.random.permutation(1000)[:500],:]
vertices_violin_case = violin_case.vertices[np.random.permutation(1000)[:500],:]

In [None]:
# compute the cost matrix
a = euclidean_distances(vertices_teapot,vertices_violin_case)

In [None]:
def subtract_mins(a):
    n = a.shape[0]
    a_ = np.copy(a)  
    # find min for each row and subtract it from every element in its row 
    r = np.zeros([n,n])
    r[:,:] = np.amin(a_,axis=1).reshape([n,1])
    a_ = a_-r
    # find min for each col and subtract it from every element in its col
    c = np.zeros([n,n])
    c[:,:] = np.amin(a_,axis=0).reshape([1,n])
    a_ = a_-c
    return a_

def find_assignment(b):
    n = b.shape[0]
    b_ = np.copy(b)
    ass = np.zeros([n,n])

    # iterate the rows, mark the cell if only one zero in the original row
    for i in range(n):
        if np.sum(b[i,:]) == 1:
            j = np.argmax(b[i,:])
            # mark it only when it has not been crossed out yet
            if b_[i,j] == 1:
                ass[i,j] = 1
                # cross out all zeros in the corresponding column
                b_[:,j] = 0
    
    # return if no zeros left
    if np.sum(b_) == 0:
        return ass

    # same procedure as iterating the rows but now iterating the columns
    for j in range(n):
        if np.sum(b[:,j]) == 1:
            i = np.argmax(b[:,j])
            if b_[i,j] == 1:
                ass[i,j] = 1
                b_[i,:] = 0

    return ass

def covering_lines(a):
    n = a.shape[0]
    b = np.zeros([n,n])
    # mark all zeros 1, all others 0
    b[a==0] = 1

    # find as many valid assignments as possible
    ass = find_assignment(b)
    if np.sum(ass) == n:
        return ass,[],[]

    c_marks = np.zeros(n) # column marks
    r_marks = np.zeros(n) # row marks
    r_marks[np.where(np.sum(ass,axis=1)==0)[0]] = 1 
    # mark all unassigned rows 

    for i in range(n):
        if r_marks[i] == 0:
            continue
        cidx = np.where(b[i,:]==1)[0]
        # columns having zeros in the marked row
        if len(cidx) > 0:
            c_marks[cidx] = 1 # mark those columns 
            ridx = np.where(np.sum(ass[:,cidx],axis=1)==1)[0] 
            # rows having assignments in the newly marked columns
            if len(ridx) > 0:
                r_marks[ridx] = 1 # mark those rows
                
    rows = np.where(r_marks==0)[0] # uncovered rows
    cols = np.where(c_marks==1)[0] # covered columns
    
    return ass,rows,cols

def shift_zeros(a,r,c):
    n = a.shape[0]
    a_ = np.copy(a)
    
    a_[r,:] = 0
    a_[:,c] = 0

    # find min of the uncovered elements
    v = a_.reshape([n*n])
    min_entry = np.min(v[np.where(v>0)[0]])
    a_[a_>0] = a_[a_>0]-min_entry
    
    a_[r,:] = a[r,:]
    a_[:,c] = a[:,c]
    
    # add min to the elements covered by two lines
    for i in range(len(r)):
        for j in range(len(c)):
            a_[r[i]][c[j]] += min_entry
    return a_

In [None]:
def munkres_assignment(a):
    # pre processing
    n = a.shape[0]
    a_ = subtract_mins(a)
    
    # find minimum covering lines for all zeros
    ass,rows,cols = covering_lines(a_)
    
    # testing for optimality
    while np.sum(ass) < n:
        a_ = shift_zeros(a_,rows,cols)
        ass,rows,cols = covering_lines(a_)

    cost = np.sum(a[ass==1])
    print([np.argmax(ass[i,:]) for i in range(n)])
    return cost

In [None]:
print(munkres_assignment(a))

In [None]:
# verify my implementation with built-in scipy function
r,c = linear_sum_assignment(a)
print(a[r,c].sum())
print(c)