In [1]:
# TEMP: Import lagom
# Not useful once lagom is installed
import sys
sys.path.append('/home/zuo/Code/lagom/')

# 1. Data generation

In [2]:
import numpy as np

import torch
from torch.utils.data import Dataset, DataLoader


class Data(Dataset):
    """
    Generate a set of data point of an inverted sinusoidal function. 
    i.e. y(x) = 7sin(0.75x) + 0.5x + eps, eps~N(0, 1)
    
    Then we ask the neural networks to predict x given y, in __getitem__(). 
    In this case, the classic NN suffers due to only one output given input. 
    To address it, one can use Mixture Density Networks. 
    """
    def __init__(self, n):
        self.n = n
        self.x, self.y = self._generate_data(self.n)
    
    def _generate_data(self, n):
        eps = np.random.randn(n)
        x = np.random.uniform(low=-10.5, high=10.5, size=n)
        y = 7*np.sin(0.75*x) + 0.5*x + eps
        
        return np.float32(x), np.float32(y)  # Enforce the dtype to be float32, i.e. FloatTensor in PyTorch
    
    def __len__(self):
        return self.n
    
    def __getitem__(self, index):
        # Retrieve the x, y value
        x = self.x[index]
        y = self.y[index]
        # Keep array shape due to scalar value
        x = np.array([x], dtype=np.float32)
        y = np.array([y], dtype=np.float32)

        return y, x

# 2. Training

In [None]:
import torch.nn.functional as F
import torch.optim as optim

from lagom.core.networks import MDN

device = torch.device('cuda')

D = Data(2500)
train_loader = DataLoader(D, batch_size=64)

model = MDN(input_dim=1, 
            output_dim=1, 
            num_densities=5, 
            hidden_sizes=[20], 
            hidden_nonlinearity=F.relu)
model = model.to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-3)


for i in range(2000):
    model.train()
    
    losses = []
    for data, target in train_loader:
        data = data.to(device)
        target = target.to(device)
        
        optimizer.zero_grad()
        log_pi, mu, std = model(data)
        loss = model.MDN_loss(log_pi=log_pi, mu=mu, std=std, target=target)
        loss.backward()
        losses.append(loss.item())
        optimizer.step()
        
    if i == 0 or (i+1)%100 == 0:
        #IPython.display.clear_output(wait=True)
        print(f'Epoch: {i+1}\t Loss: {np.mean(losses)}')
    

Epoch: 1	 Loss: 6.8650190234184265


# 3. Evaluation

In [None]:
import matplotlib.pyplot as plt

test_data = torch.tensor(np.linspace(-15, 15, num=1000, dtype=np.float32), device=device).unsqueeze(1)
log_pi, mu, std = model(test_data)
samples = model.sample(log_pi=log_pi, mu=mu, std=std)
samples.detach().cpu().numpy()

plt.figure(figsize=[6, 6])
plt.scatter(D.y, D.x, alpha=0.2)
plt.scatter(test_data.detach().cpu().numpy(), samples.detach().cpu().numpy(), alpha=0.2, color='red')
plt.show()