In [12]:
from contextlib import contextmanager
from threading  import Lock, Thread
import random
from time import sleep
import operator as _

class SharedExclusive(object):

    def __init__(self):
        self.write_lock_obj = Lock()
        self.num_r = 0 # Number of read_locks

    def r_acquire(self):
        print("\nRead_lock Called:")
        if self.write_lock_obj.locked():
            print("Wait for write_lock")
            
            # waiting until write_lock released...
            while self.write_lock_obj.locked():
                print(".",end="")
            self.num_r += 1
            print(f"read item's qeue num:{self.num_r}")
        else:
            self.num_r += 1
            print(f"read item's qeue num:{self.num_r}")

    def r_release(self):
        if self.num_r > 0:
            self.num_r -= 1
            print(f"read item's qeue num after release:{self.num_r}")

    @contextmanager
    def r_locked(self):
        try:
            self.r_acquire()
            yield
        finally:
            self.r_release()

    def w_acquire(self):
        print("\nWrite_lock Called:")
        if self.num_r:
            print("Wait for read_lock")
            
            # waiting until read_lock released...
            while self.num_r:
                print("*",end="")
            print('\nWrite lock after read lock released:\n')
            self.write_lock_obj.acquire()
            
        elif self.write_lock_obj.locked():
            print("Wait for write_lock")
            self.write_lock_obj.acquire()
        else:
            self.write_lock_obj.acquire()

    def w_release(self):
        if self.write_lock_obj.locked():
            self.write_lock_obj.release()

    @contextmanager
    def w_locked(self):
        try:
            self.w_acquire()
            yield
        finally:
            self.w_release()
            
    def create_local_variable(self, block):
        variable = 0
        ldic = locals()
        try:
            exec(f"variable = self.{block['variable']}", ldic)
            # print(f"variable {block['variable']} already exists!")
        except AttributeError:
            exec(f"self.{block['variable']} = 0")
            exec(f"variable = self.{block['variable']}", ldic)
            # print(f"variable {block['variable']} created!")
        finally:
            variable = ldic['variable']
        return variable
    
            
class Concurrency:
    def __init__(self, locks):
        self.locks = locks
        
        self.PURPLE = "\033[95m"
        self.CYAN = "\033[96m"
        self.DARKCYAN = "\033[36m"
        self.BLUE = "\033[94m"
        self.GREEN = "\033[92m"
        self.YELLOW = "\033[93m"
        self.RED = "\033[91m"
        self.BOLD = "\033[1m"
        self.END = "\033[0m"
    
    def print(self, sig, statement, *colors):
        cc = ""
        cc = "".join([color for color in colors])
        print()
        print(
            "{mix}[{sig}]{end} {statement}".format(
                sig=sig, mix=cc, end=self.END, statement=statement
            )
        )
        
    def create_transaction(self, transaction, T_num):
        for block in transaction:
            print()
            self.print("<<<", f"T{T_num}, {block['variable']}", self.GREEN)
            operation_name = block['operations'][0]
            lock_class = self.locks.get(block['variable']) # Fetch variable object from sharedExclusive class.
            if operation_name == 'read_item':
                with lock_class.r_locked():
                    self.print("!!", f"Acquire read_lock for var:{block['variable']!r} of T:{T_num!r}", self.BLUE)
                    variable = lock_class.create_local_variable(block)
                    self.print("=>", f"{block['variable']}:{variable} of T:{T_num!r}", self.YELLOW)
                    
                    random_number = random.uniform(0.1,1.5)
                    self.print("~~", f"Sleep for {random_number:.2f} s", self.PURPLE)
                    sleep(random_number)
                self.print(">>>", f"Release read_lock for var:{block['variable']!r} of T:{T_num!r}", self.RED)
                    
            elif operation_name == 'write_item':
                operation_task = block['operations'][1]
                with lock_class.w_locked():
                    self.print("!!", f"Acquire write_lock for var:{block['variable']!r} of T:{T_num!r}", self.BLUE)
                    variable = lock_class.create_local_variable(block)
                    self.print("=>", f"{block['variable']}:{variable} of T:{T_num!r}", self.YELLOW)
                    operator, operand = operation_task.split()
                    mapper = {
                        '+' : _.add,
                        '-' : _.sub,
                        '*' : _.mul,
                        '/' : _.truediv,
                    }
                    func = mapper.get(operator, None)
                    if func:
                        variable = func(variable, float(operand))
                    random_number = random.uniform(0.1,1.5)
                    self.print("~~", f"Sleep for {random_number:.2f} s", self.PURPLE)
                    sleep(random_number)
                    exec(f"lock_class.{block['variable']} = variable")
                self.print(">>>", f"Release write_lock for var:{block['variable']!r} of T:{T_num!r}", self.RED)

In [13]:
def create_and_run_threads(schedule, concurrency):
    for index, transaction in enumerate(schedule):
        exec(f"t{index} = Thread(target=concurrency.create_transaction, args=({transaction},{index+1}))")
    index += 1
    for i in range(index):
        exec(f"t{i}.start()")
    for i in range(index):
        exec(f"t{i}.join()")

def get_variables(schedule):
    vars = []
    for transaction in schedule:
        for block in transaction:
            vars.append(block['variable'])
    # Remove duplicates
    return list(set(vars))

def create_lock_objects(vars, SharedExclusive):
    objects = {}
    for var in vars:
        ldic = locals()
        exec(f"objects[var] = SharedExclusive()", ldic)
    return ldic['objects']

def print_variables(locks):
    print('\nAll used variables and values:')
    for var, obj in locks.items():
        print(var, "=", getattr(obj, var))
        
def main():
    transaction = [
        {
            'variable': "y",
            'operations': [
                'write_item', 
                '- 10'
            ]
        },
        {
            'variable': "x",
            'operations': [
                'write_item', 
                '+ 20'
            ]
        },
    ]
    
    # for simplicity we assume all transactions are same.
    schedule = [transaction for _ in range(5)]
    vars = get_variables(schedule)
    locks = create_lock_objects(vars, SharedExclusive)
    concurrency = Concurrency(locks)
    create_and_run_threads(schedule, concurrency)
    print_variables(locks)

In [14]:
main()



[92m[<<<][0m T1, y

Write_lock Called:

[94m[!!][0m Acquire write_lock for var:'y' of T:1

[93m[=>][0m y:0 of T:1

[95m[~~][0m Sleep for 0.88 s


[92m[<<<][0m T2, y

Write_lock Called:
Wait for write_lock



[92m[<<<][0m T4, y

[92m[<<<][0m T3, y

Write_lock Called:
Wait for write_lock

Write_lock Called:
Wait for write_lock


[92m[<<<][0m T5, y

Write_lock Called:
Wait for write_lock


[94m[!!][0m Acquire write_lock for var:'y' of T:2

[93m[=>][0m y:-10.0 of T:2

[95m[~~][0m Sleep for 0.63 s
[91m[>>>][0m Release write_lock for var:'y' of T:1


[92m[<<<][0m T1, x

Write_lock Called:

[94m[!!][0m Acquire write_lock for var:'x' of T:1

[93m[=>][0m x:0 of T:1

[95m[~~][0m Sleep for 0.82 s


[91m[>>>][0m Release write_lock for var:'y' of T:2[94m[!!][0m Acquire write_lock for var:'y' of T:3

[93m[=>][0m y:-20.0 of T:3



[92m[<<<][0m T2, x

Write_lock Called:
Wait for write_lock

[95m[~~][0m Sleep for 0.38 s


[91m[>>>][0m Release write_lock for