<a href="https://colab.research.google.com/github/rvinas/CIBERSORT/blob/master/CD_VAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tensorflow-probability==0.8.0rc0



In [2]:
import tensorflow as tf
import numpy as np

# tf.compat.v1.disable_eager_execution()
"""
from tensorflow.python import tf2
if not tf2.enabled():
    import tensorflow.compat.v2 as tf
    tf.enable_v2_behavior()
    assert tf2.enabled()
"""

import tensorflow_probability as tfp

tfk = tf.keras
tfkl = tf.keras.layers
tfpl = tfp.layers
tfd = tfp.distributions

if tf.test.gpu_device_name() != '/device:GPU:0':
    print('WARNING: GPU device not found.')
else:
    print('SUCCESS: Found GPU: {}'.format(tf.test.gpu_device_name()))

SUCCESS: Found GPU: /device:GPU:0


In [0]:
class GCN(tfkl.Layer):
    def __init__(self, units=32):
        super(GCN, self).__init__()
        self.units = units

    def build(self, input_shape):
        bs, nb_vars, dim = input_shape[0]

        self.w = self.add_weight(shape=(dim, self.units),
                              initializer='random_normal',
                              trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)
        super(GCN, self).build(input_shape)
    
    def call(self, inputs):
        h, adj = inputs

        # Compute normalized adjacency matrix
        nb_vars = adj.shape[0]
        adj_hat = adj + tf.linalg.diag(tf.ones(nb_vars))
        d_hat = tf.reduce_sum(adj_hat, axis=-1)  # Degree of each variable
        d_hat_sqrt = tf.linalg.diag(tf.sqrt(d_hat))
        norm_adj_hat = d_hat_sqrt @ adj_hat @ d_hat_sqrt

        # Perform graph convolutions
        h_t = tf.transpose(h, perm=(0, 2, 1))  # (bs, dim, nb_vars)
        norm_adj_hat_t = tf.transpose(norm_adj_hat, perm=(1, 0))  # Shape=(nb_vars, nb_vars)
        out = tf.transpose(h_t @ norm_adj_hat_t, perm=((0, 2, 1))) @ self.w + self.b

        return out

class Encoder(tfkl.Layer):
    def __init__(self):
        super(Encoder, self).__init__()

    def build(self, input_shape):
        self.loc = tfkl.Dense(nb_vars)
        self.scale = tfkl.Dense(nb_vars, activation=tf.nn.softplus)
        self._trainable_weights = [self.loc.trainable_weights, self.scale.trainable_weights]
        super(Encoder, self).build(input_shape)

    def call(self, x):
        return tfd.MultivariateNormalDiag(self.loc(x),
                                          self.scale(x))

class Decoder(tfkl.Layer):
    def __init__(self):
        super(Decoder, self).__init__()

    def build(self, input_shape):
        # Input shape: (bs, nb_vars)
        bs, nb_vars = input_shape
        p = self.add_weight(shape=(nb_vars, nb_vars),
                            initializer=lambda shape, dtype: tf.ones(shape, dtype=dtype)/2,
                            trainable=True)
        mask = 1 - tf.linalg.diag(tf.ones(nb_vars))
        self.p = p * mask
        self.f = GCN(2)
        super(Decoder, self).build(input_shape)

    def call(self, x, temperature = 0.2):
        a = tfd.RelaxedBernoulli(temperature, probs=self.p).sample()
        e = x[..., None]  # Noise terms for each variable. Shape=(bs, nb_vars, 1) 
        h = self.f([e, a])  # Shape=(bs, nb_vars, 2)
        loc = h[..., 0]
        scale = h[..., 1]
        out = tfd.MultivariateNormalDiag(loc, scale)
        return out

def _encoder(x, nb_vars):
    # x = tfkl.Input((nb_vars,))
    loc = tfkl.Dense(nb_vars)(x)
    scale = tfkl.Dense(nb_vars, activation=tf.nn.softplus)(x)
    z = tfd.MultivariateNormalDiag(loc, scale)
    # Consider using a tfpl.IndependentNormal distribution, since noise terms are assumed to be independent
    return z

def prior(nb_vars):
    loc = tf.zeros(nb_vars)
    scale = tf.ones(nb_vars)
    return tfd.MultivariateNormalDiag(loc, scale)

def _decoder(z, nb_vars, temperature = 0.2):
    # Sample adjacency matrix
    p = tf.Variable([[0.5] * nb_vars] * nb_vars)  # Probability of edge between pairs of variables
    mask = 1 - tf.linalg.diag(tf.ones(nb_vars))
    p = p * mask
    a = tfd.RelaxedBernoulli(temperature, probs=p)

    # Define GNN
    e = z[..., None]  # Noise terms for each variable. Shape=(bs, nb_vars, 1) 


In [4]:
nb_vars = 10
x_ = tf.placeholder(tf.float32, [None, nb_vars])
encoder = Encoder()(x_)  # Posterior approximation
code = encoder.sample()
decoder = Decoder()(code)

likelihood = decoder.log_prob(x_)
divergence = tfd.kl_divergence(encoder, prior(nb_vars))
elbo = tf.reduce_mean(likelihood - divergence)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


In [7]:
optimize = tf.train.AdamOptimizer(0.001).minimize(-elbo)

x = np.ones((32, nb_vars))

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for epoch in range(20):
        _, loss = sess.run([optimize, -elbo], {x_: x})
        print('Epoch: {}, loss: {}'.format(epoch, loss))

Epoch: 0, loss: 24643.818359375
Epoch: 1, loss: 127284.0859375
Epoch: 2, loss: 61146.0625
Epoch: 3, loss: 72896.46875
Epoch: 4, loss: 2335.203125
Epoch: 5, loss: 4472.8916015625
Epoch: 6, loss: 61876.86328125
Epoch: 7, loss: 5953.37841796875
Epoch: 8, loss: 2421.377685546875
Epoch: 9, loss: 263428.09375
Epoch: 10, loss: 1043436.3125
Epoch: 11, loss: 178079.8125
Epoch: 12, loss: 7622.06640625
Epoch: 13, loss: 7755.15185546875
Epoch: 14, loss: 5827.3466796875
Epoch: 15, loss: 3505.302001953125
Epoch: 16, loss: 14061.1181640625


KeyboardInterrupt: ignored

In [0]:
temperature = 0.2
nb_vars = 2
p = tf.Variable([[0.5] * nb_vars] * nb_vars)
dist = tfd.RelaxedBernoulli(temperature, probs=p)