### Python concurrency cheat sheet

#### Concurrent Pattern 1: parallel execution of independent tasks
* the total amount of work is known before execution starts
* tasks share no state with each other

In [38]:
import asyncio
import time
from typing import List
import concurrent.futures
import random


def run_task(name: str):
    """Simulate a blocking job of random length (0-1 s) and report completion."""
    delay = random.random()
    time.sleep(delay)
    return f'{name} done'

async def run_task_async(name: str):
    """Coroutine version of run_task: non-blocking random sleep, then report."""
    pause = random.random()
    await asyncio.sleep(pause)
    return f'{name} done'

# Task names "0" .. "4".
tasks = list(map(str, range(5)))

In [39]:
def batch_futures_threaded(tasks: List[str], max_workers: int = 2):
    """Run every task on a thread pool and print each result as it finishes.

    Args:
        tasks: names passed one at a time to ``run_task``.
        max_workers: thread-pool size (defaults to 2).

    Results are printed in completion order, not submission order. An
    exception raised by ``run_task`` is re-raised by ``f.result()``.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
        futures = [executor.submit(run_task, task_name) for task_name in tasks]

        # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed
        for f in concurrent.futures.as_completed(futures):
            print(f.result())

# Output order varies between runs: results are printed as they complete.
batch_futures_threaded(tasks)

0 done
1 done
3 done
2 done
4 done


In [40]:
async def batch_futures_asyncio(tasks: List[str]):
    """Run all tasks concurrently on the event loop; print results in finish order."""
    for next_done in asyncio.as_completed([run_task_async(name) for name in tasks]):
        result = await next_done
        print(result)

# Top-level `await` only works where an event loop is already running
# (IPython/Jupyter); in a plain script use asyncio.run(batch_futures_asyncio(tasks)).
await batch_futures_asyncio(tasks)

3 done
4 done
1 done
2 done
0 done


In [41]:
async def gather_asyncio(tasks: List[str]):
    """Run all tasks concurrently and return their results in input order."""
    results = await asyncio.gather(*(run_task_async(name) for name in tasks))
    return results

# asyncio.gather returns results in input order, regardless of which task
# finishes first. Top-level `await` requires IPython/Jupyter; in a script
# use asyncio.run(gather_asyncio(tasks)).
print(await gather_asyncio(tasks))

['0 done', '1 done', '2 done', '3 done', '4 done']


In [42]:
import queue
from threading import Thread

def manual_gather_threading(tasks: List[str]):
    """Run each task on its own thread and collect the results through a queue.

    Args:
        tasks: names passed one at a time to ``run_task``.

    Returns:
        List of results in completion order (not input order).
    """
    def task_wrapper(task_name: str, q: queue.Queue):
        # queue.Queue is thread-safe, so workers can report without extra locking.
        result = run_task(task_name)
        q.put(result)

    q = queue.Queue()
    threads = [Thread(target=task_wrapper, args=(task_name, q)) for task_name in tasks]
    # Plain loops for side-effect-only calls (a list comprehension here would
    # build and discard a list of Nones).
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # All threads are joined, so no more puts can happen: draining is race-free.
    print_q = []
    while not q.empty():
        print_q.append(q.get())
        q.task_done()
    return print_q

# The returned list is the cell's displayed value; its order varies between runs.
manual_gather_threading(tasks)

['2 done', '4 done', '1 done', '3 done', '0 done']

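#### Concurrent Pattern 2: crawl / producer-consumer
* the total amount of work is discovered during execution: each finished task may produce new tasks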

In [9]:
class PerfectBinaryTree:
    """Perfect binary tree whose nodes are array indices in heap layout.

    Node ``i`` has children ``2*i + 1`` and ``2*i + 2``. A tree of depth ``d``
    has ``2 ** (d + 1) - 1`` nodes, labelled ``0 .. 2 ** (d + 1) - 2``.
    """

    def __init__(self, depth=0):
        node_count = 2 ** (depth + 1) - 1
        self.arr = list(range(node_count))

    def get_children(self, node):
        """Return ``[left, right]`` child indices of ``node``, or ``[]`` for a leaf."""
        # The first len(arr) // 2 indices are the internal (non-leaf) nodes.
        first_leaf = len(self.arr) // 2
        if node < first_leaf:
            return [2 * node + 1, 2 * node + 2]
        return []

In [26]:
from threading import Lock
import time, asyncio, queue, concurrent.futures, random


class AtomicCounter:
    """Integer counter whose updates and reads are guarded by a lock."""

    def __init__(self):
        self.count = 0
        self.lock = Lock()

    def increment(self):
        """Add one to the counter while holding the lock."""
        with self.lock:
            self.count = self.count + 1

    def value(self):
        """Return the current count, read while holding the lock."""
        with self.lock:
            return self.count


def run_task(node: int, t: PerfectBinaryTree):
    """Simulate 0-1 s of blocking work on ``node``, then return its children.

    NOTE: this redefines the single-argument run_task used by the Pattern 1
    cells; re-running those cells after this one will fail.
    """
    time.sleep(random.random())
    children = t.get_children(node)
    return children

async def run_task_async(node: int, t: PerfectBinaryTree):
    """Non-blocking 0-1 s pause on ``node``, log it, and return its children."""
    pause = random.random()
    await asyncio.sleep(pause)
    print(f'{node} done')
    children = t.get_children(node)
    return children

# Depth 4 -> 2 ** 5 - 1 = 31 nodes, labelled 0..30.
t = PerfectBinaryTree(4)

In [24]:
def run_traverse_tree_threaded(t: PerfectBinaryTree, max_workers: int = 10):
    """Crawl ``t`` from the root on a thread pool, one task per node.

    Each task runs ``run_task`` on a node and puts the node's children on a
    shared queue. The calling thread keeps submitting whatever is queued and
    stops as soon as nothing is queued and no task is still running. There is
    no idle timeout, so there is no dead wait at the end and no early exit
    while slow tasks are in flight. Prints the number of nodes processed.

    Args:
        t: tree to traverse, starting at node 0.
        max_workers: thread-pool size (defaults to 10).

    Returns:
        The list of submitted futures. A task that raised is not retried and
        its subtree is skipped; its exception is available via ``f.exception()``.
    """
    def feedback_wrapper(node: int, q: queue.Queue, t: PerfectBinaryTree, ac: AtomicCounter):
        results = run_task(node, t)
        ac.increment()
        for r in results:
            q.put(r)

    ac = AtomicCounter()
    q = queue.Queue()
    q.put(0)

    futures = []
    pending = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
        while True:
            # Submit every node discovered so far.
            while True:
                try:
                    node = q.get_nowait()
                except queue.Empty:
                    break
                f = executor.submit(feedback_wrapper, node, q, t, ac)
                futures.append(f)
                pending.add(f)
                q.task_done()

            # Nothing queued and nothing running: no task is left to enqueue work.
            if not pending:
                break

            # A worker enqueues its children before its future completes, so
            # once a future is done its children are already visible in q.
            _, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED
            )
    print(ac.value())
    return futures
# Keep the futures: any per-node failure can be inspected via f.exception().
futures = run_traverse_tree_threaded(t)


31


In [35]:
async def run_traverse_tree_asyncio(t: PerfectBinaryTree):
    """Breadth-first crawl of ``t``, one level at a time.

    Every node of the current level runs concurrently via ``asyncio.gather``.
    The next level starts only after all nodes of this level have finished.
    """
    frontier = [0]
    while len(frontier) > 0:
        level_results = await asyncio.gather(*[run_task_async(node, t) for node in frontier])
        next_frontier = []
        for kids in level_results:
            next_frontier.extend(kids)
        frontier = next_frontier

# Top-level `await` requires IPython/Jupyter; in a script use
# asyncio.run(run_traverse_tree_asyncio(t)).
await run_traverse_tree_asyncio(t)
        

0 done
1 done
2 done
5 done
4 done
6 done
3 done
14 done
11 done
12 done
9 done
7 done
8 done
13 done
10 done
19 done
30 done
29 done
26 done
20 done
24 done
16 done
15 done
28 done
22 done
27 done
17 done
21 done
25 done
23 done
18 done
