In [2]:
import numpy as np
import matplotlib.pyplot as plt
import os
path = os.path.abspath(os.path.join(os.getcwd(),".."))
print(path)
import sys, os
sys.path.append(os.path.dirname(os.getcwd()))
from dynamics_predict.dynamics_networks import DynamicsNetwork, DynamicsParamsOptimizer, EncoderDynamicsNetwork, EncoderDecoderDynamicsNetwork, VAEDynamicsNetwork
from rl.policy_networks import DPG_PolicyNetwork
from utils.load_params import load_params
from utils.common_func import rand_params
from dynamics_predict.defaults import DYNAMICS_PARAMS, HYPER_PARAMS
from environment import envs
import torch



/home/quantumiracle/research/COS513_project/src


In [5]:
env_name = 'inverteddoublependulum'
data_path = path+'/data/dynamics_data/'+env_name+'/test_dynamics.npy'
param_dim = len(DYNAMICS_PARAMS[env_name+'dynamics'])
print('parameter dimension: ', param_dim)

parameter dimension:  5


In [6]:
test_data = np.load(data_path, allow_pickle=True)
print('number of samples in dest data: ', len(test_data))
idx=5  # index of sample to test: 0-10
sa = test_data[idx]['sa']
s_ = test_data[idx]['s_']
params = test_data[idx]['params'] # true parameters
sa = torch.Tensor(sa)
s_ = torch.Tensor(s_)
print(sa.shape, s_.shape)
print('true parameters: ', params)

number of samples in dest data:  10
torch.Size([275717, 12]) torch.Size([275717, 11])
true parameters:  [0.04490007 9.947507   0.4617513  0.6338595  1.1446342 ]


## Switch Linear Dynamics

In [8]:
from torch.distributions import Normal
import torch.nn.functional as F
import torch.nn as nn


class DynamicsParamsOptimizer():
    """ 
    Dynamics parameters optimization model (gradient-based) based on a trained 
    forward dynamics prediction network: (s, a, learnable_params) -> s_ with real-world data. 
    """
    def __init__(self, state_space, action_space, param_dim, param_ini_v, hidden_dim=256, hidden_activation=F.relu, output_activation=None, num_hidden_layers=4, lr=1e-2, gamma=0.99):
        self.model = SLDynamicsNetwork(state_dim, action_dim, param_dim, switch_dim)
        self.criterion = nn.MSELoss()

    def train(self, s, a, theta, s_, epoch):
        """ s,a concat with param (learnable) -> s_ """

        for ep in range(epoch):
            s_pred = self.model.forward(s, a, theta)
            self.model.optimizer.zero_grad()
            loss = self.criterion(s_pred, s_)
            loss.backward()
            self.model.optimizer.step()
            if ep%100==0:
                print('epoch: {}, loss: {}'.format(ep, loss.item()))
            

class SLDynamicsNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, param_dim, latent_dim, switch_dim):
        self.A = torch.rand((switch_dim, state_dim, state_dim))
        self.B = torch.rand((switch_dim, state_dim, action_dim))
        self.E = torch.rand((param_dim, latent_dim))
        self.epsilon = self.gaussian_noise(state_dim, scale=0.1)
        self.switch_logits = torch.nn.Sequential(
            nn.Linear(state_dim, switch_dim)
        )
        self.optimizer = torch.optim.Adam(self.parameters(), lr=lr)

    def gaussian_noise(self, state_dim, scale):
        normal = Normal(0, 1)
        epsilon = scale * normal.sample(state_dim) 
        return epsilon

    def forward(self, s, a, theta):
        if not isinstance(s, torch.Tensor):
            s = torch.Tensor(s)
        if not isinstance(a, torch.Tensor):
            a = torch.Tensor(a)
        if not isinstance(theta, torch.Tensor):
            theta = torch.Tensor(theta)

        logits_ = self.switch_logits(s)
        switch_var = F.gumbel_softmax(logits, tau=1, hard=False)  # if hard, return one-hot
        A_w = switch_var @ self.A  # chosen by the switch variable
        B_w = switch_var @ self.B  # chosen by the switch variable
        s_ = self.E@theta@(A_w@s + B_w@a) + self.epsilon

        return s_


NameError: name 'nn' is not defined

In [5]:
import numpy as np

a=np.random.uniform(0,1, size=(4,3))
b=np.random.uniform(0,1, size=(3,4,2))
c=np.matmul(a, b)
print(c.shape)


ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 4 is different from 3)

## 1. Direct optimization through NN

In [None]:
def train_params(Env, embedding, epoch=1000, lr=1e-2, dynamics_model_path='./', env_settings={}, default_params={}, data_path='./'):
    """
    Train the dynamics parameters with a trained/fixed dynamics prediction model, using the real-world dataset.
    Model input: state, action, dynamics parameters (learnable).
    Model output: next state.
    """
    env = Env(**env_settings, **default_params)
    if embedding:
        param_dim = HYPER_PARAMS[Env.name+'dynamics']['latent_dim']
        param_ini_v = np.zeros(param_dim)
        model = DynamicsParamsOptimizer(env.observation_space, env.action_space, param_dim, param_ini_v, num_hidden_layers=6, lr=lr)
    else:
        param_dim = len(DYNAMICS_PARAMS[Env.name+'dynamics'])
        param_ini_v = [np.mean(v) for k, v in env.parameters_spec.items() if k in DYNAMICS_PARAMS[Env.name+'dynamics']]
        model = DynamicsParamsOptimizer(env.observation_space, env.action_space, param_dim, param_ini_v, num_hidden_layers=6, lr=lr)
    model.dynamics_model.load_state_dict(torch.load(dynamics_model_path))
    model.dynamics_model.eval()

#     sa = np.load(data_path+'/sa.npy')
#     s_ = np.load(data_path+'/s_.npy')

    for ep in range(epoch):
        s_pred = model.forward(sa)
        model.optimizer.zero_grad()
        loss = model.criterion(s_pred, s_)
        loss.backward()
        model.optimizer.step()
        if ep%100==0:
            print('epoch: {}, loss: {}'.format(ep, loss.item()))
        
    return model.params

In [None]:
Env = envs[env_name]  # pandapushik2dsimple', 'inverteddoublependulum'
Type = ['EncoderDynamicsNetwork', 'EncoderDecoderDynamicsNetwork', 'VAEDynamicsNetwork'][2]

alpha = train_params(Env, embedding=True, \
              dynamics_model_path=path+'/data/dynamics_data/{}/model/{}_dim{}/dynamics'.format(env_name, Type, str(param_dim)), \
                   lr=1e-3,   epoch=100, data_path=data_path)
print('Method 1: predicted encoded parameters: ', alpha)

In [None]:
from dynamics_predict.dynamics_networks import DynamicsEncoder, DynamicsVariationalEncoder

# load encoder
if Type in ['EncoderDynamicsNetwork', 'EncoderDecoderDynamicsNetwork']:  # normal encoder
    encoder = DynamicsEncoder(param_dim=len(DYNAMICS_PARAMS[env_name]), latent_dim=HYPER_PARAMS[env_name+'dynamics']['latent_dim'])  # latent dimension needs to align with the trained DynamicsEncoder 
elif Type == 'VAEDynamicsNetwork': # variational auto-encoder
    encoder = DynamicsVariationalEncoder(param_dim=len(DYNAMICS_PARAMS[env_name+'dynamics']), latent_dim=HYPER_PARAMS[env_name+'dynamics']['latent_dim'])  # 
encoder.load_state_dict(torch.load(path+'/data/dynamics_data/{}/model/{}_dim{}/encoder'.format(env_name, Type, str(len(DYNAMICS_PARAMS[env_name+'dynamics'])))))
encoder.eval()

# normalize the data
[norm_mean, norm_std] = np.load(path+'/data/dynamics_data/{}/norm_factor.npy'.format(env_name))
print('norm factors: ', norm_mean, norm_std)
params_ = (params - norm_mean)/norm_std
params_ = torch.FloatTensor([params_])
# print(params.shape)
mu, logvar = encoder(params_)
true_alpha = mu.detach().cpu().numpy()[0]
print('true encoded parameters: ', true_alpha)

## Bayesian optimization

In [None]:
import numpy as np
from matplotlib import pyplot as plt
import torch
import sys, os
sys.path.append(os.path.dirname(os.getcwd()))
from environment import envs
from dynamics_predict.defaults import DYNAMICS_PARAMS, HYPER_PARAMS
from bayes_opt import BayesianOptimization
from dynamics_predict.dynamics_networks import DynamicsNetwork


class EmbeddingBayesianOptimization():
    def __init__(self, Env_name, data_path, dynamics_model_path):
        self.x=sa
        self.y=s_
#         self.x = tocrch.Tensor(np.load(data_path+'sa.npy'))
#         self.y = torh.Tensor(np.load(data_path+'s_.npy'))
        env = envs[Env_name]()
        self.env_name = Env_name
        self.dynamics_model = DynamicsNetwork(state_space=env.observation_space, action_space=env.action_space, \
                                              num_hidden_layers=6, param_dim=HYPER_PARAMS[self.env_name+'dynamics']['latent_dim'])
        self.dynamics_model.load_state_dict(torch.load(dynamics_model_path))

    def get_params_bound(self,):
        v_limit = 10.
        bounds = {'alpha_{}'.format(i): [-v_limit, v_limit] for i in range(HYPER_PARAMS[self.env_name+'dynamics']['latent_dim'])}
        return bounds
    
    def black_box_function(self, **alpha):
        """Function with unknown internals we wish to maximize.
        This is just serving as an example, for all intents and
        purposes think of the internals of this function, i.e.: the process
        which generates its output values, as unknown.
        """
        x=torch.cat((self.x, torch.Tensor(list(alpha.values())).repeat(self.x.shape[0],1)),axis=-1)
        y_=self.dynamics_model(x).detach().cpu().numpy()

        loss = np.square(np.subtract(self.y, y_)).mean()

        return -loss  # BO gives the maximum, so negagive loss


    def optimize(self, random_init=20, n_iter=500, result_path='params_bo.npy'):
        params_bound = self.get_params_bound()

        optimizer = BayesianOptimization(
            f=self.black_box_function,
            pbounds=params_bound,  # Here it defines the parameters to be randomized, which should also match with input arguments of black_box_function()
            verbose=2,  # verbose =2 print each iteration, verbose = 1 prints only when a maximum is observed, verbose = 0 is silent
            random_state=1,
        )
        
#         optimizer.probe(  # probe a certain datapoint at the beginning
#         params={"alpha_0": -1.0045, "alpha_1": 0.3682}, lazy=True)

        optimizer.maximize(
            init_points=random_init,
            n_iter=n_iter,
        )

        print(optimizer.max['params'])
        optimized_params = optimizer.max['params']
        np.save(result_path, [params_bound, optimized_params])
        return optimizer.res


In [None]:
dynamics_model_path = path+'/data/dynamics_data/{}/model/{}_dim{}/dynamics'.format(env_name, Type, str(param_dim))

data_path = path+'/data/dynamics_data/'+env_name+'/test_dynamics.npy'

ebo = EmbeddingBayesianOptimization(env_name, data_path, dynamics_model_path)
optimization_records = ebo.optimize()
np.save('../data/imgs/{}/bo{}_data.npy'.format(env_name, idx), optimization_records)

In [None]:
from matplotlib.colors import LogNorm
from matplotlib import cm
import scipy.interpolate
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
# idx=5
font = {
#         'family' : 'normal',
#         'weight' : 'bold',
        'size'   : 16}
optimization_records = np.load('../data/imgs/{}/bo{}_data.npy'.format(env_name, idx), allow_pickle=True)
matplotlib.rc('font', **font)

z=[]
x=[]
y=[]
alpha_id = [0,1]
for i, res in enumerate(optimization_records):
#     print("Iteration {}: \n\t{}".format(i, res))
    z.append(res['target'])
    x.append(res['params']['alpha_{}'.format(str(alpha_id[0]))])
    y.append(res['params']['alpha_{}'.format(str(alpha_id[1]))])

x=np.array(x)
y=np.array(y)
z=np.array(z)

# the optimal point found by BO
x_bo = x[np.argmax(z)]
y_bo = y[np.argmax(z)]

xmin = x.min()
xmax = x.max()
ymin = y.min()
ymax = y.max()
grid_x, grid_y = np.mgrid[xmin:xmax:500j, ymin:ymax:500j]
values = scipy.interpolate.griddata((x, y), z, (grid_x, grid_y), method='linear')
plt.figure(figsize=(4,3))
plt.imshow(values, origin='lower', extent=[xmin, xmax, ymin, ymax],
          aspect='auto', cmap='inferno')
plt.plot(x,y, '+', alpha=0.8)
plt.xlabel(r'$\alpha_{}$'.format(str(alpha_id[0])))
plt.ylabel(r'$\alpha_{}$'.format(str(alpha_id[1])))
plt.colorbar()
print(x_bo, y_bo)
plt.scatter(x_bo, y_bo, s=600, marker="*", c='white')
# plt.scatter(*true_alpha, s=300, marker=".", c='red', alpha=0.8)
plt.plot(x_bo, y_bo, "*", c='red', markersize=20, alpha=0.6)

plt.plot(*true_alpha[alpha_id], '^', c='lime', markersize=10, alpha=0.8)

plt.savefig('../data/imgs/{}/bo{}.pdf'.format(env_name, idx), bbox_inches='tight')
plt.show()


In [None]:
from matplotlib.colors import LogNorm
from matplotlib import cm

z=[]
x=[]
y=[]
for i, res in enumerate(optimization_records):
#     print("Iteration {}: \n\t{}".format(i, res))
    z.append(res['target'])
    x.append(res['params']['alpha_0'])
    y.append(res['params']['alpha_1'])
# x = y = np.linspace(-10, 10, 300)
# X, Y = np.meshgrid(x, y)
# x = X.ravel()
# y = Y.ravel()
x=np.array(x)
y=np.array(y)
z=np.array(z)
xmin = x.min()
xmax = x.max()
ymin = y.min()
ymax = y.max()
fig, axs = plt.subplots(ncols=1, sharey=True, figsize=(7, 4))
# fig.subplots_adjust(hspace=0.5, left=0.07, right=0.93)
ax = axs # ax = axs[k]
hb = ax.hexbin(x, y, C=z, gridsize=50, cmap='spring', bins=None)  # 'inferno'
ax.axis([xmin, xmax, ymin, ymax])
ax.set_title("Hexagon binning")
cb = fig.colorbar(hb, ax=ax)
cb.set_label('counts')
# plt.colorbar()
plt.show()

In [None]:
np.pi/4