# NICE
### Non-Linear Independent Components Estimation
### Paper: https://arxiv.org/pdf/1410.8516.pdf
### Implementation inspiration: https://github.com/DakshIdnani/pytorch-nice

## Import modules

In [None]:
from os import path
import numpy as np 
import pandas as pd

import tensorflow as tf 
from tensorflow import keras
import tensorflow_probability as tfp

from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# Seed both NumPy's and TensorFlow's global RNGs with the same value so that
# repeated runs of the notebook draw the same random numbers.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
# Local CSV copy of the dataset; when this file is missing the loading cell
# falls back to downloading 'mnist_784' through scikit-learn.
data_path = "mnist_784.csv"

## Load and prepare data

In [None]:
# Load MNIST either from a local CSV cache or from OpenML.
if path.exists(data_path):
    data = pd.read_csv(data_path)
    # NOTE(review): assumes the label is the last CSV column and every other
    # column is a pixel -- confirm against the file actually used.
    X, y = data.iloc[:, :-1].values, data.iloc[:, -1].values
else:
    # as_frame=False makes fetch_openml return NumPy arrays, so the positional
    # indexing/reshaping done in later cells works.
    X, y = fetch_openml('mnist_784', version=1, return_X_y=True, as_frame=False)

# Only the pixel intensities are rescaled to [0, 1]. The labels are class ids
# (delivered as strings by OpenML), so they are cast to integers rather than
# divided by 255.
X = X.astype(np.float32) / 255
y = y.astype(np.int64)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

In [None]:
# Display the first training example, reshaped from a flat row to a 28x28 image.
first_digit = X_train[0, :].reshape(28, 28)
plt.imshow(first_digit)
plt.show()

## NICE model

In [None]:
class ScaleLayer(tf.keras.layers.Layer):
    """Element-wise scaling layer with one learnable log-scale per feature.

    The forward pass multiplies the input by ``exp(S)`` and the inverse pass
    multiplies it by ``exp(-S)``, so the two passes undo each other.

    Args:
        dim: Number of features; the log-scale vector ``S`` has shape ``(dim,)``.
    """
    def __init__(self, dim):
        super(ScaleLayer, self).__init__()
        # Small random log-scales, i.e. scale factors close to 1 at start.
        s_init = tf.random.normal(shape=(dim,), stddev=.1, dtype=tf.float32)
        self.S = tf.Variable(s_init, trainable=True)

    def call(self, h, inverse=False):
        """Scale ``h`` element-wise.

        Args:
            h: Tensor whose last dimension matches ``S``.
            inverse: If true, apply ``exp(-S)`` instead of ``exp(S)``.

        Returns:
            A ``(tensor, log_scale)`` tuple in both directions, where
            ``log_scale`` is the log of the factors that were applied
            (``S`` forward, ``-S`` inverse).
        """
        if inverse:
            return tf.multiply(tf.math.exp(-self.S), h), -self.S
        return tf.multiply(tf.math.exp(self.S), h), self.S

In [None]:
class AdditiveCouple(tf.keras.layers.Layer):
    """Additive coupling layer.

    A binary mask selects the features that pass through unchanged. Those
    features are fed to a small dense network ``m`` whose output is added to
    (forward) or subtracted from (inverse) the remaining features:

        forward:  y = x + (1 - mask) * m(mask * x)
        inverse:  x = y - (1 - mask) * m(mask * y)

    Args:
        dim: Total number of features.
        split_dim: Index at which the feature vector is split.
        mask: ``'left'`` keeps the first ``split_dim`` features unchanged,
            ``'right'`` keeps the remaining ``dim - split_dim`` unchanged.

    Raises:
        ValueError: If ``mask`` is neither ``'left'`` nor ``'right'``.
    """
    def __init__(self, dim, split_dim, mask):
        super(AdditiveCouple, self).__init__()

        # Declare constant to express where to split input
        self.split_dim = split_dim

        if mask == 'left':
            self.mask = tf.concat(
                [tf.ones(split_dim), tf.zeros(dim - split_dim)], axis=0)
        elif mask == 'right':
            self.mask = tf.concat(
                [tf.zeros(split_dim), tf.ones(dim - split_dim)], axis=0)
        else:
            # Fail at construction instead of with an AttributeError in call().
            raise ValueError(
                "mask must be 'left' or 'right', got {!r}".format(mask))

        # Complement of the mask: selects the features that get shifted.
        self.inv_mask = 1.0 - self.mask

        # Declare layers of the coupling network m
        self.l1 = tf.keras.layers.Dense(dim, activation='relu')
        self.l2 = tf.keras.layers.Dense(dim, activation='relu')
        self.l3 = tf.keras.layers.Dense(dim, activation='linear')

    def m(self, x):
        """Apply the three dense layers of the coupling network to ``x``."""
        x = self.l1(x)
        x = self.l2(x)
        return self.l3(x)

    def call(self, x, inverse=False):
        """Apply the coupling transform, or its inverse when ``inverse`` is true.

        The masked (pass-through) features are identical before and after the
        transform, so the same shift is computed in both directions and only
        its sign differs.
        """
        passthrough = self.mask * x
        # Restrict the network output to the features that are not passed through.
        shift = self.m(passthrough) * self.inv_mask
        if inverse:
            return x - shift
        return x + shift

In [None]:
class NICE(keras.Model):
    """NICE flow: four additive coupling layers followed by a scaling layer.

    The coupling layers alternate between ``'left'`` and ``'right'`` masks.
    Latent values are scored under a standard logistic prior.

    Args:
        input_dim: Number of features of each input example.
        split_dim: Index at which the coupling layers split the features.
        **kwargs: Forwarded to ``keras.Model``.
    """
    def __init__(self, input_dim, split_dim, **kwargs):
        super(NICE, self).__init__(**kwargs)

        # Kept so latent samples can be drawn with the right width.
        self.data_dim = input_dim

        # Declare constant to express where to split input
        self.split_dim = split_dim

        # Declare prior distribution for output
        self.prior = tfp.distributions.Logistic(0., 1., name='Logistic')

        # Declare coupling layers
        self.L1 = AdditiveCouple(dim=input_dim, split_dim=split_dim, mask='left')
        self.L2 = AdditiveCouple(dim=input_dim, split_dim=split_dim, mask='right')
        self.L3 = AdditiveCouple(dim=input_dim, split_dim=split_dim, mask='left')
        self.L4 = AdditiveCouple(dim=input_dim, split_dim=split_dim, mask='right')

        # Declare Scaling layer
        self.S = ScaleLayer(dim=input_dim)

    @staticmethod
    def _log_prior(h):
        """Element-wise log-density of the standard logistic distribution.

        Equals ``-log(1 + exp(h)) - log(1 + exp(-h))``; written with softplus
        so that large ``|h|`` does not overflow ``exp``.
        """
        return -tf.math.softplus(h) - tf.math.softplus(-h)

    def _forward(self, x):
        """Map data to latent space; returns ``(latent, log_scales)``."""
        h = self.L4(self.L3(self.L2(self.L1(x))))
        return self.S(h)

    def _inverse(self, z):
        """Map latent values back to data space (layers applied in reverse)."""
        h, _ = self.S(z, inverse=True)
        for layer in (self.L4, self.L3, self.L2, self.L1):
            h = layer(h, inverse=True)
        return h

    def sample(self, num_samples=1):
        """Draw ``num_samples`` latent vectors from the prior and decode them."""
        z = self.prior.sample(tf.stack([num_samples, self.data_dim]))
        return self._inverse(z)

    def train_step(self, data):
        """Run one gradient step on the negative mean log-likelihood.

        Args:
            data: Either the input batch or an ``(inputs, labels, ...)``
                tuple/list; labels are not used.

        Returns:
            Dict whose ``"log_likelihood"`` entry holds the minimised loss,
            i.e. the *negative* mean log-likelihood of the batch.
        """
        X = data[0] if isinstance(data, (tuple, list)) else data
        with tf.GradientTape() as tape:
            # Feed input through model
            h, s_vals = self._forward(X)

            # Per-example log-likelihood: log-prior summed over features plus
            # the log-determinant of the scaling layer (sum of log-scales).
            # The coupling layers only add a shift, so they contribute no
            # log-determinant term.
            log_px = (tf.math.reduce_sum(self._log_prior(h), axis=-1)
                      + tf.math.reduce_sum(s_vals))
            # Average over the batch so both terms carry the same weight.
            loss = -tf.math.reduce_mean(log_px)

        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {
            "log_likelihood": loss,
        }

    def call(self, data, inverse=False):
        """Score data under the model, or generate samples.

        Args:
            data: Input batch. In inverse mode only its leading (batch)
                dimension is used, as the number of samples to draw.
            inverse: If true, sample from the prior and decode to data space.

        Returns:
            Forward: element-wise log-prior of the latent representation.
            Inverse: generated samples of shape ``(batch, input_dim)``.
        """
        if inverse:
            return self.sample(tf.shape(data)[0])
        h, _ = self._forward(data)
        return self._log_prior(h)

## Declare and train model

In [None]:
# Build the flow for flattened 28x28 images (784 features), splitting the
# feature vector in half for the coupling layers. The class is named NICE.
model = NICE(input_dim=784,split_dim=392)
model.compile(optimizer="adam")
# Labels are passed along because fit() hands (inputs, labels) to train_step.
history = model.fit(X_train, y_train, epochs=3, batch_size=32)

## Sample Model

In [None]:
# A Keras model must be called with a positional input, so a one-row
# placeholder is passed; its values are not used when sampling in inverse mode.
x = model(np.zeros((1, 784), dtype=np.float32), inverse=True)