In [None]:
from env_utils import get_envs
import numpy as np
from env_utils import initialize_envs, close_envs

In [None]:
# Environment settings for the cart-pole swing-up task, later unpacked into
# get_envs(**arg_dict).
# NOTE: the next cell rebinds `arg_dict`; run only the cell for the task you want.
# Presumably dim_in = dim_states + dim_actions + dim_angles (4 + 1 + 1) -- TODO confirm.
arg_dict = {
    "env": "CartpoleSwingup",
    "dim_in": 6,
    "dim_out": 4,
    "dim_states": 4,
    "dim_actions": 1,
    "dim_angles": 1,
    "target_reward": -0.08,
}

In [None]:
# Environment settings for the pendulum task, later unpacked into
# get_envs(**arg_dict). Running this cell replaces any previously built arg_dict.
# Presumably dim_in = dim_states + dim_actions + dim_angles (2 + 1 + 1) -- TODO confirm.
arg_dict = {
    "env": "PendulumEnv",
    "dim_in": 4,
    "dim_out": 2,
    "dim_states": 2,
    "dim_actions": 1,
    "dim_angles": 1,
    "target_reward": -0.08,
}

In [None]:
# Resolve the training / test environment collections for whichever arg_dict
# cell was executed last (every key of arg_dict is passed as a keyword argument).
training_envs, test_envs = get_envs(**arg_dict)

In [None]:
# Display the training environment collection returned by get_envs.
training_envs

In [None]:
# Build the training environments with a fixed seed.
# (presumably initialize_envs instantiates and seeds one env per entry -- TODO confirm)
seed = 1
envs = initialize_envs(training_envs, seed)

In [None]:
# Display what initialize_envs returned.
envs

In [None]:
# Pick the first initialized environment for the inspection cells that follow.
env1 = envs[0]

In [None]:
# Show the upper and lower bounds of the first environment's action space.
_action_space = env1.action_space
print("action space high: ", _action_space.high)
print("action space low: ", _action_space.low)

In [None]:

def anylize_env(env, test_episodes = 200,max_episode_step = 1000, render = False):
    """Print an environment's space metadata and benchmark a random policy on it.

    Args:
        env: environment object exposing ``observation_space`` and
            ``action_space`` (each with ``shape``, ``low``, ``high``; the action
            space also needs ``sample()``), ``reward_range``, ``reset()``,
            ``step(action)`` returning a 4-tuple whose 2nd/3rd items are the
            reward and the done flag, ``render()`` and ``close()``.
        test_episodes: number of random rollouts to run.
        max_episode_step: upper limit on the steps taken in one rollout.
        render: when true, ``env.render()`` is called before every step.

    Returns:
        None. Everything is reported through ``print``. The environment is
        closed (``env.close()``) once the rollouts are finished.
    """
    observation_space = env.observation_space
    print("state space shape: ", observation_space.shape)
    print("state space lower bound: ", observation_space.low)
    print("state space upper bound: ", observation_space.high)
    action_space = env.action_space
    print("action space shape: ", action_space.shape)
    print("action space lower bound: ", action_space.low)
    print("action space upper bound: ", action_space.high)
    print("reward range: ", env.reward_range)

    episode_returns, episode_lengths = [], []
    for _episode in range(test_episodes):
        env.reset()
        total_reward = 0
        n_steps = 0  # stays 0 when max_episode_step allows no step at all
        for n_steps in range(1, max_episode_step + 1):
            if render:
                env.render()
            _obs, reward, done, _info = env.step(env.action_space.sample())
            total_reward += reward
            if done:
                break
        episode_lengths.append(n_steps)
        episode_returns.append(total_reward)
    env.close()

    print("Randomly sample actions for %s episodes, with maximum %s steps per episodes"
          % (test_episodes, max_episode_step))
    mean_return, std_return = np.mean(episode_returns), np.std(episode_returns)
    print(" average reward per episode: %s, std: %s " % (mean_return, std_return))
    print(" average steps per episode: ", np.mean(episode_lengths))
    print(" average reward per step: ", np.sum(episode_returns)/np.sum(episode_lengths))

In [None]:
import numpy as np
import scipy.stats as stats
import torch
import matplotlib.pyplot as plt
import time
from torch.nn import init
import math

In [None]:
def parameterized_truncated_normal(uniform, mu, sigma, a, b):
    """Turn uniform noise into samples of a normal truncated to ``[a, b]``.

    Inverse-CDF transform: with ``alpha = (a - mu) / sigma`` and
    ``beta = (b - mu) / sigma``, each uniform draw ``u`` is mapped to the
    probability ``Phi(alpha) + (Phi(beta) - Phi(alpha)) * u`` and pushed through
    the inverse standard-normal CDF (expressed with ``erfinv``).

    Args:
        uniform: tensor of draws in ``[0, 1]``.
        mu: mean of the untruncated normal.
        sigma: standard deviation (not variance) of the untruncated normal.
        a: lower truncation bound, in the same units as ``mu``.
        b: upper truncation bound, in the same units as ``mu``.
        ``mu``, ``sigma``, ``a`` and ``b`` may be Python scalars or tensors that
        broadcast against ``uniform``; bounds are applied element-wise.

    Returns:
        Tensor with the broadcast shape of the inputs, every element inside
        its own ``[a, b]`` interval.
    """
    # Accept plain numbers as well as tensors (Normal.cdf only takes tensors).
    uniform = torch.as_tensor(uniform)
    mu, sigma, a, b = (torch.as_tensor(value) for value in (mu, sigma, a, b))

    normal = torch.distributions.normal.Normal(0, 1)

    alpha = (a - mu) / sigma
    beta = (b - mu) / sigma

    alpha_normal_cdf = normal.cdf(alpha)
    p = alpha_normal_cdf + (normal.cdf(beta) - alpha_normal_cdf) * uniform

    # erfinv(+-1) is infinite, so keep its argument strictly inside (-1, 1).
    epsilon = torch.finfo(p.dtype).eps
    v = torch.clamp(2 * p - 1, -1 + epsilon, 1 - epsilon)
    x = mu + sigma * (2 ** 0.5) * torch.erfinv(v)

    # Element-wise clamp: every dimension uses its own bound rather than the
    # first entry of a / b, and 0-dim (scalar) bounds work too.
    x = torch.min(x, b.to(x.dtype))
    x = torch.max(x, a.to(x.dtype))
    return x

def sample_truncated_normal(shape=(), mu=0.0, sigma=1.0, a=-2, b=2):
    """Draw truncated-normal samples of the requested shape.

    Uniform noise of shape ``shape`` is generated with ``torch.rand`` and
    forwarded, together with ``mu``, ``sigma``, ``a`` and ``b``, to
    ``parameterized_truncated_normal``, whose result is returned unchanged.
    """
    return parameterized_truncated_normal(torch.rand(shape), mu=mu, sigma=sigma, a=a, b=b)

In [None]:
# Shared settings for the sampler comparison cells below.
sol_dim = 4                           # number of columns in each sample
popsize = 100000                      # number of rows drawn per call
lb, ub = -4, 8                        # scalar lower / upper bound shared by all columns
init_mean = np.array([0, 1, 2, 3])    # per-column mean
init_var = np.array([1, 4, 2, 3])     # per-column variance

In [None]:
# Time the torch inverse-CDF sampler and plot a histogram per sampled column.
t = time.time()
mean, var = init_mean, init_var
a, b = torch.tensor([lb]*sol_dim), torch.tensor([ub]*sol_dim)
size = [popsize,sol_dim]
lb_dist, ub_dist = mean - lb, ub - mean
# Cap the variance so that two standard deviations fit between mean and the nearer bound.
constrained_var = np.minimum(np.minimum(np.square(lb_dist / 2), np.square(ub_dist / 2)), var)
mu= torch.tensor(mean)
# The sampler takes a standard deviation, so convert the (constrained) variance;
# this matches the np.sqrt(constrained_var) scaling used by the scipy cell.
sigma = torch.tensor(np.sqrt(constrained_var))
r1 = sample_truncated_normal(size, mu, sigma, a, b).numpy()
print(time.time()-t)

fig, axs = plt.subplots(sol_dim,sharex=True)
# One histogram per column, so that none of the sol_dim panels is left empty.
for i in range(sol_dim):
    axs[i].hist(r1[:, i], density=True, histtype='stepfilled', alpha=0.2, bins=50)

r1.shape

In [None]:
# scipy reference sampler: draw from a frozen standard truncated normal, then
# scale by the constrained standard deviation and shift by the mean.
# NOTE(review): scipy's truncnorm takes its two bounds in standard-deviation units
# of the loc/scale normal, so passing (lb, ub) truncates the *standard* normal;
# after scaling and shifting, r is not confined to [lb, ub] (see r.max() below).
mean, var = init_mean, init_var
t = time.time()
X = stats.truncnorm(lb, ub, loc=np.zeros_like(mean), scale=np.ones_like(mean))
lb_dist, ub_dist = mean - lb, ub - mean
# Cap the variance so that two standard deviations fit between mean and the nearer bound.
constrained_var = np.minimum(np.minimum(np.square(lb_dist / 2), np.square(ub_dist / 2)), var)
r = X.rvs(size=[popsize, sol_dim]) * np.sqrt(constrained_var) + mean
print(time.time()-t)

fig, axs = plt.subplots(sol_dim,sharex=True)
# One histogram per column, so that none of the sol_dim panels is left empty.
for i in range(sol_dim):
    axs[i].hist(r[:, i], density=True, histtype='stepfilled', alpha=0.2, bins=50)

r.max()

In [None]:
# Stack the torch (r1) and scipy (r) histograms for columns 0 and 1 on a shared x-axis:
# rows are r1[:, 0], r[:, 0], r1[:, 1], r[:, 1] from top to bottom.
fig, axs = plt.subplots(4,sharex=True)

_hist_style = dict(density=True, histtype='stepfilled', alpha=0.2, bins=50)
_columns = (r1[:, 0], r[:, 0], r1[:, 1], r[:, 1])
for _ax, _column in zip(axs, _columns):
    _ax.hist(_column, **_hist_style)
r.shape

In [None]:
# Draw a fresh population from the frozen distribution X, scale it by the
# constrained standard deviation and shift it by mean. X, constrained_var, mean,
# popsize and sol_dim hold whatever the most recently executed cell left in them.
samples = X.rvs(size=[popsize, sol_dim]) * np.sqrt(constrained_var) + mean

In [None]:
# Iterative sample / select-elites / refit loop (looks like a cross-entropy-method
# update pasted from an optimizer class method).
# NOTE(review): `self` is not defined in any visible cell, so this cell raises
# NameError on a fresh kernel. `t` is used as an iteration counter here, but the
# visible cells last assign it from time.time() -- reset it before reusing this loop.
while (t < self.max_iters) and np.max(var) > self.epsilon:
    # Cap the variance so that two standard deviations fit between mean and the nearer bound.
    lb_dist, ub_dist = mean - self.lb, self.ub - mean
    constrained_var = np.minimum(np.minimum(np.square(lb_dist / 2), np.square(ub_dist / 2)), var)

    # Sample a population, score it, and keep the num_elites lowest-cost rows.
    samples = X.rvs(size=[self.popsize, self.sol_dim]) * np.sqrt(constrained_var) + mean
    costs = self.cost_function(samples)
    elites = samples[np.argsort(costs)][:self.num_elites]

    new_mean = np.mean(elites, axis=0)
    new_var = np.var(elites, axis=0)

    # Blend old and new statistics; alpha is the weight kept on the old values.
    mean = self.alpha * mean + (1 - self.alpha) * new_mean
    var = self.alpha * var + (1 - self.alpha) * new_var

    t += 1
sol, solvar = mean, var

In [None]:
# NOTE(review): torch.min is called without an input tensor, which raises
# TypeError; presumably a leftover scratch / signature-lookup cell -- confirm and
# either remove it or pass the intended tensor(s).
torch.min()

In [None]:

# 2-D sanity check: sample directly from a truncated normal confined to [lb, ub]
# and time the draw.
mean = np.array([-0.5,0.5])
var = np.array([1,1])
lb, ub = -1, 1
popsize, sol_dim = 100000, 2
# Use this cell's own 2-D mean/var: the 4-D init_mean/init_var cannot broadcast
# against size=[popsize, sol_dim] with sol_dim == 2.
# truncnorm's `scale` is a standard deviation, and its bounds are expressed in
# units of that standard deviation relative to `loc`.
std = np.sqrt(var)
a, b = (lb - mean) / std, (ub - mean) / std
X = stats.truncnorm(a, b, loc=mean, scale=std)
lb_dist, ub_dist = mean - lb, ub - mean
# Computed as in the other cells but not applied to X in this experiment.
constrained_var = np.minimum(np.minimum(np.square(lb_dist / 2), np.square(ub_dist / 2)), var)

t = time.time()
r = X.rvs(size=[popsize, sol_dim])
print(time.time()-t)
plt.hist(r[:,1], density=True, histtype='stepfilled', alpha=0.2, bins=50)
r.shape

In [None]:
# Histogram of the column at index 2 of `samples`. `samples` is not created in
# this cell; it must still be in the kernel from an earlier cell and needs at
# least three columns.
plt.hist(samples[:,2])

In [None]:
import numpy as np
from mpc_pendulum import MPC
from utils import *
from models.dynamic_model import DynamicModel
from tqdm import trange, tqdm
import copy

# Each environment variant is identified by a pair of floats that get_env_names
# combines with the "PendulumEnv" prefix (presumably physical parameters of the
# pendulum -- TODO confirm).
# Alternative kept for reference: training_envs = ['PendulumEnv_070-070-v0']
training_env_params = [(0.7, 0.7), (0.9, 0.9), (0.7, 0.9), (0.9, 0.7)]
test_env_params = [(0.8, 0.8), (1.0, 1.0), (0.8, 1.0), (1.0, 0.8)]

# NOTE: this rebinds training_envs / test_envs / seed from the earlier cells.
training_envs, test_envs = get_env_names(
    "PendulumEnv",
    training_env_params,
    test_env_params,
)

seed = 1
envs_train = initialize_envs(training_envs, seed)
envs_test = initialize_envs(test_envs, seed)

# All later cells interact with the first training environment only.
env = envs_train[0]

# Load the experiment configuration and build the MPC controller from its
# 'mpc_config' section; 'NN_config' is kept for the dynamics model cell.
config_path = "config-new.yml"
config = load_config(config_path)
nn_config = config['NN_config']
mpc_config = config['mpc_config']
mpc_controller = MPC(mpc_config=mpc_config)
mpc_controller.reset()


In [None]:
# Fix the torch and NumPy global seeds, then create the dynamics model.
torch.manual_seed(0)
np.random.seed(0)
model = DynamicModel(NN_config=nn_config)

# Fill the model's dataset with transitions gathered by a random policy:
# pretrain_episodes rollouts of at most max_step steps each.
pretrain_episodes, max_step = 50, 100
for epi in range(pretrain_episodes):
    obs, done = env.reset(), False
    step = 0
    while step < max_step and not done:
        action = env.action_space.sample()
        obs_next, reward, done, _info = env.step(action)
        # Stored record: [0, observation, action, observation delta].
        model.add_data_point([0, obs, action, obs_next - obs])
        obs = copy.deepcopy(obs_next)
        step += 1

In [None]:
# Fit the dynamics model on the data points added so far and keep whatever
# fit() returns (presumably a training loss -- TODO confirm).
loss = model.fit()

In [None]:
# Alternate between running MPC-controlled episodes (which also extend the
# model's dataset) and refitting the dynamics model on everything collected.
test_episode = 2
test_epoch = 20
try:
    for ep in range(test_epoch):
        print('epoch: ', ep)

        for epi in range(test_episode):
            acc_reward = 0
            steps_taken = 0  # explicit counter: the loop index is one short after a full-length episode
            obs = env.reset()

            done = False
            mpc_controller.reset()
            for i in range(max_step):
                if done:
                    break

                # The controller's output is wrapped in a length-1 array before stepping.
                action = np.array([mpc_controller.act(model=model, state=obs)])
                obs_next, reward, done, state_next = env.step(action)
                env.render()

                # Append the transition now; training happens once per epoch below.
                model.add_data_point([0, obs, action, obs_next - obs])
                obs = copy.deepcopy(obs_next)
                acc_reward += reward
                steps_taken += 1
            print('step: ', steps_taken, 'acc_reward: ', acc_reward)

        # Use the collected data to train the model.
        print('fitting the model...')
        model.fit()
finally:
    # Close the environment exactly once, even if the run is interrupted or fails,
    # instead of tearing it down after every episode.
    env.close()