In [13]:
from dm_control import suite, viewer
import numpy as np

In [15]:
import matplotlib.pyplot as plt
import pandas as pd
%matplotlib widget

In [11]:
seed = 0
env = suite.load(domain_name="cartpole", task_name="balance",
                 task_kwargs={"random": seed})

In [4]:
superclass = type(env.task).mro()[1]

In [5]:
env.task.initialize_episode

<bound method Balance.initialize_episode of <dm_control.suite.cartpole.Balance object at 0x7fdc71da90d0>>

In [21]:
env.physics.named.data.cvel

FieldIndexer(cvel):
0  world [ 0         0         0         0         0         0       ]
1   cart [ 0         0         0         0         0         0       ]
2 pole_1 [ 0         0         0         0         0         0       ]

In [6]:
import types

def initialize_episode(self, physics):
    # replace https://github.com/deepmind/dm_control/blob/03bebdf9eea0cbab480aa4882adbc4184b850835/dm_control/suite/cartpole.py#L182
    # for some reason we don't need physics.reset_context() here since dm_control doesn't use it?
    physics.named.data.qpos["slider"][0] = np.array([0.0])
    physics.named.data.qpos["hinge_1"][0] = np.array([3.1415926 / 2])
    physics.named.data.qvel["slider"][0] = np.array([0.0])
    physics.named.data.qvel["hinge_1"][0] = np.array([0.0])
    
    # call function from super class which should be a dm_control.suite.base.Task
    # replaces `super(Balance, self).initialize_episode(physics)`
    superclass = type(env.task).mro()[1]
    superclass.initialize_episode(self, physics)
    return

# make `initialize_episode` a bound method instead of just a static function
# https://tryolabs.com/blog/2013/07/05/run-time-method-patching-python/
env.task.initialize_episode = types.MethodType(initialize_episode, env.task)

# why is the docstrings for MethodType not in https://docs.python.org/3/library/types.html?highlight=types?
# where is it and how does help know where it is?

In [26]:
import torch
from torch.utils.data import Dataset
from dm_control import suite
import numpy as np
import types
import os


class CartpoleDataset(Dataset):
    def __init__(self, root_dir=None,body=None, regen=False,
                 batch_size=100, chunk_len=5, time_limit=10, seed=0):
        super().__init__()
        self.seed = seed
        self.body = body
        
        root_dir = root_dir or os.path.expanduser(f'~/datasets/ODEDynamics/{self.__class__}/')
        filename = os.path.join(root_dir, f"trajectories_N{batch_size}_T{time_limit}.pz")
        if os.path.exists(filename) and not regen:
            ts, zs = torch.load(filename)
        else:
            ts, zs = self.generate_trajectory_data(batch_size, time_limit)
            os.makedirs(root_dir, exist_ok=True)
            torch.save((ts, zs),filename)
        self.Ts, self.Zs = self.chunk_training_data(ts, zs, chunk_len)

    def __len__(self):
        return self.Zs.shape[0]

    def __getitem__(self, i):
        return (self.Zs[i, 0], self.Ts[i]), self.Zs[i]
    
    def _initialize_env(self, time_limit, seed):
        env = suite.load(domain_name="cartpole", task_name="balance", task_kwargs={"random": seed})

        # set time limit a la https://github.com/deepmind/dm_control/blob/03bebdf9eea0cbab480aa4882adbc4184b850835/dm_control/rl/control.py#L76
        if time_limit == float('inf'):
            env._step_limit = float('inf')
        else:
            # possible soure of error in the future if our agent does action repeats
            env._step_limit = time_limit / (env._physics.timestep() * env._n_sub_steps)

        def initialize_episode(self, physics):
            # replace https://github.com/deepmind/dm_control/blob/03bebdf9eea0cbab480aa4882adbc4184b850835/dm_control/suite/cartpole.py#L182
            #physics.named.data.qpos["slider"][0] = np.array([0.0])
            #physics.named.data.qpos["hinge_1"][0] = np.array([3.1415926 / 2])
            #physics.named.data.qvel["slider"][0] = np.array([0.0])
            #physics.named.data.qvel["hinge_1"][0] = np.array([0.0])

            #we'll use the default for Cartpole for now
            nv = physics.model.nv
            if self._swing_up:
                physics.named.data.qpos['slider'] = .01*self.random.randn()
                physics.named.data.qpos['hinge_1'] = np.pi + .01*self.random.randn()
                physics.named.data.qpos[2:] = .1*self.random.randn(nv - 2)
            else:
                physics.named.data.qpos['slider'] = self.random.uniform(-.1, .1)
                physics.named.data.qpos[1:] = self.random.uniform(-.034, .034, nv - 1)
            physics.named.data.qvel[:] = 0.01 * self.random.randn(physics.model.nv)

            # call function from super class which should be a dm_control.suite.base.Task
            # replaces `super(Balance, self).initialize_episode(physics)`
            superclass = type(env.task).mro()[1]
            superclass.initialize_episode(self, physics)
            return

        # make `initialize_episode` a bound method instead of just a static function
        # https://tryolabs.com/blog/2013/07/05/run-time-method-patching-python/
        env.task.initialize_episode = types.MethodType(initialize_episode, env.task)
        
        return env

    def generate_trajectory_data(self, batch_size, time_limit):
        cartesian_trajs = []
        generalized_trajs = []        
        for i in range(batch_size):
            if i % 100 == 0:
                print(i)
            # new random seed each run based on `self.seed`
            env = self._initialize_env(time_limit, self.seed + i)
            cartesian_traj, generalized_traj = self._evolve(env)
            cartesian_trajs.append(cartesian_traj)
            generalized_trajs.append(generalized_traj)
                       
        cartesian_trajs, generalized_trajs = map(np.stack, [cartesian_trajs, generalized_trajs])
        cartesian_trajs, generalized_trajs = map(torch.from_numpy, [cartesian_trajs, generalized_trajs])
        time = torch.linspace(0,
                              int(env._physics.timestep() * env._n_sub_steps * env._step_limit),     
                              int(env._step_limit) + 1 - 1) # add one because of initial state from env.reset(), subtract one because we throw away the last state to chunk our data evenly
        time = time.view(1, -1).expand(batch_size, len(time))
        
        # ignore generalized_trajs for now
        return time, cartesian_trajs
    
    def _get_com(self, env, body):
        # we only keep x and y
        return np.copy(env.physics.named.data.xipos[body][:-1])
    
    def _get_cvel(self, env, body):
        # last three entries corresponding to translational velocity
        # we only keep x and y
        return np.copy(env.physics.named.data.cvel[body][-3:-1])
    
    def _get_com_mom(self, env, body):
        return self._get_cvel(env, body) * env.physics.named.model.body_mass[body]
    
    def _get_q(self, env, body):
        return np.copy(env.physics.named.data.qpos[body])
    
    def _get_p(self, env, body):
        return np.copy(env.physics.named.data.qvel[body])
    
    def _evolve(self, env):
        # get the cartesian coordinate of pole and cart and their velocities
        time_step = env.reset()
        # positions
        pole_com = [self._get_com(env, "pole_1")]
        cart_com = [self._get_com(env, "cart")]
        pole_q = [self._get_q(env, "hinge_1")]
        cart_q = [self._get_q(env, "slider")]
        # momentums
        pole_com_mom = [self._get_com_mom(env, "pole_1")]
        cart_com_mom = [self._get_com_mom(env, "cart")]
        pole_p = [self._get_p(env, "hinge_1")]
        cart_p = [self._get_p(env, "slider")]
                    
        action_spec = env.action_spec()
        while not time_step.last():
            # Take no action, just let the system evolve
            action = np.zeros(action_spec.shape)
            time_step = env.step(action)
            
            pole_com.append(self._get_com(env, "pole_1"))
            cart_com.append(self._get_com(env, "cart"))
            pole_q.append(self._get_q(env, "hinge_1"))
            cart_q.append(self._get_q(env, "slider"))
            
            pole_com_mom.append(self._get_com_mom(env, "pole_1"))
            cart_com_mom.append(self._get_com_mom(env, "cart"))
            pole_p.append(self._get_p(env, "hinge_1"))
            cart_p.append(self._get_p(env, "slider"))

        pole_com, pole_com_mom, cart_com, cart_com_mom = map(np.stack, [pole_com, pole_com_mom, cart_com, cart_com_mom])
        pole_q, pole_p, cart_q, cart_p = map(np.stack, [pole_q, pole_p, cart_q, cart_p])
        
        com = np.stack([cart_com, pole_com])
        com_mom = np.stack([cart_com_mom, pole_com_mom])
    
        q = np.stack([cart_q, pole_q])
        p = np.stack([cart_p, pole_p])    
        
        cartesian_traj = np.stack([com, com_mom])
        generalized_traj = np.stack([q, p])
        cartesian_traj = np.transpose(cartesian_traj, (2, 0, 1, 3))
        generalized_traj = np.transpose(generalized_traj, (2, 0, 1, 3))
        # output should be (number of time steps) x (q or p) x (number of bodys) x (dimension of quantity)
        
        # toss the final time step because we want to be able to chunk evenly
        cartesian_traj = cartesian_traj[:-1]
        generalized_traj = generalized_traj[:-1]
        return cartesian_traj, generalized_traj


    def chunk_training_data(self, ts, zs, chunk_len):
        """ Randomly samples chunks of trajectory data, returns tensors shaped for training.
        Inputs: [ts (batch_size, traj_len)] [zs (batch_size, traj_len, *z_dim)]
        outputs: [chosen_ts (batch_size, chunk_len)] [chosen_zs (batch_size, chunk_len, *z_dim)]"""
        batch_size, traj_len, *z_dim = zs.shape
        n_chunks = traj_len // chunk_len
        chunk_idx = torch.randint(0, n_chunks, (batch_size,), device=zs.device).long()
        chunked_ts = torch.stack(ts.chunk(n_chunks, dim=1))
        chunked_zs = torch.stack(zs.chunk(n_chunks, dim=1))
        chosen_ts = chunked_ts[chunk_idx, range(batch_size)]
        chosen_zs = chunked_zs[chunk_idx, torch.arange(batch_size).long()]
        return chosen_ts, chosen_zs

In [27]:
CartpoleDataset(batch_size=100)

0


<__main__.CartpoleDataset at 0x7f9c8a07df10>

In [7]:
viewer.launch(env)

In [7]:
env.physics.model.nq

2

In [8]:
env.physics.named.model.body_mass

FieldIndexer(body_mass):
0  world [ 0       ]
1   cart [ 1       ]
2 pole_1 [ 0.1     ]

In [9]:
# default initialization
env.physics.named.model.qpos0

FieldIndexer(qpos0):
0  slider [ 0       ]
1 hinge_1 [ 0       ]

In [10]:
env.physics.named.model.body_inertia

FieldIndexer(body_inertia):
           x         y         z         
0  world [ 0         0         0       ]
1   cart [ 0.0108    0.0167    0.0208  ]
2 pole_1 [ 0.00915   0.00915   0.000101]

In [11]:
env.physics.named.data.xaxis

FieldIndexer(xaxis):
            x         y         z         
0  slider [ 1         0         0       ]
1 hinge_1 [ 0         1         0       ]

In [12]:
# See http://www.mujoco.org/book/APIreference.html#mjData for references on how to get the quantities needed
action_spec = env.action_spec()

def evolve():
    time_step = env.reset()
    
    #pole_xpos = [np.copy(env.physics.named.data.xpos["pole_1"])]
    #cart_xpos = [np.copy(env.physics.named.data.xpos["cart"])]
    
    # see https://github.com/deepmind/dm_control/issues/131 for how to change initialization and parameters
    

    # see bottom of page 10 of the dm_control text report https://arxiv.org/pdf/1801.00690.pdf
    # need to reset context to ensure that everything is updated after setting initialization
    with env.physics.reset_context():
        env.physics.named.data.qpos["slider"][0] = np.array([0.0])
        env.physics.named.data.qpos["hinge_1"][0] = np.array([3.1415926 / 2])
        env.physics.named.data.qvel["slider"][0] = np.array([0.0])
        env.physics.named.data.qvel["hinge_1"][0] = np.array([0.0])
        #env.physics.set_state(np.array([0., 0., 0., 0.,]))
        
    print(env.physics.get_state())
        
    plt.imshow(env.physics.render(camera_id=0))
    
    
    pole_com = [np.copy(env.physics.named.data.xipos["pole_1"])]
    cart_com = [np.copy(env.physics.named.data.xipos["cart"])]
    hinge_anchor = [np.copy(env.physics.named.data.xanchor["hinge_1"])]
        
    while not time_step.last():
        action = np.zeros(action_spec.shape)
        time_step = env.step(action)
        
        # why are pole and cart xpos the same?
        #pole_xpos.append(np.copy(env.physics.named.data.xpos["pole_1"]))
        #cart_xpos.append(np.copy(env.physics.named.data.xpos["cart"]))
        pole_com.append(np.copy(env.physics.named.data.xipos["pole_1"]))
        cart_com.append(np.copy(env.physics.named.data.xipos["cart"]))
        hinge_anchor.append(np.copy(env.physics.named.data.xanchor["hinge_1"]))
        
    return np.stack(pole_com), np.stack(cart_com), np.stack(hinge_anchor)

In [13]:
pole_com, cart_com, hinge_anchor = evolve()

[0.        1.5707963 0.        0.       ]


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [14]:
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.cm as cm

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
colors = cm.rainbow(np.linspace(0, 1, len(pole_com)))

ax.scatter(*pole_com.T, color=colors)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<mpl_toolkits.mplot3d.art3d.Path3DCollection at 0x7fdc6dee4d90>

In [15]:
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
colors = cm.rainbow(np.linspace(0, 1, len(cart_com)))

ax.scatter(*cart_com.T, color=colors)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<mpl_toolkits.mplot3d.art3d.Path3DCollection at 0x7fdc6df0e4d0>

In [16]:
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
colors = cm.rainbow(np.linspace(0, 1, len(hinge_anchor)))

ax.scatter(*hinge_anchor.T, color=colors)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<mpl_toolkits.mplot3d.art3d.Path3DCollection at 0x7fdc6df0e810>

In [17]:
plt.scatter(np.zeros(len(colors)), np.arange(len(colors)) / len(colors), color=colors)

TypeError: loop of ufunc does not support argument 0 of type NoneType which has no callable sqrt method

Forward in time corresponds to going from blue to red

In [18]:
fig, ax = plt.subplots(figsize=(5,5))
colors = cm.rainbow(np.linspace(0, 1, len(pole_com)))
ax.scatter(pole_com[:, 0], pole_com[:, 2], color=colors)
ax.axis('equal')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

(-0.4547339937506944,
 0.5454635235119376,
 0.47500027276122037,
 1.0250000010467923)

In [23]:
fig, ax = plt.subplots()
ax.plot(pole_com[:, 0], 'r')
ax.plot(pole_com[:, 1], 'g')
ax.plot(pole_com[:, 2], 'b')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

[<matplotlib.lines.Line2D at 0x7fdc6c15ea10>]

In [20]:
fig, ax = plt.subplots()
ax.plot(cart_com[:, 0], 'r')
ax.plot(cart_com[:, 1], 'g')
ax.plot(cart_com[:, 2], 'b')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

[<matplotlib.lines.Line2D at 0x7fdc6c27b6d0>]

In [21]:
fig, ax = plt.subplots()
ax.plot(hinge_anchor[:, 0], 'r')
ax.plot(hinge_anchor[:, 1], 'g')
ax.plot(hinge_anchor[:, 2], 'b')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

[<matplotlib.lines.Line2D at 0x7fdc6c1e6b10>]

In [22]:
# Check if constraint is held
((pole_com - cart_com)**2).sum(-1)

array([0.25, 0.25, 0.25, ..., 0.25, 0.25, 0.25])