In [1]:
import os
import time
import sys
import json
import numpy as np
import torch as th
import matplotlib.pyplot as plt
import motornet as mn
from simple_policy import Policy
from simple_task import CentreOutFF
from simple_utils import *
from tqdm import tqdm
import pickle

print('All packages imported.')
print('pytorch version: ' + th.__version__)
print('numpy version: ' + np.__version__)
print('motornet version: ' + mn.__version__)

All packages imported.
pytorch version: 2.1.1
numpy version: 1.26.2
motornet version: 0.2.0


In [2]:
device = th.device('cpu')

effector = mn.effector.RigidTendonArm26(muscle=mn.muscle.RigidTendonHillMuscle())
env = CentreOutFF(effector=effector, max_ep_duration=1.)

policy = Policy(env.observation_space.shape[0], 128, env.n_muscles, device=device)
optimizer = th.optim.Adam(policy.parameters(), lr=10**-3)

In [3]:
model_name = "simple"
if (not os.path.exists(model_name)):
    os.mkdir(model_name)

In [4]:
batch_size =    64
n_batch    =  1000
interval   =   100
model_name = "simple"

losses = {
    'overall': [],
    'position': [],
    'muscle': [],
    'muscle_derivative': [],
    'hidden': [],
    'hidden_derivative': [],
    'jerk' : []}

for batch in tqdm(range(n_batch),
                desc=f"Training {n_batch} batches of {batch_size}",
                unit="batch"):

    data = run_episode(env, policy, batch_size, catch_trial_perc=50, condition='train', ff_coefficient=0.0, detach=False)
    loss, losses_weighted = cal_loss(data)

    # backward pass & update weights
    optimizer.zero_grad() 
    loss.backward()
    th.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=1.)  # important!
    optimizer.step()

    # save weights/config/losses
    if (batch % interval == 0) and (batch != 0):
        save_model(env, policy, losses, model_name, quiet=True)
        with open(model_name + "/" + model_name + '_data.pkl', 'wb') as f:
            pickle.dump(data, f)
        print_losses(losses_weighted=losses_weighted, model_name=model_name, batch=batch)
        data, _ = test(model_name + "/" + model_name + "_cfg.json", model_name + "/" + model_name + "_weights")
        plot_stuff(data, model_name + "/" + model_name, batch=batch)

    # Update loss values in the dictionary
    losses['overall'].append(loss.item())
    losses['position'].append(losses_weighted['position'].item())
    losses['muscle'].append(losses_weighted['muscle'].item())
    losses['muscle_derivative'].append(losses_weighted['muscle_derivative'].item())
    losses['hidden'].append(losses_weighted['hidden'].item())
    losses['hidden_derivative'].append(losses_weighted['hidden_derivative'].item())
    losses['jerk'].append(losses_weighted['jerk_loss'].item())

save_model(env, policy, losses, model_name)
with open(model_name + "/" + model_name + '_data.pkl', 'wb') as f:
    pickle.dump(data, f)
print_losses(losses_weighted=losses_weighted, model_name=model_name, batch=batch)
data, _ = test(model_name + "/" + model_name + "_cfg.json", model_name + "/" + model_name + "_weights")
plot_stuff(data, model_name + "/" + model_name, batch=batch)

Training 1000 batches of 64:   2%|▏         | 16/1000 [00:06<06:47,  2.42batch/s]


KeyboardInterrupt: 

In [None]:
save_model(env, policy, losses, model_name)

In [None]:
# PLOT LOSS FUNCTION(s)

log = json.load(open(model_name + "_log.json",'r'))
print(log["losses"].keys())
w=50
for loss in ["overall","position","muscle","hidden","jerk"]:
    fig,ax = plot_training_log(log=log["losses"],loss_type=loss, w=w)
    ax.set_title(f"{loss} (w={w})")

In [None]:
# TEST NETWORK ON CENTRE-OUT

data = test(model_name + "_cfg.json", model_name + "_weights")
fig, ax = plot_simulations(xy=data['xy'], target_xy=data['tg'], figsize=(8,6))
fig, ax = plot_activation(data['all_hidden'], data['all_muscle'])
fig, ax = plot_kinematics(all_xy=data["xy"], all_tg=data["tg"], all_vel=data["vel"])