In [1]:
import time
import pandas as pd
import json
from datetime import datetime
from datetime import timezone

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.ticker import ScalarFormatter
from matplotlib.colors import ListedColormap

import plotly.graph_objects as go
from plotly.subplots import make_subplots

import numpy as np

### CPG

In [2]:
class CPG_AFDC:
    def __init__(self,
                 o0_init    = 0.2, 
                 o1_init    = 0.2, 
                 o2_init    = 0.2, 
                 phi_init   = 0.01 * 2 * np.pi, 
                 _alpha     = 1.01, 
                 lrate      = 1.3):
        
        self.out0_t = o0_init
        self.out1_t = o1_init
        self.out2_t = o2_init
        self.phi    =  phi_init

        self.alpha = _alpha
        self.w20_t = 0              # w -> weight
        self.w02_t = 1              # Default = 1
        self.w2p_t = 0.03

        self.hebbian_learning_rate = lrate 
        
        # Scaling factors
        factor   = 1
        self.A02 = 1 * factor
        self.A20 = 1 * factor
        self.A2p = 1
        self.B02 = 0.01 * factor
        self.B20 = 0.01 * factor
        self.B2p = 0.01 * factor
        
        
        self.w00 = self.alpha*np.cos(self.phi)
        self.w01 = self.alpha*np.sin(self.phi)
        self.w10 = self.alpha*(-np.sin(self.phi))
        self.w11 = self.alpha*np.cos(self.phi)
        

        self.discretize_count = 0
        self.discretize_factor = 10

        self.out0       = []
        self.out1       = []
        self.outFreq    = []

        self.w20_t1 = 0.0
        self.w02_t1 = 0.0

    # Only CPG
    def update_cpg_so2(self, phi):
        self.out0_t1 = np.tanh(self.w00*self.out0_t + self.w01*self.out1_t)
        self.out1_t1 = np.tanh(self.w10*self.out0_t + self.w11*self.out1_t)

        self.update_cpg_weights_with_phi(phi)

        # save for next iteration
        self.out0_t = self.out0_t1
        self.out1_t = self.out1_t1
        self.w20_t = self.w20_t1
        self.w02_t = self.w02_t1
        
    def update_cpg_weights_with_phi(self, phi):
        self.phi = phi
        # update cpg weight 
        self.w00 = self.alpha * np.cos(self.phi)
        self.w01 = self.alpha * np.sin(self.phi) 
        self.w10 = self.alpha * (-np.sin(self.phi)) 
        self.w11 = self.alpha * np.cos(self.phi)  

    # CPG-AFDC
    def update_adaptive_cpg_with_synaptic_plasticity(self, perturbation):
        # Adaptive CPG with Synaptic Plasticity (H0, H1, H2) in eq.(2)
        self.out0_t1 = np.tanh(self.w00*self.out0_t + self.w01*self.out1_t + self.w02_t*self.out2_t)
        self.out1_t1 = np.tanh(self.w10*self.out0_t + self.w11*self.out1_t)
        self.out2_t1 = np.tanh(self.w20_t*self.out0_t + self.w2p_t*(perturbation))  

        self.update_cpg_weights_with_learning_phi()    
        self.update_sensory_feedback_neuron_weights(perturbation)

        # save for next iteration
        self.out0_t = self.out0_t1
        self.out1_t = self.out1_t1
        self.out2_t = self.out2_t1
        self.w20_t = self.w20_t1
        self.w02_t = self.w02_t1
        self.w2p_t = self.w2p_t1
        
    def update_cpg_weights_with_learning_phi(self):

        # update cpg phi with "Adaptation throgh Fast Dynamical Coupling (AFDC)" in eq.(4)
        self.delta_phi = self.hebbian_learning_rate*(self.w02_t*self.out2_t*self.w01*self.out1_t)    # Hebbian learning rule
        self.phi = self.phi + self.delta_phi                                            # Stocastic Gradient Descent (new = old + (lrate*error))

        # update cpg weight 
        self.w00 = self.alpha * np.cos(self.phi)
        self.w01 = self.alpha * np.sin(self.phi) 
        self.w10 = self.alpha * (-np.sin(self.phi)) 
        self.w11 = self.alpha * np.cos(self.phi)  

    def update_sensory_feedback_neuron_weights(self, perturbation):
        
        # initial predefined relaxation weight value
        w20_init = 0
        w02_init = 1
        w2p_init = 0.03 

        # update sensory feedback neuron weights in eq.(5)
        self.delta_w20 = - self.A20*self.out2_t*self.out0_t  - self.B20*(self.w20_t - w20_init)
        self.delta_w02 = - self.A02*self.out0_t*self.out2_t  - self.B02*(self.w02_t - w02_init)
        self.delta_w2p =   self.A2p*self.out2_t*perturbation - self.B2p*(self.w2p_t - w2p_init)

        self.w20_t1 = self.w20_t + self.delta_w20
        self.w02_t1 = self.w02_t + self.delta_w02
        self.w2p_t1 = self.w2p_t + self.delta_w2p

### RBF

In [3]:
class RBF:
    def __init__(self, nc = 100, variance_gaussian = 0.01, nM = 500, alpha = 0.25): 
        self.nc = nc
        self.variance_gaussian = variance_gaussian
        self.nM = nM
        self.alpha = alpha

        self.target_length = None
        self.ci = []
        self.cx = []
        self.cy = []

        self.M  = []
        self.M_stack = []

        self.error = []
        self.error_new_traj = []

        self.error_stack = []
        self.error_max = 0
        self.error_max_stack = []

        self.W_stack = []

        self.learning_iteration = 0
        self.learning_rate = 0.25
        self.learning_rate_2 = 0.5

    # Record
    def construct_kernels_with_cpg_one_cycle(self, O0_cpg_one_cycle, O1_cpg_one_cycle, target_length):
        
        if target_length is None:
            print('please input target length')
            return 

        self.target_length = target_length

        self.K = np.zeros((self.nc, self.target_length))
        self.W = np.zeros((self.nc))
        self.M = np.zeros((self.target_length))

        self.ci = np.linspace(0, O0_cpg_one_cycle.shape[0]-1, num = self.nc, dtype = int) # self.ci is the indices of raw data center at each sampling point
        self.cx = O0_cpg_one_cycle[self.ci]
        self.cy = O1_cpg_one_cycle[self.ci]


        b = np.zeros((self.target_length, 1))
        for i in range(self.nc):
            b = np.exp(-(np.power((O0_cpg_one_cycle - self.cx[i]), 2) + np.power((O1_cpg_one_cycle - self.cy[i]), 2)) / self.variance_gaussian) # b is a normalized gaussian distribution
            self.K[i, :] = b.transpose()
        return self.K
    
    def imitate_path_by_learning(self, target_traj, max_learning_iteration = 500, learning_rate = 0.05):
        ''' learning rate 0.25 is too high '''

        self.learning_iteration = max_learning_iteration
        self.learning_rate = learning_rate

        # self.M_stack = [] # Comment to see the evolution across the learning serveral paths
        # self.error_stack, error_max_stack = [] # Comment to see the evolution across the learning serveral paths
        for i in range(self.learning_iteration):
            self.M = np.matmul(self.W, self.K)           
            
            self.error = (target_traj[self.ci] - self.M[self.ci])
            
            self.W = self.W + learning_rate * self.error   # [where is the point of has much error --> adjust weight]

            # Investigate path learning evolution
            self.M_stack.append(self.M)
            self.W_stack.append(self.W)


            self.error_max = np.mean(abs(self.error))  # Get the maximum absolute error --> [should change to average error along path]  --> error 10% of first iteration is defined as converge.
            self.error_max_stack.append(self.error_max)
            self.error_stack.append(self.error)
        return self.M  
    
    def adapt_imitate_path(self, qd_old, q_old, max_learning_iteration = 10, learning_rate = 0.05):
        '''
        qd_old :    old desired (traj.) gait cycle (length 10000 samples)
        q_old  :    old actual (traj.) gait cycle (length 10000 samples)
        '''

        self.learning_iteration = max_learning_iteration
        self.learning_rate_2 = learning_rate

        # if qd_old != self.target_length and q_old != self.target_length:
        #     print('length is not match, please check the length of traj.')
        #     return
        
    
        # # Solution 1
        # for i in range(self.learning_iteration):
        #     self.M = np.matmul(self.W, self.K)           
        #     self.error = (qd_old[self.ci] - self.M[self.ci] + q_old[self.ci] )
        #     self.W = self.W + learning_rate * self.error  
            
        # Solution 2
        self.error_new_traj = qd_old[self.ci] - q_old[self.ci]
        self.W = self.W + self.learning_rate_2 * self.error_new_traj  
        self.M = np.matmul(self.W, self.K)
            
        return self.M 


    def calualate_weight_mul_kernel(self):
        return np.matmul(self.W, self.K)
    def update_rbf_weigt(self, new_weight):
        self.W = new_weight
    def get_imitated_path(self):
        return self.M


    def get_rbf_weight(self):
        return self.W
    def get_evolution_learning_path(self):
        return np.asarray(self.M_stack)
    def get_error_while_learning(self):
        return np.asarray(self.error_stack)
    def get_error_max_while_learning(self):
        return np.asarray(self.error_max_stack)
    def get_weight_while_learning(self):
        return np.asarray(self.W_stack)

    
    # Replay
    def regenerate_target_traj(self, O0_cpg_t, O1_cpg_t, weight):
        '''
        reconstruct_kernels_with_cpg on time step
        '''
        self.K = np.zeros((self.nc,1))
        for i in range(self.nc):
            b = np.exp(-(np.power((O0_cpg_t - self.cx[i]), 2) + np.power((O1_cpg_t - self.cy[i]), 2)) / self.variance_gaussian) # b is a normalized gaussian distribution
            self.K[i] = b

        self.M = np.matmul(weight, self.K)     
        return self.M[0]    


In [4]:
from scipy.interpolate import interp1d
# resample target
def resample_cycle(cycle, num_points):
    """Resamples a gait cycle to have a fixed number of points using interpolation."""
    original_indices = np.linspace(0, 1, len(cycle))  # original indices based on the length of the cycle
    new_indices = np.linspace(0, 1, num_points)  # new indices to match the desired number of points
    interpolation_function = interp1d(original_indices, cycle, kind='linear')  # interpolate the data
    return interpolation_function(new_indices)  # return the resampled cycle

### Stick insect pattern

In [5]:
leg_0_df = pd.read_csv('include/stick_insect_path/rbfn_joint_targets_0.csv')
leg_1_df = pd.read_csv('include/stick_insect_path/rbfn_joint_targets_1_v1.csv')
leg_2_df = pd.read_csv('include/stick_insect_path/rbfn_joint_targets_2_new_v1.csv')

fig = go.Figure()
fig = make_subplots(rows=3, cols=3, shared_xaxes=False)
fig.update_layout(height=800, width=1100, title="Path plot")
fig.update_layout(margin=dict(l=10, r=10, t=40, b=10))  # Reduce margins

fig.add_trace(go.Scatter( y=leg_0_df['TR0'], mode='lines', name=f'TR0'), row=1, col=1)
fig.add_trace(go.Scatter( y=leg_0_df['CR0'], mode='lines', name=f'CR0'), row=2, col=1)
fig.add_trace(go.Scatter( y=leg_0_df['FR0'], mode='lines', name=f'FR0'), row=3, col=1)

fig.add_trace(go.Scatter( y=leg_1_df['TR1'], mode='lines', name=f'TR1'), row=1, col=2)
fig.add_trace(go.Scatter( y=leg_1_df['CR1'], mode='lines', name=f'CR1'), row=2, col=2)
fig.add_trace(go.Scatter( y=leg_1_df['FR1'], mode='lines', name=f'FR1'), row=3, col=2)

fig.add_trace(go.Scatter( y=leg_2_df['TR2'], mode='lines', name=f'TR2'), row=1, col=3)
fig.add_trace(go.Scatter( y=leg_2_df['CR2'], mode='lines', name=f'CR2'), row=2, col=3)
fig.add_trace(go.Scatter( y=leg_2_df['FR2'], mode='lines', name=f'FR2'), row=3, col=3)

In [None]:
target_1 = resample_cycle(np.array(leg_0_df['TR0']), 10000)     
target_1_length = len(target_1)

cpg_one_cycle = np.load('include/cpg_rbf/cpg_one_cycle.npz')

# 90 percent of CPG cycle
out0_cpg_one_cycle = resample_cycle(cpg_one_cycle['O0'][:-1000], target_1_length)    
out1_cpg_one_cycle = resample_cycle(cpg_one_cycle['O1'][:-1000], target_1_length) 
# 100 percent of CPG cycle
out0_cpg_one_full_cycle = resample_cycle(cpg_one_cycle['O0'], target_1_length)    
out1_cpg_one_full_cycle = resample_cycle(cpg_one_cycle['O1'], target_1_length) 



rbf = RBF()
cpg_kernel = rbf.construct_kernels_with_cpg_one_cycle(out0_cpg_one_full_cycle, out0_cpg_one_full_cycle, target_1_length)
imitated_path_offline_1     = rbf.imitate_path_by_learning(target_1)
imitated_weight_1         = rbf.get_weight_while_learning()








fig = go.Figure()
fig = make_subplots(rows=3, cols=1, shared_xaxes=False)
fig.update_layout(height=600, width=1100, title="CPG plot")
fig.update_layout(margin=dict(l=10, r=10, t=40, b=10))  # Reduce margins

fig.add_trace(go.Scatter( y=out0_cpg_one_cycle, mode='lines', name=f'cpg_output_0'), row=1, col=1)
fig.add_trace(go.Scatter( y=out1_cpg_one_cycle, mode='lines', name=f'cpg_output_1'), row=1, col=1)
fig.add_trace(go.Scatter( y=out0_cpg_one_full_cycle, mode='lines', name=f'out0_cpg_one_full_cycle'), row=2, col=1)
fig.add_trace(go.Scatter( y=out1_cpg_one_full_cycle, mode='lines', name=f'out1_cpg_one_full_cycle'), row=2, col=1)

fig.add_trace(go.Scatter( y=imitated_path_offline_1, mode='lines', name=f'imitated_path_offline_1'), row=3, col=1)
fig.add_trace(go.Scatter( y=target_1, mode='lines', name=f'target_1'), row=3, col=1)

12661


## Test

In [7]:
# cpg = CPG_AFDC()
# cpg_output_1 = []
# cpg_output_2 = []
# imitated_path_online_1 = []
# for i in range(5000):
#     cpg.update_cpg_so2(0.010)

#     cpg_output_1.append(cpg.out0_t)
#     cpg_output_2.append(cpg.out1_t)
#     imitated_path_online_1.append(rbf.regenerate_target_traj(cpg.out0_t, cpg.out1_t, imitated_weight_1)[0])

# print(imitated_path_online_1)
# fig = go.Figure()
# fig = make_subplots(rows=2, cols=1, shared_xaxes=False)
# fig.update_layout(height=400, width=1100, title="CPG plot")
# fig.update_layout(margin=dict(l=10, r=10, t=40, b=10))  # Reduce margins

# fig.add_trace(go.Scatter( y=cpg_output_1, mode='lines', name=f'cpg_output_1'), row=1, col=1)
# fig.add_trace(go.Scatter( y=cpg_output_2, mode='lines', name=f'cpg_output_2'), row=1, col=1)
# fig.add_trace(go.Scatter( y=imitated_path_online_1, mode='lines', name=f'imitated_path_online_1'), row=2, col=1)
