# Attributes lookup and sympy-based equations

## ANNarchy side

### Core

In [1]:
import numpy as np

class Value(object):
    "Placeholder for single parameters"
    def __init__(self, value, dtype):
        self._init_value = value
        self._instantiated = False
        self._dtype = dtype

    def _copy(self):
        return Value(self._init_value, self._dtype)

    def _instantiate(self, shape):
        self._instantiated = True
        self.shape = 1
        self._value = self._init_value
    
    def get_value(self):
        if not self._instantiated:
            return self._init_value
        return self._value
    
    def set_value(self, val):
        if not self._instantiated:
            self._init_value = val
        else:
            self._value = val

class Array(object):
    "Placeholder for arrays"
    def __init__(self, init, dtype):

        self._init_value = init
        self._dtype = dtype

        self._instantiated = False

    def _copy(self):
        return Array(
            self._init_value, 
            self._dtype
        )

    def _instantiate(self, shape):
        self._instantiated = True
        self._value = np.full(shape, self._init_value, dtype=self._dtype)
        self.shape = self._value.shape
    
    def get_value(self):
        if not self._instantiated:
            return self._init_value
        return self._value
    
    def set_value(self, val):
        if not self._instantiated:
            self._init_value = val
        else:
            if isinstance(val, np.ndarray):
                if val.shape == self.shape:
                    self._value = val
                elif val.shape == (1,):
                    self._value = np.full(self.shape, val[0], dtype=self._dtype)
                else:
                    print("Array assignment error", val, val.shape)
            elif isinstance(val, (float, int, bool)):
                self._value = np.full(self.shape, val)
            else:
                print("Array assignment error", val)

class Neuron(object):

    def Value(self, value, dtype=np.float32):
        "Creates and returns a single value."
        if not hasattr(self, "_data"):
            self._data = []
        val = Value(value, dtype)
        self._data.append(val)
        return val

    def Array(self, init=0.0, dtype=np.float32):
        "Creates and returns an array."
        if not hasattr(self, "_data"):
            self._data = []
        val = Array(init, dtype)
        self._data.append(val)
        return val

class Synapse(object):

    def Value(self, value, dtype=np.float32):
        "Creates and returns a single value."
        if not hasattr(self, "_data"):
            self._data = []
        val = Value(value, dtype)
        self._data.append(val)
        return val

    def Array(self, init=0.0, dtype=np.float32):
        "Creates and returns an array."
        if not hasattr(self, "_data"):
            self._data = []
        val = Array(init, dtype)
        self._data.append(val)
        return val

class Population(object):

    def __init__(self, shape, neuron):

        # Obligatory arguments
        self.shape = shape
        if isinstance(shape, tuple):
            size = 1
            for n in shape:
                size *= n
            self.size = size
        else: # single value
            self.size = int(shape)

        # Neuron type
        self._neuron_type = neuron
        self._spiking = False

        # Internal stuff
        self._attributes = {}
        self._values_list = []
        self._arrays_list = []

    def _analyse(self):

        # List attributes
        current_attributes = list(self._neuron_type.__dict__.keys())

        for attr in current_attributes:
            if isinstance(getattr(self._neuron_type, attr), (Value, )):
                self._values_list.append(attr)
                self._attributes[attr] = getattr(self._neuron_type, attr)._copy()
                self._attributes[attr]._instantiate(self.shape)
            if isinstance(getattr(self._neuron_type, attr), (Array, )):
                self._arrays_list.append(attr)
                self._attributes[attr] = getattr(self._neuron_type, attr)._copy()
                self._attributes[attr]._instantiate(self.shape)

        # Get lists of values and arrays
        self.attributes = list(self._attributes.keys())

        # Set the attributes to the neuron
        self._neuron_type.attributes = self.attributes
        self._neuron_type._values_list = self._values_list
        self._neuron_type._arrays_list = self._arrays_list

        # Analyse update()
        self._update_equations =  self._neuron_type.update().equations

        # Analyse spike()
        if 'spike' in [f for f in dir(self._neuron_type) if callable(getattr(self._neuron_type, f))]:
            self._spiking = True
            self._spiking_equation = self._neuron_type.spike()
            self._reset_equations =  self._neuron_type.reset().equations


    def __getattribute__(self, name):
        if name in ['attributes']:
            return object.__getattribute__(self, name)
        else:
            if hasattr(self, 'attributes') and name in self.attributes:
                return self._attributes[name].get_value()
        return object.__getattribute__(self, name)

    def __setattr__(self, name, value):

        if hasattr(self, 'attributes') and name in self.attributes:
            self._attributes[name].set_value(value)
        else:
            object.__setattr__(self, name, value)




In [2]:
class Network(object):

    def __init__(self):

        self.populations = []

    def add(self, shape, neuron):
        
        # Create the population
        pop = Population(shape, neuron)

        # Have the population analyse its attributes
        pop._analyse()

        # Store the population
        self.populations.append(pop)

        return pop



### Parser

In [4]:
import sympy as sp

class Equations(object):

    def __init__(self, neuron=None, symbols=None):

        self.neuron = neuron
        self.symbols = {'t': sp.Symbol("t")}
        if self.neuron is None:
            self._custom_symbols = symbols
        self._equations_list = []
        self._started = False
    
    def __enter__(self):

        if self.neuron is not None:

            for attr in self.neuron.attributes:
                if attr in self.neuron._values_list:
                    # Symbol
                    symbol = sp.Symbol("%(pop_prefix)s"+attr+"%(pop_suffix_value)s")
                    self.symbols[attr] = symbol
                    setattr(self, attr, symbol)

                elif attr in self.neuron._arrays_list:
                    # Symbol
                    symbol = sp.Symbol("%(pop_prefix)s"+attr+"%(pop_suffix_array)s")
                    self.symbols[attr] = symbol
                    setattr(self, attr, symbol)

                    # Derivative if needed
                    symbol = sp.Symbol("__grad__" + attr)
                    self.symbols["d"+attr+"_dt"] = symbol
                    setattr(self, "d"+attr+"_dt", symbol)
                    
                else:
                    print("Error")

        else: # Custom set of variables
            for attr in self._custom_symbols:
                # Symbol
                symbol = sp.Symbol(attr)
                self.symbols[attr] = symbol
                setattr(self, attr, symbol)

                # Derivative if needed
                symbol = sp.Symbol("d" + attr + "/dt")
                self.symbols["d"+attr+"_dt"] = symbol
                setattr(self, "d"+attr+"_dt", symbol)



        self._started = True

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        "Would be done later by the generator"

        code = ""
        for var, eq in self._equations_list:
            code += sp.ccode(self.symbols[var]) + " = " + sp.ccode(eq) + ";\n"
            if var.endswith("_dt"):
                orig_var = var[1:-3]
                code += "%(pop_prefix)s"+orig_var+"%(pop_suffix_array)s += dt*(" + sp.ccode(self.symbols[var]) + ");\n"

        self.equations = code

    def __str__(self):
        string = ""
        for var, eq in self._equations_list:
            string += sp.ccode(self.symbols[var]) + " = " + sp.ccode(eq) + "\n"
        return string

    def __getattribute__(self, name):

        return object.__getattribute__(self, name)

    def __setattr__(self, name, value):

        # After __enter__(), track modifications to the variables
        if hasattr(self, '_started') and self._started:
            # Do not assign equations to symbols, just store them
            if name in self.symbols.keys():
                self._equations_list.append((name, value))
            else:
                object.__setattr__(self, name, value)
        else:
            object.__setattr__(self, name, value)

    def ite(self, cond, then, els):

        return sp.Piecewise((then, cond), (els, True))


## User side

In [5]:
class RateCoded(Neuron):
    """
    Simple rate-coded neuron.

    $$\tau \, \dfrac{d v(t)}{dt} = ge(t) + ite(ge(t) > 1, ge(t), 0) + v^2(t) - v(t))/\tau$$

    $$r(t) = tanh(v(t))$$
    """

    def __init__(self, tau):

        self.tau = self.Value(tau)

        self.ge = self.Array(init=0.0)
        self.v = self.Array(init=0.0)
        self.r = self.Array(init=0.0)

    def update(self):

        # eq will contain all variables of the model as sympy symbols, plus some operations (ite = if/then/else)
        with Equations(self) as n:

            # One can declare intermediary variables that won't be allocated in memory!
            shunting = n.ite(n.ge > 1, n.ge, 0)
            
            # ODEs use the dX_dt trick
            n.dv_dt = (n.ge + shunting + sp.exp(n.v**2) - n.v) / n.tau
            
            # Sympy function can be used directly
            n.r = sp.tanh(n.v)

        return n



In [6]:
net = Network()
pop = net.add(10, RateCoded(tau=20.))

print("Attributes:", pop.attributes)

print("Tau:", pop.tau)
print("v = 0:", pop.v)

pop.v = 1.

print("v = 1:", pop.v)

pop.v *= 5.

print("v = 5:", pop.v)

pop.v[:3] = 1.

print("v[:3] = 1:", pop.v)


print()
print("Generated C code:")
print(pop._update_equations  %{'pop_prefix': "", "pop_suffix_value": "", "pop_suffix_array": "[i]"})

Attributes: ['tau', 'ge', 'v', 'r']
Tau: 20.0
v = 0: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
v = 1: [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
v = 5: [5. 5. 5. 5. 5. 5. 5. 5. 5. 5.]
v[:3] = 1: [1. 1. 1. 5. 5. 5. 5. 5. 5. 5.]

Generated C code:
__grad__v = (ge[i] - v[i] + ((ge[i] > 1) ? (
   ge[i]
)
: (
   0
)) + exp(pow(v[i], 2)))/tau;
v[i] += dt*(__grad__v);
r[i] = tanh(v[i]);



In [7]:
class LIF(Neuron):

    def __init__(self, params):

        self.tau = self.Value(params['tau'])
        self.V_th = self.Value(params['V_th'])

        self.ge = self.Array(init=0.0)
        self.v = self.Array(init=0.0)

    def update(self):

        with Equations(self) as n:

            n.dv_dt = (n.ge - n.v) / n.tau

        return n

    def spike(self):

        with Equations(self) as n:

            return n.v >= n.V_th

    def reset(self):

        with Equations(self) as n:

            n.v = 0

        return n


In [8]:
net = Network()
pop = net.add(100, LIF({'tau': 20., 'V_th': 1.0}))

print("Neural update:")
print(pop._update_equations  %{'pop_prefix': "", "pop_suffix_value": "", "pop_suffix_array": "[i]"})

print()
print("Spiking condition:")
print(sp.ccode(pop._spiking_equation)  %{'pop_prefix': "", "pop_suffix_value": "", "pop_suffix_array": "[i]"})

print()
print("Reset:")
print(pop._reset_equations  %{'pop_prefix': "", "pop_suffix_value": "", "pop_suffix_array": "[i]"})

Neural update:
__grad__v = (ge[i] - v[i])/tau;
v[i] += dt*(__grad__v);


Spiking condition:
v[i] >= V_th

Reset:
v[i] = 0;



One can use the Equations() in a standalone mode to check correctness:

In [10]:
with Equations(symbols=['v', 'r', 'tau']) as n:
    n.dv_dt = (1 - n.v)/n.tau
    n.r = n.ite(n.v > 0, n.v, 0)

print(n)

dv/dt = (1 - v)/tau
r = ((v > 0) ? (
   v
)
: (
   0
))



Sanity check whether it works in multiprocessing:

In [20]:
import multiprocessing

def worker(val):

    with Equations(symbols=['v', 'r', 'tau']) as n:
        n.dv_dt = (val - n.v)/n.tau

    return n

jobs = []
vals = np.linspace(0.0, 1.0, 5)

with multiprocessing.Pool() as pool:
    res = pool.map(worker, vals)
    
    for w in res:
        print(w)
        print("----")

dv/dt = -v/tau

----
dv/dt = (0.25 - v)/tau

----
dv/dt = (0.5 - v)/tau

----
dv/dt = (0.75 - v)/tau

----
dv/dt = (1.0 - v)/tau

----


In [None]:
class RateCodedSynapse(Synapse):

    def __init__(self):

        self.w = self.Weight()

    def transmit(self):

        with Equations(self) as s:

            s.post.target += s.w * s.pre.r

        return s

class SpikingSynapse(Synapse):

    def __init__(self):

        self.w = self.Weight()


    def transmit(self):

        self.transmit_on_spike = True # default when pre is a spiking population
        # self.transmit_on_spike = False # continuous integration for NMDA synapse


        with Equations(self) as s:

            s.post.target += s.w

        return s

In [None]:
class TraceSynapse(RateCodedSynapse):

    def __init__(self, tau, eta):

        self.tau = self.Value(tau)

        self.eta = self.Value(eta)

        self.w = self.Weight()

        self.trace = self.Array(init=0.0)

    def update(self):

        with Equations(self, method="euler") as s:

            s.dtrace_dt = (s.pre.r - s.trace) / s.tau

        return s

    def apply(self):

        with Equations(self) as s:

            s.w += s.eta * s.trace * s.post.r * s.post.R

        return s



proj = net.connect(pop, pop, 'ge', TraceSynapse)
#...
net.simulate(100.)
if success:
    pop.R = 1.0
else:
    pop.R = 0.0
proj.apply()




In [21]:
class STDP(SpikingSynapse):

    def __init__(self, tau):

        self.tau = self.Value(tau)

        self.w = self.Weight()

        self.x = self.Array(init=0.0)
        self.y = self.Array(init=0.0)

    def update(self):

        with Equations(self, method=Euler) as s:

            s.dx_dt = (- s.x) / s.tau
            s.dy_dt = (- s.y) / s.tau

        return s

    def transmit(self):

        with Equations(self) as s:

            s.post.target += s.w

        return s
        
    def on_pre(self):

        with Equations(self) as s:

            s.x += 1.0
            s.w -= s.y

        return s

    def on_post(self):

        with Equations(self) as s:

            s.y += 1.0
            s.w += s.x

        return s


NameError: name 'Synapse' is not defined