In [1]:
import tensorflow as tf
from tensorflow.keras import Model, layers
import ot
import numpy as np
from sinkhorn import *

In [2]:
class InvarianceTestGraph(Model, layers.Layer):
    """
    Small fully-connected network with a shared feature extractor and a
    per-environment output bias.

    The extractor is three sigmoid dense layers (64 -> 32 -> 16 -> 6). The head
    is a linear 6 -> 2 map whose weight matrix is shared, while its bias is
    chosen by the ``env`` argument of ``call``.
    """
    # NOTE(review): Model is listed alongside layers.Layer as a base; this looks
    # redundant -- confirm before simplifying the class header.

    def __init__(self):
        super().__init__()

        def new_param(*shape):
            # Every parameter is a trainable variable with 'random_normal' init.
            return self.add_weight(shape=shape, initializer='random_normal', trainable=True)

        # Creation order (all matrices first, then all biases) is kept as-is,
        # since the ordering of inv.trainable_variables may depend on it.
        self.weight = {
            'weight1': new_param(64, 32),
            'weight2': new_param(32, 16),
            'weight3': new_param(16, 6),
            'weight_final': new_param(6, 2),
        }

        self.bias = {
            'bias1': new_param(32),
            'bias2': new_param(16),
            'bias3': new_param(6),
            'bias_final0': new_param(2),
            'bias_final1': new_param(2),
        }

    def invariantMap(self, x):
        """Run ``x`` through the three shared sigmoid layers and return the 6-dim features."""
        hidden = x
        for w_key, b_key in (('weight1', 'bias1'), ('weight2', 'bias2'), ('weight3', 'bias3')):
            pre_activation = tf.add(tf.matmul(hidden, self.weight[w_key]), self.bias[b_key])
            hidden = tf.nn.sigmoid(pre_activation)
        return hidden


    def call(self, x, env = 0):
        """
        Return the 2-way logits for ``x``.

        ``env == 0`` uses ``bias_final0``; any other value uses ``bias_final1``.
        The final weight matrix is the same in both cases.
        """
        features = self.invariantMap(x)
        bias_key = 'bias_final0' if env == 0 else 'bias_final1'
        return tf.add(tf.matmul(features, self.weight['weight_final']), self.bias[bias_key])
    
# Single model instance shared by the training and gradient-check cells below.
inv = InvarianceTestGraph()

In [3]:
# Two synthetic environments that differ in size and in the label prior:
# environment 0 has 1200 samples with P(y=1)=0.5, environment 1 has 1500 with P(y=1)=0.8.
y0 = np.random.binomial(1, 0.5, (1200,))
y1 = np.random.binomial(1, 0.8, (1500,))


def f(y):
    """Draw one 64-dim feature vector: N(1, 1) entries if ``y`` is truthy, else N(0, 1)."""
    if y:
        return np.random.normal(1, 1, (64,))
    return np.random.normal(0, 1, (64,))


# Features are drawn one sample at a time, in label order, before the labels
# are replaced by their one-hot encoding.
x0 = tf.cast([f(label) for label in y0], dtype=tf.float32)
x1 = tf.cast([f(label) for label in y1], dtype=tf.float32)
y0 = tf.one_hot(y0, 2)
y1 = tf.one_hot(y1, 2)

# Endless, shuffled streams of 200-sample batches, one per environment.
batch0 = (
    tf.data.Dataset.from_tensor_slices((x0, y0))
    .repeat()
    .shuffle(5000)
    .batch(200)
    .prefetch(1)
)
batch1 = (
    tf.data.Dataset.from_tensor_slices((x1, y1))
    .repeat()
    .shuffle(5000)
    .batch(200)
    .prefetch(1)
)


learningRate = 0.01
optimizer = tf.optimizers.Adam(learningRate)

In [5]:
# Joint training on both environments: per-environment cross-entropy (each
# environment uses its own output bias) plus a Sinkhorn penalty between the
# two environments' learned representations.
# Note: zip() stops at the shorter iterable, so this runs 120 steps even though
# batch1.take(150) could supply more.
for step, ((bx0, by0), (bx1, by1)) in enumerate(zip(batch0.take(120), batch1.take(150)), 1):
    with tf.GradientTape() as g:
        loss = tf.nn.softmax_cross_entropy_with_logits(by0, inv(bx0, 0))
        loss = loss + tf.nn.softmax_cross_entropy_with_logits(by1, inv(bx1, 1))
        # The penalty is taken on the mapped features rather than on the raw
        # batches: bx0/bx1 are plain input data, so a distance between them does
        # not depend on any trainable variable and contributes no gradient.
        loss = loss + sinkhorn_dist(inv.invariantMap(bx0), inv.invariantMap(bx1), 0.1, 5)
        
    trainable_variables = inv.trainable_variables
    gradients = g.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

In [4]:
# Two independent clouds of 100 standard-normal points in 64 dimensions,
# stored as float32 tensors (x is drawn before y).
x = tf.cast(np.random.normal(0, 1, (100, 64)), dtype=tf.float32)
y = tf.cast(np.random.normal(0, 1, (100, 64)), dtype=tf.float32)

In [35]:
def WDist(x, y, reg):
    """
    Entropy-regularised optimal-transport cost between two point clouds.

    Both clouds carry uniform weights and the ground cost is the Euclidean
    distance between points. The transport plan comes from ``ot.sinkhorn``.

    :param x: (na, d) points of the first cloud
    :param y: (nb, d) points of the second cloud (same d as ``x``)
    :param reg: regularisation value passed through to ``ot.sinkhorn``
    :return: scalar ``sum(T * C)``, with ``C`` the (na, nb) pairwise-distance
             matrix and ``T`` the plan returned by ``ot.sinkhorn``
    """
    nx = x.shape[0]
    ny = y.shape[0]
    # Uniform marginals, kept as plain float64 NumPy arrays so the solver gets
    # NumPy inputs throughout instead of a mix of TF tensors and arrays.
    histX = np.ones((nx,)) / nx
    histY = np.ones((ny,)) / ny

    # Broadcasting (na, 1, d) against (1, nb, d) yields every pairwise
    # difference without first tiling two full (na, nb, d) copies.
    diff = tf.subtract(tf.expand_dims(x, axis=1), tf.expand_dims(y, axis=0))  # (na, nb, d)
    mm = tf.sqrt(tf.reduce_sum(tf.square(diff), axis=2))  # (na, nb)

    # Distances are rounded to float32 as before, then widened so the cost
    # matrix has the same dtype as the marginals.
    cost = tf.cast(mm, dtype=tf.float32).numpy().astype(np.float64)
    T = ot.sinkhorn(histX, histY, cost, reg)

    return np.sum(T * cost)

In [5]:
# Gradient check: differentiate the Sinkhorn loss between the two mapped
# clouds with respect to every trainable variable of the model.
with tf.GradientTape() as g:
    x_new = inv.invariantMap(x)
    y_new = inv.invariantMap(y)
    M = dist(x_new, y_new)
    # Uniform weights 1/n over each mapped cloud, as float32 tensors.
    a = tf.cast(np.full((x_new.shape[0],), 1.0 / x_new.shape[0]), dtype=tf.float32)
    b = tf.cast(np.full((y_new.shape[0],), 1.0 / y_new.shape[0]), dtype=tf.float32)
    loss = sinkhorn(a, b, M, 0.1, 5)

trainable_variables = inv.trainable_variables
gradients = g.gradient(loss, trainable_variables)


In [6]:
# Display the most recently computed gradients (one entry per trainable variable).
gradients

[<tf.Tensor: id=40174, shape=(64, 32), dtype=float32, numpy=
 array([[-1.1028768 ,  0.2919299 ,  0.37407345, ...,  0.416089  ,
         -1.0677972 , -1.068077  ],
        [-0.15632856, -0.03562798,  0.027945  , ..., -0.05432164,
         -0.01674667,  0.16921844],
        [ 2.8320374 , -0.66990876, -0.7832186 , ..., -0.89114267,
          2.2387261 ,  2.3875136 ],
        ...,
        [-0.63977957,  0.08149122,  0.12310924, ...,  0.15920326,
         -0.497426  , -0.30283564],
        [ 0.63410676, -0.21728165, -0.20855008, ..., -0.27412713,
          0.6971822 ,  0.66742235],
        [ 1.0892975 , -0.23141158, -0.26879767, ..., -0.3467461 ,
          1.0523837 ,  0.98998976]], dtype=float32)>,
 <tf.Tensor: id=40175, shape=(32, 16), dtype=float32, numpy=
 array([[ 1.145144  , -1.1650627 ,  0.9422933 ,  1.064434  , -1.4959941 ,
         -2.0559068 ,  1.1060169 ,  0.9993057 , -2.2822943 ,  1.1701638 ,
          1.0945299 ,  1.1645286 ,  1.1156363 ,  1.0858866 ,  1.1511474 ,
         -1.6

In [36]:
# Sinkhorn transport cost between the two random clouds with reg = 1.
# NOTE(review): this rebinds M, which the gradient-check cell uses for the dist() result.
M = WDist(x, y, 1)

In [37]:
# Show the scalar returned by WDist.
M

10.65166292193037

In [2]:
# Small 3x2 float32 tensor used to try out tf.argmax.
a = tf.cast([[1, 2], [3, 2], [2, 4]], dtype=tf.float32)

In [7]:
# Column index of the largest entry in each row of a.
tf.argmax(a, axis = 1)

<tf.Tensor: id=15, shape=(3,), dtype=int64, numpy=array([1, 0, 1], dtype=int64)>