### This notebook is from a LinkedIn class on Python concurrent programming

### sequential/serial execution
* the program executes a series of instructions sequentially
* only one instruction is executed at any given moment
* the speed of the program is limited by the CPU and how fast it can execute that series of instructions

### parallel programming
* with multiple processors, the instructions can be broken down into independent parts and executed simultaneously by different processors
* components that depend on all parts require coordination between the different parts.
* coordinating these actions adds complexity and overhead, so the speedup is not linear in the number of processors.
* parallel execution increases throughput by
  * accomplishing a single task faster
  * accomplishing more tasks in a given time
  * increasing the scale of the problem that can be solved. Big computational tasks have to rely on parallel programming to save time, and the time saved outweighs the cost of the added hardware
  
### multiprocessor architectures
* Flynn's taxonomy (4 classes of computer architecture based on the number of concurrent instruction/control streams and the number of data streams)
  * SISD (single instruction, single data)
    + sequential computer with a single processor unit
    + executes one single instruction at any given moment
  * SIMD (single instruction, multiple data)
    + parallel computer with multiple processor units
    + all units execute the same instruction at any given moment, but each can operate on a different data element
    + for example, two cooks both chopping, one an onion and one a carrot, with their motions in sync
    + suitable for applications that perform the same handful of operations on a massive set of data elements, such as image analysis. Modern computers use GPUs with SIMD instructions to do that
  * MISD (multiple instruction, single data)
    + each processor unit independently executes its own separate series of instructions, but all of them operate on the same single stream of data.
    + not a commonly used architecture
  * MIMD (multiple instruction, multiple data)
    + multiple processor units. Every processor unit can process a different series of instructions
    + at the same time, each of those processors can be operating on a different set of data
    + most commonly used architecture in Flynn's taxonomy, from multicore PCs to networked clusters in supercomputers.
    + divided further into two parallel programming models:
      + SPMD (single program, multiple data)
        + multiple processing units execute copies of the same single program simultaneously.
        + each of them can use different data.
        + different from SIMD: in SIMD, processing units execute the same instruction at the same time, whereas in SPMD, processing units just execute the same program
        + the processes can run asynchronously, and the program usually includes conditional logic so that each task executes only specific parts of the program
        + for example, two processors follow the same recipe, but each can execute a different part of the recipe
        + the most common style of parallel programming: using a multiprocessor computer to execute the same program on a MIMD architecture
      + MPMD (multiple program, multiple data)
        + each processing unit runs a different program.
        + processors execute independently on different programs and possibly on different data (e.g. a head/manager node with many worker nodes, for functional decomposition)
* another way to categorize computer architectures is by how memory is organized and how the computer accesses data
  + memory usually operates at a slower speed than the processor.
  + when one processor is reading from or writing to memory, it prevents other processors from accessing that same memory element
  + two main memory architectures for parallel computing
    + shared memory
      + all processors access the same memory with a global address space. Although each processor executes its own instructions independently, if one processor changes a memory location, all the processors will see the change.
      + this doesn't mean all the data are on the same physical device. It could be spread across a cluster of systems. The key is that all processors see everything that happens in the shared memory space.
      + easier to program, since it is easy to access data in shared memory
      + difficult to scale, since adding more processors to a shared memory system increases the traffic on the shared memory bus and the cost of maintaining cache coherency through communication between all the parts.
      + the programmer is responsible for synchronizing memory accesses to ensure correct behavior.
      + shared memory architectures fall into two categories based on how processors are connected to memory and how fast they can access it
        + uniform memory access (UMA)
          + all processors have equal access to the memory and can access it equally fast.
          + the symmetric multiprocessing (SMP) system is a typical UMA architecture.
            + two or more identical processors connected to a single shared memory through a system bus (each processor connects to its cache memory, which connects to the system bus, which connects to main memory; all connections are bidirectional)
            + each processor core in a computer or mobile phone is treated as a separate processor in an SMP architecture.
              + each core has its own cache, a small, very fast piece of memory that only it can see. The core uses it to store data it frequently works with.
              + the challenge is that if a processor copies data from shared memory and changes it in its local cache, the change needs to be written back to the shared memory before another processor reads the old value. This issue is called cache coherency. It is handled by the hardware in multicore processors
        + nonuniform memory access (NUMA)
          + physically connects multiple SMP systems (each a UMA-type architecture) together. The access is nonuniform because some processors have quicker access to certain parts of the memory than others. (The SMP systems are connected by a system bus and sit at different positions on it, so accessing memory over the bus takes longer than accessing the shared memory within the same SMP.) Overall, every processor can still see everything in memory. Note that there is no single main memory: the architecture just connects SMP systems and, therefore, their memories
    + distributed memory
      + each processor has its own memory with its own address space, and there is no global address space. All processors are connected through some sort of network (such as Ethernet).
      + each processor operates independently. If it makes changes to its local memory, that change is not automatically reflected in the memory of other processors.
      + it is up to the programmer to explicitly define how and when data is communicated between the nodes, which is difficult.
      + advantage of distributed memory is that it is scalable
        + when you add more processors to the system, you also get more memory. This makes it cost-effective to use commodity, off-the-shelf computers and network equipment to build large distributed memory systems.
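As a rough illustration of the SPMD model described above, the sketch below runs the same function on several workers, each with its own rank and its own chunk of data, and uses conditional logic on the rank so that only one worker executes a particular part. Threads stand in for processing units here, and the names `spmd_program`, `rank`, and `chunks` are made up for this example.

```python
import threading

def spmd_program(rank, data, results):
    """The single program every worker runs; behavior depends on rank and data."""
    partial = sum(data)              # every worker sums its own chunk
    if rank == 0:
        # conditional logic: only the rank-0 worker executes this part
        results['note'] = 'rank 0 also acts as coordinator'
    results[rank] = partial

chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]   # a different data set per worker
results = {}

workers = [
    threading.Thread(target=spmd_program, args=(rank, chunk, results))
    for rank, chunk in enumerate(chunks)
]
for w in workers:
    w.start()
for w in workers:
    w.join()

total = sum(results[rank] for rank in range(len(chunks)))
print('partial sums:', [results[rank] for rank in range(len(chunks))])
print('total:', total)
```

Every worker executes the same code, but not the same instruction at the same moment, which is what separates SPMD from SIMD.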

### Threads and processes
* process:
  + when a computer runs an application, that instance of the program executing is referred to as a process
    + includes code, data, and state information
    + independent instance of a running program
    + has its own separate memory address space
    + a computer can run hundreds of processes at the same time, and it is the operating system's job to manage all of them
    + sharing resources between processes requires inter-process communication (IPC)
      + sockets and pipes
      + shared memory
      + remote procedure calls
* within each process, there are one or more smaller sub-elements called threads
  + each thread is an independent path of execution through the program
  + a different sequence of instructions
  + only exists as part of a process (subset of a process)
  + basic unit that the OS manages. The OS schedules threads for execution and allocates time on the processor to execute them.
  + threads of the same process share the process's address space, so they can access the same resources and memory, including code, variables, and data, which makes it easy for them to work together.
  + sharing resources between processes is not as easy as sharing between threads in the same process.
  + threads are lightweight and require less overhead to create and terminate
  + the operating system can switch between threads faster than between processes
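A minimal sketch of the shared address space, under the assumption that a plain list and a set are enough to show it: four threads write into the same objects and all report the same process ID. The names `shared_log`, `pids_seen`, and `worker` are illustrative only.

```python
import os
import threading

shared_log = []      # lives in the process's address space
pids_seen = set()

def worker(label):
    # every thread sees the same list object and the same process ID
    shared_log.append(label)
    pids_seen.add(os.getpid())

threads = [
    threading.Thread(target=worker, args=(f'thread-{i}',))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('entries written by threads:', sorted(shared_log))
print('distinct process IDs:', len(pids_seen))
```

Separate processes would each get their own copy of `shared_log`, which is why they need IPC to exchange data.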

### concurrency and parallel execution
* concurrency: the ability of a program to be broken into parts that can run independently of each other. These parts can be executed out of order or partially out of order without affecting the result.
* without multiple processors, independent tasks are executed by switching back and forth between them, and only one task runs at any moment. This may give the illusion of parallel execution, but it is only concurrent execution.
  + with parallel hardware, such as multiple processors, multiple tasks can be executed simultaneously, and then we have parallel execution
* concurrency refers to the program structure that makes it possible to deal with multiple things at once
* parallelism refers to simultaneous execution, actually doing multiple things at once
* concurrent programming is useful for I/O-bound tasks: while one thread is waiting for an I/O response, another thread can accept the user's input.
* parallel processing is useful for computationally intensive tasks, such as matrix multiplication.
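To make the I/O-bound case concrete, here is a small sketch in which `time.sleep` stands in for waiting on I/O (an assumption for illustration; `fake_io` is a hypothetical helper). While one thread waits, the others can make progress, so the threaded version should finish in roughly the time of a single wait.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(task_id):
    """Stand-in for an I/O wait such as a network request (hypothetical)."""
    time.sleep(0.2)
    return task_id

task_ids = [0, 1, 2, 3]

# run the tasks one after another
start = time.perf_counter()
sequential = [fake_io(t) for t in task_ids]
sequential_time = time.perf_counter() - start

# run the tasks concurrently, one thread per task
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(fake_io, task_ids))
threaded_time = time.perf_counter() - start

print(f'sequential: {sequential_time:.2f} s')
print(f'threaded:   {threaded_time:.2f} s')
```

The exact timings depend on the machine, so none are quoted here.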

### concurrent python thread
* using threads to handle concurrent tasks in Python is straightforward.
* the Python interpreter (CPython) will not allow concurrent threads to execute simultaneously, in parallel, due to the GIL (global interpreter lock)
* the global interpreter lock is a mechanism that limits Python to executing only one thread at a time when CPython is used as the interpreter
* the GIL is a simple way to provide thread-safe memory management.
* multithreading is still useful for many I/O-bound applications, since a thread releases the GIL while it waits for I/O
* for CPU-bound applications, such as intensive computational tasks, the GIL can negatively impact performance.
  + we can implement parallel algorithms in external libraries, written in a language such as C++, and call them from Python functions.
  + you can also use Python's multiprocessing package to use multiple processes instead of multiple threads.
    + each process has its own separate interpreter with its own GIL, so different processes can execute simultaneously
    + communication between processes is more difficult than between threads
    + processes use more resources than threads
    

In [1]:
import os
import threading

# a simple function that wastes CPU cycles forever
def cpu_waster():
    while True:
        pass

# display information about this process
print('\n  Process ID: ', os.getpid())
print('Thread Count: ', threading.active_count())
for thread in threading.enumerate():
    print(thread)

print('\nStarting 12 CPU Wasters...')
for i in range(12):
    threading.Thread(target=cpu_waster).start()

# display information about this process
print('\n  Process ID: ', os.getpid())
print('Thread Count: ', threading.active_count())
for thread in threading.enumerate():
    print(thread)



  Process ID:  10944
Thread Count:  6
<_MainThread(MainThread, started 6764)>
<Thread(IOPub, started daemon 7740)>
<Heartbeat(Heartbeat, started daemon 1356)>
<ControlThread(Control, started daemon 8664)>
<HistorySavingThread(IPythonHistorySavingThread, started 10936)>
<ParentPollerWindows(Thread-4, started daemon 2876)>

Starting 12 CPU Wasters...

  Process ID:  10944
Thread Count:  18
<_MainThread(MainThread, started 6764)>
<Thread(IOPub, started daemon 7740)>
<Heartbeat(Heartbeat, started daemon 1356)>
<ControlThread(Control, started daemon 8664)>
<HistorySavingThread(IPythonHistorySavingThread, started 10936)>
<ParentPollerWindows(Thread-4, started daemon 2876)>
<Thread(Thread-5 (cpu_waster), started 3156)>
<Thread(Thread-6 (cpu_waster), started 10548)>
<Thread(Thread-7 (cpu_waster), started 7372)>
<Thread(Thread-8 (cpu_waster), started 7884)>
<Thread(Thread-9 (cpu_waster), started 6424)>
<Thread(Thread-10 (cpu_waster), started 3352)>
<Thread(Thread-11 (cpu_waster), started 9220)

### multiprocessing module
* for true parallel programming in Python, we need to use multiprocessing rather than multithreading
* to switch from multithreading to multiprocessing, we do the following
  + import multiprocessing
  + put the code that starts the processes inside an `if __name__ == "__main__":` block
  + replace
  ```python
  threading.Thread(target=cpu_waster).start()
  ```
  with
  ```python
  import multiprocessing as mp
  if __name__ == "__main__":
    for i in range(12):
        mp.Process(target=cpu_waster).start()
    
    for thread in threading.enumerate():
        print(thread)
  ``` 
* we need to put the mp.Process code inside the main block, using the if statement, because
  + when a new process is spawned (the default start method on Windows, where this notebook was run), it loads the entire script to find the cpu_waster function and other dependencies
  + basically, each process runs the script. Without the if condition that lets only the main module spawn new processes, the child processes would go on to spawn their own child processes, and so on without end
* The entire code snippet is in the following cell (without the if condition, line 22 would run again in every child process, forever)

In [None]:
""" Threads that waste CPU cycles """

import os
import threading
import multiprocessing as mp

# a simple function that wastes CPU cycles forever
def cpu_waster():
    while True:
        pass

print('Hi! My name is', __name__)
if __name__ == '__main__':
    # display information about this process
    print('\n  Process ID: ', os.getpid())
    print('Thread Count: ', threading.active_count())
    for thread in threading.enumerate():
        print(thread)

    print('\nStarting 12 CPU Wasters...')
    for i in range(12):
        mp.Process(target=cpu_waster).start()

    # display information about this process
    print('\n  Process ID: ', os.getpid())
    print('Thread Count: ', threading.active_count())
    for thread in threading.enumerate():
        print(thread)


### Scheduler
* operating system function that assigns processes and threads to run on available CPUs
* the scheduler makes it possible for multiple programs to run concurrently on a single processor
* when a process is created and ready to run, it gets loaded into memory and placed in the ready queue
* the scheduler cycles through the ready processes so that each gets a chance to execute on the processor
* if there are multiple processors, the OS will schedule processes to run on each of them to make the most of the additional resources
* the following are some of the things the scheduler may do with a process:
  + a process runs until it finishes, and the scheduler then assigns another process to that processor
  + a process might get blocked waiting for an I/O event; it then goes to a separate I/O waiting queue so that another process can run
  + the scheduler might determine that a process has spent its fair share of time on the processor and swap it out for another process from the ready queue, which is called a context switch. In a context switch:
    + the OS has to save the state, or context, of the process or thread that was running so that it can be resumed later
    + the OS has to load the context of the new process or thread to run
    + a context switch is not instantaneous: it takes time to save and restore the registers and memory state, so the scheduler needs a strategy for how frequently it switches between processes.
* scheduling algorithms
  + first come, first served
  + shortest job next
  + priority
  + shortest remaining time
  + round robin
  + multilevel queues
  + some of these algorithms are preemptive
    + meaning that a lower-priority process will be paused or stopped when a higher-priority process enters the ready state.
    + non-preemptive algorithms let a process, once it is in the running state, continue to run for its allotted time
* scheduling goals
  + which algorithm to choose depends on the scheduling goals and different algorithms will be used by different OS
    + max throughput
    + max fairness
    + min wait time
    + min latency
  + your program should not rely on the order in which multiple processes/threads will run
  + your program should not rely on each process/thread being given an equal amount of time
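To make the round-robin idea and the cost of context switches more tangible, the following is a toy simulation, not how any real OS scheduler is implemented. The job names, burst times, and the `round_robin` helper are invented for this sketch.

```python
from collections import deque

def round_robin(burst_times, quantum):
    """Simulate round-robin scheduling; return finish order and hand-off count."""
    ready_queue = deque(burst_times.items())   # (name, remaining time) pairs
    finished = []
    context_switches = 0
    while ready_queue:
        name, remaining = ready_queue.popleft()
        remaining -= min(quantum, remaining)   # run for at most one quantum
        if remaining > 0:
            ready_queue.append((name, remaining))  # preempted: back of the queue
        else:
            finished.append(name)
        if ready_queue:
            context_switches += 1              # CPU is handed to the next entry
    return finished, context_switches

jobs = {'A': 5, 'B': 2, 'C': 3}   # hypothetical CPU time each job needs
order, switches = round_robin(jobs, quantum=2)
print('finish order:', order)
print('context switches:', switches)
```

With a quantum of 2, the short job B finishes first even though A entered the queue first. A smaller quantum would improve fairness but increase the number of context switches.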

In [2]:
""" Two threads chopping vegetables """

import threading
import time

chopping = True

def vegetable_chopper():
    name = threading.current_thread().name
    
    # set up local variable to count how many times the while loop executes
    vegetable_count = 0
    while chopping:
        print(f'{name} chopped a vegetable!\n')
        vegetable_count += 1
    print(f'{name} chopped {vegetable_count} vegetables.')

threading.Thread(target=vegetable_chopper, name='Barron').start()
threading.Thread(target=vegetable_chopper, name='Olivia').start()

time.sleep(0.01)    # chop vegetables for 0.01 seconds
chopping = False # stop both threads from chopping


Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped a vegetable!

Barron chopped

  name = threading.current_thread().getName()


Olivia chopped 656 vegetables.


The above code shows that
* it is unpredictable how many times each thread will execute the while loop
* the variable chopping, defined outside vegetable_chopper, controls the while loop inside the function
* each thread's execution is split into multiple time slices, which are scheduled by the OS

### thread lifecycle
when a program or process starts, it starts as a single thread (the main thread)
* the main thread can start, or spawn, additional child threads that are part of the same process but execute independently to do other tasks
* child threads can spawn their own child threads. When they finish executing, they notify their parent threads and terminate
* the main thread is usually the last thread to finish execution
* four states of a thread
  + new state 
    + when a new thread is spawned/created
    + the thread isn't running yet and doesn't take any CPU resources
    + when the thread is created, a function is assigned to it to execute
    + some programming languages require you to start a thread explicitly after creating it so that it moves to the runnable state
  + runnable state
    + the OS can schedule the thread to execute
    + through context switches, the thread can be swapped with another thread to run on one of the available processors
  + blocked
    + when a thread needs to wait for an event to occur, such as an external input or a timer, it goes to the blocked state
    + a blocked thread does not use any CPU resources
    + the OS resumes the thread by putting it back in the runnable state when the event it waits for occurs
    + when a thread needs to wait for other threads (e.g. its child threads) to complete their jobs, we use join()
      + join() waits until another thread completes its execution
      + when join() is called, the current thread goes to the blocked state and waits until the other thread's job is done
  + terminated
    + when the thread finishes execution, it notifies its parent thread and goes to the terminated state
    + a thread that is abnormally aborted also goes to the terminated state

### code example
The code in the next cell demonstrates the different life cycle stages of a thread
* Barron is the main thread; it spawns the child thread, Olivia
* the Olivia thread is created by calling the `__init__()` method of the Thread class when ChefOlivia is instantiated by the main thread
* the Olivia thread starts (enters the runnable state) when the main thread calls its start() method
* the main thread waits for the Olivia thread by calling olivia's join() method from within the main thread
  + this blocks the main thread's execution until the Olivia thread has terminated
* after the Olivia thread is done, the main thread resumes execution and finishes
* we can check whether a thread is alive by calling its is_alive() method
  + when a thread has been created but not yet started, it is not alive
  + once a thread has been started, it is alive, even while it is sleeping
  + when a thread has terminated, it is not alive

In [4]:
""" Two threads cooking soup """

import threading
import time

class ChefOlivia(threading.Thread):

    def __init__(self):
        super().__init__()

    def run(self):
        print('Olivia started & waiting for sausage to thaw...')
        time.sleep(3)
        print('Olivia is done cutting sausage.')

# main thread

# start to create Olivia thread
print("Barron started & requesting Olivia's help.")
olivia = ChefOlivia()
print('  Olivia alive?:', olivia.is_alive())

# Olivia thread starts, and is then blocked by time.sleep()
# the scheduler switches to the main thread, but the Olivia thread is still alive
print('Barron tells Olivia to start.')
olivia.start()
print('  Olivia alive?:', olivia.is_alive())

# main thread gain the execution while Olivia thread is in time.sleep
print('Barron continues cooking soup.')
time.sleep(0.5)
print('  Olivia alive?:', olivia.is_alive())

# after 3 s, Olivia thread is put to runnable again to execute
print('Barron patiently waits for Olivia to finish and join...')
olivia.join()
print('  Olivia alive?:', olivia.is_alive())

print('Barron and Olivia are both done!')


Barron started & requesting Olivia's help.
  Olivia alive?: False
Barron tells Olivia to start.
Olivia started & waiting for sausage to thaw...
  Olivia alive?: True
Barron continues cooking soup.
  Olivia alive?: True
Barron patiently waits for Olivia to finish and join...
Olivia is done cutting sausage.
  Olivia alive?: False
Barron and Olivia are both done!


### Two ways to create threads in python
* create Python threads with a class that inherits from the Thread class and overrides its `__init__()` and run() methods
  + you should only override these two methods
  + you need to call `super().__init__()` in the `__init__()` method
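```python
import threading
import time

class MyThreadClass(threading.Thread):
    def __init__(self):
        super().__init__()  # required so the Thread machinery is initialized

    def run(self):
        # run() is what executes in the new thread once start() is called
        print('Olivia started & waiting for sausage to thaw...')
        time.sleep(3)
        print('Olivia is done cutting sausage.')

olivia = MyThreadClass()
olivia.start()
```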

* directly instantiate a Thread object and pass the function to run as the target
```python
threading.Thread(target=vegetable_chopper, name='Barron').start()
```
        
    

### Daemon threads
* example of a background task: a garbage collector
  * automatic memory management running in the background
  * reclaims memory no longer in use by the program
* if the main thread spawns a child thread that runs in the background, the program cannot exit when the main thread finishes, because the child thread is still executing
  + to solve this problem, we can make the child thread a daemon (background) thread
    + a daemon thread that is still running will not prevent a program/process from terminating
    + by default, threads are created as non-daemon. You need to explicitly turn a thread into a daemon thread
    + a daemon thread is said to be detached from the main thread; it stops abruptly when the main thread exits
      + make sure that terminating the daemon thread prematurely has no negative side effects
* the following cell shows a daemon thread code example
  + new threads inherit daemon status from their parent
  + the main thread is a normal thread; therefore, child threads spawned from main are normal threads by default
  + you set the thread's daemon status (as shown in line 14 of the code) before starting the thread
  + when the program ends, remaining daemon threads are abandoned

In [None]:
# demo of daemon threads
""" Barron finishes cooking while Olivia cleans """

import threading
import time

def kitchen_cleaner():
    while True:
        print('Olivia cleaned the kitchen.')
        time.sleep(1)

if __name__ == '__main__':
    olivia = threading.Thread(target=kitchen_cleaner)
    olivia.daemon = True
    olivia.start()

    print('Barron is cooking...')
    time.sleep(0.6)
    print('Barron is cooking...')
    time.sleep(0.6)
    print('Barron is cooking...')
    time.sleep(0.6)
    print('Barron is done!')
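The claim that new threads inherit daemon status from their parent can be checked with a short sketch. The `status` dictionary and the `parent` and `child` functions are placeholders chosen for this example.

```python
import threading

status = {}

def child():
    pass

def parent():
    # a thread created here inherits the daemon flag of the creating thread
    t = threading.Thread(target=child)
    status['child_of_daemon'] = t.daemon
    t.start()
    t.join()

# created (but never started) from the current thread to inspect its default
normal_child = threading.Thread(target=child)
status['child_of_current'] = normal_child.daemon

daemon_parent = threading.Thread(target=parent, daemon=True)
daemon_parent.start()
daemon_parent.join()

print(status)
```

When this runs from the main thread, `child_of_current` is False, while the thread created inside the daemon parent reports True without anyone setting its flag explicitly.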


### Data Race
* two or more concurrent threads access the same memory location
* at least one thread is modifying it
* what happens when a thread updates a value
  1. read the value from the memory location
  2. calculate the modified value
  3. write the calculated value back to the memory location
  + if another thread changes the value stored in memory between steps 1 and 3, a data race occurs
  + a potential case: two threads read the memory location at the same time; one thread writes its updated value, and then the other overwrites it with a value computed from the outdated read, so the first thread's update is lost
  + since the timing of thread scheduling is not predictable, the resulting value in memory is not predictable and is sometimes incorrect, especially when a lot of updates happen
* the best way to prevent this is to pay attention whenever two or more threads access the same resource
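The three steps above can be replayed by hand to see how an update gets lost. This sketch uses no real threads; it simply spells out one unlucky interleaving in a fixed order, with variable names invented for illustration.

```python
# Deterministic replay of one unlucky interleaving (no real threads involved).
memory = {'garlic_count': 0}

# step 1: both "threads" read the current value
barron_read = memory['garlic_count']
olivia_read = memory['garlic_count']

# step 2: each computes its new value from what it read
barron_new = barron_read + 1
olivia_new = olivia_read + 1

# step 3: both write back; Olivia's write overwrites Barron's
memory['garlic_count'] = barron_new
memory['garlic_count'] = olivia_new

print('expected 2, got', memory['garlic_count'])
```

Two increments were performed, but the stored value is 1 because the second write was based on a stale read.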

In [None]:
# demonstration of a data race
# two threads each write to the same variable (garlic_count) 10 million times
# this can produce unpredictable, inconsistent results in the variable's value due to the data race
# notice that even though there is only one increment statement, the program needs
# to read, calculate and then write the updated garlic_count value (the three actions are not atomic)
""" Two shoppers adding items to a shared notepad """

import threading

garlic_count = 0

def shopper():
    global garlic_count
    for i in range(10_000_000):
        garlic_count += 1

if __name__ == '__main__':
    barron = threading.Thread(target=shopper)
    olivia = threading.Thread(target=shopper)
    barron.start()
    olivia.start()
    barron.join()
    olivia.join()
    print('We should buy', garlic_count, 'garlic.')

### critical section and mutex(lock)
* critical section
  * a critical section is a code segment that accesses a shared resource, such as a data structure, memory, or an external device, and that may not operate correctly when multiple threads access it concurrently
  * a critical section needs to be protected so that only one thread or process executes it at a time
* mutex (lock)
  + a mutex (lock) is a mechanism to implement mutual exclusion
  + only one thread or process can possess it at a time
  + this limits access to the critical section
  + when a thread tries to acquire a lock and finds it already taken, it blocks/waits until the lock becomes available
  + the critical sections (protected sections of code) should be as short as possible
* atomic operation
  + acquiring the lock is an atomic operation, meaning
    + it executes as a single action, relative to other threads
    + it cannot be interrupted by other concurrent threads
* in Python, we use the Lock object included in the threading package, as shown in the demo code in the following cell
  + the critical section in line 15 is protected by a mutex: in line 14 the lock is acquired and, after the increment, in line 16 it is released.
  + note that we only need to protect the shortest part of the code (garlic_count += 1) to make it behave atomically

In [None]:
""" Two shoppers adding items to a shared notepad """

import threading
import time

garlic_count = 0
pencil = threading.Lock()

def shopper():
    global garlic_count
    for i in range(5):
        print(threading.current_thread().name, 'is thinking.')
        time.sleep(0.5)
        pencil.acquire()
        garlic_count += 1
        pencil.release()

if __name__ == '__main__':
    barron = threading.Thread(target=shopper)
    olivia = threading.Thread(target=shopper)
    barron.start()
    olivia.start()
    barron.join()
    olivia.join()
    print('We should buy', garlic_count, 'garlic.')
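As an alternative sketch of the same protection, a `threading.Lock` can be used as a context manager, so the lock is released even if the critical section raises an exception. The loop count and the `times` parameter are arbitrary choices for this example.

```python
import threading

garlic_count = 0
pencil = threading.Lock()

def shopper(times):
    global garlic_count
    for _ in range(times):
        with pencil:              # acquire() on entry, release() on exit
            garlic_count += 1     # critical section

threads = [threading.Thread(target=shopper, args=(10_000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('We should buy', garlic_count, 'garlic.')
```

Because every increment happens while the lock is held, the total is always the sum of both shoppers' increments.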


### Deadlock
* reentrant mutex
  * Deadlock: if a thread tries to acquire a lock it already holds, it blocks forever waiting on itself, and all other processes and threads waiting for the lock are unable to continue executing either. This is called a deadlock
  * when a program needs to lock a mutex multiple times before unlocking it, you should use a reentrant mutex
* a reentrant mutex can be locked multiple times by the same thread
* it records how many times it has been locked, and it must be unlocked as many times as it was locked before another thread can lock it
* one example: we have a function incrementCounter() that locks the mutex, and myFunction(), which also holds the lock, calls that function. With a reentrant mutex, the thread executing myFunction can acquire the lock a second time inside incrementCounter, and the lock is fully released once it has been unlocked the same number of times

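```python
import threading

lock = threading.RLock()  # reentrant: the owning thread may acquire it again
counter = 0

def incrementCounter():
    global counter
    lock.acquire()
    counter += 1
    lock.release()

def myFunction():
    lock.acquire()        # first acquisition
    incrementCounter()    # acquires the same lock a second time
    lock.release()
```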

* another use case for a reentrant mutex is recursive functions that take a lock. For that reason, the following terms all mean the same thing
  + reentrant mutex
  + reentrant lock
  + recursive mutex
  + recursive lock
* Python's implementation of the reentrant lock is RLock, as shown in the following cell
* difference between Lock and RLock in Python
  + a Lock can be released by a different thread than the one that acquired it
  + an RLock must be released by the same thread that acquired it
    + in addition, it must be released the same number of times it was acquired
In [None]:
""" Two shoppers adding garlic and potatoes to a shared notepad """

import threading

garlic_count = 0
potato_count = 0
pencil = threading.RLock()

def add_garlic():
    global garlic_count
    pencil.acquire()
    garlic_count += 1
    pencil.release()

# add_potato calls add_garlic, so the same thread acquires the lock a second time before releasing it
def add_potato():
    global potato_count
    pencil.acquire()
    potato_count += 1
    add_garlic()
    pencil.release()

def shopper():
    for i in range(10_000):
        add_garlic()
        add_potato()

if __name__ == '__main__':
    barron = threading.Thread(target=shopper)
    olivia = threading.Thread(target=shopper)
    barron.start()
    olivia.start()
    barron.join()
    olivia.join()
    print('We should buy', garlic_count, 'garlic.')
    print('We should buy', potato_count, 'potatoes.')
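The difference between Lock and RLock when the owning thread acquires again can be probed safely with non-blocking calls. This is a small sketch; the variable names are illustrative.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
# a second blocking acquire() here would deadlock, so probe without blocking
plain_second = plain.acquire(blocking=False)
plain.release()

reentrant.acquire()
reentrant_second = reentrant.acquire(blocking=False)   # same thread: succeeds
reentrant.release()
reentrant.release()     # released once per successful acquire

print('Lock  re-acquired by owner:', plain_second)
print('RLock re-acquired by owner:', reentrant_second)
```

The plain Lock refuses the second acquisition, which is the situation that would hang with a blocking call, while the RLock allows it and simply counts the nesting.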


### Try Lock
* when a thread has other tasks to do, it does not have to block and wait for the lock. The logic of try lock is:
  + it is a non-blocking lock/acquire method for a mutex
  + try lock does the following:
    + if the mutex is available, it acquires it and returns True
    + otherwise, it immediately returns False so the thread can process other tasks
* in Python, the non-blocking lock is implemented by passing blocking=False when acquiring the lock, which returns a bool
  + instead of waiting for the lock, the thread can keep incrementing items_to_add, which improves productivity

In [None]:
""" Two shoppers adding items to a shared notepad """

import threading
import time

items_on_notepad = 0
pencil = threading.Lock()

def shopper():
    global items_on_notepad
    name = threading.current_thread().name  # getName() is deprecated since Python 3.10
    items_to_add = 0
    while items_on_notepad <= 20:
        if items_to_add and pencil.acquire(blocking=False): # add item(s) to shared items_on_notepad
            items_on_notepad += items_to_add
            print(name, 'added', items_to_add, 'item(s) to notepad.')
            items_to_add = 0
            time.sleep(0.3) # time spent writing
            pencil.release()
        else: # look for other things to buy
            time.sleep(0.1) # time spent searching
            items_to_add += 1
            print(name, 'found something else to buy.')

if __name__ == '__main__':
    barron = threading.Thread(target=shopper, name='Barron')
    olivia = threading.Thread(target=shopper, name='Olivia')
    start_time = time.perf_counter()
    barron.start()
    olivia.start()
    barron.join()
    olivia.join()
    elapsed_time = time.perf_counter() - start_time
    print('Elapsed Time: {:.2f} seconds'.format(elapsed_time))
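
The return value of a non-blocking acquire can be observed in isolation. This is a minimal sketch; the function name try_to_write and the results list are hypothetical names for this illustration.

```python
import threading

pencil = threading.Lock()
results = []

def try_to_write():
    # acquire(blocking=False) returns immediately with True or False
    got_it = pencil.acquire(blocking=False)
    results.append(got_it)
    if got_it:
        pencil.release()

# First attempt: the main thread holds the pencil, so the try fails.
pencil.acquire()
attempt = threading.Thread(target=try_to_write)
attempt.start()
attempt.join()
pencil.release()

# Second attempt: the pencil is free, so the try succeeds.
attempt = threading.Thread(target=try_to_write)
attempt.start()
attempt.join()

print(results)
```

A thread must release the lock only when acquire returned True, which is why the release sits inside the if branch.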


### reader-writer lock
* an ordinary lock blocks every other thread from the critical section, which is inefficient because threads that only read the shared data are safe to run together and should not block each other
* a reader-writer lock or shared mutex can be locked in two ways
  + in shared read mode, which allows multiple threads that only need to read to hold it simultaneously
  + or in exclusive write mode, which allows only one thread at a time to write to the resource
  + when switching between these two modes, a thread needs to wait
* it is better to use a reader-writer lock when there are a lot more read threads than write threads
* in the following cell of code
  + RWLockFair gives equal priority to readers and writers
  + gen_rlock() generates a reader lock object
  + gen_wlock() generates a writer lock object

In [None]:
""" Several users reading a calendar, but only a few users updating it """

import threading
from readerwriterlock import rwlock

WEEKDAYS = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
today = 0

marker = rwlock.RWLockFair()

def calendar_reader(id_number):
    global today
    read_marker = marker.gen_rlock()
    name = 'Reader-' + str(id_number)
    while today < len(WEEKDAYS)-1:
        read_marker.acquire()
        print(name, 'sees that today is', WEEKDAYS[today], '-read count:', read_marker.c_rw_lock.v_read_count)
        read_marker.release()

def calendar_writer(id_number):
    global today
    write_marker = marker.gen_wlock()
    name = 'Writer-' + str(id_number)
    while today < len(WEEKDAYS)-1:
        write_marker.acquire()
        today = (today + 1) % 7
        print(name, 'updated date to ', WEEKDAYS[today])
        write_marker.release()

if __name__ == '__main__':
    # create ten reader threads
    for i in range(10):
        threading.Thread(target=calendar_reader, args=(i,)).start()
    # ...but only two writer threads
    for i in range(2):
        threading.Thread(target=calendar_writer, args=(i,)).start()
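
To see what the two modes do internally, here is a minimal sketch of a reader-writer lock built only from the standard library. SimpleRWLock is a hypothetical class for illustration, not the implementation used by the readerwriterlock package, and it favors readers, so writers could be delayed under constant read load.

```python
import threading

class SimpleRWLock:
    """Hypothetical minimal reader-writer lock (readers are favored)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0       # number of threads holding the lock in read mode
        self._writing = False   # True while one thread holds it in write mode

    def acquire_read(self):
        with self._cond:
            while self._writing:            # readers only wait for a writer
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:          # last reader lets writers in
                self._cond.notify_all()

    def acquire_write(self):
        with self._cond:
            while self._writing or self._readers > 0:
                self._cond.wait()
            self._writing = True

    def release_write(self):
        with self._cond:
            self._writing = False
            self._cond.notify_all()

calendar_lock = SimpleRWLock()
today = 0

def writer():
    global today
    for _ in range(200):
        calendar_lock.acquire_write()
        try:
            today += 1
        finally:
            calendar_lock.release_write()

def reader(seen):
    for _ in range(200):
        calendar_lock.acquire_read()
        try:
            seen.append(today)
        finally:
            calendar_lock.release_read()

seen_values = []
threads = [threading.Thread(target=writer) for _ in range(2)]
threads += [threading.Thread(target=reader, args=(seen_values,)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('final value:', today, '- reads performed:', len(seen_values))
```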


### Deadlock
* in a system with multiple threads and multiple locks, deadlock occurs when each thread is waiting for another thread to take action. For example, suppose each thread needs to acquire two locks to access a resource. If each of two threads acquires one lock and then waits for the other lock, neither thread can make progress and they are in deadlock.
* this is a common challenge when using mutexes to protect critical sections of code.
* we want the program to be free of deadlock to guarantee liveness
* liveness:
  + properties that require a system to make progress
* deadlock depends on timing, so it happens on some runs and not on others, which makes it difficult to identify
* one solution is to keep a priority list of the locks and have all threads acquire locks in that order.
   + ensure locks are always taken in the same order by any thread
* another solution is to use lock timeout
  + put a timeout on lock attempts
  + if a thread cannot acquire all locks within the time limit:
    + back up and free all locks taken
    + wait for a random amount of time
    + try again
* the code in the following cell implements three threads acquiring locks in a consistent order with priorities a > b > c

In [None]:
""" Three philosophers, thinking and eating sushi """

import threading

chopstick_a = threading.Lock()
chopstick_b = threading.Lock()
chopstick_c = threading.Lock()
sushi_count = 500

def philosopher(name, first_chopstick, second_chopstick):
    global sushi_count
    while sushi_count > 0: # eat sushi until it's all gone
        first_chopstick.acquire()
        second_chopstick.acquire()

        if sushi_count > 0:
            sushi_count -= 1
            print(name, 'took a piece! Sushi remaining:', sushi_count)

        second_chopstick.release()
        first_chopstick.release()
        
if __name__ == '__main__':
    threading.Thread(target=philosopher, args=('Barron', chopstick_a, chopstick_b)).start()
    threading.Thread(target=philosopher, args=('Olivia', chopstick_b, chopstick_c)).start()
    threading.Thread(target=philosopher, args=('Steve', chopstick_a, chopstick_c)).start()
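
The cell above uses lock ordering. The second solution from the notes, lock timeout with random back-off, could be sketched as follows. The helper acquire_all, the timeout values, and the counter bites are assumptions made for this illustration.

```python
import random
import threading
import time

def acquire_all(locks, timeout=0.01):
    """Take every lock or none: on a timeout, free what was taken and report failure."""
    taken = []
    for lock in locks:
        if lock.acquire(timeout=timeout):
            taken.append(lock)
        else:
            for held in reversed(taken):
                held.release()          # back up and free all locks taken
            return False
    return True

chopstick_a = threading.Lock()
chopstick_b = threading.Lock()
bites = 0

def philosopher(first, second, rounds):
    global bites
    for _ in range(rounds):
        while not acquire_all([first, second]):
            time.sleep(random.uniform(0, 0.01))   # wait a random time, then try again
        try:
            bites += 1                  # both locks are held here
        finally:
            second.release()
            first.release()

# The two threads deliberately request the locks in opposite order.
threads = [
    threading.Thread(target=philosopher, args=(chopstick_a, chopstick_b, 20)),
    threading.Thread(target=philosopher, args=(chopstick_b, chopstick_a, 20)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('bites taken:', bites)
```

With blocking acquires, the opposite lock order could deadlock. With the timeout, a thread that cannot get its second lock gives up the first one, so the other thread can finish.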


### Abandoned lock
* when a program/thread acquires a lock and then terminates for an unexpected reason, it may not automatically release the lock
* other threads waiting for the lock will never acquire it
* the solution is to put the critical section code in a try block and the lock release code in a finally block
* the example code is shown in the following cell

In [None]:
""" Three philosophers, thinking and eating sushi """

import threading

chopstick_a = threading.Lock()
chopstick_b = threading.Lock()
chopstick_c = threading.Lock()
sushi_count = 500
some_lock = threading.Lock()

some_lock.acquire()
# try:
#     # do something...
# finally:
#     some_lock.release()
# 
# with some_lock:
#     #do something...

def philosopher(name, first_chopstick, second_chopstick):
    global sushi_count
    while sushi_count > 0: # eat sushi until it's all gone
        first_chopstick.acquire()
        second_chopstick.acquire()
        try:
            if sushi_count > 0:
                sushi_count -= 1
                print(name, 'took a piece! Sushi remaining:', sushi_count)
            if sushi_count == 10:
                print(1/0)
        finally:
            second_chopstick.release()
            first_chopstick.release()

if __name__ == '__main__':
    threading.Thread(target=philosopher, args=('Barron', chopstick_a, chopstick_b)).start()
    threading.Thread(target=philosopher, args=('Olivia', chopstick_b, chopstick_c)).start()
    threading.Thread(target=philosopher, args=('Steve', chopstick_a, chopstick_c)).start()
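
The effect of an abandoned lock can be shown directly. In this sketch, all function and variable names are hypothetical; one worker ends while still holding its lock, and the other fails inside a try/finally.

```python
import threading

abandoned = threading.Lock()
protected = threading.Lock()

def careless_worker():
    abandoned.acquire()
    # the thread ends here without releasing, so the lock is abandoned

def careful_worker():
    protected.acquire()
    try:
        1 / 0                           # simulated failure in the critical section
    finally:
        protected.release()             # runs even though an exception was raised

def run_quietly(task):
    """Swallow the simulated error so the demo output stays readable."""
    try:
        task()
    except ZeroDivisionError:
        pass

workers = [
    threading.Thread(target=careless_worker),
    threading.Thread(target=run_quietly, args=(careful_worker,)),
]
for w in workers:
    w.start()
for w in workers:
    w.join()

# The abandoned lock can never be acquired again; the protected one can.
abandoned_free = abandoned.acquire(timeout=0.1)
protected_free = protected.acquire(blocking=False)
print('abandoned lock available:', abandoned_free)
print('protected lock available:', protected_free)
```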


### Abandoned lock using context manager
* we can use a Python context manager to manage the lock release, as shown in the following cell
* as shown in lines 13 and 14 of the cell, each with statement acquires its lock on entry and releases it on exit, even if an exception is raised inside the block.

In [None]:
""" Three philosophers, thinking and eating sushi """

import threading

chopstick_a = threading.Lock()
chopstick_b = threading.Lock()
chopstick_c = threading.Lock()
sushi_count = 500

def philosopher(name, first_chopstick, second_chopstick):
    global sushi_count
    while sushi_count > 0: # eat sushi until it's all gone
        with first_chopstick:
            with second_chopstick:
                if sushi_count > 0:
                    sushi_count -= 1
                    print(name, 'took a piece! Sushi remaining:', sushi_count)

                if sushi_count == 10:
                    print(1/0)

if __name__ == '__main__':
    threading.Thread(target=philosopher, args=('Barron', chopstick_a, chopstick_b)).start()
    threading.Thread(target=philosopher, args=('Olivia', chopstick_b, chopstick_c)).start()
    threading.Thread(target=philosopher, args=('Steve', chopstick_a, chopstick_c)).start()
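
The two nested with statements can also be written as a single with statement that lists both locks. The sketch below uses a hypothetical function take_a_piece to check that both locks are free again after an exception.

```python
import threading

first_chopstick = threading.Lock()
second_chopstick = threading.Lock()

def take_a_piece(sushi_count):
    # Locks are acquired left to right and released in reverse order on exit.
    with first_chopstick, second_chopstick:
        if sushi_count == 10:
            raise ZeroDivisionError('simulated failure while holding both locks')
        return sushi_count - 1

remaining = take_a_piece(500)

try:
    take_a_piece(10)
except ZeroDivisionError:
    pass

both_released = not first_chopstick.locked() and not second_chopstick.locked()
print('remaining:', remaining, '- both locks released:', both_released)
```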


### Starvation
* starvation occurs when a thread or process is perpetually denied the resources it needs, so it cannot make progress
* starvation happens when
  + threads with different priorities compete for resources, and low-priority threads may never get access
  + there are too many threads competing (as shown in the following cell)

In [None]:
""" Three philosophers, thinking and eating sushi """

import threading

chopstick_a = threading.Lock()
chopstick_b = threading.Lock()
chopstick_c = threading.Lock()
sushi_count = 5000

def philosopher(name, first_chopstick, second_chopstick):
    global sushi_count
    sushi_eaten = 0
    while sushi_count > 0: # eat sushi until it's all gone
        with first_chopstick:
            with second_chopstick:
                if sushi_count > 0:
                    sushi_count -= 1
                    sushi_eaten += 1
                    print(name, 'took a piece! Sushi remaining:', sushi_count)
    print(name, 'took', sushi_eaten, 'pieces')

if __name__ == '__main__':
    for thread in range(50):
        threading.Thread(target=philosopher, args=('Barron', chopstick_a, chopstick_b)).start()
        threading.Thread(target=philosopher, args=('Olivia', chopstick_a, chopstick_b)).start()
        threading.Thread(target=philosopher, args=('Steve', chopstick_a, chopstick_b)).start()


### Livelock
* multiple threads or processes are actively responding to each other to resolve a conflict, but that response prevents them from making progress
* no thread makes progress because each one keeps giving up its lock for the others.
* to resolve that, make sure only one thread takes action, chosen by priority or by another mechanism such as random selection
* different from deadlock, threads in livelock are actively executing, but without useful progress.
* in the following cell, the livelock is resolved by having a thread sleep for a random fraction of a second after it releases its first lock because the second lock is not available. This gives other threads time to acquire both locks, instead of the thread immediately reacquiring the first lock it just released




In [None]:
""" Three philosophers, thinking and eating sushi """

import threading
import time
from random import random

chopstick_a = threading.Lock()
chopstick_b = threading.Lock()
chopstick_c = threading.Lock()
sushi_count = 500

def philosopher(name, first_chopstick, second_chopstick):
    global sushi_count
    while sushi_count > 0: # eat sushi until it's all gone
        first_chopstick.acquire()
        if not second_chopstick.acquire(blocking=False):
            print(name, 'released their first chopstick.')
            first_chopstick.release()
            time.sleep(random()/10)
        else:
            try:
                if sushi_count > 0:
                    sushi_count -= 1
                    print(name, 'took a piece! Sushi remaining:', sushi_count)
            finally:
                second_chopstick.release()
                first_chopstick.release()

if __name__ == '__main__':
    threading.Thread(target=philosopher, args=('Barron', chopstick_a, chopstick_b)).start()
    threading.Thread(target=philosopher, args=('Olivia', chopstick_b, chopstick_c)).start()
    threading.Thread(target=philosopher, args=('Steve', chopstick_c, chopstick_a)).start()


### Synchronization
* The problem of mutex is that it doesn't signal other threads to synchronize actions of threads
* we can use another mechanism call condition variable
  + condition variable servers as a queue or container of threads waiting for a certain condition and be notified
  + condition variable is associated with a mutex
  + condition variable and mutex work together to implement a high level construct called a monitor
* monitor protects critical sections of code with mutual exclusion
  + it provides ability for threads to wait or block until a condition occurs, along with the mechanism to signal those threads when condition is met
* condition variables have three operations:
  + wait
    + before using a condition variable, a thread first acquired the mutex associated with the condition variable
    + it then checks if it is its turn to use the condition varialbe
    + if it is not, it will use the condition variable's wait operation
      + this automatically release lock on the mutex
      + then go sleep and enter waiting queue
      + wait for another thread to signal it to wake up and then reacquire lock
  + signal
    + after acquiring the mutex, if the thread finds it is its turn, it will access the critical section for its task
    + after it completes, it will use the signal operation to wake up a single thread from condition variable queue (waiting queue)
    + depending on the programming language, this wake up operation is also called notify or wake
    + the thread then release its lock on the mutex
  + broadcast
    + broadcast is similar to signal, the difference is that this operation wakes up all threads from condition variable queue
    + it is also called notify all or wake all
* you can use multiple condition variables to manage multiple threads, especially when implementing a shared queue or buffer
  + if multiple threads can put items in a queue or take them out, it needs the following
    + a mutex to ensure only one thread can add or remove items from the queue at a time
    + we can use two condition variables
      + BufferNotFull
        + this condition variable is used when a thread tries to add an item to the queue but the queue is already full
        + the thread then waits on this condition variable so that it can be notified when the queue is no longer full
      + BufferNotEmpty
        + this condition variable is used when a thread tries to remove an item from the queue but the queue is empty
        + the thread then waits on this condition variable so that it can be notified when the queue is no longer empty
      + these two condition variables enable the threads to signal each other when the state of the queue changes
* the following cell contains the code for a condition variable
  + a mutex named slowcooker_lid is created as the lock of the condition variable (soup_taken)
  + soup_taken is the condition variable associated with the slowcooker_lid mutex
    + if no lock is specified, a new RLock obj is created and associated with the condition variable
  + in the function hungry_person
    + the thread executing the function first acquires the mutex through the context manager
    + the thread then checks whether it is its turn to access the soup_servings variable (critical section) using a while loop
    + if the while loop condition finds it is not the thread's turn, the thread calls the condition variable's wait() method
      + the current thread enters the wait/block state
      + once it is notified through the condition variable, it reacquires the mutex inside wait() and checks the while loop condition again
    + otherwise, if soup_servings > 0, the thread accesses the critical section and decreases the value
    + the thread then notifies all threads waiting on the condition variable by calling its notify_all() method
    + the thread releases the mutex
  + the lock acquire and release steps are managed by the context manager
  + the common pattern of this usage is the following:
  ```text
  lock.acquire()
  while not (some condition):
    condition_variable.wait()

  # the condition is now True
  do something by accessing the critical section

  lock.release()
  ```
  + by using a context manager, in Python we can write
  ```text
  with lock:
    while not (some condition):
      condition_variable.wait()

    do something
  ```
    

In [None]:
""" Two hungry people, anxiously waiting for their turn to take soup """

import threading

slowcooker_lid = threading.Lock()
soup_servings = 11
soup_taken = threading.Condition(lock=slowcooker_lid)

def hungry_person(person_id):
    global soup_servings
    while soup_servings > 0:
        with slowcooker_lid:
            while (person_id != (soup_servings % 5)) and (soup_servings > 0): # check if it's your turn
                print('Person', person_id, 'checked... then put the lid back.')
                soup_taken.wait()
            if (soup_servings > 0):
                soup_servings -= 1 # it's your turn; take some soup!
                print('Person', person_id, 'took soup! Servings left:', soup_servings)
                soup_taken.notify_all()

if __name__ == '__main__':
    for person in range(5):
        threading.Thread(target=hungry_person, args=(person,)).start()
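
The shared queue with the BufferNotFull and BufferNotEmpty condition variables described in the notes could be sketched like this. BoundedBuffer and its attribute names are hypothetical; both condition variables share one mutex, as the notes require.

```python
import threading
from collections import deque

class BoundedBuffer:
    """Hypothetical fixed-capacity queue guarded by one mutex and two conditions."""

    def __init__(self, capacity):
        self._items = deque()
        self._capacity = capacity
        self._lock = threading.Lock()
        self._not_full = threading.Condition(self._lock)    # BufferNotFull
        self._not_empty = threading.Condition(self._lock)   # BufferNotEmpty

    def put(self, item):
        with self._not_full:                    # acquires the shared mutex
            while len(self._items) >= self._capacity:
                self._not_full.wait()           # sleep until a consumer makes room
            self._items.append(item)
            self._not_empty.notify()            # wake one waiting consumer

    def get(self):
        with self._not_empty:                   # acquires the same shared mutex
            while not self._items:
                self._not_empty.wait()          # sleep until a producer adds an item
            item = self._items.popleft()
            self._not_full.notify()             # wake one waiting producer
            return item

buffer = BoundedBuffer(capacity=3)
consumed = []

def producer():
    for i in range(20):
        buffer.put(i)

def consumer():
    for _ in range(20):
        consumed.append(buffer.get())

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumed)
```

Using two condition variables means a producer only wakes consumers and a consumer only wakes producers, so no thread is woken for a state change it cannot use.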

### Producer-Consumer Pattern
+ Producer(s)
  + add elements to shared data structure
+ Consumer(s)
  + remove elements from shared data structure
+ the shared data structure is usually a queue
+ synchronization challenges
  + enforce mutual exclusion of producers and consumers to make sure only one thread can use the queue at a time
  + prevent producers from trying to add data to a full queue
  + prevent consumers from trying to remove data from an empty queue
  + some programming languages offer unbounded queues to provide unlimited capacity using linked list, but the capacity is still limited by memory
  + the rate of producers to add data to queue may not be consistent, such as burst of network packets
  + we want the average rate of production to be less than the average rate of consumption
* pipeline
  + a chain of processing elements basically consisting of a set of producers and consumers with queues in between
* the Python queue module implements the synchronization mechanisms needed for producer-consumer
* the code is shown in the next cell
  + use time.sleep() to simulate I/O-bound tasks in the producer and consumer
  + use the queue.Queue class for multi-threaded producer-consumer
  + add items to the queue with the put_nowait() method, which raises queue.Full instead of blocking when the queue is full
  + remove items from the queue with the get() method, which blocks while the queue is empty

In [None]:
""" Producers serving soup for Consumers to eat """

import queue
import threading
import time

serving_line = queue.Queue(maxsize=5)

def soup_producer():
    for i in range(20): # serve 20 bowls of soup
        serving_line.put_nowait('Bowl #'+str(i))
        print('Served Bowl #', str(i), '- remaining capacity:', \
            serving_line.maxsize-serving_line.qsize())
        time.sleep(0.2) # time to serve a bowl of soup
    serving_line.put_nowait('no soup for you!')
    serving_line.put_nowait('no soup for you!')

def soup_consumer():
    while True:
        bowl = serving_line.get()
        if bowl == 'no soup for you!':
            break
        print('Ate', bowl)
        time.sleep(0.3) # time to eat a bowl of soup

if __name__ == '__main__':
    for consumer in range(2):
        threading.Thread(target=soup_consumer).start()
    threading.Thread(target=soup_producer).start()
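
The pipeline idea from the notes, a chain of stages with queues in between, could be sketched as follows. The stage functions and the STOP sentinel are hypothetical names. This sketch uses the blocking put() rather than put_nowait(), so a stage waits when the next queue is full instead of raising queue.Full.

```python
import queue
import threading

STOP = object()     # sentinel that tells the next stage to shut down

def produce(out_q, count):
    for i in range(count):
        out_q.put(i)                # blocks while the queue is full
    out_q.put(STOP)

def double(in_q, out_q):
    # Middle stage: a consumer of in_q and a producer for out_q.
    while True:
        item = in_q.get()           # blocks while the queue is empty
        if item is STOP:
            out_q.put(STOP)         # pass the shutdown signal downstream
            break
        out_q.put(item * 2)

def collect(in_q, results):
    while True:
        item = in_q.get()
        if item is STOP:
            break
        results.append(item)

raw = queue.Queue(maxsize=2)
doubled = queue.Queue(maxsize=2)
results = []

stages = [
    threading.Thread(target=produce, args=(raw, 10)),
    threading.Thread(target=double, args=(raw, doubled)),
    threading.Thread(target=collect, args=(doubled, results)),
]
for s in stages:
    s.start()
for s in stages:
    s.join()
print(results)
```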


### using the multiprocessing package for computation-intensive tasks in producer-consumer
* see the code in the next cell
* this code uses the multiprocessing package so that the computation-intensive work in the consumers runs in parallel
  + use the Queue class from the multiprocessing package
  + for the producer, use the same put_nowait() method
  + for the consumer, use the same get() method
  + the serving_line Queue object must be passed to the consumer and producer processes as an argument

In [None]:
""" Producers serving soup for Consumers to eat """

import queue
import multiprocessing as mp
import time

serving_line = mp.Queue(5)

def cpu_work(work_units):
    x = 0
    for work in range(work_units * 1_000_000):
        x += 1

def soup_producer(serving_line):
    for i in range(20): # serve 20 bowls of soup
        serving_line.put_nowait('Bowl #'+str(i))
        print('Served Bowl #', str(i), '- remaining capacity:', \
            serving_line._maxsize-serving_line.qsize())
        time.sleep(0.2) # time to serve a bowl of soup
    serving_line.put_nowait('no soup for you!')
    serving_line.put_nowait('no soup for you!')

def soup_consumer(serving_line):
    while True:
        bowl = serving_line.get()
        if bowl == 'no soup for you!':
            break
        print('Ate', bowl)
        cpu_work(4) # time to eat a bowl of soup
        
if __name__ == '__main__':
    for consumer in range(2):
        mp.Process(target=soup_consumer, args=(serving_line,)).start()
    mp.Process(target=soup_producer, args=(serving_line,)).start()


### Semaphore
* A semaphore is a synchronization mechanism similar to a mutex
* different from a mutex, it can allow multiple threads to access the resource at the same time
* it includes a counter that tracks how many more times it can be acquired
  + as long as the counter is positive, any thread can acquire the semaphore, which decrements the counter
  + if the counter is zero, threads trying to acquire the semaphore are blocked and placed in a queue to wait until it is available
  + when a thread is done using the resource, it releases the semaphore, which increments the counter. Any threads waiting for the semaphore are signaled to wake up
* types of semaphores
  + counting semaphore, where the count represents how many resources are available. It can be used to track limited resources (value >= 0)
    + pool of connections
    + how many items are in a queue
  + binary semaphore (value = 1 or 0)
    + 0 represents locked, and 1 represents unlocked
    + can be used like a mutex when threads acquire and release the semaphore
    + different from a mutex, which in general can only be released by the thread that acquired it, a semaphore can be acquired and released by different threads
    + the ability to let different threads acquire and release it makes a semaphore useful for synchronizing actions between threads
  + a pair of semaphores can be used to synchronize producer and consumer threads that add/remove items from a shared queue
    + one semaphore tracks the number of items in the buffer as fillCount
    + the other semaphore tracks the free spaces as emptyCount
    + to add an item to the buffer, the producer thread
      + first acquires emptyCount, which succeeds if its value is > 0 and decrements it
      + then pushes the item to the buffer
      + finally releases fillCount to increment its value and signal other threads waiting for the semaphore
    + to take an item, a consumer thread
      + first acquires fillCount, which succeeds if its value is > 0 and decrements it
      + then removes the item from the buffer
      + finally releases emptyCount, which increments its value and signals other threads waiting for the semaphore
    + producer and consumer threads each acquire a different semaphore in their first operation
      + when a consumer tries to remove an item from an empty buffer, it first tries to acquire fillCount, which is zero, so it is blocked until a producer adds an item and releases fillCount
      + when a producer tries to add an item to a full buffer, it first tries to acquire emptyCount, which is zero, so it is blocked until a consumer removes an item and releases emptyCount

In [None]:
""" Connecting cell phones to a charger """
# this code example defines a semaphore obj, charger as a counting semaphore
# when semaphore is available and thread acquires the semaphore, it will print
# information and sleep, then release the semaphore,  otherwise, it will be
# blocked and wait for other threads to release the semaphore

import random
import threading
import time

charger = threading.Semaphore(4)

def cellphone():
    name = threading.current_thread().name  # getName() is deprecated since Python 3.10
    charger.acquire()
    print(name, 'is charging...')
    time.sleep(random.uniform(1,2))
    print(name, 'is DONE charging!')
    charger.release()

if __name__ == '__main__':
    for phone in range(10):
        threading.Thread(target=cellphone, name='Phone-'+str(phone)).start()


In [None]:
""" Connecting cell phones to a charger """
# this is a version of the code in the previous cell that
# uses the semaphore as a context manager. Here we set Semaphore(1)
# to get a binary semaphore. It now behaves like a Lock (it is not reentrant like an RLock)

import random
import threading
import time

charger = threading.Semaphore(1)

def cellphone():
    name = threading.current_thread().name  # getName() is deprecated since Python 3.10
    with charger:
        print(name, 'is charging...')
        time.sleep(random.uniform(1,2))
        print(name, 'is DONE charging!')

if __name__ == '__main__':
    for phone in range(10):
        threading.Thread(target=cellphone, name='Phone-'+str(phone)).start()
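
The pair of semaphores described in the notes (fillCount and emptyCount) could be sketched as below. The names fill_count, empty_count, and buffer_lock are assumptions for this illustration. The extra mutex around the buffer is also an assumption added here; the notes only describe the two semaphores.

```python
import threading
from collections import deque

CAPACITY = 3
buffer = deque()
buffer_lock = threading.Lock()                  # assumed extra mutex for the buffer
empty_count = threading.Semaphore(CAPACITY)     # free slots, starts at capacity
fill_count = threading.Semaphore(0)             # stored items, starts at zero
consumed = []

def producer(count):
    for i in range(count):
        empty_count.acquire()       # blocks while the buffer is full
        with buffer_lock:
            buffer.append(i)
        fill_count.release()        # one more item is available

def consumer(count):
    for _ in range(count):
        fill_count.acquire()        # blocks while the buffer is empty
        with buffer_lock:
            item = buffer.popleft()
        empty_count.release()       # one more slot is free
        consumed.append(item)

threads = [
    threading.Thread(target=producer, args=(10,)),
    threading.Thread(target=consumer, args=(10,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumed)
```

Each semaphore is acquired by one kind of thread and released by the other, which is the cross-thread acquire/release that a mutex does not allow in general.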


### Barriers
* A data race occurs when multiple threads concurrently access the same memory location and at least one thread writes to it
  + the solution to a data race is to ensure mutual exclusion on the resource (memory location)
* A race condition is a flaw in the timing or ordering of a program's execution that causes incorrect behavior
  + even when a mutex allows only one thread at a time to access the resource, the result can still depend on the order in which threads access it, and that order is unpredictable
  + one example is two threads, one that multiplies the current value by 2 and another that adds 3
    + the order in which these two threads access the shared variable leads to different results
    + race conditions are hard to identify
    + inserting sleep statements to change the timing and execution order may help expose the problem
* A barrier provides a solution to race conditions
  + a barrier prevents a group of threads from proceeding until enough (or all) threads have reached the barrier
  + a barrier separates threads' actions into those before and those after the barrier
    + threads with "after barrier" actions wait until all "before barrier" actions complete, and then execute
    + in general, it controls the relative order in which threads execute certain operations

In [None]:
""" Deciding how many bags of chips to buy for the party """
# this code demonstrates a race condition where 10 threads access a shared variable (bags_of_chips)
# the shared variable is protected by a mutex. 5 of the threads increase bags_of_chips by 3, and
# the other 5 multiply bags_of_chips by 2. Since the order in which the threads access bags_of_chips
# is not predictable, we may get a different result each time all threads complete

import threading

bags_of_chips = 1 # start with one on the list
pencil = threading.Lock()

def cpu_work(work_units):
    x = 0
    for work in range(work_units*1_000_000):
        x += 1

def barron_shopper():
    global bags_of_chips
    cpu_work(1) # do a bit of work first
    with pencil:
        bags_of_chips *= 2
        print('Barron DOUBLED the bags of chips.')

def olivia_shopper():
    global bags_of_chips
    cpu_work(1) # do a bit of work first
    with pencil:
        bags_of_chips += 3
        print('Olivia ADDED 3 bags of chips.')

if __name__ == '__main__':
    shoppers = []
    for s in range(5):
        shoppers.append(threading.Thread(target=barron_shopper))
        shoppers.append(threading.Thread(target=olivia_shopper))
    for s in shoppers:
        s.start()
    for s in shoppers:
        s.join()
    print('We need to buy', bags_of_chips, 'bags of chips.')
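
How much the result depends on the order can be worked out without threads. This sketch enumerates every possible interleaving of the five doubling steps and the five add-3 steps from the cell above; the function name final_bags is hypothetical.

```python
from itertools import combinations

def final_bags(double_steps, total_steps=10, start=1):
    """Apply 'double' at the given step indices and 'add 3' at all other steps."""
    bags = start
    for step in range(total_steps):
        if step in double_steps:
            bags *= 2
        else:
            bags += 3
    return bags

# Two operations already disagree: (1 * 2) + 3 versus (1 + 3) * 2.
double_then_add = (1 * 2) + 3
add_then_double = (1 + 3) * 2

# Every way to place the 5 doubling steps among the 10 steps.
outcomes = {final_bags(set(steps)) for steps in combinations(range(10), 5)}
fewest, most = min(outcomes), max(outcomes)

# All additions first, then all doublings.
adds_then_doubles = final_bags({5, 6, 7, 8, 9})

print(double_then_add, add_then_double)
print('smallest:', fewest, 'largest:', most, 'adds first:', adds_then_doubles)
```

The smallest result comes from doubling first (32 + 15 = 47) and the largest from adding first ((1 + 15) * 32 = 512), so a mutex alone leaves a wide range of possible answers.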


In [None]:
""" Deciding how many bags of chips to buy for the party """
# this code example demonstrates how to resolve the race condition shown in the previous cell
# by using a barrier. we define the before-barrier and after-barrier actions by putting
# the code before and after the barrier.wait() statement
# all 10 threads must arrive at the barrier before any of them can proceed, so we create
# threading.Barrier(10)

import threading

bags_of_chips = 1 # start with one on the list
pencil = threading.Lock()
fist_bump = threading.Barrier(10)

def cpu_work(work_units):
    x = 0
    for work in range(work_units*1_000_000):
        x += 1

def barron_shopper():
    global bags_of_chips
    cpu_work(1) # do a bit of work first
    fist_bump.wait() # define the barrier check point to wait
    with pencil:
        bags_of_chips *= 2
        print('Barron DOUBLED the bags of chips.')

def olivia_shopper():
    global bags_of_chips
    cpu_work(1) # do a bit of work first
    with pencil:
        bags_of_chips += 3
        print('Olivia ADDED 3 bags of chips.')
    fist_bump.wait() # define barrier checkpoint to wait

if __name__ == '__main__':
    shoppers = []
    for s in range(5):
        shoppers.append(threading.Thread(target=barron_shopper))
        shoppers.append(threading.Thread(target=olivia_shopper))
    for s in shoppers:
        s.start()
    for s in shoppers:
        s.join()
    print('We need to buy', bags_of_chips, 'bags of chips.')
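
The ordering guarantee of a barrier can be checked on a smaller scale. In this sketch (the worker function and the events list are hypothetical), every thread records a before entry, waits at the barrier, and then records an after entry.

```python
import threading

events = []
events_lock = threading.Lock()
checkpoint = threading.Barrier(3)       # all 3 threads must arrive before any continues

def worker(worker_id):
    with events_lock:
        events.append(('before', worker_id))
    checkpoint.wait()                   # nobody passes until everyone has arrived
    with events_lock:
        events.append(('after', worker_id))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

phases = [phase for phase, _ in events]
print(phases)
```

The order of the worker ids within each phase may vary between runs, but every before entry is recorded ahead of every after entry.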


### Thread pool
* thread pool
  + creates and maintains a collection of worker threads
  + reuses existing threads to execute tasks
  + assigns tasks to threads available in the pool
  + reduces the overhead of creating new threads, which matters most when a task takes less time to complete than a thread takes to create
* using the Python ThreadPoolExecutor class defined in the concurrent.futures module
  + high-level interface for asynchronously running tasks
  + useful for I/O-bound tasks
  + use ProcessPoolExecutor for CPU-bound tasks
* the following cells show the code using ThreadPoolExecutor and ProcessPoolExecutor

In [None]:
""" Chopping vegetables with a ThreadPool """

# a thread pool is used to process I/O-bound tasks
# this code section uses ThreadPoolExecutor with 5 threads to process 100 tasks
# defined by the vegetable_chopper function. In version 1, we explicitly initialize
# the thread pool and shut it down.
# In version 2, we use a context manager to manage the instantiation and shutdown
# of the thread pool

import threading
from concurrent.futures import ThreadPoolExecutor

def vegetable_chopper(vegetable_id):
    name = threading.current_thread().name  # getName() is deprecated since Python 3.10
    print(name, 'chopped vegetable', vegetable_id)

if __name__ == '__main__':
    # version 1
    pool = ThreadPoolExecutor(max_workers=5)
    for vegetable in range(100):
        pool.submit(vegetable_chopper, vegetable)
    pool.shutdown()
    
    # version 2
#     with ThreadPoolExecutor(max_workers=5) as pool:
#         for vegetable in range(100):
#             pool.submit(vegetable_chopper, vegetable)

In [None]:
""" Chopping vegetables with a ProcessPool """

# a process pool is used to process CPU-bound tasks
# this code section uses ProcessPoolExecutor with 5 processes to process 100 tasks
# defined by the vegetable_chopper function, using a context manager to manage the
# instantiation and shutdown of the process pool
# in this demo, every task is processed by the main thread of a worker process, so
# instead of printing the thread name, we print the process id (pid)


import threading
from concurrent.futures import ProcessPoolExecutor
import os

def vegetable_chopper(vegetable_id):
    # print out process id
    name = os.getpid()
    print(name, 'chopped a vegetable', vegetable_id)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=5) as pool:
        for vegetable in range(100):
            pool.submit(vegetable_chopper, vegetable)
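
The cells above only print from inside the tasks. When the tasks return values, Executor.map is one way to collect them. This is a small sketch with a hypothetical chop function.

```python
from concurrent.futures import ThreadPoolExecutor

def chop(vegetable_id):
    return 'vegetable {} chopped'.format(vegetable_id)

with ThreadPoolExecutor(max_workers=5) as pool:
    # map() yields results in the order of the inputs,
    # even if the tasks finish in a different order
    results = list(pool.map(chop, range(10)))

print(results[:3])
```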

### Future
* a future is a placeholder for a result that will be available later
* it provides a mechanism to access the result of an asynchronous operation
* Executor.submit() returns an instance of the Future class that encapsulates the asynchronous execution of a callable task
* the Future class has several methods to check the status of the task execution and to cancel it if needed
  + cancel()
  + cancelled()
  + running()
  + done()
  + result()
    + returns the value returned by the call
    + if the call has not completed yet, result() blocks the calling thread until the value is available

In [None]:
""" Check how many vegetables are in the pantry """
# this code section gets a future obj from the pool.submit() method
# when future.result() is called, the main thread blocks for about 3 s
# while how_many_vegetables() sleeps, and then prints the result

from concurrent.futures import ThreadPoolExecutor
import time

def how_many_vegetables():
    print('Olivia is counting vegetables...')
    time.sleep(3)
    return 42

if __name__ == '__main__':
    print('Barron asks Olivia how many vegetables are in the pantry.')
    with ThreadPoolExecutor() as pool:
        future = pool.submit(how_many_vegetables)
        print('Barron can do other things while he waits for the result...')
        print('Olivia responded with', future.result())


### Divide and Conquer algorithms can be used for parallel programming
* each subproblem can be processed by a separate thread/process
  + if the subproblem is a base case, solve it directly (for example, sequentially); otherwise, divide it further and process the resulting subproblems in parallel
* the partial results are then combined
* the combined result is returned

In [None]:
""" Recursively sum a range of numbers """
# this cell demonstrates parallel programming with ProcessPoolExecutor for
# a divide and conquer algorithm
# the function accepts the low and high bounds of a range and calculates its sum
# (hi is excluded, as in range(lo, hi))

# if pool is None, no process pool has been created yet, so we create a
# ProcessPoolExecutor managed by a context manager and recursively call
# the function with lo, hi and the new pool object
# the recursive call returns a list of Future objects
# as_completed() yields each future as it finishes, and a generator
# expression sums their results

# otherwise the function already has a pool object, so we first check for
# the base case, where the range spans at most 100,000 numbers; if so, we submit
# the sum to the pool and return a single-element list holding its future;
# otherwise we split the range into two sub-ranges, recursively call the
# function on each, and return the concatenation of the two returned
# future lists

from concurrent.futures import ProcessPoolExecutor, as_completed

def recursive_sum(lo, hi, pool=None):
    if not pool:
        with ProcessPoolExecutor() as executor:
            futures = recursive_sum(lo, hi, pool=executor)
            return sum(f.result() for f in as_completed(futures))
    else:
        if hi - lo <= 100_000: # base case threshold
            return [pool.submit(sum, range(lo,hi))]
        else:
            mid = (hi + lo) // 2 # middle index for splitting
            left = recursive_sum(lo, mid, pool=pool)
            right = recursive_sum(mid, hi, pool=pool)
            return left + right

if __name__ == '__main__':
    total = recursive_sum(1, 1_000_000_000)
    print('Total sum is', total)


### Evaluating parallel performance
* weak scaling
  + add more processors/threads while keeping the problem size per processor/thread fixed
  + as a result, we can accomplish more work in the same time frame
* strong scaling
  + add more processors/threads while keeping the total problem size fixed
  + as a result, we can accomplish the same amount of work faster
* in either case, we increase the overall throughput, defined as # tasks/time
* another metric is latency, the time it takes to complete a task, defined as time/task
* examples of throughput and latency:
  + latency: 6 minutes to process each cake
  + throughput: 10 cakes processed per hour
  + adding more processors (each working on its own cake) will not change latency, but will improve throughput
* other metrics to evaluate parallel performance
  + speedup = (sequential execution time)/(parallel execution time with N workers), i.e. how much faster the parallel version runs
    + Amdahl's law
      + overall speedup = 1/((1-p) + p/s)
        + p: portion of the program that is parallelizable
        + s: speedup of the parallelized portion
        + if p = 0.95 and s = 2, overall speedup ≈ 1.9
      + provides an upper bound on how much we can gain by adding more processors
      + the sequential part of the program (here 5%) limits the overall speedup to less than 1/(1-p) = 20, even if we add 1 M processors
      + depending on p, consider how much a program can actually gain from parallel computing with more processors
  + efficiency
    + how well additional resources are utilized
    + efficiency = speedup/(# of processors) 
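
The formulas above can be checked with a few lines of Python. This is a sketch only: the function names `amdahl_speedup` and `efficiency` are hypothetical, and the loop assumes the parallel portion scales perfectly, so that s equals the number of processors. Real programs usually fall short of that.

```python
def amdahl_speedup(p, s):
    """Overall speedup from Amdahl's law.

    p: portion of the program that is parallelizable (0..1)
    s: speedup of the parallelized portion
    """
    return 1.0 / ((1.0 - p) + p / s)

def efficiency(speedup, num_processors):
    """How well the processors are utilized: speedup per processor."""
    return speedup / num_processors

if __name__ == '__main__':
    p = 0.95
    # assumption: the parallel part scales perfectly, so s == number of processors
    for n in (2, 8, 64, 1_000_000):
        speedup = amdahl_speedup(p, n)
        print(f'{n:>9,} processors: speedup = {speedup:6.2f}, '
              f'efficiency = {efficiency(speedup, n):7.2%}')
```

The speedup stays below 1/(1-p) = 20 however many processors are added, while efficiency keeps falling.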

In [None]:
""" Measure the speedup of a parallel algorithm """
# this cell provides a framework for comparing the execution time of a
# parallel program against a sequential version of the same program

# the parallel version uses multiple processes and is compared to a sequential version
# note that we should not use the performance of the multiprocessing
# version running on a single CPU to represent the sequential version, since
# it still carries all the parallel overhead even with a single CPU. Instead, we
# should use a separate program optimized for sequential execution

# several things to note in the program
# 1. warm up the program before collecting data
# 2. use time.perf_counter() to record the start and end of each run, and get the execution time by subtraction
# 3. run multiple times, sum the execution times of all runs, then calculate the average
# 4. calculate the speedup from the sequential and parallel execution times
# 5. evaluate the efficiency by dividing the speedup by the number of processors available on the computer

from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
import time

""" sequential implementation """
def seq_sum(lo, hi):
    return sum(range(lo, hi))

""" parallel implementation """
def par_sum(lo, hi, pool=None):
    if not pool:
        with ProcessPoolExecutor() as executor:
            futures = par_sum(lo, hi, pool=executor)
            return sum(f.result() for f in as_completed(futures))
    else:
        if hi - lo <= 10_000: # base case threshold
            return [pool.submit(sum, range(lo,hi))]
        else:
            mid = (hi + lo) // 2 # middle index for splitting
            left = par_sum(lo, mid, pool=pool)
            right = par_sum(mid, hi, pool=pool)
            return left + right

if __name__ == '__main__':
    NUM_EVAL_RUNS = 1
    SUM_VALUE = 100_000_000

    print('Evaluating Sequential Implementation...')
    sequential_result = seq_sum(1, SUM_VALUE) # "warm up"
    sequential_time = 0
    for i in range(NUM_EVAL_RUNS):
        start = time.perf_counter()
        seq_sum(1, SUM_VALUE) # sequential summing
        sequential_time += time.perf_counter() - start
    sequential_time /= NUM_EVAL_RUNS

    print('Evaluating Parallel Implementation...')
    parallel_result = par_sum(1, SUM_VALUE)  # "warm up"
    parallel_time = 0
    for i in range(NUM_EVAL_RUNS):
        start = time.perf_counter()
        par_sum(1, SUM_VALUE)
        parallel_time += time.perf_counter() - start
    parallel_time /= NUM_EVAL_RUNS

    if sequential_result != parallel_result:
        raise Exception('sequential_result and parallel_result do not match.')
    print('Average Sequential Time: {:.2f} ms'.format(sequential_time*1000))
    print('Average Parallel Time: {:.2f} ms'.format(parallel_time*1000))
    print('Speedup: {:.2f}'.format(sequential_time/parallel_time))
    print('Efficiency: {:.2f}%'.format(100*(sequential_time/parallel_time)/mp.cpu_count()))


### Parallel design of system
* four stages of designing a parallel system
  + partitioning
    + break the problem down into discrete pieces of work that can be distributed to multiple tasks
    + decompose the problem into as many small tasks as possible
    + two ways
      + domain/data decomposition
        + focus on dividing the data associated with the problem into many smaller and, if possible, equal-size partitions
        + then consider the computations to be performed and associate them with the partitioned data
      + functional decomposition
        + divide the computational work that a program needs to do into separate tasks
        + then consider the data requirements of each task
      + a combination of these two strategies
  + communication
    + coordinate task execution and share information or data
      + not necessarily needed for every task
      + point-to-point communication
        + a small number of tasks need to communicate with each other
        + occurs between neighbouring tasks
        + for each communication link, one task acts as the producer and the other as the consumer of data
        + local point-to-point communication occurs when each task communicates with only a small number of other tasks
      + collective communication
        + used when a task needs to communicate with a large number of tasks
        + broadcast: a task sends the same data to all other tasks
        + scatter: a task sends a different piece of data to each member of a group
      + communication can also be categorized as synchronous or asynchronous
        + synchronous (blocking) communication
          + tasks wait until the entire communication is complete
          + tasks cannot do other work while it is in progress
        + asynchronous (nonblocking) communication
          + tasks do not wait for the communication to complete
          + tasks can do other work while it is in progress
          + the sender can begin other work immediately, regardless of when the receiving task gets the message
      + overhead
        + compute time and resources spent on communication rather than on processing data
      + latency
        + time to send a message from A to B (measured in microseconds)
      + bandwidth
        + amount of data communicated per second (GB/s)
        
  + agglomeration
    + combine tasks and replicate data/computation to increase efficiency
    + granularity = (time spent on computation)/(time spent on communication)
    + fine-grained parallelism
      + large number of small tasks
      + advantage: good distribution of workload (load balancing) among processors to maximize their usage
      + disadvantage: increased overhead for communication and synchronization
        + decreases the computation-to-communication ratio
    + coarse-grained parallelism
      + small number of large tasks
      + advantage: high computation-to-communication ratio
      + disadvantage: inefficient load balancing, where certain processors process the bulk of the data while others sit idle
    + summary
      + a well-designed parallel program should adapt to changes in the number of processors
      + use compile-time or run-time parameters to control granularity rather than hard-coded values
      
  + mapping
    + specify where each task will execute
    + does not apply to single-core processors or to systems with automated task scheduling
    + no need to consider it if everything runs on a desktop computer, since the OS assigns tasks to CPU cores
    + applies when using a distributed system or specialized hardware with many parallel processors for large-scale problems
    + the main goal is to minimize the total execution time, usually with two strategies
      + place tasks that can execute concurrently on different processors to increase concurrency
      + place tasks that communicate frequently on the same processor to increase locality
      + may use dynamic load balancing to periodically regenerate the mapping as computation and communication patterns change
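
The partitioning and agglomeration stages can be sketched with the same range-summing problem used in the cells above. This is an illustration under stated assumptions, not a tuned implementation. The helper names `partition` and `chunked_sum` are hypothetical, and `chunk_size` is the run-time parameter that controls granularity: a small value gives fine-grained parallelism, a large value gives coarse-grained parallelism.

```python
from concurrent.futures import ProcessPoolExecutor

def partition(lo, hi, chunk_size):
    """Domain decomposition: split [lo, hi) into consecutive (start, stop) chunks."""
    return [(start, min(start + chunk_size, hi))
            for start in range(lo, hi, chunk_size)]

def chunked_sum(lo, hi, chunk_size, pool):
    """Sum the integers in [lo, hi) by submitting one task per chunk to the pool."""
    futures = [pool.submit(sum, range(start, stop))
               for start, stop in partition(lo, hi, chunk_size)]
    # combine the partial results returned by the workers
    return sum(f.result() for f in futures)

if __name__ == '__main__':
    with ProcessPoolExecutor() as pool:
        # fine-grained: many small tasks, better load balancing, more overhead
        fine = chunked_sum(1, 1_000_001, chunk_size=1_000, pool=pool)
        # coarse-grained: few large tasks, less overhead, harder to balance
        coarse = chunked_sum(1, 1_000_001, chunk_size=250_000, pool=pool)
    print('fine-grained total:', fine)
    print('coarse-grained total:', coarse)
```

Because the pool is passed in as an argument, the same function can be timed with different chunk sizes and worker counts to find a granularity that suits the machine. Mapping is left to the executor and the OS here, which matches the desktop case described above.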
    