In [1]:
import gymnasium as gym
from gymnasium import Wrapper

class CustomTerminationWrapper(Wrapper):
    """Cap episode length, switching to a shorter cap late in training.

    The wrapper keeps two counters: ``current_step`` (steps taken in the
    current episode, cleared by ``reset``) and ``total_time_steps`` (steps
    taken over the wrapper's whole lifetime, never cleared).

    * While ``total_time_steps <= cut_episode_time`` an episode is capped at
      ``max_steps`` steps.
    * Once ``total_time_steps > cut_episode_time`` an episode is capped at
      ``max_steps_cut`` steps.

    Hitting a cap is reported through ``truncated`` (a step limit is a
    truncation, not a terminal state of the task).  The ``terminated`` flag
    coming from the wrapped environment is always passed through untouched, so
    a genuinely finished episode is never stepped further.

    Args:
        env: Environment to wrap.
        max_steps: Per-episode step cap used before the cut-over.  ``None``
            disables the cap for that phase.
        max_steps_cut: Per-episode step cap used after the cut-over.  ``None``
            disables the cap for that phase.
        cut_episode_time: Lifetime step count after which ``max_steps_cut``
            replaces ``max_steps``.  A negative value makes the shorter cap
            active from the first step.
    """

    def __init__(self, env, max_steps, max_steps_cut=200, cut_episode_time=200000):
        super().__init__(env)
        self.max_steps = max_steps
        self.max_steps_cut = max_steps_cut
        self.cut_episode_time = cut_episode_time
        self.current_step = 0      # steps in the current episode
        self.total_time_steps = 0  # steps across all episodes

    def reset(self, **kwargs):
        """Start a new episode; only the per-episode counter is cleared."""
        self.current_step = 0
        return self.env.reset(**kwargs)

    def _active_step_cap(self):
        """Return the per-episode cap that applies at the current lifetime step."""
        if self.total_time_steps > self.cut_episode_time:
            return self.max_steps_cut
        return self.max_steps

    def step(self, action):
        """Step the wrapped env and flag truncation once the active cap is hit.

        Returns:
            The usual ``(observation, reward, terminated, truncated, info)``
            tuple.  ``terminated`` is exactly what the wrapped env returned;
            ``truncated`` is additionally set when the episode has used up
            its step cap.
        """
        observation, reward, terminated, truncated, info = self.env.step(action)

        self.current_step += 1
        self.total_time_steps += 1

        step_cap = self._active_step_cap()
        if step_cap is not None and self.current_step >= step_cap:
            # Step budget exhausted: end the episode as a time-limit
            # truncation so learners can still bootstrap from this state.
            truncated = True

        return observation, reward, terminated, truncated, info
      

In [2]:
from stable_baselines3 import PPO
from stable_baselines3 import DDPG
from stable_baselines3.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise
import numpy as np

In [None]:

# Training setup: DDPG on MountainCarContinuous-v0 behind CustomTerminationWrapper.
max_steps = 999  # per-episode step budget handed to the wrapper
max_steps_cutoff = 200  # shorter per-episode cap (the wrapper's `max_steps_cut`)
cut_episode_time = 200000  # lifetime step count after which the shorter cap is used

env = CustomTerminationWrapper(
    gym.make('MountainCarContinuous-v0', render_mode='rgb_array'),
    max_steps,
    max_steps_cut=max_steps_cutoff,
    cut_episode_time=cut_episode_time,
)

# Exploration noise: zero-mean Ornstein-Uhlenbeck noise with sigma 0.5 in
# every action dimension.
n_actions = env.action_space.shape[-1]
action_noise = OrnsteinUhlenbeckActionNoise(
    mean=np.zeros(n_actions),
    sigma=np.full(n_actions, 0.5),
)

model = DDPG(
    "MlpPolicy",
    env,
    action_noise=action_noise,
    buffer_size=5000,
    verbose=1,
)

# Run the training loop.
model.learn(total_timesteps=300000)

# Persist the trained agent.
# NOTE(review): the file name says "cartpole" although the agent was trained
# on MountainCarContinuous-v0 -- consider renaming once nothing depends on it.
model.save("ddpg_cartpole")

In [4]:
import os.path as osp

In [24]:
# Evaluation setup: load the saved expert agent on the plain (unwrapped) task.
# The three values below are only consumed by the optional wrapper shown in
# the commented-out line; they are kept so the wrapper can be re-enabled.
max_steps = 1000  # Set this to your desired maximum number of steps
max_steps_cutoff = 1000
cut_episode_time = -1
# env = CustomTerminationWrapper(gym.make('MountainCarContinuous-v0',render_mode='rgb_array'), max_steps,max_steps_cutoff,cut_episode_time)
env = gym.make('MountainCarContinuous-v0', render_mode='rgb_array')

# `load` is a classmethod that builds the model from the archive, so there is
# no need to construct a throwaway DDPG instance first.
model = DDPG.load(
    osp.join("expert_agents", "MountainCarContinuous-v0", "MountainCarcontinuous-v0.zip"),
    env=env,
)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [25]:
# Enjoy trained agent: roll the loaded policy out deterministically and render it.
# `model.get_env()` returns the vectorized env the model was loaded with; it is
# not a CustomTerminationWrapper, so there are no step-cap attributes to
# configure on it here.
vec_env = model.get_env()
obs = vec_env.reset()
for i in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    # `dones` is not checked: SB3 vectorized envs reset automatically when an
    # episode ends, so the loop simply continues into the next episode.
    obs, rewards, dones, info = vec_env.step(action)
    print(obs, i)
    vec_env.render("human")

[[-0.570707   -0.00096482]] 0
[[-0.57250714 -0.00180013]] 1
[[-0.5750882  -0.00258107]] 2
[[-0.57838696 -0.00329878]] 3
[[-0.5823327  -0.00394572]] 4
[[-0.5868486  -0.00451594]] 5
[[-0.59185386 -0.00500523]] 6
[[-0.5972652  -0.00541131]] 7
[[-0.6029989  -0.00573374]] 8
[[-0.6089727  -0.00597384]] 9
[[-0.6151072  -0.00613447]] 10
[[-0.6213269  -0.00621974]] 11
[[-0.6275617  -0.00623478]] 12
[[-0.6337471  -0.00618543]] 13
[[-0.6398251  -0.00607798]] 14
[[-0.645716   -0.00589088]] 15
[[-0.65129936 -0.00558337]] 16
[[-0.6564534  -0.00515402]] 17
[[-0.6609934 -0.00454  ]] 18
[[-0.6647317  -0.00373828]] 19
[[-0.66747725 -0.00274558]] 20
[[-0.6688783  -0.00140109]] 21
[[-6.6856396e-01  3.1435388e-04]] 22
[[-0.66624     0.00232397]] 23
[[-0.66170365  0.00453634]] 24
[[-0.6546734   0.00703023]] 25
[[-0.6451854   0.00948796]] 26
[[-0.63330567  0.01187977]] 27
[[-0.61911786  0.0141878 ]] 28
[[-0.6027234   0.01639443]] 29
[[-0.5842411  0.0184823]] 30
[[-0.5638065   0.02043462]] 31
[[-0.541571    0

KeyboardInterrupt: 