# Simple concurrency examples

## Two tasks sharing some data

### Using threads

In [None]:
import threading
import time
import sys

a = ''

def task_1():
    global a
    for i in range(10):
        print('o', end='', flush=True)
        a += 'o'
        print(a)
        time.sleep(1) # Blocking -> yield to other thread
        
def task_2():
    global a
    for i in range(20):
        print('O', end='', flush=True)
        a += 'O'
        print(a)
        time.sleep(0.6) # Blocking -> yield to other thread
        
thread_1 = threading.Thread(target=task_1)
thread_2 = threading.Thread(target=task_2)

thread_1.start()
thread_2.start()
print("(Both threads have started)")

thread_1.join() # Wait for thread_1 to finish
thread_2.join()
print("\nBoth threads have finished")

print(a)

 Notice that both threads share the same process memory space.

### Using processes

In [None]:
# This code does not work ...

import multiprocessing
import time
import sys

a = '' # ... because each process has its own 'a'

def task_1():
    global a
    for i in range(10):
        print('o', end='', flush=True)
        a += 'o'
        print(a)
        time.sleep(1) # Blocking -> yield to other process
        
def task_2():
    global a
    for i in range(20):
        print('O', end='', flush=True)
        a += 'O'
        print(a)
        time.sleep(0.6) # Blocking -> yield to other process
        
process_1 = multiprocessing.Process(target=task_1)
process_2 = multiprocessing.Process(target=task_2)

process_1.start()
process_2.start()
print("(Both processes have started)")

process_1.join()
process_2.join()
print("\nBoth processes have finished")

print(a)

#### But ... why `a` has not been modified? Why the processed do not share `a`?

By definition, processes must [__fork__](https://en.wikipedia.org/wiki/Fork_(system_call) (make a copy of itself, that is, the code and the used memory) before start running. In the previous example, the Python interpreter forks twice and the two childs are run in parallel while the parent process waits for their completition. Neither, the child processes nor the parent process share their global state (where `a` is defined).

### How to share data between processes?

There are several options. One of them is to use a [shared memory `Manager()`](https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes):

In [None]:
import multiprocessing
import time
import sys
import ctypes

def task_1(a):
    for i in range(10):
        print('o', end='', flush=True)
        a.value += 'o'
        time.sleep(1) # Blocking -> yield to other process
        
def task_2(a):
    for i in range(20):
        print('O', end='', flush=True)
        a.value += 'O'
        time.sleep(0.6) # Blocking -> yield to other process

manager = multiprocessing.Manager()
# See https://docs.python.org/3/library/ctypes.html#module-ctypes
a = manager.Value(ctypes.c_char_p, "")    

process_1 = multiprocessing.Process(target=task_1, args=(a,))
process_2 = multiprocessing.Process(target=task_2, args=(a,))

process_1.start()
process_2.start()
print("(Both processes have started)")

process_1.join()
process_2.join()
print("\nBoth processes have finished")

print(a.value)

### Using coroutines

In [13]:
import time
import sys

a = ''

def task_1():
    global a
    for i in range(20):
        print('o', end='', flush=True)
        a += 'o'
        yield
        time.sleep(0.5)
        
def task_2():
    global a
    for i in range(20):
        print('O', end='', flush=True)
        a += 'O'
        yield
        time.sleep(0.25)

t1 = task_1()
t2 = task_2()
        
for i in range(20):
    t1.__next__()
    t2.__next__()

print("\na =", a)

oOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoO
a = oOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoO


## A pipeline

### Using threads

In [33]:
import threading
import queue 

intermediate_queue = queue.Queue()
consumer_queue = queue.Queue()

def producer():
    for i in range(10):
        intermediate_queue.put(i) # Queue is thread safe
        print("produced", i)
    print("producer: done")

def intermediate():
    while True:
        i = intermediate_queue.get()
        consumer_queue.put(i+1)
        if i == 9:
            break
    print("intermediate: done") 

def consumer():
    while True: 
        i = consumer_queue.get()
        print("consumed", i)
        if i == 10:
            break
    print("consumer: done") 

p = threading.Thread(target=producer)
i = threading.Thread(target=intermediate)
c = threading.Thread(target=consumer)

p.start()
i.start()
c.start()

produced 0
produced 1
producedconsumed 1
consumed 2
consumed 3
 2
produced 3
produced 4
produced 5
produced 6
produced 7
produced 8
produced 9
producer: done
intermediate: done
consumed 4
consumed 5
consumed 6
consumed 7
consumed 8
consumed 9
consumed 10
consumer: done


### Using processes

In [34]:
import multiprocessing

intermediate_queue = multiprocessing.Queue()
consumer_queue = multiprocessing.Queue()

def producer():
    for i in range(10):
        intermediate_queue.put(i) # Queue is thread safe
        print("produced", i)
    print("producer: done")

def intermediate():
    while True:
        i = intermediate_queue.get()
        consumer_queue.put(i+1)
        if i == 9:
            break
    print("intermediate: done") 

def consumer():
    while True: 
        i = consumer_queue.get()
        print("consumed", i)
        if i == 10:
            break
    print("consumer: done") 

p = multiprocessing.Process(target=producer)
i = multiprocessing.Process(target=intermediate)
c = multiprocessing.Process(target=consumer)

p.start()
i.start()
c.start()

produced 0
produced 1
consumed 1
produced 2
consumed 2
produced 3
produced 4
consumed 3
produced 5
produced 6
consumed 4
produced 7
consumed 5
produced 8
consumed 6
produced 9
consumed 7
producer: done
intermediate: done
consumed 8
consumed 9
consumed 10
consumer: done


### Using coroutines

In [35]:
def producer(next_task):
    for i in range(10):
        next_task.send(i) 
        print("produced", i)
    next_task.close()
    print("producer: done")

def intermediate(next_task): 
    try: 
        while True: 
            i = (yield)
            next_task.send(i+1)
    except GeneratorExit: 
        print("intermediate: done") 

def consumer():
    try: 
        while True: 
            i = (yield) 
            print("consumed", i) 
    except GeneratorExit: 
        print("consumer: done") 

c = consumer()
c.__next__() # Advance until the first yield
i = intermediate(c)
i.__next__() # Advance until the first yield
producer(i)

consumed 1
produced 0
consumed 2
produced 1
consumed 3
produced 2
consumed 4
produced 3
consumed 5
produced 4
consumed 6
produced 5
consumed 7
produced 6
consumed 8
produced 7
consumed 9
produced 8
consumed 10
produced 9
intermediate: done
producer: done
