# Threading
---
In other programming languages such as C and Java, threading allows multiple tasks to run concurrently. Depending on the hardware, threads can be interleaved on a single core or run truly in parallel (multiple CPU cores being used at once).

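Python's standard interpreter (CPython) has a global interpreter lock (GIL) which prevents threads from executing Python bytecode simultaneously (in parallel). Instead, the interpreter switches rapidly between threads, so the tasks make progress concurrently but not in parallel. Threads still pay off for I/O-bound work, because the GIL is released while a thread waits on the network, the disk, or time.sleep().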
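
As a rough illustration of why threads still help with waiting-heavy work, the sketch below starts five threads that each pause for 0.2 seconds. The function name `wait_for_io` and the timings are assumptions chosen for the example; `time.sleep()` stands in for a blocking network or disk call.

```python
import threading
import time

def wait_for_io(seconds):
    # time.sleep() releases the GIL, much like a blocking network or disk call
    time.sleep(seconds)

start = time.perf_counter()
threads = [threading.Thread(target=wait_for_io, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The five waits overlap, so the total is close to 0.2 s rather than 1.0 s
print(f'5 threads finished in {elapsed:.2f} s')
```

A CPU-bound loop would not see the same gain, since only one thread can execute Python bytecode at a time.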

## Index
1. Initializing & Running Threads
2. Running Multiple Threads
3. Subclassing and the Thread class run() method
4. Subclassing and the Thread class \_\_init__ method
5. Locks
6. Queues
7. Event Objects
8. Image Downloading Examples

### A Note on Multiprocessing
---
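The multiprocessing module is very similar to the threading module. However, on platforms that start worker processes with the spawn method (such as Windows), the code that creates processes must be guarded by \_\_name__ == '\_\_main__' and the target functions must be importable, so functions defined in a notebook cell often fail to run. For that reason, the multiprocessing examples are kept out of this Jupyter notebook.

To see some examples of multiprocessing, go to multiprocessing_ex.py in the same folder as this notebook.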

In [1]:
# Modules used in this notebook
import time
import os
import requests
import queue
import threading
import numpy as np

## 1. Initializing & Running Threads


In [7]:
# To create a thread, there must be a target function for the thread to use
def sleeper(n, name):
    print(f'{name} Sleeping For {n} Seconds\n')
    time.sleep(n)
    print(f'{name} Finished Sleeping\n')

# Initialize threads with threading.Thread(target = function, 
#                                          name   = threadname,
#                                          args   = function args)
t1 = threading.Thread(target = sleeper,
                      name = 't1', 
                      args = (5, 't1'))

# Run threads using start()
t1.start() 

# join() blocks rest of code execution until thread is finished
# t1.join()

# These print calls run while the thread is still sleeping (unless t1.join() is used)
print('hello')
print('hello')

t1 Sleeping For 5 Seconds

hello
hello
t1 Finished Sleeping
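
The commented-out `t1.join()` above waits without limit. As a small sketch of the related calls, `join()` also accepts a timeout, and `is_alive()` reports whether the thread is still running. The function `short_task` and the timings are assumptions made for this example.

```python
import threading
import time

def short_task():
    time.sleep(0.3)

t = threading.Thread(target=short_task, name='worker')
t.start()

# join(timeout) waits at most that long and always returns None,
# so is_alive() is the way to tell whether the thread finished
t.join(timeout=0.05)
still_running = t.is_alive()
print(f'{t.name} still running after 0.05 s: {still_running}')

t.join()  # no timeout: block until the thread is done
finished = not t.is_alive()
print(f'{t.name} finished: {finished}')
```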



## 2. Running Multiple Threads

In [6]:
threads_list = []

start = time.time()

for i in range(5):
    t = threading.Thread(target = sleeper, 
                         name = f't{i}', 
                         args = (5, f't{i}'))
    threads_list.append(t)
    t.start()
    print(f'{t.name} has started')
    
# All threads will finish before continuing on with program by using join()
for t in threads_list:
    t.join()
    
end = time.time()
print(f'all threads executed in: {end-start}')

# Test to see that all threads finished running
print('All threads run')

t0 Sleeping For 5 Seconds
t0 has started

t1 Sleeping For 5 Seconds

t1 has started
t2 Sleeping For 5 Seconds
t2 has started

t3 Sleeping For 5 Seconds
t3 has started

t4 Sleeping For 5 Seconds

t4 has started
t0 Finished Sleeping

t1 Finished Sleeping

t2 Finished Sleeping
t3 Finished Sleeping


t4 Finished Sleeping

all threads executed in: 5.019181251525879
All threads run


## Subclassing

Subclassing allows for tweaking details of each thread instance. 
Below is the official documentation for thread objects:<br>
https://docs.python.org/3/library/threading.html

## 3. Subclassing and the Thread class run() method

In [17]:
# The run() method is automatically called by the start() method.
# The run() method is responsible for executing the function passed to the
# thread object's target parameter (below, the sleeper() function):
# threading.Thread(target = sleeper, args = (5,))

# Note: Quick way to get source code of a method in jupyter notebook
# threading.Thread.run??

# This is the run() source code
def run(self):
    """Method representing the thread's activity.

    You may override this method in a subclass. The standard run() method
    invokes the callable object passed to the object's constructor as the
    target argument, if any, with sequential and keyword arguments taken
    from the args and kwargs arguments, respectively."""
    try:
        if self._target:
            self._target(*self._args, **self._kwargs)
    finally:
        del self._target, self._args, self._kwargs

In [18]:
# Because the run method was designed to allow subclassing, the above 
# run() function can be altered

# Original run() method from Python source code
'''
def run(self):
    try:
        if self._target:
            self._target(*self._args, **self._kwargs)
    finally:
        del self._target, self._args, self._kwargs
'''

def sleeper(n, name):
    print(f'{name} Sleeping For {n} Seconds\n')
    time.sleep(n)
    print(f'{name} Finished Sleeping\n')

# Create custom subclass (inheriting from parent threading.Thread class)
class myThread(threading.Thread):
    # Note that __init__ is not altered here, keeping it the same as the parent class
    # Only the run() method is changed
    def run(self):
        # self.name replaces getName(), which is deprecated since Python 3.10
        print(f'{self.name} download started') #tweak 1
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs
        # Reached only if the target did not raise an exception
        print(f'{self.name} executed successfully') #tweak 2
            
for i in range(4):
    t = myThread(target = sleeper, 
                 name = f't{i+1}', 
                 args = (3, f't{i+1}'))
    t.start()

t1 download started
t1 Sleeping For 3 Seconds

t2 download started
t2 Sleeping For 3 Seconds

t3 download started
t3 Sleeping For 3 Seconds

t4 download started
t4 Sleeping For 3 Seconds

t1 Finished Sleeping

t1 executed successfully
t2 Finished Sleeping

t2 executed successfully
t3 Finished Sleeping

t3 executed successfully
t4 Finished Sleeping

t4 executed successfully
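
Copying the body of `run()` depends on the private attributes `_target`, `_args` and `_kwargs`, which could change between Python versions. One possible alternative, sketched here with the hypothetical class name `LoggingThread`, is to wrap a call to `super().run()` and let the parent class invoke the target.

```python
import threading

class LoggingThread(threading.Thread):  # hypothetical name for this sketch
    def run(self):
        print(f'{self.name} started')
        try:
            # Delegate to Thread.run(), which calls target(*args, **kwargs)
            super().run()
        finally:
            # Runs whether or not the target raised an exception
            print(f'{self.name} finished')

results = []
t = LoggingThread(target=results.append, args=('done',), name='t1')
t.start()
t.join()
print(results)
```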


## 4. Subclassing and the Thread class \_\_init__ method

In [22]:
# Per Python documentation, the Thread Class' __init__ method can also be 
# tweaked. Below is the source code
# threading.Thread.__init__??

Source:   
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        """This constructor should always be called with keyword arguments. Arguments are:

        *group* should be None; reserved for future extension when a ThreadGroup
        class is implemented.

        *target* is the callable object to be invoked by the run()
        method. Defaults to None, meaning nothing is called.

        *name* is the thread name. By default, a unique name is constructed of
        the form "Thread-N" where N is a small decimal number.

        *args* is the argument tuple for the target invocation. Defaults to ().

        *kwargs* is a dictionary of keyword arguments for the target
        invocation. Defaults to {}.

        If a subclass overrides the constructor, it must make sure to invoke
        the base class constructor (Thread.__init__()) before doing anything
        else to the thread.

In [19]:
# Original __init__ method
# def __init__(self, group=None, target=None, name=None, 
#              args=(), kwargs=None, *, daemon=None):

# Here, the tweaks to __init__ are that number and func are used instead of name and target
class MyThread(threading.Thread):
    def __init__(self, number, func, args):
        # Because this class inherits from the parent, it must call the parent's __init__() first
        threading.Thread.__init__(self)
        self.number = number
        self.func = func
        self.args = args
        
    def run(self):
        print(f'thread {self.number} download started') #tweak 1
        self.func(*self.args)  # *allows for tuple args (it will unpack them)
        print(f'thread {self.number} executed successfully') #tweak 2   
    
def double(number, cycles):
    for i in range(cycles):
        number += number
    print(number)
    
threads_list = []

for i in range(10):
    t = MyThread(number = i+1, func = double, args = [i, 3])
    threads_list.append(t)
    t.start()
    
for t in threads_list:
    t.join()

thread 1 download started
0
thread 1 executed successfully
thread 2 download started
8
thread 2 executed successfully
thread 3 download started
16
thread 3 executed successfully
thread 4 download started
24
thread 4 executed successfully
thread 5 download started
32
thread 5 executed successfully
thread 6 download started
40
thread 6 executed successfully
thread 7 download started
48
thread 7 executed successfully
thread 8 download started
56
thread 8 executed successfully
thread 9 download started
64
thread 9 executed successfully
thread 10 download started
72
thread 10 executed successfully
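
A common reason to override both `__init__` and `run()` is to keep the target's return value, which a plain `Thread` discards. The sketch below is one way this might look; `ResultThread`, `func_args` and `result` are names invented for the example, and `double` is rewritten to return its value instead of printing it.

```python
import threading

class ResultThread(threading.Thread):  # hypothetical name for this sketch
    def __init__(self, func, args=(), **thread_kwargs):
        # Call the base constructor first, forwarding options such as name or daemon
        super().__init__(**thread_kwargs)
        self.func = func
        self.func_args = args
        self.result = None

    def run(self):
        # Store the return value so it can be read after join()
        self.result = self.func(*self.func_args)

def double(number, cycles):
    for _ in range(cycles):
        number += number
    return number

threads = [ResultThread(double, args=(i, 3), name=f'worker-{i}') for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

results = [t.result for t in threads]
print(results)
```

The results should only be read after `join()`, since `result` stays `None` until `run()` has finished.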


## 5. Locks
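Locks are used when multiple threads could modify shared data, such as a global variable, at the same time. An update like x += 2 is a read, an add, and a write, and a thread switch between those steps can cause another thread's update to be lost. Holding a lock ensures that only one thread performs the update at a time.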

In [23]:
# In theory, running the 4 functions below in four separate threads should
# produce a final answer of 0 (2 + 3 - 4 - 1 = 0, whatever the count).
# However, this is not guaranteed without a lock: every thread has access to the
# global variable x, and an update such as x += 2 is a separate read and write,
# so a thread switch in between can overwrite another thread's update

x = 0
COUNT = 100000

# To use a lock, first create it with threading.Lock(), then wrap the code
# that touches the global variable x in "with lock:".
# Only the thread currently holding the lock can run that code;
# the other threads wait until the lock is released.
# Note that "with lock:" is succinct code that replaces
# lock.acquire() & lock.release(); that method would look like:
'''
def add_2():
    global x
    lock.acquire()
    try:
        for i in range(COUNT):
            x += 2
    finally:
        lock.release()
'''

lock = threading.Lock()

def add_2():
    global x
    with lock:
        for i in range(COUNT):
            x += 2

def add_3():
    global x
    with lock:
        for i in range(COUNT):
            x += 3
        
def sub_4():
    global x
    with lock:
        for i in range(COUNT):
            x -= 4
        
def sub_1():
    global x
    with lock:
        for i in range(COUNT):
            x -= 1

            
t1 = threading.Thread(target = add_2)
t2 = threading.Thread(target = add_3)
t3 = threading.Thread(target = sub_4)
t4 = threading.Thread(target = sub_1)

t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()

# Should get 0 with the lock; to compare, remove "with lock" from the functions
# (the race may not show up on every run)
print(x)

0
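
In the cell above each function holds the lock for its entire loop, so the four threads effectively run one after another. A variation, sketched below under the assumption that only the update itself needs protecting, takes the lock around each single increment. The names `counter`, `counter_lock` and `change_by` are made up for this example.

```python
import threading

counter = 0
counter_lock = threading.Lock()
COUNT = 50_000

def change_by(step):
    global counter
    for _ in range(COUNT):
        # Hold the lock only for the read-modify-write itself
        with counter_lock:
            counter += step

threads = [threading.Thread(target=change_by, args=(step,)) for step in (2, 3, -4, -1)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)
```

The steps sum to zero, so the final value is 0 regardless of how the threads interleave. The trade-off is more lock traffic in exchange for letting the threads take turns at a finer grain.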


## 6. Queues
The queue module is useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.

In [24]:
# Create a queue
q = queue.Queue() 

# put() adds items to a queue
q.put(5)

# get() removes and returns the next item from the queue (for the default
# FIFO queue, the oldest one)
# note that get() will block indefinitely if the queue is empty
print(q.get())

# After get() runs, q will be empty as it only contained a single item
# if get() is called again, it will block forever waiting for a new item.
# empty() can be used as a check prior to get() from a queue
# in order to prevent this from happening (see FIFO example in cell below);
# this is only dependable when no other thread is consuming from the same queue
print(q.empty())

5
True
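
Besides checking `empty()` first, a blocked `get()` can be avoided by asking the queue not to wait, or to wait only for a limited time. Both variants raise `queue.Empty` when nothing is available. The sketch below shows the two calls; the variable names `received` and `timed_out` are assumptions for the example.

```python
import queue

q = queue.Queue()
q.put('only item')

received = []
while True:
    try:
        # get_nowait() is the same as get(block=False)
        received.append(q.get_nowait())
    except queue.Empty:
        break

try:
    q.get(timeout=0.1)  # wait up to 0.1 s, then give up
    timed_out = False
except queue.Empty:
    timed_out = True

print(received, timed_out)
```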


**There are 3 types of queues:**
1. **FIFO** (First-in, First-out)
2. **LIFO** (Last-in, First-out)
3. **Priority** (Lowest values returned first, no matter the order added to queue)

### 1. FIFO (First-in, First-out)

In [28]:
# FIFO is the Queue() object default
q = queue.Queue()

print('First In First Out (FIFO)')
for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end = '\n')
    
print()

First In First Out (FIFO)
0
1
2
3
4



### 2. LIFO (Last-in, First-out)

In [29]:
print('Last In First Out (LIFO)')
q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end = '\n')

print()

Last In First Out (LIFO)
4
3
2
1
0



### 3. Priority (Lowest values returned first, no matter the order added to queue)

In [30]:
print('Priority Queue')
q = queue.PriorityQueue()
q.put(1)
q.put(3)
q.put(4)
q.put(2)

for i in range(q.qsize()):
    print(q.get())
    
print()

# Tuples can be used to explicitly set the queue order of non-numerical data such as strings
print('Priority Queue w/Strings')
q = queue.PriorityQueue()
q.put((1, 'Priority 1'))
q.put((3, 'Priority 3'))
q.put((4, 'Priority 4'))
q.put((2, 'Priority 2'))

for i in range(q.qsize()):
    print(q.get()[1])

Priority Queue
1
2
3
4

Priority Queue w/Strings
Priority 1
Priority 2
Priority 3
Priority 4
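
The queue cells above all run in a single thread. To show the thread-safe exchange the section describes, here is a minimal producer and consumer sketch. It assumes a sentinel value (`SENTINEL`, a name chosen for this example) to tell the consumer when to stop; other shutdown schemes are possible.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker telling the consumer to stop
q = queue.Queue()
squares = []

def producer():
    for n in range(5):
        q.put(n)
    q.put(SENTINEL)

def consumer():
    while True:
        item = q.get()  # blocks until the producer supplies something
        if item is SENTINEL:
            break
        squares.append(item * item)

workers = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(squares)
```

With one consumer and a FIFO queue, the items are processed in the order they were put, and no explicit lock is needed because `Queue` handles the locking internally.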


## 7. Event Objects (Allow threads to communicate through flags and switches)

Events are one of the simplest mechanisms for communication between threads: one thread signals (or flags) an event and other threads wait for it. 

An event object manages an internal flag that can be set to true with the set() method and reset to false with the clear() method. 

The wait() method blocks until the flag is true.

In [31]:
def flag():
    time.sleep(3)
    event.set() # sets event to true, allows thread to run
    print('The Final Countdown Begun\n')
    time.sleep(5)
    event.clear() # sets event to false

# This function's loop will not start until the event flag is set with event.set()
def start_operations():
    event.wait()
    while event.is_set():
        print('Starting random integer task')
        x = np.random.randint(1,16)
        time.sleep(.5)
        print(x)
        if x == 13:
            print('True')
    print('Event has been cleared, random operation stops')

# Create event object
event = threading.Event()
t1 = threading.Thread(target = flag)
t2 = threading.Thread(target = start_operations)

t1.start()
t2.start()

The Final Countdown Begun
Starting random integer task

6
Starting random integer task
15
Starting random integer task
6
Starting random integer task
13
True
Starting random integer task
2
Starting random integer task
3
Starting random integer task
6
Starting random integer task
1
Starting random integer task
10
Starting random integer task
10
Event has been cleared, random operation stops
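
`wait()` also accepts a timeout and returns a boolean, which lets a thread tell "the flag was set" apart from "the wait gave up". The following is a small sketch of that behaviour; the names `ready`, `log` and `waiter` and the timings are assumptions for the example.

```python
import threading
import time

ready = threading.Event()
log = []

def waiter():
    # wait(timeout) returns False if the timeout expires before the flag is set
    log.append(ready.wait(timeout=0.05))
    # Without a timeout, wait() blocks until set() is called and returns True
    log.append(ready.wait())

t = threading.Thread(target=waiter)
t.start()
time.sleep(0.3)  # give the first wait time to expire
ready.set()
t.join()

print(log)
```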


## 8. Image Downloading Examples

In [32]:
# I got these images from: https://homepages.cae.wisc.edu/~ece533/images/

# Single Thread Example 
image_names = ['airplane', 'arctichare', 
               'baboon', 'boat',
               'cat', 'fruits', 
               'frymire', 'girl', 
               'monarch', 'mountain', 
               'pool', 'peppers', 
               'serrano','tulips']

image_links = []

for i, name in enumerate(image_names):
    image_links.append(f'https://homepages.cae.wisc.edu/~ece533/images/{name}.png')

def download_images(image_links, session=None):
    if not session:
        session = requests.Session()
        
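    for i, link in enumerate(image_links):
        try:
            r = session.get(link)
        except(requests.exceptions.RequestException, UnicodeError) as e:
            # Skip this image; otherwise r would be undefined or left over from the previous link
            print(e)
            continue
        
        # os.path.join builds the path with the right separator on any OS
        with open(os.path.join(os.getcwd(), 'img', f'{image_names[i]}.png'), 'wb') as f:
            f.write(r.content)
        
        print(f'id: {i} | {image_names[i]}.png downloaded')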

start = time.time()
download_images(image_links)
end = time.time()
print(f'All Images Downloaded In: {end - start}')

id: 0 | airplane.png downloaded
id: 1 | arctichare.png downloaded
id: 2 | baboon.png downloaded
id: 3 | boat.png downloaded
id: 4 | cat.png downloaded
id: 5 | fruits.png downloaded
id: 6 | frymire.png downloaded
id: 7 | girl.png downloaded
id: 8 | monarch.png downloaded
id: 9 | mountain.png downloaded
id: 10 | pool.png downloaded
id: 11 | peppers.png downloaded
id: 12 | serrano.png downloaded
id: 13 | tulips.png downloaded
All Images Downloaded In: 1.7112743854522705


In [2]:
# Download Images Using Multithreading
image_count = 0

# Protects image_count so that two threads never get the same file number
count_lock = threading.Lock()

def download_images(q, session=None): 
    global image_count
    
    if not session:
        session = requests.Session()
    
    while True:
        # Take the next link; stop once the queue has been emptied
        # (checking q.empty() first is unsafe: another thread can take the last item)
        try:
            link = q.get(block=False)
        except queue.Empty:
            break
        try:
            r = session.get(link)
        # If the image cannot be downloaded, report it and move on
        except(requests.exceptions.RequestException, UnicodeError) as e:
            print(e)
            continue
        finally:
            # Increase the count either way because the link is gone from the queue
            with count_lock:
                image_count += 1
                image_id = image_count
            q.task_done()
        # Save the image to desired directory
        with open(os.path.join(os.getcwd(), 'img', f'image_{image_id}.png'), 'wb') as f:
            f.write(r.content)    

image_names = ['airplane', 'arctichare', 
               'baboon', 'boat',
               'cat', 'fruits', 
               'frymire', 'girl', 
               'monarch', 'mountain', 
               'pool', 'peppers', 
               'serrano','tulips']
image_links = []

# 1. Create the links from the image names and store in image_links list
for i, name in enumerate(image_names):
    image_links.append(f'https://homepages.cae.wisc.edu/~ece533/images/{name}.png')

# 2. Create queue of image links FIFO (doesn't matter here)
q = queue.Queue()

# 3. Populate queue with the links (note, step 1 and this could be combined)
#    Done this way for the sake of clarity
for i, link in enumerate(image_links):
    q.put(link)
       
threads = []
start = time.time()

# 4. Create the threads (here 6), set target to download_images function
#    with the queue of links as the function argument
for i in range(6):
    t = threading.Thread(target = download_images, args = (q,))
    threads.append(t)
    t.start()

# q.join() blocks until task_done() has been called for every link put in the queue
q.join()

for t in threads:
    t.join()
    print(t.name, 'has joined')
    
end = time.time()
print(f'images downloaded in: {end - start}')

Thread-6 has joined
Thread-7 has joined
Thread-8 has joined
Thread-9 has joined
Thread-10 has joined
Thread-11 has joined
images downloaded in: 0.837482213973999
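
The queue, thread list and join loop above can also be expressed with the standard library's `concurrent.futures.ThreadPoolExecutor`. This is offered only as one possible alternative, not as what the notebook uses. To keep the sketch runnable without network access, `fake_download` is a hypothetical stand-in for `session.get(link)` that sleeps instead of fetching anything, and the shortened `image_names` list is likewise just for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(name):
    # Hypothetical stand-in for session.get(link); sleeps instead of using the network
    time.sleep(0.1)
    return f'{name}.png'

image_names = ['airplane', 'baboon', 'boat', 'cat', 'fruits', 'girl']

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=6) as pool:
    # map() returns results in the same order as the inputs
    files = list(pool.map(fake_download, image_names))
elapsed = time.perf_counter() - start

print(files)
print(f'finished in {elapsed:.2f} s')
```

Leaving the `with` block waits for all submitted work, which plays the role of the explicit `join()` calls in the cell above.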
