# Setup: LEDs, RGB LED, Buttons

In [2]:
import threading
import asyncio
import time

from pynq.overlays.base import BaseOverlay

In [None]:
# Load the base overlay, then collect the button GPIO and the five LEDs used below.
base = BaseOverlay("base.bit")
btns = base.btns_gpio
# Entries 0-3 are the plain LEDs; entry 4 is the RGB LED at rgbleds[4].
leds = [base.leds[idx] for idx in range(4)]
leds.append(base.rgbleds[4])

In [None]:
# Self-test: blink every LED in turn until a push button is pressed.
cond = True  # cleared by get_btns() to end the flashing loop


async def flash_leds():
    '''
    Blink each entry of `leds` five times (0.2 s on, 0.2 s off), one after
    the other, and repeat the whole sweep for as long as `cond` is true.
    '''
    while cond:
        for led in leds:
            for _ in range(5):
                try:
                    led.on(6)  # 6 is yellow
                except TypeError:
                    # Assumed: LEDs whose on() takes no colour argument
                    # reject the extra parameter with TypeError -- confirm
                    # against the PYNQ LED API in use.
                    led.on()
                await asyncio.sleep(0.2)
                led.off()
                await asyncio.sleep(0.2)

async def get_btns(_loop):
    '''
    Poll the buttons every 10 ms; on the first non-zero reading clear
    `cond` and stop the event loop.
    Params:
      _loop: the asyncio event loop to stop
    '''
    global cond
    while True:
        await asyncio.sleep(0.01)
        if btns.read() != 0:
            cond = False
            _loop.stop()
            # Finish the coroutine here; looping on would leave this task
            # pending on a loop that is about to be closed.
            return

# Run the LED sweep and the button poller on a private event loop until the
# poller stops it.
loop = asyncio.new_event_loop()
tasks = [loop.create_task(flash_leds()), loop.create_task(get_btns(loop))]
try:
    loop.run_forever()
finally:
    # Cancel whatever is still pending and let the cancellations run, so the
    # loop is closed without unfinished tasks -- also when the cell is
    # interrupted instead of ended with a button press.
    for task in tasks:
        task.cancel()
    loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    loop.close()
    # A cancelled sweep may have left an LED lit.
    for led in leds:
        led.off()
print("Done.")

# Part 1: The 5 philosophers sharing forks

In [2]:
# Reload the overlay and rebuild the button / LED handles for Part 1.
base = BaseOverlay("base.bit")
btns = base.btns_gpio
# Plain LEDs 0-3 first, then the RGB LED taken from rgbleds[4].
leds = [base.leds[idx] for idx in range(4)]
leds.append(base.rgbleds[4])

In [None]:
#functions
def blink(t, d, n):
    '''
    Function to blink the LEDs
    Params:
      t: number of times to blink the LED
      d: duration (in seconds) for the LED to be on/off
      n: index of the LED (0 to 4)
    The call blocks for roughly 2 * t * d seconds and always leaves the
    LED switched off.
    '''
    led = leds[n]
    for _ in range(t):
        try:
            led.on(6)  # 6 is yellow
        except TypeError:
            # Assumed: an LED whose on() takes no colour argument rejects
            # the extra parameter with TypeError -- confirm against the
            # PYNQ LED API in use.
            led.on()
        time.sleep(d)
        led.off()
        time.sleep(d)
    # Also covers t == 0, where the loop body never runs.
    led.off()

def philosopher_t(_left, _right, num):
    '''
    Thread body for one philosopher. Repeatedly tries to take both forks
    without blocking; with both in hand it "eats" (fast blink), then puts
    them down and "naps" (slow blink). Runs until btns.read() returns a
    non-zero value, and gives back any fork it still holds on the way out.
    Params:
      _left: resource/fork on philosopher's left
      _right: resource/fork on philosopher's right
      num: index representing LED/philosopher/thread number
    '''
    eaten = False
    using_left = _left.acquire(False)
    using_right = _right.acquire(False)
    # NOTE(review): the starving branches never sleep, so a hungry
    # philosopher polls the locks (and prints) as fast as it can.
    try:
        while btns.read() == 0:
            if using_left and using_right:
                if not eaten:
                    print(f"Philosopher {num} is eating\n")
                    eaten = True
                    blink(10, 0.04, num)  # fast blink: 10 * 2 * 0.04 s = 0.8 s
                else:
                    print(f"Philosopher {num} is napping\n")
                    # Clear each flag right after its release so the
                    # cleanup below can never release a fork twice.
                    _left.release()
                    using_left = False
                    _right.release()
                    using_right = False
                    eaten = False
                    blink(5, 0.2, num)  # slow blink: 5 * 2 * 0.2 s = 2 s
            elif using_right:
                # Holding the right fork only: keep trying for the left.
                print("only right fork available. keep starving\n")
                leds[num].off()
                using_left = _left.acquire(False)
            elif using_left:
                # Holding the left fork only: keep trying for the right.
                print("only left fork available. keep starving\n")
                leds[num].off()
                using_right = _right.acquire(False)
            else:
                print(f"Philosopher {num} is starving\n")
                using_left = _left.acquire(False)
                using_right = _right.acquire(False)
    finally:
        # Do not leave a lock held by a thread that has finished.
        if using_left:
            _left.release()
        if using_right:
            _right.release()
        
# One lock per fork; philosopher k sits between fork k-1 (left) and fork k (right)
forks = [threading.Lock() for _ in range(5)]
threads = []

for idx in range(5):
    # forks[idx - 1] wraps around to forks[4] for philosopher 0
    worker = threading.Thread(
        target=philosopher_t,
        args=(forks[idx - 1], forks[idx], idx),
    )
    threads.append(worker)
    worker.start()

# Wait for every philosopher to finish and report each one by thread name
for worker in threads:
    name = worker.name
    worker.join()
    print(f'{name} joined')

# Part 2: Philosophers sharing forks, with random eating/napping durations (randint)

In [3]:
from random import randint

# Reload the overlay and rebuild the button / LED handles for Part 2.
base = BaseOverlay("base.bit")
btns = base.btns_gpio
# Plain LEDs 0-3 first, then the RGB LED taken from rgbleds[4].
leds = [base.leds[idx] for idx in range(4)]
leds.append(base.rgbleds[4])

In [None]:
#functions
def blink(t, d, n):
    '''
    Function to blink the LEDs
    Params:
      t: number of times to blink the LED
      d: duration (in seconds) for the LED to be on/off
      n: index of the LED (0 to 4)
    The call blocks for roughly 2 * t * d seconds and always leaves the
    LED switched off.
    '''
    led = leds[n]
    for _ in range(t):
        try:
            led.on(6)  # 6 is yellow
        except TypeError:
            # Assumed: an LED whose on() takes no colour argument rejects
            # the extra parameter with TypeError -- confirm against the
            # PYNQ LED API in use.
            led.on()
        time.sleep(d)
        led.off()
        time.sleep(d)
    # Also covers t == 0, where the loop body never runs.
    led.off()

def philosopher_t(_left, _right, num):
    '''
    Thread body for one philosopher. Repeatedly tries to take both forks
    without blocking; with both in hand it "eats" (fast blink), then puts
    them down and "naps" (slow blink), each for a random number of blinks
    between 5 and 10. Runs until btns.read() returns a non-zero value, and
    gives back any fork it still holds on the way out.
    Params:
      _left: resource/fork on philosopher's left
      _right: resource/fork on philosopher's right
      num: index representing LED/philosopher/thread number
    '''
    eaten = False
    using_left = _left.acquire(False)
    using_right = _right.acquire(False)
    # NOTE(review): the starving branches never sleep, so a hungry
    # philosopher polls the locks (and prints) as fast as it can.
    try:
        while btns.read() == 0:
            if using_left and using_right:
                # Only this branch blinks, so only draw a count here.
                times = randint(5, 10)
                if not eaten:
                    print(f"Philosopher {num} is eating\n")
                    eaten = True
                    blink(times, 0.04, num)  # fast blink: 0.4 s to 0.8 s
                else:
                    print(f"Philosopher {num} is napping\n")
                    # Clear each flag right after its release so the
                    # cleanup below can never release a fork twice.
                    _left.release()
                    using_left = False
                    _right.release()
                    using_right = False
                    eaten = False
                    blink(times, 0.2, num)  # slow blink: 2 s to 4 s
            elif using_right:
                # Holding the right fork only: keep trying for the left.
                print("only right fork available. keep starving\n")
                leds[num].off()
                using_left = _left.acquire(False)
            elif using_left:
                # Holding the left fork only: keep trying for the right.
                print("only left fork available. keep starving\n")
                leds[num].off()
                using_right = _right.acquire(False)
            else:
                print(f"Philosopher {num} is starving\n")
                using_left = _left.acquire(False)
                using_right = _right.acquire(False)
    finally:
        # Do not leave a lock held by a thread that has finished.
        if using_left:
            _left.release()
        if using_right:
            _right.release()
        
# One lock per fork; philosopher k sits between fork k-1 (left) and fork k (right)
forks = [threading.Lock() for _ in range(5)]
threads = []

for idx in range(5):
    # forks[idx - 1] wraps around to forks[4] for philosopher 0
    worker = threading.Thread(
        target=philosopher_t,
        args=(forks[idx - 1], forks[idx], idx),
    )
    threads.append(worker)
    worker.start()

# Wait for every philosopher to finish and report each one by thread name
for worker in threads:
    name = worker.name
    worker.join()
    print(f'{name} joined')