# Concurrency ‚ö°Ô∏è

### Threads / async / multiprocessing

### Asincron√≠a vs paralelismo

* enviar/recibir datos a trav√©s de la red
* leer el contenido de un archivo dentro de nuestro programa
* escribir datos de nuestro programa en el disco
* esperar a que finalice una operaci√≥n en una API remota
* esperar a que finalice una operaci√≥n en una base de datos 
* etc.

### Que puede ser paralelizado y que no

Cuando llegamos a los detalles, solo el `multiprocessing` realmente ejecuta estos hilos de procesamiento literalmente al mismo tiempo.

`threading` y `asyncio` se ejecutan en un √∫nico proceso y, por lo tanto, solo se ejecutan uno a la vez. Simplemente encuentran formas de turnarse para acelerar el proceso general. A√∫n as√≠ llamamos a esto concurrencia.

* threading: multitarea apropiativa (OS. El SO decide.) https://es.wikipedia.org/wiki/Multitarea_apropiativa  
* asyncio: multitarea cooperativa (Tu. Cada proceso cede el control.) https://es.wikipedia.org/wiki/Multitarea_cooperativa

### Memoria compartida vs replicaci√≥n

Cosas m√°s lentas que la CPU; I/O o network bound.

![I/O - networking](https://files.realpython.com/media/IOBound.4810a888b457.png)

No asociado a I/O, mucha computaci√≥n; CPU bound.

![](https://files.realpython.com/media/CPUBound.d2d32cb2626c.png)

In [1]:
sites = [
    "https://www.yahoo.com/",
    "http://www.cnn.com",
    "http://www.python.org",
    "http://www.jython.org",
    "http://www.pypy.org",
    "http://www.perl.org",
    "http://www.cisco.com",
    "http://www.facebook.com",
    "http://www.twitter.com",
    "http://www.macrumors.com/",
    "http://arstechnica.com/",
    "http://www.reuters.com/",
    "http://abcnews.go.com/",
    "http://www.cnbc.com/",
    "http://olympus.realpython.org/dice",
    "https://realpython.com/",
]

In [3]:
import requests
import time


def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(duration)

Read 332919 from https://www.yahoo.com/
Read 1122956 from http://www.cnn.com
Read 49061 from http://www.python.org
Read 10287 from http://www.jython.org
Read 6761 from http://www.pypy.org
Read 12310 from http://www.perl.org
Read 92271 from http://www.cisco.com
Read 132310 from http://www.facebook.com
Read 299504 from http://www.twitter.com
Read 332984 from http://www.macrumors.com/
Read 87428 from http://arstechnica.com/
Read 197151 from http://www.reuters.com/
Read 202455 from http://abcnews.go.com/
Read 903057 from http://www.cnbc.com/
Read 274 from http://olympus.realpython.org/dice
Read 38714 from https://realpython.com/
10.796195030212402


### Threading

In [8]:
import concurrent.futures
import requests
import threading
import time

# Threading.local() creates an object that look like a global but is specific to each individual thread.
thread_local = threading.local()


# requests.Session() is not thread-safe
def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    # with -> create and free thread pool
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(duration)

Read 49061 from http://www.python.org
Read 10287 from http://www.jython.org
Read 1123136 from http://www.cnn.com
Read 12310 from http://www.perl.org
Read 92271 from http://www.cisco.com
Read 6761 from http://www.pypy.org
Read 333170 from http://www.macrumors.com/
Read 132370 from http://www.facebook.com
Read 333952 from https://www.yahoo.com/
Read 197386 from http://www.reuters.com/
Read 274 from http://olympus.realpython.org/dice
Read 202455 from http://abcnews.go.com/
Read 295510 from http://www.twitter.com
Read 87428 from http://arstechnica.com/
Read 903401 from http://www.cnbc.com/
Read 38714 from https://realpython.com/
2.221498966217041


La parte del `Thread`. Eso es solo un hilo de procesamiento que mencionamos anteriormente. `Pool` es donde comienza a ponerse interesante. Este objeto va a crear un grupo de subprocesos, cada uno de los cuales puede ejecutarse simult√°neamente. Finalmente, el `Executor` es la parte que controlar√° c√≥mo y cu√°ndo se ejecutar√° cada uno de los hilos del grupo.

![](https://files.realpython.com/media/Threading.3eef48da829e.png)

In [19]:
import concurrent.futures


# counter = 0


def increment_counter(fake_value):
    global counter
    for _ in range(100):
        counter += 1


fake_data = [x for x in range(5000)]

counter = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
    executor.map(increment_counter, fake_data)


print(counter)

324143


### Async

The general concept of asyncio is that a single Python object, called the event loop, controls how and when each task gets run. The event loop is aware of each task and knows what state it‚Äôs in. In reality, there are many states that tasks could be in, but for now let‚Äôs imagine a simplified event loop that just has two states.

The ready state will indicate that a task has work to do and is ready to be run, and the waiting state means that the task is waiting for some external thing to finish, such as a network operation.

Your simplified event loop maintains two lists of tasks, one for each of these states. It selects one of the ready tasks and starts it back to running. That task is in complete control until it cooperatively hands the control back to the event loop.

When the running task gives control back to the event loop, the event loop places that task into either the ready or waiting list and then goes through each of the tasks in the waiting list to see if it has become ready by an I/O operation completing. It knows that the tasks in the ready list are still ready because it knows they haven‚Äôt run yet.

Once all of the tasks have been sorted into the right list again, the event loop picks the next task to run, and the process repeats. Your simplified event loop picks the task that has been waiting the longest and runs that. This process repeats until the event loop is finished.

An important point of asyncio is that the tasks never give up control without intentionally doing so. They never get interrupted in the middle of an operation. This allows us to share resources a bit more easily in asyncio than in threading. You don‚Äôt have to worry about making your code thread-safe.

**Any function that calls await needs to be marked with async. You‚Äôll get a syntax error otherwise.**

No hay que preocuparse del n√∫mero de threads que crear

https://markhneedham.com/blog/2019/05/10/jupyter-runtimeerror-this-event-loop-is-already-running/

In [25]:
import asyncio
import time
import aiohttp


async def download_site(session, url):
    async with session.get(url) as response:
        # result = await response.content_length
        print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites(sites):
    # You can share the session across all tasks, so the session is created here as a context manager.
    # The tasks can share the session because they are all running on the same thread.
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            # ensure_future also takes care of starting the tasks
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        # print(tasks)
        # Once all the tasks are created, this function uses asyncio.gather()
        # to keep the session context alive until all of the tasks have completed.
        await asyncio.gather(*tasks, return_exceptions=True)


start_time = time.time()

await download_all_sites(sites)

duration = time.time() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")

Read 12310 from http://www.perl.org
Read 49061 from http://www.python.org
Read 44646 from http://www.reuters.com/
Read 3555 from http://www.jython.org
Read None from http://www.macrumors.com/
Read 274 from http://olympus.realpython.org/dice
Read 19370 from http://www.cisco.com
Read 153224 from http://www.cnn.com
Read None from http://www.facebook.com
Read 149830 from http://www.cnbc.com/
Read None from https://realpython.com/
Read None from http://abcnews.go.com/
Read 6761 from http://www.pypy.org
Read None from https://www.yahoo.com/
Read None from http://arstechnica.com/
Read 52759 from http://www.twitter.com
Downloaded 16 sites in 1.1341047286987305 seconds


![](https://files.realpython.com/media/Asyncio.31182d3731cf.png)

**ATENCI√ìN**

**ATENCI√ìN**

**ATENCI√ìN**

Las siguientes celdas demuestran como descargar un set de im√°genes aleatorias de la API de [unsplash](https://unsplash.com/). Una de las celdas las escribe en una carpeta `images/` la otra en `images2/`.


En la carpeta donde est√° este notebook est√°n estas dos carpetas vacias. Pero revisa todo bien para que no se sobreescriba nada que no quieras.

Las celdas est√°n aqu√≠ para demostrar como descargar muchas im√°genes de forma as√≠ncrona usando [aiohtpp](https://docs.aiohttp.org/en/stable/) en el primer caso, y [httpx](https://github.com/encode/httpx) en el segundo.

In [6]:
from itertools import repeat

from string import ascii_lowercase, printable
from random import choice


def random_string(string_length=15):
    """Generate a random string of fixed length """
    letters = ascii_lowercase
    return "".join(choice(letters) for i in range(string_length))


import asyncio
import aiohttp
import aiofiles


async def download_site(session, url):
    async with session.get(url) as response:
        c = await response.read()
        await write_file(c)


async def write_file(content):

    filename = "images/" + random_string() + ".jpg"
    async with aiofiles.open(filename, mode="wb") as f:
        await f.write(content)


urls = [
    "https://source.unsplash.com/1600x900/?nature,water," + random_string(6)
    for _ in range(50)
]


async def dl():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)

        await asyncio.gather(*tasks, return_exceptions=True)


start = time.perf_counter()
await dl()
print(f"total = {time.perf_counter() - start}")

CPU times: user 2 ¬µs, sys: 1e+03 ns, total: 3 ¬µs
Wall time: 22.2 ¬µs
total = 4.9483451249998325


In [7]:
from itertools import repeat

from string import ascii_lowercase
from random import choice


def random_string(string_length=15):
    """Generate a random string of fixed length """
    letters = ascii_lowercase
    return "".join(choice(letters) for i in range(string_length))


import asyncio
import time
import httpx
import aiofiles


async def download_site(client, url):
    r = await client.get(url)
    await write_file(r.content)


async def write_file(content):

    filename = "images2/" + random_string() + ".jpg"
    async with aiofiles.open(filename, mode="wb") as f:
        await f.write(content)


urls = [
    "https://source.unsplash.com/1600x900/?nature,water," + random_string(10)
    for _ in range(50)
]


async def dl():
    async with httpx.AsyncClient() as client:
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(download_site(client, url))
            tasks.append(task)

        await asyncio.gather(*tasks, return_exceptions=True)


start = time.perf_counter()
await dl()
print(f"total = {time.perf_counter() - start}")

total = 4.706697888000235


Ejercicio opcional, leer y entender este c√≥digo: https://pybay.com/site_media/slides/raymond2017-keynote/async_examples.html

In [26]:
# adapted from https://gist.github.com/bradmontgomery/81d71e415b0ff693f00408388590acb9

import hashlib
import sys

from concurrent.futures import ProcessPoolExecutor
from time import sleep, time


def t1(n):
    """Silly function whose time increases as n does, it increases linearly."""
    for i in range(n):
        if i % 2 == 0:
            sleep(0.5)


def t2(n):
    """A somewhat CPU-intensive task."""
    for i in range(n):
        hashlib.pbkdf2_hmac("sha256", b"password", b"salt", 100000)


def do_work(n):
    """Function that does t1 and t2 in serial."""
    start = time()
    t1(n)
    t2(n)
    end = time()
    print("Work for {} finished in {}s".format(n, round(end - start, 2)))


def serial():

    start = time()
    for x in range(10):
        do_work(x)
    end = time()
    print("All work finished in {}s".format(round(end - start, 2)))


def parallel():
    start = time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        inputs = range(10)
        for x, result in zip(inputs, executor.map(do_work, inputs)):
            pass
    end = time()
    print("All work finished in {}s".format(round(end - start, 2)))

In [27]:
serial()

Work for 0 finished in 0.0s
Work for 1 finished in 0.59s
Work for 2 finished in 0.66s
Work for 3 finished in 1.26s
Work for 4 finished in 1.4s
Work for 5 finished in 1.93s
Work for 6 finished in 2.05s
Work for 7 finished in 2.58s
Work for 8 finished in 2.69s
Work for 9 finished in 3.25s
All work finished in 16.42s


In [28]:
parallel()

Work for 0 finished in 0.0s
Work for 1 finished in 0.62s
Work for 2 finished in 0.71s
Work for 3 finished in 1.27s
Work for 4 finished in 1.35s
Work for 5 finished in 1.95s
Work for 6 finished in 2.05s
Work for 7 finished in 2.73s
Work for 8 finished in 2.86s
Work for 9 finished in 3.26s
All work finished in 5.91s


### N¬∫

In [29]:
import multiprocessing as mp

print("Number of processors: ", mp.cpu_count())

Number of processors:  4


In [30]:
import os

os.cpu_count()

4

### Ejercicio

* Escribe un script que identifique todas las im√°genes de un √°rbol de carpetas.
* Debemos obtener una lista con todas las rutas de archivo de las imag√©nes.
* Crear una funci√≥n que covierta una imagen a 128x128. Usar la librer√≠a **Pillow**, ya viene instalada en vuestra distribuci√≥n de Anaconda creo.

```python
import PIL
````

* Tras convertir una imagen, todas deben estar guardadaes en una misma carpeta. Por ejemplo al final habr√° una carpeta que se llame "miniaturas" que contendr√° todas las im√°genes convertidas.
* Cada imagen debe convertirla en un thumbnail (128x128) y guardarlas en una misma carpeta.
* Cuando guardemos la imagen debemos guardarla con su nombre original a√±adiendo "_thumbnail".
    Por ejemplo `imagen.jpg` -> `imagen_thumbnail.jpg`

Intentar usar un f-string para el path `(f"carpeta/{}_{}.jpg")`.

* **Importante**: una vez tengamos la lista con todas nuestras rutas de archivo. Hay que usar procesamiento en paralelo para convertir las im√°genes. Por ejemplo un ThreadPoolExecutor o ProcessPoolExecutor.

**Extra**:

En el m√≥dulo `functools` de Python existe una cosa que se llama `partial`. Esta funci√≥n nos permite crea lo que se llaman funciones parciales. Si tenemos una funci√≥n que por ejemplo acepta 3 argumentos, crear una funci√≥n parcial ser√≠a *"duplicar"* est√° funci√≥n pero haciendo que uno de estos par√°metros sea fijo. Y obtendr√≠amos una funci√≥n. Por ejemplo:

* Tengo una funci√≥n: `convertir_miniatura(resolucion, ruta)`
* Puedo hacer `miniatura128 = partial(convertir_miniatura, 128)`.
* Esto √∫ltimo me devolver√≠a otra funci√≥n, que ahora puedo utilizar directamente con: `miniatura128("/Users/r/.../imagen.jpg")`. Tendremos a nuestra disposici√≥n una nueva funci√≥n que es igual que la original pero como si uno de sus par√°metros estuviera fijo.


`functools.partial` + executors Pillow + paths (download images)


Ejercicio adaptado de: https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python

### Ejercicio pistas

In [46]:
import os

filelist = []

for _, _, _ in os.walk("tree"):
    # os.walk itera sobre 3 par√°metros, cuales son?
    # hacer algo
    pass

In [1]:
from pathlib import Path

p = Path("tree/fdjtoupvvurxgrd.jpg")

In [2]:
p.absolute()  # <<-- es una funci√≥n, hay que poner ()

PosixPath('/Users/r/Projects/courses/python/avanzado/entregable/tree/fdjtoupvvurxgrd.jpg')

In [3]:
p.name  # es un m√©todo, NO hay que poner ()

'fdjtoupvvurxgrd.jpg'

In [4]:
p.stem  # es un m√©todo, NO hay que poner ()

'fdjtoupvvurxgrd'

In [5]:
p.suffix

'.jpg'

In [17]:
nuevo_nombre = p.stem + "_thumbnail" + p.suffix

In [13]:
miniaturas = Path("miniaturas/")

In [51]:
miniaturas/nuevo_nombre

PosixPath('miniaturas/848_thumbnail.pdf')

In [9]:
import os
from pathlib import Path

In [11]:
lista_de_paths = []
for root, dirs, files in os.walk("tree"):
    lista_de_paths = lista_de_paths + [os.path.join(root, x) for x in files if x.endswith((".jpg"))]

In [17]:
[str(ruta.absolute()) for ruta in Path("tree").rglob("*.jpg")]

['/Users/r/Projects/courses/python/avanzado/entregable/tree/jkrlyxsbonbrdjb.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/qynjddmvbflfmfl.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/pjeyfssssqovicw.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/rvqjyhrsggnkgmf.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/mtshvhafwuveczn.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/hriejwwwxnlslnr.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/ufufkvakojndran.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/rxvjgoudtzlgska.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/fdjtoupvvurxgrd.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/tjbcluxsujxgcnw.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/msqfostmedkgorc.jpg',
 '/Users/r/Projects/courses/python/avanzado/entregable/tree/E5E2Q06R/ogdottndbujeyrq.jpg',
 '/User

In [20]:
filelist = lista_de_paths.copy()

In [5]:
from pathlib import Path
import PIL
from PIL import Image

filelist = [str(ruta.absolute()) for ruta in Path("tree/").rglob("*jpg")]


def miniaturizar(path):
    size = (128, 128)  # 128x128
    p = Path(path).absolute()
    nuevo_nombre = p.stem + "_thumbnail" + p.suffix
    miniaturas = Path("miniaturas/").absolute()
    save = miniaturas / nuevo_nombre
    image = Image.open(p)
    image.thumbnail(size)
    image.save(save)


from concurrent.futures import ProcessPoolExecutor
import os

max_nucleos = os.cpu_count()

with ProcessPoolExecutor(max_workers=max_nucleos) as executor:
    executor.map(miniaturizar, filelist)

In [22]:
%time
for ruta in filelist:
    miniaturizar(ruta)

CPU times: user 4 ¬µs, sys: 0 ns, total: 4 ¬µs
Wall time: 7.87 ¬µs


In [23]:
%time
from concurrent.futures import ProcessPoolExecutor
import os

# with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
# n¬∫ maximo de cpus lo calcula solo ^^^^^^^^^^^^^
with ProcessPoolExecutor(max_workers=4) as executor:
    executor.map(miniaturizar, filelist)

CPU times: user 3 ¬µs, sys: 0 ns, total: 3 ¬µs
Wall time: 5.96 ¬µs


### Alternativa

Usando una [funci√≥n parcial](https://docs.python.org/3/library/functools.html#functools.partial)

In [2]:
import os

In [3]:
filelist = []
for root, dirs, files in os.walk("res"):
    filelist = filelist + [os.path.join(root, x) for x in files if x.endswith((".jpg"))]

In [29]:
import logging
from pathlib import Path
from time import time
from functools import partial

from concurrent.futures import ProcessPoolExecutor

from PIL import Image

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

logger = logging.getLogger(__name__)


def create_thumbnail(img, size, save):
    """
    Creates a thumbnail of an image with the same name as image but with
    _thumbnail appended before the extension. E.g.:

    >>> create_thumbnail((128, 128), 'image.jpg')

    A new thumbnail image is created with the name image_thumbnail.jpg

    :param size: A tuple of the width and height of the image
    :param path: The path to the image file
    :return: None
    """
    path = img
    print("Current image:", path)
    print("Save path:", save)
    print("Size:", size)
    path = Path(path)
    name = path.stem + "_thumbnail" + path.suffix
    thumbnail_path = Path(save) / name
    image = Image.open(path)
    image.thumbnail(size)
    image.save(thumbnail_path)

In [31]:
thumbnail_128 = partial(create_thumbnail, size=(128, 128), save="save")

In [None]:
# aplicado a una image
thumbnail_128(img=filelist[0])

In [32]:
ts = time()
# Partially apply the create_thumbnail method, setting the size to 128x128
# and returning a function of a single argument.


# Create the executor in a with block so shutdown is called when the block
# is exited.
with ProcessPoolExecutor() as executor:
    executor.map(thumbnail_128, filelist)


logging.info("Took %s", time() - ts)

Current image: res/materials/opencv-color-spaces/images/nemo5.jpg
Current image: res/materials/opencv-color-spaces/images/nemo0.jpg
Current image: res/materials/opencv-color-spaces/images/nemo4.jpg
Current image: res/materials/opencv-color-spaces/images/nemo1.jpg
Save path: save
Save path: save
Save path: save
Save path: save
Size: (128, 128)
Size: (128, 128)
Size: (128, 128)
Size: (128, 128)
Current image: res/materials/opencv-color-spaces/images/nemo3.jpg
Save path: save
Size: (128, 128)
Current image: res/materials/opencv-color-spaces/images/nemo2.jpg
Save path: save
Size: (128, 128)
Current image: res/advanced_python/Jupyter_to_Software.pdf
Current image: res/advanced_python/functions/exercises_and_solutions.pdf
Save path: save
Save path: save
Current image: res/advanced_python/functions/exercises.pdf
Size: (128, 128)
Save path: save
Current image: res/ngcm_pandas_2017/notebooks/images/ebola.jpg
Size: (128, 128)
Save path: save
Size: (128, 128)
Current image: res/ngcm_pandas_2017/n

2019-11-26 12:10:25,795 - root - INFO - Took 0.8244948387145996


[Donald Knuth](https://en.wikipedia.org/wiki/Donald_Knuth): ‚ÄúPremature optimization is the root of all evil (or at least most of it) in programming.‚Äù

M√°s info y fuentes:

* https://realpython.com/python-concurrency/ üëàüèº
* https://realpython.com/intro-to-python-threading/
* https://www.youtube.com/watch?v=9zinZmE3Ogk
* https://pybay.com/site_media/slides/raymond2017-keynote/index.html
* https://realpython.com/async-io-python/  üëàüèº lectura recomendada, async es un tema complejo y tiene su curva de apredizaje
* https://realpython.com/intro-to-python-threading/
* https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python
* https://stackoverflow.com/questions/49005651/how-does-asyncio-actually-work/51116910#51116910
* https://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/
* https://stackabuse.com/python-async-await-tutorial/