In [1]:
import numpy as np
import torch
from torch.distributions.multivariate_normal import MultivariateNormal

import matplotlib as plt
import matplotlib.image as mpimg

import plotly
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio

from skimage import io
import os

import math
import requests

In [2]:
plotly.__version__
pio.renderers.default = "iframe"

In [3]:
np.__version__

'1.16.6'

In [4]:
np.random.seed(0)

In [5]:
available_gpus = [torch.cuda.device(i) for i in range(torch.cuda.device_count())]
available_gpus

[<torch.cuda.device at 0x7f28615e67d0>]

In [6]:
class Bicycle:
    def __init__(self,x, y,theta, L):
        self.x = x
        self.y = y
        self.theta = theta
        self.steer = 0
        self.L = 0
        self.history = [(x,y,theta,0)]
        
    def get_state(self):
        return self.x, self.y, self.theta, self.steer
    
    def get_rollout_actions(self, actions, delta_t):
        
        
        curstate = (self.x, self.y, self.theta, self.steer)
        states = [curstate]
        for action in actions:
            velocity=action[0]
            steer_rate = action[1]
            x, y, theta, steer = curstate
            x_dot = velocity*math.cos(theta) 
            y_dot = velocity*math.sin(theta) 
            theta_dot = steer
            if(self.L is not 0):
                theta_dot = velocity/(self.L/math.tan(steer))

            x+= x_dot * delta_t
            y += y_dot * delta_t
            theta+= theta_dot * delta_t
            steer+= steer_rate * delta_t

            if(torch.is_tensor(x)):
                x = x.item()
            if(torch.is_tensor(y)):
                y = y.item()
            if(torch.is_tensor(theta)):
                theta = theta.item()
            if(torch.is_tensor(steer)):
                steer = steer.item()
        
            states.append((x, y, theta, steer))
            curstate = (x, y, theta, steer)
            
        return states
    
    
    def dynamics_batch_horizon(self, state, actions, delta_t, dev="cpu"): 
        x = torch.unsqueeze(state[:,0], 1)
        y = torch.unsqueeze(state[:,1], 1)
        theta = torch.unsqueeze(state[:,2], 1)
        steer = torch.unsqueeze(state[:,3], 1)
        # batchified for many controls and states, overall timesteps
        velocity = actions[:,:,0]
        steer_rate = actions[:, :, 1]      
            
        horizon_states = torch.empty((actions.shape[0], actions.shape[1],
                                          state.shape[1]), dtype=state.dtype, device =dev)
        
        steer_dot = steer_rate #steer_dot
        steer_displacement = torch.cumsum(steer_dot * delta_t, axis=1) # sum across all t
        horizon_states[:,:,3] = steer + steer_displacement # all steer positions across the horizon
        
        
        if(self.L is not 0):
            theta_dot  = velocity/(self.L/torch.tan(steer))
        else:
            theta_dot = horizon_states[:,:,3] #theta_dot
            
        theta_displacement = torch.cumsum(theta_dot * delta_t, axis=1) # sum across all t
        horizon_states[:,:,2] = theta + theta_displacement # all theta positions across the horizon
        
        x_dot = velocity*torch.cos(horizon_states[:,:,2]) # x_dot based on theta
        x_displacement = torch.cumsum(x_dot * delta_t, axis=1) # sum across all t
        horizon_states[:,:,0] = x +x_displacement # all x positions across the horizon
        
        y_dot =  velocity*torch.sin(horizon_states[:,:,2]) # y_dot based on theta
        y_displacement = torch.cumsum(y_dot * delta_t, axis=1) # sum across all t
        horizon_states[:,:,1] = y+y_displacement # all x positions across the horizon
        
        # horizon_states should be of shape : K x M

        return horizon_states
    
    def update(self, velocity, steer_rate, delta_t):
        x_dot = velocity*math.cos(self.theta) 
        y_dot = velocity*math.sin(self.theta) 
        theta_dot = self.steer
        if(self.L is not 0):
            theta_dot = velocity/(self.L/math.tan(self.steer))
        
        self.x+= x_dot * delta_t
        self.y += y_dot * delta_t
        self.theta+= theta_dot * delta_t
        self.steer+= steer_rate * delta_t
        
        if(torch.is_tensor(self.x)):
            self.x = self.x.item()
        if(torch.is_tensor(self.y)):
            self.y = self.y.item()
        if(torch.is_tensor(self.theta)):
            self.theta = self.theta.item()
        if(torch.is_tensor(self.steer)):
            self.steer = self.steer.item()
        
        self.history.append((self.x, self.y, self.theta, self.steer))
        
        
    def get_history(self):
        return self.history
        
        
        

In [7]:
class Simulation:
    def __init__(self, robot, controller, env, size=(100,100), timestep=1):
        self.robot = robot
        self.x_size = size[0]
        self.y_size = size[1]
        self.timestep = timestep
        self.steps = 0
        self.env = env
        self.save_snapshot()
        self.steps+=1
        self.controller = controller
        
    def run(self, iterations=10, write_snapshots = True, 
            write_rollouts=False, write_controls=False, 
            write_rollouts_num= 1, write_rate = 5):
        for step in range(iterations):
            velocity, steer_rate = self.controller.find_control(robot.get_state())
            if(write_snapshots and self.steps%write_rate==0):
                rollouts = None
                controls = None
                if(write_rollouts):
                    rollouts = self.controller.states

                if(write_controls):
                    controls = self.controller.ctrl 

                self.save_snapshot(projected_rollouts=rollouts, 
                               selected_controls=controls, n_rollouts = write_rollouts_num)
            self.robot.update(velocity, steer_rate, self.timestep)
            self.steps+=1
        self.save_snapshot(full_history = True)
        
        
    def save_snapshot(self, projected_rollouts=None, selected_controls=None, n_rollouts = 1, full_history=False):
        
        img = io.imread('grid_small.jpg')
        fig = px.imshow(img, color_continuous_scale = "greys", 
                        origin = "lower",width=800, height=800)
        
        goal_2_x0 = self.env.goal_point[0]-2
        goal_2_y0 = self.env.goal_point[1]-2
        goal_2_x1 = self.env.goal_point[0]+2
        goal_2_y1 = self.env.goal_point[1]+2
        
        fig.add_shape(type="circle",
                xref="x", yref="y",
                x0=goal_2_x0, y0=goal_2_y0,
                x1=goal_2_x1, y1=goal_2_y1,
                line=dict(
                    color="RoyalBlue",
                    width=0,
                ),
                fillcolor="Yellow",opacity = 0.75,
            )

        goal_5_x0 = self.env.goal_point[0]-5
        goal_5_y0 = self.env.goal_point[1]-5
        goal_5_x1 = self.env.goal_point[0]+5
        goal_5_y1 = self.env.goal_point[1]+5
        
        fig.add_shape(type="circle",
                xref="x", yref="y",
                x0=goal_5_x0, y0=goal_5_y0,
                x1=goal_5_x1, y1=goal_5_y1,
                line=dict(
                    color="RoyalBlue",
                    width=0,
                ),
                fillcolor="Yellow",opacity = 0.5,
            )
        
        goal_10_x0 = self.env.goal_point[0]-10
        goal_10_y0 = self.env.goal_point[1]-10
        goal_10_x1 = self.env.goal_point[0]+10
        goal_10_y1 = self.env.goal_point[1]+10
        
        fig.add_shape(type="circle",
                xref="x", yref="y",
                x0=goal_10_x0, y0=goal_10_y0,
                x1=goal_10_x1, y1=goal_10_y1,
                line=dict(
                    color="RoyalBlue",
                    width=0,
                ),
                fillcolor="Yellow",opacity = 0.25,
            )
        
        if full_history:
            history = self.robot.get_history()
            fig.add_scatter(x = [coord[0] for coord in history],
                       y = [coord[1] for coord in history],
                       mode = "markers",
                       marker =dict(color="green", opacity=1, size=10))
            


            arrow_list = []
            
            for x1, y1, theta, steer in history:
                arrow = go.layout.Annotation(dict(
                                x=x1+(5*math.cos(theta)),
                                y=y1+(5*math.sin(theta)),
                                xref="x", yref="y",
                                text="",
                                showarrow=True,
                                axref="x", ayref='y',
                                ax=x1,
                                ay=y1,
                                arrowhead=3,
                                arrowwidth=2,
                                arrowcolor='red',)
                            )
                arrow_list.append(arrow)
                
                
        else:
            fig.add_scatter(x = [self.robot.history[-1][0]],
                       y = [self.robot.history[-1][1]],
                       mode = "markers",
                       marker =dict(color="red", opacity=1, size=10))
            
            if(projected_rollouts is not None):
                horizon = len(projected_rollouts)
                x_coords = torch.Tensor.cpu(projected_rollouts[0:n_rollouts, :,0])
                y_coords = torch.Tensor.cpu(projected_rollouts[0:n_rollouts, :,1])
                for rollout in range(n_rollouts):
                     fig.add_trace(go.Scatter(x = x_coords[rollout],
                       y = y_coords[rollout],
                       mode = "lines+markers",
                       marker =dict(color="orange", opacity=1, size=10)))
                    
                    
            if(selected_controls is not None):
                selected_rollout = robot.get_rollout_actions(selected_controls, self.timestep)
                fig.add_trace(go.Scatter(x = [coord[0] for coord in selected_rollout],
                       y = [coord[1] for coord in selected_rollout],
                       mode = "lines+markers",
                       marker =dict(color="green", opacity=1, size=10)))

            arrow_list = []
            x1 , y1, theta, steer = self.robot.history[-1]
            arrow = go.layout.Annotation(dict(
                            x=x1+(5*math.cos(theta)),
                            y=y1+(5*math.sin(theta)),
                            xref="x", yref="y",
                            text="",
                            showarrow=True,
                            axref="x", ayref='y',
                            ax=x1,
                            ay=y1,
                            arrowhead=3,
                            arrowwidth=2,
                            arrowcolor='red',)
                        )
            arrow_list.append(arrow)
            
        fig.update_layout(showlegend=False)
        fig.update_layout(annotations=arrow_list)
        fig.update_layout(coloraxis_showscale=False,
                      margin=dict(l=0, r=0, t=0, b=0))
        fig.update_xaxes(range=[0, 100], autorange=False, scaleratio=1)
        fig.update_yaxes(range=[0, 100], autorange=False, scaleratio=1)
        if not os.path.exists("sim_images"):
                os.mkdir("sim_images")
        
        if full_history:
            fig.write_image("sim_images/sim_full.png")
        else:
            fig.write_image("sim_images/sim_"+str(self.steps)+".png")
            
    def display_run(self):
        img_list = []
        k = 1
        
        if not os.path.exists("sim_images"):
            return
        for step in range(self.steps):
            if os.path.isfile("sim_images/sim_"+str(step)+".png"):
                img = io.imread("sim_images/sim_"+str(step)+".png")
                img_list.append(img)
                os.remove("sim_images/sim_"+str(step)+".png")

        # If no images, don't do anything
        if len(img_list) == 0:
            return

        img_list = np.array(img_list)
        fig = px.imshow(img_list, animation_frame=0)
        fig.update_layout(showlegend=False)
        fig.update_xaxes(visible=False)
        fig.update_yaxes(visible=False)
        fig.show()
        
    def display_history(self):
        if not os.path.exists("sim_images"):
            return
        
        img = io.imread("sim_images/sim_full.png")    

        fig = px.imshow(img)
        fig.update_layout(showlegend=False)
        fig.update_xaxes(visible=False)
        fig.update_yaxes(visible=False)
        fig.show()
        
        
    

In [8]:
class Map:# may have to instantiate map class for more complex costmaps and stuff
    def __init__(self, goal_point):
        self.goal_point = goal_point
    def running_cost_batch_horizon(self, states, actions, dt):
        #batchified for many time points, states and controls
        x = states[:,:,0]
        y = states[:,:,1]
        theta = states[:,:,2]
        steer = states[:,:,3]
        velocity = actions[:,:,0]
        steer_rate = actions[:,:,1]
        return torch.sqrt(((x-self.goal_point[0]) * (x-self.goal_point[0])) + ((y-self.goal_point[1]) * (y-self.goal_point[1])))/20
    def terminal_state_cost_batch(self, state):
        x = state[:,0]
        y = state[:,1]
        theta = state[:,2]
        steer = state[:,3]
        return torch.sqrt(((x-self.goal_point[0]) * (x-self.goal_point[0])) + ((y-self.goal_point[1]) * (y-self.goal_point[1])))
    

In [9]:
class MPPI:
    def __init__(self, robot, state_dim, ctrl_dim, noise_mu, noise_sigma, u_min, u_max, dynamics,
                 running_cost, terminal_state_cost,
                 num_samples = 1000, horizon = 15, lambda_=1., sample_null_action= False,
                 timestep=1, device = "cpu"):
        self.robot = robot
        self.K = num_samples
        self.T = horizon
        self.nx = state_dim # call this m
        self.nu = ctrl_dim # call this n
        self.timestep = timestep
        self.device = device
        self.sample_null_action = sample_null_action
        
        
        
        #tunable params:
        #number of samples, timesteps, control_min, control_max, 
        #dynamics, costmap
        #sampling distribution (noise_sigma), lambda (temperature) = free energy
        self.noise_mu = noise_mu.to(self.device)
        self.noise_sigma = noise_sigma.to(self.device)
        self.dtype = noise_sigma.dtype
        self.noise_distr = MultivariateNormal(self.noise_mu, covariance_matrix = self.noise_sigma)
        self.noise_sigma_inv = torch.inverse(self.noise_sigma)
        self.lambda_ = lambda_ #larger temperatures enable more exploration
        
        self.u_min = torch.tensor(u_min,dtype=self.dtype, device=self.device)
        self.u_max = torch.tensor(u_max, dtype=self.dtype, device = self.device)
        
        self.u_init = torch.zeros(ctrl_dim, dtype=self.dtype, device = self.device)
        
        self.ctrl = self.noise_distr.sample((self.T, ))
        
        self.ctrl.to(self.device)
        
        
        
        self.total_cost = None
        self.total_cost_exponent = None
        self.omega = None
        self.states = None
        self.bounded_samples = None
        
        self.dynamics = dynamics
        self.running_cost = running_cost
        self.terminal_state_cost = terminal_state_cost
    
    def find_control(self, state):
        self.ctrl = torch.roll(self.ctrl, -1, dims=0) #controls of shape (T, N) (t timesteps)
        
        self.ctrl[-1] = self.u_init #u_init is of shape (N)
        #roll controls back one timestep given that first one should have already been used
        
        if(not torch.is_tensor(state)):
            state = torch.tensor(state, dtype=self.dtype, device = self.device)
            
        self.cost_total = self.compute_total_cost(state) #S (Epsilon(k)) = total cost for each rollout 
        # shape: (K,)
        beta = torch.min(self.cost_total) #min total cost, 1 value
        self.total_cost_exponent = torch.exp(-1/self.lambda_ * (self.cost_total - beta)) 
        
        # exponent expression for all k
        eta = torch.sum(self.total_cost_exponent) #summation across all k of exponent, 1 value
        self.omega = (1./eta) * self.total_cost_exponent #weight omega(k) for each K, 1 value 
        #shape: (K,)
    
        perturbations = torch.sum(self.omega[:,None,None] * self.bounded_samples, dim=0) #size: (T,N)
        
        
        #omega is (K,), samples is (K,T, N), multiplication broadcasts to (K, N)
        #ends with summation of K, with t values in perturbations: (T, N)
        
        #add perturbations to controls as per algo for all T
        self.ctrl = self.ctrl+perturbations #shape : (T, N)

        
        return self.ctrl[0] # of size N
    
    def reset(self):
        self.ctrl = self.noise_distr.sample((self.T, ))
                                      
    def compute_total_cost(self, state):
        samples = self.noise_distr.rsample((self.K, self.T)) # sample noise, shape: (K, T, N)
        perturbed_action = self.ctrl + samples # change actions by noise, 
        #shape: (T, N) + (K, T, N)
        if self.sample_null_action:
            perturbed_action[self.K - 1] = 0
            
        bounded_perturbed_action = self._bound_action(perturbed_action) # bound action limits
        self.bounded_samples = bounded_perturbed_action-self.ctrl
        
        action_cost = self.lambda_ * torch.matmul(torch.abs(self.bounded_samples), self.noise_sigma_inv)
        perturbation_cost = torch.sum(self.ctrl * action_cost, dim=(1, 2))
        #matrix multiplication part over all T
        
                                            
        rollout_cost, self.states, actions = self.compute_rollout_costs(state, bounded_perturbed_action)
        #written as q(x_t) and phi(x_t) in the paper
        #states is Tx K x N
        self.cost_total = rollout_cost + perturbation_cost
        return self.cost_total
                                            
    def compute_rollout_costs(self, state, perturbed_actions):
        if len(state.shape) is 1:
            state = torch.unsqueeze(state, 0)
            state = state.repeat(self.K, 1)
            
        states = self.dynamics(state, perturbed_actions, self.timestep, dev = self.device)
        horizon_rollout_costs = self.running_cost(states, perturbed_actions, self.timestep)
        
        rollout_costs = torch.sum(horizon_rollout_costs, axis = 1)
        rollout_costs += self.terminal_state_cost(states[:,-1,:]) # shape: (K,) , takes in last state
        
        return rollout_costs, states, perturbed_actions
                
    def _bound_action(self, action):
        bounded_action = torch.empty_like(action, dtype=self.dtype, device = self.device)
            
        if self.u_max is not None and self.u_min is not None:
            max_compare = torch.ones_like(action, dtype=self.dtype, device = self.device) * self.u_max
            min_compare = torch.ones_like(action, dtype=self.dtype, device = self.device) * self.u_min
            bounded_action = torch.max(torch.min(action, max_compare), min_compare)
                
            return bounded_action
        return action


In [None]:
robot = Bicycle(80, 20, 0, 0)
env = Map(goal_point = (20,90))
mppi_sample_center= torch.tensor([0., 0.])
mppi_sample_covariance = torch.tensor([[25.,0.],[0., 1.]])
step = .2


controller = MPPI(robot, 4, 2, 
                  mppi_sample_center, mppi_sample_covariance,  (0, -2), (10, 2), 
                  robot.dynamics_batch_horizon, env.running_cost_batch_horizon, env.terminal_state_cost_batch,
                 num_samples = 128, horizon = 16, lambda_=.01, sample_null_action = True,
                  timestep = step, device=torch.device('cuda:0'))
s = Simulation(robot, controller, env, timestep = step)
s.run(iterations=100, write_snapshots = True, 
      write_rollouts=True, write_controls = True, write_rollouts_num = 10,
     write_rate = 1)
s.display_run()
s.display_history()