# Defining an Environment

Here we will re-define the environment `two_segments_v1`.

Every environment consists of
- a MuJoCo specification file (ending in `.xml`)
- a Python file

The `.xml` file is used by MuJoCo to simulate the system.

The Python file defines how we interface with this MuJoCo simulation, e.g. what we are allowed to alter at every timestep (usually the control).


In [1]:
from dm_control.rl import control
from dm_control import mujoco
from collections import OrderedDict
import numpy as np 
from cc.env.envs.common import ASSETS 
from cc.utils.sample_from_spec import _spec_from_observation
from cc.env import make_env

----
Let's take a closer look at the content of the `.py` file.

It contains two objects
- a `mujoco.Physics` object
- a `control.Task`

The `mujoco.Physics` object gives us a way to interact with the MuJoCo simulation from Python.

A MuJoCo simulation is defined by an `.xml` file. For this environment it has the following content:

In [2]:
mujoco_xml_string = r"""
<mujoco model="kinematic-chain">
  <include file="./common/skybox.xml"/>
  <include file="./common/visual.xml"/>
  <include file="./common/materials.xml"/>
  
  <option timestep="0.001" integrator="RK4">
    <flag contact="disable" gravity="disable" energy="enable"/>
  </option>
  
  <default>
    <default class="pole">
      <joint type="hinge" axis="0 1 0" stiffness="10" springref="0" damping="1e-1"/>
      <geom type="capsule" fromto="0 0 0 0 0 1" size="0.045" material="self" mass=".1"/>
    </default>
  </default>
  
  <worldbody>
    <light name="light" pos="0 0 36"/>
    
    <camera name="fixed" pos="0 -6 1" zaxis="0 -1 0"/>
    <camera name="lookatcart" mode="targetbody" target="cart" pos="0 -2 2"/>
    
    <geom name="floor" pos="0 0 -.25" size="100 100 .2" type="plane" material="grid"/>
    <geom name="rail1" type="capsule" pos="0  .07 2" zaxis="1 0 0" size="0.02 20" material="decoration" />
    <geom name="rail2" type="capsule" pos="0 -.07 2" zaxis="1 0 0" size="0.02 20" material="decoration" />
    

    <body name="cart" pos="0 0 2">
      <joint name="slider" type="slide" limited="true" axis="1 0 0" range="-999.8 999.8" damping="1e-3"/>
      <geom name="cart" type="box" size="0.1 0.15 0.05" material="self"  mass="1"/>
      <body name="pole_1" childclass="pole" euler="0 180 0" pos="0 0 -0.1">
        <joint name="hinge_1"/>
        <geom name="pole_1"/>
        <body name="pole_2" childclass="pole" pos="0 0 1.1">
          <joint name="hinge_2"/>
          <geom name="pole_2"/>
          <body name="segment_end" pos="0 0 1.0"/>
      	</body>
      </body>
    </body>
  </worldbody>

  <actuator>
    <motor name="slide" joint="slider" gear="5" ctrllimited="false"/>
  </actuator>
  
</mujoco>
"""
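To get a quick overview of what this model contains, the XML can be inspected with Python's standard library alone. The sketch below parses an abbreviated copy of the model (the includes, defaults, lights, cameras and geoms are left out, and `xml_excerpt` is a name introduced only for this illustration). It is plain XML parsing and does not involve MuJoCo.

```python
import xml.etree.ElementTree as ET

# Abbreviated copy of the model above: only the body tree and the actuator are kept.
xml_excerpt = """
<mujoco model="kinematic-chain">
  <option timestep="0.001" integrator="RK4"/>
  <worldbody>
    <body name="cart" pos="0 0 2">
      <joint name="slider" type="slide" axis="1 0 0"/>
      <body name="pole_1" childclass="pole">
        <joint name="hinge_1"/>
        <body name="pole_2" childclass="pole">
          <joint name="hinge_2"/>
          <body name="segment_end" pos="0 0 1.0"/>
        </body>
      </body>
    </body>
  </worldbody>
  <actuator>
    <motor name="slide" joint="slider" gear="5"/>
  </actuator>
</mujoco>
"""

root = ET.fromstring(xml_excerpt)

# Names in document order, i.e. from the cart down to the end of the chain.
joint_names = [j.get("name") for j in root.iter("joint")]
body_names = [b.get("name") for b in root.iter("body")]

# Joints that are driven by an actuator.
actuated = [m.get("joint") for m in root.find("actuator")]
timestep = float(root.find("option").get("timestep"))

print(joint_names)  # ['slider', 'hinge_1', 'hinge_2']
print(body_names)   # ['cart', 'pole_1', 'pole_2', 'segment_end']
print(actuated)     # ['slider']
```

The model has three joints but only `slider` is actuated. The two hinges are passive and move only through their stiffness and damping as the cart is pushed.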

In [3]:
class SegmentPhysics(mujoco.Physics):

    def xpos_of_segment_end(self):
        return self.named.data.xpos["segment_end", "x"]

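    def set_torque_of_cart(self, u):
        # squash the raw input into the open interval (-pi/2, pi/2)
        # so that the control applied to the cart stays bounded
        u = np.arctan(u)
        self.set_control(u)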


def load_physics():
    return SegmentPhysics.from_xml_string(mujoco_xml_string, assets=ASSETS)

load_physics()

<__main__.SegmentPhysics at 0x7fd9742cb040>
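Because the actuator in the XML is declared with `ctrllimited="false"`, the only bound on the control comes from the `np.arctan` call in `set_torque_of_cart`. The following NumPy-only sketch shows the effect of that squashing. It assumes the usual MuJoCo motor semantics in which the force on the joint is the control multiplied by `gear`; `GEAR` and `squashed_control` are names introduced only for this illustration.

```python
import numpy as np

GEAR = 5.0  # gear of the "slide" motor in the XML (assumed: force = gear * control)


def squashed_control(u):
    """Mirror of the squashing used in `set_torque_of_cart`."""
    return np.arctan(u)


raw = np.array([-1e6, -1.0, 0.0, 0.2, 1.0, 1e6])
ctrl = squashed_control(raw)
force = GEAR * ctrl

print(np.round(ctrl, 4))
# Even huge raw inputs stay strictly inside (-pi/2, pi/2) ...
print(bool(np.all(np.abs(ctrl) < np.pi / 2)))
# ... so the force magnitude stays below GEAR * pi / 2 (about 7.85).
print(bool(np.abs(force).max() < GEAR * np.pi / 2))
```

For small inputs the squashing is close to the identity (an input of `0.2` becomes roughly `0.197`), so it mainly matters for large actions.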

The `control.Task` defines precisely when and how we interact with the `mujoco.Physics` object.

In [4]:
class SegmentTask(control.Task):

    def __init__(self, random: int = 1):
        # seed is unused 
        del random 
        super().__init__()
        
    def initialize_episode(self, physics):
        pass 

    def before_step(self, action, physics: SegmentPhysics):
        physics.set_torque_of_cart(action)

    def after_step(self, physics):
        pass 

    def action_spec(self, physics):
        return mujoco.action_spec(physics)

    def get_observation(self, physics) -> OrderedDict:
        obs = OrderedDict()
        obs["xpos_of_segment_end"] = np.atleast_1d(physics.xpos_of_segment_end())
        return obs 

    def get_reward(self, physics):
        return np.array(0.0)

    def observation_spec(self, physics):
        return _spec_from_observation(self.get_observation(physics))

SegmentTask()

<__main__.SegmentTask at 0x7fd974288e50>
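To make the role of the individual hooks more concrete, here is a toy imitation of the loop that calls them. It is a simplified sketch of the order in which a `dm_control` environment invokes the task methods, not the library's implementation: `RecordingTask` and `ToyEnvironment` are hypothetical stand-ins, there is no physics, and time steps are reduced to plain tuples.

```python
class RecordingTask:
    """Stand-in for a task that only records which hook was called."""

    def __init__(self):
        self.calls = []

    def initialize_episode(self, physics):
        self.calls.append("initialize_episode")

    def before_step(self, action, physics):
        self.calls.append("before_step")

    def after_step(self, physics):
        self.calls.append("after_step")

    def get_reward(self, physics):
        self.calls.append("get_reward")
        return 0.0

    def get_observation(self, physics):
        self.calls.append("get_observation")
        return {}


class ToyEnvironment:
    """Simplified imitation of an environment's reset/step loop."""

    def __init__(self, task, physics=None):
        self._task = task
        self._physics = physics
        self._reset_next_step = True  # a fresh environment resets first

    def reset(self):
        self._reset_next_step = False
        self._task.initialize_episode(self._physics)
        return "FIRST", None, self._task.get_observation(self._physics)

    def step(self, action):
        if self._reset_next_step:
            return self.reset()  # the action is ignored in this case
        self._task.before_step(action, self._physics)
        # the real environment would advance the simulation here
        self._task.after_step(self._physics)
        reward = self._task.get_reward(self._physics)
        return "MID", reward, self._task.get_observation(self._physics)


task = RecordingTask()
env_sketch = ToyEnvironment(task)
first = env_sketch.step(0.2)
second = env_sketch.step(0.2)

print(first[0], second[0])  # FIRST MID
print(task.calls)
```

The first call to `step` on a fresh environment only resets it and returns a first time step without a reward. Every later call runs `before_step`, advances the simulation, and then queries `after_step`, `get_reward` and `get_observation`.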

With these two components we can register a new environment at `cc.env.register`.

Here, this has already been done, so we can simply load the environment using its string identifier.

In [5]:
env = make_env("two_segments_v1", random=1)

In [6]:
action = np.array([0.2])
env.step(action)

TimeStep(step_type=<StepType.FIRST: 0>, reward=None, discount=None, observation=OrderedDict([('xpos_of_segment_end', array([2.5717583e-16], dtype=float32))]))

In [7]:
env.step(action)

TimeStep(step_type=<StepType.MID: 1>, reward=array(0., dtype=float32), discount=array(1., dtype=float32), observation=OrderedDict([('xpos_of_segment_end', array([4.913601e-06], dtype=float32))]))

Without going into any details, let's take a look at a randomly acting controller in this environment.

Press the backspace key to reset the environment.

In [8]:
from cc.env.collect import RandomActor
from cc.utils.visual.viewer import launch_viewer

actor = RandomActor(env.action_spec(), reset_key=True)

# uncomment to launch a viewer
# commented out as it would otherwise pop up while testing
# launch_viewer(env, actor)
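When no display is available, a random controller can also be run without the viewer. The helper below is a minimal sketch, not part of `cc`: `random_rollout` is a hypothetical function that only relies on the `reset`, `step` and `action_spec` methods used above and on the observation key `xpos_of_segment_end`. Actions are drawn from a Gaussian rather than from the bounds of the action spec, on the assumption that an actuator with `ctrllimited="false"` has no useful finite bounds to sample from.

```python
import numpy as np


def random_rollout(env, n_steps, scale=1.0, seed=0):
    """Step `env` with Gaussian random actions and stack the observed x-positions."""
    rng = np.random.default_rng(seed)
    shape = env.action_spec().shape

    # Reset explicitly so that every following `step` applies its action.
    timestep = env.reset()
    xs = [timestep.observation["xpos_of_segment_end"]]

    for _ in range(n_steps):
        action = scale * rng.standard_normal(shape)
        timestep = env.step(action)
        xs.append(timestep.observation["xpos_of_segment_end"])

    # One row per time step, including the initial observation.
    return np.stack(xs)
```

With the environment from above, `random_rollout(env, n_steps=100)` would return an array with 101 rows. The sketch does not handle the end of an episode, so `n_steps` should stay below the environment's time limit.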



