Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added function to optimize ensemble parameters #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

tbekolay
Copy link
Member

@tbekolay tbekolay commented Oct 27, 2016

Migrated from nengo/nengo#871.

I added a small test, which revealed a small bug -- the rng wasn't getting passed to the sample (now get_samples) function. Fixed that, now works and gives deterministic results when rng is set!

Note that this should wait until nengo/nengo#1181 is merged, in case the get_samples functions undergoes any other changes.

It does a simple random search over the ensemble parameter space
(encoders, gains, biases) to find ones that work well to decode
a particular function.
@tbekolay
Copy link
Member Author

tbekolay commented Nov 4, 2016

Here are two other snippets from @studywolf for optimizing ensembles to compute certain functions, migrated from nengo/nengo#440 ... before merging, it would be good to compare these approaches to the one in this PR:

import numpy as np

import nengo

def get_intercepts_for_function(function, num_samples=500, 
                                threshold=5e-3, eval_range=[-1, 1],
                                plot_results=False):

    # generate random set of initial eval_points, encoders, and intercepts
    num_neurons = 10
    eval_points = [np.random.random() for ii in range(num_neurons)] 
    encoders = [np.random.choice([-1,1],1)[0] for ii in range(num_neurons)]
    intercepts = [np.random.random()*encoders[ii] for ii in range(num_neurons)] 

    def build(intercepts, eval_points, values, build_test=False):
        m = nengo.Network()
        with m:
            if values is not None:
                # input function returns time signal
                input1 = nengo.Node(output=values) 
            else:
                input1 = nengo.Node(lambda t: np.sin(t / 4))

            m.ens1 = nengo.Ensemble(num_neurons, 1, 
                                    encoders=np.array(encoders)[:,None],
                                    intercepts=intercepts)
            ens2 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())

            output1 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
            output2 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())

            nengo.Connection(input1, m.ens1)
            nengo.Connection(input1, ens2)
            nengo.Connection(m.ens1, output1, function=fun0, 
                             eval_points=eval_points)
            nengo.Connection(ens2, output2, function=fun0)

            if build_test == True:
                ens3 = nengo.Ensemble(num_neurons, 1)
                output3 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
                nengo.Connection(input1, ens3)
                nengo.Connection(ens3, output3, function=fun0)
                m.probe_output3 = nengo.Probe(output3, synapse=.01)

            m.probe_input1 = nengo.Probe(input1)
            m.probe_output1 = nengo.Probe(output1, synapse=.01)
            m.probe_output2 = nengo.Probe(output2)
        return m


    for ii in range(num_samples):

        m = build(intercepts, eval_points, eval_points[-1])

        sim = nengo.Simulator(m)
        sim.run(.1)

        # randomly add some sample between 0 and 1
        error = np.abs(sim.data[m.probe_output2][-1] - 
                       sim.data[m.probe_output1][-1]) 

        if error > threshold: 
            encoders.append(-1)
            encoders.append(1)

            epsilon = .001
            intercepts.append((eval_points[-1] + epsilon)*-1)
            intercepts.append(eval_points[-1] - epsilon)
            num_neurons += 2

        eval_points.append(np.random.random() * \
                          (eval_range[1] - eval_range[0]) + eval_range[0])

        if ii % 100 == 0:
            print 'ens1.n_neurons=%i'%num_neurons
            print 'num eval_points: ', len(eval_points)

    m = build(intercepts, eval_points, values=None, build_test=True)
    sim = nengo.Simulator(m)
    run_time = 20
    sim.run(run_time)

    print 'error of trained: ', np.sum(np.abs(sim.data[m.probe_output2] - \
                                sim.data[m.probe_output1])) / (run_time / .001)
    print 'error of control: ', np.sum(np.abs(sim.data[m.probe_output2] - \
                                sim.data[m.probe_output3])) / (run_time / .001)

    if plot_results == True:
        import matplotlib.pyplot as plt
        plt.plot(sim.trange(), sim.data[m.probe_input1])
        plt.plot(sim.trange(), sim.data[m.probe_output1])
        plt.plot(sim.trange(), sim.data[m.probe_output2])
        plt.plot(sim.trange(), sim.data[m.probe_output3])
        plt.legend(['input', 'approx', 'answer', 'control'])

        plt.figure()
        from nengo.utils.ensemble import tuning_curves as tc
        eps, a = tc(m.ens1, sim)
        plt.plot(eps, a)
        plt.show()

    return intercepts, eval_points, encoders, num_neurons



################################################################################

if __name__ == '__main__':

    # generate a random target signal to follow
    n1 = np.zeros((10000,), dtype=complex)
    n2 = np.zeros((10000,), dtype=complex)
    n1[:5] = np.exp(1j*np.random.uniform(0, 50*np.pi, (5,)))
    n2[20:25] = np.exp(1j*np.random.uniform(0, 50*np.pi, (5,)))
    s1 = np.fft.ifft(n1)*1000
    s2 = np.fft.ifft(n2)*1000
    s2[:5000] = 0
    s2[6000:] = 0
    s = s1 + s2

    def fun0(x):
        return s.real[int(np.floor((x+1)*5000))]

    get_intercepts_for_function(fun0, num_samples=500, plot_results=True)

and

import numpy as np
from scipy.interpolate import interp1d

import nengo

def get_intercepts_for_function_derivative(function, num_samples=500, 
                                           threshold=5e-3, eval_range=[-1, 1],
                                           plot_results=False):

    # evaluate the function, find areas with large derivatives
    num_evals = 1000
    x = np.linspace(eval_range[0], eval_range[1], num_evals)
    vals = np.zeros(num_evals)
    for ii in range(num_evals-1):
        vals[ii] = function(x[ii])
    # get derivatives
    dvals = np.hstack([0, np.diff(vals)])
    ddvals = np.hstack([0, np.diff(dvals)])

    # find the parts that are unusually large, to focus on
    high = np.zeros(num_evals)
    # high[dvals > np.mean(dvals) + np.var(dvals)] = 1
    high[ddvals > np.mean(ddvals) + np.var(ddvals)] = 1
    # convolve with Gaussian to get our probability dist for sampling
    gauss = np.exp(-np.linspace(-1, 1, 10)**2 / 2)
    dist = np.convolve(high, gauss, mode='same')
    # normalize distribution 
    dist /= np.sum(dist)
    # also add in a term to make sure everything has some probability
    dist += np.ones(dist.shape[0]) / dist.shape[0]
    # normalize distribution 
    dist /= np.sum(dist)
    # get cumulative sum, making sure that 0 is the first value
    # which is important for the interpolation below
    cdf = np.hstack([0, np.cumsum(dist)])

    # generate interpolation function
    pdist = interp1d(cdf, np.linspace(-1, 1, cdf.shape[0]))

    import matplotlib.pyplot as plt
    # test pdist
    a = [pdist(np.random.random()) for ii in range(1000)]
    plt.subplot(211)
    plt.plot(a, np.ones(len(a)), 'rx')
    plt.plot(np.linspace(-1, 1, vals.shape[0]), vals)
    # plt.plot(np.linspace(-1, 1, vals.shape[0]), dvals)
    plt.plot(np.linspace(-1, 1, vals.shape[0]), ddvals)
    plt.legend(['samples', 'function', 'function derivative'])
    plt.subplot(212)
    plt.plot(np.linspace(-1, 1, vals.shape[0]), high*max(dist))
    plt.plot(np.linspace(-1, 1, vals.shape[0]), dist)
    plt.legend(['areas with high derivative', 'probability dist'])
    plt.show()

    # generate random set of initial eval_points, encoders, and intercepts
    num_neurons = 10
    eval_points = [np.random.random() for ii in range(num_neurons)] 
    encoders = [np.random.choice([-1,1],1)[0] for ii in range(num_neurons)]
    intercepts = [np.random.random()*encoders[ii] for ii in range(num_neurons)] 

    def build(intercepts, eval_points, values, build_test=False):
        m = nengo.Network()
        with m:
            if values is not None:
                # input function returns time signal
                input1 = nengo.Node(output=values) 
            else:
                input1 = nengo.Node(lambda t: np.sin(t / 4))

            m.ens1 = nengo.Ensemble(num_neurons, 1, 
                                    encoders=np.array(encoders)[:,None],
                                    intercepts=intercepts)
            ens2 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())

            output1 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
            output2 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())

            nengo.Connection(input1, m.ens1)
            nengo.Connection(input1, ens2)
            nengo.Connection(m.ens1, output1, function=fun0, 
                             eval_points=eval_points)
            nengo.Connection(ens2, output2, function=fun0)

            if build_test == True:
                ens3 = nengo.Ensemble(num_neurons, 1)
                output3 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
                nengo.Connection(input1, ens3)
                nengo.Connection(ens3, output3, function=fun0)
                m.probe_output3 = nengo.Probe(output3, synapse=.01)

            m.probe_input1 = nengo.Probe(input1)
            m.probe_output1 = nengo.Probe(output1, synapse=.01)
            m.probe_output2 = nengo.Probe(output2)
        return m


    for ii in range(num_samples):

        m = build(intercepts, eval_points, eval_points[-1])

        sim = nengo.Simulator(m)
        sim.run(.1)

        # randomly add some sample between 0 and 1
        error = np.abs(sim.data[m.probe_output2][-1] - 
                       sim.data[m.probe_output1][-1]) 

        if error > threshold: 
            encoders.append(-1)
            encoders.append(1)

            epsilon = .001
            intercepts.append((eval_points[-1] + epsilon)*-1)
            intercepts.append(eval_points[-1] - epsilon)
            num_neurons += 2

        eval_points.append(pdist(np.random.random()))

        if ii % 100 == 0:
            print 'ens1.n_neurons=%i'%num_neurons
            print 'num eval_points: ', len(eval_points)

    m = build(intercepts, eval_points, values=None, build_test=True)
    sim = nengo.Simulator(m)
    run_time = 20
    sim.run(run_time)

    print 'error of trained: ', np.sum(np.abs(sim.data[m.probe_output2] - \
                                sim.data[m.probe_output1])) / (run_time / .001)
    print 'error of control: ', np.sum(np.abs(sim.data[m.probe_output2] - \
                                sim.data[m.probe_output3])) / (run_time / .001)

    if plot_results == True:
        import matplotlib.pyplot as plt
        plt.plot(sim.trange(), sim.data[m.probe_input1])
        plt.plot(sim.trange(), sim.data[m.probe_output1])
        plt.plot(sim.trange(), sim.data[m.probe_output2])
        plt.plot(sim.trange(), sim.data[m.probe_output3])
        plt.legend(['input', 'approx', 'answer', 'control'])

        plt.figure()
        from nengo.utils.ensemble import tuning_curves as tc
        eps, a = tc(m.ens1, sim)
        plt.plot(eps, a)
        plt.show()

    return intercepts, eval_points, encoders, num_neurons



################################################################################

if __name__ == '__main__':

    # generate a random target signal to follow
    n1 = np.zeros((10000,), dtype=complex)
    n2 = np.zeros((10000,), dtype=complex)
    n1[:5] = np.exp(1j*np.random.uniform(0, 50*np.pi, (5,)))
    n2[20:25] = np.exp(1j*np.random.uniform(0, 50*np.pi, (5,)))
    s1 = np.fft.ifft(n1)*1000
    s2 = np.fft.ifft(n2)*1000
    s2[:5000] = 0
    s2[6000:] = 0
    s = s1 + s2

    def fun0(x):
        return s.real[int(np.floor((x+1)*5000))]

    get_intercepts_for_function_derivative(fun0, num_samples=200, plot_results=True)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

3 participants