# **Parallelization (Paralelização)**

Na prática, a paralelização é essencial e pode acelerar significativamente a otimização. Para algoritmos baseados em população, a avaliação de um conjunto de soluções pode ser facilmente paralelizada.

## **Vectorized Matrix Operations (Operações Matriciais Vetorizadas)**

Uma maneira eficiente de paralelizar a avaliação é usando **operações matriciais NumPy**, como no exemplo abaixo:


In [2]:
import numpy as np
from pymoo.core.problem import Problem

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
         out["F"] = np.sum(x ** 2, axis=1)

problem = MyProblem()

In [3]:
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Threads:', res.exec_time)

Threads: 1.2808420658111572


# Starmap Interface

In [4]:
from pymoo.core.problem import ElementwiseProblem

class MyProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
         out["F"] = (x ** 2).sum()


# Threads

In [5]:
from multiprocessing.pool import ThreadPool
from pymoo.core.problem import StarmapParallelization
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize

# Criando um pool de threads
n_threads = 4
pool = ThreadPool(n_threads)
runner = StarmapParallelization(pool.starmap)

# Definindo o problema com paralelização
problem = MyProblem(elementwise_runner=runner)

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Threads:', res.exec_time)

pool.close()


Threads: 1.3931777477264404


# Processes

In [5]:
import multiprocessing
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize

# Criando um pool de processos
n_processes = 8
pool = multiprocessing.Pool(n_processes)
runner = StarmapParallelization(pool.starmap)

# Definindo o problema com paralelização
problem = MyProblem(elementwise_runner=runner)

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Processes:', res.exec_time)

pool.close()

Processes: 2.198477029800415


# Dask (Distribuição em Cluster)

In [None]:
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize
from pymoo.core.problem import DaskParallelization
from dask.distributed import Client

client = Client()
client.restart()
print("DASK STARTED")

# Criando um runner para paralelização
runner = DaskParallelization(client)

# Definindo o problema
problem = MyProblem(elementwise_runner=runner)

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Dask:', res.exec_time)

client.close()
print("DASK SHUTDOWN")

# Custom Parallelization (Paralelização Personalizada)

In [7]:
from pymoo.core.problem import Problem
from multiprocessing.pool import ThreadPool

pool = ThreadPool(8)

class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):

        # Define a função de avaliação
        def my_eval(x):
            return (x ** 2).sum()

        # Prepara os parâmetros para o pool
        params = [[X[k]] for k in range(len(X))]

        # Avalia as soluções em paralelo
        F = pool.starmap(my_eval, params)

        # Armazena os valores calculados
        out["F"] = np.array(F)

problem = MyProblem()
res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Threads:', res.exec_time)
pool.close()

Threads: 1.3053219318389893


# Dask - Custom Parallelization

In [None]:
import numpy as np
from dask.distributed import Client
from pymoo.core.problem import Problem
from pymoo.optimize import minimize

client = Client(processes=False)

class MyProblem(Problem):

    def __init__(self, *args, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5,
                         elementwise_evaluation=False, *args, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        def fun(x):
            return np.sum(x ** 2)

        jobs = [client.submit(fun, x) for x in X]
        out["F"] = np.row_stack([job.result() for job in jobs])

problem = MyProblem()

res = minimize(problem, GA(), termination=("n_gen", 200), seed=1)
print('Dask:', res.exec_time)

client.close()