# Pipelines

The Pipeline class allows us to combine multiple optimization processes into a single process. For example, we could do a coarse grid search to obtain an initial signal, then shrink the bounds to enclose the signal and sample with higher resolution, then finally fit all observed data to a model which predicts the location of the maximum. First, we define a cost function to optimize:

In [5]:
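import numpy as np

def cost(state, params):
    # Negative 2D Gaussian centered at (x0, y0); the pipeline minimizes this value.
    # sigma_x, sigma_y, and noise are carried in params but not used by this cost.
    return -np.exp(-(state['X']-params['x0'])**2)*np.exp(-(state['Y']-params['y0'])**2)
params = {'sigma_x': 0.3, 'sigma_y': 0.8, 'x0': 0.3, 'y0': 0.6, 'noise': 0.0}

def holder_table(state, params):
    # Hölder table test function: an alternative, multimodal cost (not used below).
    x, y = state['X'], state['Y']
    return -np.abs(np.sin(x) * np.cos(y) * np.exp(np.abs(1 - np.sqrt(x**2 + y**2) / np.pi)))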

The cost function must take two arguments. "state" is a dictionary containing the independent variables, and "params" is a dictionary containing parameters that define the cost evaluation; in this case, 'x0' and 'y0' set the center of a two-dimensional Gaussian. The pipeline minimizes the cost function, so make sure the sign of the return value is set appropriately: to maximize a signal, return its negative.
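To make the sign convention concrete, here is a minimal sketch in which a hypothetical `signal` function (a name introduced only for this example, not part of emergent) is turned into a cost by negating it, so that minimizing the cost finds the peak of the signal.

```python
import numpy as np

# Hypothetical signal we would like to MAXIMIZE (e.g. a measured intensity)
def signal(state, params):
    return np.exp(-(state['X'] - params['x0'])**2) * np.exp(-(state['Y'] - params['y0'])**2)

# The pipeline minimizes, so the cost is simply the negated signal
def cost_from_signal(state, params):
    return -signal(state, params)

example_params = {'x0': 0.3, 'y0': 0.6}
at_peak = cost_from_signal({'X': 0.3, 'Y': 0.6}, example_params)
off_peak = cost_from_signal({'X': 0.0, 'Y': 2.0}, example_params)
print(at_peak, off_peak)  # the signal's peak is the cost's minimum
```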

Now let's define our initial state and the bounds:

In [6]:
state = {'X': 0, 'Y': 2}
bounds = {'X': {'min': -3, 'max': 3}, 'Y': {'min': -3, 'max': 3}}

Now we'll instantiate the pipeline object:

In [None]:
from emergent.pipeline import Pipeline
pipe = Pipeline(cost, params, state, bounds)


We can add blocks to the Pipeline to define the optimization process we want to apply. Let's start with a simple 10x10 grid search:

In [None]:
from emergent.pipeline import GridSearch
pipe.add(GridSearch({'Steps': 10}))

Once we've added all of the blocks we want, we can run the pipeline:

In [None]:
import logging as log
log.basicConfig(level=log.INFO)
points, costs = pipe.run()

This basic grid search came within 1% of the true minimum using only 102 function evaluations: one when initializing the pipeline, 100 during the search, and one more to check the final result. However, the number of evaluations grows exponentially with the dimensionality of the parameter space, since a grid with n points per axis in d dimensions has n^d points. A coarse 10-point-per-axis grid search in six dimensions would take a million evaluations!
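A minimal plain-Python sketch of what a grid search does, written independently of emergent's GridSearch block (whose exact grid placement we have not checked, so its best point may differ from this one). Every sample point is fixed before any cost is evaluated, and the helper `grid_size` (a name introduced here) shows the steps ** dimensions growth.

```python
import itertools
import numpy as np

def gaussian_cost(state, params):
    # Same surface as `cost` above
    return -np.exp(-(state['X'] - params['x0'])**2) * np.exp(-(state['Y'] - params['y0'])**2)

def grid_search(cost_fn, params, bounds, steps):
    # All sample points are chosen up front: `steps` evenly spaced values per axis
    names = list(bounds)
    axes = [np.linspace(bounds[n]['min'], bounds[n]['max'], steps) for n in names]
    best_state, best_cost, evaluations = None, np.inf, 0
    for values in itertools.product(*axes):
        trial = dict(zip(names, values))
        c = cost_fn(trial, params)
        evaluations += 1
        if c < best_cost:
            best_state, best_cost = trial, c
    return best_state, best_cost, evaluations

def grid_size(steps, dims):
    # Number of grid points with `steps` points per axis in `dims` dimensions
    return steps ** dims

example_params = {'x0': 0.3, 'y0': 0.6}
example_bounds = {'X': {'min': -3, 'max': 3}, 'Y': {'min': -3, 'max': 3}}
best_state, best_cost, n_evals = grid_search(gaussian_cost, example_params, example_bounds, steps=10)
print(best_state, best_cost, n_evals)
print([grid_size(10, d) for d in (2, 4, 6)])  # [100, 10000, 1000000]
```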

For optimization in higher dimensions, the pipeline includes a number of other algorithms, which I will demonstrate in the following examples.

# Gradient-based methods
The grid search is inefficient because all of the sampled points are chosen before starting the optimization. In many cases, the minimum can be found more efficiently by choosing each subsequent point based on knowledge of the parameter space. The simplest way to do this is to measure the local gradient, take a step downhill, and repeat. This is implemented by the GradientDescent algorithm - let's try it out. We'll rebuild the pipeline from scratch, but make sure you've run the above blocks in the current kernel to define the cost function, initial state, and bounds.
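The loop just described (estimate the local gradient, step downhill, repeat) can be sketched in plain NumPy. This is an illustration under our own assumptions, not emergent's GradientDescent implementation: the gradient is estimated with central finite differences (two evaluations per axis), the learning rate of 0.5 and the stopping tolerance are arbitrary choices, and each step is clipped back into the bounds.

```python
import numpy as np

def gaussian_cost(point, cx=0.3, cy=0.6):
    # Same surface as `cost` above, taking a NumPy array [X, Y]
    x, y = point
    return -np.exp(-(x - cx)**2) * np.exp(-(y - cy)**2)

def central_gradient(f, point, h=1e-4):
    # Two evaluations per axis, i.e. 2N evaluations in N dimensions
    grad = np.zeros_like(point)
    for i in range(point.size):
        step = np.zeros_like(point)
        step[i] = h
        grad[i] = (f(point + step) - f(point - step)) / (2 * h)
    return grad

def gradient_descent(f, start, lower, upper, learning_rate=0.5, max_steps=200, tol=1e-8):
    point = np.asarray(start, dtype=float)
    evaluations = 0
    for _ in range(max_steps):
        grad = central_gradient(f, point)
        evaluations += 2 * point.size
        # Step downhill, then clip back into the bounds
        new_point = np.clip(point - learning_rate * grad, lower, upper)
        converged = np.linalg.norm(new_point - point) < tol
        point = new_point
        if converged:
            break
    return point, evaluations

best_point, n_evals = gradient_descent(gaussian_cost, [0.0, 2.0], lower=[-3, -3], upper=[3, 3])
print(best_point, n_evals)
```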

In [7]:
from emergent.pipeline import Pipeline, GradientDescent
import logging as log
log.basicConfig(level=log.INFO)
pipe = Pipeline(cost, params, state, bounds)
pipe.add(GradientDescent())
points, costs = pipe.run()

INFO:root:Optimization complete!
INFO:root:Time: 0s
INFO:root:Evaluations: 51
INFO:root:Improvement: 570.1%


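The GradientDescent block needed only 51 evaluations, about half as many as the grid search, although its logged improvement (570.1%) is lower than the 676.8% reached by L-BFGS-B and DifferentialEvolution below, so it stopped at a higher cost in this run. Its relative advantage over a grid search should grow in higher-dimensional parameter spaces.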
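The logs report an Improvement figure that this notebook does not define. The values are consistent with the relative change in cost from the initial state: the starting point (0, 2) has cost -exp(-2.05), about -0.1287, and reaching the minimum cost of -1 gives (-1 + 0.1287) / (-0.1287), about 676.8%, which is the figure logged for the L-BFGS-B and DifferentialEvolution runs below. The sketch assumes that formula; it is our inference from the logs, not emergent's documented definition. Under this reading, GradientDescent's 570.1% corresponds to a final cost of about -0.86.

```python
import numpy as np

def gaussian_cost(state, params):
    # Same surface as `cost` above
    return -np.exp(-(state['X'] - params['x0'])**2) * np.exp(-(state['Y'] - params['y0'])**2)

def improvement_percent(initial_cost, final_cost):
    # Assumed definition, inferred from the logs rather than from emergent's source
    return 100 * (final_cost - initial_cost) / initial_cost

example_params = {'x0': 0.3, 'y0': 0.6}
initial_cost = gaussian_cost({'X': 0, 'Y': 2}, example_params)     # -exp(-2.05)
best_possible = gaussian_cost({'X': 0.3, 'Y': 0.6}, example_params)  # -1.0
print(round(improvement_percent(initial_cost, best_possible), 1))   # 676.8

# Inverting the formula: the final cost implied by GradientDescent's 570.1%
implied_gd_cost = initial_cost * (1 + 570.1 / 100)
print(implied_gd_cost)
```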

The L-BFGS-B algorithm, a limited-memory quasi-Newton method that supports bound constraints, is another option and tends to perform very well in low-dimensional spaces:

In [8]:
from emergent.pipeline import Pipeline, LBFGSB
import logging as log
log.basicConfig(level=log.INFO)
pipe = Pipeline(cost, params, state, bounds)
pipe.add(LBFGSB())
points, costs = pipe.run()

INFO:root:Optimization complete!
INFO:root:Time: 0s
INFO:root:Evaluations: 31
INFO:root:Improvement: 676.8%
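For comparison outside the pipeline, SciPy provides L-BFGS-B through `scipy.optimize.minimize`. This sketch runs it on the same surface from the same starting point and bounds; whether emergent's LBFGSB block wraps SciPy is an assumption we have not checked.

```python
import numpy as np
from scipy.optimize import minimize

def gaussian_cost(point, cx=0.3, cy=0.6):
    # Same surface as `cost` above, taking a NumPy array [X, Y]
    x, y = point
    return -np.exp(-(x - cx)**2) * np.exp(-(y - cy)**2)

# Same initial state and bounds as the pipeline example.
# No gradient is supplied, so SciPy estimates it by finite differences.
result = minimize(gaussian_cost, x0=[0.0, 2.0], method='L-BFGS-B',
                  bounds=[(-3, 3), (-3, 3)])
print(result.x, result.fun, result.nfev)
```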


Lastly, the Adam optimizer combines momentum with per-parameter adaptive step sizes, which helps it navigate valleys and saddle points more efficiently:

In [9]:
from emergent.pipeline import Pipeline, Adam
import logging as log
log.basicConfig(level=log.INFO)
pipe = Pipeline(cost, params, state, bounds)
pipe.add(Adam())
points, costs = pipe.run()

INFO:root:Optimization complete!
INFO:root:Time: 0s
INFO:root:Evaluations: 100
INFO:root:Improvement: 673.5%
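Adam keeps running averages of the gradient (the momentum term) and of its square (which scales each coordinate's step), with a bias correction for their zero initialization. A minimal NumPy sketch of that standard update using finite-difference gradients; the learning rate and iteration count are our own choices, and emergent's Adam block may differ.

```python
import numpy as np

def gaussian_cost(point, cx=0.3, cy=0.6):
    # Same surface as `cost` above, taking a NumPy array [X, Y]
    x, y = point
    return -np.exp(-(x - cx)**2) * np.exp(-(y - cy)**2)

def central_gradient(f, point, h=1e-4):
    grad = np.zeros_like(point)
    for i in range(point.size):
        step = np.zeros_like(point)
        step[i] = h
        grad[i] = (f(point + step) - f(point - step)) / (2 * h)
    return grad

def adam(f, start, lower, upper, learning_rate=0.01, beta1=0.9, beta2=0.999,
         eps=1e-8, iterations=1000):
    point = np.asarray(start, dtype=float)
    m = np.zeros_like(point)  # running mean of gradients (momentum)
    v = np.zeros_like(point)  # running mean of squared gradients (step scaling)
    for t in range(1, iterations + 1):
        g = central_gradient(f, point)
        m = beta1 * m + (1 - beta1) * g
        v = beta2 * v + (1 - beta2) * g**2
        m_hat = m / (1 - beta1**t)  # bias correction for the zero initialization
        v_hat = v / (1 - beta2**t)
        point = np.clip(point - learning_rate * m_hat / (np.sqrt(v_hat) + eps), lower, upper)
    return point

adam_point = adam(gaussian_cost, [0.0, 2.0], lower=[-3, -3], upper=[3, 3])
print(adam_point, gaussian_cost(adam_point))
```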


# Stochastic methods
Gradient-based methods can be inefficient for several reasons:
* High-dimensional parameter spaces often have many local minima in which the optimizer can get trapped.
* Measurement noise degrades the accuracy of the gradient estimate, which can send the optimizer in the wrong direction or leave it stuck in regions where the noise is larger than the local gradient.
* Estimating the gradient in N dimensions takes 2N function evaluations per step (two per axis with central differences), which is slow when each evaluation is expensive.

An alternative is to use stochastic methods, in which new samples are generated randomly but guided by the samples already taken. For example, the DifferentialEvolution algorithm maintains a population of points and carries out generations of "breeding" that select for regions of parameter space where the cost function is low:

In [10]:
from emergent.pipeline import Pipeline, DifferentialEvolution
import logging as log
log.basicConfig(level=log.INFO)
pipe = Pipeline(cost, params, state, bounds)
pipe.add(DifferentialEvolution())
points, costs = pipe.run()

INFO:root:Optimization complete!
INFO:root:Time: 0s
INFO:root:Evaluations: 533
INFO:root:Improvement: 676.8%
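SciPy's `scipy.optimize.differential_evolution` implements the same family of algorithm. In each generation, every population member is challenged by a trial point, built by adding a scaled difference of two other members to a base member (the current best, in SciPy's default 'best1bin' strategy) and mixing in some of the member's own coordinates; the trial replaces the member only if its cost is lower. The settings below are illustrative, and we have not checked how emergent's DifferentialEvolution block relates to SciPy's.

```python
import numpy as np
from scipy.optimize import differential_evolution

def gaussian_cost(point, cx=0.3, cy=0.6):
    # Same surface as `cost` above, taking a NumPy array [X, Y]
    x, y = point
    return -np.exp(-(x - cx)**2) * np.exp(-(y - cy)**2)

# popsize is a multiplier: the population holds popsize * (number of parameters) points.
# polish=True (the default) refines the final result with L-BFGS-B.
result = differential_evolution(gaussian_cost, bounds=[(-3, 3), (-3, 3)],
                                popsize=15, mutation=0.8, recombination=0.7, seed=0)
print(result.x, result.fun, result.nfev)
```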


The ParticleSwarm optimizer similarly initializes a random set of particles, which move through the parameter space while being drawn toward both their own best-known positions and the best-known position of the whole swarm:

In [11]:
from emergent.pipeline import Pipeline, ParticleSwarm
import logging as log
log.basicConfig(level=log.INFO)
pipe = Pipeline(cost, params, state, bounds)
pipe.add(ParticleSwarm())
points, costs = pipe.run()

INFO:root:Optimization complete!
INFO:root:Time: 0s
INFO:root:Evaluations: 112
INFO:root:Improvement: 323.5%
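The particle update can be sketched in a few lines of NumPy. The three coefficients play the roles of the 'Inertia', 'Cognitive acceleration', and 'Social acceleration' parameters passed to ParticleSwarm in the EMERGENT example later in this notebook, but the values, swarm size, and iteration count here are our own choices, and emergent's implementation may differ in detail.

```python
import numpy as np

def gaussian_cost(points, cx=0.3, cy=0.6):
    # Vectorized version of `cost` above for an (n, 2) array of [X, Y] rows
    return -np.exp(-(points[:, 0] - cx)**2 - (points[:, 1] - cy)**2)

def particle_swarm(f, lower, upper, n_particles=20, iterations=100,
                   inertia=0.5, cognitive=1.5, social=1.5, seed=0):
    rng = np.random.default_rng(seed)
    lower = np.asarray(lower, dtype=float)
    upper = np.asarray(upper, dtype=float)
    dim = lower.size
    x = rng.uniform(lower, upper, size=(n_particles, dim))  # random initial positions
    v = np.zeros_like(x)                                      # initial velocities
    personal_best = x.copy()
    personal_cost = f(x)
    i = np.argmin(personal_cost)
    swarm_best, swarm_cost = personal_best[i].copy(), personal_cost[i]
    for _ in range(iterations):
        r1 = rng.random((n_particles, dim))
        r2 = rng.random((n_particles, dim))
        # Keep part of the old velocity, plus random pulls toward each particle's
        # own best position and toward the swarm's best position
        v = (inertia * v
             + cognitive * r1 * (personal_best - x)
             + social * r2 * (swarm_best - x))
        x = np.clip(x + v, lower, upper)
        c = f(x)
        improved = c < personal_cost
        personal_best[improved] = x[improved]
        personal_cost[improved] = c[improved]
        i = np.argmin(personal_cost)
        if personal_cost[i] < swarm_cost:
            swarm_best, swarm_cost = personal_best[i].copy(), personal_cost[i]
    return swarm_best, swarm_cost

best_point, best_cost = particle_swarm(gaussian_cost, lower=[-3, -3], upper=[3, 3])
print(best_point, best_cost)
```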


# Model-based methods
Online optimization is a powerful strategy, especially for expensive, black-box cost functions. The basic idea is to form a closed loop consisting of:
1. Fit a model of the cost surface to the observed data.
2. Numerically optimize the modeled surface to generate the next point.
3. Repeat until convergence.

In the pipeline, this is implemented by adding a model block and specifying the optimizer it should use on the modeled surface. Make sure to add a pre-sampling block first, such as a coarse grid search, so that the model has data to train on!

In [17]:
from emergent.pipeline import Pipeline, GridSearch, GaussianProcess, GradientDescent
import logging as log
log.basicConfig(level=log.INFO)
pipe = Pipeline(cost, params, state, bounds)
pipe.add(GridSearch({'Steps': 5}))  # coarse 5x5 pre-sampling so the model has training data
# Repeat the fit/optimize loop ten times; each GaussianProcess block uses GradientDescent
# to optimize the modeled surface
for _ in range(10):
    pipe.add(GaussianProcess({'Optimizer': GradientDescent()}))
points, costs = pipe.run()

INFO:root:Optimization complete!
INFO:root:Time: 3s
INFO:root:Evaluations: 37
INFO:root:Improvement: 676.0%


In [4]:
pipe.costs

array([-0.77880078, -0.63762815, -0.80856032, -0.90483742, -0.89359735,
       -0.77880078, -0.69593431, -0.8824969 , -0.9875778 , -0.97530991,
       -0.85001609, -0.67032005, -0.85001609, -0.95122942, -0.93941306,
       -0.81873075, -0.56978282, -0.72252735, -0.80856032, -0.79851622,
       -0.69593431, -0.42741493, -0.54199419, -0.60653066, -0.59899621,
       -0.52204578, -0.9875778 , -0.99990927])
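The fit / optimize / repeat loop described at the start of this section can be sketched without emergent. As a deliberately simple stand-in for a Gaussian process, this sketch fits a parametric Gaussian surface: because log(-cost) of a Gaussian is quadratic in X and Y, linear least squares (`np.linalg.lstsq`) recovers it, and the fitted surface's optimum has a closed form. This is a swapped-in technique, not how emergent's GaussianProcess or GaussianModel blocks are implemented (we have not checked), and it only works for costs that are strictly negative.

```python
import numpy as np

def gaussian_cost(x, y, cx=0.3, cy=0.6):
    # Same surface as `cost` above; works on scalars or NumPy arrays
    return -np.exp(-(x - cx)**2) * np.exp(-(y - cy)**2)

def fit_model(sx, sy, sample_costs):
    # Step 1: least-squares fit of log(-cost) = a + b*x + c*y + d*x**2 + e*y**2
    design = np.column_stack([np.ones_like(sx), sx, sy, sx**2, sy**2])
    coeffs, *_ = np.linalg.lstsq(design, np.log(-sample_costs), rcond=None)
    return coeffs

def optimize_model(coeffs):
    # Step 2: log(-cost) is largest (cost is lowest) at x = -b/(2d), y = -c/(2e) if d, e < 0
    _, b, c, d, e = coeffs
    if d >= 0 or e >= 0:
        raise ValueError('fitted surface has no interior optimum')
    return -b / (2 * d), -c / (2 * e)

# Pre-sample a coarse 5x5 grid so the model has data to train on
grid = np.linspace(-3, 3, 5)
gx, gy = np.meshgrid(grid, grid)
sx, sy = gx.ravel(), gy.ravel()
sample_costs = gaussian_cost(sx, sy)

# Step 3: repeat fit -> optimize -> evaluate a few times
for _ in range(3):
    x_next, y_next = np.clip(optimize_model(fit_model(sx, sy, sample_costs)), -3, 3)
    sx = np.append(sx, x_next)
    sy = np.append(sy, y_next)
    sample_costs = np.append(sample_costs, gaussian_cost(x_next, y_next))

best = np.argmin(sample_costs)
print(sx[best], sy[best], sample_costs[best])
```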

# EMERGENT integration

In [None]:
import numpy as np
from emergent.pipeline import (Pipeline, Source, GridSearch, GaussianModel, Rescale, DifferentialEvolution, 
                              ParticleSwarm, GaussianProcess, GradientDescent, Adam, LBFGSB, Prune)
from emergent.utilities.containers import DataDict

# Define the data source.
# `network` is not defined in this notebook; it is assumed to be the EMERGENT
# network object available in the running session.
hub = network.hubs['hub']
hub.range = DataDict({'thing': {'X': {'min': -2, 'max': 2}, 'Y': {'min': -2, 'max': 2}}})
thing = hub.children['thing']
state = {'thing': {'X': 0.1, 'Y': 0.9}}
bounds = hub.range.copy()
experiment = hub.gaussian
params = {'sigma_x': 0.3, 'sigma_y': 0.8, 'x0': 0.3, 'y0': 0.6, 'noise': 0.0}
source = Source(state, bounds, experiment, params)

# Declare and run the pipeline. Allowed blocks are:
#   Sampling methods: GridSearch, DifferentialEvolution, ParticleSwarm
#   Models: GaussianModel
#   Other: Rescale
pipe = Pipeline(state, network, source=source)

# pipe.add(LBFGSB())
pipe.add(GridSearch(params={'Steps': 10}))
pipe.add(Rescale({'threshold': 0.75}))

pipe.add(ParticleSwarm({'Inertia': 0.3, 'Cognitive acceleration': 1, 'Social acceleration': 1}))
# pipe.add(DifferentialEvolution())

# pipe.add(GridSearch(params={'Steps': 10}))
pipe.add(GaussianModel(optimizer=DifferentialEvolution()))

points, costs = pipe.run()
# pipe.plot()
print(points[-1], costs[-1])


# Plotting
After running a pipeline, we can call the pipe.plot() method to generate a plotting widget on the Dashboard:

In [None]:
pipe.plot()



In [None]:
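import numpy as np
import matplotlib.pyplot as plt

# Every evaluated cost (dots) and the best cost found so far (dashed line)
plt.plot(pipe.costs, '.k', label='cost')
plt.plot(np.minimum.accumulate(pipe.costs), '--k', label='best so far')
plt.xlabel('Evaluation')
plt.ylabel('Cost')
plt.legend()
plt.show()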



In [None]:
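# Build a 100x100 grid of (x, y) prediction points on the unit square
x = np.linspace(0, 1, 100)
y = np.linspace(0, 1, 100)
predict_points = np.transpose(np.meshgrid(x, y)).reshape(-1, 2)
len(predict_points)  # 10000 points, one (x, y) pair per row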

In [None]:
network.emit('test', {'name': 'ParticleSwarm'})
network.emit('test', {'name': 'DifferentialEvolution'})



In [None]:
import importlib
# Look up a pipeline block class by name, then read its default 'Steps' parameter
block_class = getattr(importlib.import_module('emergent.pipeline'), 'GridSearch')
block_class().params['Steps'].value

# Modularization

In [None]:
import numpy as np
from emergent.pipeline import Pipeline, GridSearch

# Declare source (placeholder, to be replaced soon).
# `network` is assumed to be provided by the running EMERGENT session.
hub = network.hubs['hub']
bounds = hub.range.copy()
params = {'sigma_x': 0.3, 'sigma_y': 0.8, 'x0': 0.3, 'y0': 0.6, 'noise': 0.0}
state = {'thing': {'X': 1, 'Y': 0}}

# Local cost function (not passed to the pipeline below, which evaluates hub.gaussian)
def cost(state, params):
    return -np.exp(-(state['thing']['X']-params['x0'])**2)

experiment = hub.gaussian
pipe = Pipeline(experiment, params, state, bounds, network)

pipe.add(GridSearch(params={'Steps': 10}))



In [None]:
pipe.run()

In [None]:
pipe.costs