# Monitors

A monitor is an object that only one thread can execute methods of this object at a time.

In Java Threads there is a `synchronized` modifier making a method a monitor like mutually exclusive.

In python you can use mutexes in the class to achieve that.

In [None]:
from threading import Lock,Thread,Condition,RLock,Semaphore

class Monitor:
    def __init__(self):
        self.mutex = RLock()
        self.count = 0
    
    def increment(self):
        with self.mutex:
            self.count += 1
    
    def get(self):
        with self.mutex:
            return self.count

        
        

In [6]:
def synchronized(method,*p,**kw):
    def f(self,*p,**kw):
        with self.mutex:
            method(self,*p,**kw)
    return f

class Monitor:
    def __init__(self):
        self.mutex = RLock()
        self.count = 0
    @synchronized
    def increment(self):
        self.count += 1
    @synchronized
    def get(self):
        return self.count

In [7]:
a = Monitor()
a.increment()
a.get()

In [9]:
def f(mon):
    for i in range(10000):
        mon.increment()

m = Monitor()

t1 = Thread(target=f, args=(m,))
t2 = Thread(target=f, args=(m,))
t3 = Thread(target=f, args=(m,))
t4 = Thread(target=f, args=(m,))

t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
print(m.get())


40000


In [1]:
class Queue:
    def __init__(self,capacity):
        self.capacity = capacity
        self.queue = []
        self.mutex = RLock()
        self.notempty = Lock(0)
        
    def empty(self):
        return len(self.queue) == 0
    
    def full(self):
        return len(self.queue) == self.capacity
    
    def enqueue(self,val):
        with self.mutex:
            while self.full():
                self.mutex.release()
                time.sleep(1)
                self.mutex.acquire()
            self.queue.append(val)
            if len(self.queue) == 1:
                self.notempty.release()
            
    def dequeue(self):
        with self.mutex:
            while self.empty():
                self.mutex.release()
                self.notempty.acquire()
                self.mutex.acquire()
            a=self.queue[0]
            del self.queue[0]

# Producer Consumer 

* A queue which is accessed by two (or more) threads. One end a producer thread inserts items, the other end, consumer thread removes and processes the items.
* They work in an infinite loop.
* If queue is empty or full?
* In full and empty cases, they need to check it until queue has an empty slot or has an item respectively. Polling in an infinite loop wastes too much CPU
* Busy waiting is not a good idea:
```python
   while queue.empty():
        time.sleep(1)  # response time will be slow
        pass
```
* Use synchronization methods semaphores or similar to make other end know that queue is ready (not full or not empty)

# Condition Variables

* In a monitor, condition variables let threads to signal each other while keeping the monitor semantics (only one thread inside).
 
```python
c = Condition(mutex)
c.wait()
```
```wait``` does:
```python
c.mutex.release()
# block on condition
# when unblocked:
c.mutex.acquire()
```

Typical usage:
```python
c.acquire() # or acquire mutex of c on construction
.....
while actual condition:
     c.wait()
```

The notifier cannot guarantee that the condition holds semantically and notified thread can directly assume condition holds.

`c.notify()` will unblock one of the threads blocking on condition.

`c.notifyAll()` will unblock all of them. However they still wait on the mutex after unblocking. They enter monitor one at a time.
asdasd




In [14]:
import random
import time

class PCQueue:
    def __init__(self, capacity=10):
        self.mutex=RLock()
        self.queue = []
        self.capacity = capacity
        self.notempty = Condition(self.mutex)
        self.notfull = Condition(self.mutex)
    def empty(self):
        with self.mutex:
            return len(self.queue) == 0
    def full(self):
        with self.mutex:
            return len(self.queue) == self.capacity
    def enqueue(self,item):
        with self.mutex:
            while len(self.queue) == self.capacity:
                print("queue is full, waiting")
                self.notfull.wait()
            
            self.queue.append(item)
            self.notempty.notify()
            
    def dequeue(self):
        with self.mutex:
            while len(self.queue) == 0:
                print("queue is empty, waiting")
                self.notempty.wait()
                
            val = self.queue[0]
            del self.queue[0]
            self.notfull.notify()
            return val
            

def producer(pcq):
    for i in range(30):
        time.sleep(0.35+random.random()*0.15)
        pcq.enqueue(random.randint(0,100))
        print("enqueued")
    print("producer finished")
        
def consumer(pcq):
    for i in range(30):
        time.sleep(0.15+random.random()*0.2)
        print("dequeued ",pcq.dequeue())
    print("consumer finished")
        
q = PCQueue()

prod = Thread(target=producer, args=(q,))
cons = Thread(target=consumer, args=(q,))
prod.start()
cons.start()
prod.join()
cons.join()


queue is empty, waiting
enqueued
dequeued  67
queue is empty, waiting
enqueued
dequeued  76
queue is empty, waiting
enqueued
dequeued  98
queue is empty, waiting
enqueued
dequeued  52
queue is empty, waiting
enqueued
dequeued  20
queue is empty, waiting
enqueued
dequeued  85
queue is empty, waiting
enqueued
dequeued  99
queue is empty, waiting
enqueueddequeued  58

queue is empty, waiting
enqueued
dequeued  40
queue is empty, waiting
enqueued
dequeued  29
queue is empty, waiting
enqueueddequeued  5

queue is empty, waiting
enqueued
dequeued  17
queue is empty, waiting
enqueueddequeued  82

queue is empty, waiting
enqueueddequeued  3

queue is empty, waiting
enqueued
dequeued  28
queue is empty, waiting
enqueued
dequeued  81
queue is empty, waiting
enqueued
dequeued  45
queue is empty, waiting
enqueueddequeued  49

queue is empty, waiting
enqueued
dequeued  65
queue is empty, waiting
enqueued
dequeued  72
queue is empty, waiting
enqueueddequeued  35

queue is empty, waiting
enqueueddequ

In [15]:
prod = Thread(target=producer, args=(q,))
prod2 = Thread(target=producer, args=(q,))
cons = Thread(target=consumer, args=(q,))
cons2 = Thread(target=consumer, args=(q,))
prod.start()
cons.start()
prod2.start()
cons2.start()

prod.join()
cons.join()
prod2.join()
cons2.join()

queue is empty, waiting
queue is empty, waiting
enqueued
dequeued  52
enqueued
dequeued  80
queue is empty, waiting
queue is empty, waiting
enqueued
dequeued  35
enqueued
dequeued  55
queue is empty, waiting
queue is empty, waiting
enqueueddequeued  91

enqueued
dequeued  44
queue is empty, waiting
queue is empty, waiting
enqueued
dequeued  10
enqueued
dequeued  0
queue is empty, waiting
queue is empty, waiting
enqueued
dequeued  6
enqueued
dequeued  76
queue is empty, waiting
queue is empty, waiting
enqueued
dequeued  51
enqueueddequeued  68

queue is empty, waiting
queue is empty, waiting
enqueueddequeued  70

enqueued
dequeued  90
queue is empty, waiting
queue is empty, waiting
enqueued
dequeued  87
enqueued
dequeued  84
queue is empty, waiting
queue is empty, waiting
enqueueddequeued  87

enqueued
dequeued  2
queue is empty, waiting
enqueued
dequeued  19
queue is empty, waiting
enqueued
dequeued  87
queue is empty, waiting
queue is empty, waiting
enqueued
dequeued  49
enqueued
dequ

In [16]:
import multiprocessing as mp
import random
import time

class PCQueueMP:
    def __init__(self, capacity=10):
        self.mutex= mp.RLock()
        self.queue = mp.Array('d',capacity)
        self.capacity = capacity
        self.n = 0
        self.notempty = mp.Condition(self.mutex)
        self.notfull = mp.Condition(self.mutex)
    def empty(self):
        with self.mutex:
            return self.n == 0
    def full(self):
        with self.mutex:
            return self.n == self.capacity
    def enqueue(self,item):
        with self.mutex:
            while self.n >= self.capacity:
                print("queue is full, waiting")
                self.notfull.wait()
            
            self.queue[self.n] = item
            self.n += 1
            self.notempty.notify()
            
    def dequeue(self):
        with self.mutex:
            while self.n == 0:
                print("queue is empty, waiting")
                self.notempty.wait()
            
            val = self.queue[0]
            for j in range(0,self.n-1):
                self.queue[j] = self.queue[j+1]
            self.n -= 1
            self.notfull.notify()
            return val
            

def producer(pcq):
    for i in range(30):
        time.sleep(0.15+random.random()*0.15)
        pcq.enqueue(random.randint(0,100))
        print("enqueued")
    print("producer finished")
        
def consumer(pcq):
    for i in range(30):
        time.sleep(0.35+random.random()*0.2)
        print("dequeued ",pcq.dequeue())
    print("consumer finished")
        
q = PCQueueMP()

prod = mp.Process(target=producer, args=(q,))
cons = mp.Process(target=consumer, args=(q,))
prod.start()
cons.start()
prod.join()
cons.join()


enqueued
queue is empty, waiting
queue is empty, waitingenqueued

queue is empty, waitingenqueued

enqueuedqueue is empty, waiting

enqueuedqueue is empty, waiting

enqueuedqueue is empty, waiting

enqueuedqueue is empty, waiting

enqueuedqueue is empty, waiting

enqueuedqueue is empty, waiting

queue is empty, waitingenqueued

queue is full, waiting


Process Process-2:
Process Process-1:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)


KeyboardInterrupt: 

  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_2805928/2523583866.py", line 46, in producer
    pcq.enqueue(random.randint(0,100))
  File "/tmp/ipykernel_2805928/2523583866.py", line 53, in consumer
    print("dequeued ",pcq.dequeue())
                      ^^^^^^^^^^^^^
  File "/tmp/ipykernel_2805928/2523583866.py", line 23, in enqueue
    self.notfull.wait()
  File "/tmp/ipykernel_2805928/2523583866.py", line 33, in dequeue
    self.notempty.wait()
  File "/usr/lib/python3.11/multiprocessing/synchronize.py", line 261, in wait
    return self._wait_semaphore.acquire(True, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/synchronize.py", line 261, in wait
    return self._wait_semaphore.acquire(True, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
KeyboardInterrupt


# multiprocessing.Queue and Queue modules

```from multiprocessing import ..., Queue```

```from threading import ....
    import Queue
```

A synchronized object in shared memory. All blocking/unblocking is already implemented.

In [None]:
from multiprocessing import Queue,Process

q = Queue(10)

def producer(q):
    for i in range(100):
        time.sleep(0.15)
        q.put(i)
def consumer(q):
    for i in range(100):
        item = q.get()
        time.sleep(0.05)
        print(item)

prod=Process(target=producer, args=(q,))
cons=Process(target=consumer, args=(q,))
prod.start()
cons.start()
prod.join()
cons.join()

# Process/Thread Pools

* `[i1, ... , iN]` items and apply `f()` to all of them in parallel to get `[f(i1), f(i2), ... f(N)]` as a result.
* creating `N` threads/process looks logical but resources are limited. `N == 4` it is ok but if `N == 10000`?.
* Instead create `M` processes and compute in groups of
 `M`.


In [None]:
from multiprocessing import Pool

pool = Pool(8)

def f(i):
    time.sleep(0.2+0.3*random.random())
    return i*i

g = pool.map(f, [i for i in range(100)])
print(g)

Implement your own pool?


# Deadlock and Dining Philosophers

In [21]:
from threading import *
from time import *
from random import *
STARTED = 0
THINKING = 1
HUNGRY = 2
EATING = 3
EXITTED = 4

stmess = "0?-*X"


class Philosopher(Thread):
    def __init__(self,id,forks,states, updated):
        Thread.__init__(self) # super().__init__()
        self.id = id
        self.left = forks[0]
        self.right = forks[1]
        self.states = states
        self.states[id] = STARTED
        self.term = False
        self.updated = updated
    def terminate(self):
        self.term = True
        
    def updatestate(self, stt):
        with self.updated:
            self.states[self.id] = stt
            self.updated.notify()
            
    def run(self):
        for i in range(10):
            if self.term:
                break
#            print self.id," is thinking"
            self.updatestate(THINKING)
        
            sleep(random()*1)
            self.updatestate(HUNGRY)
            
            #if self.id % 2 == 0:
            self.left.acquire()
            self.right.acquire()
            #else:
            #    self.right.acquire()
            #    self.left.acquire()
            self.updatestate(EATING)
            
#            print self.id," is eating"
            sleep(random()*4)
            self.left.release()
            self.right.release()
        
        self.updatestate(EXITTED)

print("Enter number of philosopher: ", end='')
n = int(input())

forks = [Lock() for i in range(n)]

phils = []

states = [0 for i in range(n)]

updated = Condition()

for i in range(n):
    phils.append( Philosopher(i,(forks[i],forks[(i+1)%n]),states, updated) )


for phil in phils:
    phil.start() 

while True:
    eflag = True
    for i in range(n):
        if states[i] != EXITTED:
            eflag = False
        print(stmess[states[i]],end='')
    print()
    if eflag:
        break
    try:
        with updated:
            updated.wait()
    except KeyboardInterrupt:
        for phil in phils:
            phil.terminate()

for phil in phils:
    phil.join() 




Enter number of philosopher: 3
???
??*
-?*
--*
-*?
-*-
*?-
*--
?-*
--*
-*?
-*-
*?-
*--
?-*
--*
-*?
-*-
*?-
*--
?-*
--*
-*?
-*-
*?-
*--
?-*
--*
-*?
-*-
*?-
*--
?-*
--*
-*?
-*-
*?-
*--
?-*
--*
-*?
-*-
*?-
*--
?-*
--*
-*?
-*-
*?-
??*
?-*
--*
-*?
*??
*?-
*--
?-*
--*
-*X
*XX
XXX


# Synchronizing/Watching a Thread/Process

* Have a condition variable for synchronization.
* Send it to Thread/Process
* In the watcher wait for it
* When the model/state changes in thread/process, notify the condition variable.

Call a function in a thread asynchronously (assume there are multiple threads, join() only joins one of them):
```python
def f(x):
    return x*x

def call(c):
    with c[3]:
        c[0] = c[1](c[2])
        c[3].notify()

c = Condition()
# (result, function, input, condition)
result=[None, f, 15, c]
t = Thread(target=call, args=(result))
with c:
    c.wait()
```


In [23]:
from threading import Thread,Condition,Lock
import time

class AsyncCall(Thread):
    def __init__(self,func,args):
        super().__init__()
        self.func = func
        self.args=args
        self.cond=Condition()
        self.ready = False
        self.start()
    def run(self):
        self.value = self.func(self.args)
        with self.cond:
            self.ready = True
            self.cond.notify_all()
    def wait(self):
        with self.cond:
            while not self.ready:
                self.cond.wait()

def f(x):
    time.sleep(3)
    return x*x

c = AsyncCall(f,10000)
print("I can do usefull stuff here...")
c.wait()
print(c.value)

I can do usefull stuff here...
100000000


# Concurrency Overview 

* Watch race conditions! Use locks/semaphores to protect them
* Watch deadlocks. Be careful when holding a lock and try to acquire another.
* Never make assumptions about timing!. Timing of a thread becoming ready, calling some heavy function. OS/|PL scheduler can behave undeterministically.
* Never busy wait!
* Use monitors and condition variables when you need higher level abstractions of synchronization.
  a monitor queue for producer consumer
* Be careful about if your data is shared or not!
  multiprocessing: not shared, use Value/Array/Queue
  threading: all globals and **object** parameters are shared
* Be careful about Global Interpreter Lock:
   If task is I/O intensive threading should work. but if it has cpu intensive mostly -> no parallelism.
   threading: lightweight, shared variables default, easy to manage but worse parallelism
   multiprocessing: parallel, but more expensive, needs explicit shared variables
* If a process/thread has behavior, implement as a derived class
  ```python
     class myclass(Process):   or class myclass(Thread)
  ```
  call `super().__init__()` in constructor
  override `run()` method.
  if a simple function, just start it.
* If only synchronization is required, your classed can be anything, implement a monitor

In [30]:
import time
import random

class ParMap:
    def __init__(self, func):
        self.function = func
    
    @staticmethod    
    def f( func, res, args):
        if type(args) == tuple:
            res[0] = func(*args)
        else:
            res[0] = func(args)
        
    def __call__(self, arglist):
        results = [[None] for a in arglist]
        threads = [Thread(target=self.f, 
                    args = (self.function, results[i], arglist[i])) 
                               for i in range(len(arglist))]
        
        for t in threads: t.start()
        for t in threads: t.join()
        return [r[0] for r in results]

def f(a,b):
    time.sleep(random.random()*3)
    return (a*a+b)

p = ParMap(f)

p([(1,2),(3,4),(9,6),(7,9)])

[3, 13, 87, 58]

In [38]:
from threading import *
class RWLock:
    def __init__(self):
        self.mut = RLock()
        self.rcnt, self.wcnt = 0, 0
        self.canread, self.canwrite = Condition(self.mut), Condition(self.mut)
        
    def racquire(self):
        with self.mut:
            while self.wcnt > 0:
                self.canread.wait()
            self.rcnt += 1       
    def rrelease(self):
        with self.mut:
            self.rcnt -= 1
            # let writers in if i am the last reader
            if self.rcnt == 0:
                self.canwrite.notify()
    def wacquire(self):
        with self.mut:
            while self.rcnt > 0 or self.wcnt > 0:
                self.canwrite.wait()
            self.wcnt += 1
    def wrelease(self):
        with self.mut:
            self.wcnt -= 1
            self.canwrite.notify()
            self.canread.notify_all()
            
    

In [40]:
import random
import time

def readorwrite(name, n, rw):
    for i in range(n):
        if random.random() < 0.9:
            rw.racquire()
            print(name, 'reading', rw.rcnt, rw.wcnt)
            time.sleep(random.random()*2+0.01)
            rw.rrelease()
            print(name, 'stopped reading')
        else:
            rw.wacquire()
            print(name, 'writing' , rw.rcnt, rw.wcnt)
            time.sleep(random.random()*2+0.01)
            rw.wrelease()
            print(name, 'stopped writing')
            
RW = RWLock()

thrds = [Thread(target=readorwrite, args=(n,10,RW)) for n in "ABCDEFG"]

for t in thrds: t.start()
for t in thrds: t.join()

A reading 1 0
B reading 2 0
C reading 3 0
D reading 4 0
E reading 5 0
F reading 6 0
G reading 7 0
F stopped reading
F reading 7 0
E stopped reading
E reading 7 0
A stopped reading
A reading 7 0
B stopped reading
B reading 7 0
D stopped reading
D reading 7 0
E stopped reading
E reading 7 0
G stopped reading
G reading 7 0
C stopped reading
C reading 7 0
A stopped reading
A reading 7 0
F stopped reading
F reading 7 0
D stopped reading
D reading 7 0
B stopped reading
B reading 7 0
E stopped reading
E reading 7 0
D stopped reading
D reading 7 0
B stopped reading
B reading 7 0
D stopped reading
D reading 7 0
G stopped reading
C stopped reading
C reading 6 0
A stopped reading
A reading 6 0
F stopped reading
F reading 6 0
D stopped reading
D reading 6 0
E stopped reading
E reading 6 0
C stopped reading
C reading 6 0
A stopped reading
A reading 6 0
D stopped reading
D reading 6 0
B stopped reading
B reading 6 0
D stopped reading
D reading 6 0
F stopped reading
F reading 6 0
C stopped reading
C 

In [41]:
class Observer:
    def __init__(self,N):
        self.values = [0 for i in range(N)]
        self.mut = RLock()
        self.observers = {}
        
    def register(self, obj, vset):
        with self.mut:
            self.observers[obj] = (vset, Condition(self.mut))
    def unregister(self, obj):
        with self.mut:
            del self.observers[obj]
    
    def wait(self, obj):
        with self.mut:
            if obj in self.observers:
                self.observers[obj][1].wait()
            
    def __getitem__(self, idx):
        with self.mut:
            return self.values[idx]
    def __setitem__(self, idx, v):
        with self.mut:
            self.values[idx] = v
            for obs in self.observers.values():
                if idx in obs[0]:
                    obs[1].notify()
        
    

In [None]:
ob = Observer()

def watcher(obs,s):
    o   