In [1]:
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join('', '...')))  
# Import the required modules
import numpy as np
from matplotlib import pyplot as plt
import scipy
import seaborn as sns
import pickle
import h5py
import json
import argparse
from functions.system.limbNetwork import limbNetwork
from functions.controller import controllerFuncs

## Here we will define the simulation parameters 

This involves 
1. loading the saved random network (adjacency matrix and readout weights)
2. setting the task parameters such as time for preparation and movement, and the penalties on different system states
3. setting up cost parameters

In [None]:
########## load Radnomly initialized networks from the hdf5 files ##########
def load_networks_by_spectral_radius(args):
    filename = args.filename
    spectral_radius = args.spectral_radiuses[0] + 0
    networks = []
    readouts = []
    with h5py.File(filename, 'r') as f:
        spectral_radius_group = str(spectral_radius)  # Ensure the spectral radius is in the correct format
        if spectral_radius_group in f:
            for i in range(10):  # Assuming there are always 10 networks
                network_dataset_name = f'{spectral_radius_group}/networks/network_{i}'
                readout_dataset_name = f'{spectral_radius_group}/readouts/readout_{i}'
                if network_dataset_name in f:
                    network = f[network_dataset_name][()]
                    networks.append(network)

                    readout = f[readout_dataset_name][()]
                    readouts.append(readout)
                else:
                    print(f"Dataset {network_dataset_name} not found.")
        else:
            print(f"Spectral radius group {spectral_radius_group} not found.")
    return networks, readouts


######## set simulation and cost parameters #############
def setsimparams():
    k = 0.1
    m = 1
    tau = 0.06
    delta = 1e-2
    rtime = 1.0
    prep_time = 1.0
    
    

    num_neurons = 100
    nStep = int(np.round((rtime + prep_time)/delta))
    nPrep = int(np.round(prep_time/delta))
    # the cost can penalize froma given end time, or throughout the reach period
    nTerminal = int((nStep - nPrep) / 2) # Use terinal time as OPTIONAL if the movement must end at a specific time
    nHold = 0
    w = np.zeros(shape=(8+num_neurons+1, nStep-nPrep+1))



    ##### COST PARAMETERS WITH TAU_NET on THE B MATRIX #######
    state_alpha = 100 
    prep_alpha = 100
    effort_alpha = 1e-13
    
    # total execution period lasts 1 second (rtime). But set the terminal time step as 500 ms (0.5s) where the hand must be on the target with zero velocity
    # we will use a high cost on the position (10) and relatively low cost on velocity (0.1) at the terminal time
    

    # TERMINAL COSTS: cost after terminal time until the end of the reach period. the hand must be stable on the target.
    # we can use the same penalties as before for position and velocity costs
    #for t in range(nTerminal, nStep-nPrep):
    #    w[:8, t] = (1/nTerminal) * np.array([1, .1, 0, 0, 1, .1, 0, 0]) #* ((t) / (nStep - nPrep))**6

    # COST PARAMETERS WITH HIGHER EXPONENTIAL (i.e. giving more importance to terminal position errors) #######
    for t in range(0, nStep-nPrep):
        w[:8, t] = (1/nStep) * np.array([100, 0.0, 0.000, 0, 100, 0.0, 0.000, 0])  * ((t) / (nStep - nPrep))**6

    wprep = np.zeros(shape=(8+num_neurons+1, nPrep+1))
    for t in range(0, nPrep+1):
        wprep[:8, t] = (1/nStep) * np.array([100, 1, 1, 0, 100, 1, 1, 0]) 


    ##### COST PARAMETERS WITH smaller exponential (i.e. giving importance to continuous position errors instead of terminal position) #######
    prep_alpha = 100
    effort_alpha = 1e-13 
    for t in range(0, nStep-nPrep):
        w[:8, t] = (1/nStep) * np.array([1, 0, 0, 0, 1, 0, 0, 0]) * ((t) / (nStep - nPrep))**2
    wprep = np.zeros(shape=(8+num_neurons+1, nPrep+1))
    for t in range(0, nPrep+1):
        wprep[:8, t] = (1/nStep) * prep_alpha * np.array([1, 1, 1, 0, 1, 1, 1, 0])
    r = effort_alpha + 0.0



    #wprep = np.zeros(shape=(8+num_neurons+1, nPrep+1))
    #for t in range(0, nPrep+1):
    #    wprep[:8, t] = (1/nStep) * prep_alpha * np.array([1, 1, 1, 0, 1, 1, 1, 0])
    r = effort_alpha + 0.0


    simparams = {'k': k, 'm': m, 'tau': tau, 
              'delta': delta, 'rtime': rtime, 'prep_time': prep_time, 'nStep': nStep, 'r': r, 
              'w': w, 
              'num_neurons': num_neurons, 'nPrep': nPrep, 'wprep': wprep, 'neuron_tau_net': 20e-3}
    
    return simparams

#### define the function to simulate the reaching control task to 8 different targets ###########


In [3]:
def reach8dir(params):
    # GET body matrices and SET cost matrices
    system_dynamics = limbNetwork(params)
    Atot, Btot, Htot = system_dynamics.getSystemMatrices()
    #R, Q = setCostmatrices(Atot, Btot, params['delta'], params['r'], params['w']) # cross-checked the R, Q with Fred's code. They are the same
    #R, Q = setCostmatrices_wprep(Atot, Btot, params['delta'], params['r'], params['w'], params['wprep'], params['nPrep']) # cross-checked the R, Q with Fred's code. They are the same

    # By this point, we have the system matrices (Atot,Btot) and the cost matrices (R,Q). Now we need to compute the feedback gains L.
    # For this, we need to compute the noise covariance matrix SigmaXi. We will use the same noise covariance matrix as Fred's code.
    # The noise covariance matrix is defined as follows:
    SigmaXi = system_dynamics.SigmaXi + 0.0
    SigmaOmega = system_dynamics.SigmaOmega + 0.0

    R, Q = controllerFuncs.setCostmatrices_wprep_MPC(Atot, Btot, params['delta'], params['r'], params['w'], params['wprep'], params['nPrep'])
    L = controllerFuncs.basicLQG(Atot, Btot, Q, R, SigmaXi)
    K = controllerFuncs.basicKalman(Atot, Btot, Q, R, Htot, SigmaXi, SigmaOmega)

    system_dynamics.SigmaXi[3,3] = 0                 # X force trick
    SigmaXi[3, 3] = 0                 # X force trick
    system_dynamics.SigmaXi[7,7] = 0                 # Y force trick
    SigmaXi[7, 7] = 0                 # Y force trick

    # Now we have the feedback gains L. We can use this to simulate the system.
    # For this, we need to define the simulation time and the target parameters
    simtime = np.arange(0, params['rtime']+params['prep_time'] + params['delta'], params['delta'])
    num_targconditions = 8
    num_states = (params['num_neurons'] + 1 + 8) * 3
    num_controls = params['num_neurons']
    num_steps = len(simtime)
    all_states = np.zeros((num_steps, num_targconditions, num_states))
    all_estimates = np.zeros((num_steps, num_targconditions, num_states))
    all_controls = np.zeros((num_steps, num_targconditions, num_controls))
    all_innovation = np.zeros((num_steps, num_targconditions, num_states))
    
    # targets location is in the indices 8 and 12 for X and Y respectively
    # so for 8 targ conditions, we will use radial targets at 5 cm from origin in 8 directions from 0 to 360 degrees
    # the target states are defined as follows:
    target_states = np.zeros((num_targconditions, num_states))
    target_states[:, 8 + params['num_neurons'] + 1] = 0.2*np.cos(np.arange(0, 2*np.pi, 2*np.pi/num_targconditions))   # X
    target_states[:, 12 + params['num_neurons'] + 1] = 0.2 * np.sin(np.arange(0, 2*np.pi, 2*np.pi/num_targconditions))  # Y

    # target states for preparation
    prep_states = np.zeros((num_targconditions, num_states))
    prep_states[:, 2 * (8 + params['num_neurons'] + 1)] = 0.0   # X
    prep_states[:, 2 * (8 + params['num_neurons'] + 1) + 4] = 0.0   # X

    # the target states are active for the entire simulation time
    # so the all_states matrix will have the target states repeated for all time steps
    all_states[:, :, 8 + params['num_neurons'] + 1] = target_states[:, 8 +params['num_neurons'] + 1].reshape((1, num_targconditions)).repeat(num_steps, axis=0)
    all_states[:, :, 12 + params['num_neurons'] + 1] = target_states[:, 12 + params['num_neurons'] + 1].reshape((1, num_targconditions)).repeat(num_steps, axis=0)
    all_states[:, :, 2 * (8 + params['num_neurons'] + 1)] = prep_states[:, 2 * (8 + params['num_neurons'] + 1)].reshape((1, num_targconditions)).repeat(num_steps, axis=0)
    all_states[:, :, 2 * (8 + params['num_neurons'] + 1) + 4] = prep_states[:, 2 * (8 + params['num_neurons'] + 1) + 4].reshape((1, num_targconditions)).repeat(num_steps, axis=0)
    all_estimates[:, :, 8 + params['num_neurons'] + 1] = target_states[:, 8 + params['num_neurons'] + 1].reshape((1, num_targconditions)).repeat(num_steps, axis=0)
    all_estimates[:, :, 12 + params['num_neurons'] + 1] = target_states[:, 12 + params['num_neurons'] + 1].reshape((1, num_targconditions)).repeat(num_steps, axis=0)
    all_estimates[:, :, 2 * (8 + params['num_neurons'] + 1)] = prep_states[:, 2 * (8 + params['num_neurons'] + 1)].reshape((1, num_targconditions)).repeat(num_steps, axis=0)
    all_estimates[:, :, 2 * (8 + params['num_neurons'] + 1) + 4] = prep_states[:, 2 * (8 + params['num_neurons'] + 1) + 4].reshape((1, num_targconditions)).repeat(num_steps, axis=0)

    # set the desired offset states as 1 for the preparatory and actual target states
    all_states[:, :, 8 + params['num_neurons']] = 1.0
    all_states[:, :, 2*(8 + params['num_neurons'])] = 0.0
    all_states[:, :, 3*(8 + params['num_neurons'])] = 0.0
    all_estimates[:, :, 8 + params['num_neurons']] = 1.0
    all_estimates[:, :, 2*(8 + params['num_neurons'])] = 0.0
    all_estimates[:, :, 3*(8 + params['num_neurons'])] = 0.0


    #
    for i in range(num_targconditions):
        system_dynamics.reset(all_states[0, i, :])
        print('target number', i)
        for t in range(num_steps-1):
            # apply mechanical perturbation lateral (OPTIONAL)
            if t >= params['nPrep'] + 10 and  t  <= params['nPrep'] + 20   and i == 2:
                system_dynamics.states[3] = 0 # setting a non-zero value applies external perturbation for 100 ms 

            current_state = all_states[t, i, :] + 0.0
            current_estimate = all_estimates[t, i, :] + 0.0

            if t < params['nPrep']:
                # apply control based on current estimate
                current_control = -L[0, :, :] @ current_state
            else:
                current_control = -L[t - params['nPrep'], :, :] @ current_state
            

            all_controls[t, i, :] = current_control + 0.0


            ## MPC method to recompute the control gains at each time step
            #if t < num_steps - 40:
            #    L = mixedLQG(Atot, Btot, Q[t:t+40, :, :], R[t:t+40, :, :], SigmaXi, params['nPrep'])
            #else:
            #    L = mixedLQG(Atot, Btot, Q[t:num_steps, :, :], R[t:num_steps, :, :], SigmaXi, params['nPrep'])

            #update the estimate
            #next_estimate, next_innovation = bodyins.nextEstimate(K[t, :, :], current_control)
            # update the state
            nextState = system_dynamics.nextState(current_control)
        
        
            all_states[t+1, i, :] = nextState + 0.0
            all_estimates[t+1, i, :] = nextState + 0.0
            #all_innovation[t+1, i, :] = next_innovation + 0.0
    
    results = {}
    results['states'] = all_states
    results['estimates'] = all_estimates
    results['controls'] = all_controls
    results['innovations'] = all_innovation
    results['L'] = L
    results['K'] = K
    results['Atot'] = Atot
    results['Btot'] = Btot
    results['bodyins'] = system_dynamics
    results['Htot'] = Htot

    return results

#### Function to run simulation of an 8 target reaching task once #####

In [4]:
########## function to run multiple simulations of the reach task based on user input (single or batch runs) ##########
def run_simulation(args):
    spectral_radius = args.spectral_radiuses[0]
    networks, readouts = load_networks_by_spectral_radius(args)
    network_id = args.network_id + 0

    simparams_normal = setsimparams()
    
    
    network = networks[network_id]
    readout = readouts[network_id]
    networkparams = {}
    networkparams['Wrec'] = network
    networkparams['Wout'] = readout
    networkparams['spectral_radius'] = spectral_radius
    networkparams['network_id'] = network_id

    networkparams['Win'] = np.zeros((networkparams['Wrec'].shape[0], 8))
    S = 8
    N = networkparams['Wrec'].shape[0]

    simparams_normal.update(networkparams)
    

    # run the simulations
    results_8dir = reach8dir(simparams_normal)
    
    return results_8dir

In [5]:
########## function to save the results in a new hdf5 file ##########
def save_results_to_hdf5(filename, results_8dir, args):
    spectral_radius = args.spectral_radiuses[0]
    i = args.network_id + 0
    with h5py.File(filename, 'a') as f:
        group = f.require_group(str(spectral_radius))
        network_group = group.require_group(f'network_{i}')
        res_group = network_group.require_group('results_8dir')

        for key, value in results_8dir.items():
            # If dataset already exists, delete it so we can overwrite
            if key in res_group:
                del res_group[key]

            # Save numpy arrays directly
            if isinstance(value, np.ndarray):
                res_group.create_dataset(key, data=value)
                continue

            # Save basic scalars/strings directly
            if isinstance(value, (int, float, bool, str, bytes)):
                res_group.create_dataset(key, data=value)
                continue

            # Try pickling arbitrary Python objects and store as bytes
            try:
                pickled = pickle.dumps(value)
                dt = h5py.special_dtype(vlen=bytes)
                res_group.create_dataset(key, data=pickled, dtype=dt)
            except Exception:
                # If it still can't be saved, skip with a warning (e.g., large/unpicklable objects like live class instances)
                print(f"Skipping {key}: unsupported type {type(value)} (not saved)")
            

In [None]:
########## MAIN function that runs the task simulations (this example script runs one random newtork) ##########

##___________________To use the inhibitory stabilized network structure use the following args___________##
parser = argparse.ArgumentParser(description='Generate ISN neural networks with different spectral radiuses and save them to a HDF5 file.')
parser.add_argument('--filename', type=str, default = 'isn_networks.hdf5', help='The HDF5 file to save the networks to.')

#_________define the spectral radius or variance parameter for random connectivity___________#
# CAUTION: Note that for denseRNNs, although we call this parameter as spectral_radiuses, it is actually the variance scaling factor 'g'
# If using denseRNNs, set this value to 0.8 to load from existing datastore files
parser.add_argument('--spectral_radiuses', type=float, nargs='+', default=[0.3], help='The spectral radiuses of the networks.')

parser.add_argument('--seed', type=int, default=0, help='The seed for the random number generator.')
parser.add_argument('--network_id', type=int, default=0, help='The ID of the network to run the simulation for.')
parser.add_argument('--neural_effort_scaling', type=int, default=1e-12, help='The neural effort level. 1 for low, 2 for high.')
# In a Jupyter notebook avoid parsing kernel args: use empty list to get defaults
args = parser.parse_args([])
# the file should be in the data store directory above the current directory
# use only the basename from the provided filename to avoid accidental absolute paths
args.filename = os.path.join('datastore', 'WeightsData', 'ISN', os.path.basename(args.filename))
args.neural_effort_scaling = 1
store_data_filename = 'datastore/SimulationData/8dirreach_task/denseRNN/denseRNN_results_singlesimulation.hdf5'


##___________________To use the dense RNN without any inhibitory stabilized network structure use the following args___________##
#parser = argparse.ArgumentParser(description='Generate dense neural networks with different spectral radiuses and save them to a HDF5 file.')
#parser.add_argument('--filename', type=str, default = 'dense_network_weights.hdf5', help='The HDF5 file to save the networks to.')

##_________define the spectral radius or variance parameter for random connectivity___________#
## CAUTION: Note that for denseRNNs, although we call this parameter as spectral_radiuses, it is actually the variance scaling factor 'g'
### If using denseRNNs, set this value to 0.8 to load from existing datastore files
#parser.add_argument('--spectral_radiuses', type=float, nargs='+', default=[0.8], help='The spectral radiuses of the networks.')

#parser.add_argument('--seed', type=int, default=0, help='The seed for the random number generator.')
#parser.add_argument('--network_id', type=int, default=0, help='The ID of the network to run the simulation for.')
#parser.add_argument('--neural_effort_scaling', type=int, default=1e-12, help='The neural effort level. 1 for low, 2 for high.')
## In a Jupyter notebook avoid parsing kernel args: use empty list to get defaults
#args = parser.parse_args([])
## the file should be in the data store directory above the current directory
## use only the basename from the provided filename to avoid accidental absolute paths
#args.filename = os.path.join('datastore', 'WeightsData', 'denseRNN', os.path.basename(args.filename))
#args.neural_effort_scaling = 1
#store_data_filename = 'datastore/SimulationData/8dirreach_task/denseRNN/denseRNN_results_singlesimulation.hdf5'


	
# Run a single simulation
args.network_id = 0
results_8dir = run_simulation(args)
# pass the results and args to the save function (correct argument order)
save_results_to_hdf5(store_data_filename, results_8dir, args)
#


(109, 109) (109, 100)
target number 0
target number 1
target number 2
target number 3
target number 4
target number 5
target number 6
target number 7
Skipping bodyins: unsupported type <class 'functions.system.limbNetwork.limbNetwork'> (not saved)


In [7]:
args.spectral_radiuses

[0.3]