diff --git a/safe_control_gym/controllers/firmware/firmware_wrapper.py b/safe_control_gym/controllers/firmware/firmware_wrapper.py index 19fc48352..60fde3c4e 100644 --- a/safe_control_gym/controllers/firmware/firmware_wrapper.py +++ b/safe_control_gym/controllers/firmware/firmware_wrapper.py @@ -1,22 +1,29 @@ +from __future__ import annotations + import numpy as np import logging import math import os +from typing import Callable import pybullet as p from munch import munchify from scipy.spatial.transform import Rotation as R from safe_control_gym.controllers.base_controller import BaseController +from safe_control_gym.envs.gym_pybullet_drones.quadrotor import Quadrotor import pycffirmware as firm logger = logging.getLogger(__name__) class FirmwareWrapper(BaseController): - ACTION_DELAY = 0 # how many firmware loops run between the controller commanding an action and the drone motors responding to it - SENSOR_DELAY = 0 # how many firmware loops run between experiencing a motion and the sensors registering it - STATE_DELAY = 0 # not yet supported, keep 0 + # Number of firmware loop iterations between the controller commanding an action and the drone + # motors responding to it + ACTION_DELAY = 0 + # Number of firmware loop iterations between experiencing a motion and the sensors registering + # it + SENSOR_DELAY = 0 CONTROLLER = "mellinger" # specifies controller type # Configurations to match firmware. Not recommended to change @@ -25,19 +32,23 @@ class FirmwareWrapper(BaseController): QUAD_FORMATION_X = True MOTOR_SET_ENABLE = True + # Motor and power supply settings + BRUSHED = True + SUPPLY_VOLTAGE = 3 # QUESTION: Is change of battery life worth simulating? + RAD_TO_DEG = 180 / math.pi def __init__( self, - env_func, - firmware_freq, - ctrl_freq, - PWM2RPM_SCALE=0.2685, - PWM2RPM_CONST=4070.3, - KF=3.16e-10, - MIN_PWM=20000, - MAX_PWM=65535, - verbose=False, + env_func: Callable[[], Quadrotor], + firmware_freq: int, + ctrl_freq: int, + PWM2RPM_SCALE: float = 0.2685, + PWM2RPM_CONST: float = 4070.3, + KF: float = 3.16e-10, + MIN_PWM: int = 20000, + MAX_PWM: int = 65535, + verbose: bool = False, **kwargs, ): """Initializes a FirmwareWrapper object. @@ -77,47 +88,6 @@ def __init__( self.env = env_func() - def __repr__(self): - ret = "" - ret += "======= EMULATOR STATUS =======\n" - ret += " \n" - ret += " Tick: {self.tick}\n" - ret += " \n" - ret += " State\n" - ret += " -------------------------------\n" - ret += f" {'Pos':>6}: {round(self.state.position.x, 5):>8}x, {round(self.state.position.y, 5):>8}y, {round(self.state.position.z, 5):>8}z\n" - ret += f" {'Vel':>6}: {round(self.state.velocity.x, 5):>8}x, {round(self.state.velocity.y, 5):>8}y, {round(self.state.velocity.z, 5):>8}z\n" - ret += f" {'Acc':>6}: {round(self.state.acc.x, 5):>8}x, {round(self.state.acc.y, 5):>8}y, {round(self.state.acc.z, 5):>8}z\n" - ret += f" {'RPY':>6}: {round(self.state.attitude.roll, 5):>8}, {round(self.state.attitude.pitch, 5):>8}, {round(self.state.attitude.yaw, 5):>8}\n" - ret += " \n" - - if self.verbose: - ret += " Setpoint\n" - ret += " -------------------------------\n" - ret += f" {'Pos':>6}: {round(self.setpoint.position.x, 5):>8}x, {round(self.setpoint.position.y, 5):>8}y, {round(self.setpoint.position.z, 5):>8}z\n" - # ret += f" {'Pos':>6}: {int((self.setpoint.position.x+1)*10) * ' ' + ' ':<25}x, {int((self.setpoint.position.y+1)*10) * ' ' + ' ':<25}y, {int((self.setpoint.position.z+1)*10) * ' ' + ' ':<25}z\n" - ret += f" {'Vel':>6}: {round(self.setpoint.velocity.x, 5):>8}x, {round(self.setpoint.velocity.y, 5):>8}y, {round(self.setpoint.velocity.z, 5):>8}z\n" - ret += f" {'Acc':>6}: {round(self.setpoint.acceleration.x, 5):>8}x, {round(self.setpoint.acceleration.y, 5):>8}y, {round(self.setpoint.acceleration.z, 5):>8}z\n" - ret += f" {'Thrust':>6}: {round(self.setpoint.thrust, 5):>8}\n" - ret += " \n" - ret += " Control\n" - ret += " -------------------------------\n" - ret += f" {'Roll':>6}: {self.control.roll:>8}\n" - ret += f" {'Pitch':>6}: {self.control.pitch:>8}\n" - ret += f" {'Yaw':>6}: {self.control.yaw:>8}\n" - ret += f" {'Thrust':>6}: {round(self.control.thrust, 5):>8}\n" - ret += " \n" - - ret += " Action\n" - ret += " -------------------------------\n" - ret += f" {'M1':>6}: {round(self.action[0], 3):>8}\n" - ret += f" {'M2':>6}: {round(self.action[1], 3):>8}\n" - ret += f" {'M3':>6}: {round(self.action[2], 3):>8}\n" - ret += f" {'M4':>6}: {round(self.action[3], 3):>8}\n" - ret += " \n" - ret += "===============================\n" - return ret - # region Controller functions def reset(self): """Resets the firmware_wrapper object. @@ -131,9 +101,7 @@ def reset(self): # Initialize history self.action_history = [[0, 0, 0, 0] for _ in range(self.ACTION_DELAY)] self.sensor_history = [[[0, 0, 0], [0, 0, 0]] for _ in range(self.SENSOR_DELAY)] - self.state_history = [ - [[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]] for _ in range(self.STATE_DELAY) - ] + self.state_history = [] # Initialize gyro lpf self.acclpf = [firm.lpf2pData() for _ in range(3)] @@ -215,15 +183,15 @@ def reset(self): def close(self): self.env.close() - def step(self, sim_time, action): + def step(self, sim_time: float, action: np.ndarray): """Step the firmware_wrapper class and its environment. This function should be called once at the rate of ctrl_freq. Step processes and high level commands, and runs the firmware loop and simulator according to the frequencies set. Args: - sim_time (float): the time in seconds since beginning the episode - action (np.ndarray(4)): motor control forces to be applied. Order is: front left, back + sim_time (float): Time in seconds since beginning the episode + action (np.ndarray(4)): Motor control forces to be applied. Order is: front left, back left, back right, front right. Todo: @@ -275,16 +243,13 @@ def step(self, sim_time, action): # Update state state_timestamp = int(self.tick / self.firmware_freq * 1e3) - if self.STATE_DELAY: - raise NotImplementedError("State delay is not implemented. Leave at 0.") - else: - self._update_state( - state_timestamp, - cur_pos, - cur_vel, - cur_acc, - cur_rpy * self.RAD_TO_DEG, - ) + self._update_state( + state_timestamp, + cur_pos, + cur_vel, + cur_acc, + cur_rpy * self.RAD_TO_DEG, + ) # , quat=cur_quat) # Update sensor data sensor_timestamp = int(self.tick / self.firmware_freq * 1e6) @@ -345,7 +310,6 @@ def close_results_dict(self): self.results_dict["done"] = np.vstack(self.results_dict["done"]) self.results_dict["info"] = np.vstack(self.results_dict["info"]) self.results_dict["action"] = np.vstack(self.results_dict["action"]) - self.results_dict = munchify(self.results_dict) # endregion @@ -565,9 +529,8 @@ def _sendFullStateCmd(self, pos, vel, acc, yaw, rpy_rate, timestep): self.setpoint.mode.pitch = firm.modeDisable self.setpoint.mode.yaw = firm.modeDisable - self.setpoint.timestamp = int( - timestep * 1000 - ) # TODO: This may end up skipping control loops + # TODO: This may end up skipping control loops + self.setpoint.timestamp = int(timestep * 1000) self.full_state_cmd_override = True def sendTakeoffCmd(self, height, duration): @@ -694,21 +657,16 @@ def _notifySetpointStop(self): firm.crtpCommanderHighLevelTellState(self.state) self.full_state_cmd_override = False - BRUSHED = True - SUPPLY_VOLTAGE = 3 # QUESTION: Is change of battery life worth simulating? - def _motorsGetPWM(self, thrust): if self.BRUSHED: thrust = thrust / 65536 * 60 volts = -0.0006239 * thrust**2 + 0.088 * thrust percentage = min(1, volts / self.SUPPLY_VOLTAGE) ratio = percentage * self.MAX_PWM - return ratio - else: - raise NotImplementedError( - "Emulator does not support the brushless motor configuration at this time." - ) + raise NotImplementedError( + "Emulator does not support the brushless motor configuration at this time." + ) def _limitThrust(self, val): if val > self.MAX_PWM: @@ -764,6 +722,49 @@ def _powerDistribution(self, control_t): # endregion + # region Printing + def __repr__(self): + ret = "" + ret += "======= EMULATOR STATUS =======\n" + ret += " \n" + ret += f" Tick: {self.tick}\n" + ret += " \n" + ret += " State\n" + ret += " -------------------------------\n" + ret += f" {'Pos':>6}: {round(self.state.position.x, 5):>8}x, {round(self.state.position.y, 5):>8}y, {round(self.state.position.z, 5):>8}z\n" + ret += f" {'Vel':>6}: {round(self.state.velocity.x, 5):>8}x, {round(self.state.velocity.y, 5):>8}y, {round(self.state.velocity.z, 5):>8}z\n" + ret += f" {'Acc':>6}: {round(self.state.acc.x, 5):>8}x, {round(self.state.acc.y, 5):>8}y, {round(self.state.acc.z, 5):>8}z\n" + ret += f" {'RPY':>6}: {round(self.state.attitude.roll, 5):>8}, {round(self.state.attitude.pitch, 5):>8}, {round(self.state.attitude.yaw, 5):>8}\n" + ret += " \n" + + if self.verbose: + ret += " Setpoint\n" + ret += " -------------------------------\n" + ret += f" {'Pos':>6}: {round(self.setpoint.position.x, 5):>8}x, {round(self.setpoint.position.y, 5):>8}y, {round(self.setpoint.position.z, 5):>8}z\n" + ret += f" {'Vel':>6}: {round(self.setpoint.velocity.x, 5):>8}x, {round(self.setpoint.velocity.y, 5):>8}y, {round(self.setpoint.velocity.z, 5):>8}z\n" + ret += f" {'Acc':>6}: {round(self.setpoint.acceleration.x, 5):>8}x, {round(self.setpoint.acceleration.y, 5):>8}y, {round(self.setpoint.acceleration.z, 5):>8}z\n" + ret += f" {'Thrust':>6}: {round(self.setpoint.thrust, 5):>8}\n" + ret += " \n" + ret += " Control\n" + ret += " -------------------------------\n" + ret += f" {'Roll':>6}: {self.control.roll:>8}\n" + ret += f" {'Pitch':>6}: {self.control.pitch:>8}\n" + ret += f" {'Yaw':>6}: {self.control.yaw:>8}\n" + ret += f" {'Thrust':>6}: {round(self.control.thrust, 5):>8}\n" + ret += " \n" + + ret += " Action\n" + ret += " -------------------------------\n" + ret += f" {'M1':>6}: {round(self.action[0], 3):>8}\n" + ret += f" {'M2':>6}: {round(self.action[1], 3):>8}\n" + ret += f" {'M3':>6}: {round(self.action[2], 3):>8}\n" + ret += f" {'M4':>6}: {round(self.action[3], 3):>8}\n" + ret += " \n" + ret += "===============================\n" + return ret + + # endregion + # region Utils def _get_quaternion_from_euler(roll, pitch, yaw): diff --git a/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py b/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py index 613472ef4..5efd1792a 100644 --- a/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py +++ b/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py @@ -5,6 +5,7 @@ """ import os +import logging import math from copy import deepcopy import casadi as cs @@ -17,8 +18,10 @@ from safe_control_gym.math_and_models.symbolic_systems import SymbolicModel from safe_control_gym.envs.gym_pybullet_drones.base_aviary import BaseAviary from safe_control_gym.envs.gym_pybullet_drones.quadrotor_utils import QuadType, cmd2pwm, pwm2rpm -from safe_control_gym.math_and_models.normalization import normalize_angle -from safe_control_gym.math_and_models.transformations import projection_matrix, transform_trajectory, csRotXYZ +from safe_control_gym.math_and_models.transformations import transform_trajectory, csRotXYZ + +logger = logging.getLogger(__name__) + class Quadrotor(BaseAviary): """1D and 2D quadrotor environment task. @@ -27,110 +30,57 @@ class Quadrotor(BaseAviary): multiple cost functions, stabilization and trajectory tracking references. """ + NAME = "quadrotor" AVAILABLE_CONSTRAINTS = deepcopy(GENERAL_CONSTRAINTS) - DISTURBANCE_MODES = { # Set at runtime by QUAD_TYPE - "observation": { - "dim": -1 - }, - "action": { - "dim": -1 - }, - "dynamics": { - "dim": -1 - } + DISTURBANCE_MODES = { # Set at runtime by QUAD_TYPE + "observation": {"dim": -1}, + "action": {"dim": -1}, + "dynamics": {"dim": -1}, } INERTIAL_PROP_RAND_INFO = { - "M": { # Nominal: 0.027 - 'distrib': "uniform", - 'low': 0.022, - 'high': 0.032 + "M": { # Nominal: 0.027 + "distrib": "uniform", + "low": 0.022, + "high": 0.032, }, - "Ixx": { # Nominal: 1.4e-5 - 'distrib': "uniform", - 'low': 1.3e-5, - 'high': 1.5e-5 + "Ixx": { # Nominal: 1.4e-5 + "distrib": "uniform", + "low": 1.3e-5, + "high": 1.5e-5, }, - "Iyy": { # Nominal: 1.4e-5 - 'distrib': "uniform", - 'low': 1.3e-5, - 'high': 1.5e-5 + "Iyy": { # Nominal: 1.4e-5 + "distrib": "uniform", + "low": 1.3e-5, + "high": 1.5e-5, + }, + "Izz": { # Nominal: 2.17e-5 + "distrib": "uniform", + "low": 2.07e-5, + "high": 2.27e-5, }, - "Izz": { # Nominal: 2.17e-5 - 'distrib': "uniform", - 'low': 2.07e-5, - 'high': 2.27e-5 - } } INIT_STATE_RAND_INFO = { - "init_x": { - 'distrib': "uniform", - 'low': -0.5, - 'high': 0.5 - }, - "init_x_dot": { - 'distrib': "uniform", - 'low': -0.01, - 'high': 0.01 - }, - "init_y": { - 'distrib': "uniform", - 'low': -0.5, - 'high': 0.5 - }, - "init_y_dot": { - 'distrib': "uniform", - 'low': -0.01, - 'high': 0.01 - }, - "init_z": { - 'distrib': "uniform", - 'low': 0.1, - 'high': 1.5 - }, - "init_z_dot": { - 'distrib': "uniform", - 'low': -0.01, - 'high': 0.01 - }, - "init_phi": { - 'distrib': "uniform", - 'low': -0.3, - 'high': 0.3 - }, - "init_theta": { - 'distrib': "uniform", - 'low': -0.3, - 'high': 0.3 - }, - "init_psi": { - 'distrib': "uniform", - 'low': -0.3, - 'high': 0.3 - }, - "init_p": { - 'distrib': "uniform", - 'low': -0.01, - 'high': 0.01 - }, + "init_x": {"distrib": "uniform", "low": -0.5, "high": 0.5}, + "init_x_dot": {"distrib": "uniform", "low": -0.01, "high": 0.01}, + "init_y": {"distrib": "uniform", "low": -0.5, "high": 0.5}, + "init_y_dot": {"distrib": "uniform", "low": -0.01, "high": 0.01}, + "init_z": {"distrib": "uniform", "low": 0.1, "high": 1.5}, + "init_z_dot": {"distrib": "uniform", "low": -0.01, "high": 0.01}, + "init_phi": {"distrib": "uniform", "low": -0.3, "high": 0.3}, + "init_theta": {"distrib": "uniform", "low": -0.3, "high": 0.3}, + "init_psi": {"distrib": "uniform", "low": -0.3, "high": 0.3}, + "init_p": {"distrib": "uniform", "low": -0.01, "high": 0.01}, "init_theta_dot": { # Only used in 2D quad. - 'distrib': "uniform", - 'low': -0.01, - 'high': 0.01 + "distrib": "uniform", + "low": -0.01, + "high": 0.01, }, - "init_q": { - 'distrib': "uniform", - 'low': -0.01, - 'high': 0.01 - }, - "init_r": { - 'distrib': "uniform", - 'low': -0.01, - 'high': 0.01 - } + "init_q": {"distrib": "uniform", "low": -0.01, "high": 0.01}, + "init_r": {"distrib": "uniform", "low": -0.01, "high": 0.01}, } TASK_INFO = { @@ -145,20 +95,21 @@ class Quadrotor(BaseAviary): "proj_normal": [0, 1, 1], } - def __init__(self, - init_state=None, - inertial_prop=None, - # custom args - quad_type: QuadType = QuadType.TWO_D, - norm_act_scale=0.1, - obs_goal_horizon=0, - rew_state_weight=1.0, - rew_act_weight=0.0001, - rew_exponential=True, - done_on_out_of_bound=True, - info_mse_metric_state_weight=None, - **kwargs - ): + def __init__( + self, + init_state=None, + inertial_prop=None, + # custom args + quad_type: QuadType = QuadType.TWO_D, + norm_act_scale=0.1, + obs_goal_horizon=0, + rew_state_weight=1.0, + rew_act_weight=0.0001, + rew_exponential=True, + done_on_out_of_bound=True, + info_mse_metric_state_weight=None, + **kwargs, + ): """Initialize a quadrotor environment. Args: @@ -184,42 +135,69 @@ def __init__(self, self.done_on_out_of_bound = done_on_out_of_bound if info_mse_metric_state_weight is None: if self.QUAD_TYPE == QuadType.ONE_D: - self.info_mse_metric_state_weight = np.array([1,0], ndmin=1, dtype=float) + self.info_mse_metric_state_weight = np.array([1, 0], ndmin=1, dtype=float) elif self.QUAD_TYPE == QuadType.TWO_D: - self.info_mse_metric_state_weight = np.array([1,0,1,0,0,0], ndmin=1, dtype=float) + self.info_mse_metric_state_weight = np.array( + [1, 0, 1, 0, 0, 0], ndmin=1, dtype=float + ) elif self.QUAD_TYPE == QuadType.THREE_D: - self.info_mse_metric_state_weight = np.array([1,0,1,0,1,0,0,0,0,0,0,0], ndmin=1, dtype=float) + self.info_mse_metric_state_weight = np.array( + [1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0], ndmin=1, dtype=float + ) else: - raise ValueError("[ERROR] in Quadrotor.__init__(), not implemented quad type.") + raise ValueError(f"Quad type {self.QUAD_TYPE} not implemented.") else: - if (self.QUAD_TYPE == QuadType.ONE_D and len(info_mse_metric_state_weight)==2) or \ - (self.QUAD_TYPE == QuadType.TWO_D and len(info_mse_metric_state_weight)==6) or \ - (self.QUAD_TYPE == QuadType.THREE_D and len(info_mse_metric_state_weight)==12): - self.info_mse_metric_state_weight = np.array(info_mse_metric_state_weight, ndmin=1, dtype=float) + if ( + (self.QUAD_TYPE == QuadType.ONE_D and len(info_mse_metric_state_weight) == 2) + or (self.QUAD_TYPE == QuadType.TWO_D and len(info_mse_metric_state_weight) == 6) + or (self.QUAD_TYPE == QuadType.THREE_D and len(info_mse_metric_state_weight) == 12) + ): + self.info_mse_metric_state_weight = np.array( + info_mse_metric_state_weight, ndmin=1, dtype=float + ) else: - raise ValueError("[ERROR] in Quadrotor.__init__(), wrong info_mse_metric_state_weight argument size.") - - # BaseAviary constructor, called after defining the custom args, - # since some BenchmarkEnv init setup can be task(custom args)-dependent. + raise ValueError("Wrong info_mse_metric_state_weight argument size.") + + # BaseAviary constructor, called after defining the custom args, + # since some BenchmarkEnv init setup can be task(custom args)-dependent. super().__init__(init_state=init_state, inertial_prop=inertial_prop, **kwargs) # Store initial state info. self.INIT_STATE_LABELS = { QuadType.ONE_D: ["init_x", "init_x_dot"], - QuadType.TWO_D: ["init_x", "init_x_dot", "init_z", "init_z_dot", "init_theta", "init_theta_dot"], - QuadType.THREE_D: ["init_x", "init_x_dot", "init_y", "init_y_dot", "init_z", "init_z_dot", - "init_phi", "init_theta", "init_psi", "init_p", "init_q", "init_r"] + QuadType.TWO_D: [ + "init_x", + "init_x_dot", + "init_z", + "init_z_dot", + "init_theta", + "init_theta_dot", + ], + QuadType.THREE_D: [ + "init_x", + "init_x_dot", + "init_y", + "init_y_dot", + "init_z", + "init_z_dot", + "init_phi", + "init_theta", + "init_psi", + "init_p", + "init_q", + "init_r", + ], } if init_state is None: - for init_name in self.INIT_STATE_RAND_INFO: # Default zero state. - self.__dict__[init_name.upper()] = 0. + for init_name in self.INIT_STATE_RAND_INFO: # Default zero state. + self.__dict__[init_name.upper()] = 0.0 else: if isinstance(init_state, np.ndarray): # Full state as numpy array . for i, init_name in enumerate(self.INIT_STATE_LABELS[self.QUAD_TYPE]): self.__dict__[init_name.upper()] = init_state[i] elif isinstance(init_state, dict): # Partial state as dictionary. for init_name in self.INIT_STATE_LABELS[self.QUAD_TYPE]: - self.__dict__[init_name.upper()] = init_state.get(init_name, 0.) + self.__dict__[init_name.upper()] = init_state.get(init_name, 0.0) else: raise ValueError("[ERROR] in Quadrotor.__init__(), init_state incorrect format.") @@ -237,7 +215,7 @@ def __init__(self, # Only randomize Iyy for the 2D quadrotor. self.INERTIAL_PROP_RAND_INFO.pop("Ixx", None) self.INERTIAL_PROP_RAND_INFO.pop("Izz", None) - + # Override inertial properties of passed as arguments. if inertial_prop is None: pass @@ -254,75 +232,99 @@ def __init__(self, self.J[2, 2] = inertial_prop.get("Izz", self.J[2, 2]) else: raise ValueError("[ERROR] in Quadrotor.__init__(), inertial_prop incorrect format.") - + # Set prior/symbolic info. self._setup_symbolic() - + # Create X_GOAL and U_GOAL references for the assigned task. self.U_GOAL = np.ones(self.action_dim) * self.MASS * self.GRAVITY_ACC / self.action_dim if self.TASK == Task.STABILIZATION: if self.QUAD_TYPE == QuadType.ONE_D: self.X_GOAL = np.hstack( - [self.TASK_INFO["stabilization_goal"][1], - 0.0]) # x = {z, z_dot}. + [self.TASK_INFO["stabilization_goal"][1], 0.0] + ) # x = {z, z_dot}. elif self.QUAD_TYPE == QuadType.TWO_D: - self.X_GOAL = np.hstack([ - self.TASK_INFO["stabilization_goal"][0], 0.0, - self.TASK_INFO["stabilization_goal"][1], 0.0, 0.0, 0.0 - ]) # x = {x, x_dot, z, z_dot, theta, theta_dot}. + self.X_GOAL = np.hstack( + [ + self.TASK_INFO["stabilization_goal"][0], + 0.0, + self.TASK_INFO["stabilization_goal"][1], + 0.0, + 0.0, + 0.0, + ] + ) # x = {x, x_dot, z, z_dot, theta, theta_dot}. elif self.QUAD_TYPE == QuadType.THREE_D: - self.X_GOAL = np.hstack([ - self.TASK_INFO["stabilization_goal"][0], 0.0, - self.TASK_INFO["stabilization_goal"][1], 0.0, - self.TASK_INFO["stabilization_goal"][2], 0.0, - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 - ]) # x = {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r}. + self.X_GOAL = np.hstack( + [ + self.TASK_INFO["stabilization_goal"][0], + 0.0, + self.TASK_INFO["stabilization_goal"][1], + 0.0, + self.TASK_INFO["stabilization_goal"][2], + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + ] + ) # x = {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r}. elif self.TASK == Task.TRAJ_TRACKING: - POS_REF, \ - VEL_REF, \ - SPEED = self._generate_trajectory(traj_type=self.TASK_INFO["trajectory_type"], - traj_length=self.EPISODE_LEN_SEC, - num_cycles=self.TASK_INFO["num_cycles"], - traj_plane=self.TASK_INFO["trajectory_plane"], - position_offset=self.TASK_INFO["trajectory_position_offset"], - scaling=self.TASK_INFO["trajectory_scale"], - sample_time=self.CTRL_TIMESTEP - ) # Each of the 3 returned values is of shape (Ctrl timesteps, 3) + POS_REF, VEL_REF, SPEED = self._generate_trajectory( + traj_type=self.TASK_INFO["trajectory_type"], + traj_length=self.EPISODE_LEN_SEC, + num_cycles=self.TASK_INFO["num_cycles"], + traj_plane=self.TASK_INFO["trajectory_plane"], + position_offset=self.TASK_INFO["trajectory_position_offset"], + scaling=self.TASK_INFO["trajectory_scale"], + sample_time=self.CTRL_TIMESTEP, + ) # Each of the 3 returned values is of shape (Ctrl timesteps, 3) if self.QUAD_TYPE == QuadType.ONE_D: - self.X_GOAL = np.vstack([ - POS_REF[:, 2], # z - VEL_REF[:, 2] # z_dot - ]).transpose() + self.X_GOAL = np.vstack( + [ + POS_REF[:, 2], # z + VEL_REF[:, 2], # z_dot + ] + ).transpose() elif self.QUAD_TYPE == QuadType.TWO_D: - self.X_GOAL = np.vstack([ - POS_REF[:, 0], # x - VEL_REF[:, 0], # x_dot - POS_REF[:, 2], # z - VEL_REF[:, 2], # z_dot - np.zeros(POS_REF.shape[0]), # zeros - np.zeros(VEL_REF.shape[0]) - ]).transpose() + self.X_GOAL = np.vstack( + [ + POS_REF[:, 0], # x + VEL_REF[:, 0], # x_dot + POS_REF[:, 2], # z + VEL_REF[:, 2], # z_dot + np.zeros(POS_REF.shape[0]), # zeros + np.zeros(VEL_REF.shape[0]), + ] + ).transpose() elif self.QUAD_TYPE == QuadType.THREE_D: # Additional transformation of the originally planar trajectory. POS_REF_TRANS, VEL_REF_TRANS = transform_trajectory( - POS_REF, VEL_REF, trans_info={ - "point": self.TASK_INFO["proj_point"], + POS_REF, + VEL_REF, + trans_info={ + "point": self.TASK_INFO["proj_point"], "normal": self.TASK_INFO["proj_normal"], - }) - self.X_GOAL = np.vstack([ - POS_REF_TRANS[:, 0], # x - VEL_REF_TRANS[:, 0], # x_dot - POS_REF_TRANS[:, 1], # y - VEL_REF_TRANS[:, 1], # y_dot - POS_REF_TRANS[:, 2], # z - VEL_REF_TRANS[:, 2], # z_dot - np.zeros(POS_REF_TRANS.shape[0]), # zeros - np.zeros(POS_REF_TRANS.shape[0]), - np.zeros(POS_REF_TRANS.shape[0]), - np.zeros(VEL_REF_TRANS.shape[0]), - np.zeros(VEL_REF_TRANS.shape[0]), - np.zeros(VEL_REF_TRANS.shape[0]) - ]).transpose() + }, + ) + self.X_GOAL = np.vstack( + [ + POS_REF_TRANS[:, 0], # x + VEL_REF_TRANS[:, 0], # x_dot + POS_REF_TRANS[:, 1], # y + VEL_REF_TRANS[:, 1], # y_dot + POS_REF_TRANS[:, 2], # z + VEL_REF_TRANS[:, 2], # z_dot + np.zeros(POS_REF_TRANS.shape[0]), # zeros + np.zeros(POS_REF_TRANS.shape[0]), + np.zeros(POS_REF_TRANS.shape[0]), + np.zeros(VEL_REF_TRANS.shape[0]), + np.zeros(VEL_REF_TRANS.shape[0]), + np.zeros(VEL_REF_TRANS.shape[0]), + ] + ).transpose() # Equilibrium point at hover for linearization. self.X_EQ = np.zeros(self.state_dim) @@ -331,27 +333,26 @@ def __init__(self, # IROS 2022 - Load maze. self.OBSTACLES = [] self.GATES = [] - if 'obstacles' in kwargs: - self.OBSTACLES = kwargs['obstacles'] - if 'gates' in kwargs: - self.GATES = kwargs['gates'] - if 'randomized_gates_and_obstacles' in kwargs and kwargs['randomized_gates_and_obstacles']: + if "obstacles" in kwargs: + self.OBSTACLES = kwargs["obstacles"] + self.OBSTACLES = np.array(self.OBSTACLES) + if "gates" in kwargs: + self.GATES = kwargs["gates"] + self.GATES = np.array(self.GATES) + self.NUM_GATES = len(self.GATES) + self.n_obstacles = len(self.OBSTACLES) + if kwargs.get("randomized_gates_and_obstacles", False): self.RANDOMIZED_GATES_AND_OBS = True - if 'gates_and_obstacles_randomization_info' not in kwargs: - raise ValueError("[ERROR] Missing 'gates_and_obstacles_randomization_info' in YAML configuration.") - self.GATES_AND_OBS_RAND_INFO = kwargs['gates_and_obstacles_randomization_info'] + if "gates_and_obstacles_randomization_info" not in kwargs: + raise ValueError( + "Missing 'gates_and_obstacles_randomization_info' in YAML configuration." + ) + self.GATES_AND_OBS_RAND_INFO = kwargs["gates_and_obstacles_randomization_info"] else: self.RANDOMIZED_GATES_AND_OBS = False # - if 'done_on_collision' in kwargs: - self.DONE_ON_COLLISION = kwargs['done_on_collision'] - else: - self.DONE_ON_COLLISION = False - # - if 'done_on_completion' in kwargs: - self.DONE_ON_COMPLETION = kwargs['done_on_completion'] - else: - self.DONE_ON_COMPLETION = False + self.DONE_ON_COLLISION = kwargs.get("done_on_collision", False) + self.DONE_ON_COMPLETION = kwargs.get("done_on_completion", False) def reset(self): """(Re-)initializes the environment to start an episode. @@ -375,25 +376,31 @@ def reset(self): d_args = rand_info_copy["obstacles"].pop("args", []) d_kwargs = rand_info_copy["obstacles"] for obstacle in self.OBSTACLES: - obs_height = 0.525 # URDF dependent, places 'obstacle.urdf' at z == 0. + obs_height = 0.525 # URDF dependent, places 'obstacle.urdf' at z == 0. if self.RANDOMIZED_GATES_AND_OBS: - offset = np.array([distrib(*d_args, **d_kwargs), distrib(*d_args, **d_kwargs), obs_height]) + offset = np.array( + [distrib(*d_args, **d_kwargs), distrib(*d_args, **d_kwargs), obs_height] + ) pose_disturbance = np.array([0, 0, distrib(*d_args, **d_kwargs)]) else: offset = np.array([0, 0, obs_height]) pose_disturbance = np.array([0, 0, 0]) - TMP_ID = p.loadURDF(os.path.join(self.URDF_DIR, "obstacle.urdf"), - np.array(obstacle[0:3]) + offset, - p.getQuaternionFromEuler(np.array(obstacle[3:6])+pose_disturbance), - physicsClientId=self.PYB_CLIENT) - p.addUserDebugText(str(TMP_ID), - textPosition=[0, 0, 0.5], - textColorRGB=[1, 0, 0], - lifeTime=self.EPISODE_LEN_SEC, - textSize=1.5, - parentObjectUniqueId=TMP_ID, - parentLinkIndex=-1, - physicsClientId=self.PYB_CLIENT) + TMP_ID = p.loadURDF( + os.path.join(self.URDF_DIR, "obstacle.urdf"), + np.array(obstacle[0:3]) + offset, + p.getQuaternionFromEuler(np.array(obstacle[3:6]) + pose_disturbance), + physicsClientId=self.PYB_CLIENT, + ) + p.addUserDebugText( + str(TMP_ID), + textPosition=[0, 0, 0.5], + textColorRGB=[1, 0, 0], + lifeTime=self.EPISODE_LEN_SEC, + textSize=1.5, + parentObjectUniqueId=TMP_ID, + parentLinkIndex=-1, + physicsClientId=self.PYB_CLIENT, + ) self.OBSTACLES_IDS.append(TMP_ID) # self.GATES_IDS = [] @@ -406,34 +413,41 @@ def reset(self): for gate in self.GATES: if gate[6] == 0: urdf_file = "portal.urdf" - gate_height = 1. # URDF dependent, places 'portal.urdf' at z == 0. + gate_height = 1.0 # URDF dependent, places 'portal.urdf' at z == 0. elif gate[6] == 1: urdf_file = "low_portal.urdf" - gate_height = 0.525 # URDF dependent, places 'low_portal.urdf' at z == 0. + gate_height = 0.525 # URDF dependent, places 'low_portal.urdf' at z == 0. else: raise ValueError("[ERROR] Unknown gate type.") if self.RANDOMIZED_GATES_AND_OBS: - offset = np.array([distrib(*d_args, **d_kwargs), distrib(*d_args, **d_kwargs), gate_height]) + offset = np.array( + [distrib(*d_args, **d_kwargs), distrib(*d_args, **d_kwargs), gate_height] + ) pose_disturbance = np.array([0, 0, distrib(*d_args, **d_kwargs)]) else: offset = np.array([0, 0, gate_height]) pose_disturbance = np.array([0, 0, 0]) - self.EFFECTIVE_GATES_POSITIONS.append(list(np.array(gate[0:3]) + offset) + list(np.array(gate[3:6]) + pose_disturbance)) - TMP_ID = p.loadURDF(os.path.join(self.URDF_DIR, urdf_file), - np.array(gate[0:3]) + offset, - p.getQuaternionFromEuler(np.array(gate[3:6])+pose_disturbance), - physicsClientId=self.PYB_CLIENT) - p.addUserDebugText(str(TMP_ID), - textPosition=[0, 0, 0.5], - textColorRGB=[1, 0, 0], - lifeTime=self.EPISODE_LEN_SEC, - textSize=1.5, - parentObjectUniqueId=TMP_ID, - parentLinkIndex=-1, - physicsClientId=self.PYB_CLIENT) + self.EFFECTIVE_GATES_POSITIONS.append( + list(np.array(gate[0:3]) + offset) + list(np.array(gate[3:6]) + pose_disturbance) + ) + TMP_ID = p.loadURDF( + os.path.join(self.URDF_DIR, urdf_file), + np.array(gate[0:3]) + offset, + p.getQuaternionFromEuler(np.array(gate[3:6]) + pose_disturbance), + physicsClientId=self.PYB_CLIENT, + ) + p.addUserDebugText( + str(TMP_ID), + textPosition=[0, 0, 0.5], + textColorRGB=[1, 0, 0], + lifeTime=self.EPISODE_LEN_SEC, + textSize=1.5, + parentObjectUniqueId=TMP_ID, + parentLinkIndex=-1, + physicsClientId=self.PYB_CLIENT, + ) self.GATES_IDS.append(TMP_ID) # - self.NUM_GATES = len(self.GATES) self.current_gate = 0 # # Deactivate select collisions, e.g. between the ground plane and the drone @@ -443,7 +457,7 @@ def reset(self): # linkIndexB=-1, # enableCollision=0, # physicsClientId=self.PYB_CLIENT) - # + # # Initialize IROS-specific attributes. self.stepped_through_gate = False self.currently_collided = False @@ -456,57 +470,64 @@ def reset(self): "M": self.MASS, "Ixx": self.J[0, 0], "Iyy": self.J[1, 1], - "Izz": self.J[2, 2] + "Izz": self.J[2, 2], } if self.RANDOMIZED_INERTIAL_PROP: - prop_values = self._randomize_values_by_info( - prop_values, self.INERTIAL_PROP_RAND_INFO) + prop_values = self._randomize_values_by_info(prop_values, self.INERTIAL_PROP_RAND_INFO) if any(phy_quantity < 0 for phy_quantity in prop_values.values()): - raise ValueError("[ERROR] in Quadrotor.reset(), negative randomized inertial properties.") + raise ValueError( + "[ERROR] in Quadrotor.reset(), negative randomized inertial properties." + ) self.OVERRIDDEN_QUAD_MASS = prop_values["M"] self.OVERRIDDEN_QUAD_INERTIA = [prop_values["Ixx"], prop_values["Iyy"], prop_values["Izz"]] - + # Override inertial properties. p.changeDynamics( self.DRONE_IDS[0], linkIndex=-1, # Base link. mass=self.OVERRIDDEN_QUAD_MASS, localInertiaDiagonal=self.OVERRIDDEN_QUAD_INERTIA, - physicsClientId=self.PYB_CLIENT) + physicsClientId=self.PYB_CLIENT, + ) # Randomize initial state. - init_values = {init_name: self.__dict__[init_name.upper()] - for init_name in self.INIT_STATE_LABELS[self.QUAD_TYPE]} + init_values = { + init_name: self.__dict__[init_name.upper()] + for init_name in self.INIT_STATE_LABELS[self.QUAD_TYPE] + } if self.RANDOMIZED_INIT: init_values = self._randomize_values_by_info(init_values, self.INIT_STATE_RAND_INFO) - INIT_XYZ = [init_values.get("init_"+k, 0.) for k in ["x", "y", "z"]] - INIT_VEL = [init_values.get("init_"+k+"_dot", 0.) for k in ["x", "y", "z"]] - INIT_RPY = [init_values.get("init_"+k, 0.) for k in ["phi", "theta", "psi"]] + INIT_XYZ = [init_values.get("init_" + k, 0.0) for k in ["x", "y", "z"]] + INIT_VEL = [init_values.get("init_" + k + "_dot", 0.0) for k in ["x", "y", "z"]] + INIT_RPY = [init_values.get("init_" + k, 0.0) for k in ["phi", "theta", "psi"]] if self.QUAD_TYPE == QuadType.TWO_D: - INIT_ANG_VEL = [0, init_values.get("init_theta_dot", 0.), 0] + INIT_ANG_VEL = [0, init_values.get("init_theta_dot", 0.0), 0] else: - INIT_ANG_VEL = [init_values.get("init_"+k, 0.) for k in ["p", "q", "r"]] - p.resetBasePositionAndOrientation(self.DRONE_IDS[0], INIT_XYZ, - p.getQuaternionFromEuler(INIT_RPY), - physicsClientId=self.PYB_CLIENT) - p.resetBaseVelocity(self.DRONE_IDS[0], INIT_VEL, INIT_ANG_VEL, - physicsClientId=self.PYB_CLIENT) + INIT_ANG_VEL = [init_values.get("init_" + k, 0.0) for k in ["p", "q", "r"]] + p.resetBasePositionAndOrientation( + self.DRONE_IDS[0], + INIT_XYZ, + p.getQuaternionFromEuler(INIT_RPY), + physicsClientId=self.PYB_CLIENT, + ) + p.resetBaseVelocity( + self.DRONE_IDS[0], INIT_VEL, INIT_ANG_VEL, physicsClientId=self.PYB_CLIENT + ) # Update BaseAviary internal variables before calling self._get_observation(). self._update_and_store_kinematic_information() obs, info = self._get_observation(), self._get_reset_info() obs, info = super().after_reset(obs, info) - + # Return either an observation and dictionary or just the observation. if self.INFO_IN_RESET: return obs, info else: return obs - def step(self, action): """Advances the environment by one control step. - + Pass the commanded RPMs and the adversarial force to the superclass .step(). The PyBullet simulation is stepped PYB_FREQ/CTRL_FREQ times in BaseAviary. @@ -530,8 +551,7 @@ def step(self, action): if passive_disturb or adv_disturb: disturb_force = np.zeros(self.DISTURBANCE_MODES["dynamics"]["dim"]) if passive_disturb: - disturb_force = self.disturbances["dynamics"].apply( - disturb_force, self) + disturb_force = self.disturbances["dynamics"].apply(disturb_force, self) if adv_disturb and self.adv_action is not None: disturb_force = disturb_force + self.adv_action # Clear the adversary action, wait for the next one. @@ -547,17 +567,18 @@ def step(self, action): elif self.QUAD_TYPE == QuadType.THREE_D: disturb_force = np.asarray(disturb_force).flatten() - # Advance the simulation. + # Advance the simulation. super()._advance_simulation(rpm, disturb_force) # Standard Gym return. obs = self._get_observation() info = self._get_info() - done = self._get_done() # IROS 2022 - After _get_info() to use this step's 'self' attributes. - rew = self._get_reward() # IROS 2022 - After _get_info() to use this step's 'self' attributes. + # IROS 2022 - After _get_info() to use this step's 'self' attributes. + done = self._get_done() + rew = self._get_reward() obs, rew, done, info = super().after_step(obs, rew, done, info) return obs, rew, done, info - - def render(self, mode='human'): + + def render(self, mode="human"): """Retrieves a frame from PyBullet rendering. Args: @@ -567,14 +588,16 @@ def render(self, mode='human'): ndarray: A multidimensional array with the RGB frame captured by PyBullet's camera. """ - [w, h, rgb, dep, seg] = p.getCameraImage(width=self.RENDER_WIDTH, - height=self.RENDER_HEIGHT, - shadow=1, - viewMatrix=self.CAM_VIEW, - projectionMatrix=self.CAM_PRO, - renderer=p.ER_TINY_RENDERER, - flags=p.ER_SEGMENTATION_MASK_OBJECT_AND_LINKINDEX, - physicsClientId=self.PYB_CLIENT) + [w, h, rgb, dep, seg] = p.getCameraImage( + width=self.RENDER_WIDTH, + height=self.RENDER_HEIGHT, + shadow=1, + viewMatrix=self.CAM_VIEW, + projectionMatrix=self.CAM_PRO, + renderer=p.ER_TINY_RENDERER, + flags=p.ER_SEGMENTATION_MASK_OBJECT_AND_LINKINDEX, + physicsClientId=self.PYB_CLIENT, + ) # Image.fromarray(np.reshape(rgb, (h, w, 4)), 'RGBA').show() return np.reshape(rgb, (h, w, 4)) @@ -589,14 +612,14 @@ def _setup_symbolic(self): Iyy = self.J[1, 1] dt = self.CTRL_TIMESTEP # Define states. - z = cs.MX.sym('z') - z_dot = cs.MX.sym('z_dot') + z = cs.MX.sym("z") + z_dot = cs.MX.sym("z_dot") if self.QUAD_TYPE == QuadType.ONE_D: nx, nu = 2, 1 # Define states. X = cs.vertcat(z, z_dot) # Define input thrust. - T = cs.MX.sym('T') + T = cs.MX.sym("T") U = cs.vertcat(T) # Define dynamics equations. X_dot = cs.vertcat(z_dot, T / m - g) @@ -605,93 +628,105 @@ def _setup_symbolic(self): elif self.QUAD_TYPE == QuadType.TWO_D: nx, nu = 6, 2 # Define states. - x = cs.MX.sym('x') - x_dot = cs.MX.sym('x_dot') - theta = cs.MX.sym('theta') - theta_dot = cs.MX.sym('theta_dot') + x = cs.MX.sym("x") + x_dot = cs.MX.sym("x_dot") + theta = cs.MX.sym("theta") + theta_dot = cs.MX.sym("theta_dot") X = cs.vertcat(x, x_dot, z, z_dot, theta, theta_dot) # Define input thrusts. - T1 = cs.MX.sym('T1') - T2 = cs.MX.sym('T2') + T1 = cs.MX.sym("T1") + T2 = cs.MX.sym("T2") U = cs.vertcat(T1, T2) # Define dynamics equations. - X_dot = cs.vertcat(x_dot, - cs.sin(theta) * (T1 + T2) / m, z_dot, - cs.cos(theta) * (T1 + T2) / m - g, theta_dot, - l * (T2 - T1) / Iyy / np.sqrt(2)) + X_dot = cs.vertcat( + x_dot, + cs.sin(theta) * (T1 + T2) / m, + z_dot, + cs.cos(theta) * (T1 + T2) / m - g, + theta_dot, + l * (T2 - T1) / Iyy / np.sqrt(2), + ) # Define observation. Y = cs.vertcat(x, x_dot, z, z_dot, theta, theta_dot) elif self.QUAD_TYPE == QuadType.THREE_D: nx, nu = 12, 4 Ixx = self.J[0, 0] Izz = self.J[2, 2] - J = cs.blockcat([[Ixx, 0.0, 0.0], - [0.0, Iyy, 0.0], - [0.0, 0.0, Izz]]) - Jinv = cs.blockcat([[1.0/Ixx, 0.0, 0.0], - [0.0, 1.0/Iyy, 0.0], - [0.0, 0.0, 1.0/Izz]]) - gamma = self.KM/self.KF - x = cs.MX.sym('x') - y = cs.MX.sym('y') - phi = cs.MX.sym('phi') # Roll - theta = cs.MX.sym('theta') # Pitch - psi = cs.MX.sym('psi') # Yaw - x_dot = cs.MX.sym('x_dot') - y_dot = cs.MX.sym('y_dot') - p = cs.MX.sym('p') # Body frame roll rate - q = cs.MX.sym('q') # body frame pith rate - r = cs.MX.sym('r') # body frame yaw rate + J = cs.blockcat([[Ixx, 0.0, 0.0], [0.0, Iyy, 0.0], [0.0, 0.0, Izz]]) + Jinv = cs.blockcat( + [[1.0 / Ixx, 0.0, 0.0], [0.0, 1.0 / Iyy, 0.0], [0.0, 0.0, 1.0 / Izz]] + ) + gamma = self.KM / self.KF + x = cs.MX.sym("x") + y = cs.MX.sym("y") + phi = cs.MX.sym("phi") # Roll + theta = cs.MX.sym("theta") # Pitch + psi = cs.MX.sym("psi") # Yaw + x_dot = cs.MX.sym("x_dot") + y_dot = cs.MX.sym("y_dot") + p = cs.MX.sym("p") # Body frame roll rate + q = cs.MX.sym("q") # body frame pith rate + r = cs.MX.sym("r") # body frame yaw rate # PyBullet Euler angles use the SDFormat for rotation matrices. - Rob = csRotXYZ(phi, theta, psi) # rotation matrix transforming a vector in the body frame to the world frame. + Rob = csRotXYZ( + phi, theta, psi + ) # rotation matrix transforming a vector in the body frame to the world frame. # Define state variables. X = cs.vertcat(x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r) # Define inputs. - f1 = cs.MX.sym('f1') - f2 = cs.MX.sym('f2') - f3 = cs.MX.sym('f3') - f4 = cs.MX.sym('f4') + f1 = cs.MX.sym("f1") + f2 = cs.MX.sym("f2") + f3 = cs.MX.sym("f3") + f4 = cs.MX.sym("f4") U = cs.vertcat(f1, f2, f3, f4) - # From Ch. 2 of Luis, Carlos, and Jérôme Le Ny. "Design of a trajectory tracking controller for a - # nanoquadcopter." arXiv preprint arXiv:1608.05786 (2016). + # From Ch. 2 of Luis, Carlos, and Jérôme Le Ny. "Design of a trajectory tracking + # controller for a nanoquadcopter." arXiv preprint arXiv:1608.05786 (2016). # Defining the dynamics function. - # We are using the velocity of the base wrt to the world frame expressed in the world frame. - # Note that the reference expresses this in the body frame. - oVdot_cg_o = Rob @ cs.vertcat(0, 0, f1+f2+f3+f4)/m - cs.vertcat(0, 0, g) + # We are using the velocity of the base wrt to the world frame expressed in the world + # frame. Note that the reference expresses this in the body frame. + oVdot_cg_o = Rob @ cs.vertcat(0, 0, f1 + f2 + f3 + f4) / m - cs.vertcat(0, 0, g) pos_ddot = oVdot_cg_o pos_dot = cs.vertcat(x_dot, y_dot, z_dot) - Mb = cs.vertcat(l/cs.sqrt(2.0)*(f1+f2-f3-f4), - l/cs.sqrt(2.0)*(-f1+f2+f3-f4), - gamma*(f1-f2+f3-f4)) - rate_dot = Jinv @ (Mb - (cs.skew(cs.vertcat(p,q,r)) @ J @ cs.vertcat(p,q,r))) - ang_dot = cs.blockcat([[1, cs.sin(phi)*cs.tan(theta), cs.cos(phi)*cs.tan(theta)], - [0, cs.cos(phi), -cs.sin(phi)], - [0, cs.sin(phi)/cs.cos(theta), cs.cos(phi)/cs.cos(theta)]]) @ cs.vertcat(p, q, r) - X_dot = cs.vertcat(pos_dot[0], pos_ddot[0], pos_dot[1], pos_ddot[1], pos_dot[2], pos_ddot[2], ang_dot, rate_dot) + Mb = cs.vertcat( + l / cs.sqrt(2.0) * (f1 + f2 - f3 - f4), + l / cs.sqrt(2.0) * (-f1 + f2 + f3 - f4), + gamma * (f1 - f2 + f3 - f4), + ) + rate_dot = Jinv @ (Mb - (cs.skew(cs.vertcat(p, q, r)) @ J @ cs.vertcat(p, q, r))) + ang_dot = cs.blockcat( + [ + [1, cs.sin(phi) * cs.tan(theta), cs.cos(phi) * cs.tan(theta)], + [0, cs.cos(phi), -cs.sin(phi)], + [0, cs.sin(phi) / cs.cos(theta), cs.cos(phi) / cs.cos(theta)], + ] + ) @ cs.vertcat(p, q, r) + X_dot = cs.vertcat( + pos_dot[0], + pos_ddot[0], + pos_dot[1], + pos_ddot[1], + pos_dot[2], + pos_ddot[2], + ang_dot, + rate_dot, + ) Y = cs.vertcat(x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r) # Define cost (quadratic form). - Q = cs.MX.sym('Q', nx, nx) - R = cs.MX.sym('R', nu, nu) - Xr = cs.MX.sym('Xr', nx, 1) - Ur = cs.MX.sym('Ur', nu, 1) + Q = cs.MX.sym("Q", nx, nx) + R = cs.MX.sym("R", nu, nu) + Xr = cs.MX.sym("Xr", nx, 1) + Ur = cs.MX.sym("Ur", nu, 1) cost_func = 0.5 * (X - Xr).T @ Q @ (X - Xr) + 0.5 * (U - Ur).T @ R @ (U - Ur) # Define dynamics and cost dictionaries. dynamics = {"dyn_eqn": X_dot, "obs_eqn": Y, "vars": {"X": X, "U": U}} cost = { "cost_func": cost_func, - "vars": { - "X": X, - "U": U, - "Xr": Xr, - "Ur": Ur, - "Q": Q, - "R": R - } + "vars": {"X": X, "U": U, "Xr": Xr, "Ur": Ur, "Q": Q, "R": R}, } # Setup symbolic model. self.symbolic = SymbolicModel(dynamics=dynamics, cost=cost, dt=dt) @@ -700,43 +735,55 @@ def _set_action_space(self): """Returns the action space of the environment. Returns: - gym.spaces: The quadrotor environment's action space, of size 1 or 2 depending on QUAD_TYPE. + gym.spaces: The quadrotor environment's action space, of size 1 or 2 depending on + QUAD_TYPE. """ # Define action/input dimension, labels, and units. if self.QUAD_TYPE == QuadType.ONE_D: - action_dim = 1 - self.ACTION_LABELS = ['T'] - self.ACTION_UNITS = ['N'] if not self.NORMALIZED_RL_ACTION_SPACE else ['-'] + action_dim = 1 + self.ACTION_LABELS = ["T"] + self.ACTION_UNITS = ["N"] if not self.NORMALIZED_RL_ACTION_SPACE else ["-"] elif self.QUAD_TYPE == QuadType.TWO_D: - action_dim = 2 - self.ACTION_LABELS = ['T1', 'T2'] - self.ACTION_UNITS = ['N', 'N'] if not self.NORMALIZED_RL_ACTION_SPACE else ['-', '-'] + action_dim = 2 + self.ACTION_LABELS = ["T1", "T2"] + self.ACTION_UNITS = ["N", "N"] if not self.NORMALIZED_RL_ACTION_SPACE else ["-", "-"] elif self.QUAD_TYPE == QuadType.THREE_D: action_dim = 4 - self.ACTION_LABELS = ['T1', 'T2', 'T3', 'T4'] - self.ACTION_UNITS = ['N', 'N', 'N', 'N'] if not self.NORMALIZED_RL_ACTION_SPACE else ['-', '-', '-', '-'] + self.ACTION_LABELS = ["T1", "T2", "T3", "T4"] + self.ACTION_UNITS = ( + ["N", "N", "N", "N"] + if not self.NORMALIZED_RL_ACTION_SPACE + else ["-", "-", "-", "-"] + ) if self.NORMALIZED_RL_ACTION_SPACE: # Normalized thrust (around hover thrust). self.hover_thrust = self.GRAVITY_ACC * self.MASS / action_dim - self.action_space = spaces.Box(low=-np.ones(action_dim), - high=np.ones(action_dim), - dtype=np.float32) + self.action_space = spaces.Box( + low=-np.ones(action_dim), high=np.ones(action_dim), dtype=np.float32 + ) else: # Direct thrust control. n_motors = 4 / action_dim - a_low = self.KF * n_motors * (self.PWM2RPM_SCALE * self.MIN_PWM + self.PWM2RPM_CONST)**2 - a_high = self.KF * n_motors * (self.PWM2RPM_SCALE * self.MAX_PWM + self.PWM2RPM_CONST)**2 - self.action_space = spaces.Box(low=np.full(action_dim, a_low, np.float32), - high=np.full(action_dim, a_high, np.float32), - dtype=np.float32) + a_low = ( + self.KF * n_motors * (self.PWM2RPM_SCALE * self.MIN_PWM + self.PWM2RPM_CONST) ** 2 + ) + a_high = ( + self.KF * n_motors * (self.PWM2RPM_SCALE * self.MAX_PWM + self.PWM2RPM_CONST) ** 2 + ) + self.action_space = spaces.Box( + low=np.full(action_dim, a_low, np.float32), + high=np.full(action_dim, a_high, np.float32), + dtype=np.float32, + ) def _set_observation_space(self): """Returns the observation space of the environment. Returns: - gym.spaces: The bounded observation (state) space, of size 2, 6, or 12 depending on QUAD_TYPE. + gym.spaces: The bounded observation (state) space, of size 2, 6, or 12 depending on + QUAD_TYPE. """ self.x_threshold = 5 @@ -751,53 +798,113 @@ def _set_observation_space(self): # obs/state = {z, z_dot}. low = np.array([self.GROUND_PLANE_Z, -np.finfo(np.float32).max]) high = np.array([self.z_threshold, np.finfo(np.float32).max]) - self.STATE_LABELS = ['z', 'z_dot'] - self.STATE_UNITS = ['m', 'm/s'] + self.STATE_LABELS = ["z", "z_dot"] + self.STATE_UNITS = ["m", "m/s"] elif self.QUAD_TYPE == QuadType.TWO_D: # obs/state = {x, x_dot, z, z_dot, theta, theta_dot}. - low = np.array([ - -self.x_threshold, -np.finfo(np.float32).max, - self.GROUND_PLANE_Z, -np.finfo(np.float32).max, - -self.theta_threshold_radians, -np.finfo(np.float32).max - ]) - high = np.array([ - self.x_threshold, np.finfo(np.float32).max, - self.z_threshold, np.finfo(np.float32).max, - self.theta_threshold_radians, np.finfo(np.float32).max - ]) - self.STATE_LABELS = ['x', 'x_dot', 'z', 'z_dot', 'theta', 'theta_dot'] - self.STATE_UNITS = ['m', 'm/s', 'm', 'm/s', 'rad', 'rad/s'] + low = np.array( + [ + -self.x_threshold, + -np.finfo(np.float32).max, + self.GROUND_PLANE_Z, + -np.finfo(np.float32).max, + -self.theta_threshold_radians, + -np.finfo(np.float32).max, + ] + ) + high = np.array( + [ + self.x_threshold, + np.finfo(np.float32).max, + self.z_threshold, + np.finfo(np.float32).max, + self.theta_threshold_radians, + np.finfo(np.float32).max, + ] + ) + self.STATE_LABELS = ["x", "x_dot", "z", "z_dot", "theta", "theta_dot"] + self.STATE_UNITS = ["m", "m/s", "m", "m/s", "rad", "rad/s"] elif self.QUAD_TYPE == QuadType.THREE_D: # obs/state = {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r}. - low = np.array([ - -self.x_threshold, -np.finfo(np.float32).max, - -self.y_threshold, -np.finfo(np.float32).max, - self.GROUND_PLANE_Z, -np.finfo(np.float32).max, - -self.phi_threshold_radians, -self.theta_threshold_radians, -self.psi_threshold_radians, - -np.finfo(np.float32).max, -np.finfo(np.float32).max, -np.finfo(np.float32).max - ]) - high = np.array([ - self.x_threshold, np.finfo(np.float32).max, - self.y_threshold, np.finfo(np.float32).max, - self.z_threshold, np.finfo(np.float32).max, - self.phi_threshold_radians, self.theta_threshold_radians, self.psi_threshold_radians, - np.finfo(np.float32).max, np.finfo(np.float32).max, np.finfo(np.float32).max - ]) - self.STATE_LABELS = ['x', 'x_dot', 'y', 'y_dot', 'z', 'z_dot', - 'phi', 'theta', 'psi', 'p', 'q', 'r'] - self.STATE_UNITS = ['m', 'm/s', 'm', 'm/s', 'm', 'm/s', - 'rad', 'rad', 'rad', 'rad/s', 'rad/s', 'rad/s'] + low = np.array( + [ + -self.x_threshold, + -np.finfo(np.float32).max, + -self.y_threshold, + -np.finfo(np.float32).max, + self.GROUND_PLANE_Z, + -np.finfo(np.float32).max, + -self.phi_threshold_radians, + -self.theta_threshold_radians, + -self.psi_threshold_radians, + -np.finfo(np.float32).max, + -np.finfo(np.float32).max, + -np.finfo(np.float32).max, + ] + ) + high = np.array( + [ + self.x_threshold, + np.finfo(np.float32).max, + self.y_threshold, + np.finfo(np.float32).max, + self.z_threshold, + np.finfo(np.float32).max, + self.phi_threshold_radians, + self.theta_threshold_radians, + self.psi_threshold_radians, + np.finfo(np.float32).max, + np.finfo(np.float32).max, + np.finfo(np.float32).max, + ] + ) + self.STATE_LABELS = [ + "x", + "x_dot", + "y", + "y_dot", + "z", + "z_dot", + "phi", + "theta", + "psi", + "p", + "q", + "r", + ] + self.STATE_UNITS = [ + "m", + "m/s", + "m", + "m/s", + "m", + "m/s", + "rad", + "rad", + "rad", + "rad/s", + "rad/s", + "rad/s", + ] # Define the state space for the dynamics. self.state_space = spaces.Box(low=low, high=high, dtype=np.float32) # Concatenate reference for RL. - if self.COST == Cost.RL_REWARD and self.TASK == Task.TRAJ_TRACKING and self.obs_goal_horizon > 0: + if ( + self.COST == Cost.RL_REWARD + and self.TASK == Task.TRAJ_TRACKING + and self.obs_goal_horizon > 0 + ): # Include future goal state(s). # e.g. horizon=1, obs = {state, state_target} mul = 1 + self.obs_goal_horizon low = np.concatenate([low] * mul) high = np.concatenate([high] * mul) - elif self.COST == Cost.RL_REWARD and self.TASK == Task.STABILIZATION and self.obs_goal_horizon > 0: + elif ( + self.COST == Cost.RL_REWARD + and self.TASK == Task.STABILIZATION + and self.obs_goal_horizon > 0 + ): low = np.concatenate([low] * 2) high = np.concatenate([high] * 2) @@ -811,7 +918,7 @@ def _setup_disturbances(self): self.DISTURBANCE_MODES["action"]["dim"] = self.action_dim self.DISTURBANCE_MODES["dynamics"]["dim"] = int(self.QUAD_TYPE) super()._setup_disturbances() - + def _preprocess_control(self, action): """Converts the action passed to .step() into motors' RPMs (ndarray of shape (4,)). @@ -829,7 +936,7 @@ def _preprocess_control(self, action): else: thrust = np.clip(action, self.action_space.low, self.action_space.high) if not np.array_equal(thrust, np.array(action)) and self.VERBOSE: - print("[WARNING]: action was clipped in Quadrotor._preprocess_control().") + logger.warning("Action was clipped in Quadrotor._preprocess_control().") self.current_preprocessed_action = thrust # Apply disturbances. if "action" in self.disturbances: @@ -837,7 +944,9 @@ def _preprocess_control(self, action): if self.adversary_disturbance == "action": thrust = thrust + self.adv_action # convert to quad motor rpm commands - pwm = cmd2pwm(thrust, self.PWM2RPM_SCALE, self.PWM2RPM_CONST, self.KF, self.MIN_PWM, self.MAX_PWM) + pwm = cmd2pwm( + thrust, self.PWM2RPM_SCALE, self.PWM2RPM_CONST, self.KF, self.MIN_PWM, self.MAX_PWM + ) rpm = pwm2rpm(pwm, self.PWM2RPM_SCALE, self.PWM2RPM_CONST) return rpm @@ -855,32 +964,22 @@ def _get_observation(self): self.state = np.hstack([pos[2], vel[2]]).reshape((2,)) elif self.QUAD_TYPE == QuadType.TWO_D: # {x, x_dot, z, z_dot, theta, theta_dot}. - self.state = np.hstack( - [pos[0], vel[0], pos[2], vel[2], rpy[1], ang_v[1]] - ).reshape((6,)) + self.state = np.hstack([pos[0], vel[0], pos[2], vel[2], rpy[1], ang_v[1]]).reshape((6,)) elif self.QUAD_TYPE == QuadType.THREE_D: - Rob = np.array(p.getMatrixFromQuaternion(self.quat[0])).reshape((3,3)) + Rob = np.array(p.getMatrixFromQuaternion(self.quat[0])).reshape((3, 3)) Rbo = Rob.T ang_v_body_frame = Rbo @ ang_v # {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p, q, r}. self.state = np.hstack( - # [pos[0], vel[0], pos[1], vel[1], pos[2], vel[2], rpy, ang_v] # Note: world ang_v != body frame pqr [pos[0], vel[0], pos[1], vel[1], pos[2], vel[2], rpy, ang_v_body_frame] ).reshape((12,)) - # if not np.array_equal(self.state, - # np.clip(self.state, self.observation_space.low, self.observation_space.high)): - # if self.GUI and self.VERBOSE: - # print( - # "[WARNING]: observation was clipped in Quadrotor._get_observation()." - # ) - # Apply observation disturbance. obs = deepcopy(self.state) if "observation" in self.disturbances: - obs = self.disturbances["observation"].apply(obs, self) + obs = self.disturbances["observation"].apply(obs, self) # Concatenate goal info (references state(s)) for RL. - obs = self.extend_obs(obs, self.ctrl_step_counter+1) + obs = self.extend_obs(obs, self.ctrl_step_counter + 1) return obs def _get_reward(self): @@ -902,7 +1001,7 @@ def _get_reward(self): dist = np.sum(self.rew_state_weight * state_error * state_error) dist += np.sum(self.rew_act_weight * act_error * act_error) if self.TASK == Task.TRAJ_TRACKING: - wp_idx = min(self.ctrl_step_counter, self.X_GOAL.shape[0]-1) + wp_idx = min(self.ctrl_step_counter, self.X_GOAL.shape[0] - 1) state_error = state - self.X_GOAL[wp_idx] dist = np.sum(self.rew_state_weight * state_error * state_error) dist += np.sum(self.rew_act_weight * act_error * act_error) @@ -915,19 +1014,29 @@ def _get_reward(self): # Control cost. if self.COST == Cost.QUADRATIC: if self.TASK == Task.STABILIZATION: - return float(-1 * self.symbolic.loss(x=self.state, - Xr=self.X_GOAL, - u=self.current_preprocessed_action, - Ur=self.U_GOAL, - Q=self.Q, - R=self.R)["l"]) + return float( + -1 + * self.symbolic.loss( + x=self.state, + Xr=self.X_GOAL, + u=self.current_preprocessed_action, + Ur=self.U_GOAL, + Q=self.Q, + R=self.R, + )["l"] + ) if self.TASK == Task.TRAJ_TRACKING: - return float(-1 * self.symbolic.loss(x=self.state, - Xr=self.X_GOAL[self.ctrl_step_counter,:], - u=self.current_preprocessed_action, - Ur=self.U_GOAL, - Q=self.Q, - R=self.R)["l"]) + return float( + -1 + * self.symbolic.loss( + x=self.state, + Xr=self.X_GOAL[self.ctrl_step_counter, :], + u=self.current_preprocessed_action, + Ur=self.U_GOAL, + Q=self.Q, + R=self.R, + )["l"] + ) # IROS 2022 - Competition sparse reward signal. if self.COST == Cost.COMPETITION: @@ -962,7 +1071,10 @@ def _get_done(self): """ # Done if goal reached for stabilization task with quadratic cost. if self.TASK == Task.STABILIZATION and self.COST == Cost.QUADRATIC: - self.goal_reached = bool(np.linalg.norm(self.state - self.X_GOAL) < self.TASK_INFO["stabilization_goal_tolerance"]) + self.goal_reached = bool( + np.linalg.norm(self.state - self.X_GOAL) + < self.TASK_INFO["stabilization_goal_tolerance"] + ) if self.goal_reached: return True @@ -984,8 +1096,9 @@ def _get_done(self): if self.QUAD_TYPE == QuadType.THREE_D: mask = np.array([1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 0, 0]) # Element-wise or to check out-of-bound conditions. - out_of_bound = np.logical_or(self.state < self.state_space.low, - self.state > self.state_space.high) + out_of_bound = np.logical_or( + self.state < self.state_space.low, self.state > self.state_space.high + ) # Mask out un-included dimensions (i.e. velocities) out_of_bound = np.any(out_of_bound * mask) # Early terminate if needed. @@ -1012,42 +1125,29 @@ def _get_info(self): if self.TASK == Task.STABILIZATION and self.COST == Cost.QUADRATIC: info["goal_reached"] = self.goal_reached # Add boolean flag for the goal being reached. # Add MSE. - state = deepcopy(self.state) + state = deepcopy(self.state) if self.TASK == Task.STABILIZATION: state_error = state - self.X_GOAL elif self.TASK == Task.TRAJ_TRACKING: - # TODO: should use angle wrapping + # TODO: should use angle wrapping # state[4] = normalize_angle(state[4]) - wp_idx = min(self.ctrl_step_counter, self.X_GOAL.shape[0]-1) + wp_idx = min(self.ctrl_step_counter, self.X_GOAL.shape[0] - 1) state_error = state - self.X_GOAL[wp_idx] # Filter only relevant dimensions. state_error = state_error * self.info_mse_metric_state_weight - info["mse"] = np.sum(state_error ** 2) + info["mse"] = np.sum(state_error**2) # Note: constraint_values and constraint_violations populated in benchmark_env. # IROS 2022 - Per-step info. # Collisions - # tmp1 = [p.getContactPoints(bodyA=obs_id, - # bodyB=self.DRONE_IDS[0], - # # linkIndexA=-1, linkIndexB=-1, - # physicsClientId=self.PYB_CLIENT) - # for obs_id in self.OBSTACLES_IDS] - # tmp2 = [p.getContactPoints(bodyA=gates_id, - # bodyB=self.DRONE_IDS[0], - # # linkIndexA=-1, linkIndexB=-1, - # physicsClientId=self.PYB_CLIENT) - # for gates_id in self.GATES_IDS] - # tmp3 = p.getContactPoints(bodyA=self.PLANE_ID, - # bodyB=self.DRONE_IDS[0], - # # linkIndexA=-1, linkIndexB=-1, - # physicsClientId=self.PYB_CLIENT) - # print(tmp1, tmp2, tmp3) for GATE_OBS_ID in self.GATES_IDS + self.OBSTACLES_IDS + [self.PLANE_ID]: - ret = p.getContactPoints(bodyA=GATE_OBS_ID, - bodyB=self.DRONE_IDS[0], - # linkIndexA=-1, linkIndexB=-1, - physicsClientId=self.PYB_CLIENT) + ret = p.getContactPoints( + bodyA=GATE_OBS_ID, + bodyB=self.DRONE_IDS[0], + # linkIndexA=-1, linkIndexB=-1, + physicsClientId=self.PYB_CLIENT, + ) if ret: info["collision"] = (GATE_OBS_ID, True) self.currently_collided = True @@ -1057,60 +1157,69 @@ def _get_info(self): self.currently_collided = False # # Gates progress (note: allow 0.5 seconds for initial drop if objects are not on the gound). - if self.pyb_step_counter > 0.5*self.PYB_FREQ and self.NUM_GATES > 0 and self.current_gate < self.NUM_GATES: + if ( + self.pyb_step_counter > 0.5 * self.PYB_FREQ + and self.NUM_GATES > 0 + and self.current_gate < self.NUM_GATES + ): x, y, _, _, _, rot = self.EFFECTIVE_GATES_POSITIONS[self.current_gate] if self.GATES[self.current_gate][6] == 0: - height = 1. # URDF dependent. + height = 1.0 # URDF dependent. elif self.GATES[self.current_gate][6] == 1: - height = 0.525 # URDF dependent. + height = 0.525 # URDF dependent. else: - raise ValueError("[ERROR] Unknown gate type.") - half_length = 0.1875 # Obstacle URDF dependent. - delta_x = 0.05*np.cos(rot) - delta_y = 0.05*np.sin(rot) - fr = [[x,y, height-half_length]] - to = [[x,y, height+half_length]] - for i in [1,2, 3]: - fr.append([x+i*delta_x, y+i*delta_y, height-half_length]) - fr.append([x-i*delta_x, y-i*delta_y, height-half_length]) - to.append([x+i*delta_x, y+i*delta_y, height+half_length]) - to.append([x-i*delta_x, y-i*delta_y, height+half_length]) - # for i in range(len(fr)): - # p.addUserDebugLine(lineFromXYZ=fr[i], - # lineToXYZ=to[i], - # lineColorRGB=[1, 0, 0], - # lifeTime=100 * self.CTRL_TIMESTEP, - # physicsClientId=self.PYB_CLIENT) - rays = p.rayTestBatch(rayFromPositions=fr, - rayToPositions=to, - physicsClientId=self.PYB_CLIENT) + raise ValueError("Unknown gate type.") + half_length = 0.1875 # Obstacle URDF dependent. + delta_x = 0.05 * np.cos(rot) + delta_y = 0.05 * np.sin(rot) + fr = [[x, y, height - half_length]] + to = [[x, y, height + half_length]] + for i in [1, 2, 3]: + fr.append([x + i * delta_x, y + i * delta_y, height - half_length]) + fr.append([x - i * delta_x, y - i * delta_y, height - half_length]) + to.append([x + i * delta_x, y + i * delta_y, height + half_length]) + to.append([x - i * delta_x, y - i * delta_y, height + half_length]) + rays = p.rayTestBatch( + rayFromPositions=fr, rayToPositions=to, physicsClientId=self.PYB_CLIENT + ) self.stepped_through_gate = False for r in rays: - if r[2] < .9999: + if r[2] < 0.9999: self.current_gate += 1 self.stepped_through_gate = True break - if self.current_gate < self.NUM_GATES: - VISIBILITY_RANGE = 0.45 - info["current_target_gate_id"] = self.current_gate - closest_points = p.getClosestPoints(bodyA=self.GATES_IDS[self.current_gate], - bodyB=self.DRONE_IDS[0], - distance=VISIBILITY_RANGE, - # linkIndexA=-1, linkIndexB=-1, - physicsClientId=self.PYB_CLIENT) + # LSY Drone Racing + # Always add the nominal gate positions. If any gates are in range, update the position. If + # any obstacles are in range, also update the obstacle positions. + VISIBILITY_RANGE = 0.45 + info["gates_pos"] = self.GATES[:6].copy() + info["obstacles_pos"] = self.OBSTACLES.copy() + info["gates_in_range"] = np.zeros(self.NUM_GATES, dtype=bool) + info["obstacles_in_range"] = np.zeros(self.n_obstacles, dtype=bool) + for i in range(self.NUM_GATES): + closest_points = p.getClosestPoints( + bodyA=self.GATES_IDS[i], + bodyB=self.DRONE_IDS[0], + distance=VISIBILITY_RANGE, + physicsClientId=self.PYB_CLIENT, + ) if len(closest_points) > 0: - info["current_target_gate_in_range"] = True - info["current_target_gate_pos"] = self.EFFECTIVE_GATES_POSITIONS[self.current_gate] - else: - info["current_target_gate_in_range"] = False - info["current_target_gate_pos"] = self.GATES[self.current_gate][0:6] - info["current_target_gate_type"] = self.GATES[self.current_gate][6] - else: - info["current_target_gate_id"] = -1 - info["current_target_gate_in_range"] = False - info["current_target_gate_pos"] = [] - info["current_target_gate_type"] = -1 - # + info["gates_pos"][i][:6] = self.EFFECTIVE_GATES_POSITIONS[i] + info["gates_in_range"][i] = True + for i in range(self.n_obstacles): + closest_points = p.getClosestPoints( + bodyA=self.OBSTACLES_IDS[i], + bodyB=self.DRONE_IDS[0], + distance=VISIBILITY_RANGE, + physicsClientId=self.PYB_CLIENT, + ) + if len(closest_points) > 0: + info["obstacles_pos"][i] = self.OBSTACLES[i] + info["obstacles_in_range"][i] = True + + info["gate_types"] = self.GATES[:, 6] + info["current_gate_id"] = self.current_gate if self.current_gate < self.NUM_GATES else -1 + # Final goal position reached info["at_goal_position"] = False info["task_completed"] = False @@ -1118,18 +1227,25 @@ def _get_info(self): if self.QUAD_TYPE == QuadType.THREE_D: quad_xyz = np.array([self.state[0], self.state[2], self.state[4]]) goal_xyz = np.array([self.X_GOAL[0], self.X_GOAL[2], self.X_GOAL[4]]) - if np.linalg.norm(quad_xyz - goal_xyz) < self.TASK_INFO["stabilization_goal_tolerance"]: + if ( + np.linalg.norm(quad_xyz - goal_xyz) + < self.TASK_INFO["stabilization_goal_tolerance"] + ): self.at_goal_pos = True self.steps_at_goal_pos += 1 else: self.at_goal_pos = False self.steps_at_goal_pos = 0 - if self.steps_at_goal_pos > self.CTRL_FREQ*2: # Remain near goal position for 2''. + if ( + self.steps_at_goal_pos > self.CTRL_FREQ * 2 + ): # Remain near goal position for 2''. self.task_completed = True info["at_goal_position"] = self.at_goal_pos info["task_completed"] = self.task_completed else: - print('[WARNING] "at_goal_position" and "task_completed" are only intended for used with the 3D quadrotor.') + logger.warning( + '"at_goal_position" and "task_completed" are only intended for used with the 3D quadrotor.' + ) return info @@ -1146,7 +1262,7 @@ def _get_reset_info(self): "quadrotor_mass": self.MASS, "quadrotor_ixx_inertia": self.J[0, 0], "quadrotor_iyy_inertia": self.J[1, 1], - "quadrotor_izz_inertia": self.J[2, 2] + "quadrotor_izz_inertia": self.J[2, 2], } info["x_reference"] = self.X_GOAL info["u_reference"] = self.U_GOAL @@ -1154,7 +1270,7 @@ def _get_reset_info(self): info["symbolic_constraints"] = self.constraints.get_all_symbolic_models() else: info["symbolic_constraints"] = {} - + # IROS 2022 - Reset info. info["ctrl_timestep"] = self.CTRL_TIMESTEP info["ctrl_freq"] = self.CTRL_FREQ @@ -1162,22 +1278,10 @@ def _get_reset_info(self): info["quadrotor_kf"] = self.KF info["quadrotor_km"] = self.KM info["gate_dimensions"] = { - "tall": { - "shape": "square", - "height": 1., - "edge": 0.45 - }, - "low": { - "shape": "square", - "height": 0.525, - "edge": 0.45 - } - } - info["obstacle_dimensions"] = { - "shape": "cylinder", - "height": 1.05, - "radius": 0.05 + "tall": {"shape": "square", "height": 1.0, "edge": 0.45}, + "low": {"shape": "square", "height": 0.525, "edge": 0.45}, } + info["obstacle_dimensions"] = {"shape": "cylinder", "height": 1.05, "radius": 0.05} info["nominal_gates_pos_and_type"] = self.GATES info["nominal_obstacles_pos"] = self.OBSTACLES @@ -1194,9 +1298,8 @@ def _get_reset_info(self): else: info["gates_and_obs_randomization"] = {} info["disturbances"] = self.DISTURBANCES - - # INFO 2022 - Debugging. - info["urdf_dir"] = self.URDF_DIR info["pyb_client"] = self.PYB_CLIENT + info["urdf_dir"] = self.URDF_DIR + info.update(self._get_info()) return info