<a href="https://colab.research.google.com/github/pbdevpros/CS237B_HW3/blob/P2ii/il_dist_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
import numpy as np
import os
from gym_carlo.envs.geometry import Point

# Scenario identifiers used throughout this file.
scenario_names = ['intersection', 'circularroad', 'lanechange']

# Per-scenario integer sizes (presumably the observation dimension -- confirm
# against the environment definition).
obs_sizes = {
    'intersection': 5,
    'circularroad': 4,
    'lanechange': 3,
}

# Goal names available in each scenario; load_data uses the position of a
# goal in its list as the goal label when loading 'all' goals.
goals = {
    'intersection': ['left', 'straight', 'right'],
    'circularroad': ['inner', 'outer'],
    'lanechange': ['left', 'right'],
}

# Per-scenario [lower, upper] pair (presumably the admissible steering range
# -- confirm against the controller that consumes it).
steering_lims = {
    'intersection': [-0.5, 0.5],
    'circularroad': [-0.15, 0.15],
    'lanechange': [-0.15, 0.15],
}

def maybe_makedirs(path_to_create):
    """Create a directory (and any missing parents) if it does not exist yet.

    An already existing directory is not an error, so the call is safe to
    repeat and safe against another process creating the directory at the
    same time.

    Inputs:
        path_to_create - A string path to a directory you'd like created.
    Raises:
        OSError - if the directory cannot be created, e.g. because the path
                  exists but is not a directory.
    """
    # exist_ok=True performs the "already there?" check inside os.makedirs,
    # which replaces the manual try/except + isdir dance.
    os.makedirs(path_to_create, exist_ok=True)


def load_data(args):
    """Load expert demonstrations for one scenario from ``data/*.npy``.

    Inputs:
        args - object with string attributes ``scenario`` and ``goal``; both
               are lower-cased before use. ``goal`` is either one of the
               goals listed for the scenario in ``goals`` or 'all'.
    Returns:
        dict with
            'x_train' - float32 array holding every column but the last two
            'y_train' - float32 array holding the last two columns (control)
            'u_train' - uint8 column with the index of the goal each row
                        came from; only present when goal is 'all'
    Raises:
        AssertionError - if the scenario or the goal is not recognised.
    """
    data_name = args.goal.lower()
    scenario_name = args.scenario.lower()

    assert scenario_name in goals, '--scenario argument is invalid!'
    data = {}
    if data_name == 'all':
        # One array per goal, in the order the goals are listed.
        per_goal = [np.load('data/' + scenario_name + '_' + dn + '.npy') for dn in goals[scenario_name]]
        # Label every row with the index of the goal file it was read from.
        labels = np.concatenate([np.full(len(rows), idx) for idx, rows in enumerate(per_goal)])
        data['u_train'] = labels.astype('uint8').reshape(-1, 1)
        np_data = np.vstack(per_goal)
    else:
        # The value checked here comes from args.goal, so name that argument.
        assert data_name in goals[scenario_name], '--goal argument is invalid!'
        np_data = np.load('data/' + scenario_name + '_' + data_name + '.npy')

    data['x_train'] = np_data[:, :-2].astype('float32')
    data['y_train'] = np_data[:, -2:].astype('float32')  # control is always 2D: throttle and steering

    return data
    
   
def optimal_act_circularroad(env, d):
    """Noisy hand-written controller for the circular road scenario.

    Inputs:
        env - environment exposing ``env.ego`` with ``speed``, ``x``, ``y``
              and ``heading`` attributes.
        d   - goal index; ``d == 1`` widens the desired radius by 4.9.
    Returns:
        A (1, 2) numpy array ``[[steering, throttle]]``.
    """
    # Throttle is sampled before steering; keep that order so the sequence of
    # random draws is unchanged.
    if env.ego.speed > 10:
        throttle = 0.06 + np.random.randn()*0.02
    else:
        throttle = 0.6 + np.random.randn()*0.1

    # Geometry around the centre (60, 60). The inner building radius (30.0)
    # plays no part in the computation.
    ring_radius = 39.2  # inner ring radius
    desired_radius = (32.3 + 4.9) if d == 1 else 32.3
    # distance between the current "target" point and the current desired point
    offset = np.sqrt(ring_radius**2 - desired_radius**2)
    polar_angle = np.arctan2(env.ego.y - 60, env.ego.x - 60)

    target_x = 60 + desired_radius*np.cos(polar_angle) + offset*np.cos(3*np.pi/2-polar_angle)
    target_y = 60 + desired_radius*np.sin(polar_angle) - offset*np.sin(3*np.pi/2-polar_angle)
    target = Point(target_x, target_y)

    # Heading towards the target, wrapped into [0, 2*pi).
    desired_heading = np.arctan2(target.y - env.ego.y, target.x - env.ego.x) % (2*np.pi)

    # Compare against the heading and the heading shifted down by a full turn,
    # whichever lies closer to the desired heading.
    ego_heading = env.ego.heading
    candidates = np.array([ego_heading, ego_heading - 2*np.pi])
    nearest = candidates[np.argmin(np.abs(desired_heading - candidates))]

    base_steering = 0.15 if desired_heading >= nearest else -0.15
    steering = base_steering + np.random.randn()*0.05
    return np.array([[steering, throttle]])
    
    
def optimal_act_lanechange(env, d):
    """Noisy hand-written controller for the lane change scenario.

    Inputs:
        env - environment exposing ``env.ego`` with ``speed``, ``x``, ``y``
              and ``heading`` attributes.
        d   - goal index: 0 aims at x = 37.55, 1 aims at x = 42.45.
    Returns:
        A (1, 2) numpy array ``[[steering, throttle]]``.
    Raises:
        ValueError - if ``d`` is neither 0 nor 1.
    """
    # Resolve the goal first: an unknown goal used to surface later as an
    # UnboundLocalError on ``target``.
    if d == 0:
        lane_x = 37.55
    elif d == 1:
        lane_x = 42.45
    else:
        raise ValueError('lane change goal d must be 0 or 1, got {!r}'.format(d))

    # Throttle is sampled before steering (same draw order as before).
    if env.ego.speed > 10:
        throttle = 0.06 + np.random.randn()*0.02
    else:
        throttle = 0.8 + np.random.randn()*0.1

    # Aim at a point on the chosen x, ahead of the car by three times its speed.
    target = Point(lane_x, env.ego.y + env.ego.speed*3)
    desired_heading = np.arctan2(target.y - env.ego.y, target.x - env.ego.x) % (2*np.pi)

    # Compare against the heading and the heading shifted down by a full turn,
    # whichever lies closer to the desired heading.
    h = np.array([env.ego.heading, env.ego.heading - 2*np.pi])
    hi = np.argmin(np.abs(desired_heading - h))
    if desired_heading >= h[hi]:
        steering = 0.15 + np.random.randn()*0.05
    else:
        steering = -0.15 + np.random.randn()*0.05
    return np.array([steering, throttle]).reshape(1, -1)

ModuleNotFoundError: ignored

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow_probability import distributions as tfd
import argparse
# from utils import *

tf.config.run_functions_eagerly(True)
import tensorflow_probability as tfp
tfd = tfp.distributions

# Training Loop

In [None]:
def nn(data, args):
    """
    Fit the feedforward policy network and store its weights.

    Inputs:
        data - dict with 'x_train' and 'y_train' arrays; their last
               dimensions give the network input and output sizes.
        args - object with attributes ``scenario``, ``goal``, ``restore``,
               ``lr`` and ``epochs``.
    The weights are restored from (when ``args.restore`` is set) and saved to
    './policies/<scenario>_<goal>_ILDIST'.
    """
    batch_size = 4096*32

    def weights_path():
        # Evaluated at each use, exactly where the path is needed.
        return './policies/' + args.scenario.lower() + '_' + args.goal.lower() + '_ILDIST'

    model = NN(data['x_train'].shape[-1], data['y_train'].shape[-1])
    if args.restore:
        model.load_weights(weights_path())
    optimizer = tf.keras.optimizers.Adam(learning_rate=args.lr)

    # Running mean of the batch losses seen during one epoch.
    epoch_loss = tf.keras.metrics.Mean(name='train_loss')

    @tf.function
    def train_step(x, y):
        ######### Your code starts here #########
        # One optimisation step on a single batch: forward pass and loss
        # under the tape, then gradients w.r.t. every trainable variable,
        # then the optimizer update.
        with tf.GradientTape() as tape:
            prediction = model(x, training=True)
            batch_loss = loss(prediction, y)
        variables = model.trainable_variables
        gradients = tape.gradient(batch_loss, variables)
        optimizer.apply_gradients(zip(gradients, variables))
        ########## Your code ends here ##########

        epoch_loss(batch_loss)

    @tf.function
    def train(train_data):
        for batch_x, batch_y in train_data:
            train_step(batch_x, batch_y)

    dataset = tf.data.Dataset.from_tensor_slices((data['x_train'], data['y_train']))
    dataset = dataset.shuffle(100000).batch(batch_size)

    for epoch_number in range(1, args.epochs + 1):
        # Start every epoch with a fresh loss average.
        epoch_loss.reset_states()

        train(dataset)

        print('Epoch {}, Loss: {}'.format(epoch_number, epoch_loss.result()))
    model.save_weights(weights_path())

# Loss Function

In [None]:
class MixtureDensityModelErrorFinal(tf.keras.losses.Loss):
    """Negative mean log-likelihood of targets under a predicted 2-D Gaussian.

    Each row of the prediction holds the two mean components in its first
    two columns; the remaining columns are packed into a lower-triangular
    scale factor with ``tfp.math.fill_triangular``.
    """

    def __init__(self, **kwargs):
        # Forward keyword arguments to the Keras base class instead of
        # accepting and then silently discarding them.
        super().__init__(**kwargs)

    def call(self, y_true, y_pred, sample_weight=None):
        """Return the negated batch mean of the log-probability of ``y_true``.

        ``sample_weight`` is accepted but not used by this method.
        """
        y_pred = tf.cast(y_pred, dtype=tf.float32)
        self.z_mu = y_pred[:, :2]
        self.z_sigma = y_pred[:, 2:]
        epsilon = 0.00001

        scale_tril = tfp.math.fill_triangular(self.z_sigma)
        # Only the diagonal needs to be kept away from zero; adding epsilon
        # to every entry would also shift the off-diagonal term and write
        # into the upper triangle.
        jittered_diag = tf.linalg.diag_part(scale_tril) + epsilon
        scale_tril = tf.linalg.set_diag(scale_tril, jittered_diag)

        mvn = tfd.MultivariateNormalTriL(loc=self.z_mu, scale_tril=scale_tril, allow_nan_stats=False)
        return -tf.reduce_mean(mvn.log_prob(y_true), 0)


In [None]:
def loss(y_est, y):
    """Scalar negative log-likelihood loss between network output and expert actions.

    y_est is the network output for a batch of observations and y holds the
    actions the expert took for the same batch. Both are multiplied by fixed
    per-column factors before being handed to MixtureDensityModelErrorFinal.
    """
    y = tf.cast(y, dtype=tf.float32)
    ######### Your code starts here #########
    # Fixed per-column factors: two for the expert action columns, five for
    # the columns of the network output.
    action_scale = tf.constant([0.8, 0.2])
    output_scale = tf.constant([0.8, 0.2, 0.1, 0.1, 0.1])

    nll = MixtureDensityModelErrorFinal()
    scaled_actions = y * action_scale
    scaled_output = y_est * output_scale
    return nll(scaled_actions, scaled_output)
    
    ########## Your code ends here ##########

# Model

In [None]:
class NN(tf.keras.Model):
    """Feedforward network that outputs the parameters of a Gaussian over actions.

    The first ``out_size`` output columns are the mean vector; the remaining
    columns are the entries of a lower-triangular scale factor.
    """

    def __init__(self, in_size, out_size):
        """Build the layers.

        Inputs:
            in_size  - dim(O); not needed to build the layers, kept for the
                       constructor signature.
            out_size - dim(A) = 2, the dimension of the action space.
        """
        super().__init__()

        ######### Your code starts here #########
        self.out_size = out_size
        self.internal_layers = [
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(24, kernel_initializer=tf.keras.initializers.GlorotUniform(), activation='relu'),
            tf.keras.layers.Dense(24, kernel_initializer=tf.keras.initializers.GlorotUniform(), activation='relu'),
        ]
        # Mean entries plus the n*(n+1)/2 entries of a lower-triangular
        # factor; this equals out_size + 3 for the 2-D action space.
        num_outputs = out_size + out_size * (out_size + 1) // 2
        # The output layer is linear: a ReLU here would clamp the predicted
        # mean to be non-negative, so negative actions could never be
        # represented. The non-negativity is applied to the scale entries
        # only, in call().
        self.layer_output = tf.keras.layers.Dense(num_outputs, kernel_initializer=tf.keras.initializers.GlorotUniform())
        ########## Your code ends here ##########

    def call(self, x, training=None):
        """Forward pass.

        Inputs:
            x        - a (?, |O|) tensor that keeps a batch of observations.
            training - accepted so callers may pass ``training=...``; none
                       of the layers used here changes behaviour with it.
        Returns:
            A tensor whose first ``out_size`` columns are the mean vector
            and whose remaining columns are non-negative scale entries.
        """
        x = tf.cast(x, dtype=tf.float32)
        ######### Your code starts here #########
        for layer in self.internal_layers:
            x = layer(x)
        raw = self.layer_output(x)
        mean = raw[..., :self.out_size]
        # Same ReLU the scale entries went through before, now without
        # touching the mean columns.
        scale_entries = tf.nn.relu(raw[..., self.out_size:])
        return tf.concat([mean, scale_entries], axis=-1)
        ########## Your code ends here ##########
        ########## Your code ends here ##########

# Run Training Loop

In [None]:
# Build the run configuration directly. A plain Namespace carries the same
# attributes that load_data() and nn() read, without relying on a `parser`
# object that this notebook never creates.
args = argparse.Namespace(
    scenario="intersection",
    restore=False,
    goal="left",
    epochs=3,
    lr=0.0002,
)
data = load_data(args)
# nn(data, args)