In [None]:
#Introduce delays in programs intentionally so that we could switch to other threads
import time
import sys
print(time.time())
print(time.ctime(time.time()))
time.sleep(5)#Simulate thread being idle
print("End")

In [9]:
from threading import *
import time

def print_time(tname,counter,delay):
    for i in range(counter):
        print("%s %s"%(tname,time.ctime(time.time())))
        time.sleep(delay)

t1=Thread(target=print_time,args=("t1",5,4))
t2=Thread(target=print_time,args=("t2",3,2))

t1.start()
t2.start()

t2 Tue May 10 15:41:16 2022
t1 Tue May 10 15:41:18 2022
t2 Tue May 10 15:41:18 2022
t2 Tue May 10 15:41:20 2022
t1 Tue May 10 15:41:22 2022
t1 Tue May 10 15:41:26 2022
t1 Tue May 10 15:41:30 2022
t1 Tue May 10 15:41:34 2022


1. Multithreading allows python to run many units of work in parallel
2. Unit of work is a function
3. GIL - Global Interpreter Lock in python is a feature that makes sure only one thread executes at one instant of time by the interpreter
4. So in python, how many ever threads you create and how many ever processors you have, only one thread is executing at one instatnt of time
5. When one thread is idle (say waiting for a network resource, user action etc), other thread could start executing
6. So it is juggling between units of work

# Method 1 - threading module - threads creation normal

In [None]:
from threading import *
import time
# Step 1 - Define a function/work for the thread
def print_time(threadName, counter, delay):
    count = 0
    while count < counter:
        time.sleep(delay)
        count += 1
        print ("%s: %s" % (threadName, time.ctime(time.time())))
# Step 2 - Create threads as follows
t1 = Thread(target=print_time, args=("Thread-1",3, 2, ))
#target is the work done by the thread
#args is the arguments requried by the function
t2 = Thread(target=print_time, args=("Thread-2", 4, 4,))
t1.start()#Step 3 - Start the thread
t2.start()


## Method 2 - threading module - threads creation OOP way

In [None]:
import threading
import time
def print_time(threadName, counter, delay):
    while counter:
      time.sleep(delay)
      print ("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1
class myThread (threading.Thread):#Step 1 - Subclass the threading class
    def __init__(self, name, counter,delay):#Step 2 - Call parent class constructor in child
        threading.Thread.__init__(self)
        self.name = name
        self.counter = counter
        self.delay = delay
    def run(self):#Step 3 - Override the run method
        print( "Starting " + self.name)
        print_time(self.name, self.counter, self.delay)
        print ("Exiting " + self.name)
# Step 4 - Create new threads
thread1 = myThread("Thread-1", 2,2)
thread2 = myThread("Thread-2", 3,1)
# Step 5 - Start new Threads
thread1.start()
thread2.start()
#Parent Wait till all children finish
thread1.join()
thread2.join()
print ("Exiting Main Thread")

# Method 3 - multiprocessing module

#multiprocessing is a package that supports spawning processes using an API similar to the threading module
#offers both local and remote concurrency 
#Python's global interpreter lock (GIL) will make sure than only one thread is executed by python interpreter
#This means even when multiple processors are available threads in Python can use only one processor to maintain concurrency
#In multiprocessing each thread or parallel work is created as a separate process there by utilizing multiple processors

#multiprocessing does not work in older versions of Jupyter notebook
#Add python installtion directory to windows path variable
#Just copy paste each of the cell's content to a file and save with .py extension
#In windows command prompt(cmd), navigate to the folder that has the python file and issue

In [None]:
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

In [None]:
from multiprocessing import Process
def worker(num):
    print('Worker:', num)
    for i in range(num):
        print(i)


#If this source file is the main program, 
#interpreter sets the special __name__ variable a value “__main__”. 
#If this file is being imported from another module, 
#__name__ will be set to the module’s name. 

#This line is manadatory for multiprocessing
if __name__ == '__main__':
    for i in range(3):
        p = Process(target=worker, args=(i+2,))
        p.start()

1. On using threads, many threads could access/write to a common file.
2. Python supports some synchronization primitives to avoid multiple threads accessing a common resource
3. We see synchronization primitives like - locks, Rlock, semaphore and events

# Synchronization using locks

1. Lock is the simplest synchronization primitive
2. A Lock has only two states — locked and unlocked. 
3. It is created in the unlocked state and has two principal methods — acquire() and release().

In [1]:
# Synchronizing thread
import threading, time
def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print ("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1
class myThread (threading.Thread):
    def __init__(self, name, counter,delay):
        threading.Thread.__init__(self)
        self.name = name
        self.counter = counter
        self.delay = delay  
    def run(self):
        print ("Starting " + self.name)
        # Get lock to synchronize printing
        #threadLock.acquire()
        print_time(self.name, self.counter, self.delay)
        # Free lock to release next thread
        #threadLock.release()
threadLock = threading.Lock()
threads = []
# Create new threads
thread1 = myThread("Thread-1", 1,3)
thread2 = myThread("Thread-2", 2,5)
# Start new Threads
thread1.start()
thread2.start()
# Add threads to thread list
threads.append(thread1)
threads.append(thread2)
# Wait for all threads to complete
for t in threads:
    t.join()
print ("Exiting Main Thread")

Starting Thread-1
Starting Thread-2
Thread-1: Mon May 30 20:43:57 2022
Thread-1: Mon May 30 20:43:58 2022Thread-2: Mon May 30 20:43:58 2022

Thread-1: Mon May 30 20:43:59 2022
Thread-2: Mon May 30 20:44:00 2022
Thread-2: Mon May 30 20:44:02 2022
Thread-2: Mon May 30 20:44:04 2022
Thread-2: Mon May 30 20:44:06 2022
Exiting Main Thread


# Synchronization using Rlock

1. The standard Lock doesn’t know which thread is currently holding the lock. 
2. If the lock is held, any thread that attempts to acquire it will block, even if the same thread itself is already holding the lock.
3. In such cases, RLock (re-entrant lock) is used. 
4. RLocks can prevent unwanted blocking.


In [None]:
1. Example - Do not execute

lock = threading.Lock()

lock.acquire()
num += 1
lock.acquire() # This will block.
num += 2
lock.release()

In [None]:
lock = Threading.RLock()

lock.acquire()
num += 3
lock.acquire() # This won’t block.
num += 4
lock.release()
lock.release() # You need to call release once for each call to acquire.

In [None]:
# Synchronizing thread
import threading, time
def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        threadLock.acquire()
        print ("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1
        threadLock.release()
class myThread (threading.Thread):
    def __init__(self, name, counter,delay):
        threading.Thread.__init__(self)
        self.name = name
        self.counter = counter
        self.delay = delay  
    def run(self):
        print ("Starting " + self.name)
        # Get lock to synchronize printing
        threadLock.acquire()
        print_time(self.name, self.counter, self.delay)
        # Free lock to release next thread
        threadLock.release()
threadLock = threading.Lock()
threads = []
# Create new threads
thread1 = myThread("Thread-1", 1,3)
thread2 = myThread("Thread-2", 2,5)
# Start new Threads
thread1.start()
thread2.start()
# Add threads to thread list
threads.append(thread1)
threads.append(thread2)
# Wait for all threads to complete
for t in threads:
    t.join()
print ("Exiting Main Thread")

In [None]:
import threading

class Foo(object):
    lock = threading.RLock()
    def __init__(self):
        self.x = 0
    def add(self,n):
        with Foo.lock:
            self.x += n
    def incr(self):
        with Foo.lock:
            self.add(1)
    def decr(self):
        with Foo.lock:
            self.add(-1)
def adder(f,count):
    while count > 0:
        f.incr()
        count -= 1
    print(f.x)
def subber(f,count):
    while count > 0:
        f.decr()
        count -= 1
    print(f.x)
# Create some threads and make sure it works
COUNT1 = 15
COUNT2 = 10
f = Foo()
t1 = threading.Thread(target=adder,args=(f,COUNT1))
t2 = threading.Thread(target=subber,args=(f,COUNT2))
t1.start()
t2.start()
t1.join()
t2.join()


# Synchronization using semaphore

1. A semaphore is a synchronization construct.
2. Semaphore provides threads with synchronized access to a limited number of resources.
3. A semaphore is just a variable. The variable reflects the number of currently available resources. For example, a parking lot with a display of number of available slots on a specific level of a shopping mall is a semaphore.
A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().
4. The value of semaphore cannot go less than zero and greater then the total number of the available resources.
5. The semaphore is associated with two operations – acquire and release.

In [None]:
# An example python program using semaphore provided by the python threading module
import threading
import time
parkedLock      = threading.Lock()
removedLock     = threading.Lock()
#Counters for total number of parkings and removals
parked          = 0
removed         = 0
#Semaphore
availbleParkings = threading.Semaphore(5)#Only five parking slots available

def ParkCar(): #Parking thread will execute this work
        availbleParkings.acquire()#Decremented by one
        global parkedLock
        parkedLock.acquire()#Acquire lock so that no other thread simultaneously modifies the parkedLock global variable
        global parked
        parked = parked+1
        parkedLock.release()
        print("Parked: %d"%(parked))      
def RemoveCar(): #Removing thread will execute this work
        availbleParkings.release()#Incremented by one
        global removedLock
        removedLock.acquire()
        global removed
        removed = removed+1
        removedLock.release()
        print("Removed: %d"%(removed))       
# Thread that simulates the entry of cars into the parking lot
def parkingEntry():
    # Creates multiple threads inside to simulate cars that are parked
    for i in range(6):
        time.sleep(1)
        incomingCar = threading.Thread(target=ParkCar)
        incomingCar.start()

# Thread that simulates the exit of cars from the parking lot
def parkingExit():
    # Creates multiple threads inside to simulate cars taken out from the parking lot
    for j in range(5):
        time.sleep(15)
        outgoingCar = threading.Thread(target=RemoveCar)
        outgoingCar.start()

# Start the parking eco-system
parkingEntryThread      = threading.Thread(target=parkingEntry)
parkingExitThread       = threading.Thread(target=parkingExit)
parkingEntryThread.start()
parkingExitThread.start()

In [None]:
import threading
import time

done = threading.Semaphore(0)
item = None

def producer():
    global item
    print("I'm the producer and I produce data.")
    print("Producer is going to sleep.")
    time.sleep(10)
    item = "Hello"
    print("Producer is alive. Signaling the consumer.")
    done.release()
def consumer():
    print( "I'm a consumer and I wait for data.")
    print( "Consumer is waiting.")
    done.acquire()#Blocks till some thread calls release
    print( "Consumer got", item)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()


# Synchronization using event

1. This is one of the simplest mechanisms for communication between threads
2. One thread signals an event and other threads wait for it.
3. An event object manages an internal flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true.

In [1]:
import threading
import time
item = None
# A semaphore to indicate that an item is available
available = threading.Semaphore(0)
# An event to indicate that processing is complete
completed = threading.Event()
# A worker thread
def worker():
    while True:
        available.acquire()
        print( "worker: processing", item)
        time.sleep(5)
        print( "worker: done")
        completed.set()#Objects flag set to true
# A producer thread
def producer():
    global item
    for x in range(5):
        completed.clear()       # Clear the event
        item = x                # Set the item
        print( "producer: produced an item")
        available.release()     # Signal on the semaphore
        completed.wait()#Blocks until flag is true
        print( "producer: item was processed")
t1 = threading.Thread(target=producer)
t1.start()
t2 = threading.Thread(target=worker)
t2.setDaemon(True)
t2.start()

producer: produced an item
worker: processing 0

  t2.setDaemon(True)



worker: done
producer: item was processed
producer: produced an item
worker: processing 1
worker: done
producer: item was processed
producer: produced an item
worker: processing 2
worker: done
producer: item was processed
producer: produced an item
worker: processing 3
worker: done
producer: item was processed
producer: produced an item
worker: processing 4
worker: done
producer: item was processed


# Queues - Producer and Consumer Patterns

In [None]:
1. Follows FIFO
2. One type of thread(producer) writes to one end of a queue
3. Another type of thread called consumer reads from other end of the queue
4. All reads and writes are thread safe on its own, so explicit synchronization like locks, rlocks not required.

In [None]:
import threading
import queue
import time
no_of_producers = 2
no_of_consumers = 5
workQueue = queue.Queue(maxsize=10)#Create a queue
inp_to_queue = [[1,2,3],['one','two','three','foura']]
def producer(threadName,indx): #Producer thread work
    lst = inp_to_queue[indx]
    for word in lst:
        workQueue.put(word)#Insert into queue
def consumer(threadName):#Producer thread work
    if not workQueue.empty():
        data = workQueue.get()#Read from queue FIFO
        print ("%s processing %s" % (threadName, data))
        time.sleep(1)
threads_prod = []
threads_cons = []
for i in range(no_of_producers):
    tp = threading.Thread(target = producer, args = ("Producer Thread", i))
    tp.start()
    threads_prod.append(tp)
for j in range(no_of_consumers):
    tc = threading.Thread(target = consumer, args = ("Consumer Thread",))
    tc.start()
    threads_cons.append(tc)

# Same queue program above using OOP

In [None]:
import queue
import threading
import time

def put_data(threadName,indx):
    lst = inp_to_queue[indx]
    for word in lst:
        workQueue.put(word)

class putThread (threading.Thread):
    def __init__(self, lstID, name):
        threading.Thread.__init__(self)
        self.threadID = lstID
        self.name = name
    def run(self):
        print ("Starting " + self.name+str(self.threadID))
        put_data(self.name,self.threadID)
        #print ("Exiting " + self.name)

def get_data(threadName):
    if not workQueue.empty():
        data = workQueue.get()
        print ("%s processing %s" % (threadName, data))
        time.sleep(1)

class getThread (threading.Thread):
    def __init__(self, threadID, name):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
    def run(self):
        print ("Starting " + self.name+str(self.threadID))
        get_data(self.name)
        #print ("Exiting " + self.name)

In [None]:
no_of_producers = 2
no_of_consumers = 4

workQueue = queue.Queue(maxsize=10)

inp_to_queue = [['apple','orange','guava'],['rose','jasmine','lilly']]
threads=[]

for i in range(no_of_producers):
    th_inp = putThread(i,"Put")
    threads.append(th_inp)
    th_inp.start()

for j in range(no_of_consumers):
    th_op = getThread(j,"Get")
    threads.append(th_op)
    th_op.start()

# Wait for all threads to complete
for t in threads:
    t.join()
print ("Exiting Main Thread")