In [None]:
%matplotlib inline
from matplotlib import cm
import matplotlib.pyplot as plt
import numpy as np

# Rewards of the 20 plotted samples, in ascending order; they set each point's colour.
reward_seqs = np.array([
    0.0, 0.18974148, 0.1992907, 0.350656, 0.35882896, 0.39109656,
    0.39486712, 0.39511558, 0.46375644, 0.47071177, 0.5655106, 0.70239437,
    0.70598173, 0.72608304, 0.7819717, 0.79253316, 0.8283197, 0.9700098,
    0.99150366, 1.0,
])
n_points = len(reward_seqs)

# Seeded generator so the placeholder coordinates are identical on every run.
rng = np.random.default_rng(0)

# One explicit colour range shared by every panel and by the colorbar, so a
# given colour means the same reward everywhere in the figure.
reward_min, reward_max = reward_seqs.min(), reward_seqs.max()

fig, axs = plt.subplots(1, 3)
fig.set_size_inches(24, 4.8)
for i in range(3):
    ax = axs[i]
    # Placeholder positions: random integers in [0, 5) on both axes.
    ax.scatter(
        rng.integers(5, size=n_points),
        rng.integers(5, size=n_points),
        c=reward_seqs, cmap=cm.jet, vmin=reward_min, vmax=reward_max,
    )
    # Earlier scatter variants, left disabled; they use names that are not
    # defined in this cell (init_pose_seqs, idx, best_k, self, selected, others).
    # ax.scatter(init_pose_seqs[idx[-best_k:], i, self.gripper_mid_pt, 0], 
    #     init_pose_seqs[idx[-best_k:], i, self.gripper_mid_pt, 2], marker='v', 
    #     c=reward_seqs[idx[-best_k:]])
    # ax.scatter(init_pose_seqs[idx[:-best_k], i, self.gripper_mid_pt, 0], 
    #     init_pose_seqs[idx[:-best_k], i, self.gripper_mid_pt, 2], 
    #     c=reward_seqs[idx[:-best_k]])
    # ax.scatter(selected[:, i, 0], selected[:, i, 2], c='r')
    # ax.scatter(others[:, i, 0], others[:, i, 2], color=[0.0,0.3,0.7,0.3])

    ax.set_title(f"GRIP {i+1}")
    ax.set_xlabel('x coordinate')
    ax.set_ylabel('z coordinate')

# Stand-alone mappable for the shared colorbar, pinned to the same limits as
# the scatter plots instead of relying on autoscaling.
color_map = cm.ScalarMappable(cmap=cm.jet)
color_map.set_array(reward_seqs)
color_map.set_clim(reward_min, reward_max)
plt.colorbar(color_map, ax=axs)

# plt.savefig(path)
plt.show()

In [None]:
# Covariance matrix adaptation evolution strategy (CMA-ES)
def optimize_action_CMA_ES(   
    self,
    init_pose_seqs,
    act_seqs,
    reward_seqs,    # [n_sample]
    best_k_ratio=0.05,
    pos_noise_std=0.03,
    rot_noise_std=np.pi / 36,
    noise_clip=0.1,
):
    """Resample pose/action sequences around the highest-reward samples.

    Despite the name, no mean / covariance / step-size state is maintained
    here: the ``best_k`` highest-reward sequences ("elites") are kept as-is
    and each is surrounded by randomly perturbed copies.

    Elites are visited from worst to best. The elite of rank ``r`` (``r == 1``
    is the best) receives a quota of ``n_sample // 2**r`` entries, the elite
    itself included; the best elite receives whatever is needed to bring the
    total to ``n_sample``. An elite is always kept, even when its quota is 0.
    The total therefore comes to ``n_sample`` as long as the earlier elites
    have not already used up that many slots.

    Args:
        init_pose_seqs: NumPy array; axis 0 indexes samples and, within one
            sample, axis 0 indexes the steps of the sequence.
        act_seqs: per-sample action sequences, indexed like
            ``init_pose_seqs`` along axis 0.
        reward_seqs: NumPy array of shape [n_sample]; higher is better.
        best_k_ratio: fraction of the samples kept as elites. At least 3
            elites are kept, capped at the number of available samples.
        pos_noise_std: standard deviation of the Gaussian noise added to the
            third coordinate of each mid point (the first two are untouched).
        rot_noise_std: standard deviation of the Gaussian noise added to each
            angle.
        noise_clip: both noises are clipped to ``[-noise_clip, noise_clip]``.

    Returns:
        ``(init_pose_seqs_sample, act_seqs_sample)``: the stacked elite and
        perturbed pose sequences, and the matching stacked action sequences.

    Raises:
        ValueError: if ``init_pose_seqs`` contains no samples.
    """
    n_total = init_pose_seqs.shape[0]
    if n_total == 0:
        raise ValueError("init_pose_seqs must contain at least one sequence")

    # Never ask for more elites than there are samples; idx[-rank] below
    # would otherwise run off the front of the sorted index array.
    best_k = min(n_total, max(3, int(n_total * best_k_ratio)))

    # Ascending sort: the best sample is idx[-1].
    idx = np.argsort(reward_seqs)
    print(f"Selected top reward seqs: {reward_seqs[idx[-best_k:]]}")

    self.visualize_sampled_init_pos(
        init_pose_seqs, reward_seqs, idx,
        os.path.join(self.rollout_path, f'plot_cem_s{self.sample_iter_cur}_o{self.opt_iter_cur}'))

    init_pose_seqs_sample = []
    act_seqs_sample = []
    for rank in range(best_k, 0, -1):
        elite_pose_seq = init_pose_seqs[idx[-rank]]
        init_pose_seqs_sample.append(elite_pose_seq)
        act_seqs_sample.append(act_seqs[idx[-rank]])

        if rank > 1:
            quota = n_total // (2 ** rank)
        else:
            # The best elite fills whatever is left of the n_total slots
            # (the +1 accounts for the elite itself, already appended).
            quota = n_total - len(init_pose_seqs_sample) + 1

        # The elite occupies one slot of its quota.
        n_perturbed = max(0, quota - 1)
        if n_perturbed == 0:
            continue

        # Depends only on the elite, so compute it once for all its copies.
        mid_point_seq, angle_seq = self.get_center_and_rot_from_pose(elite_pose_seq)

        for _ in range(n_perturbed):
            perturbed_pose_seq = []
            for k in range(elite_pose_seq.shape[0]):
                # Position noise first, then rotation noise: keep this order
                # so results under a fixed NumPy seed stay reproducible.
                z_noise = np.clip(np.random.randn() * pos_noise_std, -noise_clip, noise_clip)
                rot_noise = np.clip(np.random.randn() * rot_noise_std, -noise_clip, noise_clip)

                new_mid_point = mid_point_seq[k, :3] + np.array([0.0, 0.0, z_noise])
                new_angle = angle_seq[k] + rot_noise
                perturbed_pose_seq.append(self.get_pose(new_mid_point, new_angle))

            perturbed_pose_seq = np.stack(perturbed_pose_seq)
            init_pose_seqs_sample.append(perturbed_pose_seq)
            act_seqs_sample.append(self.get_action_seq_from_pose(perturbed_pose_seq))

    init_pose_seqs_sample = np.stack(init_pose_seqs_sample)
    act_seqs_sample = np.stack(act_seqs_sample)

    return init_pose_seqs_sample, act_seqs_sample

In [None]:
    def optimize_action_MPPI(   # Model-Predictive Path Integral (MPPI)
        self,
        init_pose_seqs,
        act_seqs,       # [n_sample, -1, action_dim]
        reward_seqs     # [n_sample]
    ):
        """Blend the sampled rotations into one sequence, weighted by reward.

        Each sample's weight is ``exp(self.reward_weight * r)``, where ``r``
        is that sample's reward after mean-centring and scaling the reward
        vector to unit L2 norm. For every position along axis 1 of
        ``init_pose_seqs`` a rotation angle is recovered from the x coordinate
        of point ``self.gripper_mid_pt`` as
        ``arccos((x - self.mid_point[0]) / self.sample_radius)``, and the
        angles are averaged over the samples with those weights.

        NOTE(review): arccos only returns angles in [0, pi], so the sign of
        the original rotation is not recovered -- confirm this is intended.

        Args:
            init_pose_seqs: array indexed as ``[sample, i, point, coord]``;
                only coordinate 0 of point ``self.gripper_mid_pt`` is read.
            act_seqs: not used by this method.
            reward_seqs: per-sample rewards, shape [n_sample].

        Returns:
            ``(init_pose_seq, act_seq)``: for each blended angle, the stacked
            results of ``self.get_pose(self.mid_point, angle)`` and of
            ``self.get_action_seq(angle)``.
        """
        eps = 1e-8
        print(f"reward_seqs: {reward_seqs}")

        # Centre, then scale to unit L2 norm. The previous extra division by
        # the variance was cancelled by this normalisation, and produced NaN
        # weights (0 / 0) whenever all rewards were equal; eps makes that
        # case yield uniform weights instead.
        reward_seqs = np.asarray(reward_seqs, dtype=float)
        reward_centered = reward_seqs - np.mean(reward_seqs)
        reward_seqs_norm = reward_centered / (np.linalg.norm(reward_centered) + eps)
        reward_seqs_exp = np.exp(self.reward_weight * reward_seqs_norm)
        print(f"reward_seqs_exp: {reward_seqs_exp}")

        # Subtracting the scalar broadcasts over any batch size, so the
        # computation no longer depends on self.n_sample matching the input.
        x_offset = init_pose_seqs[:, :, self.gripper_mid_pt, 0] - self.mid_point[0]
        # Rounding can push the ratio slightly outside [-1, 1], where arccos
        # returns NaN; clip first.
        rot_noise_seqs = np.arccos(np.clip(x_offset / self.sample_radius, -1.0, 1.0))

        # Weighted average over the sample axis.
        rot_noise_seq = np.sum(reward_seqs_exp.reshape(-1, 1) * rot_noise_seqs, axis=0) / (np.sum(reward_seqs_exp) + eps)
        # act_seq = np.sum(reward_seqs_exp.reshape(-1, 1, 1, 1) * act_seqs, axis=0) / (np.sum(reward_seqs_exp) + eps)

        print(f"rot_noise_seq: {rot_noise_seq}")

        init_pose_seq = []
        act_seq = []
        for rot_noise in rot_noise_seq:
            init_pose_seq.append(self.get_pose(self.mid_point, rot_noise))
            act_seq.append(self.get_action_seq(rot_noise))

        init_pose_seq = np.stack(init_pose_seq)
        act_seq = np.stack(act_seq)

        # [-1, action_dim]
        return init_pose_seq, act_seq