# Programación en Paralelo

## Hilos daemon

In [15]:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: Empezando", name)
    time.sleep(2)
    logging.info("Thread %s: Terminando", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : antes de crear thread (hilo)")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : antes de correr thread (hilo)")
    x.start()
    logging.info("Main    : espera a que termine el thread(hilo)")
    # x.join()
    logging.info("Main    : todo listo")

20:40:57: Main    : antes de crear thread (hilo)
20:40:57: Main    : antes de correr thread (hilo)
20:40:57: Thread 1: Empezando
20:40:57: Main    : espera a que termine el thread(hilo)
20:40:57: Main    : todo listo
20:40:59: Thread 1: Terminando


## Hilos con Join

In [20]:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: Empezando", name)
    time.sleep(2)
    logging.info("Thread %s: Terminando", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(5):
        logging.info("Main    : Crea e inicializa el hilo %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : antes de unir al thread(hilo) %d.", index)
        thread.join()
        logging.info("Main    : thread %d creado", index)
        
#Python te ofrece varias primitivas que verás más adelante para ayudar a coordinar los hilos y hacerlos funcionar juntos

02:37:47: Main    : Crea e inicializa el hilo 0.
02:37:47: Thread 0: Empezando
02:37:47: Main    : Crea e inicializa el hilo 1.
02:37:47: Thread 1: Empezando
02:37:47: Main    : Crea e inicializa el hilo 2.
02:37:47: Thread 2: Empezando
02:37:47: Main    : Crea e inicializa el hilo 3.
02:37:47: Thread 3: Empezando
02:37:47: Main    : Crea e inicializa el hilo 4.
02:37:47: Thread 4: Empezando
02:37:47: Main    : antes de unir al thread(hilo) 0.
02:37:49: Thread 0: Terminando
02:37:49: Main    : thread 0 creado
02:37:49: Main    : antes de unir al thread(hilo) 1.
02:37:49: Thread 1: Terminando
02:37:49: Main    : thread 1 creado
02:37:49: Main    : antes de unir al thread(hilo) 2.
02:37:49: Thread 2: Terminando
02:37:49: Main    : thread 2 creado
02:37:49: Main    : antes de unir al thread(hilo) 3.
02:37:49: Thread 3: Terminando
02:37:49: Main    : thread 3 creado
02:37:49: Main    : antes de unir al thread(hilo) 4.
02:37:49: Thread 4: Terminando
02:37:49: Main    : thread 4 creado


## Usando un ThreadPoolExecutor

In [23]:
import concurrent.futures

# [rest of code]

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(thread_function, range(8))

02:41:19: Thread 0: Empezando
02:41:19: Thread 1: Empezando
02:41:19: Thread 2: Empezando
02:41:19: Thread 4: Empezando
02:41:19: Thread 3: Empezando
02:41:19: Thread 5: Empezando
02:41:19: Thread 6: Empezando
02:41:19: Thread 7: Empezando
02:41:21: Thread 0: Terminando
02:41:21: Thread 1: Terminando
02:41:21: Thread 2: Terminando
02:41:21: Thread 3: Terminando
02:41:21: Thread 4: Terminando
02:41:21: Thread 5: Terminando
02:41:21: Thread 6: Terminando
02:41:21: Thread 7: Terminando


In [29]:
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: Actualiza el inicio", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: Actualiza el término", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Probando la Actualización. Inicializando el valor es  %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(5):
            executor.submit(database.update, index)
    logging.info("Probando la actualización.Terminando el valor es %d.", database.value)

03:34:56: Probando la Actualización. Inicializando el valor es  0.
03:34:56: Thread 0: Actualiza el inicio
03:34:56: Thread 1: Actualiza el inicio
03:34:56: Thread 0: Actualiza el término
03:34:56: Thread 2: Actualiza el inicio
03:34:56: Thread 1: Actualiza el término
03:34:56: Thread 3: Actualiza el inicio
03:34:56: Thread 2: Actualiza el término
03:34:56: Thread 4: Actualiza el inicio
03:34:56: Thread 3: Actualiza el término
03:34:56: Thread 4: Actualiza el término
03:34:56: Probando la actualización.Terminando el valor es 3.


In [1]:
import os
import time
import threading
import multiprocessing
 
NUM_WORKERS = 4
 
def only_sleep():
    """ Do nothing, wait for a timer to expire """
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    time.sleep(1)
 
 
def crunch_numbers():
    """ Do some computations """
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    x = 0
    while x < 10000000:
        x += 1

In [2]:
## Run tasks serially
start_time = time.time()
for _ in range(NUM_WORKERS):
    only_sleep()
end_time = time.time()
 
print("Serial time=", end_time - start_time)
 
# Run tasks using threads
start_time = time.time()
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
 
print("Threads time=", end_time - start_time)
 
# Run tasks using processes
start_time = time.time()
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
 
print("Parallel time=", end_time - start_time)

PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
Serial time= 4.002037525177002
PID: 2828, Process Name: MainProcess, Thread Name: Thread-6
PID: 2828, Process Name: MainProcess, Thread Name: Thread-7
PID: 2828, Process Name: MainProcess, Thread Name: Thread-8
PID: 2828, Process Name: MainProcess, Thread Name: Thread-9
Threads time= 1.0158934593200684
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
Parallel time= 4.169016361236572


In [3]:
start_time = time.time()
for _ in range(NUM_WORKERS):
    crunch_numbers()
end_time = time.time()
 
print("Serial time=", end_time - start_time)
 
start_time = time.time()
threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
 
print("Threads time=", end_time - start_time)
 
 
start_time = time.time()
processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
 
print("Parallel time=", end_time - start_time)

PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
PID: 2828, Process Name: MainProcess, Thread Name: MainThread
Serial time= 2.0026726722717285
PID: 2828, Process Name: MainProcess, Thread Name: Thread-10
PID: 2828, Process Name: MainProcess, Thread Name: Thread-11
PID: 2828, Process Name: MainProcess, Thread Name: Thread-12
PID: 2828, Process Name: MainProcess, Thread Name: Thread-13
Threads time= 2.027620553970337
Parallel time= 0.150834321975708


## Matrices

In [34]:
#!/usr/bin/env python
import numpy
import numpy.random
import numpy.linalg
import sys
import time

def init(n):
    a = numpy.matrix(numpy.random.rand(n,n))
    b = numpy.random.rand(n,1)
    b = b.reshape((n,1))
    return a,b

def np_mult(a, b):
    c = numpy.multiply(a,b)
    return c

def manual_mult(a,b):
    c = numpy.matrix(numpy.zeros_like(a))
    for i in range(0,a.shape[0]):
        for j in range(0,a.shape[1]):
             c[i,j] = a[i,j] * b[i]
    return c

def main(argv=None):
    if argv is None:
        argv = sys.argv

    n = int(argv[1])
    niters = int(argv[2])
    a,b = init(n)

    start = time.time()
    for i in xrange(niters):
        cnp = np_mult(a,b)
    end = time.time()

    nptime = (end-start)/niters
    print('Numpy: ', nptime)

    start = time.time()
    for i in xrange(niters):
        cm = manual_mult(a,b)
    end = time.time()

    manualtime = (end-start)/niters
    print('Manual: ', manualtime)
    print('Speed difference = ', manualtime/nptime)
    print('Diff = ', numpy.linalg.norm(cm - cnp)/numpy.linalg.norm(cnp))

    return 0

if __name__ == "__main__":
    sys.exit(main())
    

ValueError: invalid literal for int() with base 10: '-f'

In [35]:
  argv = sys.argv

In [36]:
argv

['c:\\users\\jess\\appdata\\local\\programs\\python\\python37\\lib\\site-packages\\ipykernel_launcher.py',
 '-f',
 'C:\\Users\\Jess\\AppData\\Roaming\\jupyter\\runtime\\kernel-7e9ee413-d461-4837-9050-898481d36f84.json']

In [40]:
    def mv_mult(start_col,end_col,M,V):
    for i in range(0,M.shape[0]):
        for j in range(start_col,end_col):
            M[i,j] = M[i,j] * V[i,0]

    num_threads = multiprocessing.cpu_count()
    threads = []
    extra = temp.shape[1] % num_threads
    start_col = 0
    jump = temp.shape[1] / num_threads
    for i in range(0,num_threads):
        print('starting thread ', i)
        if (i <  extra):
            args = start_col, start_col+jump+1,temp,h
            p = multiprocessing.Process(target=mv_mult,args=args)
            p.start()
            threads.append(p)
            start_col += jump+1
        else:
            args = start_col, start_col+jump,temp,h
            p = multiprocessing.Process(target=mv_mult,args=args)
            p.start()
            threads.append(p)
            start_col += jump

    for i in threads:
        i.join()

In [47]:
!pip install handythread

ERROR: Could not find a version that satisfies the requirement handythread (from versions: none)
ERROR: No matching distribution found for handythread


In [46]:
import numpy as np
import math
def f(x):
    print(x)
    y = [1]*10000000
    [math.exp(i) for i in y]
def g(x):
    print(x)
    y = np.ones(10000000)
    np.exp(y)


from handythread import foreach
from processing import Pool
from timings import f,g

def fornorm(f,l):
    for i in l:
        f(i)
        
time(fornorm(g,range(100)))

time(fornorm(f,range(10)))
time(foreach(g,range(100),threads=2))
time(foreach(f,range(10),threads=2))
p = Pool(2)
time(p.map(g,range(100)))
time(p.map(f,range(100)))

ModuleNotFoundError: No module named 'handythread'