# PyTorch implementation of the cVAE
---

In [1]:
import torch
import torch.nn          as nn
import numpy             as np
import matplotlib.pyplot as plt

from torch.utils.data import Dataset, DataLoader
from torch.optim      import Adam
from tqdm             import tqdm
from ipywidgets       import interact

plt.rcParams['figure.dpi'] = 150

In [2]:
# Model Hyperparameters
cuda   = False
DEVICE = torch.device("cuda" if cuda else "cpu")

batch_size = 100

lr = 1e-3

epochs = 30

In [None]:
def models_data_transform(self, x):
    # Take log10
    x = np.log10(x)
    self.models_mean = np.mean(x)
    # Normalise
    x = (x - np.mean(x)) / np.std(x)
    # Return result
    return x

def inverse_models_data_transform(x):

In [131]:
class MyDataset(Dataset):
    def __init__(self, models_file, I_outs_file, train=True, fraction=0.9):
        
        self.models_file = models_file
        self.I_outs_file = I_outs_file
        
        # Load data
        self.I_out = self.model()
        self.model = self.I_out()
        
        # Select relevant data
        self.I = self.I_out
        self.η = self.model[:, 0]
        
        # Take log10 and clip
        self.I = np.clip(np.log10(self.I), -20.0, np.inf)
        self.η = np.clip(np.log10(self.η), -20.0, np.inf)
        
        # Normalise
        self.I = (self.I - np.mean(self.I)) / np.std(self.I)
        self.η = (self.η - np.mean(self.η)) / np.std(self.η)
        
        # Set type
        self.I = self.I.astype(np.float32)
        self.η = self.η.astype(np.float32)
        
        N = int(fraction * self.I.shape[0])
        
        if train:
            # Take fraction of the data
            self.I = self.I[:N]
            self.η = self.η[:N]
        else:
            # Take 1-fraction of the data 
            self.I = self.I[N:]
            self.η = self.η[N:]

    def model(self):
        return np.load(models_file)
    
    def I_out(self):
        return np.load(I_outs_file)
            
    def __len__(self):
        return self.I.shape[0]

    def __getitem__(self, idx):
        return self.I[idx], self.η[idx]

In [132]:
train_data = MyDataset('data/Models/model.npy', 'data/Models/I_out.npy', train=True,  fraction=0.8)
valid_data = MyDataset('data/Models/model.npy', 'data/Models/I_out.npy', train=False, fraction=0.8)

kwargs = {'num_workers': 1, 'pin_memory': True} 

train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True,  **kwargs)
valid_loader = DataLoader(dataset=valid_data, batch_size=batch_size, shuffle=False, **kwargs)

In [5]:
N_depth = train_data.η.shape[1]
N_freqs = train_data.I.shape[1]

In [6]:
for batch_idx, (I, η) in enumerate(tqdm(valid_loader)):
    I = I.view(batch_size, N_freqs)
    η = η.view(batch_size, N_depth)
    break

  0%|                                                                   | 0/200 [00:00<?, ?it/s]


In [20]:
interact(lambda i: plt.plot(η[i]), i=(0, batch_size-1))

interactive(children=(IntSlider(value=49, description='i', max=99), Output()), _dom_classes=('widget-interact'…

<function __main__.<lambda>(i)>

In [8]:
interact(lambda i: plt.plot(I[i]), i=(0, batch_size-1))

interactive(children=(IntSlider(value=49, description='i', max=99), Output()), _dom_classes=('widget-interact'…

<function __main__.<lambda>(i)>

In [9]:
class q_NN(nn.Module):
    """
    "Posterior Network" q(l|m): giving the distribution over the latent variable, given a model.
    """
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(Encoder, self).__init__()

        self.FC_input  = nn.Linear( input_dim, hidden_dim)
        self.FC_input2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_mean   = nn.Linear(hidden_dim, latent_dim)
        self.FC_var    = nn.Linear(hidden_dim, latent_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
        self.training = True
        
        
    def forward(self, x):
        h_       = self.LeakyReLU(self.FC_input(x))
        h_       = self.LeakyReLU(self.FC_input2(h_))
        mean     = self.FC_mean(h_)
        log_var  = self.FC_var(h_)
        
        # encoder produces mean and log of variance 
        # (i.e., parateters of simple tractable normal distribution "q"
        
        return mean, log_var

In [10]:
class Encoder(nn.Module):
    """
    Encoder network.
    """
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(Encoder, self).__init__()

        self.FC_input  = nn.Linear( input_dim, hidden_dim)
        self.FC_input2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_input3 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_mean   = nn.Linear(hidden_dim, latent_dim)
        self.FC_var    = nn.Linear(hidden_dim, latent_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        # self.tanh      = nn.Tanh()
        
    def forward(self, I):
        h       = self.LeakyReLU(self.FC_input (I))
        h       = self.LeakyReLU(self.FC_input2(h))
        h       = self.LeakyReLU(self.FC_input3(h))
        mean    = self.FC_mean(h)
        log_var = self.FC_var (h)
        
        return mean, log_var
    
    
class Decoder(nn.Module):
    """
    Decoder network.
    """
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super(Decoder, self).__init__()
        
        self.FC_hidden  = nn.Linear(latent_dim, hidden_dim)
        self.FC_hidden2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_hidden3 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_output  = nn.Linear(hidden_dim, output_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        # self.tanh      = nn.Tanh()
        
    def forward(self, z):
        h = self.LeakyReLU(self.FC_hidden (z))
        h = self.LeakyReLU(self.FC_hidden2(h))
        h = self.LeakyReLU(self.FC_hidden3(h))
        
        η = self.FC_output(h)
        
        return η
    
    
class Model(nn.Module):
    """
    Variational Autoencoder
    """
    def __init__(self, Encoder, Decoder):
        super(Model, self).__init__()
        
        self.Encoder = Encoder
        self.Decoder = Decoder
        
        
    def reparameterization(self, mean, var):
        epsilon = torch.randn_like(var).to(DEVICE)   # sampling epsilon        
        z = mean #+ var * epsilon                     # reparameterization trick
        return z
        
                
    def forward(self, I):
        mean, log_var = self.Encoder(I)
        z             = self.reparameterization(mean, torch.exp(0.5 * log_var)) # takes exponential function (log var -> var)
        η             = self.Decoder(z)
        
        return η, mean, log_var

In [39]:
nn.Conv1d(1, 4, 4, 2).stride

(2,)

In [48]:
def Conv1d_L_out(L_in, cnns):
    
    L_out = L_in

    for cnn in cnns:
        L_out = np.floor((L_out + 2 * cnn.padding[0] - cnn.dilation[0] * (cnn.kernel_size[0] - 1) - 1) / cnn.stride[0] + 1)
    
    return int(L_out)

In [123]:
Conv1d_L_out(13, [nn.Conv1d(1, 4, 5, 1, 2)])

13

In [54]:
N_freqs

80

In [61]:
Conv1d_L_out(N_freqs, [nn.Conv1d(1, 6, 8, 1), nn.Conv1d(6, 1, 8, 1)])

66

In [112]:
Conv1d_L_out(10, [nn.Conv1d(1, 1, 1, 1)])

10

In [89]:
I.size(2)

IndexError: Dimension out of range (expected to be in range of [-2, 1], but got 2)

In [124]:
class CNN_Encoder(nn.Module):
    """
    Encoder network, using convolutional layers.
    """
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(CNN_Encoder, self).__init__()

        self.Conv1d_1 = nn.Conv1d(in_channels=1, out_channels=6, kernel_size=5, stride=1, padding=2)
        self.Conv1d_2 = nn.Conv1d(in_channels=6, out_channels=1, kernel_size=5, stride=1, padding=2)
        
        L_out = Conv1d_L_out(input_dim, [self.Conv1d_1, self.Conv1d_2])
        
        self.Linear_1    = nn.Linear(L_out,      hidden_dim)
        self.Linear_2    = nn.Linear(hidden_dim, hidden_dim)
        self.Linear_mean = nn.Linear(hidden_dim, latent_dim)
        self.Linear_lvar = nn.Linear(hidden_dim, latent_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)

        
    def forward(self, I):
        
        I = I.reshape(I.size(0), 1, I.size(1))
        
        h = self.Conv1d_1(I)
        h = self.Conv1d_2(h)

        h = h.reshape(h.size(0), -1)
        
        h = self.LeakyReLU(self.Linear_1(h))
        h = self.LeakyReLU(self.Linear_2(h))
        
        mean = self.LeakyReLU(self.Linear_mean(h))
        lvar = self.LeakyReLU(self.Linear_lvar(h))
    
        return mean, lvar
    
    
class CNN_Decoder(nn.Module):
    """
    Decoder network, using convolutional layers.
    """
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super(CNN_Decoder, self).__init__()
        
        self.Conv1d_1 = nn.Conv1d(in_channels=1, out_channels=6, kernel_size=5, stride=1, padding=2)
        self.Conv1d_2 = nn.Conv1d(in_channels=6, out_channels=1, kernel_size=5, stride=1, padding=2)
      
        L_out = Conv1d_L_out(latent_dim, [self.Conv1d_1, self.Conv1d_2])
    
        self.Linear_1 = nn.Linear(L_out, output_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
    def forward(self, z):
        
        # z = z.reshape(z.size(0), 1, z.size(1))
        
        z = z.reshape(z.size(0), 1, z.size(1))
        
        h = self.Conv1d_1(z)
        h = self.Conv1d_2(h)
        
        h = h.reshape(h.size(0), -1)
        
        h = self.LeakyReLU(self.Linear_1(h))
        
        η = h
        
        return η
    
    
class Model(nn.Module):
    """
    Variational Autoencoder
    """
    def __init__(self, Encoder, Decoder):
        super(Model, self).__init__()
        
        self.Encoder = Encoder
        self.Decoder = Decoder
        
        
    def reparameterization(self, mean, var):
        epsilon = torch.randn_like(var).to(DEVICE)   # sampling epsilon        
        z = mean #+ var * epsilon                     # reparameterization trick
        return z
        
                
    def forward(self, I):
        mean, log_var = self.Encoder(I)
        z             = self.reparameterization(mean, torch.exp(0.5 * log_var)) # takes exponential function (log var -> var)
        η             = self.Decoder(z)
        
        return η, mean, log_var

In [125]:
encoder = CNN_Encoder( input_dim=N_freqs, hidden_dim=N_freqs, latent_dim=N_depth)
decoder = CNN_Decoder(latent_dim=N_depth, hidden_dim=N_depth, output_dim=N_depth)

model = Model(Encoder=encoder, Decoder=decoder).to(DEVICE)


# model = Model(input_dim=N_freqs, Decoder=decoder).to(DEVICE)

In [126]:
def loss_function(η, η_hat, mean, log_var):
    reproduction_loss = nn.functional.mse_loss(η_hat, η)
    KLD               = -0.5 * torch.sum(1.0 + log_var - mean.pow(2) - log_var.exp())
    return reproduction_loss #+ 0.5*KLD

In [127]:
# def loss_function(η, η_hat):
#     return nn.functional.mse_loss(η_hat, η)

In [128]:
optimizer = Adam(model.parameters(), lr=lr)


model.train()

for epoch in range(epochs):
    
    overall_loss = 0
    
    for batch_idx, (I, η) in enumerate(train_loader):
        # I = I.view(batch_size, N_freqs)
        # η = η.view(batch_size, N_depth)
        
        I = I.to(DEVICE)
        η = η.to(DEVICE)

        η_hat, mean, log_var = model(I)
        loss                 = loss_function(η, η_hat, mean, log_var)
        # loss                 = loss_function(η, η_hat)
        
        overall_loss += loss.item()

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
    print("\tEpoch", epoch + 1, "complete!", "\tAverage Loss: ", overall_loss / (batch_idx*batch_size))

	Epoch 1 complete! 	Average Loss:  0.001354424984745672
	Epoch 2 complete! 	Average Loss:  0.0003103168344402567
	Epoch 3 complete! 	Average Loss:  0.00023059129832692454
	Epoch 4 complete! 	Average Loss:  0.00019508747019116138
	Epoch 5 complete! 	Average Loss:  0.0001665187180531823
	Epoch 6 complete! 	Average Loss:  0.00014873623581036682
	Epoch 7 complete! 	Average Loss:  0.00013468247311462523
	Epoch 8 complete! 	Average Loss:  0.00012397465010515144
	Epoch 9 complete! 	Average Loss:  0.00011472355228765997
	Epoch 10 complete! 	Average Loss:  0.00010672861961962583
	Epoch 11 complete! 	Average Loss:  0.00010059591925916295
	Epoch 12 complete! 	Average Loss:  9.548630751902529e-05
	Epoch 13 complete! 	Average Loss:  9.140281919949457e-05
	Epoch 14 complete! 	Average Loss:  8.705841162905227e-05
	Epoch 15 complete! 	Average Loss:  8.379710201192671e-05
	Epoch 16 complete! 	Average Loss:  8.105451408983471e-05
	Epoch 17 complete! 	Average Loss:  7.818223256967468e-05
	Epoch 18 comple

In [129]:
model.eval()

with torch.no_grad():
    
    for batch_idx, (I, η) in enumerate(tqdm(valid_loader)):
        I = I.view(batch_size, N_freqs)
        η = η.view(batch_size, N_depth)
        I = I.to(DEVICE)
        η = η.to(DEVICE)
        
        η_hat, _, _ = model(I)

        break

  0%|                                                                   | 0/200 [00:00<?, ?it/s]


In [None]:
def inverse_model(I):
    
    return

In [130]:
def plot(i):
    plt.plot(η    [i])
    plt.plot(η_hat[i])
interact(plot, i=(0,batch_size-1))

interactive(children=(IntSlider(value=49, description='i', max=99), Output()), _dom_classes=('widget-interact'…

<function __main__.plot(i)>

In [21]:
model.eval()

with torch.no_grad():
    
    for batch_idx, (I, η) in enumerate(tqdm(train_loader)):
        I = I.view(batch_size, N_freqs)
        η = η.view(batch_size, N_depth)
        I = I.to(DEVICE)
        η = η.to(DEVICE)
        
        η_hat, _, _ = model(I)

        break

  0%|                                                                   | 0/800 [00:00<?, ?it/s]


In [22]:
def plot(i):
    plt.plot(η    [i])
    plt.plot(η_hat[i])
interact(plot, i=(0,batch_size-1))

interactive(children=(IntSlider(value=49, description='i', max=99), Output()), _dom_classes=('widget-interact'…

<function __main__.plot(i)>

In [72]:
η_hat.max()

tensor(1.0986)