# Advanced Python - Building Scalable Applications

### Module 4

#### Synchronization mechanisms for threads and processes
 - Synchronizing flow-control using ```Barrier```
 - Mutual exclusion patterns using ```Lock``` and ```RLock```
 - Wait/notify patterns using ```Condition``` and ```Event```
 - Bandwidth/Resource management and control using ```Semaphore``` and ```BoundedSemaphore```
 - Producer/Consumer patterns using ```Queue```
 - Using ```threading.local()``` to manage thread-local data

#### Sharing and Exchanging data between processes (Overview)
 - Streaming data using ```Pipe``` and ```Queue```
 - Sharing counters and buffers using ```Value``` and ```Array```
 - Sharing Python lists and dictionaries using ```Manager```
 - Creating and managing shared memory using ```multiprocessing.shared_memory``` features


In [4]:
a = [4, 7, 13, 127, 17, 44, 32, 38]
print(list(map(lambda x: x*x, a)))
print([ x*x for x in a])

print(list(filter(lambda x: x % 2, a)))
print([ v for v in a if v % 2])

[16, 49, 169, 16129, 289, 1936, 1024, 1444]
[16, 49, 169, 16129, 289, 1936, 1024, 1444]
[7, 13, 127, 17]
[7, 13, 127, 17]


In [5]:
def is_prime(n):
    if n < 2:  # 0, 1 and negative numbers are not prime
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True
    

In [8]:
a = [4, 7, 13, 127, 17, 44, 32, 38]

list(map(is_prime, a))
list(filter(is_prime, a))

[7, 13, 127, 17]

In [12]:
a = [4, 7, 13, 127, 17, 44, 32, 38]

b = list(map(is_prime, a))
print(a, b, sep="\n")
[ x for x, y in zip(a, b) if y]


[4, 7, 13, 127, 17, 44, 32, 38]
[False, True, True, True, True, False, False, False]


[7, 13, 127, 17]

In [None]:
def sfilter(fn, data):
    r = map(fn, data)
    return [ x for x, y in zip(data, r) if y]
    
def pfilter(fn, data, nworkers):
    pass # TODO: implement a parallel map + filter and return the resultant list
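One possible way to approach the `pfilter` exercise is sketched below. It is only a sketch, not the intended solution: it assumes `concurrent.futures.ThreadPoolExecutor` as the worker pool, and it redefines `is_prime` so the block runs on its own. Threads keep the example simple; for a CPU-bound predicate such as `is_prime`, a process pool would probably be needed to see a real speed-up.

```python
from concurrent.futures import ThreadPoolExecutor


def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True


def pfilter(fn, data, nworkers):
    """Parallel map + filter: keep the items for which fn(item) is truthy."""
    data = list(data)  # materialize so the items can be zipped with the flags
    with ThreadPoolExecutor(max_workers=nworkers) as pool:
        # pool.map yields results in input order, so zip() pairs them correctly
        flags = list(pool.map(fn, data))
    return [x for x, keep in zip(data, flags) if keep]


a = [4, 7, 13, 127, 17, 44, 32, 38]
print(pfilter(is_prime, a, nworkers=4))  # [7, 13, 127, 17]
```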

In [21]:
import random

random.sample(range(2, 20), 10)

[9, 15, 16, 3, 5, 8, 12, 17, 4, 11]

In [22]:
from threading import Barrier
b = Barrier(2)

In [25]:
b.wait(timeout=5)

BrokenBarrierError: 
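The `BrokenBarrierError` above is expected: the barrier was created for 2 parties, but only one thread called `wait()`, so the call timed out and the barrier was put into the broken state. A minimal sketch of both cases follows; the names `worker`, `arrived` and `lonely` are made up for illustration.

```python
from threading import Barrier, BrokenBarrierError, Thread

barrier = Barrier(2)
arrived = []


def worker(name):
    arrived.append(name)
    barrier.wait(timeout=5)  # blocks until both parties have called wait()


threads = [Thread(target=worker, args=(f"w{i}",)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(arrived), "broken:", barrier.broken)

# A lone waiter on a 2-party barrier times out and breaks the barrier.
lonely = Barrier(2)
try:
    lonely.wait(timeout=0.1)
except BrokenBarrierError:
    print("timed out, broken:", lonely.broken)
```

A broken barrier can be returned to its initial state with `reset()`.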

NOTES about Lock and RLock:
  - Only RLock follows strict mutex discipline.
    That is, when a thread/process acquires an RLock successfully, it becomes the owner.
    Now, *only* the owner can release it; a release attempted by any other thread raises a RuntimeError.
    A plain Lock has no owner: any thread/process may release a locked Lock, although by convention the one that acquired it releases it.

 - The difference between Lock and RLock:
    - The owner can acquire an RLock multiple times. But the same owner must release the lock as many times as it acquired it before other threads can acquire the lock.
    - If a thread makes a blocking attempt to acquire a Lock that it already holds, it will deadlock (self-deadlock).
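In `threading`, ownership is tracked only by `RLock`; a plain `Lock` can be released from any thread. The sketch below is one way to check this behaviour. The helper `release_from_other_thread` and the `results` dictionary are illustrative names, not part of any API.

```python
from threading import Lock, RLock, Thread

lock = Lock()
rlock = RLock()
results = {}

# The main thread acquires both locks.
lock.acquire()
rlock.acquire()

# Re-acquiring a held Lock would block forever; a non-blocking attempt just fails.
results["lock_reacquire"] = lock.acquire(blocking=False)


def release_from_other_thread():
    lock.release()  # allowed: a Lock has no owner
    results["lock_free"] = not lock.locked()
    try:
        rlock.release()  # not allowed: this thread does not own the RLock
        results["rlock_error"] = None
    except RuntimeError as exc:
        results["rlock_error"] = str(exc)


t = Thread(target=release_from_other_thread)
t.start()
t.join()

rlock.release()  # the owner (main thread) releases its own RLock
print(results)
```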
    

In [28]:
from threading import current_thread

In [35]:
from threading import RLock

rl = RLock()
print(rl)

rl.acquire()
print(rl)

rl.acquire()
print(rl, rl._is_owned(), rl._recursion_count())
rl.release()
print(rl)

rl.release()
print(rl)
rl.release()


<unlocked _thread.RLock object owner=0 count=0 at 0x118a3fac0>
<locked _thread.RLock object owner=8662894656 count=1 at 0x118a3fac0>
<locked _thread.RLock object owner=8662894656 count=2 at 0x118a3fac0> True 2
<locked _thread.RLock object owner=8662894656 count=1 at 0x118a3fac0>
<unlocked _thread.RLock object owner=0 count=0 at 0x118a3fac0>


RuntimeError: cannot release un-acquired lock

In [30]:
rl._is_owned()

True

In [None]:
a = [1, 2, 3]
a[0] += 1  # Not atomic (read, add, write back): guard it with a Lock when multiple threads update it
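A minimal sketch of guarding that update with a `Lock` is shown below. The names `a_lock` and `bump`, as well as the thread and iteration counts, are assumptions chosen for illustration.

```python
from threading import Lock, Thread

a = [1, 2, 3]
a_lock = Lock()


def bump(times):
    for _ in range(times):
        with a_lock:  # acquire on entry, release on exit (even on exceptions)
            a[0] += 1


threads = [Thread(target=bump, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(a[0])  # 40001: the initial 1 plus 4 * 10_000 increments
```

Using the lock as a context manager avoids forgetting the `release()` call on an early return or an exception.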



### Events
Used for notifying "state changes" among threads.

In [41]:
from threading import Event

e = Event()   # Create a new event; its internal flag starts out as False
print(e.is_set())

e.set()
print(e.is_set())

e.set()
print(e.is_set())

e.set()
print(e.is_set())

e.clear()
print(e.is_set())

print(e.wait(timeout=5))
print(e.is_set())


False
True
True
True
False
False
False


In [42]:
from threading import Thread, current_thread as current, Event

stats = { }
stats_event = Event()

def read_stats():
    stats_event.wait()
    for k, v in stats.items():
        print(f"{current().name}: {k=}, {v=}")

def update_stats():
    stats["test"] = 100
    stats["temp"] = "temp data"
    stats_event.set()


t1 = Thread(target=read_stats)
t2 = Thread(target=read_stats)
t3 = Thread(target=update_stats)

t1.start()
t2.start()


In [43]:
t3.start()

Thread-118 (read_stats): k='test', v=100Thread-119 (read_stats): k='test', v=100
Thread-119 (read_stats): k='temp', v='temp data'

Thread-118 (read_stats): k='temp', v='temp data'


In [47]:
read_stats()

MainThread: k='test', v=100
MainThread: k='temp', v='temp data'


In [48]:
from threading import Thread, Event
from time import sleep

run_counter = Event()

def simple_counter():
    import itertools
    for i in itertools.count():
        if run_counter.wait():
            print("Counting", i)
            sleep(2)

t = Thread(target=simple_counter)
t.start()

In [53]:
run_counter.set()

Counting 17


Counting 18
Counting 19
Counting 20
Counting 21
Counting 22


In [54]:
run_counter.clear()
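Clearing `run_counter` only pauses the counter thread; it stays blocked in `wait()` and never exits. One way to also allow a clean shutdown is a second event checked with a timeout, as in the sketch below. The `stop` event, the `seen` list and the short timings are assumptions made for the example.

```python
from threading import Event, Thread
from time import sleep

run_counter = Event()  # set: count, cleared: pause
stop = Event()         # set: leave the loop and end the thread
seen = []


def pausable_counter():
    i = 0
    while not stop.is_set():
        # Wake up periodically so a stop request is noticed while paused.
        if run_counter.wait(timeout=0.05):
            seen.append(i)
            i += 1
            sleep(0.01)


t = Thread(target=pausable_counter)
t.start()

run_counter.set()       # resume counting
while len(seen) < 3:    # let it count a few values
    sleep(0.01)
run_counter.clear()     # pause
stop.set()              # request shutdown
t.join(timeout=2)
print("alive:", t.is_alive(), "counted:", len(seen))
```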

#### Semaphore
Used to limit bandwidth / access to a resource to a limited number of threads (based on a count).

A semaphore is initialized with a count (that acts as a limit).

Initially, a semaphore can be acquired up to 'count' times without blocking.

But a semaphore can be released any number of times, irrespective of its initial 'count'; each release increments the internal counter.

Ultimately, the number of acquires that can succeed without blocking equals the current counter value: the initial count, plus all releases, minus all acquires so far.

This kind of semaphore is often referred to as an "unbounded" semaphore. A ```BoundedSemaphore```, by contrast, raises a ValueError when a release would push the counter above its initial value.
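The counting rules can be checked with non-blocking acquires. This is a small sketch; the counts used (2, 3 and 5) are arbitrary.

```python
from threading import BoundedSemaphore, Semaphore

sem = Semaphore(2)
print(sem.acquire(blocking=False))  # True  (counter 2 -> 1)
print(sem.acquire(blocking=False))  # True  (counter 1 -> 0)
print(sem.acquire(blocking=False))  # False (counter is 0, would block)

# An unbounded semaphore accepts more releases than its initial count.
for _ in range(3):
    sem.release()                   # counter 0 -> 3

grabbed = sum(sem.acquire(blocking=False) for _ in range(5))
print(grabbed)                      # 3

# A BoundedSemaphore refuses to grow past its initial count.
bounded = BoundedSemaphore(2)
try:
    bounded.release()
    over_release_error = None
except ValueError as exc:
    over_release_error = exc
print("BoundedSemaphore over-release:", over_release_error)
```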

#### There are three types of Semaphores:
  1. Counting Semaphore (semaphore's initial count > 1)
  2. Binary Semaphore (semaphore's initial count == 1)
  3. Null Semaphore (semaphore's initial count == 0)
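Two of these flavours are sketched below: a counting semaphore that caps how many threads run a section at once, and a null semaphore used purely as a signal from one thread to another. All names (`job`, `peak`, `ready`, `log`) and the limit of 2 are assumptions made for the example.

```python
from threading import Lock, Semaphore, Thread
from time import sleep

# Counting semaphore: at most 2 threads inside the section at any time.
limit = Semaphore(2)
guard = Lock()  # protects the bookkeeping counters below
active = 0
peak = 0


def job():
    global active, peak
    with limit:
        with guard:
            active += 1
            peak = max(peak, active)
        sleep(0.01)  # simulated work
        with guard:
            active -= 1


jobs = [Thread(target=job) for _ in range(6)]
for t in jobs:
    t.start()
for t in jobs:
    t.join()
print("peak concurrency:", peak)

# Null semaphore: starts at 0, so acquire() blocks until someone releases.
ready = Semaphore(0)
log = []


def consumer():
    ready.acquire()  # waits for the producer's signal
    log.append("consumed")


def producer():
    log.append("produced")
    ready.release()  # signal the consumer


c, p = Thread(target=consumer), Thread(target=producer)
c.start()
p.start()
c.join()
p.join()
print(log)  # ['produced', 'consumed']
```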

#### Demonstration of ```queue.Queue```

In [55]:
from queue import Queue
from threading import Thread, Event, current_thread as current
from time import sleep

cancel = Event()

def reader(q):
    t = current()
    while not cancel.wait(5):
        print(f"{t.name}: Waiting on queue.")
        v = q.get()
        print(f"{t.name}: Processing {v}")


workers = []
queue = Queue(10)
for i in range(5):
    w = Thread(target=reader, args=(queue,))
    workers.append(w)
    w.start()


Thread-189 (reader): Waiting on queue.Thread-192 (reader): Waiting on queue.
Thread-193 (reader): Waiting on queue.
Thread-190 (reader): Waiting on queue.

Thread-191 (reader): Waiting on queue.


In [None]:
queue.put(100)

Thread-193 (reader): Processing 100


Thread-189 (reader): Waiting on queue.
Thread-191 (reader): Waiting on queue.
Thread-192 (reader): Waiting on queue.
Thread-193 (reader): Waiting on queue.
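In the demonstration above, each reader blocks in `q.get()` indefinitely, so the `cancel` event is only checked between items. A common alternative for shutting workers down is to push one sentinel value per worker onto the queue. The sketch below assumes a module-level `STOP` sentinel and a `processed` list, both invented for illustration; it relies on `list.append` being safe to call from several threads in CPython.

```python
from queue import Queue
from threading import Thread

STOP = object()   # sentinel: tells a reader to exit
processed = []


def reader(q):
    while True:
        v = q.get()          # blocks until an item is available
        if v is STOP:
            q.task_done()
            break
        processed.append(v)
        q.task_done()        # mark the item as fully handled


queue = Queue(10)            # put() blocks while 10 items are pending
workers = [Thread(target=reader, args=(queue,)) for _ in range(5)]
for w in workers:
    w.start()

for v in range(20):
    queue.put(v)
for _ in workers:            # one sentinel per worker
    queue.put(STOP)

queue.join()                 # returns once every item has been task_done()
for w in workers:
    w.join()
print(sorted(processed) == list(range(20)))  # True
```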
