# Parallelization

In practice, parallelization is important and can significantly speed up the optimization.
For population-based algorithms, evaluating a set of solutions is easy to parallelize
by parallelizing the evaluation itself.

## Vectorized Matrix Operations

One way is to use `NumPy` matrix operations, which have been used for almost all test problems implemented in *pymoo*.
By default, `elementwise_evaluation` is set to `False`, which means that `_evaluate` receives a set of solutions.
Thus, `x` is a matrix where each row is an individual and each column a variable.

In [4]:
import numpy as np

from pymoo.model.problem import Problem

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        out["F"] = np.sum(x ** 2, axis=1)

Summing with `axis=1` reduces each row of the matrix to one objective value in a single vectorized NumPy call, so all individuals are evaluated at once without a Python loop.
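To make the row-wise reduction concrete, here is a minimal sketch on a small hand-made matrix. The values and the names `X` and `F` are purely illustrative and are not taken from pymoo.

```python
import numpy as np

# Hypothetical population: 3 individuals (rows) with 4 variables (columns) each.
X = np.array([[1.0, 2.0, 0.0, 0.0],
              [0.0, 0.0, 3.0, 4.0],
              [1.0, 1.0, 1.0, 1.0]])

# Summing the squares along axis=1 collapses the columns,
# which leaves exactly one objective value per individual.
F = np.sum(X ** 2, axis=1)

print(X.shape)     # (3, 4)
print(F.tolist())  # [5.0, 25.0, 4.0]
```

The result has one entry per row, which is the layout a batch `_evaluate` is expected to write to `out["F"]` in the example above.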

In [5]:
from pymoo.algorithms.so_genetic_algorithm import GA
from pymoo.optimize import minimize

res = minimize(MyProblem(), GA())
print('Vectorized:', res.exec_time)

Vectorized: 0.899420976638794


## Starmap Interface

In general, **pymoo** allows passing a `starmap` object to be used for parallelization.
The `starmap` interface follows the [`multiprocessing.Pool.starmap`](https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#multiprocessing.pool.Pool.starmap) function of the Python standard library.
This keeps the parallelization flexible, because any callable with this interface can be passed.

**IMPORTANT:** Please note that the problem needs to set `elementwise_evaluation=True`, which means that one call of `_evaluate` takes care of a single solution only.
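As a rough illustration of what the `starmap` interface does with an elementwise function, the sketch below uses only the standard library. The function `evaluate_one` and the sample solutions are hypothetical and stand in for a single-solution `_evaluate`; this is not pymoo's internal code.

```python
from multiprocessing.pool import ThreadPool

# Hypothetical elementwise objective: receives ONE solution, returns one value.
def evaluate_one(x):
    return sum(v ** 2 for v in x)

# Three solutions with two variables each.
solutions = [[1.0, 2.0], [3.0, 4.0], [0.0, 0.5]]

# starmap unpacks each item as the argument tuple of one call,
# so every solution is wrapped in a one-element tuple.
params = [(x,) for x in solutions]

with ThreadPool(2) as pool:
    values = pool.starmap(evaluate_one, params)

print(values)  # [5.0, 25.0, 0.25]
```

The results come back in the same order as the inputs, so they can be matched to the solutions directly.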


In [6]:
class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, 
                         elementwise_evaluation=True, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        out["F"] = (x ** 2).sum()

Then, we can pass a `starmap` object to be used for parallelization.

### Threads

In [7]:
from multiprocessing.pool import ThreadPool

# the number of threads to be used
n_threads = 8

# initialize the pool
pool = ThreadPool(n_threads)

# define the problem by passing the starmap interface of the thread pool
problem = MyProblem(parallelization=('starmap', pool.starmap))

In [8]:
from pymoo.algorithms.so_genetic_algorithm import GA
from pymoo.optimize import minimize

res = minimize(problem, GA())
print('Threads:', res.exec_time)

Threads: 1.0480430126190186


In [9]:
pool.close()

### Processes

In [10]:
import multiprocessing

# the number of processes to be used
n_processes = 8

# initialize the pool and pass its starmap interface to the problem
pool = multiprocessing.Pool(n_processes)
problem = MyProblem(parallelization=('starmap', pool.starmap))

In [11]:
res = minimize(problem, GA())
print('Processes:', res.exec_time)

Processes: 11.800267219543457


In [12]:
pool.close()

**Note:** Here, the overhead of serializing and transferring the data is clearly visible.
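The sketch below gives a rough idea of where this overhead comes from. `multiprocessing` uses `pickle` to move arguments and results between processes, so every task pays for a round trip like the one shown here, while threads share memory and skip it. The vector `x` is a made-up example and nothing is timed.

```python
import pickle

# A solution vector with 10 variables, similar in size to the ones above.
x = [0.5] * 10

# What a process pool does for every task: serialize the arguments,
# ship the bytes to a worker, and deserialize them there.
# The result then takes the same route back.
payload = pickle.dumps(x)
restored = pickle.loads(payload)

# The actual work for this toy objective is tiny in comparison.
value = sum(v ** 2 for v in restored)

print(len(payload) > 0, restored == x, value)  # True True 2.5
```

For an objective this cheap, the round trip can easily cost more than the evaluation itself, which is consistent with the timings reported above.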

## Dask

A more advanced approach is to distribute the evaluation function to a number of workers. Several frameworks support distributing code in this way. For our framework, we recommend using [Dask](https://dask.org).

Documentation on setting up a cluster is available [here](https://docs.dask.org/en/latest/setup/cli.html). Basically, you first start a scheduler somewhere and then connect workers to it. Then, a client object connects to the scheduler and distributes the jobs automatically for you.

In [13]:
from dask.distributed import Client
client = Client()

In [14]:
import numpy as np
from pymoo.model.problem import Problem

# define the evaluation function that directly returns the objective and/or
# constraint values. Please note that it must evaluate a single solution (elementwise).
def fun(x):
    return {
        "F": np.sum(x ** 2)
    }

# define a problem without any evaluation function - everything is done by parallelization
class MyProblem(Problem):

    def __init__(self, *args, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, 
                         elementwise_evaluation=True, *args, **kwargs)

# create the problem and set the parallelization to dask
problem = MyProblem(parallelization=("dask", client, fun))

In [15]:
res = minimize(problem, GA())
print('Dask:', res.exec_time)

Dask: 54.45502281188965


In [16]:
client.close()

**Note:** Here, the overhead of transferring data to the Dask workers dominates. However, if your problem is computationally more expensive, this should no longer be the case.

## Custom Parallelization

If you need more control over the parallelization process, the following examples show how to implement a custom parallelization.
Since `elementwise_evaluation` is disabled by default, `_evaluate` receives the whole set of solutions to be evaluated.

### Threads

Thus, a thread pool can be initialized in the constructor of the `Problem` class and then be used to speed up the evaluation.
The code below does essentially what happens internally when the `starmap` interface of *pymoo* is used. Because the function is defined inline and some overhead is avoided, it is actually slightly faster.

In [17]:
import numpy as np
from multiprocessing.pool import ThreadPool

from pymoo.model.problem import Problem

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)
        self.pool = ThreadPool(8)

    def _evaluate(self, X, out, *args, **kwargs):
        
        # define the function
        def my_eval(x):
            return (x ** 2).sum()
            
        # prepare the parameters for the pool (one argument list per solution)
        params = [[x] for x in X]

        # calculate the function values in a parallelized manner and wait until done
        F = self.pool.starmap(my_eval, params)
        
        # store the function values and return them.
        out["F"] = np.array(F)
        
problem = MyProblem()       

In [18]:
res = minimize(problem, GA())
print('Threads:', res.exec_time)

Threads: 1.0474400520324707


In [19]:
problem.pool.close()

### Dask

In [20]:
import numpy as np
from dask.distributed import Client

from pymoo.algorithms.so_genetic_algorithm import GA
from pymoo.model.problem import Problem
from pymoo.optimize import minimize


class MyProblem(Problem):

    def __init__(self, client, *args, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5,
                         elementwise_evaluation=False, *args, **kwargs)
        self.client = client

    def _evaluate(self, X, out, *args, **kwargs):
        def fun(x):
            return np.sum(x ** 2)

        jobs = [self.client.submit(fun, x) for x in X]
        # np.vstack is used because np.row_stack is a deprecated alias of it in NumPy 2.0
        out["F"] = np.vstack([job.result() for job in jobs])


In [21]:
client = Client(processes=False)

problem = MyProblem(client)

res = minimize(problem, GA())
print('Dask:', res.exec_time)

client.close()

Dask: 26.734183311462402
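The submit-then-collect pattern used in `_evaluate` above can be tried without a Dask installation, because Dask's futures interface is modeled on the standard library's `concurrent.futures`. The following sketch uses `ThreadPoolExecutor` as a stand-in for the Dask client. It only illustrates the pattern; the sample matrix `X` is made up.

```python
from concurrent.futures import ThreadPoolExecutor

# Elementwise objective, analogous to the inline `fun` above.
def fun(x):
    return sum(v ** 2 for v in x)

# Three solutions with two variables each.
X = [[1.0, 2.0], [3.0, 4.0], [0.0, 0.5]]

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns immediately with one future per solution
    jobs = [executor.submit(fun, x) for x in X]

    # result() blocks until the corresponding job has finished;
    # iterating over `jobs` keeps the results in submission order
    F = [[job.result()] for job in jobs]

print(F)  # [[5.0], [25.0], [0.25]]
```

Submitting all jobs before collecting any result is what lets the workers run concurrently. Calling `result()` directly after each `submit()` would serialize the evaluation.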
