# Python Synchronization

## Threading and Synchronization

Threading in python can be achieved by 2 modules:

- `_thread`: Old methods of threading
- `threading`: New threading methods

Alongside threading, these modules also include synchronization objects: locks, semaphores, and events.

In [1]:
import _thread, time

Starting a new thread is done using `start_new_thread`, Given a function and some parameters as parameter, it will execute the function using available threads

In [2]:
def MyPrint(sleepPeriod,name,count):
    for i in range(0,count):
        print (name+"=>"+str(i))
        time.sleep(sleepPeriod)

#main thread
_thread.start_new_thread (MyPrint, (1,"Thread #1", 3))
_thread.start_new_thread (MyPrint, (2,"Thread #2", 2))
_thread.start_new_thread (MyPrint, (3,"Thread #3", 1))
time.sleep(10)

Thread #1=>0
Thread #3=>0
Thread #2=>0
Thread #1=>1
Thread #1=>2
Thread #2=>1


Locks are used in case objects are not thread-safe

In [3]:
lock = _thread.allocate_lock()
def MyPrint(sleepPeriod,name,count):
    global lock
    for i in range(0,count):
        lock.acquire()
        print (name+"=>"+str(i))
        lock.release()
        time.sleep(sleepPeriod)

_thread.start_new_thread (MyPrint, (1,"Thread #1", 3))
_thread.start_new_thread (MyPrint, (2,"Thread #2", 3))
_thread.start_new_thread (MyPrint, (3,"Thread #3", 4))
time.sleep(10)

Thread #1=>0
Thread #3=>0
Thread #2=>0
Thread #1=>1
Thread #2=>1
Thread #1=>2
Thread #3=>1
Thread #2=>2
Thread #3=>2
Thread #3=>3


Locks can be used with a with statement (In case acquire and release are called in `__enter__` and `__exit__` code)

In [4]:
lock = _thread.allocate_lock()
def MyPrint(sleepPeriod,name,count):
    global lock
    for i in range(0,count):
        with lock:
            print (name+"=>"+str(i))
        time.sleep(sleepPeriod)

_thread.start_new_thread (MyPrint, (1,"Thread #1", 3))
_thread.start_new_thread (MyPrint, (2,"Thread #2", 3))
_thread.start_new_thread (MyPrint, (3,"Thread #3", 4))
time.sleep(10)

Thread #1=>0
Thread #3=>0
Thread #2=>0
Thread #1=>1
Thread #2=>1
Thread #1=>2
Thread #3=>1
Thread #2=>2
Thread #3=>2
Thread #3=>3


Locks are used to wait for a thread to finish

In [5]:
lock = _thread.allocate_lock()
lock.acquire()
def MyPrint(sleepPeriod,name,count):
    global lock
    for i in range(0,count):
        print (name+"=>"+str(i))
        time.sleep(sleepPeriod)
    lock.release()

_thread.start_new_thread (MyPrint, (1,"Thread #1", 3))
print ("Waiting for a thread to finish ...")
lock.acquire()
print ("Thread finished")

Waiting for a thread to finish ...
Thread #1=>0
Thread #1=>1
Thread #1=>2
Thread finished


Exceptions not caught in a different thread than the main thread will NOT stop the program

In [6]:
def MyPrint(sleepPeriod,name,count):
    global lock
    for i in range(-count,count):
        print (name+"=>"+str(10/i))
    time.sleep(sleepPeriod)
_thread.start_new_thread (MyPrint, (1,"Thread #1", 3))
for i in range(0,10):
    print ("Main thread : "+str(i))
    time.sleep(1)

Exception ignored in thread started by <function MyPrint at 0x7fdd50184900>:
Traceback (most recent call last):
  File "/tmp/ipykernel_77242/3194620473.py", line 4, in MyPrint
ZeroDivisionError: division by zero


Main thread : 0
Thread #1=>-3.3333333333333335
Thread #1=>-5.0
Thread #1=>-10.0
Main thread : 1
Main thread : 2
Main thread : 3
Main thread : 4
Main thread : 5
Main thread : 6
Main thread : 7
Main thread : 8
Main thread : 9


**Using Threading**

The `threading` module uses high level function for threads and synchronization.

The class `Thread` is used to derive a base thread object. When Deriving a Thread the following methods must be implemented
- `run()` What is running when the thread starts
- `__init__()` Constructor

A `Thread` class also implements the following methods

- `start()`: starts a thread
- `join()`: waits until the thread terminates
- Getters and setters for name attribute of the thread
- `is_alive()`: returns True if the thread is alive, False otherwise`

In [7]:
import threading

In [8]:
def WaitSomeSeconds(seconds):
    time.sleep(seconds)

t = threading.Thread(target=WaitSomeSeconds, args = (5,))
t.start()
print("Wait for the thread to complete ...")
t.join()

Wait for the thread to complete ...


In [9]:
def WaitSomeSeconds(seconds,x,y):
    time.sleep(seconds)
    print(x+y)

t = threading.Thread(target=WaitSomeSeconds, args = (5,10,20))
t.start()
print("Wait for the thread to complete ...")
t.join()

Wait for the thread to complete ...
30


Using a subclass of thread

In [11]:
class MyThread(threading.Thread):
    def __init__(self,seconds):
        threading.Thread.__init__(self)
        self.seconds = seconds
    def run(self):
        time.sleep(self.seconds)

In [12]:
t = MyThread(3)
t.start()
print("Wait for the thread to complete ...")
t.join()

Wait for the thread to complete ...


## Synchronization

`threading` modules uses the following synchronization objects:

- lock
- rlock
- Condition object
- Semaphore
- Event
- Timer
- Barrier

**locks**

Allows synchronized access to resource

- acquire: How long the lock has to wait till acquired
- release: Releases the lock (Will return an error if we call an unlocked lock)

In [14]:
l = threading.Lock()
def ThreadFnc(lock,n_list,start):
    for i in range(0,10):
        lock.acquire()
        n_list+=[start+i]
        lock.release()
        time.sleep(1)

lst = []
t1 = threading.Thread(target=ThreadFnc, args=(l,lst,100))
t2 = threading.Thread(target=ThreadFnc, args=(l,lst,1000))
t1.start ()
t2.start ()
t1.join ()
t2.join ()

print(lst)

[100, 1000, 101, 1001, 102, 1002, 103, 1003, 104, 1004, 105, 1005, 1006, 106, 107, 1007, 1008, 108, 1009, 109]


With `with` block

In [15]:
l = threading.Lock()
def ThreadFnc(lock,n_list,start):
    for i in range(0,10):
        with lock: n_list+=[start+i]
        time.sleep(1)

lst = []
t1 = threading.Thread(target=ThreadFnc, args=(l,lst,100))
t2 = threading.Thread(target=ThreadFnc, args=(l,lst,1000))
t1.start ()
t2.start ()
t1.join ()
t2.join ()

**rlocks**

Reentrant locks: Same thread can lock a resource multiple times

- Acquire and release
- Also support with block

In [16]:
l = threading.Lock() # Program will never end, a lock was put on thread1 when thread 2 calls it's function. This causes a deadlock
def ThreadFnc1(lock):
    with lock: print("fnc_1_called")
def ThreadFnc2(lock):
    with lock:
        print("fnc_2_called")
        ThreadFnc1(lock)

t1 = threading.Thread(target=ThreadFnc1, args=(l,))
t2 = threading.Thread(target=ThreadFnc2, args=(l,))
t1.start ()
t2.start ()
t1.join ()
t2.join ()

fnc_1_called
fnc_2_called


KeyboardInterrupt: 

In [17]:
l = threading.RLock()
def ThreadFnc1(lock):
    with lock: print("fnc_1_called")
def ThreadFnc2(lock):
    with lock:
        print("fnc_2_called")
        ThreadFnc1(lock)

t1 = threading.Thread(target=ThreadFnc1, args=(l,))
t2 = threading.Thread(target=ThreadFnc2, args=(l,))
t1.start ()
t2.start ()
t1.join ()
t2.join ()

fnc_1_called
fnc_2_called
fnc_1_called


**Condition Object**

A notification system for all systems based on a condition

- acquire
- release
- wait
- wait_for (Python3)
- notify
- notify_all

Also support workings with `with` blocks

In [18]:
c = threading.Condition()
number = 0
def ThreadConsumer():
    global number,c
    with c:
        if number==0: c.wait()
        print("Consume: "+str(number))
        number = 0

def ThreadProducer():
    global number,c
    with c:
        time.sleep(2)
        number = 5
        c.notify()

t1 = threading.Thread(target=ThreadConsumer)
t2 = threading.Thread(target=ThreadProducer)
t1.start ()
t2.start ()
t1.join ()
t2.join ()

Consume: 5


In [19]:
c = threading.Condition()
number = 0
def ThreadConsumer():
    global number,c
    with c:
        c.wait_for(lambda: number!=0)
        print("Consume: "+str(number))
        number = 0
def ThreadProducer():
    global number,c
    with c:
        time.sleep(2)
        number = 5
        c.notify()

t1 = threading.Thread(target=ThreadConsumer)
t2 = threading.Thread(target=ThreadProducer)
t1.start ()
t2.start ()
t1.join ()
t2.join ()

Consume: 5


**Semaphores**

Access to a limited nr of threads over a resource

- acquire, release methods
- support `with` blocks

In [20]:
s = threading.Semaphore(4)
def WorkerThread(id):
    global s
    with s:
        print("Thread-#"+str(id)+" enter")
        time.sleep(1)
        print("Thread-#"+str(id)+" exit")

t = []
for i in range(0,10):
    t += [threading.Thread(target=WorkerThread, args=(i,))]

for _th in t: _th.start ()
for _th in t: _th.join ()

Thread-#0 enter
Thread-#1 enter
Thread-#2 enter
Thread-#3 enter
Thread-#0 exit
Thread-#1 exit
Thread-#5 enter
Thread-#2 exit
Thread-#4 enter
Thread-#3 exit
Thread-#6 enter
Thread-#7 enter
Thread-#5 exit
Thread-#8 enter
Thread-#4 exit
Thread-#9 enter
Thread-#6 exit
Thread-#7 exit
Thread-#8 exit
Thread-#9 exit


**Timer**

Object deriver from Thread. Allows for a specified code to run at a specific time. Timer also has the cancell method to stop the time

In [21]:
def TimerFunction(mesaj):
    print (mesaj)

timer = threading.Timer(5,TimerFunction,("test after 5 seconds",))
timer.start()
timer.join()
print("Done")

Done


**Event**

Method to synchronize execution between 2 or more threads

- set -> signal the current state
- clear -> clear the current event state
- wait -> Wait till an event is signaled
- is_set -> Check if the event is signaled

Cannot be used with `with` blocks
For 2 threads, 2 events are used

In [22]:
e1 = threading.Event()
e2 = threading.Event()
e1.set()
def AddNumber(start,event1,event2,lista):
    for i in range(start,10,2):
        event1.wait()
        event1.clear()
        lista += [i]
        event2.set()

l = []
t1 = threading.Thread(target=AddNumber, args=(1,e1,e2,l))
t2 = threading.Thread(target=AddNumber, args=(2,e2,e1,l))
t1.start()
t2.start()
t1.join()
t2.join()
print (l)

[1, 2, 3, 4, 5, 6, 7, 8, 9]


**Barrier**

Mechanism that imposes a wait time for multiple threads to start at the same time

- wait -> Wait till the number
- reset -> Reset the barrier
- abort -> Abort current barrier
- parties -> nr of threads that pass the barrier

Cannot use with blocks
Python3 Exclusive

In [23]:
b = threading.Barrier(2)

def WorkerThread(b,id):
    b_id = b.wait()
    print("#"+str(id)+" pass the barier => "+str(b_id))
    time.sleep(2)
    print("#"+str(id)+" exit")

t = []
for i in range(0,10):
    t += [threading.Thread(target=WorkerThread, args=(i,))]

for _th in t: _th.start ()
for _th in t: _th.join ()

Exception in thread Thread-46 (WorkerThread):
Exception in thread Thread-47 (WorkerThread):
Exception in thread Thread-48 (WorkerThread):
Exception in thread Thread-49 (WorkerThread):
Exception in thread Thread-50 (WorkerThread):
Exception in thread Thread-51 (WorkerThread):
Traceback (most recent call last):
  File [35m"/usr/lib/python3.13/threading.py"[0m, line [35m1043[0m, in [35m_bootstrap_inner[0m
    [31mself.run[0m[1;31m()[0m
    [31m~~~~~~~~[0m[1;31m^^[0m
  File [35m"/home/mrbogdanovich/git/Python_Works/.venv/lib/python3.13/site-packages/ipykernel/ipkernel.py"[0m, line [35m788[0m, in [35mrun_closure[0m
    [31m_threading_Thread_run[0m[1;31m(self)[0m
    [31m~~~~~~~~~~~~~~~~~~~~~[0m[1;31m^^^^^^[0m
  File [35m"/usr/lib/python3.13/threading.py"[0m, line [35m994[0m, in [35mrun[0m
    [31mself._target[0m[1;31m(*self._args, **self._kwargs)[0m
    [31m~~~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
[1;35mTypeError[0m: [35mWorkerThread