In [None]:
# Load data into jupyter
from mpl_toolkits.mplot3d import axes3d

import math
import numpy as np
import matplotlib.pyplot as plt

from matplotlib.pyplot import savefig

def load_csv_file(filename, skiprows = 0):
  return np.loadtxt(open(filename, "rb"), delimiter=",", skiprows = skiprows)

def plot_2d_configs(x1, x2):
    """
    Scatter-plot the 2D array;
    """
    data = (x1, x2)
    colors = ("blue", "red")
    groups = ("collision-free", "in-collision") 
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    for points, color, group in zip(data, colors, groups):
        print("The shape of point is: %s" % (points.shape, ))
        x = points[:, 0]
        y = points[:, 1]
        ax.scatter(x, y, alpha=0.8, c=color, edgecolors='none', s=30, label=group)

    plt.title('In-collision vs. collision-free data points')
    plt.legend(loc=2)
    plt.show()

def plot_3d_configs(x1, x2):
    """
    Scatter-plot a three dimensional data
    """

    data = (x1, x2)
    colors = ("blue", "red")
    groups = ("collision-free", "in-collision") 

    # Create plot
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    ax = fig.gca(projection='3d')

    for points, color, group in zip(data, colors, groups):
        print("The shape of point is: %s" % (points.shape, ))
        x = points[:, 0]
        y = points[:, 1]
        z = points[:, 2]
        ax.scatter(x, y, z, alpha=0.8, c=color, edgecolors='none', s=30, label=group)

    plt.title('In-collision vs. collision-free data points')
    plt.legend(loc=2)
    plt.show()
    
def plot_3d_single(points, group, color='red'):
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    ax = fig.gca(projection='3d')
    ax.scatter(points[:,0], points[:,1], points[:,2], alpha=0.8, c=color, edgecolors='none', s=30, label=group)
    plt.legend(loc=2)
    plt.show()

def plot_2d_single(points, group, color='red'):
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    ax.scatter(points[:,0], points[:,1], alpha=0.8, c=color, edgecolors='none', s=30, label=group)
    plt.legend(loc=2)
    plt.show()

def test_plot_2d():
    N = 60
    g1 = np.array((0.6 + 0.6 * np.random.rand(N), np.random.rand(N))).T
    g2 = np.array((0.4+0.3 * np.random.rand(N), 0.5*np.random.rand(N))).T
    plot_2d_configs(g1, g2)
    
def test_plot_3d():
    N = 60
    g1 = np.array((0.6 + 0.6 * np.random.rand(N), np.random.rand(N),0.4+0.1*np.random.rand(N))).T
    g2 = np.array((0.4+0.3 * np.random.rand(N), 0.5*np.random.rand(N),0.1*np.random.rand(N))).T
    plot_3d_configs(g1, g2)
    
test_plot_2d()
test_plot_3d()

In [None]:
from os import listdir
from os.path import isfile, join

num_joints = 7
limit = 10000000  # set limit on number of points in the configuration
path = "../data/input/right_joint_states"
file_name = ''

files = [f for f in listdir(path) if isfile(join(path, f))]
for f in files:
    
    joints = int(f.split('_')[1])
    num_samples = int(f.split('_')[2])
    if joints == num_joints and num_samples < limit:
        file_name = join(path, f)
        break
print("Loading data from file: %s" % (file_name,))

In [None]:
import math

"""
Load and preprocess data; 
"""

def normalize_data(X, limits):
    """
    Normalize the revolute joint data from joints limits to [0, 1]
    X: Data before normalization n x d array
    limits: joint limits for the revolute joints, a 2 x d array
    """
    X = X - limits[0, :]
    x_range = limits[1, :] - limits[0, :]
    return X / x_range

def recover_data(normalized_X, limits):
    """
    Recover the normalized X based on joint limits.
    normalized_X: the normalized data points;
    limits: the joint limits
    """
    x_range = limits[1, :] - limits[0, :]
    X = normalized_X * x_range
    return X + limits[0, :]

In [None]:
JOINT_NAMES = ['s0','s1','e0','e1','w0','w1','w2']
JOINT_LIMITS = {
    's0': [-1.7016, 1.7016],
    's1': [-2.147, 1.047],
    'e0': [-3.0541, 3.0541],
    'e1': [-0.05, 2.618],
    'w0': [-3.059, 3.059],
    'w1': [-1.5707, 2.094],
    'w2': [-3.059, 3.059]
}
collisionKey = 'collisionFree' 
HEADERS = ['right_' + joint_name for joint_name in JOINT_NAMES]


selected_joints = JOINT_NAMES[0:num_joints]
joint_limits = [JOINT_LIMITS[joint] for joint in selected_joints]
joint_limits = np.array(joint_limits).T
headers = ['right_' + joint_name for joint_name in selected_joints]

headers_w_collision = headers.copy()
headers_w_collision.append('collisionFree')

print("The joint limits are: %s" % joint_limits)
print("The headers are: %s" % headers)
print("The headers with collisions are: %s" % headers_w_collision)
print("The complete headers are: %s" % HEADERS)

In [None]:
# Hyperparamters
batch_size = 1000
learning_rate = 0.0001
d_input = num_joints
h_dim1 = 256
h_dim2 = 100
generated_sample_size = 1000000
epochs = 10

# Latent_variables
d_output = 2

In [None]:
from torch.utils.data import DataLoader
import pandas as pd

input_data = pd.read_csv(file_name, usecols=headers_w_collision)

# Additional step to preserve the header order when reading data; 
data = input_data[headers_w_collision].values

# np.random.shuffle(data)
print("The first 5 values from the dataset are: %s" % data[0:5, :])
X = normalize_data(data[:, :-1], joint_limits)
y = data[:, -1]

print("The first 5 values after normalization are: %s" % X[0:5, :])

print("The max of X is: %s; The min of X is: %s" % (np.max(X), np.min(X),))

cutoff = math.ceil(X.shape[0] * 0.8) # 80 percent training data
train_X = X[:cutoff, :]
test_X = X[cutoff:, :]
train_y = y[:cutoff]
test_y = y[cutoff:]

train_data = np.hstack((train_X, np.expand_dims(train_y, axis = 1)))
test_data = np.hstack((test_X, np.expand_dims(test_y, axis = 1)))

print("The shape of samples in training is: %s" % (train_data.shape,))
print("The shape of samples in test is: %s" % (test_data.shape,))

train_loader = DataLoader(train_data, batch_size=batch_size,
                        shuffle=True, num_workers=1)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True, num_workers=1)

In [None]:
# Pick 1000 random points to display
indices = np.random.choice(train_X.shape[0], 2000, replace=False)
train_X_random = train_X[indices, :]
train_y_random = train_y[indices]

x_free = train_X_random[train_y_random == 1]
x_col = train_X_random[train_y_random == 0]

plot_2d_configs(x_free[:, 0:2], x_col[:, 0:2]) # s0, s1
plot_2d_configs(x_free[:, 1:3], x_col[:, 1:3]) # s1, e0
plot_2d_configs(x_free[:, (0,2)], x_col[:, (0,2)]) #s0, e0
plot_3d_configs(x_free[:, :3], x_col[:, :3]) # s0, s1, e0
plot_3d_single(x_free[:, :3], 'collision-free', 'blue')
plot_3d_single(x_free[:, :3], 'in-collision', 'red')

In [None]:
from torch.nn import functional as F
from torch import nn
import torch

class VAE(nn.Module):

  def __init__(self, d_in, h_dim1, h_dim2, d_out, keep_prob=0):
    super(VAE, self).__init__()
    # The latent variable is defined as z = N(\mu, \sigma).

    # Encoder network
    self.fc1 = torch.nn.Linear(d_in, h_dim1)
    self.fc2 = torch.nn.Linear(h_dim1, h_dim2)
    self.mean = torch.nn.Linear(h_dim2, d_out)
    self.logvar = torch.nn.Linear(h_dim2, d_out)

    # Decoder network
    self.fc3 = torch.nn.Linear(d_out, h_dim2)
    self.fc4 = torch.nn.Linear(h_dim2, h_dim1)
    self.fc5 = torch.nn.Linear(h_dim1, d_in)

  # Encode the input into a normal distribution
  def encode(self, x):
    h1 = F.elu(self.fc1(x))
    h2 = F.elu(self.fc2(h1))
    return self.mean(h2), self.logvar(h2)

  # Reparametrize the normal;
  def reparameterize(self, mu, logvar):
    std = torch.exp(0.5 * logvar)
    eps = torch.randn_like(std)
    return eps.mul(std).add_(mu)

  def decode(self, z):
    h2 = F.elu(self.fc3(z))
    h1 = F.elu(self.fc4(h2))
    return torch.sigmoid(self.fc5(h1))

  # Forward pass;
  def forward(self, x):
    # Change the array to float first;
    mu, logvar = self.encode(x.view(-1, num_joints))
    # print("The value of mu and logvars are: %s, %s" % (mu, logvar))
    z = self.reparameterize(mu, logvar)
    return self.decode(z), mu, logvar


# Reconstruction + KL divergence losses summed over all elements and batch
def loss_function(recon_x, x, mu, logvar):
    BCE = F.binary_cross_entropy(recon_x, x.view(-1, num_joints), reduction='sum')
    # see Appendix B from VAE paper:
    # Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014
    # https://arxiv.org/abs/1312.6114
    # 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD, BCE, KLD

def train(model, optimizer, device, epoch, train_loader):
  model.train()
  train_loss = 0
  bce_loss = 0
  kld_loss = 0
  for batch_idx, batch in enumerate(train_loader):
    configs = batch[:, :-1]
    y = batch[:, -1:]
    configs = configs.to(device).float()
    optimizer.zero_grad()

    # Forward pass
    recon_batch, mu, logvar = model(configs)
    # print("Max in the reconstructed_batch: %s; Max in the original dataset: %s" % (np.max(recon_batch.detach().numpy()), np.max(configs.detach().numpy()),))
    # print("Min in the reconstructed_batch: %s; Min in the original dataset: %s" % (np.min(recon_batch.detach().numpy()), np.min(configs.detach().numpy()),))
    loss, bce, kld = loss_function(recon_batch, configs, mu, logvar)
    # Backward pass
    loss.backward()
    train_loss += loss.item()
    bce_loss += bce.item()
    kld_loss += kld.item()
    optimizer.step()
  print('====> Epoch: {} Average total training_Loss: {:.4f}; BCE loss: {:.4f}; KLD: {:.4f}'.format(
    epoch, train_loss / len(train_loader.dataset), 
    bce_loss / len(train_loader.dataset), 
    kld_loss / len(train_loader.dataset)))

  # return the last batch
  mu = np.hstack((mu.detach().numpy(), y))
  return mu, logvar
def test(model, epoch, device, test_loader):
    model.eval()
    test_loss = 0
    with torch.no_grad():
      for i, batch in enumerate(test_loader):
        configs = batch[:, :-1]
        y = batch[:, -1:]
        configs = configs.to(device).float()
        recon_batch, mu, logvar = model(configs)
        loss, bce, kld = loss_function(recon_batch, configs, mu, logvar)
        test_loss += loss.item()

    test_loss /= len(test_loader.dataset)
    print('====> Epoch: {} Test_loss: {:.4f};'.format(epoch, test_loss))
    mu = np.hstack((mu.detach().numpy(), y))
    return test_loss, mu, logvar

def generate_samples(generated_batch_size, d_output, device, model):
  # Generate sampled outputs;
  with torch.no_grad():
    norms = torch.randn(generated_batch_size, d_output).to(device)
    samples = model.decode(norms).cpu().numpy()
  return samples

In [None]:
from torch import optim

import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = VAE(d_input, h_dim1, h_dim2, d_output).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
model_path = "./model"

output_directory = model_path + '/' + str(num_joints) + '-' + str(h_dim1) + '-' + str(h_dim2) + '-' + str(d_output) + '-' + str(batch_size) + '-' + \
          str(learning_rate)
if not os.path.exists(output_directory):
  os.makedirs(output_directory)

test_loss = math.inf
for epoch in range(1, epochs + 1):
  mu, logvar = train(model=model, optimizer=optimizer, device=device, epoch=epoch, train_loader=train_loader)
  # print("The shape of mu is: %s" % (mu.shape, ))
  test_loss, mu_test, logvar_test = test(model=model, epoch=epoch, device=device, test_loader=test_loader)

# Print out metric for evaluation.
print("Final average test loss: {:.4f};".format(test_loss))

mu_free = mu_test[mu_test[:, -1] == 1]
mu_collision = mu_test[mu_test[:, -1] == 0]
plot_2d_configs(mu_free[:, :2], mu_collision[:, :2])
plot_2d_single(mu_free[:, :2], 'collision-free', 'blue')
plot_2d_single(mu_collision[:, :2], 'in-collision', 'red')