In [2]:
import safety_gymnasium
from tqdm import tqdm, trange
import hashlib

In [None]:
def simulation(env, iter_nr = 1000, reward_free = False):
    obs, info = env.reset()
    # Set seeds
    # obs, _ = env.reset(seed=0)
    terminated, truncated = False, False
    ep_ret, ep_cost = 0, 0
    
    for _ in trange(iter_nr):
        assert env.observation_space.contains(obs)
        act = env.action_space.sample()
        assert env.action_space.contains(act)
        obs, reward, cost, terminated, truncated, info = env.step(act)
        # Only add the reward during deployment i.e. not reward-free
        if not reward_free:
            ep_ret += reward
        ep_cost += cost
        if terminated or truncated:
            observation, info = env.reset()

    env.close()
    return ep_ret, ep_cost

: 

In [9]:
env = safety_gymnasium.vector.make('SafetyPointGoal2-v0', render_mode='human', max_episode_steps = 1000, num_envs = 10)
states, _ = env.reset()
# ret, cost = simulation(env, reward_free = True)
# ret, cost

In [11]:
states.shape

(10, 60)

In [None]:
env.observation_space.sample()

array([[ 1.23312221, -2.70954918,  0.77270429,  0.34742047, -0.02208241,
        -1.44108583,  0.47390603,  0.47729077, -0.47908617,  2.9214509 ,
        -1.4650847 , -2.85288133,  0.02211164,  0.84967832,  0.38352878,
         0.21341946,  0.20184158,  0.64443959,  0.84260588,  0.61279609,
         0.46390687,  0.53404606,  0.86231653,  0.7425965 ,  0.39747392,
         0.24895232,  0.29960773,  0.66347245,  0.24840417,  0.55045069,
         0.76504544,  0.40602669,  0.37195532,  0.95871422,  0.8980852 ,
         0.77595923,  0.18315749,  0.01471779,  0.71319898,  0.96362788,
         0.6868792 ,  0.28378882,  0.41853399,  0.29573058,  0.95082134,
         0.29140005,  0.88860927,  0.64014027,  0.15838558,  0.13450077,
         0.89592214,  0.85117475,  0.74791387,  0.59544859,  0.92045642,
         0.49911182,  0.72715261,  0.68247469,  0.28126327,  0.33521253],
       [ 0.56842047, -1.64189232,  1.03872865, -0.46436323,  0.2574241 ,
         0.68695684, -0.39995456, -0.8544463 ,  2.

: 

In [None]:
env.observation_space.sample()

array([[ 5.78813812e-01, -8.20718795e-01, -6.20810500e-01,
        -4.13619772e-01, -5.13893023e-01, -9.07877536e-01,
        -9.21364926e-02,  9.01835490e-01, -1.18659904e+00,
        -1.63576488e+00, -8.45475715e-02,  1.84453042e-01,
         9.92062569e-01,  7.50542474e-01,  7.43735743e-01,
         8.93354088e-01,  4.62202510e-03,  5.60088004e-01,
         6.72021829e-01,  4.15345516e-01,  7.77850538e-02,
         8.00208079e-01,  5.50554298e-01,  7.61942108e-01,
         6.07597374e-01,  1.20131150e-01,  4.99078403e-01,
         1.66404833e-01,  6.55310020e-01,  4.50055590e-02,
         2.95552054e-01,  1.29217486e-01,  4.65456461e-03,
         5.86104482e-01,  8.13901369e-01,  2.12875970e-01,
         5.94656422e-01,  5.71533151e-01,  5.56934342e-01,
         2.40300159e-01,  5.06706705e-01,  7.58554326e-01,
         2.19985993e-01,  4.53217880e-01,  1.44284856e-01,
         7.42931305e-01,  9.30247694e-01,  6.70096928e-01,
         2.10203731e-01,  4.66576675e-01,  4.23491923e-0

: 

In [None]:
env.action_space.sample()

array([[ 0.75103543, -0.52774382]])

: 

In [None]:
for i in dir(env.task):
    print(i)

__abstractmethods__
__class__
__delattr__
__dict__
__dir__
__doc__
__eq__
__format__
__ge__
__getattribute__
__gt__
__hash__
__init__
__init_subclass__
__le__
__lt__
__module__
__ne__
__new__
__reduce__
__reduce_ex__
__repr__
__setattr__
__sizeof__
__slots__
__str__
__subclasshook__
__weakref__
_abc_impl
_add_free_geoms
_add_geoms
_add_mocaps
_build
_build_agent
_build_placements_dict
_build_static_geoms_config
_build_world_config
_ego_xy
_free_geoms
_geoms
_get_viewer
_is_load_static_geoms
_mocaps
_obs_compass
_obs_lidar
_obs_lidar_natural
_obs_lidar_pseudo
_obs_vision
_obstacles
_parse
_placements_dict_from_object
_render_area
_render_compass
_render_lidar
_render_sphere
_set_goal
_update_viewer
_viewers
action_noise
action_space
agent
agent_name
build_goal_position
build_observation_space
build_staged_goal_position
calculate_cost
calculate_reward
compass_conf
cost_conf
data
debug
dist_goal
dist_staged_goal
floor_conf
goal
goal_achieved
hazards
last_dist_goal
lidar_conf
mechanism_con

: 

In [None]:
# env.task.goal = None  # Remove goal
# env.task.dist_goal = None  # Remove goal distance tracking
# env.task.sim_conf['frame_skip'] = 1
# env.task.goal_achieved = False  # Ensure goal completion condition is ignored

: 

In [None]:
dir(env.task)

['__abstractmethods__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__slots__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_abc_impl',
 '_add_free_geoms',
 '_add_geoms',
 '_add_mocaps',
 '_build',
 '_build_agent',
 '_build_placements_dict',
 '_build_static_geoms_config',
 '_build_world_config',
 '_ego_xy',
 '_free_geoms',
 '_geoms',
 '_get_viewer',
 '_is_load_static_geoms',
 '_mocaps',
 '_obs_compass',
 '_obs_lidar',
 '_obs_lidar_natural',
 '_obs_lidar_pseudo',
 '_obs_vision',
 '_obstacles',
 '_parse',
 '_placements_dict_from_object',
 '_render_area',
 '_render_compass',
 '_render_lidar',
 '_render_sphere',
 '_set_goal',
 '_update_viewer',
 '_viewers',
 'action_noise',
 'action_space',
 'agent',
 'agent_

: 

In [None]:
dir(env.task.calculate_reward)

['__call__',
 '__class__',
 '__delattr__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__func__',
 '__ge__',
 '__get__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__self__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__']

: 

In [None]:
s, _ = env.reset()
for i in range(1000):
    if i > 500:
        print("Hapi hapi hapi")
    act = env.action_space.sample()
    env.step(act)
    ns, _, _, terminated, truncated, _ = env.step(act)
    if terminated:
        print (f"Terminated in iter {i + 1}")
        if s == ns:
            print("States are the same bruh")
    if truncated:
        print (f"Truncated in iter {i + 1}")
    s = ns

Truncated in iter 500


AssertionError: Environment must be reset before stepping.

: 

In [None]:
s

array([0.        , 0.        , 9.81      , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.49973329,
       0.01632916, 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.20557594, 0.53956877, 0.33399283,
       0.        , 0.        , 0.        , 0.63889349, 0.71901521,
       0.10300368, 0.46701257, 0.39196807, 0.43011249, 0.47528243,
       0.56379898, 0.08851655, 0.18709485, 0.50941531, 0.32232046,
       0.01725441, 0.27526152, 0.76228501, 0.69628043, 0.20391412,
       0.63579955, 0.6087739 , 0.09102056, 0.50004599, 0.56711921,
       0.20920539, 0.34230943, 0.79591073, 0.4536013 , 0.72291357,
       0.65376539, 0.        , 0.21404971, 0.55550223, 0.53404657])

: 

In [None]:
ns

array([ 2.68898589,  2.28094486,  9.81      ,  0.11105991,  0.01579968,
        0.        , -0.        ,  0.        , -0.79487563,  0.49911525,
        0.02973161,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.17087594,  0.53929468,  0.36841874,
        0.        ,  0.        ,  0.        ,  0.58915459,  0.71979689,
        0.1306423 ,  0.46721905,  0.42580416,  0.42977952,  0.434887  ,
        0.56310377,  0.12821677,  0.15370026,  0.5088759 ,  0.35517565,
        0.        ,  0.27479303,  0.76293766,  0.74515793,  0.22223876,
        0.61809754,  0.63647606,  0.05480677,  0.49994576,  0.56684986,
        0.25036811,  0.28784383,  0.79510634,  0.50726251,  0.72244043,
        0.69838603,  0.        ,  0.18836053,  0.54110128,  0.55616385])

: 

In [None]:
for _ in range(100):
    act = env.action_space.sample()
    new_state, reward, cost, terminated, truncated, info = env.step(act)

: 

In [None]:
reward

-0.0033912437081586866

: 

In [None]:
new_state

array([ 3.33099610e+00,  6.60883549e+00,  1.44367428e+01,  1.84440792e-02,
       -7.61994594e-02,  1.01142211e-02,  1.01282470e-01,  6.29291039e-01,
       -6.88697815e-01, -1.43894203e-01, -4.78830075e-01,  4.02707896e-03,
        8.05895484e-01, -1.63789522e+00,  6.36175930e-01,  7.51492993e-01,
       -5.50383262e-01,  3.63780903e-01,  4.90767097e-01,  8.34865644e-01,
        2.49293026e-01, -4.40914887e-01, -8.81026442e-03,  8.97505678e-01,
        1.43707622e-01,  2.02642017e-01,  5.89343947e-02,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        5.64273398e-01,  5.26614584e-01,  3.98543005e-01,  4.18327333e-01,
        1.52471030e-01,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  5.75992975e-01,  7.74824509e-01,  1.98831534e-01,
        5.49742919e-01,  

: 

In [3]:
import torch
if torch.cuda.is_available():
    dev = "cuda:0"
else:
    dev = "cpu"

In [4]:
dev

'cuda:0'

In [5]:
torch.cuda.device_count()

1

In [6]:
torch.cpu.device_count()

1

In [7]:
import os
os.cpu_count()

12

In [12]:
x = [1, 2, 3, 5, 3, 2]
x[: -1]

[1, 2, 3, 5, 3]

In [24]:
env.world.robot_pos()

AttributeError: 'Builder' object has no attribute 'world'

In [19]:
import safety_gymnasium
import numpy as np

# Create environment
env = safety_gymnasium.make("SafetyPointGoal2-v0")

# Reset environment to get an initial observation
obs, _ = env.reset()

# Print observation shape and first few values
print(f"Observation shape: {obs.shape}")
print(f"First 10 values: {obs[:10]}")  # Print first 10 values for inspection

# Move agent slightly and get new observation
action = np.zeros(env.action_space.shape)  # No movement
obs_new, _, _, _, _, _ = env.step(action)

print(f"Updated first 10 values after no movement: {obs_new[:10]}")

# Now apply movement
action[0] = 0.1  # Move forward slightly
obs_moved, _, _, _, _, _ = env.step(action)

print(f"First 10 values after movement: {obs_moved[:10]}")


Observation shape: (60,)
First 10 values: [0.         0.         9.81       0.         0.         0.
 0.         0.         0.         0.48096577]
Updated first 10 values after no movement: [0.         0.         9.81       0.         0.         0.
 0.         0.         0.         0.48096577]
First 10 values after movement: [ 2.78174686e+00  1.21713370e-10  9.81000000e+00  5.66099157e-02
 -3.39849676e-14  0.00000000e+00  0.00000000e+00  0.00000000e+00
  1.76030170e-12  4.80965770e-01]


In [21]:
obs_new != obs_moved

array([ True,  True,  True,  True,  True, False, False, False,  True,
        True,  True, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False,  True,  True,
        True,  True,  True,  True,  True,  True,  True, False, False,
        True,  True,  True,  True,  True,  True,  True, False, False,
        True,  True,  True, False,  True,  True,  True,  True,  True,
        True,  True,  True,  True, False, False])

In [30]:
a = 44
obs_new[a: a+16]

array([0.        , 0.60296529, 0.76495669, 0.1619914 , 0.        ,
       0.20510806, 0.68728026, 0.56527583, 0.36425112, 0.37815886,
       0.11764418, 0.55061392, 0.73119593, 0.18058201, 0.        ,
       0.        ])

In [15]:
import gymnasium as gym
import numpy as np
from gymnasium.vector import VectorEnvWrapper

class SafeMountainCarWrapper(VectorEnvWrapper):
    def __init__(self, env, safety_threshold=-0.5):
        super().__init__(env)
        self.safety_threshold = safety_threshold

    def step(self, action):
        state, reward, terminated, truncated, info = self.env.step(action)
        
        # Add a safety cost when the car goes too far left
        cost = np.zeros(self.num_envs)
        for i in range(self.num_envs):
            cost[i] = -1 if state[i][0] < self.safety_threshold else 0
        
        return state, reward, cost, terminated, truncated, info
    
    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def render(self):
        return self.env.render()
    
class SafeCartPoleWrapper(VectorEnvWrapper):
    def __init__(self, env, safety_threshold=0.8):
        super().__init__(env)
        self.safety_threshold = safety_threshold

    def step(self, action):
        state, reward, terminated, truncated, info = self.env.step(action)
        
        # Add a safety cost when the car goes too far left
        cost = np.zeros(self.num_envs)
        for i in range(self.num_envs):
            cost[i] = -1 if abs(state[i][0]) > self.safety_threshold else 0
        
        return state, reward, cost, terminated, truncated, info
    
    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def render(self):
        return self.env.render()    

# # Use the wrapper
envs = gym.vector.make("CartPole-v1", num_envs=3)
envs = SafeCartPoleWrapper(envs)

# Test Safe CartPole
obs, _ = envs.reset()
for _ in range(200):
    action = np.array([0, 1, 0])  # Random action
    obs, reward, cost, done, _, info = envs.step(action)
envs.close()

# # Use the wrapper
# envs = gym.vector.make("MountainCarContinuous-v0", num_envs=3)
# envs = SafeMountainCarWrapper(envs)

# # Test Safe MountainCar
# obs, _ = envs.reset()
# for _ in range(200):
#     action = envs.action_space.sample()  # Random action
#     obs, reward, cost, done, _, info = envs.step(action)
# envs.close()


In [18]:
envs.single_action_space.n

2