In [2]:
import numpy as np
import math
import random
import matplotlib
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import time

Matplotlib created a temporary config/cache directory at /tmp/matplotlib-yunr5gyn because the default path (/home/jetson/.config/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


In [4]:
from Arm_Lib import Arm_Device
real_arm = Arm_Device()  # Get DOFBOT object

In [5]:
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

Defines Arm related functions for moving the arm. 

In [28]:
def set_angles(arm, angles, sec_per_angle):
    """Sets all the angles of arm
    Args:
        arm - the real robot
        angles - the angles to set
        t - the amount of time to move each angle
    """
    angles = (angles * 180/np.pi).astype(int)
    print("here", angles)
    t = angles*sec_per_angle
    for joint, (angle, movetime) in enumerate(zip(angles, t)):
        time.sleep(1)
        print(f"Command: Joint {joint+1} -> {angle} deg in {movetime} ms")
        arm.Arm_serial_servo_write(joint+1, angle, movetime)
        time.sleep(1)
        
def read_all_joints_in_radians(arm):
    q = np.array([arm.Arm_serial_servo_read(id) for id in range(1,6)])*np.pi/180
    return q

In [9]:
real_arm.Arm_serial_servo_write(1, 170, 1000)

In [22]:
# search for gpu
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using {device} (device)')

Using cuda (device)


Define the architechure for the Actor and Critic Networks

## Initialize Dofbot

In [10]:
ex = np.array([1,0,0])
ey = np.array([0,1,0])
ez = np.array([0,0,1])
l0 = 0.061 # base to servo 1
l1 = 0.0435 # servo 1 to servo 2
l2 = 0.08285 # servo 2 to servo 3
l3 = 0.08285 # servo 3 to servo 4
l4 = 0.07385 # servo 4 to servo 5
l5 = 0.05457 # servo 5 to gripper
P01 = ( l0 + l1 ) * ez 
P12 = np.zeros (3) # translation between 1 and 2 frame in 1 frame
P23 = l2 * ex # translation between 2 and 3 frame in 2 frame
P34 = - l3 * ez # translation between 3 and 4 frame in 3 frame
P45 = np.zeros (3) # translation between 4 and 5 frame in 4 frame
P5T = -( l4 + l5 ) * ex 
print(P01,P12,P23,P34,P45,P5T)

P = np.array([P01, P12, P23, P34, P45, P5T]).T
H = np.array([ez, -ey, -ey, -ey, -ex]).T
limits = np.array([0,180] * 6).reshape(6, 2)
limits[4, :] = [0, 270]
print(f"limits: \n{limits}")

[0.     0.     0.1045] [0. 0. 0.] [0.08285 0.      0.     ] [-0.      -0.      -0.08285] [0. 0. 0.] [-0.12842 -0.      -0.     ]
limits: 
[[  0 180]
 [  0 180]
 [  0 180]
 [  0 180]
 [  0 270]
 [  0 180]]


In [11]:
from Robot import UR5Arm

During the Training Cycle, the following steps are performed for each epoch:
1. Initialize the Robot to a random state.
2. select a random end effector position and orientation that
   converges to valid joint angles using inverse kinematics
3. Run the following in each training epoch that lasts 10 seconds per iteration
4. The robot can takes 1 action in a discrete action space for each joint:
   - the DQN network outputs a value between -10 and 10 and the arm is commanded to
     move each servo that angle amount with velocity V
5. Repeat

First we need to establish the bounds for this robot. 
We move the robot to the home position and measure the maximal length the end effector can reach.

In [12]:
def generate_P_from_limits(limits, y_bound = 0.05):
    """Tests the target EE configuration process
    Generates a random end effector point based on Position vector limits
    Args:
        limits is a tuple of the smallest and largest norms of P_ees generated
    """
    match = False
    tries = 0
    while not match and tries<100:
        target_norm = np.random.uniform(limits[0], limits[1])
        print(f"Generated Target Norm: {target_norm:.4f}")
        random = np.random.rand(3)
        norm_vec = np.linalg.norm(random)
        out = random * target_norm/norm_vec
        print(out)
        if out[1] > y_bound:
            print(f"P generated: {out}")
            return out 
        print("Retrying: y value not within bounds")
        tries += 1
    print("No match was found, maybe the y_bound was too strict?")
    return False

In [13]:
def generate_Q_from_limits(dofbot: UR5Arm, limits, y_bound = 0.05):
    # generate random 5 joint angles
    # test whether the end effector position is in the bound
    tries = 100
    curr_try = 1
    while True:
        init = np.floor(np.random.rand(5)*np.array([180,180,180,180,270]))
        guess = init[None].T * np.pi/180
        R_guess, P_guess = fwdkin(dofbot, guess)
       
        if P_guess[1] > y_bound:
            P_len = np.linalg.norm(P_guess)
            if P_len >= limits[0] and P_len <= limits[1]:
                print(f"Generated Position Candidate in {curr_try} tries")
                return guess
        curr_try += 1
        if curr_try >= tries: break
    raise(ValueError, "No match was found, maybe the y_bound was too strict?")

    # verify that this point is in the bound
 

In [14]:
# define the neural networks
"""
The neural network learns an approximation of the value function
with a neural network
"""
class PPO(nn.Module):

    def __init__(self, n_observations, n_actions):
        super(PPO, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)


In [15]:
import random
from collections import namedtuple, deque

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [16]:
from numpy.linalg import norm
def linearity(P_start, P_end, P_curr):
    """returns the linearity score 
    (| P12 x P01 |) / |P12|
    """
    P12 = P_end - P_start
    P01 = P_start - P_curr
    d = norm(np.cross(P12, P01))/norm(P12)
    return d

### Press the Third Button the DOFBOT
(Set the robot to the home position)

In [17]:
virtual_arm = UR5Arm(P, H, limits)
real_arm = Arm_Device()

In [20]:
from kinematics import fwdkin, invkin
j = read_all_joints_in_radians(real_arm)[None].T
R_home, P_home = fwdkin(virtual_arm, j)
limits = 0.07, P_home[2]
print(limits)

(0.07, 0.3985291515494993)


In [24]:
# def train(episodes: int, virtual_arm: UR5Arm, real_arm: Arm, limits):

ms_per_angle = 6
# coefficient for time weight
ee_distance_limits = ()
end_effector_distance_tolerance = 1e-4
y_bound = 0.07
t_coeff = 0.1
conv_coef = 0.9
l_coeff = 0.1  # weight of linearity score
time_per_trial = 10  # in seconds

BATCH_SIZE = 128  # BATCH_SIZE is the number of transitions sampled from the replay buffer
GAMMA = 0.99      # GAMMA is the discount factor as mentioned in the previous section
EPS_START = 0.9   # EPS_START is the starting value of epsilon
EPS_END = 0.05    # EPS_END is the final value of epsilon
EPS_DECAY = 1000  # EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
TAU = 0.005       # TAU is the update rate of the target network
LR = 1e-4         # LR is the learning rate of the ``AdamW`` optimizer

# Get number of actions from gym action space
n_actions = 5
# Get the number of state observations
n_observations = 3

policy_net = PPO(n_observations, n_actions).to(device)

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)


In [25]:
import torch
import torch.nn.functional as F
from torch.distributions import MultivariateNormal

def optimize_model(policy_net, states, actions, old_probs, advantages, epsilon_clip=0.2):
    # Convert lists to PyTorch tensors
    states = torch.tensor(states, dtype=torch.float32).to(device)
    actions = torch.tensor(actions, dtype=torch.float32).to(device)
    old_probs = torch.tensor(old_probs, dtype=torch.float32).to(device)
    advantages = torch.tensor(advantages, dtype=torch.float32).to(device)

    # Get current policy distribution
    policy, _ = policy_net(states)
    probs = policy.pdf(actions)

    # Calculate ratio and surrogate loss
    ratio = probs / old_probs
    unclipped_loss = ratio * advantages
    clipped_loss = torch.clamp(ratio, 1.0 - epsilon_clip, 1.0 + epsilon_clip) * advantages
    surrogate_loss = -torch.min(unclipped_loss, clipped_loss).mean()

    # Update the policy network
    optimizer.zero_grad()
    surrogate_loss.backward()
    optimizer.step()

In [26]:
def create_input(curr_angles, R_curr, P_curr, target_end_effector_pose):
    pass

In [34]:
# set gripper to position 0
real_arm.Arm_serial_servo_write(6, 10, 500)
# optimization and ML constants
episodes = 100
for ep in range(episodes):
    print(f"Starting Training Loop for Episode {ep}")
    """ generate start and end positions """
    P_start = generate_P_from_limits(limits, y_bound)
    P_end = generate_P_from_limits(limits, y_bound)
    
    Q_start = generate_Q_from_limits(virtual_arm, limits, y_bound)
    Q_end = generate_Q_from_limits(virtual_arm, limits, y_bound)

    # initialize the arm to a random state. 
    print(Q_start.flatten())
    set_angles(real_arm, Q_start.flatten(), ms_per_angle)
    
    # allow the arm to move for 5 seconds
    time_start = time.time()
    current_time = 0
    reward = 0

    curr_angles = read_all_joints_in_radians(real_arm)
    R_curr, P_curr = fwdkin(virtual_arm, curr_angles[None].T)
    exit(1)
    while(current_time - time_start < time_per_trial):
        start_time_step = time.time()

        observations = create_input(curr_angles, R_curr, P_curr, target_end_effector_pose)
        dq = policy_net(observations).T

        next_angles = curr_angles + dq
        dofbot_limits = virtual_arm.get_limits()
        if next_angles.any() > dofbot_limits[:,0]:
            next_angles = max(dofbot_limits[:,0], next_angles)
            reward -= 10
            
        if next_angles.any() < dofbot_limits[:,1]:
            next_angles = min(dofbot_limits[:,1], next_angles)
            reward -= 10
            
        set_angles(real_arm, next_angles, dq * ms_per_angle)

        next_angles = read_all_joints()
        R_next, P_next = fwdkin(dofbot, curr_angles)
        
        current_time = time.time()
        elapsed_time = current_time - start_time_step
        
        dist = np.linalg.norm(P_end - P_curr) 
        lin_score = linearity(P_start, P_end, P_curr)
        # loss should be based on the euclidean distance, time it took, and linearity of the path
        reward -= conv_coef * dist - tcoef * elapsed_time - l_coef * lin_score

        """
        TODO: add replay Memory for stability
        memory.push(state, action, next_state, reward)
        
        TODO: add replayMemory for stability
        optimize_model_batch
        """
        
        if dist.all() < end_effector_distance_tolerance:
            break

        # PPO update without replay buffer
        """ 
        To-do 
        check this and see what this is doing
        """
        optimize_model(policy_net, observations, dq, old_probs, advantages)
        
        current_time = time.time()
    


Starting Training Loop for Episode 0
Generated Target Norm: 0.3731
[0.21046199 0.30357971 0.05275944]
P generated: [0.21046199 0.30357971 0.05275944]
Generated Target Norm: 0.3052
[0.01910396 0.04282973 0.30154195]
Retrying: y value not within bounds
Generated Target Norm: 0.3003
[0.22758848 0.15268135 0.12282767]
P generated: [0.22758848 0.15268135 0.12282767]
Generated Position Candidate in 2 tries
Generated Position Candidate in 2 tries
[2.16420827 0.41887902 1.67551608 0.45378561 4.39822972]
here [124  24  96  26 252]
Command: Joint 1 -> 124 deg in 744 ms
Arm_serial_servo_write I2C error
Command: Joint 2 -> 24 deg in 144 ms
Arm_serial_servo_write I2C error
Command: Joint 3 -> 96 deg in 576 ms
Arm_serial_servo_write I2C error
Command: Joint 4 -> 26 deg in 156 ms
Arm_serial_servo_write I2C error
Command: Joint 5 -> 252 deg in 1512 ms
Arm_serial_servo_write I2C error


NameError: name 'target_end_effector_pose' is not defined

In [None]:
import pickle as pkl