In [1]:

# DAgger Implementation for RLBench
# Paper: https://www.ri.cmu.edu/pub_files/2011/4/Ross-AISTATS11-NoRegret.pdf
# Author: Krishan Rana

from rlbench.environment import Environment
from rlbench.action_modes import ArmActionMode, ActionMode
from rlbench.observation_config import ObservationConfig
from rlbench.tasks import ReachTarget, PushButtons
import pdb
import numpy as np
from spatialmath import SE3, SO3
import pdb
import roboticstoolbox as rb
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random

class RRMC():
    def __init__(self):
        self.panda = rb.models.DH.Panda()

    def fkine(self):
        # Tip pose in world coordinate frame
        wTe = SE3(env._task.robot.arm.get_tip().get_position())*SE3.Eul(env._task.robot.arm.get_tip().get_orientation())
        return wTe
    
    def target_pose(self):
        # Target pose in world coordinate frame
        wTt = SE3(env._task.target.get_position())*SE3.Eul(env._task.target.get_orientation())
        return wTt
    
    def p_servo(self, gain=1):
        
        wTe = self.fkine();
        wTt = self.target_pose();
    
        # Pose difference
        eTt = wTe.inv() * wTt
        # Translational velocity error
        ev = eTt.t
        # Angular velocity error
        ew = eTt.rpy() * np.pi/180

        # Form error vector
        e = np.r_[ev, ew]
        v = gain * e
        return v
    
    def compute_action(self):
        
        try:
            v = self.p_servo(gain=0.3)
            v[3:] *= 10
            q = env._task.robot.arm.get_joint_positions()
            action = np.linalg.pinv(self.panda.jacobe(q)) @ v

        except np.linalg.LinAlgError:
            action = np.zeros(env_task.action_size)
            print('Fail')

        return action
        
        
class Model(nn.Module):
    
    def __init__(self, act_dim):
        super().__init__()
        self.conv1 = nn.Conv2d(3,6,5)
        self.pool = nn.MaxPool2d(2,2)
        self.conv2 = nn.Conv2d(6,16,5)
        self.fc1 = nn.Linear(2704, 520)
        self.fc2 = nn.Linear(520,108)
        self.fc3 = nn.Linear(108,54)
        self.fc4 = nn.Linear(54, act_dim)
        
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = torch.tanh(self.fc4(x))
        return x
    

class Agent:
    
    def __init__(self):
        self.pi = Model(env_task.action_size).to(device)
        
    def train(self):
        print("Training...")
        batch = random.sample(experience_dataset, 32)
        batch_obs = torch.as_tensor([demo[1] for demo in batch], dtype=torch.float32).to(device).permute(0,3,1,2)
        predicted_actions = self.pi(batch_obs)
        ground_truth_actions = torch.as_tensor([demo[0] for demo in batch], dtype=torch.float32).to(device).detach()
        loss = criterion(predicted_actions, ground_truth_actions)
        loss.backward()
        optimizer.step()
        
    def get_action(self, obs):
        with torch.no_grad():
            obs = torch.as_tensor(obs, dtype=torch.float32).to(device).unsqueeze(0)
            obs = obs.permute(0,3,1,2)
            act = self.pi(obs).cpu().numpy()[0]
            return act
    
    
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

obs_config = ObservationConfig()
obs_config.set_all(False)
obs_config.wrist_camera.rgb = True
obs_config.joint_positions  = True

action_mode = ActionMode(ArmActionMode.ABS_JOINT_VELOCITY)
env_task = Environment(action_mode, '', obs_config, False)
env_task.launch()

env = env_task.get_task(ReachTarget)
agent = Agent()


obs = env.reset()
control_prior = RRMC()
criterion = nn.MSELoss()
optimizer = optim.SGD(agent.pi.parameters(), lr=0.01, momentum=0.9)
total_steps = 1000
episode_length = 40
experience_dataset = []

descriptions, state = env.reset()
obs = state.wrist_rgb

for i in range(total_steps):
    if i%episode_length == 0 and i > 100:
        descriptions, state = env.reset()
        agent.train()
    action = agent.get_action(obs)
    action = control_prior.compute_action()
    pdb.set_trace()
    next_state, reward, done = env.step(action)
    nobs = state.wrist_rgb
    #experience_dataset.append([control_prior.act(obs), obs])
    experience_dataset.append([action, obs])
    obs = nobs
    
print('Done')
env.shutdown()
    

AttributeError: 'ReachTarget' object has no attribute '_task'