In [None]:
# Entities
import numpy as np
import torch

from processing.RL.DDPG.ddpg import DDPG
from processing.robotics.arm_propagator import ArmPropagator, ElectromagnetEndEffector, RevoluteJoint
from processing.system_animation import animate_system
from processing.utilities.entities import Cylinder

# Attitude
from processing.attitude.attitude_propagator import AttitudePropagator
from processing.attitude.torques.base import TorqueObject
from processing.attitude.torques.eddy_current import EddyCurrentTorque

# Environment
from propagator.bin.environment import Environment

In [None]:
def save_step(t: float, prop: list, at: AttitudePropagator, ar: ArmPropagator, tau: np.ndarray):
    # TODO: Move to within single classes
    # Convert to numpy
    prop = np.array(prop)

    # Save arm
    if ar._timestamps is None:
        ar._timestamps = np.array([t])
        ar._prop_sol = prop[0:12].reshape(-1, 1)
        ar.joint_torques = tau.reshape(-1, 1)
    else:
        ar._timestamps = np.hstack((ar._timestamps, np.array([t])))
        ar._prop_sol = np.hstack((ar._prop_sol, prop[0:12].reshape(-1, 1)))
        ar.joint_torques = np.hstack((ar.joint_torques, tau.reshape(-1, 1)))

    # Save end effector results
    ar.end_effector._timestamps = ar._timestamps
    if ar.end_effector.locations is None:
        ar.end_effector.locations = prop[19:22].reshape(-1, 1)
        ar.end_effector.poses = prop[22:25].reshape(-1, 1)
    else:
        ar.end_effector.locations = np.hstack((ar.end_effector.locations, prop[19:22].reshape(-1, 1)))
        ar.end_effector.poses = np.hstack((ar.end_effector.poses, prop[22:25].reshape(-1, 1)))

    # Save attitude
    at._timestamps = ar._timestamps
    if at._prop_sol is None:
        at._prop_sol = prop[12:19].reshape(-1, 1)
    else:
        at._prop_sol = np.hstack((at._prop_sol, prop[12:19].reshape(-1, 1)))

    return

# System setup

In [None]:
# Generate debris
debris = Cylinder(
    mass=950.0,
    radius=2.5,
    height=5.0,
    thickness=0.1,
    sigma=35000000.0
)

# Generate robotic arms (UR10 standard)
# Joints
scale = 10
joints = [
    RevoluteJoint(0, scale*0.1807, np.pi/2),
    RevoluteJoint(scale*-0.6127, 0, 0),
    RevoluteJoint(scale*-0.57155, 0, 0),
    RevoluteJoint(0, scale*0.17415, np.pi/2),
    RevoluteJoint(0, scale*0.11985, -np.pi/2),
    RevoluteJoint(0, scale*0.11655, 0)
]

# End effector
electromagnet: ElectromagnetEndEffector = ElectromagnetEndEffector(
    n_turns=500.0,
    radius=1.0,
    current=50.0
)

# External moments
# Eddy current
eddy: TorqueObject = EddyCurrentTorque(
    entity=debris,
    chaser_w0=[0.0, 0.0, 0.0],
    electromagnets=[electromagnet]
)

# Save attitude results
attitude = AttitudePropagator(entity=debris, M_ext=eddy)

# Save robotic arm results
base_offset = np.array([10, 0, 5])
arm = ArmPropagator(joints=joints, end_effector=electromagnet, base_offset=base_offset)

# Environment Setup

In [None]:
# Set propagation settings
t_step = 0.1  # Propagation time step [s]

# Set initial conditions
y0_arm = [
    0.0, -0.7, -0.3, 0.0, 0.0, 0.0,   # Initial joint angles
    0.0, 0.0, 0.0, 0.0, 0.0, 0.0      # Initial joint velocities
]
y0_debris = [
    0.1, 0.2, 0.0,                    # Initial debris angular velocity
    0.0, 0.0, 0.0, 1.0                # Initial debris quaternions
]   

# Initialize environment
env = Environment(
    y0_arm + y0_debris,                   # Initial conditions
    debris.Ixx,                           # Debris Ixx
    debris.Iyy,                           # Debris Iyy
    debris.Izz,                           # Debris Izz
    debris.radius,                        # Debris cylinder radius
    debris.height,                        # Debris cylinder height 
    debris.thickness,                     # Debris cylinder thickness
    debris.sigma,                         # Debris conductivity
    electromagnet.n_turns,                # Coil number of turns
    electromagnet.current,                # Coil current
    electromagnet.radius,                 # Coil radius
    arm.base_offset_x,                    # Arm base x-offset
    arm.base_offset_y,                    # Arm base y-offset
    arm.base_offset_z,                    # Arm base z-offset
    scale,                                # Arm scaling factor
    arm.dh_a,                             # Arm joint a parameters
    arm.dh_d,                             # Arm joint d parameters
    arm.dh_alpha                          # Arm joint alpha parameters
    )

# DDPG: Training

Define stopping criteria: collision or completed detumbling

In [None]:
# Define safe sphere for collision check
safe_sphere: float = max(debris.height/2, debris.radius) + 0.5  # [m]

# Define detumbling threshold
detumbling_threshold: float = 0.01  # [rad/s]

# Define max joint range and angular velocity
max_range: float = np.deg2rad(np.array([360, 360, 360, 360, 360, 360]))      # [deg]
max_joint_vel: float = np.deg2rad(np.array([120, 120, 180, 180, 180, 180]))  # [deg/s]

# Max torque
max_torques: np.ndarray = np.array([5000, 3000, 1000, 100, 1, 0.0001])

Define reward and end checks function

In [None]:
def evaluate_step(step_ret, prev_state):
    # Convert to numpy
    step_ret = np.array(step_ret)
    state = np.array([*step_ret[:15], *step_ret[19:]])
    
    # Init "done" and "reward"
    done = 0  # "done" normally negative
    reward = 0
    
    # Check: collision with safe sphere
    for i in range(len(arm.joints) + 1):
        pos: np.ndarray = arm.get_joint_position(step_ret[0:6], i)
        if np.linalg.norm(pos) <= safe_sphere:
            done = 1
            reward -= 10
            print("Collision", reward)
            return state, reward, done  # Collision happened, simulation over
            
    # Check: tumbling rates below certain threshold
    ang_vels: np.ndarray = step_ret[12:15]
    if np.all(np.abs(ang_vels) <= detumbling_threshold):
        done = 1
        # reward += 25
        print("Detumbling", reward)
        return state, reward, done  # Detumbling completed, simulation over
    
    # Penalty: end effector pointing away
    ee_loc: np.ndarray = -step_ret[19:22]
    ee_pos: np.ndarray = step_ret[22:25]
    angle = np.arccos(np.dot(ee_loc / np.linalg.norm(ee_loc), ee_pos / np.linalg.norm(ee_pos)))
    reward -= abs(angle)
    
    # Check: joint angles beyond a certain threshold
    if np.any(np.abs(step_ret[6:12]) > max_joint_vel):
        reward -= 5
        return state, reward, done  # Joint angular velocity too high, simulation over
    
    """# Check: range beyond a certain value
    if np.any(np.abs(step_ret[:6]) > max_range):
        reward -= 5
        return step_ret, reward, done  # Joint max range too high, simulation over

    # Reward: Detumbling rate
    dwx = abs(step_ret[12]) - abs(prev_state[12]) / t_step
    dwy = abs(step_ret[13]) - abs(prev_state[13]) / t_step
    dwz = abs(step_ret[14]) - abs(prev_state[14]) / t_step
    
    if sum(1 for grad in [dwx, dwy, dwz] if grad < 0) < 2:
        reward -= 0.25  # no component decreasing
    """
    
    return state, reward, done

## Train setup

In [None]:
# Init torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Define different parameters for training the agent
max_episode = 200
max_time_steps = 1000
ep_r = 0
total_step = 0
score_hist = []

# For reproducibility
# env.seed(0)
# torch.manual_seed(0)
# np.random.seed(0)

# Environment action ans states
state_dim = 21
action_dim = 6
min_Val = torch.tensor(1e-7).float().to(device)

# Create a DDPG instance
agent = DDPG(state_dim, action_dim, hidden_actor=128, hidden_critic=128)

## Training loop

In [None]:
# Train the agent for max_episodes
for i in range(max_episode):
    total_reward = 0
    step = 0

    # Reset environment
    state = np.array(env.current_state()[-1])[:state_dim]  # discard time
    for _ in range(max_time_steps):
        # Get action
        noise = max_torques * np.random.normal(0, 1, size=action_dim)
        action = (agent.select_action(state) + noise).clip(-max_torques, max_torques) # Noise and clip
        # action += ou_noise.sample()

        # Perform step
        t, step_ret = env.step(t_step=t_step, action=list(action))
        next_state, reward, done = evaluate_step(step_ret, state)

        # Retrieve reward
        total_reward += reward
        agent.replay_buffer.push((state, next_state, action, reward, done))
        state = next_state
        if done:
            break
        step += 1
        
    del env
    env = Environment(
    y0_arm + y0_debris,                   # Initial conditions
    debris.Ixx,                           # Debris Ixx
    debris.Iyy,                           # Debris Iyy
    debris.Izz,                           # Debris Izz
    debris.radius,                        # Debris cylinder radius
    debris.height,                        # Debris cylinder height 
    debris.thickness,                     # Debris cylinder thickness
    debris.sigma,                         # Debris conductivity
    electromagnet.n_turns,                # Coil number of turns
    electromagnet.current,                # Coil current
    electromagnet.radius,                 # Coil radius
    arm.base_offset_x,                    # Arm base x-offset
    arm.base_offset_y,                    # Arm base y-offset
    arm.base_offset_z,                    # Arm base z-offset
    scale,                                # Arm scaling factor
    arm.dh_a,                             # Arm joint a parameters
    arm.dh_d,                             # Arm joint d parameters
    arm.dh_alpha                          # Arm joint alpha parameters
    )

    score_hist.append(total_reward)
    total_step += step + 1
    print("Episode: {} \t Time: {:0.2f}s \t Total Reward: {:0.2f}".format(i, t, total_reward))
    agent.update(update_iteration=200, gamma=0.99, tau=0.001, batch_size=32)
    if i % 10 == 0:
        agent.save("./")

In [None]:
import matplotlib.pyplot as plt 
import numpy as np
def plot_learning_curve(x, scores):
    running_avg = np.zeros(len(scores))
    for i in range(len(running_avg)):
        running_avg[i] = np.mean(scores[max(0, i-100):(i+1)])
    plt.plot(x, running_avg)
    plt.title(f'Running average of previous {max_episode} scores')

x = [i+1 for i in range(len(score_hist))]
plot_learning_curve(x, score_hist)

# DDPG: Testing

In [None]:
test_iteration=200
  
for t in range(test_iteration):
    # Get action
    action = (agent.select_action(state)).clip(-max_torques, max_torques) # Noise and clip

    # Perform step
    _, step_ret = env.step(t_step=t_step, action=list(action))
    next_state, reward, done = evaluate_step(step_ret, state)
    save_step(t, step_ret, attitude, arm, action)

    ep_r += reward
    if done: 
        print("reward{}".format(reward))
        print("Episode \t{}, the episode reward is \t{:0.2f}".format(t, ep_r))
        ep_r = 0
        break
    state = next_state

In [None]:
x = [i+1 for i in range(arm.joint_torques.shape[-1])]
plt.title(f'Torques evolution')
plt.grid(True)
plt.plot(x, arm.joint_torques.T)

In [None]:
attitude.plot(["angular_velocity", "quaternions", "energy", "euler_angles"])

In [None]:
arm.plot()

In [None]:
animate_system(
    t=attitude.t,
    q=attitude.q,
    eu=attitude.euler_angles,
    h=debris.height,
    r=debris.radius,
    dpi=300,
    arms=[arm],
    dh_par=joints
)