# Export Model Artifacts

## Setup

Configure `model_dir` below to the directory containing the `A1GymEnv-v0.zip` model. 
If necessary, set `stats_dir` as well to the subfolder containing the recorded statistics.

In [81]:
from pathlib import Path

project_dir = Path('.').absolute().parent
model_dir = project_dir / 'debug_logs' / 'ppo' / 'A1GymEnv-v0_6'
stats_dir = model_dir / 'stats'


params_dir = model_dir / 'parameters'
params_dir.mkdir(exist_ok=True, parents=True)
sample_inp_oup_dir = model_dir / 'sample_inp_oup'
sample_inp_oup_dir.mkdir(exist_ok=True, parents=True)

## Export Model

The rest of the code should be runnable without any changes. 

In [82]:
# Extract actor network from SB3 PPO policy
# Export actor network weights

import torch
import torch.nn as nn
import numpy as np
from utils import ALGOS

model_path = model_dir / "A1GymEnv-v0.zip"
model = ALGOS["ppo"].load(model_path)

def extract_policy_layers(model):
    mlp_extractor = model.policy.mlp_extractor.policy_net
    action_net = model.policy.action_net

    layers = []
    for m in mlp_extractor.modules():
        if not isinstance(m, nn.Sequential):
            layers.append(m)
    layers.append(action_net)

    return nn.Sequential(*layers)

def save_tensor_as_csv(path, t: torch.Tensor):
    t_np = t.detach().cpu().numpy()
    np.savetxt(path, t_np, delimiter = ',')

policy_net = extract_policy_layers(model)
policy_net.eval()
print(policy_net)

for name, param in policy_net.named_parameters():
    if len(param.size()) == 1:
        param = torch.unsqueeze(param, axis=-1)
    name = name.replace('.', '_')
    print(name, param.size())
    save_tensor_as_csv(params_dir / f'{name}.csv', param)

Sequential(
  (0): Linear(in_features=73, out_features=256, bias=True)
  (1): ReLU()
  (2): Linear(in_features=256, out_features=256, bias=True)
  (3): ReLU()
  (4): Linear(in_features=256, out_features=12, bias=True)
)
0_weight torch.Size([256, 73])
0_bias torch.Size([256, 1])
2_weight torch.Size([256, 256])
2_bias torch.Size([256, 1])
4_weight torch.Size([12, 256])
4_bias torch.Size([12, 1])


In [83]:
# Export normalizer parameters
import pickle
normalizer_path = model_dir / "A1GymEnv-v0" / "vecnormalize.pkl"
with open(normalizer_path, "rb") as pkl:
    normalizer = pickle.load(pkl)

obs_mean = normalizer.obs_rms.mean 
obs_std = np.sqrt(normalizer.obs_rms.var + normalizer.epsilon)
obs_mean = obs_mean.reshape(1,-1)
obs_std = obs_std.reshape(1,-1)
print(obs_mean.shape, obs_std.shape)

np.savetxt(params_dir / 'obs_mean.csv', obs_mean, delimiter = ',')
np.savetxt(params_dir / 'obs_std.csv', obs_std, delimiter = ',')

(1, 73) (1, 73)


In [84]:
# Export default pose and motor polarity
from blind_walking.envs.env_wrappers import simple_openloop

pose_offset = simple_openloop.LaikagoPoseOffsetGenerator()._pose.reshape(1,-1)
np.savetxt(params_dir / 'pose_offset.csv', pose_offset, delimiter = ',')
print(pose_offset)

motor_polarity = np.array([1, -1, -1] * 4).reshape(1,-1)
np.savetxt(params_dir / 'motor_signs.csv', motor_polarity, delimiter = ',')

[[ 0.    0.67 -1.25  0.    0.67 -1.25  0.    0.67 -1.25  0.    0.67 -1.25]]


In [85]:
# Export nn observations and actions
motion_logs = (
    "motor_position",
    "motor_velocity",
    "motor_torque",
    "base_rpy",
    "base_rpy_rate",
    "base_position",
    "base_velocity",
    "time",
)

nn_logs = (
    "nn_observations",
    "nn_actions"
)

all_logs = motion_logs + nn_logs 

def to_camel_case(snake_str):
    components = snake_str.split('_')
    # We capitalize the first letter of each component except the first one
    # with the 'title' method and join them together.
    return components[0] + ''.join(x.title() for x in components[1:])

def export_logged_trajectories(stat_dir, param_dir, name):
    traj = np.load(str(stat_dir / f'{name}.npy'))
    traj = np.squeeze(traj)
    oup_name = to_camel_case(name)
    np.savetxt(str(param_dir / f'{oup_name}.csv'), traj,  delimiter = ',')

for log_name in all_logs:
    export_logged_trajectories(stats_dir, params_dir, log_name)

In [86]:
# Export sample in-out pairs
import json

input_dim = obs_mean.shape[1]
policy_net.eval()
policy_net = policy_net.to(torch.device('cpu'))

sample_inputs = {
    'zeros': torch.zeros(1, input_dim),
    'ones': torch.ones(1, input_dim)
}
inp_oup_names = {}
for name, inp_value in sample_inputs.items():
    inp_name = name + '_in.csv'
    oup_name = name + '_out.csv'
    inp_oup_names[inp_name] = oup_name
    oup_value = policy_net(inp_value)
    save_tensor_as_csv(sample_inp_oup_dir / inp_name, inp_value)
    save_tensor_as_csv(sample_inp_oup_dir / oup_name, oup_value)

with open(sample_inp_oup_dir / 'inp_oup_name_pairs.txt', 'w') as file:
    for inp_name, oup_name in inp_oup_names.items():
        line = ','.join([inp_name, oup_name]) + "\n"
        file.write(line)

In [87]:
# Export butterworth filter coefficients and history
# Export initial motor angles
import gym 
import numpy as np
import utils.import_envs
env = gym.make("A1GymEnv-v0")
env.reset()

filter = env.robot._action_filter
a = filter.a.T.copy()
b = filter.b.T.copy()
print(a.shape, b.shape)
print(a)

np.savetxt(params_dir / 'filter_a_coeff.csv', a, delimiter =',')
np.savetxt(params_dir / 'filter_b_coeff.csv', b, delimiter =',')

initial_motor_pos = env.robot.GetMotorAngles() * motor_polarity
print(initial_motor_pos)
np.savetxt(params_dir / 'final_motor_position.csv', initial_motor_pos, delimiter=',')


Init CPG gait=walk, duty_factor=0.75, period=0.6666666666666666
argv[0]=
(2, 12) (2, 12)
[[ 1.          1.          1.          1.          1.          1.
   1.          1.          1.          1.          1.          1.        ]
 [-0.77567951 -0.77567951 -0.77567951 -0.77567951 -0.77567951 -0.77567951
  -0.77567951 -0.77567951 -0.77567951 -0.77567951 -0.77567951 -0.77567951]]
[[-0.00237892 -0.90104205  1.79415947 -0.01273846 -0.89080655  1.8118248
  -0.01493733 -0.91010711  1.7881657   0.01362704 -0.88808935  1.81157552]]
