<img src="https://www.mines.edu/webcentral/wp-content/uploads/sites/267/2019/02/horizontallightbackground.jpg" width="100%"> 
### CSCI250 Python Computing: Building a Sensor System
<hr style="height:5px" width="100%" align="left">

# Concurrent code execution

# Objective
* introduce concurrent code execution
* use Python threads
* use Python queues

# Resources
* [Python threading](https://docs.python.org/3/library/threading.html)
* [Python queuing](https://docs.python.org/3/library/queue.html)

# Definition

**Concurrency** means that 
* multiple tasks are executed within the same time frame
* the tasks may be unrelated, or they may depend on one another
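
As a rough illustration of "the same time frame", the sketch below runs two waiting tasks first one after the other, then in two threads, and compares the elapsed times. The helper name `wait_task` and the 0.2 s delay are assumptions made for this example; `Thread.join()` is used only to wait for both threads before stopping the clock.

```python
import threading
import time

def wait_task(delay):
    """Stand-in for a slow task (hypothetical helper): it just waits."""
    time.sleep(delay)

delay = 0.2  # seconds; arbitrary value chosen for the illustration

# sequential execution: the second task starts after the first one ends
t0 = time.perf_counter()
wait_task(delay)
wait_task(delay)
sequential = time.perf_counter() - t0

# concurrent execution: both tasks wait within the same time frame
threads = [threading.Thread(target=wait_task, args=(delay,)) for _ in range(2)]
t0 = time.perf_counter()
for th in threads:
    th.start()
for th in threads:
    th.join()   # wait for both threads to finish
concurrent = time.perf_counter() - t0

print("sequential: %.2f s" % sequential)
print("concurrent: %.2f s" % concurrent)
```

On most machines the sequential time should be close to twice the delay, while the concurrent time should be close to a single delay.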

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import time, threading, queue

# `threading` module

Implements a system that enables execution in concurrent threads.

**Threads** are code sequences executed independently.

`threading` functions provide access to thread information.

## `threading.active_count()`
Returns the number of `Thread` objects currently alive.

In [None]:
threading.active_count()

## `threading.main_thread()`
Returns the main `Thread` object (from which Python was started).

In [None]:
threading.main_thread()

## `threading.current_thread()`
Returns the `Thread` object corresponding to the caller's thread of control.

In [None]:
threading.current_thread()

## `threading.get_ident()`
Returns a non-zero integer that identifies the current thread.

In [None]:
threading.get_ident()

## `threading.enumerate()`
Returns a list of all `Thread` objects currently alive.

In [None]:
threading.enumerate()

# `threading` demo
We will use a function that simply waits for a specified time.

In [None]:
def myFunc(i,t):              # receives a task ID and the wait time
    time.sleep(t)             # wait the specified time
    print("%3d %6.2f"%(i,t))  # display the ID and the wait time

In [None]:
i = 0    # task ID
t = 2.0  # delay time
myFunc(i, t)

## `threading.Thread()`
A class that defines threads and thread operations.

`threading.Thread(target=None, name=None, args=())`

* `target`: a function
* `name`: the thread name
* `args`: an argument tuple for the function invocation

We will work primarily with `target` and `args`.
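
Since `name` is not used elsewhere in this notebook, here is a small sketch of what it does. The function `report` and the thread name `'sensor-reader'` are made up for the example.

```python
import threading

def report(label):
    # look up the Thread object that is running this function
    me = threading.current_thread()
    print(label, "runs in thread", me.name)

# keyword arguments keep the call readable
worker = threading.Thread(target=report, name="sensor-reader", args=("task 0",))

print(worker.name)   # the name is available before the thread starts
worker.start()
worker.join()        # wait for the thread to finish
```

A descriptive name mostly helps when reading the output of `threading.enumerate()` or when debugging.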

In [None]:
threading.Thread?

In [None]:
# define a thread
t = threading.Thread( target = myFunc, args = (0,1) )
print(t)

## `Thread.start()`
Starts the activity of a thread object.

A thread is considered **alive** from the moment its activity starts until it terminates.

In [None]:
# define a thread (waits for 1s)
t = threading.Thread( target = myFunc, args = (0,1) )

print('# of threads before thread start = ',threading.active_count())
t.start()
print('# of threads  after thread start = ',threading.active_count())
time.sleep(2)
print('# of threads  after thread end   = ',threading.active_count())

## `Thread.is_alive()`
Returns a boolean indicating whether a thread is alive.

In [None]:
# define a thread (waits for 2s)
t = threading.Thread( target = myFunc, args = (0,2) )

# start the thread
t.start()

# check thread status 
for i in range(5):
    print( i, t.is_alive() )
    time.sleep(1)       # check every 1s

We can define multiple threads, then initiate them at once.

In [None]:
ta = threading.Thread( target = myFunc, args = (0,2.0) )
tb = threading.Thread( target = myFunc, args = (1,3.0) )
tc = threading.Thread( target = myFunc, args = (2,5.0) )

In [None]:
print('alive threads',threading.active_count())

# start all threads in close succession
ta.start() #  short time
tb.start() # medium time
tc.start() #   long time

print('alive threads',threading.active_count())

We can check thread status over time.

In [None]:
ta = threading.Thread( target = myFunc, args = (0,2.0) )
tb = threading.Thread( target = myFunc, args = (1,3.0) )
tc = threading.Thread( target = myFunc, args = (2,5.0) )

In [None]:
# start all threads in close succession
ta.start() #  short time
tb.start() # medium time
tc.start() #   long time

for i in range(10):     # loop 10 times
    print( i, ta.is_alive(),tb.is_alive(),tc.is_alive())
    time.sleep(1)       # check every 1s

# `queue` module
Implements a multi-producer, multi-consumer system.
* **producers**: add elements to a queue
* **consumers**: remove elements from a queue

Used to pass data between code executed in separate threads.

## `Queue` class
A class that defines a FIFO queue and associated operations.

## `Queue.put()`
Put an item into the queue.

## `Queue.get()`
Remove and return an item from the queue.

## `Queue.task_done()`
Indicate that a formerly enqueued task is complete.

## `Queue.join()`
Blocks until every item put into the `Queue` has been retrieved and marked with `task_done()`.
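
These four methods can be tried in a single thread before any threads are involved. The sketch below is only meant to show the FIFO order and the bookkeeping behind `join()`; the queue name `fifo` and the item values are arbitrary.

```python
import queue

fifo = queue.Queue()

# put three items into the queue
for item in ("a", "b", "c"):
    fifo.put(item)

print("size after put:", fifo.qsize())

# get returns the items in the order they were put (FIFO)
received = []
while not fifo.empty():
    item = fifo.get()
    received.append(item)
    fifo.task_done()     # one task_done() call per get()

print("received:", received)

# every item has been marked done, so join() returns immediately
fifo.join()
print("size after get:", fifo.qsize())
```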

Experiment with the `Queue` class using a **fast producer** function, ...

In [None]:
def myA(Q): # puts items into a queue named Q
    message = 'Hello Colorado School of Mines students enrolled in CSCI 250!'
    
    for item in (message.split()):
        Q.put(item)                          # put into the queue
        print('>A>',item)                    # announce a 'put' action
        time.sleep(0.5)                      # short delay

..., and two **slow consumer** functions:

In [None]:
def myB(Q): # gets items from a queue named Q
    while True:        
        item = Q.get()                       # get from the queue
        print('\t <B<',item)                 # announce a 'get' action
        time.sleep(1.0)                      # medium delay between get   
        Q.task_done()                        # mark task completion
        
def myC(Q): # gets items from a queue named Q
    while True:        
        item = Q.get()                       # get from the queue        
        print('\t <C<',item)                 # announce a 'get' action
        time.sleep(3.0)                      # long delay between get 
        Q.task_done()                        # mark task completion

We will start all threads in close succession ("at once"):
* A adds to Q faster than B and C can extract from Q
* Q gets longer because the consumers cannot keep up
* when A is done, B and C continue to consume from Q
* B (medium delay) extracts more from Q than C (long delay)

In [None]:
# define a queue
Q = queue.Queue()

# define threads for producer (A) and consumers (B,C)
tA = threading.Thread(target=myA, args=(Q,))              # put   in Q;  short delay
tB = threading.Thread(target=myB, args=(Q,), daemon=True) # get from Q; medium delay
tC = threading.Thread(target=myC, args=(Q,), daemon=True) # get from Q;   long delay
# B and C loop forever; daemon=True keeps them from blocking interpreter exit

In [None]:
# start all programs as independent threads
tA.start() 
tB.start() 
tC.start()

# block here until every item put into the queue has been processed
Q.join() 
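
Because `myB` and `myC` loop forever, their threads remain alive, blocked in `Q.get()`, after `Q.join()` returns. One common way to let a consumer exit is to send it a sentinel value. The sketch below assumes that `None` is never a real item, and the names `consume`, `STOP`, and `results` are introduced only for this illustration.

```python
import queue
import threading

STOP = None  # sentinel (assumption: None is never a real data item)

def consume(q, results):
    while True:
        item = q.get()
        if item is STOP:
            q.task_done()      # account for the sentinel as well
            break              # leave the loop, so the thread ends
        results.append(item.upper())
        q.task_done()

q = queue.Queue()
results = []
worker = threading.Thread(target=consume, args=(q, results))
worker.start()

for word in "hello colorado school of mines".split():
    q.put(word)
q.put(STOP)        # tell the consumer that no more items will arrive

q.join()           # all items (and the sentinel) have been processed
worker.join()      # the consumer thread has finished
print(results, worker.is_alive())
```

With several consumers, one sentinel per consumer would be needed so that each of them leaves its loop.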

<img src="https://www.dropbox.com/s/7vd3ezqkyhdxmap/demo.png?raw=1" width="10%" align="left">

# Demo
Set up a producer and a consumer, and pass data between them:

* the producer adds to the queue and waits;
* the consumer gets from the queue when data are available;
* the process ends once all items have been produced and consumed.

In [None]:
# define producer
def myPROD(Q, n):

    for i in range(n):
        print('P: put',i*i)  # announce a 'put' action
        Q.put(i*i)           # put into the queue
        time.sleep(1)        # wait 1s
        
# define consumer
def myCONS(Q, n):
    
    for i in range(n):
        h = Q.get()           # get from the queue
        print('C: get',h)     # announce a 'get' action
        Q.task_done()         # mark task completion

In [None]:
# create the queue to pass messages
Q = queue.Queue()

# set how many messages to pass
n = 5

# define the threads
tP = threading.Thread(target=myPROD, args=(Q,n)) # producer
tC = threading.Thread(target=myCONS, args=(Q,n)) # consumer

# start the threads
tP.start()
tC.start()

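# wait for the producer to finish, then for all items to be processed
# (Q.join() alone could return early: the consumer empties the queue
#  between two puts, because it is faster than the producer)
tP.join()
Q.join()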

<img src="https://www.dropbox.com/s/7vd3ezqkyhdxmap/demo.png?raw=1" width="10%" align="left">

# Demo
Set up three threads: a producer, a consumer, and an observer.

* the producer passes data to the consumer through a queue
* the observer monitors the queue status (length)

In [None]:
# define the producer:
# puts an item every dt seconds, for about workT seconds in total
def myPRO(Q, workT, dt):
    
    t = 0.0
    while t < workT:
        Q.put(t)       # add to queue
        time.sleep(dt)
        t += dt

In [None]:
# define the consumer:
# gets an item, then waits dt seconds, for about workT seconds in total
def myCON(Q, workT, dt):
    
    t = 0.0
    while t < workT:
        h = Q.get()    # extract from queue
        time.sleep(dt)
        Q.task_done()    
        t += dt

In [None]:
# define the observer:
# records the queue size every dt seconds, for about workT seconds in total
def myOBS(Q, workT, dt):
    nt = int(workT / dt)
    
    t = np.empty(nt)              # time
    q = np.empty(nt)              # queue size
    
    to = time.time()
    for i in range(nt):
        q[i] = Q.qsize()          # get (approximate) queue size
        t[i] = time.time() - to   # record the time
        time.sleep(dt)
    
    # plot queue size vs. time
    plt.figure(figsize=(15,5))
    plt.plot(t,q,'r.')

    plt.title('queue monitor')
    plt.xlabel('t(s)')
    plt.ylabel('Q')
    
    plt.ylim([0,200])
    plt.xlim([0,workT])
    plt.grid()
    
    plt.show()

### 1 producer, 1 consumer

In [None]:
pDURATION = 1.0   # producer time
pDELAY    = 0.01  # producer delay

cDURATION = 3.0   # consumer time
cDELAY    = 0.03  # consumer delay

oDURATION = 6.0   # observer time
oDELAY    = 0.001 # observer delay

In [None]:
Q = queue.Queue() 

O  = threading.Thread(target=myOBS, args=(Q, oDURATION, oDELAY)) # observer
P  = threading.Thread(target=myPRO, args=(Q, pDURATION, pDELAY)) # producer
C  = threading.Thread(target=myCON, args=(Q, cDURATION, cDELAY)) # consumer

O.start()    # observer
P.start()    # producer
C.start()    # consumer

Q.join()

### 2 producers, 2 consumers

In [None]:
Q = queue.Queue() 

O  = threading.Thread(target=myOBS, args=(Q, oDURATION, oDELAY)) # observer
P1 = threading.Thread(target=myPRO, args=(Q, pDURATION, pDELAY)) # producer 1
P2 = threading.Thread(target=myPRO, args=(Q, pDURATION, pDELAY)) # producer 2
C1 = threading.Thread(target=myCON, args=(Q, cDURATION, cDELAY)) # consumer 1
C2 = threading.Thread(target=myCON, args=(Q, cDURATION, cDELAY)) # consumer 2

O.start()    # observer
P1.start()   # producer 1
time.sleep(0.5)
P2.start()   # producer 2
C1.start()   # consumer 1
time.sleep(1.0)
C2.start()   # consumer 2

Q.join()