In [32]:
import numpy as np
import matplotlib.pyplot as pl
import ot
import ot.plot

import os
import json

In [33]:
# SANDBOX

#print(os.getcwd())

#newdirpath = os.path.join(os.getcwd(), "NewFolder")
#os.mkdir(newdirpath)


In [34]:
# (NOT NEEDED NOW) calculates barycentric projection of OT map
#
# x_ref, x_tar : (num,dim) matrices of the reference and target points
# p_ref, p_tar : (num,) probability vectors
# C : ground cost (num_ref,num_tar) matrix

def calc_OT_map(x_ref, x_tar, p_ref=None,p_tar=None,
                C=None, sinkhorn=False, lambd=1, normalize_T=True):
    
    num_ref, dim_ref = x_ref.shape              # number of points, and
    num_tar, dim_tar = x_tar.shape              # dimension of those points
    if p_ref is None:
        p_ref = np.ones((num_ref,))/num_ref     # if not given, assume the
    if p_tar is None:                           # probability density is uniform
        p_tar = np.ones((num_tar,))/num_tar
    
    if C is None:
        assert dim_ref == dim_tar
        C = ot.dist(x_ref, x_tar)               # ground cost is square distance
    
    # G : (num_ref,num_tar) matrix encoding of OT map
    if sinkhorn:
        G = ot.sinkhorn(p_ref, p_tar, C, lambd)
    else:
        G = ot.emd(p_ref, p_tar, C)
    
    
    Gstochastic = G/G.sum(axis=1)[:,None]       # row-normalized G
    T = Gstochastic @ x_tar                     # barycentric projection of G
    if normalize_T:
        T = T/np.sqrt(dim_ref)                  
    return T

In [35]:
# compute_cost

# returns :
    # C : (num_ref,num_tar) ground cost matrix

# parameters:
    # x_ref : (num_ref,dim_ref) num_ref reference points of dimension dim_ref
    # x_tar : (num_tar,dim_tar) num_tar target points of dimension dim_tar
        # if None then x_tar = x_ref
    # metric : method to use to compute costs, e.g.
        # "square_euclidean" , "euclidean" , "discrete" 

def compute_cost(x_ref, x_tar = None, metric = "square_euclidean"):
    
    if x_tar is None:
        x_tar = x_ref

    assert x_ref.shape[1] == x_tar.shape[1]
    
    # c(x,y) = ||x-y||_2^2
    if metric == "square_euclidean":
        C = ot.dist(x_ref, x_tar)
    
    # c(x,y) = ||x-y||_2
    elif metric == "euclidean":
        C = ot.dist(x_ref, x_tar, 'euclidean')
    
    # c(x,y) = {1 if x != y},{0 if x == y}
    elif metric == "discrete":
        num_ref = x_ref.shape[0]
        num_tar = x_tar.shape[0]
        assert num_ref == num_tar
        C = np.ones((num_ref, num_tar)) - np.identity(num_ref)
    
    return C

In [36]:
# create_plan

# returns :
    # G : (num_ref,num_tar) encoding of OT plan

# parameters :
    # p_ref : (num_ref,) reference weights
    # p_tar : (num_ref,) target weights
    # C : (num_ref,num_tar) cost matrix
    # sinkhorn : bool whether to use sinkhorn
    # beta : regularization parameter

def create_plan(p_ref, p_tar, C, sinkhorn=False, beta = 0.01):
    
    if sinkhorn:
        G = ot.sinkhorn(p_ref, p_tar, C, beta)
    else:
        G = ot.emd(p_ref, p_tar, C)
    
    return G

In [37]:
# compute_barycentric_map

# returns :
    # T : (num_ref, num_tar) barycentric projection map

# parameters :
    # G : (num_ref, num_tar) encoding of OT plan
    # x_tar : (num_tar,dim_tar) num_tar target points of dimension dim_tar

def compute_barycentric_map(G, x_tar):
    
    Gstochastic = G/G.sum(axis=1)[:,None]       # row-normalized G
    T = Gstochastic @ x_tar                     # barycentric projection of G
       
    return T

In [38]:
class LOT(object):
    def __init__(self, x_ref, x_tar, p_ref=None, p_tar=None, sinkhorn = False, beta = 0.01, metric = "square_euclidean"):
        self.np_attributes = ['x_ref','x_tar','p_ref','p_tar', 'cost','plan','ot_map']
        self.dict_attributes = ['sinkhorn','beta','metric']

        self.x_ref = x_ref
        self.x_tar = x_tar

        self.num_ref, self.dim_ref = x_ref.shape
        self.num_tar, self.dim_tar = x_tar.shape
        if p_ref is None:
            self.p_ref = np.ones((self.num_ref,))/self.num_ref
        else:
            self.p_ref = p_ref
        if p_tar is None:
            self.p_tar = np.ones((self.num_tar,))/self.num_tar
        else:
            self.p_tar = p_tar
        
        self.sinkhorn = sinkhorn
        self.beta = beta
        self.metric = metric
        self.cost = None
        self.plan = None
        self.ot_map = None

        

    def calc_ot_map(self,):
        self.cost = compute_cost(self.x_ref, self.x_tar, self.metric)
        print(self.cost)
        self.plan = create_plan(self.p_ref, self.p_tar, self.cost, self.sinkhorn, self.beta)
        print(self.plan)
        self.ot_map = compute_barycentric_map(self.plan, self.x_tar)



In [46]:
# TESTING

ref = np.array([[0],[1],[2]])
tar = ref
pr = np.array([3.,2.,1.])
pt = np.array([1.,2.,3.])

lot1 = LOT(ref, tar, pr, pt, False, 0.01)
print(lot1.x_ref)
print("X_TAR =",lot1.x_tar)
lot1.calc_ot_map()
print("MAP = ", lot1.ot_map)

[[0]
 [1]
 [2]]
X_TAR = [[0]
 [1]
 [2]]
[[0 1 4]
 [1 0 1]
 [4 1 0]]
[[1. 2. 0.]
 [0. 0. 2.]
 [0. 0. 1.]]
MAP =  [[0.66666667]
 [2.        ]
 [2.        ]]


In [64]:
# create_directory

# returns :
    # dir_path : str with path of new directory

# parameters :
    # base_path : str with path of base directory
    # dir_name : str with name of new directory

def create_directory(base_path, dir_name):
    dir_path = os.path.join(base_path, dir_name)
    try:
        os.mkdir(dir_path)
    except:
        #print("Directory Already Exists")
        pass
    return dir_path


In [65]:
# save_lot_object

# returns :
    # obj_path : str with path to object directory

# parameters :
    # obj : LOT object to be saved
    # dir_path : str with path to base directory
    # obj_name : name of object (will be name of directory with saved files)

def save_lot_object(obj, dir_path = None, obj_name = None):
    if dir_path is None:
        dir_path = os.getcwd()
    if obj_name is None:
        obj_name = "Unnamed_" + hex(np.random.randint(0,16**4))[2:]
    
    obj_path = create_directory(dir_path, obj_name)
    json_filepath = os.path.join(obj_path, "dict_attributes")
    npsv_filepath = os.path.join(obj_path, "npsv_attributes.npz")
    
    dictionary = {
        'sinkhorn': obj.sinkhorn,
        'beta': obj.beta,
        'metric': obj.metric
    }

    with open(json_filepath, 'w') as fp:
        json.dump(dictionary, fp)
    
    np.savez(npsv_filepath,
        x_ref = obj.x_ref, 
        x_tar = obj.x_tar,
        p_ref = obj.p_ref,
        p_tar = obj.p_tar,
        cost = obj.cost,
        plan = obj.plan,
        ot_map = obj.ot_map
    )
    
    return obj_path


In [None]:
# load_lot_object

# returns :
    # obj : LOT object with loaded attributes

# parameters :
    # obj_path : str with path to object directory

def load_lot_object(obj_path):
    dict_filepath = os.path.join(obj_path, "dict_attributes")
    npsv_filepath = os.path.join(obj_path, "npsv_attributes")

    with open(dict_filepath, 'r') as fp:
        dict_file = json.load(fp)
    npsv_file = np.load(npsv_filepath)

    sinkhorn = dict_file['sinkhorn']
    beta = dict_file['beta']
    metric = dict_file['metric']

    x_ref = npsv_file['x_ref']
    x_tar = npsv_file['x_tar']
    p_ref = npsv_file['p_ref']
    p_tar = npsv_file['p_tar']
    cost = npsv_file['cost']
    plan = npsv_file['plan']
    ot_map = npsv_file['ot_map']

    obj = LOT(x_ref, x_tar, p_ref, p_tar, sinkhorn, beta, metric)
    obj.cost = cost
    obj.plan = plan
    obj.ot_map = ot_map
    


In [66]:
path = os.path.join(os.getcwd(), "NewSandboxFolder")
obj_path = save_lot_object(lot1, path, "LOTObject1")