In [1]:
!pip install mujoco mujoco-python-viewer pyvirtualdisplay opencv-python mediapy
!sudo apt-get install xvfb
!pip install xvfbwrapper

Collecting mujoco
  Downloading mujoco-2.3.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.6 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/4.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/4.6 MB[0m [31m6.7 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/4.6 MB[0m [31m29.6 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m4.6/4.6 MB[0m [31m51.6 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.6/4.6 MB[0m [31m42.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting mujoco-python-viewer
  Downloading mujoco_python_viewer-0.1.3-py3-none-any.whl (10 kB)
Collecting pyvirtualdisplay
  Downloading PyVirtualDisplay-3.0-py3-none-any.whl (15 kB)
Collecting mediapy
  Downloading mediapy-1.1.9-py3-none-any.wh

In [2]:
from google.colab import drive
drive.mount('/content/drive')

import os
os.chdir('/content/drive/MyDrive/RL/code') # "RL" 폴더 이름 수정할 것

Mounted at /content/drive


### Augmented Random Search

In [3]:
import mujoco
import numpy as np
import matplotlib.pyplot as plt
import mediapy as media
from mujoco_parser import MuJoCoParserClass
from snapbot_env import SnapbotMarkovDecisionProcessClass
np.set_printoptions(precision=2,suppress=True,linewidth=100)
plt.rc('xtick',labelsize=6); plt.rc('ytick',labelsize=6)
%config InlineBackend.figure_format = 'retina'
%matplotlib inline
print ("MuJoCo version:[%s]"%(mujoco.__version__))

MuJoCo version:[2.3.7]


### Construct Snapbot env

In [4]:
xml_path = '../asset/snapbot/scene_snapbot.xml'
env = MuJoCoParserClass(name='Snapbot',rel_xml_path=xml_path,VERBOSE=False)
mdp = SnapbotMarkovDecisionProcessClass(
    env,HZ=50,history_total_sec=1.0,history_intv_sec=0.1,VERBOSE=True)
# Update maximum torque
max_torque = 2
mdp.env.ctrl_ranges[:,0] = -max_torque
mdp.env.ctrl_ranges[:,1] = +max_torque

[Snapbot] Instantiated
   [info] dt:[0.0200] HZ:[50], env-HZ:[500], mujoco_nstep:[10], state_dim:[35], o_dim:[350], a_dim:[8]
   [history] total_sec:[1.00]sec, n:[50], intv_sec:[0.10]sec, intv_tick:[5]
   [history] ticks:[ 0  5 10 15 20 25 30 35 40 45]


### ARS

In [5]:
class Hp():

    def __init__(self):
        self.n_step           = 100
        self.episode_length   = 250
        self.learning_rate    = 0.02
        self.n_direction      = 50
        self.n_best_direction = 5
        assert self.n_best_direction <= self.n_direction
        self.noise = 0.01
        self.seed  = 0

class Normalizer():

    def __init__(self, n_input):
        self.n = np.zeros(n_input)
        self.mean = np.zeros(n_input)
        self.mean_diff = np.zeros(n_input)
        self.var = np.zeros(n_input)

    def observe(self, x):
        self.n += 1.
        last_mean = self.mean.copy()
        self.mean += (x - self.mean) / self.n
        self.mean_diff += (x - last_mean) * (x - self.mean)
        self.var = (self.mean_diff / self.n).clip(min = 1e-2)

    def normalize(self, inputs):
        obs_mean = self.mean
        obs_std = np.sqrt(self.var)
        return (inputs - obs_mean) / obs_std

class Policy():

    def __init__(self, input_size, output_size):
        self.theta = np.zeros((output_size, input_size))

    def evaluate(self, input, delta = None, direction = None):
        if direction is None:
            action = self.theta.dot(input)
        elif direction == "positive":
            action = (self.theta + hyp.noise*delta).dot(input)
        else:
            action = (self.theta - hyp.noise*delta).dot(input)
        return 2*np.tanh(action)

    def sample_deltas(self):
        return [np.random.randn(*self.theta.shape) for _ in range(hyp.n_direction)]

    def update(self, rollouts, sigma_r):
        step = np.zeros(self.theta.shape)
        for r_pos, r_neg, d in rollouts:
            step += (r_pos - r_neg) * d
        self.theta += hyp.learning_rate / (hyp.n_best_direction * sigma_r) * step

def explore(mdp, normalizer, policy, direction = None, delta = None):
    state = mdp.reset()
    done = False
    num_plays = 0.
    sum_rewards = 0
    while not done and num_plays < hyp.episode_length:
        normalizer.observe(state)
        state = normalizer.normalize(state)
        action = policy.evaluate(state, delta, direction)
        state, reward, done, _ = mdp.step(action)
        reward = max(min(reward, 1), -1)
        sum_rewards += reward
        num_plays += 1
    return sum_rewards


def train(mdp, policy, normalizer, hyp):


    for step in range(hyp.n_step):

        # Initializing the perturbations deltas and the positive/negative rewards
        deltas = policy.sample_deltas()
        positive_rewards = [0] * hyp.n_direction
        negative_rewards = [0] * hyp.n_direction

        # Getting the positive rewards in the positive directions
        for k in range(hyp.n_direction):
            positive_rewards[k] = explore(
                mdp, normalizer, policy, direction = "positive", delta = deltas[k])

        # Getting the negative rewards in the negative/opposite directions
        for k in range(hyp.n_direction):
            negative_rewards[k] = explore(
                mdp, normalizer, policy, direction = "negative", delta = deltas[k])

        # Gathering all the positive/negative rewards to compute the standard deviation of these rewards
        all_rewards = np.array(positive_rewards + negative_rewards)
        sigma_r = all_rewards.std()

        # Sorting the rollouts by the max(r_pos, r_neg) and selecting the best directions
        scores = {k:max(r_pos, r_neg) for k,(r_pos,r_neg) in enumerate(zip(positive_rewards, negative_rewards))}
        order = sorted(scores.keys(), key = lambda x:scores[x], reverse = True)[:hyp.n_best_direction]
        rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]

        # Updating our policy
        policy.update(rollouts, sigma_r)

        # Printing the final reward of the policy after the update
        reward_evaluation = explore(mdp, normalizer, policy)
        print ("step:[%d/%d] reward:[%.4f]"%(step,hyp.n_step,reward_evaluation))

        # Evaluation
        eval_every = 5
        if (step-1)%eval_every == 0:
            mdp.init_viewer()
            state,done = mdp.reset(),False
            tick,sum_rewards_eval = 0,0
            img_list = []
            while not done and tick < hyp.episode_length:
                normalizer.observe(state)
                state = normalizer.normalize(state)
                action = policy.theta.dot(state)
                state, reward, done, _ = mdp.step(action)
                reward = max(min(reward, 1), -1)
                sum_rewards_eval += reward
                tick += 1
                # Render
                if ((tick-1)%10) == 0:
                    mdp.render(
                        TRACK_TORSO=True,PLOT_WORLD_COORD=True,PLOT_TORSO_COORD=True,
                        PLOT_SENSOR=True,PLOT_CONTACT=True,PLOT_TIME=True)
                    img = mdp.grab_image(resize_rate=0.2)
                    img_list.append(img)
            mdp.close_viewer()
            media.show_video(img_list,fps=mdp.HZ//10)
            print ("  sum_rewards_eval:[%.2f]"%(sum_rewards_eval))
print ("Ready.")

Ready.


### Train

In [6]:
from pyvirtualdisplay import Display
display = Display(visible=0, size=(400, 300))
display.start()

hyp = Hp()
np.random.seed(hyp.seed)
policy = Policy(mdp.o_dim,mdp.a_dim)
normalizer = Normalizer(mdp.o_dim)
train(mdp, policy, normalizer, hyp)

step:[0/100] reward:[-3.8275]
step:[1/100] reward:[-16.5140]


0
This browser does not support the video tag.


  sum_rewards_eval:[0.16]
step:[2/100] reward:[1.2288]


KeyboardInterrupt: ignored