# Concurrencia
<a href="https://colab.research.google.com/github/milocortes/diplomado_ciencia_datos_mide/blob/edicion-2023/templates/concurrencia_mide_2023.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
## La instrucción lscpu muestra información acerca de la arquitectura del CPU


## Multiprocesamiento

* Para crear un proceso *hijo* se invoca al constructor <code>Process</code>, se especifica la función a ejecutar (sin paréntesis) y se especifican en una tupla los argumentos de la función.
* La clase  <code>Process</code> tiene varias variables de instancia, *i.e.* una variable que se relaciona con una única instancia de una clase, como  <code>pid</code> y <code>name</code>, que son públicas y con las cuales podemos identificar al proceso. Por ejemplo, <code>current process().pid</code> nos da el pid (Process ID) del proceso que está ejecutándose actualmente.
* La clase <code>Process</code> tiene un método <code>start</code> que debe ser llamado por el proceso para iniciar su tarea.

### Creación de múltiples procesos

In [None]:
# Definimos una función que será pasada como target y que recibe un argumento
def sayHi2(n):
    print("Hola {} desde el proceso {}\n".format(n, current_process().pid))

if __name__ == '__main__':

    name = "Juan"

    print("Hola desde el proceso {} (proceso main)".format(current_process().pid))

    # Creamos tres procesos hijos
    p1 = Process(target = sayHi2, args = (name,))
    p2 = Process(target = sayHi2, args = (name,))
    p3 = Process(target = sayHi2, args = (name,))

    # Cuando creas múltiples procesos, estos corren idependientemente de cada uno.
    p1.start()
    p2.start()
    p3.start()


### Creación de múltiples procesos usando <code>Pool</code>

Mientras que el constructor <code>Process</code> para crear explícitamente procesos independiente, el módulo <code>multuprocessing</code> ofrece otras alternativas para la creación de procesos. Una de ellas es mediante el mecanismo <code>Pool</code> y <code>Pool.map</code>.

En este enfoque todos los procesos ejecutan la misma función(<code>sayHi2</code>) y los argumentos son pasados mediante una lista de argumentos a distribuirse a los procesos.  

El método <code>map</code> se encarga de distribuir los argumentos al pool de procesos. El método regresa una lista con los valores de cada una de las ejecuciones de la función <code>sayHi2</code> de cada proceso. Dado que la función <code>sayHi2</code> no regresa un valor, obtenemos una lista de <code>None</code> .

In [None]:
import time
import datetime
import random 

# Definimos una función que será pasada como target y que recibe dos argumentos 
def sayHi3(argumentos):
    nombre, apellido = argumentos
    segundos_dormir = random.randint(1, 10)
    
    # El proceso entrará en un estado inactivo (se dormirá) segundos_dormir segundos
    time.sleep(segundos_dormir)
    
    # Cuando el proceso despierta, calcula el tiempo actual y lo imprime
    now = datetime.datetime.now()
    
    print("Hola {} desde el proceso {} {}, dormí {} segundos y desperté a las {}\n".format(nombre,
                                                                                        apellido, 
                                                                                        current_process().pid,
                                                                                        segundos_dormir,
                                                                                        now.time()))
    # El proceso regresa los segundos que estuvo dormido
    return segundos_dormir
    

if __name__ == '__main__':

    print("Hola desde el proceso {} (proceso main)".format(current_process().pid))

    name = "Juan"
    apellido = "González"

    p = Pool(processes = 3)
    resultados = p.map(sayHi3, [(name, apellido), (name, apellido), (name, apellido)])
    print(resultados)

In [None]:
## Creación de procesos anónimos

"""
Un proceso (o cualquier objeto) no tiene que estar almacenado en una variable para ser utilizado.
Pero si no se almacena en una variable, no se podrá usar después que es instanciado.
Un objeto que no está almacenado en una variable se llama un objeto anónimo.
"""
def sayHi4(nombre):
    print("Hola {} desde el proceso {}-{}".format(nombre, current_process().name,current_process().pid))
    

if __name__ == '__main__':

    nombre = "Juan"


    for i in range(3):
        Process(target = sayHi4, args = (nombre,), name = str(i)).start()

In [None]:
## Creación de procesos anónimos almacenados en una lista

"""
Si necesitas acceder a los procesos en algún otro momento, podrías guardarlos en una lista al momento de crearlos.
"""

if __name__ == '__main__':

    nombre = "Juan"
    n_workes = 4

    workers  = [Process(target = sayHi4, args = (nombre,), name = str(i)) for i in range(n_workes)]

    for worker in workers:
        worker.start()


## Sincronización entre procesos y variables (memoria) compartida

El módulo <code>multiprocessing</code> contiene los equivalentes de todas las **primitivas de sincronización** del módulo <code>threading</code>. **Primitivas de sincronización** corresponden a mecanísmos básicos con los cuales podemos construir mecanismos de sincronización más sofisticados.

Veremos la implementación de un candado (*lock*) el cual forza que sólo **un proceso a la vez** un bloque de código que modifica un dato compartido, es decir, nos da un acceso exclusivo. El trabajo del candado es asegurar que un thread o proceso tenga acceso  exclusivo a algún dato compartido al bloquear temporalmente a otros threads o procesos que están intentando acceder a él al mismo tiempo.

En Python, el candado tiene dos estados: <code>acquire</code> y <code>release</code>. Cuando un hilo o proceso <code>acquire</code> el candado, este puede ejecutar la operación inmediatamente. Cuando un hilo o proceso intenta <code>acquire</code> un candado que ya está adquirido, el thread o proceso es bloqueado hasta que el candado sea liberado ,<code>release</code>, por el thread o proceso que lo mantiene en su poseción. Cuando el thread o proceso libera el candado, el resto de los theads o procesos contienden entre sí por adquirir el candado.

Proteger un dato compartido con un candado puede verse como un acuerdo entre todos los threads o procesos que todos accederán al dato cuando hayan adquirido el candado. De esta manera, dos o más hilos no pueden acceder al dato compartido concurrentemente y causar una **condición de carrera**, *race condition*.

La **Condición de Carrera** ocurre cuando varios hilos están realizando operaciones de forma concurrente a un dato compartido, y el resultado de las operaciones depende de la forma como los hilos fueron calendarizados por el sistema operativo.

Un caso particular de condición de carrera es *data race*, que es cuando ...


La biblioteca <code>multiprocessing</code> nos permite valores que pueden ser compartidos por los procesos. Por ejemplo, <code>Value</code> nos permite crear un valor compartido de un tipo específico y un valor inicial:

* <code>num = Value('d', 0.0)</code>, indica que la variable <code>num</code> puede ser accedida por todos los procesos, es de tipo <code>'d'</code>, flotante de doble precisión, y toma el valor inicial de 0.0.
* <code>arr = Array('i', range(10))</code>, construye un arreglo con valores tipo <code>'i'</code>, enteros con signo, con 10 elementos, inicializados del 0 al 9.




In [None]:
from multiprocessing import Value, Lock, Process, current_process

def f(l, valor_compartido):
    print("Hola desde el proceso {}-{}".format(current_process().name,current_process().pid))

    l.acquire()
    for i in range(100000):
        valor_compartido.value += 1 
    l.release()
    
if __name__ == '__main__':
    
    num = Value('d', 0.0)
    
    lock = Lock( )

    p1 = Process(target = f, args = (lock,num), name = str(1))
    p2 = Process(target = f, args = (lock,num), name = str(2))

    p1.start()
    p2.start()

    """
    La operación join indica que el proceso padre (en este caso el proceso main) entra en un estado blocked 
    y espera hasta que termine la ejecución del proceso hijo. Puede verse como un mecanismo adicional de 
    sincronización entre procesos. 
    
    ¿Qué sucede si no utilizo join? El proceso padre puede terminar antes que los procesos hijos terminen su
    tarea, y ,con ello, la ejecución del programa completo.
    
    Ejemplo de ejecución sin join:
    
            Inicio de programa                        Fin del programa
                 |                                             |
                 |                                             |
    proceso main |============================================>|
                 |                                             |
                 |                                             |
                 |                                             |
    proceso 1    |=======================>                     |
                 |                                             |
                 |                                             |
    proceso 2    |=================================>           |
                 |                                             |
                 |                                             |

    Ejemplo de ejecución con join:
    
            Inicio de programa                        El proceso main  espera
                                                      hasta que terminen los procesos hijo
                 |                                             |
                 |                                             |
    proceso main |============================================>|
                 |                                             |
                 |                                             |
                 |                                             |
    proceso 1    |============================================>|
                 |                                             |
                 |                                             |
    proceso 2    |============================================>|
                 |                                             |
                 |                                             |
    
    A considerar:
    
    * p.join() hace que el proceso que se encuentra corriendo concurrentemente espere que p termine. 
    * join hace más lenta la ejecución, pues espera a que termine el proceso más lento, pero puede ser utilizado
      para hacer cumplir un orden particular de las ejecuciones.
    * Procesos anónimos no pueden ser utilizados con join.
    """
    p1.join()
    p2.join()

    print(num.value)


In [None]:
## Podemos almacenar procesos anónimos en una lista para poder 
## utilizar join
if __name__ == '__main__':
    
    num = Value('d', 0.0)
    
    lock = Lock( )
    
    n_procesos = 6
    
    workers = [Process(target = f, args = (lock,num), name = str(i)) for i in range(n_procesos)]
    
    for worker in workers:
        worker.start()
    
    for worker in workers:
        worker.join()
    
    print(num.value)

In [None]:
from multiprocessing import Array

arr = Array('i', range(10000000))

secuencial_start_time = time.time()
suma_secuencial = sum(arr)
secuencial_end_time = time.time() - secuencial_start_time

print("--- Ejecución secuencial. Suma : {} .Tiempo de ejecución : {} segundos ---".format(suma_secuencial, secuencial_end_time))

In [None]:
def get_rango(n_array, total_proc, id_proc):
    chunk = n_array//total_proc
    rango_inicio = chunk*(id_proc)
    
    if id_proc != (total_proc-1):
        rango_final = rango_inicio + chunk - 1
    else:
        rango_final = n_array - 1
    
    return (rango_inicio, rango_final)


def suma_parcial():

    chunk_proceso = get_rango(SIZE_ARRAY, TOTAL_PROC, int(current_process().name))
    suma_parcial_proc = sum(arreglo_completo[:][chunk_proceso[0]:chunk_proceso[1]+1])

    ## Un objeto compartido tiene asociado a él un candado. 
    ## De manera que podemos adquirir su candado llamando al método get_lock()
    ## para tener acceso exclusivo al dato compartido
    with suma_global.get_lock():
        suma_global.value += suma_parcial_proc
        

    
if __name__ == '__main__':
    
    SIZE_ARRAY = 10000000
    TOTAL_PROC = 4
    
    arreglo_completo = Array('i', range(SIZE_ARRAY))
    suma_global = Value('d', 0.0)
    
    workers = [Process(target = suma_parcial, args = (), name = str(i)) for i in range(TOTAL_PROC)]
    
    concurrente_start_time = time.time()
    
    for worker in workers:
        worker.start()
    
    for worker in workers:
        worker.join()
    concurrente_end_time = time.time() - concurrente_start_time
    
    print("--- Ejecución concurrente. Suma : {} .Tiempo de ejecución : {} segundos ---".format(suma_global.value, concurrente_end_time))

## Multiprocesamiento en operaciones IO

In [None]:
import pandas as pd

url_root = "https://www.inegi.org.mx/contenidos/programas/ccpv/2020/microdatos/iter/"

urls = [url_root + "ITER_{:02d}_2020_csv.zip".format(i) for i in range(1,33)]

acumula_resultados_secuencial = []

secuencial_start_time = time.time()

for url in urls:
    print(url.split("/")[-1])
    datos_entidad = pd.read_csv(url)
    acumula_resultados_secuencial.append(datos_entidad.query("MUN == 0 and LOC == 0"))

secuencial_end_time = time.time() - secuencial_start_time

print("--- Ejecución secuencial. Tiempo de ejecución : {} segundos ---".format(secuencial_end_time))


In [None]:
pd.concat(acumula_resultados_secuencial)

In [None]:
def io_task(url):
    
    print("Descargando {} por el proceso {}".format(url.split("/")[-1], current_process().pid))
 
    datos_entidad = pd.read_csv(url)

    return datos_entidad.query("MUN == 0 and LOC == 0")

if __name__ == '__main__':
    concurrente_start_time = time.time()

    p = Pool(processes = 10)
    acumula_resultados_concurrente = p.map(io_task, urls)
    
    concurrente_end_time = time.time() - concurrente_start_time
    
    print("--- Ejecución concurrente. Tiempo de ejecución : {} segundos ---".format(concurrente_end_time))
    

In [None]:
pd.concat(acumula_resultados_concurrente)