In [None]:
from matplotlib import pyplot as plt
import numpy as np


def rk4(h, y, inputs, f):
    '''
    用于数值积分的rk4函数。
    args:
        h - 步长
        y - 当前状态量
        inputs - 外界对系统的输入
        f - 常微分或偏微分方程
    return:
        y_new - 新的状态量,即经过h时间之后的状态量
    '''
    k1 = f(y, inputs)
    k2 = f(y + h / 2 * k1, inputs)
    k3 = f(y + h / 2 * k2, inputs)
    k4 = f(y + h * k3, inputs)

    y_new = y + h / 6 * (k1 + 2 * k2 + 2 * k3 + k4)
    return y_new


def sigmoid(x):
    return 2 * (1 / (1 + np.exp(-(np.array(x)))) - 0.5)


reluoffset = .13
def relu(x, reluoffset=reluoffset):  
    return np.maximum(0, np.array(x)-reluoffset)  


def myplot(t, listarray, label, linestyle='-'):
    plt.plot(t,listarray, label=label, linestyle=linestyle)
    plt.legend()


class LowPassFilter:
    """
    current输入当前膜电位,
    history是current历史轨迹,注意历史轨迹初值应该等于当前,
    derivative()是history动力学方程,
    step()返回history,
    update()更新current,
    """
    def __init__(self, current=None, history=None, tau=0.1, dt=1) -> None:
        if current is not None:
            self.current = current
        else:
            assert current is not None, "The current is None."
        if history is None:            
            self.history = current
        else:
            self.history = history
        self.tau = tau
        self.dt = dt
        self.currentnew = self.current
        self.historynew = self.history
    
    def derivative(self, state, inputs=0):
        history = state
        Dhistory = self.tau * (self.current - history)
        return Dhistory

    def step(self, dt, current, inputs=0 ):
        """
        返回新的历史轨迹
        """
        if dt is None:
            dt = self.dt
        state = self.history
        self.current = current
        statenew = rk4(dt, state, inputs, self.derivative)
        self.historynew = statenew
        self.currentnew = self.current
        return statenew

    def update(self):
        self.current = self.currentnew
        self.history = self.historynew    


class Neuron:
    def __init__(self, num=3, I=0, potential=None, trajectory=None, tau=0.1, dt=1) -> None:
        """
        根据输入I,获取神经元激活的兴奋程度potential(兴奋当前1),
        trajectory(兴奋历史2)是potential的历史兴奋程度,
        delta(兴奋增量3)是兴奋当前-兴奋历史(1-2),
        updateDelta()更新delta,
        ,
        """
        self.num = num
        self.I = I
        assert self.num == np.size(I), "The input size is different from the neuron number."
        if potential is None:
            self.potential = self.activation(I)
        else:
            self.potential = potential
        if trajectory is None:
            self.trajectory = self.potential
        self.potentialnew = self.potential
        self.trajectorynew = self.trajectory
        self.delta = self.potential - self.trajectory        
        self.tau = tau
        self.dt = dt
        self.lowpassfilter = LowPassFilter(self.potential, self.trajectory, self.tau, self.dt)
    
    def updateDelta(self):
        self.delta = self.potential - self.trajectory 
        return self.delta
    
    def activation(self, I):
        return sigmoid(I)    
    """
    # def step(self, dt, inputs=0):
    #     if dt is None:
    #         dt = self.dt
    #     state = self.history
    #     statenew = rk4(dt, state, inputs, self.derivative)
    #     self.history = statenew
    #     return statenew
    """
    def step(self, dt, I):
        if dt is None:
            dt = self.dt
        assert self.num == np.size(I), "The input size is different from the neuron number."
        self.potential = self.activation(I)
        self.trajectory = self.potential
        self.trajectorynew = self.lowpassfilter.step(dt=dt, current=self.potential)
        self.potentialnew = self.potential
        return self.trajectorynew

    def update(self):
        self.potential = self.potentialnew
        self.trajectory = self.trajectorynew
        self.updateDelta()
        self.lowpassfilter.update()


class Synapse:
    def __init__(self, preNeuron: Neuron, postNeuron: Neuron, weight=None, reward=0, alpha=1, beta=1, dt=1, tau=1,) -> None:
        self.preNeuron = preNeuron
        self.postNeuron = postNeuron
        self.preNum = self.preNeuron.num
        self.postNum = self.postNeuron.num
        self.preI = self.preNeuron.I
        self.postI = self.postNeuron.I
        self.prePotential = self.preNeuron.potential
        self.prePotentialnew = self.prePotential
        self.preTrajectory = self.prePotential  # self.preNeuron.trajectory
        self.preTrajectorynew = self.preTrajectory
        self.postPotential = self.postNeuron.potential
        self.postPotentialnew = self.postPotential
        self.postTrajectory = self.postPotential  # self.preNeuron.trajectory
        self.postTrajectorynew = self.postTrajectory
        if weight is not None:
            self.weight = weight
        else:
            assert weight is not None, "The weight is None."
        assert self.weight.shape == (self.postNum, self.preNum), "The shape of the weight does not match. "
        self.reward = np.array(reward)
        self.alpha = alpha
        self.beta = beta
        self.dt = dt
        self.tau = tau

    def derivative(self, state, inputs=0):
        w = state
        # Dw = (np.matmul(self.beta * np.tile(self.postTrajectory, self.postNum), (self.alpha * self.preTrajectory.T - w)) * self.reward) / self.tau
        Dw = self.beta * ( self.postTrajectory * self.reward * (self.alpha * self.preTrajectory.T - w))  / self.tau
        return Dw

    def step(self, dt, preI, postI=None, inputs=0):
        self.preI = preI
        if dt is None:
            dt = self.dt
        state = self.weight
        statenew = rk4(dt, state, inputs, self.derivative)
        self.weight = statenew
        self.preTrajectorynew = self.preNeuron.step(dt, self.preI)
        # self.postI = np.matmul(self.weight, self.prePotential)
        if postI is None:
            self.postI = np.matmul(self.weight, self.preNeuron.activation(self.preI))
        else:
            self.postI = postI
        self.postTrajectorynew = self.postNeuron.step(dt, self.postI)
        return statenew

    def update(self):
        self.prePotential = self.preNeuron.potential
        self.preTrajectory = self.preTrajectorynew
        self.postPotential = self.postNeuron.potential
        self.postTrajectory = self.postTrajectorynew
        # self.preNeuron.potential = self.prePotentialnew
        # self.postNeuron.potential = self.postPotential
        self.preNeuron.update()
        self.postNeuron.update()  
        

class SynapseKM(Synapse):
    def __init__(self, preNeuron: Neuron, postNeuron: Neuron, weight=None, reward=0, alpha=1, beta=1, dt=1, tau=1) -> None:
        super().__init__(preNeuron, postNeuron, weight, reward, alpha, beta, dt, tau)
    
    def derivative(self, state, inputs=0):
        return super().derivative(state, inputs)

    def step(self, dt, preI, postI=None, inputs=0):
        return super().step(dt, preI, postI, inputs)
    
    def update(self):
        return super().update()
    

class SynapseKD(Synapse):
    def __init__(self, preNeuron: Neuron, postNeuron: Neuron, weight=None, reward=0, alpha=1, beta=1, dt=1, tau=1) -> None:
        super().__init__(preNeuron, postNeuron, weight, reward, alpha, beta, dt, tau)
        self.delta = self.preNeuron.delta
    
    def derivative(self, state, inputs=0):
        w = state
        delta = self.delta
        # Dw = (np.matmul(self.beta * np.tile(self.postPotential, self.postNum), (self.alpha * delta.T - w)) * self.reward) / self.tau
        Dw = self.beta * (self.postPotential * self.reward * (self.alpha * delta.T - w)) / self.tau
        return Dw

    def step(self, dt, preI, postI=None, inputs=0):
        return super().step(dt, preI, postI, inputs)
    
    def update(self):
        self.preNeuron.trajectory = self.preTrajectorynew
        self.delta = self.preNeuron.updateDelta()
        return super().update()
    
        
class SynapseDK(Synapse):
    def __init__(self, preNeuron: Neuron, postNeuron: Neuron, weight=None, reward=0, alpha=1, beta=1, dt=1, tau=1) -> None:
        super().__init__(preNeuron, postNeuron, weight, reward, alpha, beta, dt, tau)
        self.delta = self.postNeuron.delta

    def derivative(self, state, inputs=0):
        w = state
        delta = self.delta
        # Dw = (np.matmul(self.beta * np.tile(delta, self.postNum), (self.alpha * self.preTrajectory.T - w)) * self.reward) / self.tau
        # print(delta.shape, self.reward.shape, self.preTrajectory.T.shape, w.shape)
        Dw = self.beta * (delta * self.reward * (self.alpha * self.preTrajectory.T - w)) / self.tau
        return Dw
    
    def step(self, dt, preI, postI=None, inputs=0):
        return super().step(dt, preI, postI, inputs)
    
    def update(self):
        self.postNeuron.trajectory = self.postTrajectorynew
        self.delta = self.postNeuron.updateDelta()
        return super().update()        


numKC, numMBON, numDAN = 6, 3, 3
stepNum = 15000*5
start = 0
dt = .02
end = start + stepNum * dt
t = np.linspace(start, end, stepNum)
print(f"start:{start}, end:{end}, dt:{dt}, stepNum:{stepNum}")

In [4]:
from Bee import BeeFoodEnv
Bee = BeeFoodEnv(3)
for i in range(Bee.observation_space.n):
    print(i, Bee.decode(i))

0 (0, 0, False)
1 (1, 0, False)
2 (2, 0, False)
3 (0, 1, False)
4 (1, 1, False)
5 (2, 1, False)
6 (0, 2, False)
7 (1, 2, False)
8 (2, 2, False)
9 (0, 0, True)
10 (1, 0, True)
11 (2, 0, True)
12 (0, 1, True)
13 (1, 1, True)
14 (2, 1, True)
15 (0, 2, True)
16 (1, 2, True)
17 (2, 2, True)


In [None]:
num_locations = 3
env = BeeFoodEnv(num_locations)
num_state, num_action = env.observation_space.n, env.action_space.n
numKC, numMBON, numDAN = num_state, num_action, num_action
stepNum = 15000*5
start = 0
dt = .02
end = start + stepNum * dt
t = np.linspace(start, end, stepNum)
print(f"start:{start}, end:{end}, dt:{dt}, stepNum:{stepNum}")

# 训练 
# 初始权重,初始电流,初始化神经元、权重,初始化奖励
initWeightKM = np.zeros([numMBON,numKC])
# initWeightKD = np.zeros([numDAN,numKC])
# initWeightDK = np.zeros([numKC,numDAN]) 

IKC = np.ones((numKC,1))
IKC[num_locations-1::num_locations] = 0
KC = Neuron(numKC, IKC, dt=dt,tau=0.003)
IMBON = np.array([0,0,1,0,0]).reshape(numMBON,1)
MBON = Neuron(numMBON, IMBON, dt=dt,tau=0.02)
IDAN = np.array([0,0,1,0,0]).reshape(numDAN,1)
DAN = Neuron(numDAN,IDAN,dt=dt,tau=0.02)
skmreward = np.ones((numMBON, 1))
tau = 5
skm = SynapseKM(KC, MBON, initWeightKM, skmreward, dt=dt, tau=tau)
# skd = SynapseKD(KC, DAN, initWeightKD, skdreward, dt=dt, tau=tau)
# sdk = SynapseDK(DAN, KC, initWeightDK, sdkreward, dt=dt, tau=tau)

skmweight, skdweight, sdkweight=[], [], []
KCp, KCt, DANp, DANt, MBONp, MBONt = [], [], [], [], [], []
KCI, DANI, MBONI = [], [], []
skmrewardI, skdrewardI, sdkrewardI = [], [], []
# DANactivateKCI, DANactivateKCItra = [], []
# DANLPF = LowPassFilter(current=IDANactivateKC)
# 0:拽 1:不动 2:右移 3:左移 4:站立
lenterm = end/3
for i in t:
    if i < lenterm:  # 先学右移
        IKC = np.ones((numKC,1))
        IKC[num_locations-1::num_locations] = 0
        # [1,1,0,1,1,0,1,1,0]
        IMBON = np.array([0,0,1,0,0]).reshape(numMBON,1)
    elif lenterm <= i < lenterm * 2:  # 再学拽
        IKC = np.zeros((numKC,1))
        IKC[num_locations-1::num_locations] = 1
        # [1,1,1,1,1,1,0,0,0]
        IMBON = np.array([1,0,0,0,0]).reshape(numMBON,1)
    else:  # 学站立
        IKC = np.zeros((numKC,1))
        IKC[-1] = 1
        # [0,0,0,0,0,0,0,0,1]
        IMBON = np.array([0,0,0,0,1]).reshape(numMBON,1)
    # 
    skmreward = np.ones((numMBON, 1))
    skdreward = np.ones((numDAN,1))
    sdkreward = np.ones((numKC,1))
    # 奖励修正，应该是全1的，为什么呢？因为根据学习律，就算不应该学习的奖励设置为1，也不会学习，此时不应该学习的KC或者MBON为0   
    if lenterm * .6 < i < lenterm:
        IDAN = np.array([0,0,1,0,0]).reshape(numDAN,1)
    elif lenterm * 1.6 < i < lenterm * 2:
        IDAN = np.array([1,0,0,0,0]).reshape(numDAN,1)
    elif lenterm * 2.6 < i < lenterm * 3:
        IDAN = np.array([0,0,0,0,1]).reshape(numDAN,1)
    else:
        skmreward = np.zeros((numMBON, 1))
        # skdreward = np.zeros((numDAN,1))
        # sdkreward = np.zeros((numKC,1))
        IDAN = np.array([0,0,0,0,0]).reshape(numDAN,1)
          

    skm.reward = skmreward

    KMWeight = skm.step(dt,IKC,IMBON)
    skm.update()
    skmweight.append(KMWeight)
