In [18]:
import threading
import time
import random
from pynq.lib import LED, RGBLED, Button
from pynq.overlays.base import BaseOverlay

from datetime import datetime
# Load the base overlay bitstream so the board's LEDs and buttons are reachable.
base = BaseOverlay("base.bit")
# Bind the same per-button view (`base.buttons`) that the hardware-init cell
# uses, instead of the raw `btns_gpio` controller, so `btns[0].read()` refers
# to button 0 regardless of which cell was executed last.
btns = base.buttons

# Global flag polled by every philosopher thread; set it to False to stop them.
keep_running = True

def philosopher_task(id, left_fork, right_fork, led):
    """Thread body for a single dining philosopher.

    Loops until the module-level ``keep_running`` flag becomes false. Each
    pass goes through three phases:

    * starving - LED off while waiting to acquire both forks;
    * eating   - both forks held, LED blinks with a 0.1 s half-period for a
                 random 3-6 s;
    * napping  - forks released, LED blinks with a 0.5 s half-period for a
                 random 1-2 s (kept shorter than eating so neighbours are not
                 starved).

    Args:
        id: Philosopher index. Index 4 is driven through ``led.write`` (the
            RGB LED) and picks up its right fork first; every other index
            uses ``led.on()`` / ``led.off()`` and picks up its left fork first.
        left_fork: Lock-like context manager for the left fork.
        right_fork: Lock-like context manager for the right fork.
        led: LED object controlled by this philosopher.
    """
    drives_rgb = (id == 4)

    def set_led(state):
        # The RGB LED takes a colour code (2 when lit, 0 when dark --
        # presumably green; confirm against the RGBLED colour table),
        # while the plain LEDs expose on()/off().
        if drives_rgb:
            led.write(2 if state else 0)
        elif state:
            led.on()
        else:
            led.off()

    def blink(half_period, duration):
        # Toggle the LED on/off until `duration` seconds have elapsed or a
        # shutdown has been requested; a started on/off cycle always finishes.
        began = time.time()
        while time.time() - began < duration and keep_running:
            for lit in (True, False):
                set_led(lit)
                time.sleep(half_period)

    # Deadlock prevention: if every philosopher grabbed the left fork first
    # they could all block forever, so philosopher 4 reverses the order and
    # thereby breaks the circular wait.
    if id < 4:
        first, second = left_fork, right_fork
    else:
        first, second = right_fork, left_fork

    while keep_running:
        # --- starving: dark LED while blocked on the forks ---
        set_led(False)

        # --- eating: fast blink while holding both forks ---
        with first, second:
            blink(0.1, random.randint(3, 6))

        # --- napping: slow blink with the forks released ---
        blink(0.5, random.randint(1, 2))

    # Leave the LED dark once the thread winds down.
    set_led(False)

In [20]:
from pynq import GPIO

# One indicator per philosopher: the four LEDs from base.leds (indices 0-3)
# for philosophers 0-3, followed by the RGB LED at base.rgbleds[4] for
# philosopher 4.
philosopher_leds = [base.leds[idx] for idx in range(4)]
philosopher_leds.append(base.rgbleds[4])

# Push-buttons; button 0 is polled by the main loop as the stop button.
btns = base.buttons

# Five forks, each modelled as a mutex shared by two neighbouring philosophers.
forks = [threading.Lock() for _fork in range(5)]

print("Hardware and Locks init.")

Hardware and Locks initialized.


In [24]:
# (Re)arm the shared stop flag so this cell can be re-run after a shutdown.
keep_running = True
threads = []

try:
    # Launch the 5 philosopher threads.
    for i in range(5):
        # FORK LOGIC: Phil 0 uses Fork 0 and 1; ... Phil 4 uses Fork 4 and 0.
        t = threading.Thread(target=philosopher_task,
                             args=(i, forks[i], forks[(i + 1) % 5], philosopher_leds[i]))
        t.start()
        # Record the thread only once it has really started, so the cleanup
        # below never tries to join a thread that was not started.
        threads.append(t)

    print("Running. Press Button 0 to STOP.\n")

    # Main loop: poll the stop button roughly ten times per second.
    while keep_running:
        if btns[0].read() == 1:
            print("\nButton 0 pressed. Shutting down...")
            keep_running = False
        time.sleep(0.1)
except KeyboardInterrupt:
    # Interrupting the kernel is treated as a normal stop request.
    pass
finally:
    # Always signal the workers and wait for them -- even when something
    # other than KeyboardInterrupt was raised (e.g. a failed button read) --
    # otherwise the non-daemon threads would keep blinking and holding forks.
    keep_running = False
    for t in threads:
        t.join()

print("Threads stopped. Resources Released.\n")

Running. Press Button 0 to STOP.


Button 0 pressed. Shutting down...
Threads stopped. Resources Released.

