In [1]:
#imports
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt # creating visualizations
import numpy as np # basic math and random numbers
import pandas as pd
from sklearn.preprocessing import StandardScaler
import torch # package for building functions with learnable parameters
import torch.nn as nn # prebuilt functions specific to neural networks
import torch.nn.functional as F
from torch.autograd import Variable # storing data while learning
from torch.distributions import constraints, multivariate_normal, Normal
from torch.distributions.distribution import Distribution
from torch.utils.data import DataLoader, Dataset


## First loading the recorded data

In [2]:
class CoordinateDataset(Dataset):
    
    def __init__(self, file_name, with_scaler=False):
        file_out = pd.read_csv(file_name, names=['ball_x', 'ball_y', 'positionLeftShoulderRoll', 'positionLeftShoulderPitch'])
        print(f"Number of samples: {len(file_out)}")
        file_out.drop_duplicates(subset=['ball_x','ball_y'], inplace=True)
        print(f"Number of samples after removing duplicates: {len(file_out)}")
        x = file_out[['ball_x','ball_y']].values
        y = file_out[['positionLeftShoulderRoll', 'positionLeftShoulderPitch']].values
        x_train = x
        if with_scaler:
            sc = StandardScaler()
            x_train = sc.fit_transform(x_train)
        y_train = y
        x_tensor = torch.tensor(x_train, dtype=torch.float32)
        y_tensor = torch.tensor(y_train, dtype=torch.float32)
        self.x_train = Variable(x_tensor)
        self.y_train = Variable(y_tensor, requires_grad=False)
        
    def __len__(self):
        return len(self.y_train)
    
    def __getitem__(self, idx):
        return self.x_train[idx], self.y_train[idx]
        
dataset = CoordinateDataset('../controllers/tutorial2_tracker/data_points.csv')

## Then training the MDN

In [3]:
class MDN(nn.Module):
    def __init__(self, n_input, n_hidden, n_output, n_gaussians):
        super(MDN, self).__init__()
        self.fc1 = nn.Linear(n_input, n_hidden)
        
        #IMPORTANT notes
        # - Use softmax activation for pi  (they need to add up to 1)
        # - Use exponential linear unit for deviations (they should not be negative or close to zero)
        self.pis = nn.Linear(n_hidden, n_gaussians) # Coefficents
        self.mus = nn.Linear(n_hidden, n_gaussians * n_output) # Means
        self.sigmas = nn.Linear(n_hidden, n_gaussians) # Variances / Deviations
        

    def forward(self, x):
        #Program the forward pass
        x = torch.tanh(self.fc1(x))
        pi = F.softmax(self.pis(x), -1)
        sigma = torch.exp(self.sigmas(x))
        mu = self.mus(x)
        
        return pi, sigma, mu

In [44]:
oneDivSqrtTwoPI = 1.0 / np.sqrt(2.0*np.pi)
def gaussian_distribution(y, mu, sigma):
    result = (y - mu) * torch.reciprocal(sigma)
    result = -0.5 * (result * result)
    return (torch.exp(result) * torch.reciprocal(sigma)) * oneDivSqrtTwoPI

def loss_fn(pi, sigma, mu, y):
    N, K = pi.shape
    _, KT = mu.shape
    NO = int(KT / K)
    losses = Variable(torch.zeros(N, K))
    for i in range(K): 
        window_mu = mu[:, i*NO:(i+1)*NO]
        sigma_expanded = sigma[:, i].unsqueeze(-1).repeat(1, NO)
        likelihood_z_x = -torch.log(gaussian_distribution(y, window_mu, sigma_expanded))
        prior_z = pi[:, i]
        losses += torch.mean(prior_z.unsqueeze(-1).repeat(1, NO) * likelihood_z_x)
    loss = torch.mean(losses)
    return loss

In [51]:
from .. import MDN

ValueError: attempted relative import beyond top-level package

In [46]:
NGAUSSIANS = 5
EPOCHS = 10000
model = MDN(n_input=2, n_hidden=20, n_output=2, n_gaussians=NGAUSSIANS)
optimizer = torch.optim.Adam(model.parameters()) # Select optimizer
losses = []

def train_mdn(network, x_train, y_train):
    for epoch in range(EPOCHS):
        pi_variable, sigma_variable, mu_variable = network(x_train)
        loss = loss_fn(pi_variable, sigma_variable, mu_variable, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.item())
        if epoch % 500 == 0:
            print(epoch, loss.item())

In [47]:
train_mdn(model, dataset.x_train, dataset.y_train)

0 1.154797077178955
500 -1.2244654893875122
1000 -1.5054270029067993
1500 -1.9656215906143188


KeyboardInterrupt: 

In [32]:
torch.save(model.state_dict(), f'testmdn_model_nao_{int(EPOCHS/1000)}kepochs_{NGAUSSIANS}gaussians')

In [8]:
class FFN(nn.Module):
    
    def __init__(self, n_input, n_hidden, n_output):
        super(FFN, self).__init__()
        self.fc1 = nn.Linear(n_input, n_hidden)
        self.fc2 = nn.Linear(n_hidden, n_output)
        
    def forward(self, x):
        x = torch.tanh(self.fc1(x))
        return self.fc2(x)
        

In [9]:
model_ffn = FFN(n_input=2, n_hidden=6, n_output=2)

In [10]:
def train_ffn(network, x_train, y_train, optimizer, criterion, n_epoch=10000):
    for epoch in range(n_epoch):
        prediction = network(x_train)
        loss = criterion(prediction, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.item())
        if epoch % 500 == 0:
            print(epoch, loss.item())

In [11]:
optimizer = torch.optim.Adam(model_ffn.parameters(), lr=0.01)
criterion = torch.nn.MSELoss() 

train_ffn(model_ffn, dataset.x_train, dataset.y_train, optimizer, criterion)

0 0.1683187037706375
500 0.014715773053467274
1000 0.014120973646640778
1500 0.016923552379012108
2000 0.013578483834862709
2500 0.013523253612220287
3000 0.013592777773737907
3500 0.013481817208230495
4000 0.008557140827178955
4500 0.006724161561578512
5000 0.007200705818831921
5500 0.007027440704405308
6000 0.0066414386965334415
6500 0.007104900199919939
7000 0.010111912153661251
7500 0.006551699712872505
8000 0.006863217800855637
8500 0.00658069271594286
9000 0.00652605714276433
9500 0.008215435780584812


In [12]:
torch.save(model_ffn.state_dict(), 'ffn_model_nao')