## Parallelization

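By default, the whole population is evaluated at once: `_evaluate` receives all solutions as a single NumPy array, so the objectives can be computed with vectorized operations.

However, some problems can only be evaluated one solution at a time (`elementwise_evaluation=True`), for example because the objective cannot be expressed with array operations. In that case, parallelizing the individual evaluations makes sense.

The following options are available:

i) vectorized through NumPy (the whole population at once)

ii) default: elementwise and serialized (one solution after another)

iii) threads

iv) Dask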


### Vectorized through NumPy
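When the objective can be written with array operations, the whole population is evaluated in a single call. The following plain-NumPy sketch is not pymoo code, and the function names `evaluate_vectorized` and `evaluate_elementwise` are hypothetical. It computes the same objective as the `MyProblem` example in the next section (the sum of the variables), once for the whole matrix and once with a per-row loop:

```python
import numpy as np


def evaluate_vectorized(X):
    # X has shape (n_solutions, n_var): one row per solution.
    # A single NumPy call sums every row; [:, None] turns the result into a
    # column, i.e. shape (n_solutions, n_obj) with n_obj = 1.
    return X.sum(axis=1)[:, None]


def evaluate_elementwise(X):
    # Same objective, but a Python loop handles one solution at a time.
    return np.array([[x.sum()] for x in X])


X = np.random.random((10, 2))
F_vec = evaluate_vectorized(X)
F_loop = evaluate_elementwise(X)
print(F_vec.shape)                 # (10, 1)
print(np.allclose(F_vec, F_loop))  # True
```

Both functions return the same values; the vectorized version avoids the Python-level loop. In pymoo, the `_evaluate` method of a problem with `elementwise_evaluation=False` receives such a two-dimensional `X`, as the Dask example later in this document shows.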

### Elementwise and Serialized

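With `elementwise_evaluation=True`, `_evaluate` is called once per solution and receives a single vector `x`. To mimic an expensive function, each evaluation below sleeps for one second:

```python
import time

import numpy as np
from pymoo.model.problem import Problem


class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=2, n_obj=1, elementwise_evaluation=True, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        # x is a single solution; the sleep mimics an expensive evaluation
        time.sleep(1)
        out["F"] = x.sum()


problem = MyProblem()
X = np.random.random((10, 2))  # 10 solutions with 2 variables each
```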

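Without parallelization, the solutions are evaluated one after another:

```python
problem.parallelization = None
%time F = problem.evaluate(X)
```

```
CPU times: user 3.23 ms, sys: 3.32 ms, total: 6.55 ms
Wall time: 10 s
```

The wall time of 10 s is the sum of the ten one-second evaluations.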


### Multiple Threads 

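Setting `parallelization = "threads"` evaluates the solutions concurrently, using the thread pool from Python's built-in `multiprocessing` module. By default, n-1 threads are used, but the number can be changed.

```python
problem.parallelization = "threads"
%time F = problem.evaluate(X)
```

```
CPU times: user 21.3 ms, sys: 28.7 ms, total: 50 ms
Wall time: 2.12 s
```

Compared with the serial run, the wall time drops from 10 s to about 2 s.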
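The speedup comes from running the sleeping evaluations concurrently. The sketch below reproduces the idea outside pymoo with the standard library's `multiprocessing.pool.ThreadPool`; it is not pymoo's internal code, and the helper names and the 0.1 s delay are illustrative assumptions:

```python
import time
from multiprocessing.pool import ThreadPool

import numpy as np


def evaluate_one(x):
    # Hypothetical expensive evaluation: a 0.1 s delay, then the objective.
    time.sleep(0.1)
    return x.sum()


def evaluate_serial(X):
    # One solution after another, like parallelization = None.
    return np.array([evaluate_one(x) for x in X])[:, None]


def evaluate_threads(X, n_threads):
    # Spread the rows of X over n_threads worker threads;
    # pool.map returns the results in the original row order.
    with ThreadPool(n_threads) as pool:
        values = pool.map(evaluate_one, list(X))
    return np.array(values)[:, None]


X = np.random.random((8, 2))

start = time.perf_counter()
F_serial = evaluate_serial(X)
t_serial = time.perf_counter() - start

start = time.perf_counter()
F_threads = evaluate_threads(X, n_threads=4)
t_threads = time.perf_counter() - start

print(f"serial: {t_serial:.2f} s, threads: {t_threads:.2f} s")
```

This works because `time.sleep`, like most I/O and many NumPy routines, releases Python's global interpreter lock (GIL). A CPU-bound objective written in pure Python would not run faster on threads; separate processes or a Dask cluster are the usual alternatives in that case.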


### Dask

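Here the problem is not evaluated elementwise (`elementwise_evaluation=False`). Instead, `_evaluate` itself sends each row of `X` to the Dask workers with `client.map` and collects the results. Each task reshapes a row into a 1000×1000 matrix and sums the entries of its inverse. A second problem runs the same task serially for comparison.

```python
import numpy as np
from dask.distributed import Client, LocalCluster

from pymoo.model.problem import Problem

# local=True starts a cluster on this machine; local=False connects to an
# already running Dask scheduler (here expected at localhost:9000)
local = False

if local:
    cluster = LocalCluster(dashboard_address=":9300")
    client = Client(cluster)
else:
    client = Client(address="localhost:9000")


def task(x):
    # expensive evaluation of one solution: invert a 1000x1000 matrix
    return np.linalg.inv(x.reshape((1000, 1000))).sum()


class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=1000000, n_obj=1, elementwise_evaluation=False, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        # submit one task per row of X to the workers, then wait for all results
        futures = client.map(task, X)
        out["F"] = np.vstack(client.gather(futures))


X = np.random.random((100, 1000000))
%time F = MyProblem().evaluate(X)


class MySerialProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=1000000, n_obj=1, elementwise_evaluation=False, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        # same computation, executed row by row in the current process
        out["F"] = np.vstack([task(x) for x in X])


%time F = MySerialProblem().evaluate(X)
```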



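In the Dask example, `client.map` returns one future per row of `X`, and the objective values are collected once all futures have finished. The same futures pattern exists in the standard library's `concurrent.futures`. The sketch below uses `ThreadPoolExecutor` as a stand-in for a Dask client (an assumption for illustration: it runs on a single machine only), with smaller 100×100 matrices so that it finishes quickly. The names `N` and `evaluate_with_futures` are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np

N = 100  # matrix size; the Dask example uses 1000


def task(x):
    # Same task as in the Dask example, at a smaller size:
    # invert an N x N matrix built from one solution and sum its entries.
    return np.linalg.inv(x.reshape((N, N))).sum()


def evaluate_with_futures(X, executor):
    # Submit one task per row (analogous to client.map) ...
    futures = [executor.submit(task, x) for x in X]
    # ... then block until every result is available and stack them as a column.
    return np.vstack([f.result() for f in futures])


# Random matrices of this kind are invertible with probability 1.
rng = np.random.default_rng(0)
X = rng.random((8, N * N))

with ThreadPoolExecutor(max_workers=4) as executor:
    F_parallel = evaluate_with_futures(X, executor)

F_serial = np.vstack([task(x) for x in X])
print(F_parallel.shape)  # (8, 1)
```

With Dask, the workers may run on other machines, so every row of `X` has to be transferred to them; for the 100 × 1,000,000 array of the example, that is about 800 MB of float64 data.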