In [30]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset
import torch.optim as optim
import numpy as np
import pandas as pd

np.set_printoptions(
    edgeitems=4,
    linewidth=1000,
    formatter=dict(float=lambda x: "%.3g" % x),
    suppress=True,
)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 100)
torch.set_printoptions(edgeitems=2, linewidth=1000, precision=3, sci_mode=False)

np.random.seed(69)
torch.manual_seed(69)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(69)

In [31]:
data = pd.read_csv("datasets/MNIST/mnist_train.csv").drop(["label"], axis=1).values
print(f"Raw training data shape: {data.shape}")
data = data / 255
# data = data.reshape(-1, 28, 28)
print(f"Training data shape: {data.shape}")

Raw training data shape: (60000, 784)
Training data shape: (60000, 784)


In [32]:
class ReLUNet(nn.Module):
    def __init__(self, latent_dim, hidden_dim=1000, num_layers=5):
        super(ReLUNet, self).__init__()
        # latend_dim = input_dim // 2 (split into either odds or evens)
        # m: R^d -> R^{D-d}=R^{d} (ie: latent_dim -> input_dim - latent_dim = latent_dim)
        modules = [nn.Linear(latent_dim, hidden_dim)]
        for _ in range(num_layers):
            modules.append(nn.Linear(hidden_dim, hidden_dim))
            modules.append(nn.ReLU())
        modules.append(nn.Linear(hidden_dim, latent_dim))
        self.net = nn.Sequential(*modules)

    def forward(self, x):
        return self.net(x)


class CouplingLayer(nn.Module):
    """
    Additive coupling layer for nice model:
        g(a;b) = (a + b), where
        a = x2, b = m(x1), and m is a recfified neural network (ReLUNet) from R^d -> R^{D-d}=R^{d}
        Note: x1 and x2 are the odd and even parts of the input x, so for x in R^{D}, x1, x2 in R^{d}
            d = D-d = D/2
        Forward:
            y1 = x1
            y2 = g(x2;m(x1)) = x2 + m(x1)
            y = (y1, y2)
        Inverse:
            x1 = y1
            x2 = g^{-1}(y2;m(y1)) = y2 - m(y1)
            x = (x1, x2)

    """

    def __init__(self, input_dim, hidden_dim, num_layers, parity: str):
        super(CouplingLayer, self).__init__()

        self.parity = parity
        latent_dim = input_dim // 2

        # Define NN layers for the transformation
        self.m = ReLUNet(latent_dim, hidden_dim, num_layers)

    def forward(self, x):
        # Split input into "odd" and "even" parts
        odd, even = x[:, 0::2], x[:, 1::2]
        if self.parity == "odd":
            x1, x2 = odd, even
        else:
            x1, x2 = even, odd

        # Part 1 of the input is pass through an identity function (remains unchanged)
        y1 = x1
        # Apply the coupling transformation to the part 2 of the input
        y2 = x2 + self.m(x1)

        # Concatenate (or couple) the two parts back together
        y = torch.cat([y1, y2], dim=1)
        return y

    def inverse(self, y):
        # Split the output into two parts
        odd, even = y[:, 0::2], y[:, 1::2]
        if self.parity == "odd":
            y1, y2 = odd, even
        else:
            y1, y2 = even, odd

        # The first part of the output is unchanged
        x1 = y1
        # Apply the inverse transformation to the second part of the output
        x2 = y2 - self.m(y1)

        # Concatenate the two parts back together
        x = torch.cat([x1, x2], dim=1)
        return x

In [33]:
class NICE(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers):
        super(NICE, self).__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        # Sequence of 4 alternativing parity coupling layers
        self.couple1 = CouplingLayer(input_dim, hidden_dim, num_layers, parity="odd")
        self.couple2 = CouplingLayer(input_dim, hidden_dim, num_layers, parity="even")
        self.couple3 = CouplingLayer(input_dim, hidden_dim, num_layers, parity="odd")
        self.couple4 = CouplingLayer(input_dim, hidden_dim, num_layers, parity="even")

        """
        Create the scaling diagonal matrix (see Section 3.3 of the NICE paper)
        Multiplies the ith output value by S_ii. Weights certain dim more than others.
        Similar to eigenspectrum of PCA, exposing the variation present in each latent dimension 
        (larger S_ii means the less important dimension i is). More important dimensions of the 
        spctrum can be viewed as a manifold learned by the mdoel.
        """
        self.scaling_diag = nn.Parameter(torch.ones(input_dim))

    def forward(self, x):
        """
        Forward pass is the encoding step of the NICE model.
        """
        # Apply the coupling layers
        y = self.couple1(x)
        y = self.couple2(y)
        y = self.couple3(y)
        y = self.couple4(y)
        # Apply the scaling layer
        y = y @ torch.diag(torch.exp(self.scaling_diag))
        return y

    def inverse(self, y):
        """
        Inverse pass is the decoding step of the NICE model.
        """
        with torch.no_grad():
            # Apply the inverse scaling layer
            x = y @ torch.diag(torch.exp(-self.scaling_diag))
            # Apply the inverse coupling layers
            x = self.couple4.inverse(x)
            x = self.couple3.inverse(x)
            x = self.couple2.inverse(x)
            x = self.couple1.inverse(x)
        return x

In [46]:
class _NICECriterion(nn.Module):
    """
    Implementation of equation (3) above. Base class for Gaussian and Logistic criterion classes.
    """

    def __init__(self, average=True):
        super(_NICECriterion, self).__init__()
        self.average = average

    def prior(self, h):
        # Implement in child classes (4) and (5)
        raise NotImplementedError("Must implement prior function in child class")

    def forward(self, h, s_diag):
        # Implementation of (3). Identical for both Gaussian and Logistic.
        # Don't take log of S_ii since it's already in log space, we take exp(S_ii) in forward pass.
        log_p = torch.sum(self.prior(h), dim=1) + torch.sum(s_diag)
        if self.average:
            return torch.mean(log_p)
        else:
            return torch.sum(log_p)


class GaussianNICECriterion(_NICECriterion):
    """
    Implementation of (4) above. Gaussian prior based log-lokielihood critereon.
    """

    def __init__(self, average=True):
        super(GaussianNICECriterion, self).__init__()

    def prior(self, h):
        # Implementation of (4) above.
        return -0.5 * (h**2 + torch.log(torch.tensor(2 * np.pi)))

In [36]:
epcohs = 1000
batch_size = 16
batch_np = data[:batch_size, :]
batch = torch.from_numpy(batch_np).float()
batch.shape

torch.Size([16, 784])

In [37]:
input_dim = 784
hidden_dim = 1000
num_layers = 5

In [47]:
model = NICE(input_dim, hidden_dim, num_layers)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
nice_loss_fn = GaussianNICECriterion()


def loss_fn(y):
    return nice_loss_fn(y, model.scaling_diag)

In [50]:
optimizer.zero_grad()
outputs = model(batch)
loss = loss_fn(outputs)
loss.backward()
optimizer.step()


In [40]:
print(outputs.shape)
outputs


torch.Size([16, 784])


tensor([[0.000, 0.000,  ..., 0.064, 0.099],
        [0.000, 0.000,  ..., 0.066, 0.101],
        ...,
        [0.000, 0.000,  ..., 0.066, 0.098],
        [0.000, 0.000,  ..., 0.061, 0.102]], grad_fn=<MmBackward0>)

In [49]:
print(loss.shape)
loss


torch.Size([])


tensor(-228.367, grad_fn=<MeanBackward0>)