In [1]:
import CalSim as cs
import numpy as np
from controllers.force_controllers import *
from trajectories import *
import matplotlib.pyplot as plt

# ENv imports:
from CalSim.core.controller import ControllerManager, Controller
from CalSim.core.state_estimation import ObserverManager
from CalSim.core.dynamics import Dynamics, DiscreteDynamics

import trimesh
import time

In [2]:
def points_to_obstacles(point_cloud_arr, radius):
    qObs = point_cloud_arr.T
    rObs = [radius] * point_cloud_arr.shape[0]
    numObs = point_cloud_arr.shape[0]
    print(numObs)
    return cs.ObstacleManager(qObs, rObs, NumObs = numObs)

In [3]:
import time

class Environment:
    def __init__(self, dynamics, controller = None, observer = None, obstacleManager = None, T = 10, pointCloud = None):
        """
        Initializes a simulation environment
        Args:
            dynamics (Dynamics): system dynamics object
            controller (Controller): system controller object
            observer (Observer): system state estimation object
            obstacleManager (ObstacleManager): obstacle objects
            T (Float): simulation time
        """
        print("Hi")
        #store system parameters
        self.dynamics = dynamics #store system dynamics
        self.obstacleManager = obstacleManager #store manager for any obstacles present
        self.pointCloud = None

        #if observer and controller are none, create default objects
        if observer is not None:
            self.observer = observer
        else:
            #create a default noise-free observer manager
            self.observer = ObserverManager(dynamics)
        if controller is not None:
            self.controller = controller
        else:
            #create a default zero input controller manager (using the skeleton Controller class)
            self.controller = ControllerManager(self.observer, Controller, None, None, None)
        
        #define environment parameters
        self.iter = 0 #number of iterations
        self.t = 0 #time in seconds 
        self.done = False
        
        #Store system state
        self.x = self.dynamics.get_state() #Actual state of the system
        self.x0 = self.x #store initial condition for use in reset
        self.xObsv = None #state as read by the observer
        self.ptCloud = None #point cloud state as read by vision

        if not pointCloud is None:
            self.pointCloud = pointCloud
            self.tree = KDTree(self.pointCloud)
        
        #Define simulation parameters
        if not self.dynamics.check_discrete():
            #If the dynamics are not discrete, set frequency params. as cont. time
            self.SIM_FREQ = 1000 #integration frequency in Hz
            self.CONTROL_FREQ = 50 #control frequency in Hz
            self.SIMS_PER_STEP = self.SIM_FREQ//self.CONTROL_FREQ
            self.TOTAL_SIM_TIME = T #total simulation time in s
            self.TOTAL_ITER = self.TOTAL_SIM_TIME*self.CONTROL_FREQ + 1 #total number of iterations
        else:
            #if dynamics are discrete, set the frequency to be 1 (1 time step)
            self.SIM_FREQ = 1
            self.CONTROL_FREQ = 1
            self.SIMS_PER_STEP = 1
            self.TOTAL_SIM_TIME = T #T now represents total sime time
            self.TOTAL_ITER = self.TOTAL_SIM_TIME*self.CONTROL_FREQ + 1 #total number of iterations
        
        #Define history arrays
        self.xHist = np.zeros((self.dynamics.sysStateDimn, self.TOTAL_ITER))
        self.uHist = np.zeros((self.dynamics.sysInputDimn, self.TOTAL_ITER))
        self.tHist = np.zeros((1, self.TOTAL_ITER))

        self.total_step_time = 0
        self.total_steps = 0
        
    def reset(self):
        """
        Reset the gym environment to its inital state.
        """
        #Reset gym environment parameters
        self.iter = 0 #number of iterations
        self.t = 0 #time in seconds
        self.done = False
        
        #Reset system state
        self.x = self.x0 #retrieves initial condiiton
        self.xObsv = None #reset observer state
        
        #Define history arrays
        self.xHist = np.zeros((self.dynamics.sysStateDimn, self.TOTAL_ITER))
        self.uHist = np.zeros((self.dynamics.sysInputDimn, self.TOTAL_ITER))
        self.tHist = np.zeros((1, self.TOTAL_ITER))
    
    def k_closest_points_kdtree(self, position, k):
        distances, indices = self.tree.query(position, k=k)
        return self.pointCloud[indices]

    def step(self):
        """
        Step the sim environment by one integration
        """
        start_time = time.time()
        #retrieve current state information
        if self.pointCloud is not None:
            closest_points = self.k_closest_points_kdtree(self.xHist[0:3, self.iter].T, 10)
            new_obstacle_manager = points_to_obstacles(closest_points, 0.1)
            self.controller.depthCam = new_obstacle_manager
            self.obstacleManager = new_obstacle_manager

        self._get_observation() #updates the observer
        
        #solve for the control input using the observed state
        self.controller.set_input(self.t)

        #update the deterministic system data, iterations, and history array
        self._update_data()
        
        #Zero order hold over the controller frequency and step dynamics
        for i in range(self.SIMS_PER_STEP):
            self.dynamics.integrate(self.controller.get_input(), self.t, 1/self.SIM_FREQ) #integrate dynamics
            self.t += 1/self.SIM_FREQ #increment the time
        
        self.total_step_time += (time.time() - start_time)
        self.total_steps += 1
    
    def _update_data(self):
        """
        Update history arrays and deterministic state data
        """
        #append the input, time, and state to their history queues
        self.xHist[:, self.iter] = self.x.reshape((self.dynamics.sysStateDimn, ))
        self.uHist[:, self.iter] = (self.controller.get_input()).reshape((self.dynamics.sysInputDimn, ))
        self.tHist[:, self.iter] = self.t
        
        #update the actual state of the system
        self.x = self.dynamics.get_state()
        
        #update the number of iterations of the step function
        self.iter +=1
    
    def _get_observation(self):
        """
        Updates self.xObsv using the observer data
        Useful for debugging state information.
        """
        self.xObsv = self.observer.get_state()
    
    def _get_reward(self):
        """
        Calculate the total reward for ths system and update the reward parameter.
        Only implement for use in reinforcement learning.
        """
        return 0
    
    def _is_done(self):
        """
        Check if the simulation is complete
        Returns:
            boolean: whether or not the time has exceeded the total simulation time
        """
        #check if we have exceeded the total number of iterations
        if self.iter >= self.TOTAL_ITER:
            return True
        return False
    
    def run(self, N = 1, run_vis = True, verbose = False, obsManager = None):
        """
        Function to run the simulation N times
        Inputs:
            N (int): number of simulation examples to run
            run_vis (bool): run the visualization of the results
        Returns:
            self.xHist, self.uHist, self.tHist
        """
        start_time = time.time()
        #loop over an overall simulation N times
        for i in range(N):
            self.reset()
            print("Running Simulation.")
            while not self._is_done():
                if verbose:
                    print("Simulation Time Remaining: ", self.TOTAL_SIM_TIME - self.t)
                self.step() #step the environment while not done
            if run_vis:
                self.visualize() #render the result
        #return the history arrays
        print("Avg step time:", self.total_step_time/ self.total_steps)
        print("Total time:", time.time() - start_time)
        return self.xHist, self.uHist, self.tHist
            
    def visualize(self):
        """
        Provide visualization of the environment
        Inputs:
            obsManager (Obstacle Manager, optional): manager to plot obstacles in the animation
        """
        self.dynamics.show_animation(self.xHist, self.uHist, self.tHist, obsManager = self.obstacleManager)
        self.dynamics.show_plots(self.xHist, self.uHist, self.tHist, obsManager = self.obstacleManager)

In [12]:
obst_rad = 0.1
pos0 = np.array([[-1.270207247, -1.988134284, -1]]).T
vel0 = np.zeros((3, 1))
omega0 = np.zeros((3, 1))
R0 = np.eye(3).reshape((9, 1))
x0 = np.vstack((pos0, R0, omega0, vel0))

#create a dynamics object for the double integrator
dynamics = cs.Qrotor3D(x0)

#create an observer
observerManager = cs.ObserverManager(dynamics)

#create an obstacle - old code
# qObs = np.array([[0, 1, 1], [0, 0.5, 2]]).T
# rObs = [0.25, 0.25]
# obstacleManager = cs.ObstacleManager(qObs, rObs, NumObs = 2)

mesh = trimesh.load_mesh(f"/scratch/rhm4nj/cral/datasets/Replica-Dataset/ReplicaSDK/room_0/mesh.ply")
coord = np.array(mesh.vertices) 

def downsample_point_cloud_random(point_cloud, num_points):
    if point_cloud.shape[0] <= num_points:
        return point_cloud  # Already small enough

    indices = np.random.choice(point_cloud.shape[0], num_points, replace=False)
    return point_cloud[indices]

coord = downsample_point_cloud_random(coord, min(coord.shape[0], 20_000))

obstacleManager = points_to_obstacles(coord, obst_rad)
depthManager = cs.DepthCamManager(observerManager, obstacleManager, mean = None, sd = None)

20000


In [13]:
point_cloud = np.transpose(depthManager.get_depth_cam_i(0).calc_ptcloud_world())

In [14]:
import k3d

#visualize point cloud with k3d
print(point_cloud.shape, depthManager.get_depth_cam_i(0).numPts, depthManager.get_depth_cam_i(0).obstacleManager.NumObs)
print("Created point cloud.")
# point_cloud = downsample_point_cloud_random(point_cloud, 50000)
# print("Created smaller point cloud.")

plot = k3d.plot()
print("Created k3d plot.")
points = k3d.points(point_cloud, point_size=obst_rad)
print("Created points for k3d plot.")

plot += points
print("Added points to plot.")
plot.display()
print("Plotted point cloud.")



(80000, 3) 8 20000
Created point cloud.
Created k3d plot.
Created points for k3d plot.
Added points to plot.


Output()

Plotted point cloud.


In [15]:

point_cloud = np.transpose(depthManager.get_depth_cam_i(0).calc_ptcloud_world())
#create a trajectory
posD = np.array([[-0.283352436, 0.56045748, -1]]).T
xD = np.vstack((posD, R0, omega0, vel0))
trajManager = TrajectoryManager(x0, xD, Ts = 0.05, N = 1)

#create a controller manager with a basic FF controller
controllerManager = cs.ControllerManager(observerManager, QRotorGeometricPD, None, trajManager, depthManager)


No barrier/lyapunov object passed in.


In [None]:
#create an environment
env = Environment(dynamics, controllerManager, observerManager, obstacleManager, T = 1)
env.reset()

#run the simulation
run_vis = False
anim = env.run(verbose=True, run_vis=False)
print("DONE")
if run_vis:
    env.visualize()

# with tree
# Avg step time: 1.23531642147139
# Total time: 64.61537647247314

Hi


TypeError: 'float' object cannot be interpreted as an integer

In [None]:


# np.savetxt(trajectory, anim[0][0:3, :].T)
# new_plot = k3d.plot()
# points = k3d.points(point_cloud, color=0x0000FF, point_size=0.001)
# trajectory = k3d.points(anim[0][0:3, :].T, color=0xFFFF00, point_size = 0.01)

# new_plot += points
# new_plot += trajectory
# new_plot.display()

# # print(anim)
# # print(anim[0])

# #Possible trajectory
# # print(anim[1])

# #Possible timesteps
# # print(anim[2])
# # print(anim[2])


NameError: name 'trajectory' is not defined