In [None]:
import gym
from gym import Wrapper
import numpy as np
import copy

class NonstationaryEnv(Wrapper):
    """Gym wrapper that tracks and overrides physical parameters of the wrapped env.

    The wrapper reads and writes attributes of ``env.unwrapped.model`` (for
    example ``body_mass``) and ``env.unwrapped.model.opt`` (``gravity``,
    ``density``, ``wind``), so the wrapped environment is expected to expose
    such a ``model`` object, as MuJoCo-based gym environments do.

    A "task" is a dict mapping parameter names to values. The current task can
    be summarised as a flat context vector (``env_parameter_vector``), which is
    min-max normalised when ``normalize_context`` is true.

    Only ``body_mass`` is currently snapshotted and bounded by this class.
    Other names in ``rand_params`` are accepted, but they are left out of the
    context vector until a value (and, when normalising, bounds) is available
    for them.
    """

    RAND_PARAMS = ['dof_damping_1_dim', 'gravity', 'body_mass', 'geom_friction', 'density']

    def __init__(self, env, rand_params=None, log_scale_limit=3.0):
        """Wrap ``env`` and snapshot the parameters selected for randomisation.

        Args:
            env: environment to wrap; ``env.action_space`` must provide ``low``
                and ``high`` (i.e. a box-like action space).
            rand_params: iterable of parameter names to randomise. ``None``
                selects the historical default ``['gravity']``.
            log_scale_limit: exponent ``k`` of the bounds
                ``initial * 1.5 ** (-k)`` and ``initial * 1.5 ** k``.
        """
        super().__init__(env)
        self.normalize_context = True
        self.log_scale_limit = log_scale_limit
        # Always store a private copy: a shared default list or the caller's
        # own list must not be aliased by the instance.
        self.rand_params = ['gravity'] if rand_params is None else list(rand_params)
        self.save_parameters()
        self.min_param, self.max_param = self.get_minmax_parameter(log_scale_limit)
        self.cur_parameter_vector = self.env_parameter_vector_
        self.cur_step_ind = 0
        self.setted_env_params = None
        self.setted_env_changing_period = None
        self.setted_env_changing_interval = None
        self.min_action = env.action_space.low
        self.max_action = env.action_space.high
        self.range_action = self.max_action - self.min_action
        self._debug_state = None

    def get_minmax_parameter(self, log_scale_limit):
        """Return ``(min_param, max_param)`` dicts of per-parameter bounds.

        Bounds are the initial value scaled by ``1.5 ** (-log_scale_limit)``
        and ``1.5 ** log_scale_limit``. Only ``body_mass`` is bounded here.
        """
        min_param = {}
        max_param = {}

        def multipliers(shape, sign):
            # Array of the requested shape filled with 1.5 ** (sign * limit).
            return np.power(1.5, np.ones(shape) * (sign * log_scale_limit))

        if 'body_mass' in self.rand_params:
            nominal = self.init_params['body_mass']
            # Take the shape from the snapshot instead of reaching for
            # ``self.model`` through the wrapper's attribute forwarding.
            shape = np.shape(nominal)
            min_param['body_mass'] = nominal * multipliers(shape, -1)
            max_param['body_mass'] = nominal * multipliers(shape, 1)

        # Other parameters go here

        return min_param, max_param

    def denormalization(self, action):
        """Map an action from ``[-1, 1]`` to ``[min_action, max_action]``."""
        return (action + 1) / 2 * self.range_action + self.min_action

    def normalization(self, action):
        """Map an action from ``[min_action, max_action]`` to ``[-1, 1]``."""
        return (action - self.min_action) / self.range_action * 2 - 1

    def step(self, action):
        """Advance the wrapped env by one step and return its result unchanged.

        Errors raised by the wrapped environment propagate to the caller.
        Swallowing them would make ``step`` return ``None``, which every
        caller that unpacks the step result would then fail on with an
        unrelated error.
        """
        self.cur_step_ind += 1
        interval = self.setted_env_changing_interval
        # ``interval`` may be None or 0 even when params are set; skip the
        # modulo in that case instead of raising.
        if self.setted_env_params is not None and interval and self.cur_step_ind % interval == 0:
            # Handling environment parameter changes here (not implemented yet).
            pass

        res = super().step(action)
        self._debug_state = res[0]  # last observation, kept for debugging
        return res

    def set_nonstationary_para(self, setting_env_params, changine_period, changing_interval):
        """Store the non-stationarity schedule.

        ``changing_interval`` is the step interval checked in ``step``.
        ``changine_period`` is only stored; nothing in this class reads it.
        """
        self.setted_env_changing_period = changine_period
        self.setted_env_params = setting_env_params
        self.setted_env_changing_interval = changing_interval

    def reset_nonstationary(self):
        """Clear the non-stationarity schedule."""
        self.set_nonstationary_para(None, None, None)

    def reset(self, **kwargs):
        """Reset the step counter and the wrapped environment."""
        self.cur_step_ind = 0
        return super().reset(**kwargs)

    def sample_tasks(self, n_tasks, dig_range=None, linspace=False):
        """Return ``n_tasks`` randomly sampled task dicts.

        Each task holds a single scalar ``body_mass`` drawn uniformly from
        ``[0.5, 2.0]`` when ``'body_mass'`` is in ``rand_params``; otherwise
        the task is an empty dict. ``dig_range`` and ``linspace`` are accepted
        for interface compatibility but are currently unused.
        """
        # NOTE(review): the range is absolute and unrelated to
        # ``min_param``/``max_param`` -- confirm this is intended.
        min_mass, max_mass = 0.5, 2.0  # Modify the range as needed
        randomize_mass = 'body_mass' in self.rand_params

        tasks = []
        for _ in range(n_tasks):
            new_params = {}
            if randomize_mass:
                new_params['body_mass'] = np.random.uniform(min_mass, max_mass)

            # Add other parameters and their randomization here

            tasks.append(new_params)

        return tasks

    def cross_params(self, param_a, param_b):
        """Placeholder for combining two parameter sets; returns ``None``."""
        # Cross parameters code here
        pass

    def set_task(self, task):
        """Write the values in ``task`` into the simulator model.

        ``gravity``, ``density`` and ``wind`` live on ``model.opt``; any other
        key is looked up directly on ``model`` and overwritten in place. The
        dict itself is kept as the current task.
        """
        for param, param_val in task.items():
            if param == 'gravity':
                param_variable = getattr(self.unwrapped.model.opt, param)
                param_variable[:] = param_val
            elif param == 'density':
                self.unwrapped.model.opt.density = float(param_val)
            elif param == 'wind':
                param_variable = getattr(self.unwrapped.model.opt, param)
                param_variable[:2] = param_val  # only the first two components
            else:
                param_variable = getattr(self.unwrapped.model, param)
                param_variable[:] = param_val

        self.cur_params = task
        self.cur_parameter_vector = self.env_parameter_vector_

    def get_task(self):
        """Return the current task dict."""
        return self.cur_params

    def save_parameters(self):
        """Snapshot the initial values of the randomised parameters."""
        self.init_params = {}
        if 'body_mass' in self.rand_params:
            # Copy: ``set_task`` overwrites the model array in place, which
            # would otherwise silently change this "initial" snapshot too.
            self.init_params['body_mass'] = np.array(self.unwrapped.model.body_mass, copy=True)

        # Initialize other parameters here

        self.cur_params = copy.deepcopy(self.init_params)

    def _context_keys(self):
        """Names from ``rand_params`` that can be placed in the context vector.

        A name qualifies when the current task has a value for it and, if the
        context is normalised, both bounds are known.
        """
        keys = [key for key in self.rand_params if key in self.cur_params]
        if self.normalize_context:
            keys = [key for key in keys if key in self.min_param and key in self.max_param]
        return keys

    @property
    def env_parameter_vector(self):
        """Context vector cached by the last ``__init__``/``set_task`` call."""
        return self.cur_parameter_vector

    @property
    def env_parameter_vector_(self):
        """Freshly computed context vector (``[]`` when nothing is tracked)."""
        keys = self._context_keys()
        if len(keys) == 0:
            return []
        cur_vec = np.hstack([self.cur_params[key] for key in keys])
        if not self.normalize_context:
            return cur_vec
        # Build the bounds from the same keys so all three vectors line up.
        low = np.hstack([self.min_param[key] for key in keys])
        high = np.hstack([self.max_param[key] for key in keys])
        vec_range = high - low
        vec_range[vec_range == 0] = 1.0  # avoid division by zero for fixed entries
        return (cur_vec - low) / vec_range

    @property
    def env_parameter_length(self):
        """Total number of scalar entries in the tracked current parameters."""
        return int(np.sum([np.size(self.cur_params[key]) for key in self._context_keys()]))

    @property
    def param_max(self):
        """Upper bounds of the bounded parameters, flattened (``[]`` if none)."""
        vec_ = [self.max_param[key] for key in self.rand_params if key in self.max_param]
        if len(vec_) == 0:
            return []
        return np.hstack(vec_)

    @property
    def param_min(self):
        """Lower bounds of the bounded parameters, flattened (``[]`` if none)."""
        vec_ = [self.min_param[key] for key in self.rand_params if key in self.min_param]
        if len(vec_) == 0:
            return []
        return np.hstack(vec_)

    @property
    def _elapsed_steps(self):
        """Number of steps taken since the last ``reset``."""
        return self.cur_step_ind

    @property
    def _max_episode_steps(self):
        """Episode step limit of the wrapped env, or 1000 if it defines none."""
        if hasattr(self.env, '_max_episode_steps'):
            return self.env._max_episode_steps
        return 1000


In [None]:
!pip install stable_baselines3

  and should_run_async(code)




In [None]:
!pip install glfw



In [None]:
import gym
import matplotlib.pyplot as plt
from stable_baselines3 import SAC
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import DummyVecEnv


# Define the environment creation function
def create_env(env_id='Hopper-v4', rand_params=('body_mass',)):
    """Build the wrapped environment used for training.

    ``NonstationaryEnv`` reads ``env.unwrapped.model`` and
    ``env.action_space.low``/``high``, so the base environment has to be a
    continuous-control environment that exposes a simulator ``model``.
    ``CartPole-v0`` provides neither, which is why the previous version failed
    with ``AttributeError``.

    Args:
        env_id: gym environment id. The default assumes a gym installation
            with the MuJoCo ``-v4`` environments available -- adjust it to
            whatever MuJoCo environment your installation provides.
        rand_params: parameter names to randomise. Only ``body_mass`` is
            snapshotted and bounded by the wrapper, so it is the default.

    Returns:
        The ``NonstationaryEnv`` wrapping ``gym.make(env_id)``.
    """
    base_env = gym.make(env_id)
    return NonstationaryEnv(base_env, list(rand_params))

# Create the vectorized environment. DummyVecEnv takes a list of zero-argument
# factories, so the function can be passed directly.
env = DummyVecEnv([create_env])

# Fix the seed so that training runs are repeatable
TRAIN_SEED = 0

# Define and create the SAC model
model = SAC("MlpPolicy", env, verbose=1, seed=TRAIN_SEED)

# Training parameters
total_timesteps = 100000

# Train the model
model.learn(total_timesteps=total_timesteps)

# Evaluate the trained model (second return value is the reward std, unused here)
mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=10)
print(f"Mean Reward: {mean_reward:.2f}")

# Save the model
model.save("sac_nonstationary")



  logger.warn(
  deprecation(
  deprecation(


AttributeError: ignored

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Generate random data for the CR curve (example). A seeded generator keeps
# the figure identical across Restart & Run All.
CURVE_SEED = 0
num_points = 100
x = np.linspace(0, 1, num_points)
cr_curve = np.random.default_rng(CURVE_SEED).random(num_points)

# Function to apply a moving average filter for smoothing
def moving_average(data, window_size):
    """Return the simple moving average of ``data`` over ``window_size`` samples.

    The result has ``len(data) - window_size + 1`` entries (only full windows
    are averaged); it is empty when the window is longer than the data.
    ``np.cumsum`` is called without an axis, so multi-dimensional input is
    flattened first.

    Args:
        data: sequence or array of numbers.
        window_size: number of samples per window; must be a positive integer.

    Returns:
        1-D array of window means.

    Raises:
        ValueError: if ``window_size`` is smaller than 1. Without this check a
            negative window slices from the wrong end and silently returns
            meaningless values.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be a positive integer, got {window_size!r}")

    cumsum = np.cumsum(data)
    # Difference of running sums window_size apart = sum over each window.
    cumsum[window_size:] = cumsum[window_size:] - cumsum[:-window_size]
    return cumsum[window_size - 1:] / window_size

# Smooth the CR curve; a larger window smooths more and yields a shorter curve
window_size = 5
smoothed_curve = moving_average(cr_curve, window_size)

# Draw the raw and the smoothed curve on a single set of axes. The smoothed
# curve starts at x[window_size - 1], where the first full window ends.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(x, cr_curve, label='Original CR Curve')
ax.plot(x[window_size - 1:], smoothed_curve, label=f'Smoothed (Window Size {window_size})')
ax.set_xlabel('X-axis')
ax.set_ylabel('Y-axis')
ax.legend()
ax.set_title('Characteristic Curve and Smoothened Curve')
ax.grid(True)
plt.show()
