# Reimplementación - Multiprocessing

El propósito de este _notebook_ es mostrar el procedimiento para hacer una re-implementación de la clase `colony`, a su versión utilizando multiprocesamiento con la librería `mutiprocessing` vista en clase y explicada en el [capítulo V del libro](https://itam-ds.github.io/analisis-numerico-computo-cientifico/V.optimizacion_de_codigo/5.4/Computo_en_paralelo_usando_CPUS_en_SMC.html#multiprocessing) de optimización y cómputo en paralelo. 

En particular, del resultado de los ejercicios de perfilamiento, se tiene que el ḿetodo `.solve_tsp` es el que lleva más tiempo de computo. Dado lo anterior, buscamos implementar la siguiente solución:

- Enviar un número determinado de hormigas a un _pool_ de workers, donde en cada uno de llos cada hormiga recorrerá el grafo buscando una solución en cada iteración del algoritmo. 
- Una vez completado ese proceso, en cada iteracíon se actualiza el nviel de feromonas del grafo de acuerdo a los recorridos que llevó a cabo cada hormiga. 


A continuación se presentan las fases implementadas para lograr dicho resultado. Este Trabajaremos con 3
0 ciudades de China sacadas aleatoriamente del conjunto de la base de datos _National Traveling Salesman Problems_ de la Universidad de Waterloo, disponible [aquí](https://www.math.uwaterloo.ca/tsp/world/countries.html).


**Nota importante:
Este _notebook_ se ejecuta desde `practica-1-segunda-parte-ltejadal/src/ant_colony` del [repo](https://github.com/optimizacion-2-2021-1-gh-classroom/practica-1-segunda-parte-ltejadal) donde está implementada la librería, y se busca que los métodos y funciones presentados formen parte de `ACO-TSP`.**

In [4]:
import random
import time
import tsplib95
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from utils import *
from aco_tsp_oo import *
from multiprocessing import cpu_count
from multiprocessing import Pool

In [3]:
path_china = '../../datasets/ch71009.tsp'
G = read_coord_data(path_china, n_cities=30, seed=1999)

Problem with 71009 cities. Selected 30.


## Separación por _workers_

Inicialmente, buscamos ver la forma de distibuir los trabajos por _worker_. Se implementó la función `assign_ants_threats`, la cual busca definir qúe número de hormigas asigna a cada _worker_.

En particular, se busca hacer una distribución equitativa de hormigas entre _workers_, sin embargo, si el número de hormigas seleccionado no es múltiplo del número de _workers_, entonces se asigna el módulo restante de forma equitativa entre algunos de los _workers_.

Por defecto se selecciona el número de _workers_ como el número de CPUs disponibles, pero este parámetro puede ser controlado por el usuario con `n_cpu`.

In [3]:
def assign_ants_threats(n_ants, n_cpu=cpu_count()):
    mod = n_ants % n_cpu
    no_mod = n_ants - mod
    N_all = int(no_mod/n_cpu) # every thread will have at least N_all ants
    
    # list of ants for every thread
    if n_ants < n_cpu:
        ants_per_threat = [[N_all] for t in range(n_ants)]
    else:
        ants_per_threat = [[N_all] for t in range(n_cpu)]

    # Assign module
    for i in range(mod):
        ants_per_threat[i][0] += 1
        
    return ants_per_threat


In [4]:
## ejemplo
print(f'- Esta maquina tiene {cpu_count()} cpu (s).')
n_ants = 32
n_cpu = 6
print(f'- Se seleccionaron {n_cpu} workers en el pool y {n_ants} hormigas.')
ants_per_wk = assign_ants_threats(n_ants, n_cpu=cpu_count())
print(f'- La asignación de hormigas por worker fue: \n {ants_per_wk}')

- Esta maquina tiene 12 cpu (s).
- Se seleccionaron 6 workers en el pool y 32 hormigas.
- La asignación de hormigas por worker fue: 
 [[3], [3], [3], [3], [3], [3], [3], [3], [2], [2], [2], [2]]


In [5]:
## ejemplo con menos workers que hormigas
print(f'- Esta maquina tiene {cpu_count()} cpu (s).')
n_ants = 8
n_cpu = cpu_count()
print(f'- Se seleccionaron {n_cpu} workers en el pool y {n_ants} hormigas.')
ants_per_wk = assign_ants_threats(n_ants, n_cpu=cpu_count())
print(f'- La asignación de hormigas por worker fue: \n {ants_per_wk}')

- Esta maquina tiene 12 cpu (s).
- Se seleccionaron 12 workers en el pool y 8 hormigas.
- La asignación de hormigas por worker fue: 
 [[1], [1], [1], [1], [1], [1], [1], [1]]


## _Pool_ de _workers_

- Función por _worker_

Para poder distribuir los trabajos sobre cada _worker_, debemos definir la función que cada uno ejecutará. En particular, se busca que el número asignado de hormigas a cada _worker_ recorra el grafo buscando una solución al problema TSP, y se devuelva la distancia y el recorrido hecho por cada hormiga en forma de una lista de tuplas. 

Esto se realizó en la función `n_ants_walk`:

In [6]:
def n_ants_walk(n_ants, G):
    ants_in_thread = [ant(G) for i in range(n_ants)]
    # solution for each ant
    for a in ants_in_thread:
        a.walk_over_graph(0, 'inf', A)
    # tuple with sln for each ant
    slns = [(a.route, a.r_len) for a in ants_in_thread]
    return slns

- Aplanar lista de listas
Dado que `pool.starmap` nos devolverá una lista de listas anidada, bsucamos aplanarla para una lectura más fácil en nuestro algoritmo, por lo que se definió `flatten_list_of_list` para aplanar dicha lista en una sola.

In [7]:
def flatten_list_of_list(list_of_list):
    flatten_lst = []
    for l in list_of_list:
        for j in l:
            flatten_lst.append(j) 
    return flatten_lst

- Se define `multiprocessing_bt` para ejecutar la función `n_ants_walk` en el _pool_ de _workers_. Una vez terminado el proceso se devuelve una lista de tuplas con los recorridos de todas las hormigas y sus distancias asociadas. 

In [8]:
def multiprocessing_bt(ants_per_threat, num_cpu):
    with multiprocessing.Pool(processes=num_cpu) as pool:
        results = pool.starmap(n_ants_walk, ants_per_threat)
        
    return flatten_list_of_list(results)

## Reimplementación de `colony`

Finalmente se incluye el paso de multiprocesamiento en el método `solve_tsp` de la clase `colony`. A modo de ejemplo se reimplementará la clase `colony_multiw`con los ajustes correspondientes.

In [9]:
class colony_multiw():
    """Clase que representa una colonia de hormigas que recorren
    el grafo asignado para resolver el problema TSP.

    Args:
        G (networkx graph): Grafo con relaciones asociadas entre nodos
        init_node (int): Nodo inicial del recorrido.
        best_route (list, optional): Ruta con respecto a la cual se quiere mejorar.
        best_dist ([type], optional): Distancia total del recorrido x_best.
        n_ants (int, optional): Número de hormigas. Default es 2.
        max_iter (int, optional): [description]. Default es 100.
        alpha (int, optional): Factor de influencia de tau. Defaults to 1.
        beta (int, optional): Factor de influencia de eta. Defaults to 5.
        rho (float, optional): Tasa de evaporación de las feromonas. Defaults to .5.
        verbose (int, optional): Imprime progreso del algoritmo cada K iteraciones. Defaults to 10.
    """
    def __init__(self, G, init_node,
                 best_route = [],
                 best_dist = float('inf'),
                 n_ants=2,
                 max_iter=10, 
                 alpha=1, 
                 beta=5, 
                 rho=.5,
                 n_workers = 1,
                 verbose=10):
        self.graph = G
        self.A = None
        self.init_node = init_node
        self.best_route = best_route
        self.best_dist = best_dist
        self.lenghts = create_dic_dist_from_graph(self.graph)
        self.n_ants = n_ants
        self.max_iter = max_iter
        self.alpha = alpha
        self.beta = beta
        self.rho = rho
        self.n_workers = n_workers
        self.ants_per_worker = assign_ants_threats(self.n_ants, self.n_workers)
        self.tau = init_ferom(self.graph)
        self.eta = init_atrac(self.graph, self.lenghts)
        
    def _update_pheromone_levels(self, route, dist_route):
        """Actualiza el nivel de feromonas en las respectivas trayectorias
        del grafo.

        Args:
            route (lst): Lista que incluye un recorrido por el grafo.
            dist_route (float): Distancia de la ruta.
        """
        for i in range(1, len(route[:-1])):
            self.tau[i-1][i] += 1/dist_route
        self.tau[route[:-1][-1]][self.init_node]
        
    def _update_many_pheromone_levels(self, routes, distances):
        """Actualiza los niveles de feromonas para diferentes rutas
        recorridas por diferentes hormigas.

        Args:
            routes (lst of lst): Lista que contiene los recorridos 
            realziados por diferentes hormigas.
            distances (lst of floats): Lista con las distancias de
            las rutas.
        """
        if (len(distances) == len(self.graph.nodes)):
            for i in range(len(routes)):
                self._update_pheromone_levels(routes[i], distances[i])
                
    def _evaporates_pheromone(self):
        """Evapora los niveles de feromonas en todos los tramos del 
        grafo.
        """
        for e in self.tau:
            for v in self.tau:
                self.tau[e][v] = (1-self.rho)*v
                
    def _n_ants_walk(self, n_ants):
        ants_in_thread = [ant(self.graph) for i in range(n_ants)]
        # solution for each ant
        for a in ants_in_thread:
            a.walk_over_graph(0, 'inf', self.A)
        # tuple with sln for each ant
        slns = [(a.route, a.r_len) for a in ants_in_thread]
        return slns
    
    def _multiprocessing_bt(self, ants_per_threat, num_cpu):
        with Pool(processes=num_cpu) as pool:
            results = pool.starmap(self._n_ants_walk, ants_per_threat)
        
        return flatten_list_of_list(results)
    
    def _colony_run(self, A):
        """La hormigas de la colonia realizan recorridos 
        independientes simultáneamente.

        Args:
            A (dic): nivel de atracción de los nodos con respecto
            a sus vecinos.
        """
        # multiprocessing
        ants_journey = self._multiprocessing_bt(self.ants_per_worker, self.n_workers)
        
        # get paths and distances
        routes, distances = zip(*ants_journey)
        # updates pheromone levels
        self._update_many_pheromone_levels(routes, distances)
            
        # best route
        bst_idx = np.argmin(distances)
        min_dist = distances[bst_idx]
        bst_route = routes[bst_idx]
        
        # improves route if possible
        if min_dist < self.best_dist:
            self.best_dist = min_dist
            self.best_route = bst_route
            
    def solve_tsp(self):
        """Resuelve el problema TSP.
        """
        route = self.best_route
        dist = self.best_dist
        
        for k in range(self.max_iter):
            self.A = atraccion_nodos(self.graph,tau= self.tau, eta=self.eta, 
                                alpha=self.alpha, beta=self.beta)
            
            if k>1:
                self._evaporates_pheromone()
                
            # ants running across the graph
            self._colony_run(self.A)
            
    def plot_route(self, plt_size=(12, 8)):
        """Grafica la trayectoria encontrada por la colonia en el grafo.

        Args:
            plt_size (tuple, optional): Tamaño del gŕafico (ancho x altura). Defaults es (12, 8).
        """
        graph_optim_path(self.graph, self.best_route, self.best_dist, plt_size)

A continuación se resuelve el problema *TSP* para el conjunto de datos seleccionados utilizando la nueva clase.

In [10]:
colony_mw = colony_multiw(G, init_node=0,  n_ants= 100, n_workers=12)

In [11]:
start_time = time.time()
colony_mw.solve_tsp()
end_time = time.time()

In [12]:
secs = end_time-start_time
print("La solucion con pool de workers tomó", secs, "segundos." )
print(f"Distancia {colony_mw.best_dist} kms.")

La solucion con pool de workers tomó 1.3901257514953613 segundos.
Distancia 358.34575075242924 kms.


## Comparación con clase `colony`

In [13]:
colony_old = colony(G, init_node=0,  n_ants= 100)

In [14]:
start_time = time.time()
colony_old.solve_tsp()
end_time = time.time()

In [15]:
secs = end_time-start_time
print("La solucion sin pool tomó", secs, "segundos." )
print(f"Distancia {colony_old.best_dist} kms.")

La solucion sin pool tomó 8.584644079208374 segundos.
Distancia 358.34575075242924 kms.


## Referencias
- Capítulo V del libro de optimización (Cómputo en paralelo usando CPUs en un sistema de memoria compartida (SMC): https://itam-ds.github.io/analisis-numerico-computo-cientifico/V.optimizacion_de_codigo/5.4/Computo_en_paralelo_usando_CPUS_en_SMC.html#
- Librería [`multiprocessing`](https://docs.python.org/3.1/library/multiprocessing.html): https://docs.python.org/3.1/library/multiprocessing.html