# MULTIPROCESSING

`Multiproccesing` es un paquete de python que permite la generación de procesos, el cual ofrece concurrencia local como remota.

# Ejemplo secuencial

In [51]:
import time, random
import os
import multiprocessing as mp
import matplotlib.pyplot as plt
import numpy as np

In [31]:
def calc_cuad(numeros):
    print("Calculo del cuadrado: ")
    for n in numeros:
        time.sleep(0.2)
        print("Cuadrado: ", n * n)

In [30]:
def calc_cubo(numeros):
    print("Calculo del cubo: ")
    for n in numeros:
        time.sleep(0.2)
        print("Cubo: ", n * n *n)

In [4]:
nums = range(10)

t = time.time()
calc_cuad(nums)
calc_cubo(nums)

print("Tiempo ejecución: ", time.time() - t)
print("Finaliza Ejecución")


Calculo del cuadrado: 
Cuadrado:  0
Cuadrado:  1
Cuadrado:  4
Cuadrado:  9
Cuadrado:  16
Cuadrado:  25
Cuadrado:  36
Cuadrado:  49
Cuadrado:  64
Cuadrado:  81
Calculo del cubo: 
Cubo:  0
Cubo:  1
Cubo:  8
Cubo:  27
Cubo:  64
Cubo:  125
Cubo:  216
Cubo:  343
Cubo:  512
Cubo:  729
Tiempo ejecución:  4.0833899974823
Finaliza Ejecución


Una manera sencilla de generar un proceso es por medio de la creación del objeto `Process` y llamarlo por medio del método `start()`

In [5]:
def tarea(nombre):
    print("Hola", nombre)
    
def tarea2(nombre):
    print("Adios", nombre)
    
# if __name__ == "__main__":            
Proceso1 = mp.Process(target = tarea, args = ("Ale",))

# Iniciar el Proceso 1
Proceso1.start()                      

# Pausar los demas procesos hasta terminar Proceso 1
Proceso1.join()

Hola Ale


In [6]:
nums = range(10)

t = time.time()
Proceso2 = mp.Process(target = calc_cuad, args = (nums,))

Proceso2.start()
Proceso2.join()

print("Tiempo ejecución: ", time.time() - t)
print("Finaliza Ejecución")

Calculo del cuadrado: 
Cuadrado:  0
Cuadrado:  1
Cuadrado:  4
Cuadrado:  9
Cuadrado:  16
Cuadrado:  25
Cuadrado:  36
Cuadrado:  49
Cuadrado:  64
Cuadrado:  81
Tiempo ejecución:  2.080228090286255
Finaliza Ejecución


In [7]:
nums = range(10)

Proceso2 = mp.Process(target = calc_cuad, args = (nums,))
Proceso3 = mp.Process(target = calc_cubo, args = (nums,))
Proceso4 = mp.Process(target = calc_cubo, args = (nums,))

t = time.time()

Proceso2.start()
# Proceso2.join()

Proceso3.start()
Proceso4.start()
# Proceso3.join()

Proceso2.join()
Proceso3.join()
Proceso4.join()

print("Tiempo ejecución: ", time.time() - t)
print("Finaliza Ejecución")

Calculo del cuadrado: 
Calculo del cubo: 
Calculo del cubo: 
Cuadrado:  0
Cubo:  0
Cubo:  0
Cubo:  1
Cuadrado:  1
Cubo:  1
Cubo:  8
Cuadrado:  4
Cubo:  8
Cubo:  27
Cuadrado:  9
Cubo:  27
Cubo:  64
Cuadrado:  16
Cubo:  64
Cubo:  125
Cuadrado:  25
Cubo:  125
Cuadrado:  36
Cubo:  216
Cubo:  216
Cubo:  343
Cuadrado:  49
Cubo:  343
Cuadrado:  64
Cubo:  512
Cubo:  512
Cuadrado:  81
Cubo:  729
Cubo:  729
Tiempo ejecución:  2.096680164337158
Finaliza Ejecución


In [8]:
print("Nombre del modulo: ", __name__)
print("Proceso padre: ", os.getppid())
print("Id del proceso: ", os.getpid())

Nombre del modulo:  __main__
Proceso padre:  13037
Id del proceso:  13047


In [9]:
def info(titulo):
    print(titulo)
    print("Nombre modulo: ", __name__)
    print("Proceso padre: ", os.getppid())
    print("Id del proceso: ", os.getpid())

In [10]:
def f(nombre):
    info("Function f")
    print("Hola ", nombre)
    print("-------------")

In [11]:
Proceso5 = mp.Process(target = f, args = ("Ale",))
Proceso5.start()
Proceso5.join()

Function f
Nombre modulo:  __main__
Proceso padre:  13047
Id del proceso:  13112
Hola  Ale
-------------


# Visibilidad de Variables

In [12]:
num_res = []

def calc_cuad_2(numeros):
    
    for n in numeros:
        print("Cuadrado: ", n * n)
        num_res.append(n * n)
    print("Resultado del proceso", os.getpid() ,": ", num_res)

In [13]:
numeros = range(10)
t = time.time()
Proceso6 = mp.Process(target = calc_cuad_2, args = (numeros,))

Proceso6.start()
Proceso6.join()

print("Tiempo de ejecución: ", time.time() - t)
print("Resultado del proceso: ", num_res)

Cuadrado:  0
Cuadrado:  1
Cuadrado:  4
Cuadrado:  9
Cuadrado:  16
Cuadrado:  25
Cuadrado:  36
Cuadrado:  49
Cuadrado:  64
Cuadrado:  81
Resultado del proceso 13113 :  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Tiempo de ejecución:  0.04886293411254883
Resultado del proceso:  []


Los procesos tienen su propio espacio de memoria. Así, las variables del programa no se comparten entre procesos. Es necesario crear comunicación entre procesos (IPC) si se desea compartir datos entre procesos.

## Multiprocessing

Con el método `cpu_count()` se muestra el número de procesadores del sistema que se está utilizando.

In [14]:
mp.cpu_count()

4

### Ejercicios 
#### 1.-
Crea un proceso que construya 5 procesos hijos. "Captura" los id's de los procesos hijos y crea una tabla

In [15]:
class MyProcess(mp.Process):
    def __init__(self, nombre):
        super(MyProcess, self).__init__()
        self.nombre = nombre

    def run(self):
        print("Child Process Name: {} PID: {}  Parent PID: {}".format(self.nombre, mp.current_process().pid, os.getppid()))
        
    def get_prid(self):
        return mp.current_process().pid
    
    def get_nombre(self):
        return self.nombre
    
    def get_pprid(self):
        return os.getppid()

In [16]:
def main():
    print("Main Process PID: {}".format(mp.current_process().pid))
    
    procesos = [MyProcess(str(r)) for r in range(2005,2011)]
    
    for p in procesos:
        p.start()
        p.join()
main()

Main Process PID: 13047
Child Process Name: 2005 PID: 13114  Parent PID: 13047
Child Process Name: 2006 PID: 13115  Parent PID: 13047
Child Process Name: 2007 PID: 13116  Parent PID: 13047
Child Process Name: 2008 PID: 13117  Parent PID: 13047
Child Process Name: 2009 PID: 13118  Parent PID: 13047
Child Process Name: 2010 PID: 13119  Parent PID: 13047


In [17]:
def proceso1(x):
    y = (x ** 2) + x - 2
    plt.plot(x, y)
    plt.show()
    
def proceso2(x):
    y = (x ** 3) - 2 * (x ** 2) + x + 1
    plt.plot(x, y)
    plt.show()
    

In [18]:
def main():
    print("Main Process PID: {}".format(mp.current_process().pid))

    t = time.time()
    Proceso7 = mp.Process(target = proceso1, args = (np.linspace(-10, 10),))
    Proceso8 = mp.Process(target = proceso2, args = (np.linspace(-10, 10),))
    
    Proceso7.start()
    Proceso8.start()
    
    Proceso7.join()
    Proceso8.join()
    
    print("Tiempo de ejecución paralelo: ", time.time() - t)
"""
    t = time.time()
    
    proceso1(np.linspace(-10, 10))
    proceso2(np.linspace(-10, 10))
    print("Tiempo de ejecución secuencial: ", time.time() - t)"""
    
main()

Main Process PID: 13047
Tiempo de ejecución paralelo:  0.3486621379852295


In [19]:
def reduccion_secuencial():
    num = 10e6 
    while num != 0:
        num -= 1
    
def reduccion_concurrente1():
    num = 5e6 
    while num != 0:
        num -= 1
    
def reduccion_concurrente2():
    num = 5e6 
    while num != 0:
        num -= 1
    


In [20]:
def main():
    print("Main Process PID: {}".format(mp.current_process().pid))

    t = time.time()
    Proceso9 = mp.Process(target = reduccion_concurrente1)
    Proceso10 = mp.Process(target = reduccion_concurrente2)
    
    Proceso9.start()
    Proceso10.start()
    
    Proceso9.join()
    Proceso10.join()
    
    print("Tiempo de ejecución concurrente: ", time.time() - t)

    t = time.time()
    
    reduccion_secuencial()
    print("Tiempo de ejecución secuencial: ", time.time() - t)
    
main()

Main Process PID: 13047
Tiempo de ejecución concurrente:  0.5217556953430176
Tiempo de ejecución secuencial:  0.9792940616607666


## Memoria compartida

Los datos se pueden almacenar en un mapa de memoria compartida usando `Array` o `Value`. Veamos algunos ejemplos:

In [21]:
def calc_cuad(numeros, resultado, val):
    val.value = 5.35
    for idx, n in enumerate(numeros):
        resultado[idx] = n * n

    print("Resultado del proceso:", result[:])
    
nums = range(10)

t = time.time()
result = mp.Array('i', 10)
val = mp.Value('d', 0.0)
p1 = mp.Process(target = calc_cuad, args = (nums, result, val))

p1.start()
p1.join()

print("Tiempo de ejecución: ", time.time() - t)
print("Resultado del proceso: ", result[:])
print("Resultado del proceso: ", val.value)

Resultado del proceso: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Tiempo de ejecución:  0.07104301452636719
Resultado del proceso:  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Resultado del proceso:  5.35


In [22]:
def f(conn):
    conn.send("Hello World!")
    conn.close()

In [23]:
parent_conn, child_conn = mp.Pipe()
p = mp.Process(target = f, args = (child_conn,))
p.start()
print(p.recv())

AttributeError: 'Process' object has no attribute 'recv'

In [24]:
nombres = ["Carlos", "Renata", "Sandra", "END"]

def send_msgs(conn, msgs):
    for msg in msgs:
        conn.send(msg)
    conn.close()
    
def recv_msgs(conn):
    while 1:
        msg = conn.recv()
        if msg == "END":
            break
    conn.close()

In [25]:
def deposit(balance, lock):   
    for i in range(100):
        time.sleep(0.01)
        lock.acquire()
        
        balance.value += 1
        lock.release()
        
def withdraw(balance, lock):
    for i in range(100):
        lock.acquire()
        time.sleep(0.01)
        balance.value -= 1
        lock.release()

In [26]:
balance = mp.Value('i', 200)
lock = mp.Lock()
d = mp.Process(target = deposit, args = (balance, lock))
w = mp.Process(target = withdraw, args = (balance, lock))

d.start()
w.start()

d.join()
w.join()
print(balance.value)

200


In [34]:
datos = [1,2,3,4,5]
p = mp.Pool()
# resultado = p.map(calc_cuad, datos)
# print(resultado)

Process ForkPoolWorker-45:
Process ForkPoolWorker-44:
Process ForkPoolWorker-43:
Process ForkPoolWorker-42:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiproce

## Ejercicios

1.- Padre crea dos hijos
    Hijo 1: Crea un archivo de texto(10 lineas)
    Hijo 2: Lee el archivo y crea una copia
    
2.- Crea dos procesos

    P1 : Hace la suma x = x + 1
    p2 : Hace la suma y = x + 1
    X puede ser una lista de valores

3.- Simula un proceso de supermercado. Se tiene 10 clientes con 10 productos c/u 

    A) Crea un proceso (cajera) que atienda los 10 clientes, calcula el tiempo.
    B) Crea dos procesos (2 cajeras) que atiendan 5 clientes c/u calcula tiempo.

In [45]:
def crea_archivo(archivo):
    f = open(archivo, "w+")
    for i in range(10):
        f.write("\n Escribo linea: " + str(i))
    f.close()

In [46]:
def leer_archivo(archivo):
    f = open(archivo, "r")
    f1 = open("archivo_nuevo.txt", "w+")
    for linea in f:
        f1.write(linea)
    f.close()
    f1.close()

In [54]:
archivo = "archivito.txt"
t = time.time()
p10 = mp.Process(target = crea_archivo, args = (archivo,))
p11 = mp.Process(target = leer_archivo, args = (archivo,))

p10.start()
p11.start()
p10.join()
p11.join()
print("Tiempo: ", time.time() - t)

Tiempo:  0.0325009822845459


In [76]:
def proceso_suma(numeros, lock):
    lock.acquire()
    numeros.value = numeros.value + 1
    lock.release()
    return numeros

In [77]:
def proceso_suma_2(numeros, lock):
    lock.acquire()
    numero = numeros.value + 1
    lock.release()
    return numero

In [78]:
# Entero
X = mp.Value('i', 5)
lock = mp.Lock()
P1 = mp.Process(target = proceso_suma, args = (X, lock))
P2 = mp.Process(target = proceso_suma_2, args = (X, lock))

P1.start()
P2.start()

P1.join()
P2.join()
print(X.value)

6


In [116]:
def proceso_suma_a(numeros):
    for i in range(len(numeros)):
        numeros[i] = numeros[i] + 1
    return numeros

In [119]:
def proceso_suma2_a(numeros):
    print([n for n in numeros])
    for i in range(len(numeros)):
        numeros[i] = numeros[i] + 1
    return numeros

In [None]:
# Entero
t = time.time()
lock = mp.Lock()
X = mp.Array('i', range(1000000), lock=lock)
P1 = mp.Process(target = proceso_suma_a, args = (X,))
P2 = mp.Process(target = proceso_suma2_a, args = (X,))

P1.start()
P2.start()

P1.join()
P2.join()
print("Tiempo ejecución: ", time.time() - t)

In [92]:
def cajera(clientes):
    total = 0
    
    while clientes.empty() == False:
        print('Cajera %s antendiendo...' % (mp.current_process().name))
        c = clientes.get()
        total += c.total
        if clientes.empty() == True:
            break;
        
    print("Total: $", total, " Cajera: ", mp.current_process().name)

In [93]:
class Cliente():
    def __init__(self, productos):
        self._productos = productos
    
    @property
    def productos(self):  
        return self._productos
    
    @property
    def total(self):
        return random.randint(1, 3000)
    

In [94]:
t = time.time()
lock = mp.Lock()
clientes = mp.SimpleQueue()
[clientes.put(Cliente(10)) for _ in range(10)]
P1 = mp.Process(target = cajera, args = (clientes,))
P2 = mp.Process(target = cajera, args = (clientes,))

P1.start()
P2.start()

P1.join()
P2.join()
print("Tiempo ejecución: ", time.time() - t)


Cajera Process-51 antendiendo...
Cajera Process-52 antendiendo...
Cajera Process-51 antendiendo...
Cajera Process-52 antendiendo...
Cajera Process-51 antendiendo...
Cajera Process-52 antendiendo...
Cajera Process-51 antendiendo...
Cajera Process-52 antendiendo...
Cajera Process-51 antendiendo...
Cajera Process-51 antendiendo...
Cajera Process-52 antendiendo...
Total: $ 6834  Cajera:  Process-51


Process Process-52:
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-92-de8ac6a37991>", line 6, in cajera
    c = clientes.get()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)


KeyboardInterrupt: 

  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
